updates for testmode

This commit is contained in:
Emily
2025-01-29 17:14:10 +01:00
parent bfeee8673c
commit a2e4ed9ee0
22 changed files with 343 additions and 61 deletions

1
producer/.gitignore vendored
View File

@@ -1,6 +1,7 @@
node_modules
static
ecosystem.config.cjs
ecosystem.config.js
dist
start_dev.js
package-lock.json

View File

@@ -1,7 +1,8 @@
{
"dependencies": {
"cors": "^2.8.5",
"express": "^4.19.2"
"express": "^4.19.2",
"redis": "^4.7.0"
},
"devDependencies": {
"@types/cors": "^2.8.17",
@@ -19,7 +20,9 @@
"build_project": "node ../scripts/build.js",
"build": "npm run compile && npm run build_project",
"docker-build": "docker build -t litlyx-producer -f Dockerfile ../",
"docker-inspect": "docker run -it litlyx-producer sh"
"docker-inspect": "docker run -it litlyx-producer sh",
"workspace:shared": "ts-node ../scripts/producer/shared.ts",
"workspace:deploy": "ts-node ../scripts/producer/deploy.ts"
},
"keywords": [],
"author": "Emily",

View File

@@ -14,6 +14,9 @@ importers:
express:
specifier: ^4.19.2
version: 4.19.2
redis:
specifier: ^4.7.0
version: 4.7.0
devDependencies:
'@types/cors':
specifier: ^2.8.17
@@ -47,6 +50,35 @@ packages:
'@jridgewell/trace-mapping@0.3.9':
resolution: {integrity: sha512-3Belt6tdc8bPgAtbcmdtNJlirVoTmEb5e2gC94PnkwEW9jI6CAHUeoG85tjWP5WquqfavoMtMwiG4P926ZKKuQ==}
'@redis/bloom@1.2.0':
resolution: {integrity: sha512-HG2DFjYKbpNmVXsa0keLHp/3leGJz1mjh09f2RLGGLQZzSHpkmZWuwJbAvo3QcRY8p80m5+ZdXZdYOSBLlp7Cg==}
peerDependencies:
'@redis/client': ^1.0.0
'@redis/client@1.6.0':
resolution: {integrity: sha512-aR0uffYI700OEEH4gYnitAnv3vzVGXCFvYfdpu/CJKvk4pHfLPEy/JSZyrpQ+15WhXe1yJRXLtfQ84s4mEXnPg==}
engines: {node: '>=14'}
'@redis/graph@1.1.1':
resolution: {integrity: sha512-FEMTcTHZozZciLRl6GiiIB4zGm5z5F3F6a6FZCyrfxdKOhFlGkiAqlexWMBzCi4DcRoyiOsuLfW+cjlGWyExOw==}
peerDependencies:
'@redis/client': ^1.0.0
'@redis/json@1.0.7':
resolution: {integrity: sha512-6UyXfjVaTBTJtKNG4/9Z8PSpKE6XgSyEb8iwaqDcy+uKrd/DGYHTWkUdnQDyzm727V7p21WUMhsqz5oy65kPcQ==}
peerDependencies:
'@redis/client': ^1.0.0
'@redis/search@1.2.0':
resolution: {integrity: sha512-tYoDBbtqOVigEDMAcTGsRlMycIIjwMCgD8eR2t0NANeQmgK/lvxNAvYyb6bZDD4frHRhIHkJu2TBRvB0ERkOmw==}
peerDependencies:
'@redis/client': ^1.0.0
'@redis/time-series@1.1.0':
resolution: {integrity: sha512-c1Q99M5ljsIuc4YdaCwfUEXsofakb9c8+Zse2qxTadu8TalLXuAESzLvFAvNVbkmSlvlzIQOLpBCmWI9wTOt+g==}
peerDependencies:
'@redis/client': ^1.0.0
'@tsconfig/node10@1.0.11':
resolution: {integrity: sha512-DcRjDCujK/kCk/cUe8Xz8ZSpm8mS3mNNpta+jGCA6USEDfktlNvm1+IuZ9eTcDbNk41BHwpHHeW+N1lKCz4zOw==}
@@ -126,6 +158,10 @@ packages:
resolution: {integrity: sha512-GHTSNSYICQ7scH7sZ+M2rFopRoLh8t2bLSW6BbgrtLsahOIB5iyAVJf9GjWK3cYTDaMj4XdBpM1cA6pIS0Kv2w==}
engines: {node: '>= 0.4'}
cluster-key-slot@1.1.2:
resolution: {integrity: sha512-RMr0FhtfXemyinomL4hrWcYJxmX6deFdCxpJzhDttxgO1+bcCnkk+9drydLVDmAMG7NE6aN/fl4F7ucU/90gAA==}
engines: {node: '>=0.10.0'}
content-disposition@0.5.4:
resolution: {integrity: sha512-FveZTNuGw04cxlAiWbzi6zTAL/lhehaWbTtgluJh4/E95DqMwTmha3KZN1aAWA8cFIhHzMZUvLevkw5Rqk+tSQ==}
engines: {node: '>= 0.6'}
@@ -213,6 +249,10 @@ packages:
function-bind@1.1.2:
resolution: {integrity: sha512-7XHNxH7qX9xG5mIwxkhumTox/MIRNcOgDrxWsMt2pAr23WHp6MrRlN7FBSFpCpr+oVO0F744iUgR82nJMfG2SA==}
generic-pool@3.9.0:
resolution: {integrity: sha512-hymDOu5B53XvN4QT9dBmZxPX4CWhBPPLguTZ9MMFeFa/Kg0xWVfylOVNlJji/E7yTZWFd/q9GO5TxDLq156D7g==}
engines: {node: '>= 4'}
get-intrinsic@1.2.4:
resolution: {integrity: sha512-5uYhsJH8VJBTv7oslg4BznJYhDoRI6waYCxMmCdnTrcCrHA/fCFKoTFz2JKKE0HdDFUF7/oQuhzumXJK7paBRQ==}
engines: {node: '>= 0.4'}
@@ -321,6 +361,9 @@ packages:
resolution: {integrity: sha512-8zGqypfENjCIqGhgXToC8aB2r7YrBX+AQAfIPs/Mlk+BtPTztOvTS01NRW/3Eh60J+a48lt8qsCzirQ6loCVfA==}
engines: {node: '>= 0.8'}
redis@4.7.0:
resolution: {integrity: sha512-zvmkHEAdGMn+hMRXuMBtu4Vo5P6rHQjLoHftu+lBqq8ZTA3RCVC/WzD790bkKKiNFp7d5/9PcSD19fJyyRvOdQ==}
safe-buffer@5.2.1:
resolution: {integrity: sha512-rp3So07KcdmmKbGvgaNxQSJr7bGVSVk5S9Eq1F+ppbRo70+YeaDxkw5Dd8NPN+GD6bjnYm2VuPuCXmpuYvmCXQ==}
@@ -395,6 +438,9 @@ packages:
resolution: {integrity: sha512-BNGbWLfd0eUPabhkXUVm0j8uuvREyTh5ovRa/dyow/BqAbZJyC+5fU+IzQOzmAKzYqYRAISoRhdQr3eIZ/PXqg==}
engines: {node: '>= 0.8'}
yallist@4.0.0:
resolution: {integrity: sha512-3wdGidZyq5PB084XLES5TpOSRA3wjXAlIWMhum2kRcv/41Sn2emQ0dycQW4uZXLejwKvg6EsvbdlVL+FYEct7A==}
yn@3.1.1:
resolution: {integrity: sha512-Ux4ygGWsu2c7isFWe8Yu1YluJmqVhxqK2cLXNQA5AcC3QfbGNpM7fu0Y8b/z16pXLnFxZYvWhd3fhBY9DLmC6Q==}
engines: {node: '>=6'}
@@ -414,6 +460,32 @@ snapshots:
'@jridgewell/resolve-uri': 3.1.2
'@jridgewell/sourcemap-codec': 1.4.15
'@redis/bloom@1.2.0(@redis/client@1.6.0)':
dependencies:
'@redis/client': 1.6.0
'@redis/client@1.6.0':
dependencies:
cluster-key-slot: 1.1.2
generic-pool: 3.9.0
yallist: 4.0.0
'@redis/graph@1.1.1(@redis/client@1.6.0)':
dependencies:
'@redis/client': 1.6.0
'@redis/json@1.0.7(@redis/client@1.6.0)':
dependencies:
'@redis/client': 1.6.0
'@redis/search@1.2.0(@redis/client@1.6.0)':
dependencies:
'@redis/client': 1.6.0
'@redis/time-series@1.1.0(@redis/client@1.6.0)':
dependencies:
'@redis/client': 1.6.0
'@tsconfig/node10@1.0.11': {}
'@tsconfig/node12@1.0.11': {}
@@ -514,6 +586,8 @@ snapshots:
get-intrinsic: 1.2.4
set-function-length: 1.2.2
cluster-key-slot@1.1.2: {}
content-disposition@0.5.4:
dependencies:
safe-buffer: 5.2.1
@@ -615,6 +689,8 @@ snapshots:
function-bind@1.1.2: {}
generic-pool@3.9.0: {}
get-intrinsic@1.2.4:
dependencies:
es-errors: 1.3.0
@@ -707,6 +783,15 @@ snapshots:
iconv-lite: 0.4.24
unpipe: 1.0.0
redis@4.7.0:
dependencies:
'@redis/bloom': 1.2.0(@redis/client@1.6.0)
'@redis/client': 1.6.0
'@redis/graph': 1.1.1(@redis/client@1.6.0)
'@redis/json': 1.0.7(@redis/client@1.6.0)
'@redis/search': 1.2.0(@redis/client@1.6.0)
'@redis/time-series': 1.1.0(@redis/client@1.6.0)
safe-buffer@5.2.1: {}
safer-buffer@2.1.2: {}
@@ -795,4 +880,6 @@ snapshots:
vary@1.1.2: {}
yallist@4.0.0: {}
yn@3.1.1: {}

View File

@@ -1,7 +1,7 @@
import { Router, json } from "express";
import { createSessionHash, getIPFromRequest } from "./utils";
import { requireEnv } from "@utils/requireEnv";
import { RedisStreamService } from "@services/RedisStreamService";
import { requireEnv } from "./shared/utils/requireEnv";
import { RedisStreamService } from "./shared/services/RedisStreamService";
const router = Router();

View File

@@ -1,5 +1,5 @@
import { requireEnv } from "@utils/requireEnv";
import { RedisStreamService } from "@services/RedisStreamService";
import { requireEnv } from "./shared/utils/requireEnv";
import { RedisStreamService } from "./shared/services/RedisStreamService";
import express from 'express';
import cors from 'cors';
@@ -21,9 +21,9 @@ app.post('/event', express.json(jsonOptions), async (req, res) => {
const ip = getIPFromRequest(req);
const sessionHash = createSessionHash(req.body.website, ip, req.body.userAgent);
const flowHash = createFlowSessionHash(req.body.pid, ip, req.body.userAgent);
await RedisStreamService.addToStream(streamName, {
await RedisStreamService.addToStream(streamName, {
...req.body, _type: 'event', sessionHash, ip, flowHash
});
});
return res.sendStatus(200);
} catch (ex: any) {
return res.status(500).json({ error: ex.message });
@@ -59,8 +59,9 @@ app.post('/keep_alive', express.json(jsonOptions), async (req, res) => {
});
async function main() {
const PORT = requireEnv("PORT");
await RedisStreamService.connect();
app.listen(requireEnv("PORT"), () => console.log(`Listening on port ${requireEnv("PORT")}`));
app.listen(PORT, () => console.log(`Listening on port ${PORT}`));
}
main();

View File

@@ -0,0 +1,90 @@
import { createClient } from 'redis';
import { requireEnv } from '../utils/requireEnv';
export type ReadingLoopOptions = {
stream_name: string,
group_name: ConsumerGroup,
consumer_name: string
}
type xReadGroupMessage = { id: string, message: { [x: string]: string } }
type xReadGgroupResult = { name: string, messages: xReadGroupMessage[] }[] | null
const consumerGroups = ['DATABASE'] as const;
type ConsumerGroup = typeof consumerGroups[number];
export class RedisStreamService {
private static client = createClient({
url: requireEnv("REDIS_URL"),
username: requireEnv("REDIS_USERNAME"),
password: requireEnv("REDIS_PASSWORD"),
database: process.env.DEV_MODE === 'true' ? 1 : 0
});
static async connect() {
await this.client.connect();
}
static async readFromStream(stream_name: string, group_name: string, consumer_name: string, process_function: (content: Record<string, string>) => Promise<any>) {
const result: xReadGgroupResult = await this.client.xReadGroup(group_name, consumer_name, [{ key: stream_name, id: '>' }], { COUNT: 5, BLOCK: 10000 });
if (!result) {
setTimeout(() => this.readFromStream(stream_name, group_name, consumer_name, process_function), 10);
return;
}
for (const entry of result) {
for (const messageData of entry.messages) {
await process_function(messageData.message);
await this.client.xAck(stream_name, group_name, messageData.id);
await this.client.set(`ACK:${group_name}`, messageData.id);
}
}
await this.trimStream(stream_name);
setTimeout(() => this.readFromStream(stream_name, group_name, consumer_name, process_function), 10);
return;
}
private static async trimStream(stream_name: string) {
let lastMessageAck = '0';
for (const consumerGroup of consumerGroups) {
const lastAck = await this.client.get(`ACK:${consumerGroup}`);
if (!lastAck) continue;
if (lastAck > lastMessageAck) lastMessageAck = lastAck;
}
await this.client.xTrim(stream_name, 'MINID', lastMessageAck as any);
}
static async startReadingLoop(options: ReadingLoopOptions, processFunction: (content: Record<string, string>) => Promise<any>) {
if (!consumerGroups.includes(options.group_name)) return console.error('GROUP NAME NOT ALLOWED');
console.log('Start reading loop')
try {
await this.client.xGroupCreate(options.stream_name, options.group_name, '0', { MKSTREAM: true });
} catch (ex) {
console.log('Group', options.group_name, 'already exist');
}
this.readFromStream(options.stream_name, options.group_name, options.consumer_name, processFunction);
}
static async addToStream(streamName: string, data: Record<string, string>) {
const result = await this.client.xAdd(streamName, "*", { ...data, timestamp: Date.now().toString() });
return result;
}
}

View File

@@ -0,0 +1,9 @@
export function requireEnv(name: string, errorMessage?: string) {
if (!process.env[name]) {
console.error(errorMessage || `ENV variable ${name} is required`);
return process.exit(1);
}
console.log('requireEnv', name, process.env[name]);
return process.env[name] as string;
}