Skip to content

Commit

Permalink
feat(api): prevent duplicates in api events
Browse files Browse the repository at this point in the history
  • Loading branch information
Dyostiq committed Jul 21, 2021
1 parent 9128c56 commit bfb5dab
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { MigrationInterface, QueryRunner } from 'typeorm';

export class AddExternalUniqueIdToApiEvents1626458359147
implements MigrationInterface {
public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(
`ALTER TABLE api_events ADD external_id character varying`,
);

await queryRunner.query(
`ALTER TABLE api_events ADD CONSTRAINT api_events_external_id_unique UNIQUE ("external_id")`,
);
}

public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(
`ALTER TABLE api_events DROP CONSTRAINT api_events_external_id_unique`,
);
await queryRunner.query(`ALTER TABLE api_events DROP COLUMN external_id`);
}
}
12 changes: 12 additions & 0 deletions api/apps/api/src/modules/api-events/api-event.api.entity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,18 @@ export class ApiEvent {
@PrimaryGeneratedColumn('uuid')
id!: string;

/**
* Unique identifier of an event, when the event origins from other sources than the application itself and has its own id.
* It is unique to prevent duplicates in case of multiple instances listening on the same source.
*/
@Column({
name: 'external_id',
nullable: true,
type: 'varchar',
unique: true,
})
externalId?: string | null;

/**
* Timestamp of the event.
*/
Expand Down
24 changes: 24 additions & 0 deletions api/apps/api/src/modules/api-events/api-events.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Injectable, NotFoundException } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';

import { DeleteResult, Repository } from 'typeorm';
import { Either, left, right } from 'fp-ts/lib/Either';

