rewrite consumer + testmode utils

This commit is contained in:
Emily
2025-02-01 15:26:26 +01:00
parent 4da840f2ec
commit 0963201a32
12 changed files with 135 additions and 40 deletions

View File

@@ -19,8 +19,7 @@
"scripts": { "scripts": {
"dev": "node scripts/start_dev.js", "dev": "node scripts/start_dev.js",
"compile": "tsc", "compile": "tsc",
"build_project": "node ../scripts/build.js", "build": "npm run compile && npm run create_db",
"build": "npm run compile && npm run build_project && npm run create_db",
"create_db": "cd scripts && ts-node create_database.ts", "create_db": "cd scripts && ts-node create_database.ts",
"docker-build": "docker build -t litlyx-consumer -f Dockerfile ../", "docker-build": "docker build -t litlyx-consumer -f Dockerfile ../",
"docker-inspect": "docker run -it litlyx-consumer sh", "docker-inspect": "docker run -it litlyx-consumer sh",

View File

@@ -2,7 +2,6 @@ import { ProjectModel } from "./shared/schema/project/ProjectSchema";
import { UserModel } from "./shared/schema/UserSchema"; import { UserModel } from "./shared/schema/UserSchema";
import { LimitNotifyModel } from "./shared/schema/broker/LimitNotifySchema"; import { LimitNotifyModel } from "./shared/schema/broker/LimitNotifySchema";
import { EmailService } from './shared/services/EmailService'; import { EmailService } from './shared/services/EmailService';
import { requireEnv } from "./shared/utils/requireEnv";
import { TProjectLimit } from "./shared/schema/project/ProjectsLimits"; import { TProjectLimit } from "./shared/schema/project/ProjectsLimits";
import { EmailServiceHelper } from "./EmailServiceHelper"; import { EmailServiceHelper } from "./EmailServiceHelper";

View File

@@ -5,26 +5,6 @@ import { requireEnv } from './shared/utils/requireEnv';
const stream_name = requireEnv('STREAM_NAME'); const stream_name = requireEnv('STREAM_NAME');
export class MetricsManager {
private static processTime = new Map<string, number[]>();
static onProcess(id: string, time: number) {
const target = this.processTime.get(id);
if (!target) {
this.processTime.set(id, [time]);
} else {
target.push(time);
if (target.length > 1000) target.splice(0, target.length - 1000);
}
}
static get() {
return Array.from(this.processTime.entries());
}
}
export const metricsRouter = Router(); export const metricsRouter = Router();
metricsRouter.get('/queue', async (req, res) => { metricsRouter.get('/queue', async (req, res) => {
@@ -39,7 +19,7 @@ metricsRouter.get('/queue', async (req, res) => {
metricsRouter.get('/durations', async (req, res) => { metricsRouter.get('/durations', async (req, res) => {
try { try {
const durations = MetricsManager.get(); const durations = RedisStreamService.METRICS_get()
res.json({ durations }); res.json({ durations });
} catch (ex) { } catch (ex) {
console.error(ex); console.error(ex);

View File

@@ -13,14 +13,14 @@ import express from 'express';
import { ProjectLimitModel } from './shared/schema/project/ProjectsLimits'; import { ProjectLimitModel } from './shared/schema/project/ProjectsLimits';
import { ProjectCountModel } from './shared/schema/project/ProjectsCounts'; import { ProjectCountModel } from './shared/schema/project/ProjectsCounts';
import { MetricsManager, metricsRouter } from './Metrics'; import { metricsRouter } from './Metrics';
const app = express(); const app = express();
app.use('/metrics', metricsRouter); app.use('/metrics', metricsRouter);
app.listen(process.env.PORT); app.listen(process.env.PORT, () => console.log(`Listening on port ${process.env.PORT}`));
connectDatabase(requireEnv('MONGO_CONNECTION_STRING')); connectDatabase(requireEnv('MONGO_CONNECTION_STRING'));
main(); main();
@@ -47,15 +47,15 @@ async function processStreamEntry(data: Record<string, string>) {
try { try {
const eventType = data._type; const eventType = data._type;
if (!eventType) return; if (!eventType) return console.log('No type');
const { pid, sessionHash } = data; const { pid, sessionHash } = data;
const project = await ProjectModel.exists({ _id: pid }); const project = await ProjectModel.exists({ _id: pid });
if (!project) return; if (!project) return console.log('No project');
const canLog = await checkLimits(pid); const canLog = await checkLimits(pid);
if (!canLog) return; if (!canLog) return console.log('No limits');
if (eventType === 'event') { if (eventType === 'event') {
await process_event(data, sessionHash); await process_event(data, sessionHash);
@@ -71,7 +71,7 @@ async function processStreamEntry(data: Record<string, string>) {
const duration = Date.now() - start; const duration = Date.now() - start;
MetricsManager.onProcess(CONSUMER_NAME, duration); RedisStreamService.METRICS_onProcess(CONSUMER_NAME, duration);
} }

View File

@@ -56,6 +56,7 @@ export default defineNuxtConfig({
STRIPE_WH_SECRET_TEST: process.env.STRIPE_WH_SECRET_TEST, STRIPE_WH_SECRET_TEST: process.env.STRIPE_WH_SECRET_TEST,
NOAUTH_USER_EMAIL: process.env.NOAUTH_USER_EMAIL, NOAUTH_USER_EMAIL: process.env.NOAUTH_USER_EMAIL,
NOAUTH_USER_NAME: process.env.NOAUTH_USER_NAME, NOAUTH_USER_NAME: process.env.NOAUTH_USER_NAME,
MODE: process.env.MODE || 'NONE',
SELFHOSTED: process.env.SELFHOSTED || 'FALSE', SELFHOSTED: process.env.SELFHOSTED || 'FALSE',
public: { public: {
AUTH_MODE: process.env.AUTH_MODE, AUTH_MODE: process.env.AUTH_MODE,

View File

@@ -14,7 +14,8 @@ export default async () => {
logger.info('[SERVER] Initializing'); logger.info('[SERVER] Initializing');
if (config.STRIPE_SECRET) { if (config.STRIPE_SECRET) {
StripeService.init(config.STRIPE_SECRET, config.STRIPE_WH_SECRET, false); const TEST_MODE = config.MODE === 'TEST';
StripeService.init(config.STRIPE_SECRET, config.STRIPE_WH_SECRET, TEST_MODE);
logger.info('[STRIPE] Initialized'); logger.info('[STRIPE] Initialized');
} else { } else {
StripeService.disable(); StripeService.disable();

View File

@@ -17,8 +17,7 @@
"scripts": { "scripts": {
"dev": "node scripts/start_dev.js", "dev": "node scripts/start_dev.js",
"compile": "tsc", "compile": "tsc",
"build_project": "node ../scripts/build.js", "build": "npm run compile",
"build": "npm run compile && npm run build_project",
"docker-build": "docker build -t litlyx-producer -f Dockerfile ../", "docker-build": "docker build -t litlyx-producer -f Dockerfile ../",
"docker-inspect": "docker run -it litlyx-producer sh", "docker-inspect": "docker run -it litlyx-producer sh",
"workspace:shared": "ts-node ../scripts/producer/shared.ts", "workspace:shared": "ts-node ../scripts/producer/shared.ts",

View File

@@ -16,7 +16,8 @@ router.post('/keep_alive', json(jsonOptions), async (req, res) => {
const sessionHash = createSessionHash(req.body.website, ip, req.body.userAgent); const sessionHash = createSessionHash(req.body.website, ip, req.body.userAgent);
await RedisStreamService.addToStream(streamName, { await RedisStreamService.addToStream(streamName, {
...req.body, _type: 'keep_alive', sessionHash, ip, ...req.body, _type: 'keep_alive', sessionHash, ip,
instant: req.body.instant + '' instant: req.body.instant + '',
timestamp: Date.now()
}); });
return res.sendStatus(200); return res.sendStatus(200);
} catch (ex: any) { } catch (ex: any) {
@@ -38,12 +39,14 @@ router.post('/metrics/push', json(jsonOptions), async (req, res) => {
...req.body, _type: 'visit', sessionHash, ip, ...req.body, _type: 'visit', sessionHash, ip,
screenWidth: '0', screenWidth: '0',
screenHeight: '0', screenHeight: '0',
type: req.body.type.toString() type: req.body.type.toString(),
timestamp: Date.now()
}); });
} else { } else {
await RedisStreamService.addToStream(streamName, { await RedisStreamService.addToStream(streamName, {
...req.body, _type: 'event', sessionHash, ip, ...req.body, _type: 'event', sessionHash, ip,
type: req.body.type.toString() type: req.body.type.toString(),
timestamp: Date.now()
}); });
} }

View File

@@ -22,7 +22,8 @@ app.post('/event', express.json(jsonOptions), async (req, res) => {
const sessionHash = createSessionHash(req.body.website, ip, req.body.userAgent); const sessionHash = createSessionHash(req.body.website, ip, req.body.userAgent);
const flowHash = createFlowSessionHash(req.body.pid, ip, req.body.userAgent); const flowHash = createFlowSessionHash(req.body.pid, ip, req.body.userAgent);
await RedisStreamService.addToStream(streamName, { await RedisStreamService.addToStream(streamName, {
...req.body, _type: 'event', sessionHash, ip, flowHash ...req.body, _type: 'event', sessionHash, ip, flowHash,
timestamp: Date.now()
}); });
return res.sendStatus(200); return res.sendStatus(200);
} catch (ex: any) { } catch (ex: any) {
@@ -35,7 +36,7 @@ app.post('/visit', express.json(jsonOptions), async (req, res) => {
const ip = getIPFromRequest(req); const ip = getIPFromRequest(req);
const sessionHash = createSessionHash(req.body.website, ip, req.body.userAgent); const sessionHash = createSessionHash(req.body.website, ip, req.body.userAgent);
const flowHash = createFlowSessionHash(req.body.pid, ip, req.body.userAgent); const flowHash = createFlowSessionHash(req.body.pid, ip, req.body.userAgent);
await RedisStreamService.addToStream(streamName, { ...req.body, _type: 'visit', sessionHash, ip, flowHash }); await RedisStreamService.addToStream(streamName, { ...req.body, _type: 'visit', sessionHash, ip, flowHash, timestamp: Date.now() });
return res.sendStatus(200); return res.sendStatus(200);
} catch (ex: any) { } catch (ex: any) {
return res.status(500).json({ error: ex.message }); return res.status(500).json({ error: ex.message });
@@ -50,7 +51,7 @@ app.post('/keep_alive', express.json(jsonOptions), async (req, res) => {
await RedisStreamService.addToStream(streamName, { await RedisStreamService.addToStream(streamName, {
...req.body, _type: 'keep_alive', sessionHash, ip, ...req.body, _type: 'keep_alive', sessionHash, ip,
instant: req.body.instant + '', instant: req.body.instant + '',
flowHash flowHash, timestamp: Date.now()
}); });
return res.sendStatus(200); return res.sendStatus(200);
} catch (ex: any) { } catch (ex: any) {

View File

@@ -0,0 +1,87 @@
import fs from 'fs-extra';
import path from 'path';
import child from 'child_process';
import { createZip } from '../helpers/zip-helper';
import { DeployHelper } from '../helpers/deploy-helper';
import { DATABASE_CONNECTION_STRING_PRODUCTION, DATABASE_CONNECTION_STRING_TESTMODE, REMOTE_HOST_TESTMODE } from '../.config';

// Deploy script for the consumer service.
// Flow: (optionally) build locally -> zip dist + config -> upload via SCP ->
// unzip, install deps and (re)start with pm2 on the remote host.
// In 'testmode' the ecosystem config is rewritten on the fly to point at the
// testmode Redis host and testmode database before being added to the zip.

const TMP_PATH = path.join(__dirname, '../../tmp');
const LOCAL_PATH = path.join(__dirname, '../../consumer');
const REMOTE_PATH = '/home/litlyx/consumer';
const ZIP_NAME = 'consumer.zip';

const MODE = DeployHelper.getMode();
// Pass --no-build as the first extra argument to reuse the existing dist/ output.
const SKIP_BUILD = DeployHelper.getArgAt(0) === '--no-build';

console.log('Deploying consumer in mode:', MODE);

// Short grace period so the operator can abort (Ctrl+C) before the deploy starts.
// Surface failures explicitly instead of dying as an unhandled rejection.
setTimeout(() => {
    main().catch(err => {
        console.error('Deploy failed:', err);
        process.exit(1);
    });
}, 3000);

async function main() {
    // Start from a clean temp dir for the zip artifact.
    if (fs.existsSync(TMP_PATH)) fs.rmSync(TMP_PATH, { force: true, recursive: true });
    fs.ensureDirSync(TMP_PATH);

    if (!SKIP_BUILD) {
        console.log('Building');
        child.execSync(`cd ${LOCAL_PATH} && pnpm run build`);
    }

    console.log('Creating zip file');
    const archive = createZip(TMP_PATH + '/' + ZIP_NAME);
    archive.directory(LOCAL_PATH + '/dist', '/dist');

    if (MODE === 'testmode') {
        // Rewrite the pm2 ecosystem config so the consumer targets the
        // testmode Redis host and testmode database instead of production.
        const ecosystemContent = fs.readFileSync(LOCAL_PATH + '/ecosystem.config.js', 'utf8');
        const redisUrlMatch = ecosystemContent.match(/REDIS_URL: ["'](.*?)["']/);
        // Fail fast with a clear message instead of a TypeError on [1].
        if (!redisUrlMatch) throw new Error('REDIS_URL not found in ecosystem.config.js');
        const REDIS_URL = redisUrlMatch[1];
        const devContent = ecosystemContent
            .replace(REDIS_URL, `redis://${REMOTE_HOST_TESTMODE}`)
            // BUGFIX: the database connection string must NOT be prefixed with
            // 'redis://' (that prefix belongs to the Redis URL replacement above).
            .replace(DATABASE_CONNECTION_STRING_PRODUCTION, DATABASE_CONNECTION_STRING_TESTMODE);
        archive.append(Buffer.from(devContent), { name: '/ecosystem.config.js' });
    } else {
        archive.file(LOCAL_PATH + '/ecosystem.config.js', { name: '/ecosystem.config.js' })
    }

    archive.file(LOCAL_PATH + '/package.json', { name: '/package.json' });
    archive.file(LOCAL_PATH + '/pnpm-lock.yaml', { name: '/pnpm-lock.yaml' });
    await archive.finalize();

    await DeployHelper.connect();
    const { scp, ssh } = DeployHelper.instances();

    console.log('Creating remote structure');
    console.log('Check existing');
    const remoteExist = await scp.exists(REMOTE_PATH);
    console.log('Exist', remoteExist);
    if (remoteExist) {
        // Wipe the previous deployment entirely before uploading the new one.
        console.log('Deleting');
        await DeployHelper.execute(`rm -r ${REMOTE_PATH}`);
    }

    console.log('Creating folder');
    await scp.mkdir(REMOTE_PATH);

    console.log('Uploading zip file');
    await scp.uploadFile(TMP_PATH + '/' + ZIP_NAME, REMOTE_PATH + '/' + ZIP_NAME);
    scp.close();

    console.log('Cleaning local');
    fs.rmSync(TMP_PATH + '/' + ZIP_NAME, { force: true, recursive: true });

    console.log('Extracting remote');
    await DeployHelper.execute(`cd ${REMOTE_PATH} && unzip ${ZIP_NAME} && rm -r ${ZIP_NAME}`);

    console.log('Installing remote');
    // NOTE(review): pnpm/pm2 paths are pinned to a specific nvm node version
    // on the remote host — confirm this matches the server's installed version.
    await DeployHelper.execute(`cd ${REMOTE_PATH} && /root/.nvm/versions/node/v21.2.0/bin/pnpm i`);

    console.log('Executing remote');
    await DeployHelper.execute(`cd ${REMOTE_PATH} && /root/.nvm/versions/node/v21.2.0/bin/pm2 start ecosystem.config.js`);

    ssh.dispose();
}

View File

@@ -1,6 +1,7 @@
import fs from 'fs-extra'; import fs from 'fs-extra';
import path from 'path'; import path from 'path';
import child from 'child_process';
import { createZip } from '../helpers/zip-helper'; import { createZip } from '../helpers/zip-helper';
import { DeployHelper } from '../helpers/deploy-helper'; import { DeployHelper } from '../helpers/deploy-helper';
import { REMOTE_HOST_TESTMODE } from '../.config'; import { REMOTE_HOST_TESTMODE } from '../.config';
@@ -11,6 +12,8 @@ const REMOTE_PATH = '/home/litlyx/producer';
const ZIP_NAME = 'producer.zip'; const ZIP_NAME = 'producer.zip';
const MODE = DeployHelper.getMode(); const MODE = DeployHelper.getMode();
const SKIP_BUILD = DeployHelper.getArgAt(0) == '--no-build';
console.log('Deploying producer in mode:', MODE); console.log('Deploying producer in mode:', MODE);
setTimeout(() => { main(); }, 3000); setTimeout(() => { main(); }, 3000);
@@ -20,6 +23,13 @@ async function main() {
if (fs.existsSync(TMP_PATH)) fs.rmSync(TMP_PATH, { force: true, recursive: true }); if (fs.existsSync(TMP_PATH)) fs.rmSync(TMP_PATH, { force: true, recursive: true });
fs.ensureDirSync(TMP_PATH); fs.ensureDirSync(TMP_PATH);
if (!SKIP_BUILD) {
console.log('Building');
child.execSync(`cd ${LOCAL_PATH} && pnpm run build`);
}
console.log('Creting zip file'); console.log('Creting zip file');
const archive = createZip(TMP_PATH + '/' + ZIP_NAME); const archive = createZip(TMP_PATH + '/' + ZIP_NAME);
archive.directory(LOCAL_PATH + '/dist', '/dist'); archive.directory(LOCAL_PATH + '/dist', '/dist');

View File

@@ -7,7 +7,6 @@ export type ReadingLoopOptions = {
consumer_name: string consumer_name: string
} }
type xReadGroupMessage = { id: string, message: { [x: string]: string } } type xReadGroupMessage = { id: string, message: { [x: string]: string } }
type xReadGgroupResult = { name: string, messages: xReadGroupMessage[] }[] | null type xReadGgroupResult = { name: string, messages: xReadGroupMessage[] }[] | null
@@ -25,6 +24,22 @@ export class RedisStreamService {
database: process.env.DEV_MODE === 'true' ? 1 : 0 database: process.env.DEV_MODE === 'true' ? 1 : 0
}); });
private static METRICS_MAX_ENTRIES = 1000;
static async METRICS_onProcess(id: string, time: number) {
const key = `___dev_metrics`;
await this.client.lPush(key, `${id}:${time.toString()}`);
await this.client.lTrim(key, 0, this.METRICS_MAX_ENTRIES - 1);
}
static async METRICS_get() {
const key = `___dev_metrics`;
const data = await this.client.lRange(key, 0, -1);
return data.map(e => e.split(':')) as [string, string][];
}
static async connect() { static async connect() {
await this.client.connect(); await this.client.connect();
} }