add dates to producer/consumer

This commit is contained in:
Emily
2024-12-03 17:40:47 +01:00
parent b630bddef0
commit 9ce2c89575
4 changed files with 71 additions and 7 deletions

View File

@@ -65,7 +65,7 @@ async function processStreamEntry(data: Record<string, string>) {
async function process_visit(data: Record<string, string>, sessionHash: string) { async function process_visit(data: Record<string, string>, sessionHash: string) {
const { pid, ip, website, page, referrer, userAgent, flowHash } = data; const { pid, ip, website, page, referrer, userAgent, flowHash, timestamp } = data;
let referrerParsed; let referrerParsed;
try { try {
@@ -90,6 +90,7 @@ async function process_visit(data: Record<string, string>, sessionHash: string)
flowHash, flowHash,
continent: geoLocation[0], continent: geoLocation[0],
country: geoLocation[1], country: geoLocation[1],
created_at: new Date(parseInt(timestamp))
}), }),
ProjectCountModel.updateOne({ project_id: pid }, { $inc: { 'visits': 1 } }, { upsert: true }), ProjectCountModel.updateOne({ project_id: pid }, { $inc: { 'visits': 1 } }, { upsert: true }),
ProjectLimitModel.updateOne({ project_id: pid }, { $inc: { 'visits': 1 } }) ProjectLimitModel.updateOne({ project_id: pid }, { $inc: { 'visits': 1 } })
@@ -99,7 +100,7 @@ async function process_visit(data: Record<string, string>, sessionHash: string)
async function process_keep_alive(data: Record<string, string>, sessionHash: string) { async function process_keep_alive(data: Record<string, string>, sessionHash: string) {
const { pid, instant, flowHash } = data; const { pid, instant, flowHash, timestamp } = data;
const existingSession = await SessionModel.findOne({ project_id: pid, session: sessionHash }, { _id: 1 }); const existingSession = await SessionModel.findOne({ project_id: pid, session: sessionHash }, { _id: 1 });
if (!existingSession) { if (!existingSession) {
@@ -124,7 +125,7 @@ async function process_keep_alive(data: Record<string, string>, sessionHash: str
async function process_event(data: Record<string, string>, sessionHash: string) { async function process_event(data: Record<string, string>, sessionHash: string) {
const { name, metadata, pid, flowHash } = data; const { name, metadata, pid, flowHash, timestamp } = data;
let metadataObject; let metadataObject;
try { try {
@@ -134,7 +135,10 @@ async function process_event(data: Record<string, string>, sessionHash: string)
} }
await Promise.all([ await Promise.all([
EventModel.create({ project_id: pid, name, flowHash, metadata: metadataObject, session: sessionHash }), EventModel.create({
project_id: pid, name, flowHash, metadata: metadataObject, session: sessionHash,
created_at: new Date(parseInt(timestamp))
}),
ProjectCountModel.updateOne({ project_id: pid }, { $inc: { 'events': 1 } }, { upsert: true }), ProjectCountModel.updateOne({ project_id: pid }, { $inc: { 'events': 1 } }, { upsert: true }),
ProjectLimitModel.updateOne({ project_id: pid }, { $inc: { 'events': 1 } }) ProjectLimitModel.updateOne({ project_id: pid }, { $inc: { 'events': 1 } })
]); ]);

View File

@@ -21,7 +21,9 @@ app.post('/event', express.json(jsonOptions), async (req, res) => {
const ip = getIPFromRequest(req); const ip = getIPFromRequest(req);
const sessionHash = createSessionHash(req.body.website, ip, req.body.userAgent); const sessionHash = createSessionHash(req.body.website, ip, req.body.userAgent);
const flowHash = createFlowSessionHash(req.body.pid, ip, req.body.userAgent); const flowHash = createFlowSessionHash(req.body.pid, ip, req.body.userAgent);
await RedisStreamService.addToStream(streamName, { ...req.body, _type: 'event', sessionHash, ip, flowHash }); await RedisStreamService.addToStream(streamName, {
...req.body, _type: 'event', sessionHash, ip, flowHash
});
return res.sendStatus(200); return res.sendStatus(200);
} catch (ex: any) { } catch (ex: any) {
return res.status(500).json({ error: ex.message }); return res.status(500).json({ error: ex.message });

View File

@@ -50,7 +50,7 @@ class DateService {
// 3 Days // 3 Days
if (slice === 'hour' && (days > 3)) return [false, 'Date gap too big for this slice']; if (slice === 'hour' && (days > 3)) return [false, 'Date gap too big for this slice'];
// 3 Weeks // 3 Weeks
if (slice === 'day' && (days > 7 * 3)) return [false, 'Date gap too big for this slice']; if (slice === 'day' && (days > 31)) return [false, 'Date gap too big for this slice'];
// 3 Months // 3 Months
if (slice === 'week' && (days > 30 * 3)) return [false, 'Date gap too big for this slice']; if (slice === 'week' && (days > 30 * 3)) return [false, 'Date gap too big for this slice'];
// 3 Years // 3 Years
@@ -114,6 +114,9 @@ class DateService {
return { group, sort } return { group, sort }
} }
/**
* @deprecated interal to generateDateSlices
*/
prepareDateRange(from: string, to: string, slice: Slice) { prepareDateRange(from: string, to: string, slice: Slice) {
let fromDate = dayjs(from).minute(0).second(0).millisecond(0); let fromDate = dayjs(from).minute(0).second(0).millisecond(0);
@@ -134,6 +137,9 @@ class DateService {
} }
} }
/**
* @deprecated interal to generateDateSlices
*/
createBetweenDates(from: string, to: string, slice: Slice) { createBetweenDates(from: string, to: string, slice: Slice) {
let start = dayjs(from); let start = dayjs(from);
const end = dayjs(to); const end = dayjs(to);
@@ -145,6 +151,9 @@ class DateService {
return { dates: filledDates, from, to }; return { dates: filledDates, from, to };
} }
/**
* @deprecated use generateDateSlices
*/
fillDates(dates: string[], slice: Slice) { fillDates(dates: string[], slice: Slice) {
const allDates: dayjs.Dayjs[] = []; const allDates: dayjs.Dayjs[] = [];
const firstDate = dayjs(dates.at(0)); const firstDate = dayjs(dates.at(0));
@@ -161,6 +170,9 @@ class DateService {
return allDates; return allDates;
} }
/**
* @deprecated use mergeDates
*/
mergeFilledDates<T extends Record<string, any>, K extends keyof T>(dates: dayjs.Dayjs[], items: T[], dateField: K, slice: Slice, fillData: Omit<T, K>) { mergeFilledDates<T extends Record<string, any>, K extends keyof T>(dates: dayjs.Dayjs[], items: T[], dateField: K, slice: Slice, fillData: Omit<T, K>) {
const result = new Array<T>(); const result = new Array<T>();
for (const date of dates) { for (const date of dates) {
@@ -170,6 +182,52 @@ class DateService {
return result; return result;
} }
generateDateSlices(slice: Slice, fromDate: Date, toDate: Date) {
const slices: Date[] = [];
let currentDate = fromDate;
const addFunctions: { [key in Slice]: any } = { hour: fns.addHours, day: fns.addDays, week: fns.addWeeks, month: fns.addMonths, year: fns.addYears };
const addFunction = addFunctions[slice];
if (!addFunction) { throw new Error(`Invalid slice: ${slice}`); }
while (fns.isBefore(currentDate, toDate) || currentDate.getTime() === toDate.getTime()) {
slices.push(currentDate);
currentDate = addFunction(currentDate, 1);
}
return slices;
}
mergeDates(timeline: { _id: string, count: number }[], dates: Date[], slice: Slice) {
const result: { _id: string, count: number }[] = [];
const isSames: { [key in Slice]: any } = { hour: fns.isSameHour, day: fns.isSameDay, week: fns.isSameWeek, month: fns.isSameMonth, year: fns.isSameYear, }
const isSame = isSames[slice];
if (!isSame) {
throw new Error(`Invalid slice: ${slice}`);
}
for (const element of timeline) {
const elementDate = new Date(element._id);
for (const date of dates) {
if (isSame(elementDate, date)) {
const existingEntry = result.find(item => isSame(new Date(item._id), date));
if (existingEntry) {
existingEntry.count += element.count;
} else {
result.push({
_id: date.toISOString(),
count: element.count,
});
}
}
}
}
return result;
}
} }
const dateServiceInstance = new DateService(); const dateServiceInstance = new DateService();

View File

@@ -93,7 +93,7 @@ export class RedisStreamService {
} }
static async addToStream(streamName: string, data: Record<string, string>) { static async addToStream(streamName: string, data: Record<string, string>) {
const result = await this.client.xAdd(streamName, "*", data); const result = await this.client.xAdd(streamName, "*", { ...data, timestamp: Date.now().toString() });
return result; return result;
} }