import {
ApiEvent,
Expand All @@ -23,6 +24,8 @@ import { UpdateApiEventDTO } from './dto/update.api-event.dto';
import { AppInfoDTO } from '../../dto/info.dto';
import { AppConfig } from '../../utils/config.utils';

export const duplicate = Symbol();

@Injectable()
/**
* API Events
Expand Down Expand Up @@ -69,6 +72,27 @@ export class ApiEventsService extends AppBaseService<
return result;
}

/**
* recognizes duplicates on {@link CreateApiEventDTO.externalId}
*/
async createIfNotExists(
data: CreateApiEventDTO,
): Promise<Either<typeof duplicate, ApiEvent>> {
try {
return right(await this.create(data));
} catch (error) {
const postgresDuplicateKeyErrorCode = '23505';
const externalIdConstraint = 'api_events_external_id_unique';
if (
error.code === postgresDuplicateKeyErrorCode &&
error.constraint === externalIdConstraint
) {
return left(duplicate);
}
throw error;
}
}

/**
* Purge all events. Optionally this can be limited to events of a given
* `QualifiedEventTopic` (i.e. a topic qualified by `kind` and `apiVersion`).
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { ApiProperty, ApiPropertyOptional } from '@nestjs/swagger';
import { IsEnum, IsJSON, IsOptional, IsUUID } from 'class-validator';
import { IsEnum, IsJSON, IsOptional, IsString, IsUUID } from 'class-validator';
import * as ApiEventsUserData from '@marxan-api/modules/api-events/dto/apiEvents.user.data.dto';
import { API_EVENT_KINDS } from '@marxan/api-events';

Expand All @@ -19,6 +19,11 @@ export class CreateApiEventDTO {
@IsUUID(4)
topic!: string;

@ApiPropertyOptional()
@IsString()
@IsOptional()
externalId?: string;

/**
* Data payload of the event. Its semantics depend on kind.
*/
Expand Down
36 changes: 22 additions & 14 deletions api/apps/api/src/modules/scenarios/marxan-run/run.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ beforeEach(async () => {
});

test(`scheduling job`, async () => {
fixtures.setupMocksForSchedulingJobs();
fixtures.setupMocksForSchedulingJobs(() => `1234`);
// given
fixtures.GivenAssetsAvailable();

Expand All @@ -33,12 +33,12 @@ test(`scheduling job`, async () => {

// then
fixtures.ThenShouldUpdateScenario();
fixtures.ThenShouldEmitSubmittedEvent();
fixtures.ThenShouldEmitSubmittedEvent(`1234`);
fixtures.ThenShouldAddJob();
});

test(`scheduling job for scenario without assets`, async () => {
fixtures.setupMocksForSchedulingJobs();
fixtures.setupMocksForSchedulingJobs(() => `12345`);
// given
fixtures.GivenAssetsNotAvailable();

Expand Down Expand Up @@ -94,14 +94,18 @@ describe(`with a single job in the queue`, () => {
${`failed`} | ${API_EVENT_KINDS.scenario__run__failed__v1__alpha1}
${`completed`} | ${API_EVENT_KINDS.scenario__run__finished__v1__alpha1}
`(`when $GotEvent, saves $SavedKind`, async ({ GotEvent, SavedKind }) => {
fixtures.fakeEvents.emit(GotEvent, {
jobId: `123`,
data: {
scenarioId: `scenario-x`,
fixtures.fakeEvents.emit(
GotEvent,
{
jobId: `123`,
data: {
scenarioId: `scenario-x`,
},
},
});
`eventId1`,
);

await fixtures.ThenEventCreated(SavedKind);
await fixtures.ThenEventCreated(SavedKind, `eventId1`);
});
});

Expand Down Expand Up @@ -190,10 +194,12 @@ async function getFixtures() {
getRunService() {
return testingModule.get(RunService);
},
setupMocksForSchedulingJobs() {
setupMocksForSchedulingJobs(createId: () => string) {
this.setupMockForCreatingEvents();
fakeQueue.add.mockImplementation(() => {
//
fakeQueue.add.mockImplementation(async () => {
return {
id: createId(),
};
});
fakeScenarioRepo.update.mockImplementation(() => {
//
Expand Down Expand Up @@ -238,11 +244,12 @@ async function getFixtures() {
ranAtLeastOnce: true,
});
},
ThenShouldEmitSubmittedEvent() {
ThenShouldEmitSubmittedEvent(id: string) {
expect(fixtures.fakeApiEvents.create).toBeCalledTimes(1);
expect(fixtures.fakeApiEvents.create).toBeCalledWith({
topic: `scenario-1`,
kind: API_EVENT_KINDS.scenario__run__submitted__v1__alpha1,
externalId: `${id}${API_EVENT_KINDS.scenario__run__submitted__v1__alpha1}`,
});
},
ThenShouldAddJob() {
Expand All @@ -252,12 +259,13 @@ async function getFixtures() {
assets: this.scenarioAssets,
});
},
async ThenEventCreated(kind: API_EVENT_KINDS) {
async ThenEventCreated(kind: API_EVENT_KINDS, eventId: string) {
await waitForExpect(() => {
expect(fixtures.fakeApiEvents.create).toBeCalledTimes(1);
expect(fixtures.fakeApiEvents.create).toBeCalledWith({
kind,
topic: `scenario-1`,
externalId: eventId,
});
});
},
Expand Down
30 changes: 20 additions & 10 deletions api/apps/api/src/modules/scenarios/marxan-run/run.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,23 +43,29 @@ export class RunService {
private readonly scenarios: Repository<Scenario>,
private readonly assets: AssetsService,
) {
queueEvents.on(`completed`, ({ jobId }) => this.handleFinished(jobId));
queueEvents.on(`failed`, ({ jobId }) => this.handleFailed(jobId));
queueEvents.on(`completed`, ({ jobId }, eventId) =>
this.handleFinished(jobId, eventId),
);
queueEvents.on(`failed`, ({ jobId }, eventId) =>
this.handleFailed(jobId, eventId),
);
}

async run(scenarioId: string): Promise<void> {
const assets = await this.assets.forScenario(scenarioId);
assertDefined(assets);
await this.queue.add(`run-scenario`, {
const job = await this.queue.add(`run-scenario`, {
scenarioId,
assets,
});
await this.scenarios.update(scenarioId, {
ranAtLeastOnce: true,
});
const kind = API_EVENT_KINDS.scenario__run__submitted__v1__alpha1;
await this.apiEvents.create({
topic: scenarioId,
kind: API_EVENT_KINDS.scenario__run__submitted__v1__alpha1,
kind,
externalId: job.id + kind,
});
}

Expand All @@ -83,19 +89,23 @@ export class RunService {
return right(void 0);
}

private async handleFinished(jobId: string) {
private async handleFinished(jobId: string, eventId: string) {
const job = await this.getJob(jobId);
await this.apiEvents.create({
const kind = API_EVENT_KINDS.scenario__run__finished__v1__alpha1;
await this.apiEvents.createIfNotExists({
topic: job.data.scenarioId,
kind: API_EVENT_KINDS.scenario__run__finished__v1__alpha1,
kind,
externalId: eventId,
});
}

private async handleFailed(jobId: string) {
private async handleFailed(jobId: string, eventId: string) {
const job = await this.getJob(jobId);
await this.apiEvents.create({
const kind = API_EVENT_KINDS.scenario__run__failed__v1__alpha1;
await this.apiEvents.createIfNotExists({
topic: job.data.scenarioId,
kind: API_EVENT_KINDS.scenario__run__failed__v1__alpha1,
kind,
externalId: eventId,
});
}

Expand Down

0 comments on commit bfb5dab

Please sign in to comment.