new consumer

This commit is contained in:
Emily
2024-09-18 23:05:07 +02:00
parent 628e471cec
commit 3c59551f88
21 changed files with 1568 additions and 1624 deletions

View File

@@ -11,7 +11,7 @@ export type ReadingLoopOptions = {
type xReadGroupMessage = { id: string, message: { [x: string]: string } }
type xReadGgroupResult = { name: string, messages: xReadGroupMessage[] }[] | null
const consumerGroups = ['DATABASE', 'LIMITS'] as const;
const consumerGroups = ['DATABASE'] as const;
type ConsumerGroup = typeof consumerGroups[number];
@@ -41,7 +41,6 @@ export class RedisStreamService {
}
for (const entry of result) {
console.log(`[${group_name}-${consumer_name}]`, 'Processing', entry.messages.length, 'messages');
for (const messageData of entry.messages) {
await process_function(messageData.message);
await this.client.xAck(stream_name, group_name, messageData.id);