diff --git a/broker/src/StreamLoopController.ts b/broker/src/StreamLoopController.ts index 8110abd..be246fc 100644 --- a/broker/src/StreamLoopController.ts +++ b/broker/src/StreamLoopController.ts @@ -28,18 +28,18 @@ export async function startStreamLoop() { -async function processStreamEvent(data: Record) { +export async function processStreamEvent(data: Record) { try { const eventType = data._type; if (!eventType) return; - const { pid, sessionHash } = data; const project = await ProjectModel.exists({ _id: pid }); if (!project) return; + if (eventType === 'event') return await process_event(data, sessionHash); if (eventType === 'keep_alive') return await process_keep_alive(data, sessionHash); if (eventType === 'visit') return await process_visit(data, sessionHash); @@ -50,17 +50,23 @@ async function processStreamEvent(data: Record) { } + +async function checkLimits(project_id: string) { + const projectLimits = await ProjectLimitModel.findOne({ project_id }); + if (!projectLimits) return false; + const TOTAL_COUNT = projectLimits.events + projectLimits.visits; + const COUNT_LIMIT = projectLimits.limit; + if ((TOTAL_COUNT) > COUNT_LIMIT * EVENT_LOG_LIMIT_PERCENT) return false; + await checkLimitsForEmail(projectLimits); + return true; +} + async function process_visit(data: Record, sessionHash: string) { const { pid, ip, website, page, referrer, userAgent, flowHash } = data; - const projectLimits = await ProjectLimitModel.findOne({ project_id: pid }); - if (!projectLimits) return; - - const TOTAL_COUNT = projectLimits.events + projectLimits.visits; - const COUNT_LIMIT = projectLimits.limit; - if ((TOTAL_COUNT) > COUNT_LIMIT * EVENT_LOG_LIMIT_PERCENT) return; - await checkLimitsForEmail(projectLimits); + const canLog = await checkLimits(pid); + if (!canLog) return; let referrerParsed; try { @@ -97,7 +103,10 @@ async function process_keep_alive(data: Record, sessionHash: str const { pid, instant, flowHash } = data; - const existingSession = await SessionModel.findOne({ project_id: pid }, { _id: 1 }); + const canLog = await checkLimits(pid); + if (!canLog) return; + + const existingSession = await SessionModel.findOne({ project_id: pid, session: sessionHash }, { _id: 1 }); if (!existingSession) { await ProjectCountModel.updateOne({ project_id: pid }, { $inc: { 'sessions': 1 } }, { upsert: true }); } @@ -123,6 +132,9 @@ async function process_event(data: Record, sessionHash: string) const { name, metadata, pid, flowHash } = data; + const canLog = await checkLimits(pid); + if (!canLog) return; + let metadataObject; try { if (metadata) metadataObject = JSON.parse(metadata); diff --git a/shared/services/DatabaseService.ts b/shared/services/DatabaseService.ts index 7af0165..74c73ed 100644 --- a/shared/services/DatabaseService.ts +++ b/shared/services/DatabaseService.ts @@ -3,4 +3,8 @@ import mongoose from "mongoose"; export async function connectDatabase(connectionString: string) { await mongoose.connect(connectionString); -} \ No newline at end of file +} + +export async function disconnectDatabase() { + await mongoose.disconnect(); +}