This commit is contained in:
Emily
2024-06-25 00:52:12 +02:00
parent 1414451d2b
commit f369880ba7
5 changed files with 37 additions and 7 deletions

4
.gitignore vendored
View File

@@ -1,2 +1,4 @@
steps
PROCESS_EVENT
PROCESS_EVENT
docker
dev

7
producer/Dockerfile Normal file
View File

@@ -0,0 +1,7 @@
FROM node:21-alpine
WORKDIR /home/app
COPY package.json pnpm-lock.yaml ./
RUN npm install -g pnpm && pnpm install --prod --frozen-lockfile
COPY ./dist /home/app/dist
EXPOSE ${PORT}
CMD ["node", "dist/producer/src/index.js"]

View File

@@ -20,7 +20,8 @@
"dev": "node scripts/start_dev.js",
"compile": "tsc",
"build": "ts-node scripts/build.ts",
"build_all": "npm run compile && npm run build"
"build_all": "npm run compile && npm run build",
"docker-build": "node scripts/prepare_docker.js && docker build -t litlyx-producer ."
},
"keywords": [],
"author": "Emily",

View File

@@ -0,0 +1,7 @@
const child = require('child_process');
const p = child.exec('pnpm run compile && pnpm run build');
p.stdout.on('data', (e) => { console.log(e.toString()); });
p.stderr.on('data', (e) => { console.log(e.toString()); });

View File

@@ -7,7 +7,8 @@ export type ReadingLoopOptions = {
empty?: number
},
readBlock?: number,
streamName: string
streamName: string,
consumer?: string
}
export class RedisStreamService {
@@ -23,7 +24,7 @@ export class RedisStreamService {
}
private static async readingLoop(options: ReadingLoopOptions, processFunction: (content: Record<string, string>) => Promise<any>) {
const result = await this.readFromStream(options.streamName, options.readBlock || 2500);
const result = await this.readFromStream(options.streamName, options.readBlock || 2500, options.consumer || 'base_consumer');
if (!result) {
await new Promise(r => setTimeout(r, options.delay?.empty || 5000));
setTimeout(() => this.readingLoop(options, processFunction), 1);
@@ -35,17 +36,29 @@ export class RedisStreamService {
return;
}
static startReadingLoop(options: ReadingLoopOptions, processFunction: (content: Record<string, string>) => Promise<any>) {
static async startReadingLoop(options: ReadingLoopOptions, processFunction: (content: Record<string, string>) => Promise<any>) {
try {
await this.client.xGroupCreate(options.streamName, 'broker', '0', { MKSTREAM: true, });
} catch (ex) {
}
this.readingLoop(options, processFunction)
}
private static async readFromStream(streamName: string, readBlock: number) {
const result = await this.client.xRead({ id: '0', key: streamName }, { COUNT: 1, BLOCK: readBlock });
private static async readFromStream(streamName: string, readBlock: number, consumer: string) {
const result = await this.client.xReadGroup(
'broker', consumer,
[{ key: streamName, id: '>' }],
{ COUNT: 1, BLOCK: readBlock }
);
if (!result) return;
if (result.length == 0) return;
if (!result[0].messages) return;
if (result[0].messages.length == 0) return;
const message = result[0].messages[0];
await this.client.xAck(streamName, 'broker', message.id);
await this.client.xDel(streamName, message.id);
const content = message.message;
return content;