diff --git a/consumer/src/index.ts b/consumer/src/index.ts index a45c305..5fa8554 100644 --- a/consumer/src/index.ts +++ b/consumer/src/index.ts @@ -65,7 +65,7 @@ async function processStreamEntry(data: Record) { async function process_visit(data: Record, sessionHash: string) { - const { pid, ip, website, page, referrer, userAgent, flowHash } = data; + const { pid, ip, website, page, referrer, userAgent, flowHash, timestamp } = data; let referrerParsed; try { @@ -90,6 +90,7 @@ async function process_visit(data: Record, sessionHash: string) flowHash, continent: geoLocation[0], country: geoLocation[1], + created_at: new Date(parseInt(timestamp)) }), ProjectCountModel.updateOne({ project_id: pid }, { $inc: { 'visits': 1 } }, { upsert: true }), ProjectLimitModel.updateOne({ project_id: pid }, { $inc: { 'visits': 1 } }) @@ -99,7 +100,7 @@ async function process_visit(data: Record, sessionHash: string) async function process_keep_alive(data: Record, sessionHash: string) { - const { pid, instant, flowHash } = data; + const { pid, instant, flowHash, timestamp } = data; const existingSession = await SessionModel.findOne({ project_id: pid, session: sessionHash }, { _id: 1 }); if (!existingSession) { @@ -124,7 +125,7 @@ async function process_keep_alive(data: Record, sessionHash: str async function process_event(data: Record, sessionHash: string) { - const { name, metadata, pid, flowHash } = data; + const { name, metadata, pid, flowHash, timestamp } = data; let metadataObject; try { @@ -134,7 +135,10 @@ async function process_event(data: Record, sessionHash: string) } await Promise.all([ - EventModel.create({ project_id: pid, name, flowHash, metadata: metadataObject, session: sessionHash }), + EventModel.create({ + project_id: pid, name, flowHash, metadata: metadataObject, session: sessionHash, + created_at: new Date(parseInt(timestamp)) + }), ProjectCountModel.updateOne({ project_id: pid }, { $inc: { 'events': 1 } }, { upsert: true }), ProjectLimitModel.updateOne({ project_id: pid }, { $inc: { 'events': 1 } }) ]); diff --git a/producer/src/index.ts b/producer/src/index.ts index 8b5cb1e..a01a081 100644 --- a/producer/src/index.ts +++ b/producer/src/index.ts @@ -21,7 +21,9 @@ app.post('/event', express.json(jsonOptions), async (req, res) => { const ip = getIPFromRequest(req); const sessionHash = createSessionHash(req.body.website, ip, req.body.userAgent); const flowHash = createFlowSessionHash(req.body.pid, ip, req.body.userAgent); - await RedisStreamService.addToStream(streamName, { ...req.body, _type: 'event', sessionHash, ip, flowHash }); + await RedisStreamService.addToStream(streamName, { + ...req.body, _type: 'event', sessionHash, ip, flowHash + }); return res.sendStatus(200); } catch (ex: any) { return res.status(500).json({ error: ex.message }); diff --git a/shared/services/DateService.ts b/shared/services/DateService.ts index 9b535ba..e30c70e 100644 --- a/shared/services/DateService.ts +++ b/shared/services/DateService.ts @@ -50,7 +50,7 @@ class DateService { // 3 Days if (slice === 'hour' && (days > 3)) return [false, 'Date gap too big for this slice']; // 3 Weeks - if (slice === 'day' && (days > 7 * 3)) return [false, 'Date gap too big for this slice']; + if (slice === 'day' && (days > 31)) return [false, 'Date gap too big for this slice']; // 3 Months if (slice === 'week' && (days > 30 * 3)) return [false, 'Date gap too big for this slice']; // 3 Years @@ -114,6 +114,9 @@ class DateService { return { group, sort } } + /** + * @deprecated interal to generateDateSlices + */ prepareDateRange(from: string, to: string, slice: Slice) { let fromDate = dayjs(from).minute(0).second(0).millisecond(0); @@ -134,6 +137,9 @@ class DateService { } } + /** + * @deprecated interal to generateDateSlices + */ createBetweenDates(from: string, to: string, slice: Slice) { let start = dayjs(from); const end = dayjs(to); @@ -145,6 +151,9 @@ class DateService { return { dates: filledDates, from, to }; } + /** + * @deprecated use generateDateSlices + */ fillDates(dates: string[], slice: Slice) { const allDates: dayjs.Dayjs[] = []; const firstDate = dayjs(dates.at(0)); @@ -161,6 +170,9 @@ class DateService { return allDates; } + /** + * @deprecated use mergeDates + */ mergeFilledDates, K extends keyof T>(dates: dayjs.Dayjs[], items: T[], dateField: K, slice: Slice, fillData: Omit) { const result = new Array(); for (const date of dates) { @@ -170,6 +182,52 @@ class DateService { return result; } + generateDateSlices(slice: Slice, fromDate: Date, toDate: Date) { + const slices: Date[] = []; + let currentDate = fromDate; + const addFunctions: { [key in Slice]: any } = { hour: fns.addHours, day: fns.addDays, week: fns.addWeeks, month: fns.addMonths, year: fns.addYears }; + const addFunction = addFunctions[slice]; + if (!addFunction) { throw new Error(`Invalid slice: ${slice}`); } + while (fns.isBefore(currentDate, toDate) || currentDate.getTime() === toDate.getTime()) { + slices.push(currentDate); + currentDate = addFunction(currentDate, 1); + } + return slices; + } + + mergeDates(timeline: { _id: string, count: number }[], dates: Date[], slice: Slice) { + + const result: { _id: string, count: number }[] = []; + + const isSames: { [key in Slice]: any } = { hour: fns.isSameHour, day: fns.isSameDay, week: fns.isSameWeek, month: fns.isSameMonth, year: fns.isSameYear, } + + const isSame = isSames[slice]; + + if (!isSame) { + throw new Error(`Invalid slice: ${slice}`); + } + + for (const element of timeline) { + const elementDate = new Date(element._id); + for (const date of dates) { + if (isSame(elementDate, date)) { + const existingEntry = result.find(item => isSame(new Date(item._id), date)); + + if (existingEntry) { + existingEntry.count += element.count; + } else { + result.push({ + _id: date.toISOString(), + count: element.count, + }); + } + } + } + } + + return result; + } + } const dateServiceInstance = new DateService(); diff --git a/shared/services/RedisStreamService.ts b/shared/services/RedisStreamService.ts index a5ad0a2..33e9719 100644 --- a/shared/services/RedisStreamService.ts +++ b/shared/services/RedisStreamService.ts @@ -93,7 +93,7 @@ export class RedisStreamService { } static async addToStream(streamName: string, data: Record) { - const result = await this.client.xAdd(streamName, "*", data); + const result = await this.client.xAdd(streamName, "*", { ...data, timestamp: Date.now().toString() }); return result; }