diff --git a/producer/.gitignore b/producer/.gitignore index e4e1475..15f4418 100644 --- a/producer/.gitignore +++ b/producer/.gitignore @@ -5,4 +5,5 @@ ecosystem.config.js dist start_dev.js package-lock.json -build_all.bat \ No newline at end of file +build_all.bat +src/shared \ No newline at end of file diff --git a/producer/src/shared/services/RedisStreamService.ts b/producer/src/shared/services/RedisStreamService.ts deleted file mode 100644 index 81a5ff7..0000000 --- a/producer/src/shared/services/RedisStreamService.ts +++ /dev/null @@ -1,90 +0,0 @@ -import { createClient } from 'redis'; -import { requireEnv } from '../utils/requireEnv'; - -export type ReadingLoopOptions = { - stream_name: string, - group_name: ConsumerGroup, - consumer_name: string -} - - -type xReadGroupMessage = { id: string, message: { [x: string]: string } } -type xReadGgroupResult = { name: string, messages: xReadGroupMessage[] }[] | null - -const consumerGroups = ['DATABASE'] as const; - -type ConsumerGroup = typeof consumerGroups[number]; - - -export class RedisStreamService { - - private static client = createClient({ - url: requireEnv("REDIS_URL"), - username: requireEnv("REDIS_USERNAME"), - password: requireEnv("REDIS_PASSWORD"), - database: process.env.DEV_MODE === 'true' ? 1 : 0 - }); - - static async connect() { - await this.client.connect(); - } - - static async readFromStream(stream_name: string, group_name: string, consumer_name: string, process_function: (content: Record) => Promise) { - - const result: xReadGgroupResult = await this.client.xReadGroup(group_name, consumer_name, [{ key: stream_name, id: '>' }], { COUNT: 5, BLOCK: 10000 }); - - if (!result) { - setTimeout(() => this.readFromStream(stream_name, group_name, consumer_name, process_function), 10); - return; - } - - for (const entry of result) { - for (const messageData of entry.messages) { - await process_function(messageData.message); - await this.client.xAck(stream_name, group_name, messageData.id); - await this.client.set(`ACK:${group_name}`, messageData.id); - } - } - - await this.trimStream(stream_name); - - setTimeout(() => this.readFromStream(stream_name, group_name, consumer_name, process_function), 10); - return; - - } - - private static async trimStream(stream_name: string) { - - let lastMessageAck = '0'; - - for (const consumerGroup of consumerGroups) { - const lastAck = await this.client.get(`ACK:${consumerGroup}`); - if (!lastAck) continue; - if (lastAck > lastMessageAck) lastMessageAck = lastAck; - } - - await this.client.xTrim(stream_name, 'MINID', lastMessageAck as any); - - } - - static async startReadingLoop(options: ReadingLoopOptions, processFunction: (content: Record) => Promise) { - - if (!consumerGroups.includes(options.group_name)) return console.error('GROUP NAME NOT ALLOWED'); - - console.log('Start reading loop') - - try { - await this.client.xGroupCreate(options.stream_name, options.group_name, '0', { MKSTREAM: true }); - } catch (ex) { - console.log('Group', options.group_name, 'already exist'); - } - - this.readFromStream(options.stream_name, options.group_name, options.consumer_name, processFunction); - } - - static async addToStream(streamName: string, data: Record) { - const result = await this.client.xAdd(streamName, "*", { ...data, timestamp: Date.now().toString() }); - return result; - } - -} diff --git a/producer/src/shared/utils/requireEnv.ts b/producer/src/shared/utils/requireEnv.ts deleted file mode 100644 index a6497e0..0000000 --- a/producer/src/shared/utils/requireEnv.ts +++ /dev/null @@ -1,9 +0,0 @@ - -export function requireEnv(name: string, errorMessage?: string) { - if (!process.env[name]) { - console.error(errorMessage || `ENV variable ${name} is required`); - return process.exit(1); - } - console.log('requireEnv', name, process.env[name]); - return process.env[name] as string; -} \ No newline at end of file