diff --git a/broker/src/StreamLoopController.ts b/broker/src/StreamLoopController.ts index 6366fef..ad2f5ad 100644 --- a/broker/src/StreamLoopController.ts +++ b/broker/src/StreamLoopController.ts @@ -19,15 +19,13 @@ export async function startStreamLoop() { await RedisStreamService.startReadingLoop({ streamName: requireEnv('STREAM_NAME'), - delay: { base: 100, empty: 5000 }, - readBlock: 2500 + delay: { base: 10, empty: 5000 }, + readBlock: 2000 }, processStreamEvent); } - - export async function processStreamEvent(data: Record) { try { const eventType = data._type; diff --git a/shared/services/RedisStreamService.ts b/shared/services/RedisStreamService.ts index aadeaee..54abc05 100644 --- a/shared/services/RedisStreamService.ts +++ b/shared/services/RedisStreamService.ts @@ -13,6 +13,8 @@ export type ReadingLoopOptions = { export class RedisStreamService { + private static processed = 0; + private static client = createClient({ url: requireEnv("REDIS_URL"), username: requireEnv("REDIS_USERNAME"), @@ -23,6 +25,10 @@ export class RedisStreamService { static async connect() { console.log('RedisStreamService DEV_MODE=', process.env.DEV_MODE === 'true'); await this.client.connect(); + setInterval(() => { + console.log('Processed:', RedisStreamService.processed, '/s'); + }, 1000) + } private static async readingLoop(options: ReadingLoopOptions, processFunction: (content: Record) => Promise) { @@ -33,6 +39,7 @@ export class RedisStreamService { return; } await processFunction(result); + RedisStreamService.processed++; await new Promise(r => setTimeout(r, options.delay?.base || 100)); setTimeout(() => this.readingLoop(options, processFunction), 1); return; @@ -41,9 +48,11 @@ export class RedisStreamService { static async startReadingLoop(options: ReadingLoopOptions, processFunction: (content: Record) => Promise) { try { + console.log('Start reading loop'); await this.client.xGroupCreate(options.streamName, 'broker', '0', { MKSTREAM: true, }); + console.log('Reading loop started'); } catch (ex) { - + console.error(ex); } this.readingLoop(options, processFunction)