diff --git a/api/apps/api/src/migrations/api/1626458359147-AddExternalUniqueIdToApiEvents.ts b/api/apps/api/src/migrations/api/1626458359147-AddExternalUniqueIdToApiEvents.ts new file mode 100644 index 0000000000..e8bff2ddca --- /dev/null +++ b/api/apps/api/src/migrations/api/1626458359147-AddExternalUniqueIdToApiEvents.ts @@ -0,0 +1,21 @@ +import { MigrationInterface, QueryRunner } from 'typeorm'; + +export class AddExternalUniqueIdToApiEvents1626458359147 + implements MigrationInterface { + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query( + `ALTER TABLE api_events ADD external_id character varying`, + ); + + await queryRunner.query( + `ALTER TABLE api_events ADD CONSTRAINT api_events_external_id_unique UNIQUE ("external_id")`, + ); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query( + `ALTER TABLE api_events DROP CONSTRAINT api_events_external_id_unique`, + ); + await queryRunner.query(`ALTER TABLE api_events DROP COLUMN external_id`); + } +} diff --git a/api/apps/api/src/modules/api-events/api-event.api.entity.ts b/api/apps/api/src/modules/api-events/api-event.api.entity.ts index 00d715cd79..705a62e5f4 100644 --- a/api/apps/api/src/modules/api-events/api-event.api.entity.ts +++ b/api/apps/api/src/modules/api-events/api-event.api.entity.ts @@ -33,6 +33,18 @@ export class ApiEvent { @PrimaryGeneratedColumn('uuid') id!: string; + /** + * Unique identifier of an event, when the event origins from other sources than the application itself and has its own id. + * It is unique to prevent duplicates in case of multiple instances listening on the same source. + */ + @Column({ + name: 'external_id', + nullable: true, + type: 'varchar', + unique: true, + }) + externalId?: string | null; + /** * Timestamp of the event. */ diff --git a/api/apps/api/src/modules/api-events/dto/create.api-event.dto.ts b/api/apps/api/src/modules/api-events/dto/create.api-event.dto.ts index 2f2ddd464a..2113969fac 100644 --- a/api/apps/api/src/modules/api-events/dto/create.api-event.dto.ts +++ b/api/apps/api/src/modules/api-events/dto/create.api-event.dto.ts @@ -1,5 +1,5 @@ import { ApiProperty, ApiPropertyOptional } from '@nestjs/swagger'; -import { IsEnum, IsJSON, IsOptional, IsUUID } from 'class-validator'; +import { IsEnum, IsJSON, IsOptional, IsString, IsUUID } from 'class-validator'; import * as ApiEventsUserData from '@marxan-api/modules/api-events/dto/apiEvents.user.data.dto'; import { API_EVENT_KINDS } from '@marxan/api-events'; @@ -19,6 +19,11 @@ export class CreateApiEventDTO { @IsUUID(4) topic!: string; + @ApiPropertyOptional() + @IsString() + @IsOptional() + externalId?: string; + /** * Data payload of the event. Its semantics depend on kind. */ diff --git a/api/apps/api/src/modules/scenarios/marxan-run/run.service.spec.ts b/api/apps/api/src/modules/scenarios/marxan-run/run.service.spec.ts index f4ef06b9c7..cbc9809c9a 100644 --- a/api/apps/api/src/modules/scenarios/marxan-run/run.service.spec.ts +++ b/api/apps/api/src/modules/scenarios/marxan-run/run.service.spec.ts @@ -24,7 +24,7 @@ beforeEach(async () => { }); test(`scheduling job`, async () => { - fixtures.setupMocksForSchedulingJobs(); + fixtures.setupMocksForSchedulingJobs(() => `1234`); // given fixtures.GivenAssetsAvailable(); @@ -33,12 +33,12 @@ test(`scheduling job`, async () => { // then fixtures.ThenShouldUpdateScenario(); - fixtures.ThenShouldEmitSubmittedEvent(); + fixtures.ThenShouldEmitSubmittedEvent(`1234`); fixtures.ThenShouldAddJob(); }); test(`scheduling job for scenario without assets`, async () => { - fixtures.setupMocksForSchedulingJobs(); + fixtures.setupMocksForSchedulingJobs(() => `12345`); // given fixtures.GivenAssetsNotAvailable(); @@ -94,14 +94,18 @@ describe(`with a single job in the queue`, () => { ${`failed`} | ${API_EVENT_KINDS.scenario__run__failed__v1__alpha1} ${`completed`} | ${API_EVENT_KINDS.scenario__run__finished__v1__alpha1} `(`when $GotEvent, saves $SavedKind`, async ({ GotEvent, SavedKind }) => { - fixtures.fakeEvents.emit(GotEvent, { - jobId: `123`, - data: { - scenarioId: `scenario-x`, + fixtures.fakeEvents.emit( + GotEvent, + { + jobId: `123`, + data: { + scenarioId: `scenario-x`, + }, }, - }); + `eventId1`, + ); - await fixtures.ThenEventCreated(SavedKind); + await fixtures.ThenEventCreated(SavedKind, `eventId1`); }); }); @@ -190,10 +194,12 @@ async function getFixtures() { getRunService() { return testingModule.get(RunService); }, - setupMocksForSchedulingJobs() { + setupMocksForSchedulingJobs(createId: () => string) { this.setupMockForCreatingEvents(); - fakeQueue.add.mockImplementation(() => { - // + fakeQueue.add.mockImplementation(async () => { + return { + id: createId(), + }; }); fakeScenarioRepo.update.mockImplementation(() => { // @@ -238,11 +244,12 @@ async function getFixtures() { ranAtLeastOnce: true, }); }, - ThenShouldEmitSubmittedEvent() { + ThenShouldEmitSubmittedEvent(id: string) { expect(fixtures.fakeApiEvents.create).toBeCalledTimes(1); expect(fixtures.fakeApiEvents.create).toBeCalledWith({ topic: `scenario-1`, kind: API_EVENT_KINDS.scenario__run__submitted__v1__alpha1, + externalId: `${id}${API_EVENT_KINDS.scenario__run__submitted__v1__alpha1}`, }); }, ThenShouldAddJob() { @@ -252,12 +259,13 @@ async function getFixtures() { assets: this.scenarioAssets, }); }, - async ThenEventCreated(kind: API_EVENT_KINDS) { + async ThenEventCreated(kind: API_EVENT_KINDS, eventId: string) { await waitForExpect(() => { expect(fixtures.fakeApiEvents.create).toBeCalledTimes(1); expect(fixtures.fakeApiEvents.create).toBeCalledWith({ kind, topic: `scenario-1`, + externalId: eventId, }); }); }, diff --git a/api/apps/api/src/modules/scenarios/marxan-run/run.service.ts b/api/apps/api/src/modules/scenarios/marxan-run/run.service.ts index f4a6b1cc2f..c24d038f85 100644 --- a/api/apps/api/src/modules/scenarios/marxan-run/run.service.ts +++ b/api/apps/api/src/modules/scenarios/marxan-run/run.service.ts @@ -5,6 +5,7 @@ import { Repository } from 'typeorm'; import { InjectRepository } from '@nestjs/typeorm'; import { assertDefined, isDefined } from '@marxan/utils'; import { JobData, queueName } from '@marxan/scenario-run-queue'; +import { CreateApiEventDTO } from '@marxan-api/modules/api-events/dto/create.api-event.dto'; import { ApiEventsService } from '@marxan-api/modules/api-events/api-events.service'; import { API_EVENT_KINDS } from '@marxan/api-events'; import { QueueBuilder, QueueEventsBuilder } from '@marxan-api/modules/queue'; @@ -43,23 +44,29 @@ export class RunService { private readonly scenarios: Repository, private readonly assets: AssetsService, ) { - queueEvents.on(`completed`, ({ jobId }) => this.handleFinished(jobId)); - queueEvents.on(`failed`, ({ jobId }) => this.handleFailed(jobId)); + queueEvents.on(`completed`, ({ jobId }, eventId) => + this.handleFinished(jobId, eventId), + ); + queueEvents.on(`failed`, ({ jobId }, eventId) => + this.handleFailed(jobId, eventId), + ); } async run(scenarioId: string): Promise { const assets = await this.assets.forScenario(scenarioId); assertDefined(assets); - await this.queue.add(`run-scenario`, { + const job = await this.queue.add(`run-scenario`, { scenarioId, assets, }); await this.scenarios.update(scenarioId, { ranAtLeastOnce: true, }); - await this.apiEvents.create({ + const kind = API_EVENT_KINDS.scenario__run__submitted__v1__alpha1; + await this.createEvent({ topic: scenarioId, - kind: API_EVENT_KINDS.scenario__run__submitted__v1__alpha1, + kind, + externalId: job.id + kind, }); } @@ -83,19 +90,23 @@ export class RunService { return right(void 0); } - private async handleFinished(jobId: string) { + private async handleFinished(jobId: string, eventId: string) { const job = await this.getJob(jobId); - await this.apiEvents.create({ + const kind = API_EVENT_KINDS.scenario__run__finished__v1__alpha1; + await this.createEvent({ topic: job.data.scenarioId, - kind: API_EVENT_KINDS.scenario__run__finished__v1__alpha1, + kind, + externalId: eventId, }); } - private async handleFailed(jobId: string) { + private async handleFailed(jobId: string, eventId: string) { const job = await this.getJob(jobId); - await this.apiEvents.create({ + const kind = API_EVENT_KINDS.scenario__run__failed__v1__alpha1; + await this.createEvent({ topic: job.data.scenarioId, - kind: API_EVENT_KINDS.scenario__run__failed__v1__alpha1, + kind, + externalId: eventId, }); } @@ -104,4 +115,21 @@ export class RunService { assertDefined(job); return job; } + + private async createEvent(data: CreateApiEventDTO) { + try { + await this.apiEvents.create(data); + } catch (error) { + const postgresDuplicateKeyErrorCode = '23505'; + const externalIdConstraint = 'api_events_external_id_unique'; + if ( + error.code === postgresDuplicateKeyErrorCode && + error.constraint === externalIdConstraint + ) { + // duplicate + return; + } + throw error; + } + } }