From 0963201a32fb7fb4f75236230012394d4d45336e Mon Sep 17 00:00:00 2001 From: Emily Date: Sat, 1 Feb 2025 15:26:26 +0100 Subject: [PATCH] rewrite consumer + testmode utils --- consumer/package.json | 3 +- consumer/src/EmailController.ts | 1 - consumer/src/Metrics.ts | 22 +---- consumer/src/index.ts | 12 +-- dashboard/nuxt.config.ts | 1 + dashboard/server/init.ts | 3 +- producer/package.json | 3 +- producer/src/deprecated.ts | 9 +- producer/src/index.ts | 7 +- scripts/consumer/deploy.ts | 87 ++++++++++++++++++++ scripts/producer/deploy.ts | 10 +++ shared_global/services/RedisStreamService.ts | 17 +++- 12 files changed, 135 insertions(+), 40 deletions(-) diff --git a/consumer/package.json b/consumer/package.json index 4212f42..88d7cf6 100644 --- a/consumer/package.json +++ b/consumer/package.json @@ -19,8 +19,7 @@ "scripts": { "dev": "node scripts/start_dev.js", "compile": "tsc", - "build_project": "node ../scripts/build.js", - "build": "npm run compile && npm run build_project && npm run create_db", + "build": "npm run compile && npm run create_db", "create_db": "cd scripts && ts-node create_database.ts", "docker-build": "docker build -t litlyx-consumer -f Dockerfile ../", "docker-inspect": "docker run -it litlyx-consumer sh", diff --git a/consumer/src/EmailController.ts b/consumer/src/EmailController.ts index f576f1a..03bb4e4 100644 --- a/consumer/src/EmailController.ts +++ b/consumer/src/EmailController.ts @@ -2,7 +2,6 @@ import { ProjectModel } from "./shared/schema/project/ProjectSchema"; import { UserModel } from "./shared/schema/UserSchema"; import { LimitNotifyModel } from "./shared/schema/broker/LimitNotifySchema"; import { EmailService } from './shared/services/EmailService'; -import { requireEnv } from "./shared/utils/requireEnv"; import { TProjectLimit } from "./shared/schema/project/ProjectsLimits"; import { EmailServiceHelper } from "./EmailServiceHelper"; diff --git a/consumer/src/Metrics.ts b/consumer/src/Metrics.ts index 61a77b6..ca78c63 100644 --- a/consumer/src/Metrics.ts +++ b/consumer/src/Metrics.ts @@ -5,26 +5,6 @@ import { requireEnv } from './shared/utils/requireEnv'; const stream_name = requireEnv('STREAM_NAME'); -export class MetricsManager { - - private static processTime = new Map(); - - static onProcess(id: string, time: number) { - const target = this.processTime.get(id); - if (!target) { - this.processTime.set(id, [time]); - } else { - target.push(time); - if (target.length > 1000) target.splice(0, target.length - 1000); - } - } - - static get() { - return Array.from(this.processTime.entries()); - } - -} - export const metricsRouter = Router(); metricsRouter.get('/queue', async (req, res) => { @@ -39,7 +19,7 @@ metricsRouter.get('/queue', async (req, res) => { metricsRouter.get('/durations', async (req, res) => { try { - const durations = MetricsManager.get(); + const durations = RedisStreamService.METRICS_get() res.json({ durations }); } catch (ex) { console.error(ex); diff --git a/consumer/src/index.ts b/consumer/src/index.ts index 7c98428..24bdd63 100644 --- a/consumer/src/index.ts +++ b/consumer/src/index.ts @@ -13,14 +13,14 @@ import express from 'express'; import { ProjectLimitModel } from './shared/schema/project/ProjectsLimits'; import { ProjectCountModel } from './shared/schema/project/ProjectsCounts'; -import { MetricsManager, metricsRouter } from './Metrics'; +import { metricsRouter } from './Metrics'; const app = express(); app.use('/metrics', metricsRouter); -app.listen(process.env.PORT); +app.listen(process.env.PORT, () => console.log(`Listening on port ${process.env.PORT}`)); connectDatabase(requireEnv('MONGO_CONNECTION_STRING')); main(); @@ -47,15 +47,15 @@ async function processStreamEntry(data: Record) { try { const eventType = data._type; - if (!eventType) return; + if (!eventType) return console.log('No type'); const { pid, sessionHash } = data; const project = await ProjectModel.exists({ _id: pid }); - if (!project) return; + if (!project) return console.log('No project'); const canLog = await checkLimits(pid); - if (!canLog) return; + if (!canLog) return console.log('No limits'); if (eventType === 'event') { await process_event(data, sessionHash); @@ -71,7 +71,7 @@ async function processStreamEntry(data: Record) { const duration = Date.now() - start; - MetricsManager.onProcess(CONSUMER_NAME, duration); + RedisStreamService.METRICS_onProcess(CONSUMER_NAME, duration); } diff --git a/dashboard/nuxt.config.ts b/dashboard/nuxt.config.ts index 2fdb91f..a96796c 100644 --- a/dashboard/nuxt.config.ts +++ b/dashboard/nuxt.config.ts @@ -56,6 +56,7 @@ export default defineNuxtConfig({ STRIPE_WH_SECRET_TEST: process.env.STRIPE_WH_SECRET_TEST, NOAUTH_USER_EMAIL: process.env.NOAUTH_USER_EMAIL, NOAUTH_USER_NAME: process.env.NOAUTH_USER_NAME, + MODE: process.env.MODE || 'NONE', SELFHOSTED: process.env.SELFHOSTED || 'FALSE', public: { AUTH_MODE: process.env.AUTH_MODE, diff --git a/dashboard/server/init.ts b/dashboard/server/init.ts index c1a1bd5..29c6662 100644 --- a/dashboard/server/init.ts +++ b/dashboard/server/init.ts @@ -14,7 +14,8 @@ export default async () => { logger.info('[SERVER] Initializing'); if (config.STRIPE_SECRET) { - StripeService.init(config.STRIPE_SECRET, config.STRIPE_WH_SECRET, false); + const TEST_MODE = config.MODE === 'TEST'; + StripeService.init(config.STRIPE_SECRET, config.STRIPE_WH_SECRET, TEST_MODE); logger.info('[STRIPE] Initialized'); } else { StripeService.disable(); diff --git a/producer/package.json b/producer/package.json index 1504016..1001e02 100644 --- a/producer/package.json +++ b/producer/package.json @@ -17,8 +17,7 @@ "scripts": { "dev": "node scripts/start_dev.js", "compile": "tsc", - "build_project": "node ../scripts/build.js", - "build": "npm run compile && npm run build_project", + "build": "npm run compile", "docker-build": "docker build -t litlyx-producer -f Dockerfile ../", "docker-inspect": "docker run -it litlyx-producer sh", "workspace:shared": "ts-node ../scripts/producer/shared.ts", diff --git a/producer/src/deprecated.ts b/producer/src/deprecated.ts index 920ebdf..7380242 100644 --- a/producer/src/deprecated.ts +++ b/producer/src/deprecated.ts @@ -16,7 +16,8 @@ router.post('/keep_alive', json(jsonOptions), async (req, res) => { const sessionHash = createSessionHash(req.body.website, ip, req.body.userAgent); await RedisStreamService.addToStream(streamName, { ...req.body, _type: 'keep_alive', sessionHash, ip, - instant: req.body.instant + '' + instant: req.body.instant + '', + timestamp: Date.now() }); return res.sendStatus(200); } catch (ex: any) { @@ -38,12 +39,14 @@ router.post('/metrics/push', json(jsonOptions), async (req, res) => { ...req.body, _type: 'visit', sessionHash, ip, screenWidth: '0', screenHeight: '0', - type: req.body.type.toString() + type: req.body.type.toString(), + timestamp: Date.now() }); } else { await RedisStreamService.addToStream(streamName, { ...req.body, _type: 'event', sessionHash, ip, - type: req.body.type.toString() + type: req.body.type.toString(), + timestamp: Date.now() }); } diff --git a/producer/src/index.ts b/producer/src/index.ts index 826ceef..88a70c5 100644 --- a/producer/src/index.ts +++ b/producer/src/index.ts @@ -22,7 +22,8 @@ app.post('/event', express.json(jsonOptions), async (req, res) => { const sessionHash = createSessionHash(req.body.website, ip, req.body.userAgent); const flowHash = createFlowSessionHash(req.body.pid, ip, req.body.userAgent); await RedisStreamService.addToStream(streamName, { - ...req.body, _type: 'event', sessionHash, ip, flowHash + ...req.body, _type: 'event', sessionHash, ip, flowHash, + timestamp: Date.now() }); return res.sendStatus(200); } catch (ex: any) { @@ -35,7 +36,7 @@ app.post('/visit', express.json(jsonOptions), async (req, res) => { const ip = getIPFromRequest(req); const sessionHash = createSessionHash(req.body.website, ip, req.body.userAgent); const flowHash = createFlowSessionHash(req.body.pid, ip, req.body.userAgent); - await RedisStreamService.addToStream(streamName, { ...req.body, _type: 'visit', sessionHash, ip, flowHash }); + await RedisStreamService.addToStream(streamName, { ...req.body, _type: 'visit', sessionHash, ip, flowHash, timestamp: Date.now() }); return res.sendStatus(200); } catch (ex: any) { return res.status(500).json({ error: ex.message }); @@ -50,7 +51,7 @@ app.post('/keep_alive', express.json(jsonOptions), async (req, res) => { await RedisStreamService.addToStream(streamName, { ...req.body, _type: 'keep_alive', sessionHash, ip, instant: req.body.instant + '', - flowHash + flowHash, timestamp: Date.now() }); return res.sendStatus(200); } catch (ex: any) { diff --git a/scripts/consumer/deploy.ts b/scripts/consumer/deploy.ts index e69de29..6faa3ed 100644 --- a/scripts/consumer/deploy.ts +++ b/scripts/consumer/deploy.ts @@ -0,0 +1,87 @@ + +import fs from 'fs-extra'; +import path from 'path'; +import child from 'child_process'; +import { createZip } from '../helpers/zip-helper'; +import { DeployHelper } from '../helpers/deploy-helper'; +import { DATABASE_CONNECTION_STRING_PRODUCTION, DATABASE_CONNECTION_STRING_TESTMODE, REMOTE_HOST_TESTMODE } from '../.config'; + +const TMP_PATH = path.join(__dirname, '../../tmp'); +const LOCAL_PATH = path.join(__dirname, '../../consumer'); +const REMOTE_PATH = '/home/litlyx/consumer'; +const ZIP_NAME = 'consumer.zip'; + +const MODE = DeployHelper.getMode(); +const SKIP_BUILD = DeployHelper.getArgAt(0) == '--no-build'; + +console.log('Deploying consumer in mode:', MODE); + +setTimeout(() => { main(); }, 3000); + +async function main() { + + if (fs.existsSync(TMP_PATH)) fs.rmSync(TMP_PATH, { force: true, recursive: true }); + fs.ensureDirSync(TMP_PATH); + + + if (!SKIP_BUILD) { + console.log('Building'); + child.execSync(`cd ${LOCAL_PATH} && pnpm run build`); + } + + + console.log('Creting zip file'); + const archive = createZip(TMP_PATH + '/' + ZIP_NAME); + archive.directory(LOCAL_PATH + '/dist', '/dist'); + + if (MODE === 'testmode') { + const ecosystemContent = fs.readFileSync(LOCAL_PATH + '/ecosystem.config.js', 'utf8'); + const REDIS_URL = ecosystemContent.match(/REDIS_URL: ["'](.*?)["']/)[1]; + const devContent = ecosystemContent + .replace(REDIS_URL, `redis://${REMOTE_HOST_TESTMODE}`) + .replace(DATABASE_CONNECTION_STRING_PRODUCTION, `redis://${DATABASE_CONNECTION_STRING_TESTMODE}`); + archive.append(Buffer.from(devContent), { name: '/ecosystem.config.js' }); + } else { + archive.file(LOCAL_PATH + '/ecosystem.config.js', { name: '/ecosystem.config.js' }) + } + + + archive.file(LOCAL_PATH + '/package.json', { name: '/package.json' }); + archive.file(LOCAL_PATH + '/pnpm-lock.yaml', { name: '/pnpm-lock.yaml' }); + await archive.finalize(); + + await DeployHelper.connect(); + + const { scp, ssh } = DeployHelper.instances(); + + console.log('Creating remote structure'); + console.log('Check existing'); + const remoteExist = await scp.exists(REMOTE_PATH); + console.log('Exist', remoteExist); + if (remoteExist) { + console.log('Deleting'); + await DeployHelper.execute(`rm -r ${REMOTE_PATH}`); + } + + console.log('Creating folder'); + await scp.mkdir(REMOTE_PATH); + + console.log('Uploading zip file'); + await scp.uploadFile(TMP_PATH + '/' + ZIP_NAME, REMOTE_PATH + '/' + ZIP_NAME); + scp.close(); + + console.log('Cleaning local'); + fs.rmSync(TMP_PATH + '/' + ZIP_NAME, { force: true, recursive: true }); + + console.log('Extracting remote'); + await DeployHelper.execute(`cd ${REMOTE_PATH} && unzip ${ZIP_NAME} && rm -r ${ZIP_NAME}`); + + console.log('Installing remote'); + await DeployHelper.execute(`cd ${REMOTE_PATH} && /root/.nvm/versions/node/v21.2.0/bin/pnpm i`); + + console.log('Executing remote'); + await DeployHelper.execute(`cd ${REMOTE_PATH} && /root/.nvm/versions/node/v21.2.0/bin/pm2 start ecosystem.config.js`); + + ssh.dispose(); + +} diff --git a/scripts/producer/deploy.ts b/scripts/producer/deploy.ts index 2644d3b..7e14195 100644 --- a/scripts/producer/deploy.ts +++ b/scripts/producer/deploy.ts @@ -1,6 +1,7 @@ import fs from 'fs-extra'; import path from 'path'; +import child from 'child_process'; import { createZip } from '../helpers/zip-helper'; import { DeployHelper } from '../helpers/deploy-helper'; import { REMOTE_HOST_TESTMODE } from '../.config'; @@ -11,6 +12,8 @@ const REMOTE_PATH = '/home/litlyx/producer'; const ZIP_NAME = 'producer.zip'; const MODE = DeployHelper.getMode(); +const SKIP_BUILD = DeployHelper.getArgAt(0) == '--no-build'; + console.log('Deploying producer in mode:', MODE); setTimeout(() => { main(); }, 3000); @@ -20,6 +23,13 @@ async function main() { if (fs.existsSync(TMP_PATH)) fs.rmSync(TMP_PATH, { force: true, recursive: true }); fs.ensureDirSync(TMP_PATH); + + if (!SKIP_BUILD) { + console.log('Building'); + child.execSync(`cd ${LOCAL_PATH} && pnpm run build`); + } + + console.log('Creting zip file'); const archive = createZip(TMP_PATH + '/' + ZIP_NAME); archive.directory(LOCAL_PATH + '/dist', '/dist'); diff --git a/shared_global/services/RedisStreamService.ts b/shared_global/services/RedisStreamService.ts index a3e3928..6cd3a93 100644 --- a/shared_global/services/RedisStreamService.ts +++ b/shared_global/services/RedisStreamService.ts @@ -7,7 +7,6 @@ export type ReadingLoopOptions = { consumer_name: string } - type xReadGroupMessage = { id: string, message: { [x: string]: string } } type xReadGgroupResult = { name: string, messages: xReadGroupMessage[] }[] | null @@ -25,6 +24,22 @@ export class RedisStreamService { database: process.env.DEV_MODE === 'true' ? 1 : 0 }); + + private static METRICS_MAX_ENTRIES = 1000; + + static async METRICS_onProcess(id: string, time: number) { + const key = `___dev_metrics`; + await this.client.lPush(key, `${id}:${time.toString()}`); + await this.client.lTrim(key, 0, this.METRICS_MAX_ENTRIES - 1); + } + + static async METRICS_get() { + const key = `___dev_metrics`; + const data = await this.client.lRange(key, 0, -1); + return data.map(e => e.split(':')) as [string, string][]; + } + + static async connect() { await this.client.connect(); }