diff --git a/.gitignore b/.gitignore index 4d85b68..84493dd 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ steps -PROCESS_EVENT \ No newline at end of file +PROCESS_EVENT +docker +dev \ No newline at end of file diff --git a/producer/Dockerfile b/producer/Dockerfile new file mode 100644 index 0000000..e46ba3c --- /dev/null +++ b/producer/Dockerfile @@ -0,0 +1,7 @@ +FROM node:21-alpine +WORKDIR /home/app +COPY package.json pnpm-lock.yaml ./ +RUN npm install -g pnpm && pnpm install --prod --frozen-lockfile +COPY ./dist /home/app/dist +EXPOSE ${PORT} +CMD ["node", "dist/producer/src/index.js"] \ No newline at end of file diff --git a/producer/package.json b/producer/package.json index f5c3844..2b6d200 100644 --- a/producer/package.json +++ b/producer/package.json @@ -20,7 +20,8 @@ "dev": "node scripts/start_dev.js", "compile": "tsc", "build": "ts-node scripts/build.ts", - "build_all": "npm run compile && npm run build" + "build_all": "npm run compile && npm run build", + "docker-build": "node scripts/prepare_docker.js && docker build -t litlyx-producer ." }, "keywords": [], "author": "Emily", diff --git a/producer/scripts/prepare_docker.js b/producer/scripts/prepare_docker.js new file mode 100644 index 0000000..4c61b07 --- /dev/null +++ b/producer/scripts/prepare_docker.js @@ -0,0 +1,7 @@ + +const child = require('child_process'); + +const p = child.exec('pnpm run compile && pnpm run build'); + +p.stdout.on('data', (e) => { console.log(e.toString()); }); +p.stderr.on('data', (e) => { console.log(e.toString()); }); \ No newline at end of file diff --git a/shared/services/RedisStreamService.ts b/shared/services/RedisStreamService.ts index bfdf28d..df7bf33 100644 --- a/shared/services/RedisStreamService.ts +++ b/shared/services/RedisStreamService.ts @@ -7,7 +7,8 @@ export type ReadingLoopOptions = { empty?: number }, readBlock?: number, - streamName: string + streamName: string, + consumer?: string } export class RedisStreamService { @@ -23,7 +24,7 @@ export class RedisStreamService { } private static async readingLoop(options: ReadingLoopOptions, processFunction: (content: Record) => Promise) { - const result = await this.readFromStream(options.streamName, options.readBlock || 2500); + const result = await this.readFromStream(options.streamName, options.readBlock || 2500, options.consumer || 'base_consumer'); if (!result) { await new Promise(r => setTimeout(r, options.delay?.empty || 5000)); setTimeout(() => this.readingLoop(options, processFunction), 1); @@ -35,17 +36,29 @@ export class RedisStreamService { return; } - static startReadingLoop(options: ReadingLoopOptions, processFunction: (content: Record) => Promise) { + static async startReadingLoop(options: ReadingLoopOptions, processFunction: (content: Record) => Promise) { + + try { + await this.client.xGroupCreate(options.streamName, 'broker', '0', { MKSTREAM: true, }); + } catch (ex) { + + } + this.readingLoop(options, processFunction) } - private static async readFromStream(streamName: string, readBlock: number) { - const result = await this.client.xRead({ id: '0', key: streamName }, { COUNT: 1, BLOCK: readBlock }); + private static async readFromStream(streamName: string, readBlock: number, consumer: string) { + const result = await this.client.xReadGroup( + 'broker', consumer, + [{ key: streamName, id: '>' }], + { COUNT: 1, BLOCK: readBlock } + ); if (!result) return; if (result.length == 0) return; if (!result[0].messages) return; if (result[0].messages.length == 0) return; const message = result[0].messages[0]; + await this.client.xAck(streamName, 'broker', message.id); await this.client.xDel(streamName, message.id); const content = message.message; return content;