From 487c3ac7b45901ee00b3cee2b8ee756e172a0e40 Mon Sep 17 00:00:00 2001 From: Emily Date: Fri, 31 Jan 2025 14:58:46 +0100 Subject: [PATCH] change consumer --- consumer/ecosystem.config.example.cjs | 21 ----- consumer/package.json | 5 +- consumer/pnpm-lock.yaml | 59 ++++++++++++ consumer/src/EmailController.ts | 43 ++++++--- consumer/src/EmailServiceHelper.ts | 19 ++++ consumer/src/LimitChecker.ts | 4 +- consumer/src/index.ts | 18 ++-- consumer/src/shared/data/broker/Limits.ts | 5 ++ consumer/src/shared/schema/UserSchema.ts | 38 ++++++++ .../shared/schema/broker/LimitNotifySchema.ts | 18 ++++ .../src/shared/schema/metrics/EventSchema.ts | 22 +++++ .../shared/schema/metrics/SessionSchema.ts | 23 +++++ .../src/shared/schema/metrics/VisitSchema.ts | 45 ++++++++++ .../shared/schema/project/ProjectSchema.ts | 26 ++++++ .../shared/schema/project/ProjectsCounts.ts | 22 +++++ .../shared/schema/project/ProjectsLimits.ts | 26 ++++++ .../src/shared/services/DatabaseService.ts | 10 +++ consumer/src/shared/services/EmailService.ts | 35 ++++++++ .../src/shared/services/RedisStreamService.ts | 90 +++++++++++++++++++ consumer/src/shared/utils/requireEnv.ts | 9 ++ consumer/tsconfig.json | 2 - package.json | 3 + scripts/consumer/deploy.ts | 0 scripts/consumer/shared.ts | 35 ++++++++ 24 files changed, 531 insertions(+), 47 deletions(-) delete mode 100644 consumer/ecosystem.config.example.cjs create mode 100644 consumer/src/EmailServiceHelper.ts create mode 100644 consumer/src/shared/data/broker/Limits.ts create mode 100644 consumer/src/shared/schema/UserSchema.ts create mode 100644 consumer/src/shared/schema/broker/LimitNotifySchema.ts create mode 100644 consumer/src/shared/schema/metrics/EventSchema.ts create mode 100644 consumer/src/shared/schema/metrics/SessionSchema.ts create mode 100644 consumer/src/shared/schema/metrics/VisitSchema.ts create mode 100644 consumer/src/shared/schema/project/ProjectSchema.ts create mode 100644 consumer/src/shared/schema/project/ProjectsCounts.ts create mode 100644 consumer/src/shared/schema/project/ProjectsLimits.ts create mode 100644 consumer/src/shared/services/DatabaseService.ts create mode 100644 consumer/src/shared/services/EmailService.ts create mode 100644 consumer/src/shared/services/RedisStreamService.ts create mode 100644 consumer/src/shared/utils/requireEnv.ts create mode 100644 scripts/consumer/deploy.ts create mode 100644 scripts/consumer/shared.ts diff --git a/consumer/ecosystem.config.example.cjs b/consumer/ecosystem.config.example.cjs deleted file mode 100644 index 95efba9..0000000 --- a/consumer/ecosystem.config.example.cjs +++ /dev/null @@ -1,21 +0,0 @@ -module.exports = { - apps: [ - { - name: 'consumer', - port: '3031', - exec_mode: 'cluster', - instances: '2', - script: './dist/consumer/src/index.js', - env: { - EMAIL_SERVICE: '', - BREVO_API_KEY: '', - MONGO_CONNECTION_STRING: '', - REDIS_URL: "", - REDIS_USERNAME: "", - REDIS_PASSWORD: "", - STREAM_NAME: "", - GROUP_NAME: '' - } - } - ] -} \ No newline at end of file diff --git a/consumer/package.json b/consumer/package.json index 81974de..97b4ad4 100644 --- a/consumer/package.json +++ b/consumer/package.json @@ -1,5 +1,6 @@ { "dependencies": { + "axios": "^1.7.9", "express": "^4.19.2", "ua-parser-js": "^1.0.37" }, @@ -19,7 +20,9 @@ "build": "npm run compile && npm run build_project && npm run create_db", "create_db": "cd scripts && ts-node create_database.ts", "docker-build": "docker build -t litlyx-consumer -f Dockerfile ../", - "docker-inspect": "docker run -it litlyx-consumer sh" + "docker-inspect": "docker run -it litlyx-consumer sh", + "workspace:shared": "ts-node ../scripts/consumer/shared.ts", + "workspace:deploy": "ts-node ../scripts/consumer/deploy.ts" }, "keywords": [], "author": "Emily", diff --git a/consumer/pnpm-lock.yaml b/consumer/pnpm-lock.yaml index 641b2bd..4d15a9d 100644 --- a/consumer/pnpm-lock.yaml +++ b/consumer/pnpm-lock.yaml @@ -8,6 +8,9 @@ importers: .: dependencies: + axios: + specifier: ^1.7.9 + version: 1.7.9 express: specifier: ^4.19.2 version: 4.21.2 @@ -81,6 +84,12 @@ packages: array-flatten@1.1.1: resolution: {integrity: sha512-PCVAQswWemu6UdxsDFFX/+gVeYqKAod3D3UVm91jHwynguOwAvYPhx8nNlM++NqRcK6CxxpUafjmhIdKiHibqg==} + asynckit@0.4.0: + resolution: {integrity: sha512-Oei9OH4tRh0YqU3GxhX79dM/mwVgvbZJaSNaRk+bshkj0S5cfHcgYakreBjrHwatXKbz+IoIdYLxrKim2MjW0Q==} + + axios@1.7.9: + resolution: {integrity: sha512-LhLcE7Hbiryz8oMDdDptSrWowmB4Bl6RCt6sIJKpRB4XtVf0iEgewX3au/pJqm+Py1kCASkb/FFKjxQaLtxJvw==} + body-parser@1.20.3: resolution: {integrity: sha512-7rAxByjUMqQ3/bHJy7D6OGXvx/MMc4IqBn/X0fcM1QUcAItpZrBEYhWGem+tzXH90c+G01ypMcYJBO9Y30203g==} engines: {node: '>= 0.8', npm: 1.2.8000 || >= 1.4.16} @@ -97,6 +106,10 @@ packages: resolution: {integrity: sha512-YTd+6wGlNlPxSuri7Y6X8tY2dmm12UMH66RpKMhiX6rsk5wXXnYgbUcOt8kiS31/AjfoTOvCsE+w8nZQLQnzHA==} engines: {node: '>= 0.4'} + combined-stream@1.0.8: + resolution: {integrity: sha512-FQN4MRfuJeHf7cBbBMJFXhKSDq+2kAArBlmRBvcvFE5BB1HZKXtSFASDhdlz9zOYwxh8lDdnvmMOe/+5cdoEdg==} + engines: {node: '>= 0.8'} + content-disposition@0.5.4: resolution: {integrity: sha512-FveZTNuGw04cxlAiWbzi6zTAL/lhehaWbTtgluJh4/E95DqMwTmha3KZN1aAWA8cFIhHzMZUvLevkw5Rqk+tSQ==} engines: {node: '>= 0.6'} @@ -123,6 +136,10 @@ packages: supports-color: optional: true + delayed-stream@1.0.0: + resolution: {integrity: sha512-ZySD7Nf91aLB0RxL4KGrKHBXl7Eds1DAmEdcoVawXnLD7SDhpNgtuII2aAkg7a7QS41jxPSZ17p4VdGnMHk3MQ==} + engines: {node: '>=0.4.0'} + depd@2.0.0: resolution: {integrity: sha512-g7nH6P6dyDioJogAAGprGpCtVImJhpPk/roCzdb3fIh61/s/nPsfR6onyMwkCAR/OlC3yBC0lESvUoQEAssIrw==} engines: {node: '>= 0.8'} @@ -177,6 +194,19 @@ packages: resolution: {integrity: sha512-6BN9trH7bp3qvnrRyzsBz+g3lZxTNZTbVO2EV1CS0WIcDbawYVdYvGflME/9QP0h0pYlCDBCTjYa9nZzMDpyxQ==} engines: {node: '>= 0.8'} + follow-redirects@1.15.9: + resolution: {integrity: sha512-gew4GsXizNgdoRyqmyfMHyAmXsZDk6mHkSxZFCzW9gwlbtOW44CDtYavM+y+72qD/Vq2l550kMF52DT8fOLJqQ==} + engines: {node: '>=4.0'} + peerDependencies: + debug: '*' + peerDependenciesMeta: + debug: + optional: true + + form-data@4.0.1: + resolution: {integrity: sha512-tzN8e4TX8+kkxGPK8D5u0FNmjPUjw3lwC9lSLxxoB/+GtsJG91CO8bSWy73APlgAZzZbXEYZJuxjkHH2w+Ezhw==} + engines: {node: '>= 6'} + forwarded@0.2.0: resolution: {integrity: sha512-buRG0fpBtRHSTCOASe6hD258tEubFoRLb4ZNA6NxMVHNw2gOcwHo9wyablzMzOA5z9xA9L1KNjk/Nt6MT9aYow==} engines: {node: '>= 0.6'} @@ -283,6 +313,9 @@ packages: resolution: {integrity: sha512-llQsMLSUDUPT44jdrU/O37qlnifitDP+ZwrmmZcoSKyLKvtZxpyV0n2/bD/N4tBAAZ/gJEdZU7KMraoK1+XYAg==} engines: {node: '>= 0.10'} + proxy-from-env@1.1.0: + resolution: {integrity: sha512-D+zkORCbA9f1tdWRK0RaCR3GPv50cMxcrz4X8k5LTSUD1Dkw47mKJEZQNunItRTkWwgtaUSo1RVFRIG9ZXiFYg==} + qs@6.13.0: resolution: {integrity: sha512-+38qI9SOr8tfZ4QmJNplMUxqjbe7LKvvZgWdExBOmd+egZTtjLB67Gu0HRX3u/XOq7UU2Nx6nsjvS16Z9uwfpg==} engines: {node: '>=0.6'} @@ -429,6 +462,16 @@ snapshots: array-flatten@1.1.1: {} + asynckit@0.4.0: {} + + axios@1.7.9: + dependencies: + follow-redirects: 1.15.9 + form-data: 4.0.1 + proxy-from-env: 1.1.0 + transitivePeerDependencies: + - debug + body-parser@1.20.3: dependencies: bytes: 3.1.2 @@ -458,6 +501,10 @@ snapshots: call-bind-apply-helpers: 1.0.1 get-intrinsic: 1.2.7 + combined-stream@1.0.8: + dependencies: + delayed-stream: 1.0.0 + content-disposition@0.5.4: dependencies: safe-buffer: 5.2.1 @@ -474,6 +521,8 @@ snapshots: dependencies: ms: 2.0.0 + delayed-stream@1.0.0: {} + depd@2.0.0: {} destroy@1.2.0: {} @@ -552,6 +601,14 @@ snapshots: transitivePeerDependencies: - supports-color + follow-redirects@1.15.9: {} + + form-data@4.0.1: + dependencies: + asynckit: 0.4.0 + combined-stream: 1.0.8 + mime-types: 2.1.35 + forwarded@0.2.0: {} fresh@0.5.2: {} @@ -639,6 +696,8 @@ snapshots: forwarded: 0.2.0 ipaddr.js: 1.9.1 + proxy-from-env@1.1.0: {} + qs@6.13.0: dependencies: side-channel: 1.1.0 diff --git a/consumer/src/EmailController.ts b/consumer/src/EmailController.ts index 58926f0..f576f1a 100644 --- a/consumer/src/EmailController.ts +++ b/consumer/src/EmailController.ts @@ -1,13 +1,11 @@ -import { ProjectModel } from "@schema/project/ProjectSchema"; -import { UserModel } from "@schema/UserSchema"; -import { LimitNotifyModel } from "@schema/broker/LimitNotifySchema"; -import EmailService from '@services/EmailService'; -import { requireEnv } from "@utils/requireEnv"; -import { TProjectLimit } from "@schema/project/ProjectsLimits"; +import { ProjectModel } from "./shared/schema/project/ProjectSchema"; +import { UserModel } from "./shared/schema/UserSchema"; +import { LimitNotifyModel } from "./shared/schema/broker/LimitNotifySchema"; +import { EmailService } from './shared/services/EmailService'; +import { requireEnv } from "./shared/utils/requireEnv"; +import { TProjectLimit } from "./shared/schema/project/ProjectsLimits"; +import { EmailServiceHelper } from "./EmailServiceHelper"; -if (process.env.EMAIL_SERVICE) { - EmailService.init(requireEnv('BREVO_API_KEY')); -} export async function checkLimitsForEmail(projectCounts: TProjectLimit) { @@ -27,7 +25,14 @@ export async function checkLimitsForEmail(projectCounts: TProjectLimit) { const owner = await UserModel.findById(project.owner); if (!owner) return; - if (process.env.EMAIL_SERVICE) await EmailService.sendLimitEmailMax(owner.email, project.name); + setImmediate(() => { + const emailData = EmailService.getEmailServerInfo('limit_max', { + target: owner.email, + projectName: project.name + }); + EmailServiceHelper.sendEmail(emailData); + }); + await LimitNotifyModel.updateOne({ project_id: projectCounts.project_id }, { limit1: true, limit2: true, limit3: true }); } else if ((projectCounts.visits + projectCounts.events) >= (projectCounts.limit * 0.9)) { @@ -40,7 +45,14 @@ export async function checkLimitsForEmail(projectCounts: TProjectLimit) { const owner = await UserModel.findById(project.owner); if (!owner) return; - if (process.env.EMAIL_SERVICE) await EmailService.sendLimitEmail90(owner.email, project.name); + setImmediate(() => { + const emailData = EmailService.getEmailServerInfo('limit_90', { + target: owner.email, + projectName: project.name + }); + EmailServiceHelper.sendEmail(emailData); + }); + await LimitNotifyModel.updateOne({ project_id: projectCounts.project_id }, { limit1: true, limit2: true, limit3: false }); } else if ((projectCounts.visits + projectCounts.events) >= (projectCounts.limit * 0.5)) { @@ -53,7 +65,14 @@ export async function checkLimitsForEmail(projectCounts: TProjectLimit) { const owner = await UserModel.findById(project.owner); if (!owner) return; - if (process.env.EMAIL_SERVICE) await EmailService.sendLimitEmail50(owner.email, project.name); + setImmediate(() => { + const emailData = EmailService.getEmailServerInfo('limit_50', { + target: owner.email, + projectName: project.name + }); + EmailServiceHelper.sendEmail(emailData); + }); + await LimitNotifyModel.updateOne({ project_id: projectCounts.project_id }, { limit1: true, limit2: false, limit3: false }); } diff --git a/consumer/src/EmailServiceHelper.ts b/consumer/src/EmailServiceHelper.ts new file mode 100644 index 0000000..314efe3 --- /dev/null +++ b/consumer/src/EmailServiceHelper.ts @@ -0,0 +1,19 @@ + +import { EmailServerInfo } from './shared/services/EmailService' +import axios from 'axios'; + +const EMAIL_SECRET = process.env.EMAIL_SECRET; + +export class EmailServiceHelper { + static async sendEmail(data: EmailServerInfo) { + try { + await axios(data.url, { + method: 'POST', + data: data.body, + headers: { ...data.headers, 'x-litlyx-token': EMAIL_SECRET } + }) + } catch (ex) { + console.error(ex); + } + } +} \ No newline at end of file diff --git a/consumer/src/LimitChecker.ts b/consumer/src/LimitChecker.ts index 9eda8a8..9a459da 100644 --- a/consumer/src/LimitChecker.ts +++ b/consumer/src/LimitChecker.ts @@ -1,7 +1,7 @@ -import { ProjectLimitModel } from '@schema/project/ProjectsLimits'; -import { MAX_LOG_LIMIT_PERCENT } from '@data/broker/Limits'; +import { ProjectLimitModel } from './shared/schema/project/ProjectsLimits'; +import { MAX_LOG_LIMIT_PERCENT } from './shared/data/broker/Limits'; import { checkLimitsForEmail } from './EmailController'; export async function checkLimits(project_id: string) { diff --git a/consumer/src/index.ts b/consumer/src/index.ts index d54f739..076955f 100644 --- a/consumer/src/index.ts +++ b/consumer/src/index.ts @@ -1,18 +1,18 @@ -import { requireEnv } from '@utils/requireEnv'; -import { connectDatabase } from '@services/DatabaseService'; -import { RedisStreamService } from '@services/RedisStreamService'; -import { ProjectModel } from "@schema/project/ProjectSchema"; -import { VisitModel } from "@schema/metrics/VisitSchema"; -import { SessionModel } from "@schema/metrics/SessionSchema"; -import { EventModel } from "@schema/metrics/EventSchema"; +import { requireEnv } from './shared/utils/requireEnv'; +import { connectDatabase } from './shared/services/DatabaseService'; +import { RedisStreamService } from './shared/services/RedisStreamService'; +import { ProjectModel } from "./shared/schema/project/ProjectSchema"; +import { VisitModel } from "./shared/schema/metrics/VisitSchema"; +import { SessionModel } from "./shared/schema/metrics/SessionSchema"; +import { EventModel } from "./shared/schema/metrics/EventSchema"; import { lookup } from './lookup'; import { UAParser } from 'ua-parser-js'; import { checkLimits } from './LimitChecker'; import express from 'express'; -import { ProjectLimitModel } from '@schema/project/ProjectsLimits'; -import { ProjectCountModel } from '@schema/project/ProjectsCounts'; +import { ProjectLimitModel } from './shared/schema/project/ProjectsLimits'; +import { ProjectCountModel } from './shared/schema/project/ProjectsCounts'; const app = express(); diff --git a/consumer/src/shared/data/broker/Limits.ts b/consumer/src/shared/data/broker/Limits.ts new file mode 100644 index 0000000..fd28d88 --- /dev/null +++ b/consumer/src/shared/data/broker/Limits.ts @@ -0,0 +1,5 @@ + + +// Default: 1.01 +// ((events + visits) * VALUE) > limit +export const MAX_LOG_LIMIT_PERCENT = 1.01; \ No newline at end of file diff --git a/consumer/src/shared/schema/UserSchema.ts b/consumer/src/shared/schema/UserSchema.ts new file mode 100644 index 0000000..d312393 --- /dev/null +++ b/consumer/src/shared/schema/UserSchema.ts @@ -0,0 +1,38 @@ +import { model, Schema, Types } from 'mongoose'; + +export type TUser = { + email: string, + name: string, + given_name: string, + locale: string, + picture: string, + created_at: Date, + google_tokens?: { + refresh_token?: string; + expiry_date?: number; + access_token?: string; + token_type?: string; + id_token?: string; + scope?: string; + } +} + +const UserSchema = new Schema({ + email: { type: String, unique: true, index: 1 }, + name: String, + given_name: String, + locale: String, + picture: String, + google_tokens: { + refresh_token: String, + expiry_date: Number, + access_token: String, + token_type: String, + id_token: String, + scope: String + }, + created_at: { type: Date, default: () => Date.now() } +}) + +export const UserModel = model('users', UserSchema); + diff --git a/consumer/src/shared/schema/broker/LimitNotifySchema.ts b/consumer/src/shared/schema/broker/LimitNotifySchema.ts new file mode 100644 index 0000000..c1ec3d6 --- /dev/null +++ b/consumer/src/shared/schema/broker/LimitNotifySchema.ts @@ -0,0 +1,18 @@ +import { model, Schema, Types } from 'mongoose'; + +export type TLimitNotify = { + _id: Schema.Types.ObjectId, + project_id: Schema.Types.ObjectId, + limit1: boolean, + limit2: boolean, + limit3: boolean +} + +const LimitNotifySchema = new Schema({ + project_id: { type: Types.ObjectId, index: 1 }, + limit1: { type: Boolean }, + limit2: { type: Boolean }, + limit3: { type: Boolean } +}); + +export const LimitNotifyModel = model('limit_notifies', LimitNotifySchema); \ No newline at end of file diff --git a/consumer/src/shared/schema/metrics/EventSchema.ts b/consumer/src/shared/schema/metrics/EventSchema.ts new file mode 100644 index 0000000..17ffc2b --- /dev/null +++ b/consumer/src/shared/schema/metrics/EventSchema.ts @@ -0,0 +1,22 @@ +import { model, Schema, Types } from 'mongoose'; + +export type TEvent = { + project_id: Schema.Types.ObjectId, + name: string, + metadata: Record, + session: string, + flowHash: string, + created_at: Date +} + +const EventSchema = new Schema({ + project_id: { type: Types.ObjectId, index: 1 }, + name: { type: String, required: true, index: 1 }, + metadata: Schema.Types.Mixed, + session: { type: String, index: 1 }, + flowHash: { type: String }, + created_at: { type: Date, default: () => Date.now(), index: true }, +}) + +export const EventModel = model('events', EventSchema); + diff --git a/consumer/src/shared/schema/metrics/SessionSchema.ts b/consumer/src/shared/schema/metrics/SessionSchema.ts new file mode 100644 index 0000000..152e534 --- /dev/null +++ b/consumer/src/shared/schema/metrics/SessionSchema.ts @@ -0,0 +1,23 @@ +import { model, Schema, Types } from 'mongoose'; + + +export type TSession = { + project_id: Schema.Types.ObjectId, + session: string, + flowHash: string, + duration: number, + updated_at: Date, + created_at: Date, +} + +const SessionSchema = new Schema({ + project_id: { type: Types.ObjectId, index: 1 }, + session: { type: String, required: true, index: 1 }, + flowHash: { type: String }, + duration: { type: Number, required: true, default: 0 }, + updated_at: { type: Date, default: () => Date.now() }, + created_at: { type: Date, default: () => Date.now(), index: true }, +}) + +export const SessionModel = model('sessions', SessionSchema); + diff --git a/consumer/src/shared/schema/metrics/VisitSchema.ts b/consumer/src/shared/schema/metrics/VisitSchema.ts new file mode 100644 index 0000000..8981baa --- /dev/null +++ b/consumer/src/shared/schema/metrics/VisitSchema.ts @@ -0,0 +1,45 @@ +import { model, Schema } from 'mongoose'; + +export type TVisit = { + project_id: Schema.Types.ObjectId, + + browser: string, + os: string, + + continent: string, + country: string, + + session: string, + flowHash: string, + device: string, + + website: string, + page: string, + referrer: string, + + created_at: Date +} + +const VisitSchema = new Schema({ + project_id: { type: Schema.Types.ObjectId, index: true }, + + browser: { type: String, required: true }, + os: { type: String, required: true }, + + continent: { type: String }, + country: { type: String }, + + session: { type: String, index: true }, + flowHash: { type: String }, + device: { type: String }, + + website: { type: String, required: true, index: true }, + page: { type: String, required: true }, + referrer: { type: String, required: true }, + created_at: { type: Date, default: () => Date.now() }, +}) + +VisitSchema.index({ project_id: 1, created_at: -1 }); + +export const VisitModel = model('visits', VisitSchema); + diff --git a/consumer/src/shared/schema/project/ProjectSchema.ts b/consumer/src/shared/schema/project/ProjectSchema.ts new file mode 100644 index 0000000..5470139 --- /dev/null +++ b/consumer/src/shared/schema/project/ProjectSchema.ts @@ -0,0 +1,26 @@ +import { model, Schema, Types } from 'mongoose'; + +export type TProject = { + _id: Schema.Types.ObjectId, + owner: Schema.Types.ObjectId, + name: string, + premium: boolean, + premium_type: number, + customer_id: string, + subscription_id: string, + premium_expire_at: Date, + created_at: Date +} + +const ProjectSchema = new Schema({ + owner: { type: Types.ObjectId, index: 1 }, + name: { type: String, required: true }, + premium: { type: Boolean, default: false }, + premium_type: { type: Number, default: 0 }, + customer_id: { type: String, required: true }, + subscription_id: { type: String, required: true }, + premium_expire_at: { type: Date, required: true }, + created_at: { type: Date, default: () => Date.now() }, +}) + +export const ProjectModel = model('projects', ProjectSchema); diff --git a/consumer/src/shared/schema/project/ProjectsCounts.ts b/consumer/src/shared/schema/project/ProjectsCounts.ts new file mode 100644 index 0000000..bf4124b --- /dev/null +++ b/consumer/src/shared/schema/project/ProjectsCounts.ts @@ -0,0 +1,22 @@ +import { model, Schema, Types } from 'mongoose'; + +export type TProjectCount = { + _id: Schema.Types.ObjectId, + project_id: Schema.Types.ObjectId, + events: number, + visits: number, + sessions: number, + lastRecheck?: Date, + updated_at: Date +} + +const ProjectCountSchema = new Schema({ + project_id: { type: Types.ObjectId, index: true, unique: true }, + events: { type: Number, required: true, default: 0 }, + visits: { type: Number, required: true, default: 0 }, + sessions: { type: Number, required: true, default: 0 }, + lastRecheck: { type: Date }, + updated_at: { type: Date } +}, { timestamps: { updatedAt: 'updated_at' } }); + +export const ProjectCountModel = model('project_counts', ProjectCountSchema); \ No newline at end of file diff --git a/consumer/src/shared/schema/project/ProjectsLimits.ts b/consumer/src/shared/schema/project/ProjectsLimits.ts new file mode 100644 index 0000000..b619225 --- /dev/null +++ b/consumer/src/shared/schema/project/ProjectsLimits.ts @@ -0,0 +1,26 @@ +import { model, Schema, Types } from 'mongoose'; + +export type TProjectLimit = { + _id: Schema.Types.ObjectId, + project_id: Schema.Types.ObjectId, + events: number, + visits: number, + ai_messages: number, + limit: number, + ai_limit: number, + billing_expire_at: Date, + billing_start_at: Date, +} + +const ProjectLimitSchema = new Schema({ + project_id: { type: Types.ObjectId, index: true, unique: true }, + events: { type: Number, required: true, default: 0 }, + visits: { type: Number, required: true, default: 0 }, + ai_messages: { type: Number, required: true, default: 0 }, + limit: { type: Number, required: true }, + ai_limit: { type: Number, required: true }, + billing_start_at: { type: Date, required: true }, + billing_expire_at: { type: Date, required: true }, +}); + +export const ProjectLimitModel = model('project_limits', ProjectLimitSchema); \ No newline at end of file diff --git a/consumer/src/shared/services/DatabaseService.ts b/consumer/src/shared/services/DatabaseService.ts new file mode 100644 index 0000000..74c73ed --- /dev/null +++ b/consumer/src/shared/services/DatabaseService.ts @@ -0,0 +1,10 @@ + +import mongoose from "mongoose"; + +export async function connectDatabase(connectionString: string) { + await mongoose.connect(connectionString); +} + +export async function disconnectDatabase() { + await mongoose.disconnect(); +} diff --git a/consumer/src/shared/services/EmailService.ts b/consumer/src/shared/services/EmailService.ts new file mode 100644 index 0000000..30759f2 --- /dev/null +++ b/consumer/src/shared/services/EmailService.ts @@ -0,0 +1,35 @@ +const templateMap = { + confirm: '/confirm', + welcome: '/welcome', + purchase: '/purchase', + reset_password: '/reset_password', + anomaly_domain: '/anomaly/domain', + anomaly_visits_events: '/anomaly_visits_events', + limit_50: '/limit/50', + limit_90: '/limit/90', + limit_max: '/limit/max', +} as const; + +export type EmailTemplate = keyof typeof templateMap; +export type EmailServerInfo = { url: string, body: Record, headers: Record }; + +type EmailData = + | { template: 'confirm', data: { target: string, link: string } } + | { template: 'welcome', data: { target: string } } + | { template: 'purchase', data: { target: string, projectName: string } } + | { template: 'reset_password', data: { target: string, newPassword: string } } + | { template: 'anomaly_domain', data: { target: string, projectName: string, domains: string[] } } + | { template: 'anomaly_visits_events', data: { target: string, projectName: string, data: any[] } } + | { template: 'limit_50', data: { target: string, projectName: string } } + | { template: 'limit_90', data: { target: string, projectName: string } } + | { template: 'limit_max', data: { target: string, projectName: string } }; + +export class EmailService { + static getEmailServerInfo(template: T, data: Extract['data']): EmailServerInfo { + return { + url: `https://mail-service.litlyx.com/send${templateMap[template]}`, + body: data, + headers: { 'Content-Type': 'application/json' } + }; + } +} \ No newline at end of file diff --git a/consumer/src/shared/services/RedisStreamService.ts b/consumer/src/shared/services/RedisStreamService.ts new file mode 100644 index 0000000..81a5ff7 --- /dev/null +++ b/consumer/src/shared/services/RedisStreamService.ts @@ -0,0 +1,90 @@ +import { createClient } from 'redis'; +import { requireEnv } from '../utils/requireEnv'; + +export type ReadingLoopOptions = { + stream_name: string, + group_name: ConsumerGroup, + consumer_name: string +} + + +type xReadGroupMessage = { id: string, message: { [x: string]: string } } +type xReadGgroupResult = { name: string, messages: xReadGroupMessage[] }[] | null + +const consumerGroups = ['DATABASE'] as const; + +type ConsumerGroup = typeof consumerGroups[number]; + + +export class RedisStreamService { + + private static client = createClient({ + url: requireEnv("REDIS_URL"), + username: requireEnv("REDIS_USERNAME"), + password: requireEnv("REDIS_PASSWORD"), + database: process.env.DEV_MODE === 'true' ? 1 : 0 + }); + + static async connect() { + await this.client.connect(); + } + + static async readFromStream(stream_name: string, group_name: string, consumer_name: string, process_function: (content: Record) => Promise) { + + const result: xReadGgroupResult = await this.client.xReadGroup(group_name, consumer_name, [{ key: stream_name, id: '>' }], { COUNT: 5, BLOCK: 10000 }); + + if (!result) { + setTimeout(() => this.readFromStream(stream_name, group_name, consumer_name, process_function), 10); + return; + } + + for (const entry of result) { + for (const messageData of entry.messages) { + await process_function(messageData.message); + await this.client.xAck(stream_name, group_name, messageData.id); + await this.client.set(`ACK:${group_name}`, messageData.id); + } + } + + await this.trimStream(stream_name); + + setTimeout(() => this.readFromStream(stream_name, group_name, consumer_name, process_function), 10); + return; + + } + + private static async trimStream(stream_name: string) { + + let lastMessageAck = '0'; + + for (const consumerGroup of consumerGroups) { + const lastAck = await this.client.get(`ACK:${consumerGroup}`); + if (!lastAck) continue; + if (lastAck > lastMessageAck) lastMessageAck = lastAck; + } + + await this.client.xTrim(stream_name, 'MINID', lastMessageAck as any); + + } + + static async startReadingLoop(options: ReadingLoopOptions, processFunction: (content: Record) => Promise) { + + if (!consumerGroups.includes(options.group_name)) return console.error('GROUP NAME NOT ALLOWED'); + + console.log('Start reading loop') + + try { + await this.client.xGroupCreate(options.stream_name, options.group_name, '0', { MKSTREAM: true }); + } catch (ex) { + console.log('Group', options.group_name, 'already exist'); + } + + this.readFromStream(options.stream_name, options.group_name, options.consumer_name, processFunction); + } + + static async addToStream(streamName: string, data: Record) { + const result = await this.client.xAdd(streamName, "*", { ...data, timestamp: Date.now().toString() }); + return result; + } + +} diff --git a/consumer/src/shared/utils/requireEnv.ts b/consumer/src/shared/utils/requireEnv.ts new file mode 100644 index 0000000..a6497e0 --- /dev/null +++ b/consumer/src/shared/utils/requireEnv.ts @@ -0,0 +1,9 @@ + +export function requireEnv(name: string, errorMessage?: string) { + if (!process.env[name]) { + console.error(errorMessage || `ENV variable ${name} is required`); + return process.exit(1); + } + console.log('requireEnv', name, process.env[name]); + return process.env[name] as string; +} \ No newline at end of file diff --git a/consumer/tsconfig.json b/consumer/tsconfig.json index 2ec5c72..b8d10c4 100644 --- a/consumer/tsconfig.json +++ b/consumer/tsconfig.json @@ -1,9 +1,7 @@ { - "extends": "../tsconfig.json", "compilerOptions": { "module": "NodeNext", "target": "ESNext", - "esModuleInterop": true, "outDir": "dist" }, "include": [ diff --git a/package.json b/package.json index db858bb..f195002 100644 --- a/package.json +++ b/package.json @@ -11,6 +11,9 @@ "producer:shared": "ts-node scripts/producer/shared.ts", "producer:deploy": "ts-node scripts/producer/deploy.ts", + "consumer:shared": "ts-node scripts/consumer/shared.ts", + "consumer:deploy": "ts-node scripts/consumer/deploy.ts", + "email:deploy": "ts-node scripts/email/deploy.ts" }, "keywords": [], diff --git a/scripts/consumer/deploy.ts b/scripts/consumer/deploy.ts new file mode 100644 index 0000000..e69de29 diff --git a/scripts/consumer/shared.ts b/scripts/consumer/shared.ts new file mode 100644 index 0000000..ed58fa3 --- /dev/null +++ b/scripts/consumer/shared.ts @@ -0,0 +1,35 @@ + +import { SharedHelper } from "../helpers/shared-helper"; +import path from "node:path"; + +const helper = new SharedHelper(path.join(__dirname, '../../consumer/src/shared')) + +helper.clear(); + +helper.create('utils'); +helper.copy('utils/requireEnv.ts'); + +helper.create('services'); +helper.copy('services/RedisStreamService.ts'); +helper.copy('services/DatabaseService.ts'); +helper.copy('services/EmailService.ts'); + +helper.create('schema'); +helper.copy('schema/UserSchema.ts'); + +helper.create('schema/broker'); +helper.copy('schema/broker/LimitNotifySchema.ts'); + +helper.create('schema/project'); +helper.copy('schema/project/ProjectSchema.ts'); +helper.copy('schema/project/ProjectsLimits.ts'); +helper.copy('schema/project/ProjectsCounts.ts'); + +helper.create('schema/metrics'); +helper.copy('schema/metrics/VisitSchema.ts'); +helper.copy('schema/metrics/SessionSchema.ts'); +helper.copy('schema/metrics/EventSchema.ts'); + +helper.create('data'); +helper.create('data/broker'); +helper.copy('data/broker/Limits.ts'); \ No newline at end of file