diff --git a/api/apps/api/src/migrations/api/1625209470391-AddRunAtLeastOnceFlagToScenario.ts b/api/apps/api/src/migrations/api/1625209470391-AddRunAtLeastOnceFlagToScenario.ts new file mode 100644 index 0000000000..154dacee6d --- /dev/null +++ b/api/apps/api/src/migrations/api/1625209470391-AddRunAtLeastOnceFlagToScenario.ts @@ -0,0 +1,16 @@ +import { MigrationInterface, QueryRunner } from 'typeorm'; + +export class AddRunAtLeastOnceFlagToScenario1625209470391 + implements MigrationInterface { + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query( + `ALTER TABLE scenarios ADD "ran_at_least_once" boolean NOT NULL DEFAULT false`, + ); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query( + `ALTER TABLE scenarios DROP COLUMN "ran_at_least_once"`, + ); + } +} diff --git a/api/apps/api/src/modules/projects/job-status/job-status.view.api.entity.ts b/api/apps/api/src/modules/projects/job-status/job-status.view.api.entity.ts index 7c7ddd4e9b..3b7798e024 100644 --- a/api/apps/api/src/modules/projects/job-status/job-status.view.api.entity.ts +++ b/api/apps/api/src/modules/projects/job-status/job-status.view.api.entity.ts @@ -68,4 +68,7 @@ const eventToJobStatusMapping: Record, JobStatus> = { JobStatus.done, [API_EVENT_KINDS.scenario__planningUnitsInclusion__submitted__v1__alpha1]: JobStatus.running, + [API_EVENT_KINDS.scenario__run__submitted__v1__alpha1]: JobStatus.running, + [API_EVENT_KINDS.scenario__run__finished__v1__alpha1]: JobStatus.done, + [API_EVENT_KINDS.scenario__run__failed__v1__alpha1]: JobStatus.failure, }; diff --git a/api/apps/api/src/modules/scenarios/__mocks__/scenario.data.ts b/api/apps/api/src/modules/scenarios/__mocks__/scenario.data.ts index 21ed84f3f9..9b10941e04 100644 --- a/api/apps/api/src/modules/scenarios/__mocks__/scenario.data.ts +++ b/api/apps/api/src/modules/scenarios/__mocks__/scenario.data.ts @@ -1,6 +1,5 @@ import { JobStatus, Scenario, ScenarioType } from '../scenario.api.entity'; import { User } from '../../users/user.api.entity'; -import { IUCNCategory } from '../../protected-areas/protected-area.geo.entity'; const scenarioBase = (): Scenario => ({ createdAt: new Date('2021-05-10T10:25:11.959Z'), @@ -16,6 +15,7 @@ const scenarioBase = (): Scenario => ({ wdpaThreshold: undefined, wdpaIucnCategories: undefined, protectedAreaFilterByIds: undefined, + ranAtLeastOnce: false, }); export const scenarioWithRequiredWatchedEmpty = (): Scenario => ({ diff --git a/api/apps/api/src/modules/scenarios/input-parameter-file.provider.spec.ts b/api/apps/api/src/modules/scenarios/input-parameter-file.provider.spec.ts index 60a51309a5..0ed955d3d5 100644 --- a/api/apps/api/src/modules/scenarios/input-parameter-file.provider.spec.ts +++ b/api/apps/api/src/modules/scenarios/input-parameter-file.provider.spec.ts @@ -153,6 +153,7 @@ async function getFixtures() { status: JobStatus.done, type: ScenarioType.marxanWithZones, users: [], + ranAtLeastOnce: false, }; }, withInputParameters() { diff --git a/api/apps/api/src/modules/scenarios/run.service.spec.ts b/api/apps/api/src/modules/scenarios/run.service.spec.ts new file mode 100644 index 0000000000..273169fec3 --- /dev/null +++ b/api/apps/api/src/modules/scenarios/run.service.spec.ts @@ -0,0 +1,252 @@ +import { EventEmitter } from 'events'; +import { PromiseType } from 'utility-types'; +import { right, left } from 'fp-ts/Either'; +import waitForExpect from 'wait-for-expect'; +import { getRepositoryToken } from '@nestjs/typeorm'; +import { Test } from '@nestjs/testing'; +import { API_EVENT_KINDS } from '@marxan/api-events'; +import { + notFound, + runEventsToken, + runQueueToken, + RunService, +} from '@marxan-api/modules/scenarios/run.service'; +import { ApiEventsService } from '@marxan-api/modules/api-events/api-events.service'; +import { Scenario } from './scenario.api.entity'; + +let fixtures: PromiseType>; +let runService: RunService; + +beforeEach(async () => { + fixtures = await getFixtures(); + runService = fixtures.getRunService(); +}); + +test(`scheduling job`, async () => { + fixtures.setupMocksForSchedulingJobs(); + + // when + await runService.run('scenario-1'); + + // then + fixtures.ThenShouldUpdateScenario(); + fixtures.ThenShouldEmitSubmittedEvent(); + fixtures.ThenShouldAddJob(); +}); + +test(`canceling job`, async () => { + fixtures.GivenANotCancellableJobInQueue(); + + const result = await runService.cancel(`scenario-1`); + + expect(result).toStrictEqual(right(void 0)); +}); + +test(`canceling job`, async () => { + fixtures.GivenNoJobsInQueue(); + + const result = await runService.cancel(`scenario-1`); + + expect(result).toStrictEqual(left(notFound)); +}); + +test(`canceling active job`, async () => { + fixtures.GivenAnActiveJobInQueue(); + + const result = await runService.cancel(`scenario-1`); + + fixtures.ThenProgressOfActiveJobIsSetToCancel(); + expect(result).toStrictEqual(right(void 0)); +}); + +test(`canceling waiting job`, async () => { + fixtures.GivenAWaitingJobInQueue(); + + const result = await runService.cancel(`scenario-1`); + + expect(fixtures.waitingJob.remove).toBeCalledTimes(1); + expect(result).toStrictEqual(right(void 0)); +}); + +describe(`with a single job in the queue`, () => { + beforeEach(() => { + fixtures.setupMockForCreatingEvents(); + fixtures.GivenAJobInQueue(); + }); + + test.each` + Got Event | Saved Kind + ${`failed`} | ${API_EVENT_KINDS.scenario__run__failed__v1__alpha1} + ${`completed`} | ${API_EVENT_KINDS.scenario__run__finished__v1__alpha1} + `(`when $GotEvent, saves $SavedKind`, async ({ GotEvent, SavedKind }) => { + fixtures.fakeEvents.emit(GotEvent, { + jobId: `123`, + data: { + scenarioId: `scenario-x`, + }, + }); + + await fixtures.ThenEventCreated(SavedKind); + }); +}); + +async function getFixtures() { + const throwingMock = () => jest.fn(fail); + const fakeQueue = { + add: throwingMock(), + getJobs: jest.fn(), + getJob: jest.fn(), + }; + const fakeEvents = new EventEmitter(); + const fakeApiEvents = { + create: throwingMock(), + }; + const fakeScenarioRepo = { + update: throwingMock(), + }; + const testingModule = await Test.createTestingModule({ + providers: [ + { + provide: runQueueToken, + useValue: fakeQueue, + }, + { + provide: runEventsToken, + useValue: fakeEvents, + }, + { + provide: ApiEventsService, + useValue: fakeApiEvents, + }, + { + provide: getRepositoryToken(Scenario), + useValue: fakeScenarioRepo, + }, + RunService, + ], + }).compile(); + + return { + fakeQueue, + fakeEvents, + fakeApiEvents, + fakeScenarioRepo, + activeJob: { + data: { + scenarioId: 'scenario-1', + }, + isActive: async () => true, + isWaiting: async () => false, + updateProgress: jest.fn(), + }, + waitingJob: { + data: { + scenarioId: 'scenario-1', + }, + isActive: async () => false, + isWaiting: async () => true, + remove: jest.fn(), + }, + notCancelableJob: { + data: { + scenarioId: 'scenario-1', + }, + isActive: async () => false, + isWaiting: async () => false, + }, + otherJob: { + data: { + scenarioId: 'other-scenario', + }, + }, + getRunService() { + return testingModule.get(RunService); + }, + setupMocksForSchedulingJobs() { + this.setupMockForCreatingEvents(); + fakeQueue.add.mockImplementation(() => { + // + }); + fakeScenarioRepo.update.mockImplementation(() => { + // + }); + }, + setupMockForCreatingEvents() { + fakeApiEvents.create.mockImplementation(() => { + // + }); + }, + GivenAnActiveJobInQueue() { + const jobs = [this.otherJob, this.activeJob]; + this.fakeQueue.getJobs.mockImplementation((...args) => { + expect(args).toStrictEqual([['active', 'waiting']]); + return jobs; + }); + }, + GivenAWaitingJobInQueue() { + const jobs = [this.otherJob, this.waitingJob]; + this.fakeQueue.getJobs.mockImplementation((...args) => { + expect(args).toStrictEqual([['active', 'waiting']]); + return jobs; + }); + }, + GivenANotCancellableJobInQueue() { + const jobs = [this.otherJob, this.notCancelableJob]; + this.fakeQueue.getJobs.mockImplementation((...args) => { + expect(args).toStrictEqual([['active', 'waiting']]); + return jobs; + }); + }, + ThenProgressOfActiveJobIsSetToCancel() { + expect(fixtures.activeJob.updateProgress).toBeCalledTimes(1); + expect(fixtures.activeJob.updateProgress).toBeCalledWith({ + canceled: true, + }); + }, + ThenShouldUpdateScenario() { + expect(fixtures.fakeScenarioRepo.update).toBeCalledTimes(1); + expect(fixtures.fakeScenarioRepo.update).toBeCalledWith(`scenario-1`, { + ranAtLeastOnce: true, + }); + }, + ThenShouldEmitSubmittedEvent() { + expect(fixtures.fakeApiEvents.create).toBeCalledTimes(1); + expect(fixtures.fakeApiEvents.create).toBeCalledWith({ + topic: `scenario-1`, + kind: API_EVENT_KINDS.scenario__run__submitted__v1__alpha1, + }); + }, + ThenShouldAddJob() { + expect(fixtures.fakeQueue.add).toBeCalledTimes(1); + expect(fixtures.fakeQueue.add).toBeCalledWith(`run-scenario`, { + scenarioId: `scenario-1`, + }); + }, + async ThenEventCreated(kind: API_EVENT_KINDS) { + await waitForExpect(() => { + expect(fixtures.fakeApiEvents.create).toBeCalledTimes(1); + expect(fixtures.fakeApiEvents.create).toBeCalledWith({ + kind, + topic: `scenario-1`, + }); + }); + }, + GivenNoJobsInQueue() { + const jobs = [] as const; + this.fakeQueue.getJobs.mockImplementation((...args) => { + expect(args).toStrictEqual([['active', 'waiting']]); + return jobs; + }); + }, + GivenAJobInQueue() { + fixtures.fakeQueue.getJob.mockImplementation((...args) => { + expect(args).toStrictEqual([`123`]); + return { + data: { + scenarioId: `scenario-1`, + }, + }; + }); + }, + }; +} diff --git a/api/apps/api/src/modules/scenarios/run.service.ts b/api/apps/api/src/modules/scenarios/run.service.ts new file mode 100644 index 0000000000..ed1853bf22 --- /dev/null +++ b/api/apps/api/src/modules/scenarios/run.service.ts @@ -0,0 +1,105 @@ +import { Job, Queue, QueueEvents } from 'bullmq'; +import { Either, left, right } from 'fp-ts/Either'; +import { FactoryProvider, Inject, Injectable } from '@nestjs/common'; +import { Repository } from 'typeorm'; +import { InjectRepository } from '@nestjs/typeorm'; +import { assertDefined, isDefined } from '@marxan/utils'; +import { ApiEventsService } from '@marxan-api/modules/api-events/api-events.service'; +import { API_EVENT_KINDS } from '@marxan/api-events'; +import { QueueBuilder, QueueEventsBuilder } from '@marxan-api/modules/queue'; +import { Scenario } from '@marxan-api/modules/scenarios/scenario.api.entity'; + +export const runQueueToken = Symbol('run queue token'); +export const runEventsToken = Symbol('run events token'); +export const queueName = 'scenario-run-queue'; +export const runQueueProvider: FactoryProvider> = { + provide: runQueueToken, + useFactory: (queueBuilder: QueueBuilder) => { + return queueBuilder.buildQueue(queueName); + }, + inject: [QueueBuilder], +}; +export const runQueueEventsProvider: FactoryProvider = { + provide: runEventsToken, + useFactory: (eventsBuilder: QueueEventsBuilder) => { + return eventsBuilder.buildQueueEvents(queueName); + }, + inject: [QueueEventsBuilder], +}; + +type JobData = { + scenarioId: string; +}; + +export const notFound = Symbol('not found'); +export type NotFound = typeof notFound; + +@Injectable() +export class RunService { + constructor( + @Inject(runQueueToken) + private readonly queue: Queue, + @Inject(runEventsToken) + queueEvents: QueueEvents, + private readonly apiEvents: ApiEventsService, + @InjectRepository(Scenario) + private readonly scenarios: Repository, + ) { + queueEvents.on(`completed`, ({ jobId }) => this.handleFinished(jobId)); + queueEvents.on(`failed`, ({ jobId }) => this.handleFailed(jobId)); + } + + async run(scenarioId: string): Promise { + await this.queue.add(`run-scenario`, { + scenarioId, + }); + await this.scenarios.update(scenarioId, { + ranAtLeastOnce: true, + }); + await this.apiEvents.create({ + topic: scenarioId, + kind: API_EVENT_KINDS.scenario__run__submitted__v1__alpha1, + }); + } + + async cancel(scenarioId: string): Promise> { + const activeJobs: Job[] = await this.queue.getJobs([ + 'active', + 'waiting', + ]); + const scenarioJob = activeJobs.find( + (job) => job.data.scenarioId === scenarioId, + ); + if (!isDefined(scenarioJob)) return left(notFound); + + if (await scenarioJob.isActive()) + await scenarioJob.updateProgress({ + canceled: true, + }); + else if (await scenarioJob.isWaiting()) await scenarioJob.remove(); + + return right(void 0); + } + + private async handleFinished(jobId: string) { + const job = await this.getJob(jobId); + await this.apiEvents.create({ + topic: job.data.scenarioId, + kind: API_EVENT_KINDS.scenario__run__finished__v1__alpha1, + }); + } + + private async handleFailed(jobId: string) { + const job = await this.getJob(jobId); + await this.apiEvents.create({ + topic: job.data.scenarioId, + kind: API_EVENT_KINDS.scenario__run__failed__v1__alpha1, + }); + } + + private async getJob(jobId: string): Promise> { + const job = await this.queue.getJob(jobId); + assertDefined(job); + return job; + } +} diff --git a/api/apps/api/src/modules/scenarios/scenario.api.entity.ts b/api/apps/api/src/modules/scenarios/scenario.api.entity.ts index 3601258dbc..80367e1c3c 100644 --- a/api/apps/api/src/modules/scenarios/scenario.api.entity.ts +++ b/api/apps/api/src/modules/scenarios/scenario.api.entity.ts @@ -158,6 +158,13 @@ export class Scenario extends TimeUserEntityMetadata { @IsArray() @ManyToMany((_type) => User, (user) => user.scenarios, { eager: true }) users!: Partial[]; + + @ApiProperty() + @Column({ + name: 'ran_at_least_once', + default: false, + }) + ranAtLeastOnce!: boolean; } export class JSONAPIScenarioData { diff --git a/api/apps/api/src/modules/scenarios/scenarios.module.ts b/api/apps/api/src/modules/scenarios/scenarios.module.ts index 86c53ef1dd..f650675e5f 100644 --- a/api/apps/api/src/modules/scenarios/scenarios.module.ts +++ b/api/apps/api/src/modules/scenarios/scenarios.module.ts @@ -30,9 +30,16 @@ import { IoSettings, ioSettingsToken, } from './input-parameter-file.provider'; -import { AppConfig } from '@marxan-api/utils/config.utils'; import { assertDefined } from '@marxan/utils'; +import { AppConfig } from '@marxan-api/utils/config.utils'; +import { QueueModule } from '@marxan-api/modules/queue/queue.module'; +import { ApiEventsModule } from '@marxan-api/modules/api-events/api-events.module'; import { SpecDatModule } from './input-files/spec.dat.module'; +import { + runQueueEventsProvider, + runQueueProvider, + RunService, +} from './run.service'; import { MarxanRunService } from './marxan-run/marxan-run.service'; import { MarxanRunController } from './marxan-run/marxan-run.controller'; @@ -56,8 +63,13 @@ import { MarxanRunController } from './marxan-run/marxan-run.controller'; CostSurfaceViewModule, SpecDatModule, PlanningUnitsProtectionLevelModule, + QueueModule.register(), + ApiEventsModule, ], providers: [ + runQueueProvider, + runQueueEventsProvider, + RunService, ScenariosService, ScenariosCrudService, ProxyService, diff --git a/api/apps/api/src/modules/scenarios/scenarios.service.ts b/api/apps/api/src/modules/scenarios/scenarios.service.ts index 74a4cb8e97..1bd085d977 100644 --- a/api/apps/api/src/modules/scenarios/scenarios.service.ts +++ b/api/apps/api/src/modules/scenarios/scenarios.service.ts @@ -2,7 +2,8 @@ import { BadRequestException, HttpService, Injectable, - NotImplementedException, + InternalServerErrorException, + NotFoundException, } from '@nestjs/common'; import { FetchSpecification } from 'nestjs-base-service'; import { classToClass } from 'class-transformer'; @@ -28,6 +29,8 @@ import { SolutionResultCrudService } from './solutions-result/solution-result-cr import { CostSurfaceViewService } from './cost-surface-readmodel/cost-surface-view.service'; import { InputParameterFileProvider } from './input-parameter-file.provider'; import { SpecDatService } from './input-files/spec.dat.service'; +import { notFound, RunService } from './run.service'; +import { isLeft } from 'fp-ts/Either'; @Injectable() export class ScenariosService { @@ -45,6 +48,7 @@ export class ScenariosService { private readonly costSurfaceView: CostSurfaceViewService, private readonly marxanInputValidator: MarxanInput, private readonly inputParameterFileProvider: InputParameterFileProvider, + private readonly runService: RunService, private readonly specDatService: SpecDatService, ) {} @@ -157,15 +161,20 @@ export class ScenariosService { async run(scenarioId: string, _blm?: number): Promise { await this.assertScenario(scenarioId); - // TODO ensure not running yet - // TODO submit - throw new NotImplementedException(); + await this.runService.run(scenarioId); } async cancel(scenarioId: string): Promise { await this.assertScenario(scenarioId); - // TODO ensure it is running - throw new NotImplementedException(); + const result = await this.runService.cancel(scenarioId); + if (isLeft(result)) { + const mapping: Record never> = { + [notFound]: () => { + throw new NotFoundException(); + }, + }; + mapping[result.left](); + } } private async assertScenario(scenarioId: string) { diff --git a/api/libs/api-events/src/api-event-kinds.enum.ts b/api/libs/api-events/src/api-event-kinds.enum.ts index aa0c54063c..038287fff0 100644 --- a/api/libs/api-events/src/api-event-kinds.enum.ts +++ b/api/libs/api-events/src/api-event-kinds.enum.ts @@ -14,6 +14,9 @@ export enum API_EVENT_KINDS { scenario__planningUnitsInclusion__submitted__v1__alpha1 = 'scenario.planningUnitsInclusion.submitted/v1alpha', scenario__planningUnitsInclusion__failed__v1__alpha1 = 'scenario.planningUnitsInclusion.failed/v1alpha', scenario__planningUnitsInclusion__finished__v1__alpha1 = 'scenario.planningUnitsInclusion.finished/v1alpha', + scenario__run__submitted__v1__alpha1 = 'scenario.run.submitted/v1alpha', + scenario__run__failed__v1__alpha1 = 'scenario.run.failed/v1alpha', + scenario__run__finished__v1__alpha1 = 'scenario.run.finished/v1alpha', project__protectedAreas__submitted__v1__alpha = 'project.protectedAreas.submitted/v1/alpha', project__protectedAreas__finished__v1__alpha = 'project.protectedAreas.finished/v1/alpha', project__protectedAreas__failed__v1__alpha = 'project.protectedAreas.failed/v1/alpha', diff --git a/api/package.json b/api/package.json index 72582a569c..32bf7523cf 100644 --- a/api/package.json +++ b/api/package.json @@ -59,7 +59,7 @@ "config": "^3.3.3", "express": "^4.17.1", "faker": "^5.1.0", - "fp-ts": "^2.10.5", + "fp-ts": "2.10.5", "geojson": "0.5.0", "helmet": "^4.3.1", "http-proxy": "^1.18.1", diff --git a/api/yarn.lock b/api/yarn.lock index c0dc040313..d368734380 100644 --- a/api/yarn.lock +++ b/api/yarn.lock @@ -4918,7 +4918,7 @@ forwarded@~0.1.2: resolved "https://registry.yarnpkg.com/forwarded/-/forwarded-0.1.2.tgz#98c23dab1175657b8c0573e8ceccd91b0ff18c84" integrity sha1-mMI9qxF1ZXuMBXPozszZGw/xjIQ= -fp-ts@^2.10.5: +fp-ts@2.10.5: version "2.10.5" resolved "https://registry.yarnpkg.com/fp-ts/-/fp-ts-2.10.5.tgz#7c77868fe8bd9b229743303c1bec505b959f631b" integrity sha512-X2KfTIV0cxIk3d7/2Pvp/pxL/xr2MV1WooyEzKtTWYSc1+52VF4YzjBTXqeOlSiZsPCxIBpDGfT9Dyo7WEY0DQ==