add log to stream loop

This commit is contained in:
Emily
2024-08-30 17:27:34 +02:00
parent 6d26c3c8af
commit a2034551ec
2 changed files with 12 additions and 5 deletions

View File

@@ -19,15 +19,13 @@ export async function startStreamLoop() {
await RedisStreamService.startReadingLoop({ await RedisStreamService.startReadingLoop({
streamName: requireEnv('STREAM_NAME'), streamName: requireEnv('STREAM_NAME'),
delay: { base: 100, empty: 5000 }, delay: { base: 10, empty: 5000 },
readBlock: 2500 readBlock: 2000
}, processStreamEvent); }, processStreamEvent);
} }
export async function processStreamEvent(data: Record<string, string>) { export async function processStreamEvent(data: Record<string, string>) {
try { try {
const eventType = data._type; const eventType = data._type;

View File

@@ -13,6 +13,8 @@ export type ReadingLoopOptions = {
export class RedisStreamService { export class RedisStreamService {
private static processed = 0;
private static client = createClient({ private static client = createClient({
url: requireEnv("REDIS_URL"), url: requireEnv("REDIS_URL"),
username: requireEnv("REDIS_USERNAME"), username: requireEnv("REDIS_USERNAME"),
@@ -23,6 +25,10 @@ export class RedisStreamService {
static async connect() { static async connect() {
console.log('RedisStreamService DEV_MODE=', process.env.DEV_MODE === 'true'); console.log('RedisStreamService DEV_MODE=', process.env.DEV_MODE === 'true');
await this.client.connect(); await this.client.connect();
setInterval(() => {
console.log('Processed:', RedisStreamService.processed, '/s');
}, 1000)
} }
private static async readingLoop(options: ReadingLoopOptions, processFunction: (content: Record<string, string>) => Promise<any>) { private static async readingLoop(options: ReadingLoopOptions, processFunction: (content: Record<string, string>) => Promise<any>) {
@@ -33,6 +39,7 @@ export class RedisStreamService {
return; return;
} }
await processFunction(result); await processFunction(result);
RedisStreamService.processed++;
await new Promise(r => setTimeout(r, options.delay?.base || 100)); await new Promise(r => setTimeout(r, options.delay?.base || 100));
setTimeout(() => this.readingLoop(options, processFunction), 1); setTimeout(() => this.readingLoop(options, processFunction), 1);
return; return;
@@ -41,9 +48,11 @@ export class RedisStreamService {
static async startReadingLoop(options: ReadingLoopOptions, processFunction: (content: Record<string, string>) => Promise<any>) { static async startReadingLoop(options: ReadingLoopOptions, processFunction: (content: Record<string, string>) => Promise<any>) {
try { try {
console.log('Start reading loop');
await this.client.xGroupCreate(options.streamName, 'broker', '0', { MKSTREAM: true, }); await this.client.xGroupCreate(options.streamName, 'broker', '0', { MKSTREAM: true, });
console.log('Reading loop started');
} catch (ex) { } catch (ex) {
console.error(ex);
} }
this.readingLoop(options, processFunction) this.readingLoop(options, processFunction)