mirror of
https://github.com/Litlyx/litlyx
synced 2025-12-09 23:48:36 +01:00
remove shared
This commit is contained in:
3
producer/.gitignore
vendored
3
producer/.gitignore
vendored
@@ -5,4 +5,5 @@ ecosystem.config.js
|
|||||||
dist
|
dist
|
||||||
start_dev.js
|
start_dev.js
|
||||||
package-lock.json
|
package-lock.json
|
||||||
build_all.bat
|
build_all.bat
|
||||||
|
src/shared
|
||||||
@@ -1,90 +0,0 @@
|
|||||||
import { createClient } from 'redis';
|
|
||||||
import { requireEnv } from '../utils/requireEnv';
|
|
||||||
|
|
||||||
export type ReadingLoopOptions = {
|
|
||||||
stream_name: string,
|
|
||||||
group_name: ConsumerGroup,
|
|
||||||
consumer_name: string
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
type xReadGroupMessage = { id: string, message: { [x: string]: string } }
|
|
||||||
type xReadGgroupResult = { name: string, messages: xReadGroupMessage[] }[] | null
|
|
||||||
|
|
||||||
const consumerGroups = ['DATABASE'] as const;
|
|
||||||
|
|
||||||
type ConsumerGroup = typeof consumerGroups[number];
|
|
||||||
|
|
||||||
|
|
||||||
export class RedisStreamService {
|
|
||||||
|
|
||||||
private static client = createClient({
|
|
||||||
url: requireEnv("REDIS_URL"),
|
|
||||||
username: requireEnv("REDIS_USERNAME"),
|
|
||||||
password: requireEnv("REDIS_PASSWORD"),
|
|
||||||
database: process.env.DEV_MODE === 'true' ? 1 : 0
|
|
||||||
});
|
|
||||||
|
|
||||||
static async connect() {
|
|
||||||
await this.client.connect();
|
|
||||||
}
|
|
||||||
|
|
||||||
static async readFromStream(stream_name: string, group_name: string, consumer_name: string, process_function: (content: Record<string, string>) => Promise<any>) {
|
|
||||||
|
|
||||||
const result: xReadGgroupResult = await this.client.xReadGroup(group_name, consumer_name, [{ key: stream_name, id: '>' }], { COUNT: 5, BLOCK: 10000 });
|
|
||||||
|
|
||||||
if (!result) {
|
|
||||||
setTimeout(() => this.readFromStream(stream_name, group_name, consumer_name, process_function), 10);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (const entry of result) {
|
|
||||||
for (const messageData of entry.messages) {
|
|
||||||
await process_function(messageData.message);
|
|
||||||
await this.client.xAck(stream_name, group_name, messageData.id);
|
|
||||||
await this.client.set(`ACK:${group_name}`, messageData.id);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
await this.trimStream(stream_name);
|
|
||||||
|
|
||||||
setTimeout(() => this.readFromStream(stream_name, group_name, consumer_name, process_function), 10);
|
|
||||||
return;
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
private static async trimStream(stream_name: string) {
|
|
||||||
|
|
||||||
let lastMessageAck = '0';
|
|
||||||
|
|
||||||
for (const consumerGroup of consumerGroups) {
|
|
||||||
const lastAck = await this.client.get(`ACK:${consumerGroup}`);
|
|
||||||
if (!lastAck) continue;
|
|
||||||
if (lastAck > lastMessageAck) lastMessageAck = lastAck;
|
|
||||||
}
|
|
||||||
|
|
||||||
await this.client.xTrim(stream_name, 'MINID', lastMessageAck as any);
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
static async startReadingLoop(options: ReadingLoopOptions, processFunction: (content: Record<string, string>) => Promise<any>) {
|
|
||||||
|
|
||||||
if (!consumerGroups.includes(options.group_name)) return console.error('GROUP NAME NOT ALLOWED');
|
|
||||||
|
|
||||||
console.log('Start reading loop')
|
|
||||||
|
|
||||||
try {
|
|
||||||
await this.client.xGroupCreate(options.stream_name, options.group_name, '0', { MKSTREAM: true });
|
|
||||||
} catch (ex) {
|
|
||||||
console.log('Group', options.group_name, 'already exist');
|
|
||||||
}
|
|
||||||
|
|
||||||
this.readFromStream(options.stream_name, options.group_name, options.consumer_name, processFunction);
|
|
||||||
}
|
|
||||||
|
|
||||||
static async addToStream(streamName: string, data: Record<string, string>) {
|
|
||||||
const result = await this.client.xAdd(streamName, "*", { ...data, timestamp: Date.now().toString() });
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
@@ -1,9 +0,0 @@
|
|||||||
|
|
||||||
export function requireEnv(name: string, errorMessage?: string) {
|
|
||||||
if (!process.env[name]) {
|
|
||||||
console.error(errorMessage || `ENV variable ${name} is required`);
|
|
||||||
return process.exit(1);
|
|
||||||
}
|
|
||||||
console.log('requireEnv', name, process.env[name]);
|
|
||||||
return process.env[name] as string;
|
|
||||||
}
|
|
||||||
Reference in New Issue
Block a user