Skip to content

Commit

Permalink
chore(api): Add batch mechanism for preference centralization migrati…
Browse files Browse the repository at this point in the history
…on and improve logging (#7107)
  • Loading branch information
rifont authored Nov 21, 2024
1 parent 95ad589 commit 525576c
Showing 1 changed file with 158 additions and 81 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
/* eslint-disable max-len */
/* eslint-disable no-cond-assign */
/* eslint-disable no-console */
import '../../src/config';

import { NestFactory } from '@nestjs/core';
import { SubscriberPreferenceRepository, NotificationTemplateRepository, PreferenceLevelEnum } from '@novu/dal';
import {
SubscriberPreferenceRepository,
NotificationTemplateRepository,
PreferenceLevelEnum,
NotificationTemplateEntity,
SubscriberPreferenceEntity,
} from '@novu/dal';
import {
UpsertPreferences,
UpsertWorkflowPreferencesCommand,
Expand All @@ -14,7 +22,7 @@ import { buildWorkflowPreferencesFromPreferenceChannels, DEFAULT_WORKFLOW_PREFER

import { AppModule } from '../../src/app.module';

const BATCH_SIZE = 100;
const BATCH_SIZE = 500;

/**
* Sleep for a random amount of time between 80% and 120% of the provided duration.
Expand Down Expand Up @@ -72,10 +80,25 @@ export async function preferenceCentralization(startWorkflowId?: string, startSu
const subscriberPreferenceRepository = app.get(SubscriberPreferenceRepository);
const workflowPreferenceRepository = app.get(NotificationTemplateRepository);

// Set up a logging interval to log the counter and last processed IDs every 10 seconds
const logInterval = setInterval(() => {
console.log('Current migration status:');
console.log({ counter });
if (lastProcessedWorkflowId) {
console.log(`Last processed workflow preference ID: ${lastProcessedWorkflowId}`);
}
if (lastProcessedSubscriberId) {
console.log(`Last processed subscriber preference ID: ${lastProcessedSubscriberId}`);
}
}, 1000); // 10 seconds

await migrateWorkflowPreferences(workflowPreferenceRepository, upsertPreferences, startWorkflowId);
console.log({ counter });
await migrateSubscriberPreferences(subscriberPreferenceRepository, upsertPreferences, startSubscriberId);

// Clear the logging interval once migration is complete
clearInterval(logInterval);

console.log('end migration - preference centralization');
console.log({ counter });
console.log(`Processed workflow preference with ID: ${lastProcessedWorkflowId}`);
Expand All @@ -84,6 +107,51 @@ export async function preferenceCentralization(startWorkflowId?: string, startSu
app.close();
}

async function processWorkflowBatch(
batch: NotificationTemplateEntity[],
upsertPreferences: UpsertPreferences,
workflowPreferenceRepository: NotificationTemplateRepository
) {
await Promise.all(
batch.map(async (workflowPreference) => {
try {
await workflowPreferenceRepository.withTransaction(async (tx) => {
const workflowPreferenceToUpsert = UpsertWorkflowPreferencesCommand.create({
templateId: workflowPreference._id.toString(),
environmentId: workflowPreference._environmentId.toString(),
organizationId: workflowPreference._organizationId.toString(),
preferences: DEFAULT_WORKFLOW_PREFERENCES,
});

await upsertPreferences.upsertWorkflowPreferences(workflowPreferenceToUpsert);

const userWorkflowPreferenceToUpsert = UpsertUserWorkflowPreferencesCommand.create({
userId: workflowPreference._creatorId.toString(),
templateId: workflowPreference._id.toString(),
environmentId: workflowPreference._environmentId.toString(),
organizationId: workflowPreference._organizationId.toString(),
preferences: buildWorkflowPreferencesFromPreferenceChannels(
workflowPreference.critical,
workflowPreference.preferenceSettings
),
});

await upsertPreferences.upsertUserWorkflowPreferences(userWorkflowPreferenceToUpsert);
});

counter.workflow.success += 1;
lastProcessedWorkflowId = workflowPreference._id.toString();
} catch (error) {
console.error(error);
console.error({
failedWorkflowId: workflowPreference._id,
});
counter.workflow.error += 1;
}
})
);
}

async function migrateWorkflowPreferences(
workflowPreferenceRepository: NotificationTemplateRepository,
upsertPreferences: UpsertPreferences,
Expand All @@ -97,51 +165,88 @@ async function migrateWorkflowPreferences(
}
const workflowPreferenceCursor = await workflowPreferenceRepository._model
.find(query)
.select({ _id: 1, _environmentId: 1, _organizationId: 1, _creatorId: 1, critical: 1, preferenceSettings: 1 })
.sort({ _id: 1 })
.batchSize(BATCH_SIZE)
.cursor();

for await (const workflowPreference of workflowPreferenceCursor) {
await sleep(10);
try {
await workflowPreferenceRepository.withTransaction(async (tx) => {
const workflowPreferenceToUpsert = UpsertWorkflowPreferencesCommand.create({
templateId: workflowPreference._id.toString(),
environmentId: workflowPreference._environmentId.toString(),
organizationId: workflowPreference._organizationId.toString(),
preferences: DEFAULT_WORKFLOW_PREFERENCES,
});
.read('secondaryPreferred')
.cursor({ batchSize: BATCH_SIZE });

await upsertPreferences.upsertWorkflowPreferences(workflowPreferenceToUpsert);

const userWorkflowPreferenceToUpsert = UpsertUserWorkflowPreferencesCommand.create({
userId: workflowPreference._creatorId.toString(),
templateId: workflowPreference._id.toString(),
environmentId: workflowPreference._environmentId.toString(),
organizationId: workflowPreference._organizationId.toString(),
preferences: buildWorkflowPreferencesFromPreferenceChannels(
workflowPreference.critical,
workflowPreference.preferenceSettings
),
});
let batch: NotificationTemplateEntity[] = [];
let document: any;
while ((document = await workflowPreferenceCursor.next())) {
batch.push(document);

await upsertPreferences.upsertUserWorkflowPreferences(userWorkflowPreferenceToUpsert);
});

counter.workflow.success += 1;
lastProcessedWorkflowId = workflowPreference._id.toString();
} catch (error) {
console.error(error);
console.error({
failedWorkflowId: workflowPreference._id,
});
counter.workflow.error += 1;
if (batch.length === BATCH_SIZE) {
await processWorkflowBatch(batch, upsertPreferences, workflowPreferenceRepository);
batch = [];
}
}

// Process any remaining documents in the batch
if (batch.length > 0) {
await processWorkflowBatch(batch, upsertPreferences, workflowPreferenceRepository);
}

console.log('end workflow preference migration');
}

async function processSubscriberBatch(batch: SubscriberPreferenceEntity[], upsertPreferences: UpsertPreferences) {
await Promise.all(
batch.map(async (subscriberPreference) => {
try {
if (subscriberPreference.level === PreferenceLevelEnum.GLOBAL) {
const preferenceToUpsert = UpsertSubscriberGlobalPreferencesCommand.create({
_subscriberId: subscriberPreference._subscriberId.toString(),
environmentId: subscriberPreference._environmentId.toString(),
organizationId: subscriberPreference._organizationId.toString(),
preferences: buildWorkflowPreferencesFromPreferenceChannels(false, subscriberPreference.channels),
});

await upsertPreferences.upsertSubscriberGlobalPreferences(preferenceToUpsert);

counter.subscriberGlobal.success += 1;
} else if (subscriberPreference.level === PreferenceLevelEnum.TEMPLATE) {
if (!subscriberPreference._templateId) {
console.error(
`Invalid templateId ${subscriberPreference._templateId} for id ${subscriberPreference._id} for subscriber ${subscriberPreference._subscriberId}`
);
counter.subscriberWorkflow.error += 1;

return;
}
const preferenceToUpsert = UpsertSubscriberWorkflowPreferencesCommand.create({
_subscriberId: subscriberPreference._subscriberId.toString(),
templateId: subscriberPreference._templateId.toString(),
environmentId: subscriberPreference._environmentId.toString(),
organizationId: subscriberPreference._organizationId.toString(),
preferences: buildWorkflowPreferencesFromPreferenceChannels(false, subscriberPreference.channels),
});

await upsertPreferences.upsertSubscriberWorkflowPreferences(preferenceToUpsert);

counter.subscriberWorkflow.success += 1;
} else {
console.error(
`Invalid preference level ${subscriberPreference.level} for id ${subscriberPreference._subscriberId}`
);
counter.subscriberUnknown.error += 1;
}
lastProcessedSubscriberId = subscriberPreference._id.toString();
} catch (error) {
console.error(error);
console.error({
failedSubscriberPreferenceId: subscriberPreference._id,
failedSubscriberId: subscriberPreference._subscriberId,
});
if (subscriberPreference.level === PreferenceLevelEnum.GLOBAL) {
counter.subscriberGlobal.error += 1;
} else if (subscriberPreference.level === PreferenceLevelEnum.TEMPLATE) {
counter.subscriberWorkflow.error += 1;
}
}
})
);
}

async function migrateSubscriberPreferences(
subscriberPreferenceRepository: SubscriberPreferenceRepository,
upsertPreferences: UpsertPreferences,
Expand All @@ -155,55 +260,27 @@ async function migrateSubscriberPreferences(
}
const subscriberPreferenceCursor = await subscriberPreferenceRepository._model
.find(query)
.select({ _id: 1, _environmentId: 1, _organizationId: 1, _subscriberId: 1, _templateId: 1, level: 1, channels: 1 })
.sort({ _id: 1 })
.batchSize(BATCH_SIZE)
.cursor();

for await (const subscriberPreference of subscriberPreferenceCursor) {
await sleep(10);
try {
if (subscriberPreference.level === PreferenceLevelEnum.GLOBAL) {
const preferenceToUpsert = UpsertSubscriberGlobalPreferencesCommand.create({
_subscriberId: subscriberPreference._subscriberId.toString(),
environmentId: subscriberPreference._environmentId.toString(),
organizationId: subscriberPreference._organizationId.toString(),
preferences: buildWorkflowPreferencesFromPreferenceChannels(false, subscriberPreference.channels),
});

await upsertPreferences.upsertSubscriberGlobalPreferences(preferenceToUpsert);

counter.subscriberGlobal.success += 1;
} else if (subscriberPreference.level === PreferenceLevelEnum.TEMPLATE) {
const preferenceToUpsert = UpsertSubscriberWorkflowPreferencesCommand.create({
_subscriberId: subscriberPreference._subscriberId.toString(),
templateId: subscriberPreference._templateId.toString(),
environmentId: subscriberPreference._environmentId.toString(),
organizationId: subscriberPreference._organizationId.toString(),
preferences: buildWorkflowPreferencesFromPreferenceChannels(false, subscriberPreference.channels),
});
.read('secondaryPreferred')
.cursor({ batchSize: BATCH_SIZE });

await upsertPreferences.upsertSubscriberWorkflowPreferences(preferenceToUpsert);
let batch: SubscriberPreferenceEntity[] = [];
let document: any;
while ((document = await subscriberPreferenceCursor.next())) {
batch.push(document);

counter.subscriberWorkflow.success += 1;
} else {
console.error(`Invalid preference level ${subscriberPreference.level}`);
counter.subscriberUnknown.error += 1;
}
lastProcessedSubscriberId = subscriberPreference._id.toString();
} catch (error) {
console.error(error);
console.error({
failedSubscriberPreferenceId: subscriberPreference._id,
failedSubscriberId: subscriberPreference._subscriberId,
});
if (subscriberPreference.level === PreferenceLevelEnum.GLOBAL) {
counter.subscriberGlobal.error += 1;
} else if (subscriberPreference.level === PreferenceLevelEnum.TEMPLATE) {
counter.subscriberWorkflow.error += 1;
}
if (batch.length === BATCH_SIZE) {
await processSubscriberBatch(batch, upsertPreferences);
batch = [];
}
}

// Process any remaining documents in the batch
if (batch.length > 0) {
await processSubscriberBatch(batch, upsertPreferences);
}

console.log('end subscriber preference migration');
}

Expand Down

0 comments on commit 525576c

Please sign in to comment.