new selfhosted version

This commit is contained in:
antonio
2025-11-28 14:11:51 +01:00
parent afda29997d
commit 951860f67e
1046 changed files with 72586 additions and 574750 deletions

View File

@@ -1,78 +1,53 @@
import { ProjectModel } from "./shared/schema/project/ProjectSchema";
import { UserModel } from "./shared/schema/UserSchema";
import { LimitNotifyModel } from "./shared/schema/broker/LimitNotifySchema";
import { EmailService } from './shared/services/EmailService';
import { TProjectLimit } from "./shared/schema/project/ProjectsLimits";
import { EmailServiceHelper } from "./EmailServiceHelper";
import { TUserLimit } from "./shared/schema/UserLimitSchema";
import { TrcpInstance } from './trpc'
export async function checkLimitsForEmail(projectCounts: TProjectLimit) {
export async function checkLimitsForEmail(projectCounts: TUserLimit) {
const project_id = projectCounts.project_id;
const hasNotifyEntry = await LimitNotifyModel.findOne({ project_id });
const user_id = projectCounts.user_id;
const hasNotifyEntry = await LimitNotifyModel.findOne({ user_id });
if (!hasNotifyEntry) {
await LimitNotifyModel.create({ project_id, limit1: false, limit2: false, limit3: false })
await LimitNotifyModel.create({ user_id, limit1: false, limit2: false, limit3: false })
}
const owner = await UserModel.findById(user_id);
if (!owner) return;
const userName = owner.given_name || owner.name || 'no_name';
if ((projectCounts.visits + projectCounts.events) >= (projectCounts.limit)) {
if (hasNotifyEntry.limit3 === true) return;
const project = await ProjectModel.findById(project_id);
if (!project) return;
const owner = await UserModel.findById(project.owner);
if (!owner) return;
setImmediate(() => {
const emailData = EmailService.getEmailServerInfo('limit_max', {
target: owner.email,
projectName: project.name
});
EmailServiceHelper.sendEmail(emailData);
TrcpInstance.client.email.sendLimitEmail50.mutate({ email: owner.email });
});
await LimitNotifyModel.updateOne({ project_id: projectCounts.project_id }, { limit1: true, limit2: true, limit3: true });
await LimitNotifyModel.updateOne({ user_id }, { limit1: true, limit2: true, limit3: true });
} else if ((projectCounts.visits + projectCounts.events) >= (projectCounts.limit * 0.9)) {
if (hasNotifyEntry.limit2 === true) return;
const project = await ProjectModel.findById(project_id);
if (!project) return;
const owner = await UserModel.findById(project.owner);
if (!owner) return;
setImmediate(() => {
const emailData = EmailService.getEmailServerInfo('limit_90', {
target: owner.email,
projectName: project.name
});
EmailServiceHelper.sendEmail(emailData);
TrcpInstance.client.email.sendLimitEmail90.mutate({ email: owner.email });
});
await LimitNotifyModel.updateOne({ project_id: projectCounts.project_id }, { limit1: true, limit2: true, limit3: false });
await LimitNotifyModel.updateOne({ user_id }, { limit1: true, limit2: true, limit3: false });
} else if ((projectCounts.visits + projectCounts.events) >= (projectCounts.limit * 0.5)) {
if (hasNotifyEntry.limit1 === true) return;
const project = await ProjectModel.findById(project_id);
if (!project) return;
const owner = await UserModel.findById(project.owner);
if (!owner) return;
setImmediate(() => {
const emailData = EmailService.getEmailServerInfo('limit_50', {
target: owner.email,
projectName: project.name
});
EmailServiceHelper.sendEmail(emailData);
TrcpInstance.client.email.sendLimitEmailMax.mutate({ email: owner.email });
});
await LimitNotifyModel.updateOne({ project_id: projectCounts.project_id }, { limit1: true, limit2: false, limit3: false });
await LimitNotifyModel.updateOne({ user_id }, { limit1: true, limit2: false, limit3: false });
}

View File

@@ -1,19 +0,0 @@
import { EmailServerInfo } from './shared/services/EmailService'
import axios from 'axios';
const EMAIL_SECRET = process.env.EMAIL_SECRET;
export class EmailServiceHelper {
static async sendEmail(data: EmailServerInfo) {
try {
await axios(data.url, {
method: 'POST',
data: data.body,
headers: { ...data.headers, 'x-litlyx-token': EMAIL_SECRET }
})
} catch (ex) {
console.error(ex);
}
}
}

View File

@@ -1,15 +1,15 @@
import { ProjectLimitModel } from './shared/schema/project/ProjectsLimits';
import { MAX_LOG_LIMIT_PERCENT } from './shared/data/broker/Limits';
import { checkLimitsForEmail } from './EmailController';
import { UserLimitModel } from './shared/schema/UserLimitSchema';
export async function checkLimits(project_id: string) {
const projectLimits = await ProjectLimitModel.findOne({ project_id });
if (!projectLimits) return false;
const TOTAL_COUNT = projectLimits.events + projectLimits.visits;
const COUNT_LIMIT = projectLimits.limit;
export async function checkLimits(user_id: string) {
const userLimits = await UserLimitModel.findOne({ user_id });
if (!userLimits) return false;
const TOTAL_COUNT = userLimits.events + userLimits.visits;
const COUNT_LIMIT = userLimits.limit;
if ((TOTAL_COUNT) > COUNT_LIMIT * MAX_LOG_LIMIT_PERCENT) return false;
await checkLimitsForEmail(projectLimits);
await checkLimitsForEmail(userLimits);
return true;
}

View File

@@ -6,15 +6,16 @@ import { ProjectModel } from "./shared/schema/project/ProjectSchema";
import { VisitModel } from "./shared/schema/metrics/VisitSchema";
import { SessionModel } from "./shared/schema/metrics/SessionSchema";
import { EventModel } from "./shared/schema/metrics/EventSchema";
import { lookup } from './lookup';
import { UAParser } from 'ua-parser-js';
import { checkLimits } from './LimitChecker';
import express from 'express';
import { ProjectLimitModel } from './shared/schema/project/ProjectsLimits';
import { ProjectCountModel } from './shared/schema/project/ProjectsCounts';
import { metricsRouter } from './Metrics';
import { UserLimitModel } from './shared/schema/UserLimitSchema';
import { TrcpInstance } from './trpc';
import { PremiumModel } from './shared/schema/PremiumSchema';
import { lookupIP } from './lookup';
const app = express();
@@ -22,17 +23,28 @@ app.use('/metrics', metricsRouter);
app.listen(process.env.PORT, () => console.log(`Listening on port ${process.env.PORT}`));
TrcpInstance.init(requireEnv('EMAIL_TRPC_URL'), requireEnv('EMAIL_SECRET'));
connectDatabase(requireEnv('MONGO_CONNECTION_STRING'));
main();
const CONSUMER_NAME = `CONSUMER_${process.env.NODE_APP_INSTANCE || 'DEFAULT'}`
async function getProjectOwner(pid: string) {
const ownerData = await ProjectModel.findOne({ _id: pid }, { owner: 1 });
return ownerData.owner;
}
async function main() {
console.log('Consumer started');
await RedisStreamService.connect();
const stream_name = requireEnv('STREAM_NAME');
const group_name = requireEnv('GROUP_NAME') as any; // Checks are inside "startReadingLoop"
const group_name = requireEnv('GROUP_NAME') as any; // Checks are inside "startReadingLoop"
await RedisStreamService.startReadingLoop({
stream_name, group_name, consumer_name: CONSUMER_NAME
@@ -51,22 +63,34 @@ async function processStreamEntry(data: Record<string, string>) {
const { pid, sessionHash } = data;
const project = await ProjectModel.exists({ _id: pid });
if (!project) return;
const owner = await getProjectOwner(pid);
if (!owner) return;
const canLog = await checkLimits(pid);
if (!canLog) return;
const premiumData = await PremiumModel.findOne({ user_id: owner }, { payment_failed: 1, premium_type: 1, created_at: 1 });
if (!premiumData) return;
if (premiumData.payment_failed === true) return;
if (premiumData.premium_type !== 7999) {
const canLog = await checkLimits(owner.toString());
if (!canLog) return;
}
if (premiumData.premium_type === 7999 &&
(Date.now() > new Date(premiumData.created_at).getTime() + (1000 * 60 * 60 * 24 * 14) + (1000 * 60 * 60 * 24 * 30))
) return;
if (eventType === 'event') {
await process_event(data, sessionHash);
await process_event(data, sessionHash, owner.toString());
} else if (eventType === 'keep_alive') {
await process_keep_alive(data, sessionHash);
await process_keep_alive(data, sessionHash, owner.toString());
} else if (eventType === 'visit') {
await process_visit(data, sessionHash);
await process_visit(data, sessionHash, owner.toString());
}
} catch (ex: any) {
console.error('ERROR PROCESSING STREAM EVENT', ex.message);
console.error(ex);
}
const duration = Date.now() - start;
@@ -75,18 +99,30 @@ async function processStreamEntry(data: Record<string, string>) {
}
async function process_visit(data: Record<string, string>, sessionHash: string) {
async function process_visit(data: Record<string, string>, sessionHash: string, user_id: string) {
const { pid, ip, website, page, referrer, userAgent, flowHash, timestamp } = data;
let referrerParsed;
try {
referrerParsed = new URL(referrer);
} catch (ex) {
referrerParsed = { hostname: referrer };
}
const referrerParsed = { hostname: referrer };
const geoLocation = lookup(ip);
let utm_campaign: string | undefined = undefined;
let utm_content: string | undefined = undefined;
let utm_medium: string | undefined = undefined;
let utm_source: string | undefined = undefined;
let utm_term: string | undefined = undefined;
try {
const url = new URL(referrer);
referrerParsed.hostname = url.hostname
utm_campaign = url.searchParams.get('utm_campaign') ?? undefined;
utm_content = url.searchParams.get('utm_content') ?? undefined;
utm_medium = url.searchParams.get('utm_medium') ?? undefined;
utm_source = url.searchParams.get('utm_source') ?? undefined;
utm_term = url.searchParams.get('utm_term') ?? undefined;
} catch (ex) { }
const geoLocation = lookupIP(ip);
const userAgentParsed = UAParser(userAgent);
@@ -100,17 +136,22 @@ async function process_visit(data: Record<string, string>, sessionHash: string)
device: device ? device : (userAgentParsed.browser.name ? 'desktop' : undefined),
session: sessionHash,
flowHash,
continent: geoLocation[0],
country: geoLocation[1],
continent: geoLocation ? geoLocation.continent.code : undefined,
country: geoLocation ? geoLocation.country.iso_code : undefined,
region: (geoLocation && geoLocation.subdivisions && geoLocation.subdivisions.length > 0) ? geoLocation.subdivisions[0]?.iso_code : undefined,
city: (geoLocation && geoLocation.subdivisions && geoLocation.subdivisions.length > 1) ? geoLocation.subdivisions[1]?.iso_code : undefined,
utm_campaign, utm_content, utm_medium, utm_source, utm_term,
created_at: new Date(parseInt(timestamp))
}),
ProjectCountModel.updateOne({ project_id: pid }, { $inc: { 'visits': 1 } }, { upsert: true }),
ProjectLimitModel.updateOne({ project_id: pid }, { $inc: { 'visits': 1 } })
UserLimitModel.updateOne({ user_id }, { $inc: { 'visits': 1 } })
]);
}
async function process_keep_alive(data: Record<string, string>, sessionHash: string) {
async function process_keep_alive(data: Record<string, string>, sessionHash: string, user_id: string) {
const { pid, instant, flowHash, timestamp, website } = data;
@@ -137,7 +178,7 @@ async function process_keep_alive(data: Record<string, string>, sessionHash: str
}
async function process_event(data: Record<string, string>, sessionHash: string) {
async function process_event(data: Record<string, string>, sessionHash: string, user_id: string) {
const { name, metadata, pid, flowHash, timestamp, website } = data;
@@ -155,7 +196,7 @@ async function process_event(data: Record<string, string>, sessionHash: string)
created_at: new Date(parseInt(timestamp))
}),
ProjectCountModel.updateOne({ project_id: pid }, { $inc: { 'events': 1 } }, { upsert: true }),
ProjectLimitModel.updateOne({ project_id: pid }, { $inc: { 'events': 1 } })
UserLimitModel.updateOne({ user_id }, { $inc: { 'events': 1 } })
]);

View File

@@ -1,42 +1,16 @@
import { Reader, CityResponse } from 'mmdb-lib';
import fs from 'fs';
const ipsData = JSON.parse(fs.readFileSync('./dist/ipv4-db.json', 'utf8'));
const countriesData = JSON.parse(fs.readFileSync('./dist/countries-db.json', 'utf8'));
const dbPath =
fs.existsSync('./cities.mmdb') ? './cities.mmdb' :
fs.existsSync('./script/GeoLite2-City.mmdb') ? './script/GeoLite2-City.mmdb' :
fs.existsSync('./dist/cities.mmdb') ? './dist/cities.mmdb' : ''
function inRange(ip: string, cidr: string) {
const [subnet, mask] = cidr.split('/');
const ipBytes = ip.split('.').map(Number);
const subnetBytes = subnet.split('.').map(Number);
const citiesBuffer = fs.readFileSync(dbPath);
const ipInt = (ipBytes[0] << 24) | (ipBytes[1] << 16) | (ipBytes[2] << 8) | ipBytes[3];
const subnetInt = (subnetBytes[0] << 24) | (subnetBytes[1] << 16) | (subnetBytes[2] << 8) | subnetBytes[3];
const reader = new Reader<CityResponse>(citiesBuffer);
const maskInt = 0xffffffff << (32 - parseInt(mask));
return (ipInt & maskInt) === (subnetInt & maskInt);
}
function getCountryFromId(id: number) {
for (const country of countriesData) {
if (country[0] == id) {
return country;
}
}
}
export function lookup(ip: string) {
try {
const startPiece = parseInt(ip.split('.')[0]);
for (const target of ipsData) {
const matchingStartPiece = target[0] == startPiece;
if (!matchingStartPiece) continue;
if (!inRange(ip, target[1])) continue;
const country = getCountryFromId(target[2]);
return [country[1], country[2]];
}
return ['??', '??'];
} catch (ex) {
console.error('ERROR DURING LOOKUP', ex);
return ['??', '??'];
}
export function lookupIP(address: string) {
const res = reader.get(address);
return res;
}

26
consumer/src/trpc.ts Normal file
View File

@@ -0,0 +1,26 @@
import { createTRPCClient, httpBatchLink, TRPCClient } from '@trpc/client';
//@ts-ignore
import type { AppRouter as EmailsAppRouter } from '../../emails/src/index'
class TRPC {
public client: TRPCClient<EmailsAppRouter>
init(url: string, secret: string) {
this.client = createTRPCClient<EmailsAppRouter>({
links: [
httpBatchLink({
url,
headers: {
Authorization: `Bearer ${secret}`
}
}),
],
});
}
}
export const TrcpInstance = new TRPC();