diff --git a/broker/src/StreamLoopController.ts b/broker/src/StreamLoopController.ts index efe82a2..2e238d6 100644 --- a/broker/src/StreamLoopController.ts +++ b/broker/src/StreamLoopController.ts @@ -51,7 +51,7 @@ async function processStreamEvent(data: Record) { async function process_visit(data: Record, sessionHash: string) { - const { pid, ip, website, page, referrer, userAgent } = data; + const { pid, ip, website, page, referrer, userAgent, flowHash } = data; const projectLimits = await ProjectLimitModel.findOne({ project_id: pid }); if (!projectLimits) return; @@ -78,6 +78,7 @@ async function process_visit(data: Record, sessionHash: string) os: userAgentParsed.os.name || 'NO_OS', device: userAgentParsed.device.type, session: sessionHash, + flowHash, continent: geoLocation[0], country: geoLocation[1], }); @@ -93,16 +94,18 @@ async function process_visit(data: Record, sessionHash: string) async function process_keep_alive(data: Record, sessionHash: string) { - const { pid, instant } = data; + const { pid, instant, flowHash } = data; if (instant == "true") { await SessionModel.updateOne({ project_id: pid, session: sessionHash, }, { $inc: { duration: 0 }, + flowHash, updated_at: Date.now() }, { upsert: true }); } else { await SessionModel.updateOne({ project_id: pid, session: sessionHash, }, { $inc: { duration: 1 }, + flowHash, updated_at: Date.now() }, { upsert: true }); } @@ -112,7 +115,7 @@ async function process_keep_alive(data: Record, sessionHash: str async function process_event(data: Record, sessionHash: string) { - const { name, metadata, pid } = data; + const { name, metadata, pid, flowHash } = data; let metadataObject; try { @@ -121,7 +124,7 @@ async function process_event(data: Record, sessionHash: string) metadataObject = { error: 'Error parsing metadata' } } - const event = new EventModel({ project_id: pid, name, metadata: metadataObject, session: sessionHash }); + const event = new EventModel({ project_id: pid, name, flowHash, metadata: metadataObject, session: sessionHash }); await event.save(); await ProjectCountModel.updateOne({ project_id: pid }, { $inc: { 'events': 1 } }, { upsert: true }); diff --git a/producer/src/index.ts b/producer/src/index.ts index fd28da2..13de99f 100644 --- a/producer/src/index.ts +++ b/producer/src/index.ts @@ -3,7 +3,7 @@ import { RedisStreamService } from "@services/RedisStreamService"; import express from 'express'; import cors from 'cors'; -import { createSessionHash, getIPFromRequest } from "./utils"; +import { createFlowSessionHash, createSessionHash, getIPFromRequest } from "./utils"; const app = express(); app.use(cors()); @@ -20,7 +20,8 @@ app.post('/event', express.json(jsonOptions), async (req, res) => { try { const ip = getIPFromRequest(req); const sessionHash = createSessionHash(req.body.website, ip, req.body.userAgent); - await RedisStreamService.addToStream(streamName, { ...req.body, _type: 'event', sessionHash, ip }); + const flowHash = createFlowSessionHash(req.body.pid, ip, req.body.userAgent); + await RedisStreamService.addToStream(streamName, { ...req.body, _type: 'event', sessionHash, ip, flowHash }); return res.sendStatus(200); } catch (ex: any) { return res.status(500).json({ error: ex.message }); @@ -31,7 +32,8 @@ app.post('/visit', express.json(jsonOptions), async (req, res) => { try { const ip = getIPFromRequest(req); const sessionHash = createSessionHash(req.body.website, ip, req.body.userAgent); - await RedisStreamService.addToStream(streamName, { ...req.body, _type: 'visit', sessionHash, ip }); + const flowHash = createFlowSessionHash(req.body.pid, ip, req.body.userAgent); + await RedisStreamService.addToStream(streamName, { ...req.body, _type: 'visit', sessionHash, ip, flowHash }); return res.sendStatus(200); } catch (ex: any) { return res.status(500).json({ error: ex.message }); @@ -42,9 +44,11 @@ app.post('/keep_alive', express.json(jsonOptions), async (req, res) => { try { const ip = getIPFromRequest(req); const sessionHash = createSessionHash(req.body.website, ip, req.body.userAgent); + const flowHash = createFlowSessionHash(req.body.pid, ip, req.body.userAgent); await RedisStreamService.addToStream(streamName, { ...req.body, _type: 'keep_alive', sessionHash, ip, - instant: req.body.instant + '' + instant: req.body.instant + '', + flowHash }); return res.sendStatus(200); } catch (ex: any) { diff --git a/producer/src/utils.ts b/producer/src/utils.ts index 3f96b7c..ea964e9 100644 --- a/producer/src/utils.ts +++ b/producer/src/utils.ts @@ -14,3 +14,12 @@ export function createSessionHash(website: string, ip: string, userAgent: string const sessionHash = crypto.createHash('md5').update(sessionClean).digest("hex"); return sessionHash; } + + +// Track user flow from referrers to cto +export function createFlowSessionHash(project_id: string, ip: string, userAgent: string) { + const dailySalt = new Date().toLocaleDateString('it-IT'); + const sessionClean = dailySalt + project_id + ip + userAgent; + const sessionHash = crypto.createHash('md5').update(sessionClean).digest("hex"); + return sessionHash; +} \ No newline at end of file diff --git a/shared/schema/metrics/EventSchema.ts b/shared/schema/metrics/EventSchema.ts index 8aab521..2a11723 100644 --- a/shared/schema/metrics/EventSchema.ts +++ b/shared/schema/metrics/EventSchema.ts @@ -5,6 +5,7 @@ export type TEvent = { name: string, metadata: Record, session: string, + flowHash: string, created_at: Date } @@ -13,6 +14,7 @@ const EventSchema = new Schema({ name: { type: String, required: true }, metadata: Schema.Types.Mixed, session: { type: String }, + flowHash: { type: String }, created_at: { type: Date, default: () => Date.now() }, }) diff --git a/shared/schema/metrics/SessionSchema.ts b/shared/schema/metrics/SessionSchema.ts index 8ae27c6..9f3cf71 100644 --- a/shared/schema/metrics/SessionSchema.ts +++ b/shared/schema/metrics/SessionSchema.ts @@ -4,6 +4,7 @@ import { model, Schema, Types } from 'mongoose'; export type TSession = { project_id: Schema.Types.ObjectId, session: string, + flowHash: string, duration: number, updated_at: Date, created_at: Date, @@ -12,6 +13,7 @@ export type TSession = { const SessionSchema = new Schema({ project_id: { type: Types.ObjectId, index: 1 }, session: { type: String, required: true }, + flowHash: { type: String }, duration: { type: Number, required: true, default: 0 }, updated_at: { type: Date, default: () => Date.now() }, created_at: { type: Date, default: () => Date.now() }, diff --git a/shared/schema/metrics/VisitSchema.ts b/shared/schema/metrics/VisitSchema.ts index 3ac28b5..8d59912 100644 --- a/shared/schema/metrics/VisitSchema.ts +++ b/shared/schema/metrics/VisitSchema.ts @@ -10,6 +10,7 @@ export type TVisit = { country: string, session: string, + flowHash: string, device: string, website: string, @@ -27,9 +28,9 @@ const VisitSchema = new Schema({ continent: { type: String }, country: { type: String }, - - session: { type: String }, + session: { type: String }, + flowHash: { type: String }, device: { type: String }, website: { type: String, required: true },