Skip to content

Commit

Permalink
feat(api): prevent duplicates in api events
Browse files Browse the repository at this point in the history
  • Loading branch information
Dyostiq committed Jul 19, 2021
1 parent 25880b3 commit 7722a47
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 26 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { MigrationInterface, QueryRunner } from 'typeorm';

export class AddExternalUniqueIdToApiEvents1626458359147
implements MigrationInterface {
public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(
`ALTER TABLE api_events ADD external_id character varying`,
);

await queryRunner.query(
`ALTER TABLE api_events ADD CONSTRAINT api_events_external_id_unique UNIQUE ("external_id")`,
);
}

public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(
`ALTER TABLE api_events DROP CONSTRAINT api_events_external_id_unique`,
);
await queryRunner.query(`ALTER TABLE api_events DROP COLUMN external_id`);
}
}
12 changes: 12 additions & 0 deletions api/apps/api/src/modules/api-events/api-event.api.entity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,18 @@ export class ApiEvent {
@PrimaryGeneratedColumn('uuid')
id!: string;

/**
* Unique identifier of an event, when the event origins from other sources than the application itself and has its own id.
* It is unique to prevent duplicates in case of multiple instances listening on the same source.
*/
@Column({
name: 'external_id',
nullable: true,
type: 'varchar',
unique: true,
})
externalId?: string | null;

/**
* Timestamp of the event.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { ApiProperty, ApiPropertyOptional } from '@nestjs/swagger';
import { IsEnum, IsJSON, IsOptional, IsUUID } from 'class-validator';
import { IsEnum, IsJSON, IsOptional, IsString, IsUUID } from 'class-validator';
import * as ApiEventsUserData from '@marxan-api/modules/api-events/dto/apiEvents.user.data.dto';
import { API_EVENT_KINDS } from '@marxan/api-events';

Expand All @@ -19,6 +19,11 @@ export class CreateApiEventDTO {
@IsUUID(4)
topic!: string;

@ApiPropertyOptional()
@IsString()
@IsOptional()
externalId?: string;

/**
* Data payload of the event. Its semantics depend on kind.
*/
Expand Down
36 changes: 22 additions & 14 deletions api/apps/api/src/modules/scenarios/marxan-run/run.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ beforeEach(async () => {
});

test(`scheduling job`, async () => {
fixtures.setupMocksForSchedulingJobs();
fixtures.setupMocksForSchedulingJobs(() => `1234`);
// given
fixtures.GivenAssetsAvailable();

Expand All @@ -33,12 +33,12 @@ test(`scheduling job`, async () => {

// then
fixtures.ThenShouldUpdateScenario();
fixtures.ThenShouldEmitSubmittedEvent();
fixtures.ThenShouldEmitSubmittedEvent(`1234`);
fixtures.ThenShouldAddJob();
});

test(`scheduling job for scenario without assets`, async () => {
fixtures.setupMocksForSchedulingJobs();
fixtures.setupMocksForSchedulingJobs(() => `12345`);
// given
fixtures.GivenAssetsNotAvailable();

Expand Down Expand Up @@ -94,14 +94,18 @@ describe(`with a single job in the queue`, () => {
${`failed`} | ${API_EVENT_KINDS.scenario__run__failed__v1__alpha1}
${`completed`} | ${API_EVENT_KINDS.scenario__run__finished__v1__alpha1}
`(`when $GotEvent, saves $SavedKind`, async ({ GotEvent, SavedKind }) => {
fixtures.fakeEvents.emit(GotEvent, {
jobId: `123`,
data: {
scenarioId: `scenario-x`,
fixtures.fakeEvents.emit(
GotEvent,
{
jobId: `123`,
data: {
scenarioId: `scenario-x`,
},
},
});
`eventId1`,
);

await fixtures.ThenEventCreated(SavedKind);
await fixtures.ThenEventCreated(SavedKind, `eventId1`);
});
});

Expand Down Expand Up @@ -190,10 +194,12 @@ async function getFixtures() {
getRunService() {
return testingModule.get(RunService);
},
setupMocksForSchedulingJobs() {
setupMocksForSchedulingJobs(createId: () => string) {
this.setupMockForCreatingEvents();
fakeQueue.add.mockImplementation(() => {
//
fakeQueue.add.mockImplementation(async () => {
return {
id: createId(),
};
});
fakeScenarioRepo.update.mockImplementation(() => {
//
Expand Down Expand Up @@ -238,11 +244,12 @@ async function getFixtures() {
ranAtLeastOnce: true,
});
},
ThenShouldEmitSubmittedEvent() {
ThenShouldEmitSubmittedEvent(id: string) {
expect(fixtures.fakeApiEvents.create).toBeCalledTimes(1);
expect(fixtures.fakeApiEvents.create).toBeCalledWith({
topic: `scenario-1`,
kind: API_EVENT_KINDS.scenario__run__submitted__v1__alpha1,
externalId: `${id}${API_EVENT_KINDS.scenario__run__submitted__v1__alpha1}`,
});
},
ThenShouldAddJob() {
Expand All @@ -252,12 +259,13 @@ async function getFixtures() {
assets: this.scenarioAssets,
});
},
async ThenEventCreated(kind: API_EVENT_KINDS) {
async ThenEventCreated(kind: API_EVENT_KINDS, eventId: string) {
await waitForExpect(() => {
expect(fixtures.fakeApiEvents.create).toBeCalledTimes(1);
expect(fixtures.fakeApiEvents.create).toBeCalledWith({
kind,
topic: `scenario-1`,
externalId: eventId,
});
});
},
Expand Down
50 changes: 39 additions & 11 deletions api/apps/api/src/modules/scenarios/marxan-run/run.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { Repository } from 'typeorm';
import { InjectRepository } from '@nestjs/typeorm';
import { assertDefined, isDefined } from '@marxan/utils';
import { JobData, queueName } from '@marxan/scenario-run-queue';
import { CreateApiEventDTO } from '@marxan-api/modules/api-events/dto/create.api-event.dto';
import { ApiEventsService } from '@marxan-api/modules/api-events/api-events.service';
import { API_EVENT_KINDS } from '@marxan/api-events';
import { QueueBuilder, QueueEventsBuilder } from '@marxan-api/modules/queue';
Expand Down Expand Up @@ -43,23 +44,29 @@ export class RunService {
private readonly scenarios: Repository<Scenario>,
private readonly assets: AssetsService,
) {
queueEvents.on(`completed`, ({ jobId }) => this.handleFinished(jobId));
queueEvents.on(`failed`, ({ jobId }) => this.handleFailed(jobId));
queueEvents.on(`completed`, ({ jobId }, eventId) =>
this.handleFinished(jobId, eventId),
);
queueEvents.on(`failed`, ({ jobId }, eventId) =>
this.handleFailed(jobId, eventId),
);
}

async run(scenarioId: string): Promise<void> {
const assets = await this.assets.forScenario(scenarioId);
assertDefined(assets);
await this.queue.add(`run-scenario`, {
const job = await this.queue.add(`run-scenario`, {
scenarioId,
assets,
});
await this.scenarios.update(scenarioId, {
ranAtLeastOnce: true,
});
await this.apiEvents.create({
const kind = API_EVENT_KINDS.scenario__run__submitted__v1__alpha1;
await this.createEvent({
topic: scenarioId,
kind: API_EVENT_KINDS.scenario__run__submitted__v1__alpha1,
kind,
externalId: job.id + kind,
});
}

Expand All @@ -83,19 +90,23 @@ export class RunService {
return right(void 0);
}

private async handleFinished(jobId: string) {
private async handleFinished(jobId: string, eventId: string) {
const job = await this.getJob(jobId);
await this.apiEvents.create({
const kind = API_EVENT_KINDS.scenario__run__finished__v1__alpha1;
await this.createEvent({
topic: job.data.scenarioId,
kind: API_EVENT_KINDS.scenario__run__finished__v1__alpha1,
kind,
externalId: eventId,
});
}

private async handleFailed(jobId: string) {
private async handleFailed(jobId: string, eventId: string) {
const job = await this.getJob(jobId);
await this.apiEvents.create({
const kind = API_EVENT_KINDS.scenario__run__failed__v1__alpha1;
await this.createEvent({
topic: job.data.scenarioId,
kind: API_EVENT_KINDS.scenario__run__failed__v1__alpha1,
kind,
externalId: eventId,
});
}

Expand All @@ -104,4 +115,21 @@ export class RunService {
assertDefined(job);
return job;
}

private async createEvent(data: CreateApiEventDTO) {
try {
await this.apiEvents.create(data);
} catch (error) {
const postgresDuplicateKeyErrorCode = '23505';
const externalIdConstraint = 'api_events_external_id_unique';
if (
error.code === postgresDuplicateKeyErrorCode &&
error.constraint === externalIdConstraint
) {
// duplicate
return;
}
throw error;
}
}
}

0 comments on commit 7722a47

Please sign in to comment.