Merge pull request #4133 from airqo-platform/staging
move to production
Baalmart authored Dec 28, 2024
2 parents e1c9cbb + b5d47f1 commit 9ef51bb
Showing 8 changed files with 128 additions and 104 deletions.
2 changes: 1 addition & 1 deletion k8s/auth-service/values-prod.yaml
@@ -6,7 +6,7 @@ app:
replicaCount: 3
image:
repository: eu.gcr.io/airqo-250220/airqo-auth-api
- tag: prod-fc8758f2-1735398975
+ tag: prod-e1c9cbbb-1735417448
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
2 changes: 1 addition & 1 deletion k8s/auth-service/values-stage.yaml
@@ -6,7 +6,7 @@ app:
replicaCount: 2
image:
repository: eu.gcr.io/airqo-250220/airqo-stage-auth-api
- tag: stage-b821ce15-1735398939
+ tag: stage-9a6b31b4-1735417414
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
2 changes: 1 addition & 1 deletion k8s/exceedance/values-prod-airqo.yaml
@@ -4,6 +4,6 @@ app:
configmap: env-exceedance-production
image:
repository: eu.gcr.io/airqo-250220/airqo-exceedance-job
- tag: prod-fc8758f2-1735398975
+ tag: prod-e1c9cbbb-1735417448
nameOverride: ''
fullnameOverride: ''
2 changes: 1 addition & 1 deletion k8s/exceedance/values-prod-kcca.yaml
@@ -4,6 +4,6 @@ app:
configmap: env-exceedance-production
image:
repository: eu.gcr.io/airqo-250220/kcca-exceedance-job
- tag: prod-fc8758f2-1735398975
+ tag: prod-e1c9cbbb-1735417448
nameOverride: ''
fullnameOverride: ''
2 changes: 1 addition & 1 deletion k8s/predict/values-prod.yaml
@@ -7,7 +7,7 @@ images:
predictJob: eu.gcr.io/airqo-250220/airqo-predict-job
trainJob: eu.gcr.io/airqo-250220/airqo-train-job
predictPlaces: eu.gcr.io/airqo-250220/airqo-predict-places-air-quality
- tag: prod-fc8758f2-1735398975
+ tag: prod-e1c9cbbb-1735417448
api:
name: airqo-prediction-api
label: prediction-api
2 changes: 1 addition & 1 deletion k8s/website/values-prod.yaml
@@ -6,7 +6,7 @@ app:
replicaCount: 3
image:
repository: eu.gcr.io/airqo-250220/airqo-website-api
- tag: prod-fc8758f2-1735398975
+ tag: prod-e1c9cbbb-1735417448
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
2 changes: 1 addition & 1 deletion k8s/workflows/values-prod.yaml
@@ -10,7 +10,7 @@ images:
initContainer: eu.gcr.io/airqo-250220/airqo-workflows-xcom
redisContainer: eu.gcr.io/airqo-250220/airqo-redis
containers: eu.gcr.io/airqo-250220/airqo-workflows
- tag: prod-fc8758f2-1735398975
+ tag: prod-e1c9cbbb-1735417448
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
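All seven values files above receive the same image bump: the production tag moves from prod-fc8758f2-1735398975 to prod-e1c9cbbb-1735417448, and the staging auth-service tag moves from stage-b821ce15-1735398939 to stage-9a6b31b4-1735417414. The tags appear to follow an <environment>-<short commit SHA>-<Unix build timestamp> convention: e1c9cbbb matches the staging parent commit e1c9cbb, and 1735417448 falls on 28 Dec 2024 (UTC), the date of this merge. A minimal sketch of how such a tag could be decoded; parseImageTag is illustrative and not a function in this repository:

// Illustrative only: decode the assumed <env>-<shortSha>-<unixSeconds> tag format.
function parseImageTag(tag) {
  const [env, shortSha, seconds] = tag.split("-");
  return { env, shortSha, builtAt: new Date(Number(seconds) * 1000) };
}

console.log(parseImageTag("prod-e1c9cbbb-1735417448"));
// { env: 'prod', shortSha: 'e1c9cbbb', builtAt: 2024-12-28T20:24:08.000Z }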
218 changes: 121 additions & 97 deletions src/auth-service/utils/create-analytics.js
@@ -767,141 +767,165 @@ const analytics = {
emails = [],
year = new Date().getFullYear(),
tenant = "airqo",
+ chunkSize = 5,
+ timeWindowDays = 90, // Process data in 90-day windows
} = {}) => {
try {
const startDate = new Date(`${year}-01-01`);
const endDate = new Date(`${year}-12-31`);
+ const enrichedStats = [];

- // Single aggregation pipeline for all emails
- const pipeline = [
- {
- $match: {
- timestamp: {
- $gte: startDate,
- $lte: endDate,
- },
- "meta.email": { $in: emails },
- // Filter out invalid services early
- "meta.service": {
- $nin: ["unknown", "none", "", null],
- },
- },
- },
- {
- $facet: {
- userStats: [
- {
- $group: {
- _id: "$meta.email",
- totalCount: { $sum: 1 },
- username: { $first: "$meta.username" },
- firstActivity: { $min: "$timestamp" },
- lastActivity: { $max: "$timestamp" },
- uniqueServices: { $addToSet: "$meta.service" },
- uniqueEndpoints: { $addToSet: "$meta.endpoint" },
- // Pre-calculate service usage
- serviceUsage: {
- $push: {
- service: "$meta.service",
- endpoint: "$meta.endpoint",
- timestamp: "$timestamp",
- },
- },
- },
- },
- // Calculate engagement metrics in MongoDB
- {
- $addFields: {
- activityDays: {
- $divide: [
- { $subtract: ["$lastActivity", "$firstActivity"] },
- 1000 * 60 * 60 * 24,
- ],
- },
- activityMonths: {
- $divide: [
- { $subtract: ["$lastActivity", "$firstActivity"] },
- 1000 * 60 * 60 * 24 * 30,
- ],
- },
+ // Process users in chunks to avoid memory overload
+ for (let i = 0; i < emails.length; i += chunkSize) {
+ const emailChunk = emails.slice(i, i + chunkSize);
+
+ // Break the year into smaller time windows
+ const timeWindows = [];
+ let currentStart = new Date(startDate);
+
+ while (currentStart < endDate) {
+ const windowEnd = new Date(currentStart);
+ windowEnd.setDate(windowEnd.getDate() + timeWindowDays);
+ const actualEnd = windowEnd > endDate ? endDate : windowEnd;
+
+ timeWindows.push({
+ start: new Date(currentStart),
+ end: new Date(actualEnd),
+ });
+
+ currentStart = new Date(actualEnd);
+ }
+
+ // Process each time window for the current user chunk
+ const userStatsPromises = timeWindows.map(async (window) => {
+ const pipeline = [
+ {
+ $match: {
+ timestamp: {
+ $gte: window.start,
+ $lte: window.end,
},
+ "meta.email": { $in: emailChunk },
+ "meta.service": { $nin: ["unknown", "none", "", null] },
},
- ],
- monthlyActivity: [
- {
- $group: {
- _id: {
- email: "$meta.email",
- year: { $year: "$timestamp" },
- month: { $month: "$timestamp" },
},
+ {
+ $group: {
+ _id: "$meta.email",
+ totalCount: { $sum: 1 },
+ username: { $first: "$meta.username" },
+ firstActivity: { $min: "$timestamp" },
+ lastActivity: { $max: "$timestamp" },
+ uniqueServices: { $addToSet: "$meta.service" },
+ uniqueEndpoints: { $addToSet: "$meta.endpoint" },
+ serviceUsage: {
+ $push: {
+ service: "$meta.service",
+ },
- count: { $sum: 1 },
},
},
- ],
- },
- },
- ];
+ },
+ ];

- const results = await LogModel(tenant)
- .aggregate(pipeline)
- .allowDiskUse(true);
+ return LogModel(tenant).aggregate(pipeline).allowDiskUse(true).exec();
+ });

- // Process results in batches
- const batchSize = 100;
- const enrichedStats = [];
+ // Wait for all time windows to complete for current user chunk
+ const windowResults = await Promise.all(userStatsPromises);

+ // Merge results for each user across time windows
+ const mergedUserStats = emailChunk.reduce((acc, email) => {
+ acc[email] = {
+ email,
+ totalCount: 0,
+ username: "",
+ firstActivity: null,
+ lastActivity: null,
+ uniqueServices: new Set(),
+ uniqueEndpoints: new Set(),
+ serviceUsage: new Map(),
+ };
+ return acc;
+ }, {});
+
+ // Combine stats from all time windows
+ windowResults.flat().forEach((stat) => {
+ if (!stat || !stat._id) return;
+
+ const userStats = mergedUserStats[stat._id];
+ if (!userStats) return;

- for (let i = 0; i < results[0].userStats.length; i += batchSize) {
- const batch = results[0].userStats.slice(i, i + batchSize);
+ userStats.totalCount += stat.totalCount;
+ userStats.username = stat.username;

- const enrichedBatch = batch.map((stat) => {
- // Calculate service usage statistics
- const serviceUsageMap = new Map();
+ if (
+ !userStats.firstActivity ||
+ stat.firstActivity < userStats.firstActivity
+ ) {
+ userStats.firstActivity = stat.firstActivity;
+ }
+ if (
+ !userStats.lastActivity ||
+ stat.lastActivity > userStats.lastActivity
+ ) {
+ userStats.lastActivity = stat.lastActivity;
+ }

+ stat.uniqueServices.forEach((s) => userStats.uniqueServices.add(s));
+ stat.uniqueEndpoints.forEach((e) => userStats.uniqueEndpoints.add(e));
+
+ // Update service usage counts
stat.serviceUsage.forEach((usage) => {
- const key = usage.service;
- serviceUsageMap.set(key, (serviceUsageMap.get(key) || 0) + 1);
+ const current = userStats.serviceUsage.get(usage.service) || 0;
+ userStats.serviceUsage.set(usage.service, current + 1);
});
+ });

- // Get top services
- const topServices = Array.from(serviceUsageMap.entries())
+ // Convert merged stats to final format
+ Object.values(mergedUserStats).forEach((stat) => {
+ if (stat.totalCount === 0) return; // Skip users with no activity
+
+ const topServices = Array.from(stat.serviceUsage.entries())
.map(([service, count]) => ({
service: formatServiceName(service),
count,
}))
.sort((a, b) => b.count - a.count)
.slice(0, 5);

+ // Calculate engagement metrics
+ const activityDays = Math.floor(
+ (stat.lastActivity - stat.firstActivity) / (1000 * 60 * 60 * 24)
+ );
+
const engagementScore = calculateEngagementScore({
totalActions: stat.totalCount,
- uniqueServices: stat.uniqueServices.length,
- uniqueEndpoints: stat.uniqueEndpoints.length,
- activityDays: stat.activityDays || 1,
+ uniqueServices: stat.uniqueServices.size,
+ uniqueEndpoints: stat.uniqueEndpoints.size,
+ activityDays: activityDays || 1,
});

logObject("engagementScore", engagementScore);

- return {
- email: stat._id,
+ enrichedStats.push({
+ email: stat.email,
username: stat.username,
totalActions: stat.totalCount,
firstActivity: stat.firstActivity,
lastActivity: stat.lastActivity,
- uniqueServices: stat.uniqueServices,
- uniqueEndpoints: stat.uniqueEndpoints,
+ uniqueServices: Array.from(stat.uniqueServices),
+ uniqueEndpoints: Array.from(stat.uniqueEndpoints),
topServices,
activityDuration: {
- totalDays: Math.floor(stat.activityDays || 0),
- totalMonths: Math.floor(stat.activityMonths || 0),
- description: `Active for ${Math.floor(
- stat.activityMonths || 0
- )} months`,
+ totalDays: activityDays,
+ totalMonths: Math.floor(activityDays / 30),
+ description: `Active for ${Math.floor(activityDays / 30)} months`,
},
engagementTier: calculateEngagementTier(engagementScore),
- };
});
+ });

- enrichedStats.push(...enrichedBatch);
+ // Add a small delay between chunks to prevent overwhelming the server
+ if (i + chunkSize < emails.length) {
+ await new Promise((resolve) => setTimeout(resolve, 1000));
+ }
}

return enrichedStats;
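The create-analytics.js change above replaces a single year-wide $facet aggregation with many small $match/$group pipelines: users are processed in chunks of five emails, the year is split into 90-day windows, each chunk/window pair is aggregated separately, and the partial results are merged in application code using a Set for unique services/endpoints and a Map for service-usage counts. Engagement duration is now computed in JavaScript from the merged first/last activity timestamps (months approximated as days / 30) rather than in a MongoDB $addFields stage, and a one-second pause separates chunks. A self-contained sketch of that decomposition, assuming the illustrative helpers buildTimeWindows and processInChunks, which are not part of this repository:

// Split a date range into fixed-size windows, mirroring the while loop above.
function buildTimeWindows(startDate, endDate, timeWindowDays = 90) {
  const windows = [];
  let currentStart = new Date(startDate);
  while (currentStart < endDate) {
    const windowEnd = new Date(currentStart);
    windowEnd.setDate(windowEnd.getDate() + timeWindowDays);
    const actualEnd = windowEnd > endDate ? endDate : windowEnd;
    windows.push({ start: new Date(currentStart), end: new Date(actualEnd) });
    currentStart = new Date(actualEnd);
  }
  return windows;
}

// Process items a few at a time, pausing between chunks like the new code does.
async function processInChunks(emails, handleChunk, chunkSize = 5) {
  const results = [];
  for (let i = 0; i < emails.length; i += chunkSize) {
    results.push(await handleChunk(emails.slice(i, i + chunkSize)));
    if (i + chunkSize < emails.length) {
      await new Promise((resolve) => setTimeout(resolve, 1000));
    }
  }
  return results;
}

// Example: 2024 splits into five windows (4 x 90 days plus a short remainder).
const windows = buildTimeWindows(new Date("2024-01-01"), new Date("2024-12-31"));
console.log(windows.length); // 5

processInChunks(["a@x.org", "b@x.org", "c@x.org"], async (chunk) => chunk.length, 2)
  .then((counts) => console.log(counts)); // [ 2, 1 ]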
