Skip to content

Commit

Permalink
fix(api): Subscriber deletion side-effects (#6872)
Browse files Browse the repository at this point in the history
Co-authored-by: Richard Fontein <32132657+rifont@users.noreply.github.com>
  • Loading branch information
SokratisVidros and rifont authored Nov 8, 2024
1 parent 6a45807 commit 5a0efd7
Show file tree
Hide file tree
Showing 8 changed files with 103 additions and 112 deletions.
14 changes: 2 additions & 12 deletions apps/api/src/app/subscribers/e2e/remove-subscriber.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,8 @@ describe('Delete Subscriber - /subscribers/:subscriberId (DELETE)', function ()
},
});

const isDeleted = !(await subscriberRepository.findBySubscriberId(session.environment._id, '123'));

expect(isDeleted).to.equal(true);

const deletedSubscriber = (
await subscriberRepository.findDeleted({
_environmentId: session.environment._id,
subscriberId: '123',
})
)?.[0];

expect(deletedSubscriber.deleted).to.equal(true);
const subscriber = await subscriberRepository.findBySubscriberId(session.environment._id, '123');
expect(subscriber).to.be.null;
});

it('should dispose subscriber relations to topic once he was removed', async () => {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
import { Test } from '@nestjs/testing';
import { SubscribersService, UserSession } from '@novu/testing';
import { NotFoundException } from '@nestjs/common';
import { expect } from 'chai';

import { NotFoundException } from '@nestjs/common';
import { SubscribersService, UserSession } from '@novu/testing';
import { Test } from '@nestjs/testing';
import { RemoveSubscriber } from './remove-subscriber.usecase';
import { RemoveSubscriberCommand } from './remove-subscriber.command';

import { SharedModule } from '../../../shared/shared.module';
import { SubscribersModule } from '../../subscribers.module';

Expand Down Expand Up @@ -41,8 +39,6 @@ describe('Remove Subscriber', function () {
});

it('should throw a not found exception if subscriber to remove does not exist', async () => {
const subscriberService = new SubscribersService(session.organization._id, session.environment._id);

try {
await useCase.execute(
RemoveSubscriberCommand.create({
Expand All @@ -51,10 +47,9 @@ describe('Remove Subscriber', function () {
organizationId: session.organization._id,
})
);
throw new Error('Should not reach here');
expect(true, 'Should never reach this statement').to.be.false;
} catch (e) {
expect(e).to.be.instanceOf(NotFoundException);
expect(e.message).to.eql("Subscriber 'invalid-subscriber-id' was not found");
}
});
});
Original file line number Diff line number Diff line change
@@ -1,53 +1,88 @@
import { Injectable } from '@nestjs/common';
import { SubscriberRepository, DalException, TopicSubscribersRepository } from '@novu/dal';
import { buildSubscriberKey, InvalidateCacheService } from '@novu/application-generic';
import { Injectable, NotFoundException } from '@nestjs/common';
import {
SubscriberRepository,
TopicSubscribersRepository,
SubscriberPreferenceRepository,
PreferencesRepository,
} from '@novu/dal';
import {
buildSubscriberKey,
buildFeedKey,
buildMessageCountKey,
InvalidateCacheService,
} from '@novu/application-generic';

import { RemoveSubscriberCommand } from './remove-subscriber.command';
import { GetSubscriber } from '../get-subscriber';
import { ApiException } from '../../../shared/exceptions/api.exception';

@Injectable()
export class RemoveSubscriber {
constructor(
private invalidateCache: InvalidateCacheService,
private subscriberRepository: SubscriberRepository,
private getSubscriber: GetSubscriber,
private topicSubscribersRepository: TopicSubscribersRepository
private topicSubscribersRepository: TopicSubscribersRepository,
private subscriberPreferenceRepository: SubscriberPreferenceRepository,
private preferenceRepository: PreferencesRepository
) {}

async execute(command: RemoveSubscriberCommand) {
try {
const { environmentId: _environmentId, organizationId, subscriberId } = command;
const subscriber = await this.getSubscriber.execute({
environmentId: _environmentId,
organizationId,
subscriberId,
});

await this.invalidateCache.invalidateByKey({
async execute({ environmentId: _environmentId, subscriberId }: RemoveSubscriberCommand) {
await Promise.all([
this.invalidateCache.invalidateByKey({
key: buildSubscriberKey({
subscriberId: command.subscriberId,
_environmentId: command.environmentId,
subscriberId,
_environmentId,
}),
});
}),
this.invalidateCache.invalidateQuery({
key: buildFeedKey().invalidate({
subscriberId,
_environmentId,
}),
}),
this.invalidateCache.invalidateQuery({
key: buildMessageCountKey().invalidate({
subscriberId,
_environmentId,
}),
}),
]);

const subscriberInternalIds = await this.subscriberRepository._model.distinct('_id', {
subscriberId,
_environmentId,
});

if (subscriberInternalIds.length === 0) {
throw new NotFoundException({ message: 'Subscriber was not found', externalSubscriberId: subscriberId });
}

await this.subscriberRepository.withTransaction(async () => {
/*
* Note about parallelism in transactions
*
* Running operations in parallel is not supported during a transaction.
* The use of Promise.all, Promise.allSettled, Promise.race, etc. to parallelize operations
* inside a transaction is undefined behaviour and should be avoided.
*
* Refer to https://mongoosejs.com/docs/transactions.html#note-about-parallelism-in-transactions
*/
await this.subscriberRepository.delete({
_environmentId: subscriber._environmentId,
_organizationId: subscriber._organizationId,
subscriberId: subscriber.subscriberId,
subscriberId,
_environmentId,
});

await this.topicSubscribersRepository.delete({
_environmentId: subscriber._environmentId,
_organizationId: subscriber._organizationId,
externalSubscriberId: subscriber.subscriberId,
_environmentId,
externalSubscriberId: subscriberId,
});
} catch (e) {
if (e instanceof DalException) {
throw new ApiException(e.message);
}
throw e;
}
await this.subscriberPreferenceRepository.delete({
_environmentId,
_subscriberId: { $in: subscriberInternalIds },
});
await this.preferenceRepository.delete({
_environmentId,
_subscriberId: { $in: subscriberInternalIds },
});
});

return {
acknowledged: true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,18 +161,15 @@ export class UpsertWorkflowUseCase {
);
}

private async upsertUserWorkflowPreferences(
workflow: NotificationTemplateEntity,
command: UpsertWorkflowCommand
): Promise<PreferencesEntity> {
private async upsertUserWorkflowPreferences(workflow: NotificationTemplateEntity, command: UpsertWorkflowCommand) {
let preferences: WorkflowPreferences | null;
if (command.workflowDto.preferences?.user !== undefined) {
preferences = command.workflowDto.preferences.user;
} else {
preferences = DEFAULT_WORKFLOW_PREFERENCES;
}

return await this.upsertPreferencesUsecase.upsertUserWorkflowPreferences(
await this.upsertPreferencesUsecase.upsertUserWorkflowPreferences(
UpsertUserWorkflowPreferencesCommand.create({
environmentId: workflow._environmentId,
organizationId: workflow._organizationId,
Expand All @@ -183,11 +180,8 @@ export class UpsertWorkflowUseCase {
);
}

private async upsertWorkflowPreferences(
workflow: NotificationTemplateEntity,
command: UpsertWorkflowCommand
): Promise<PreferencesEntity> {
return await this.upsertPreferencesUsecase.upsertWorkflowPreferences(
private async upsertWorkflowPreferences(workflow: NotificationTemplateEntity, command: UpsertWorkflowCommand) {
await this.upsertPreferencesUsecase.upsertWorkflowPreferences(
UpsertWorkflowPreferencesCommand.create({
environmentId: workflow._environmentId,
organizationId: workflow._organizationId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,14 @@ export class UpsertPreferences {
throw new BadRequestException('Preference not found');
}

return this.deletePreferences(command, foundPreference?._id);
await this.deletePreferences(command, foundPreference?._id);

/*
* TODO: Ideally we need to return the foundPreference with a deleted: true flag
* but the repository does not support this yet. For now we will make a compromise
* to avoid refactoring all the usages of this usecase.
*/
return foundPreference;
}

if (foundPreference) {
Expand Down Expand Up @@ -162,7 +169,7 @@ export class UpsertPreferences {
private async deletePreferences(
command: UpsertPreferencesCommand,
preferencesId: string,
): Promise<PreferencesEntity> {
) {
return await this.preferencesRepository.delete({
_id: preferencesId,
_environmentId: command.environmentId,
Expand Down
15 changes: 14 additions & 1 deletion libs/dal/src/repositories/base-repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,16 @@ import {
DEFAULT_MESSAGE_IN_APP_RETENTION_DAYS,
DEFAULT_NOTIFICATION_RETENTION_DAYS,
} from '@novu/shared';
import { FilterQuery, Model, ProjectionType, QueryOptions, QueryWithHelpers, Types, UpdateQuery } from 'mongoose';
import {
ClientSession,
FilterQuery,
Model,
ProjectionType,
QueryOptions,
QueryWithHelpers,
Types,
UpdateQuery,
} from 'mongoose';
import { DalException } from '../shared';

export class BaseRepository<T_DBModel, T_MappedEntity, T_Enforcement> {
Expand Down Expand Up @@ -338,6 +347,10 @@ export class BaseRepository<T_DBModel, T_MappedEntity, T_Enforcement> {
protected mapEntities(data: any): T_MappedEntity[] {
return plainToInstance<T_MappedEntity, T_MappedEntity[]>(this.entity, JSON.parse(JSON.stringify(data)));
}

async withTransaction(fn: Parameters<ClientSession['withTransaction']>[0]) {
return (await this._model.db.startSession()).withTransaction(fn);
}
}

interface IOptions {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,6 @@ export class PreferencesRepository extends BaseRepository<PreferencesDBModel, Pr
return this.mapEntity(item);
}

async delete(query: PreferencesQuery) {
const item = await this.findOne({ _id: query._id, _environmentId: query._environmentId });
if (!item) throw new DalException(`Could not find preferences with id ${query._id}`);

return await this.preferences.delete({ _id: item._id, _environmentId: item._environmentId });
}

async findDeleted(query: PreferencesQuery): Promise<PreferencesEntity> {
const res: PreferencesEntity = await this.preferences.findDeleted(query);

Expand Down
42 changes: 3 additions & 39 deletions libs/dal/src/repositories/subscriber/subscriber.repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,55 +157,19 @@ export class SubscriberRepository extends BaseRepository<SubscriberDBModel, Subs
);
}

async delete(query: SubscriberDeleteQuery) {
const requestQuery: SubscriberDeleteQuery = {
_environmentId: query._environmentId,
subscriberId: query.subscriberId,
};

const foundSubscriber = await this.findOne(requestQuery);

if (!foundSubscriber) {
throw new DalException(`Could not find subscriber ${query.subscriberId} to delete`);
}

return await this.subscriber.delete(requestQuery);
}

async deleteMany(query: SubscriberDeleteManyQuery) {
const requestQuery: SubscriberDeleteManyQuery = {
_environmentId: query._environmentId,
subscriberId: query.subscriberId,
};

if (query._id) {
requestQuery._id = query._id;
}

return await this.subscriber.delete(requestQuery);
}

async findDeleted(query: SubscriberQuery) {
const requestQuery: SubscriberQuery = {
_environmentId: query._environmentId,
subscriberId: query.subscriberId,
};

const res = await this.subscriber.findDeleted(requestQuery);

return this.mapEntity(res);
}

async estimatedDocumentCount(): Promise<number> {
return this.subscriber.estimatedDocumentCount();
}
}

function mapToSubscriberObject(subscriberId: string) {
return { subscriberId };
}

function regExpEscape(literalString: string): string {
return literalString.replace(/[-[\]{}()*+!<=:?./\\^$|#\s,]/g, '\\$&');
}

function isErrorWithWriteErrors(e: unknown): e is { writeErrors?: any; message?: string; result?: any } {
return typeof e === 'object' && e !== null && 'writeErrors' in e;
}

0 comments on commit 5a0efd7

Please sign in to comment.