Skip to content

Commit

Permalink
feat: add event for planning unit geom calculation
Browse files Browse the repository at this point in the history
  • Loading branch information
Dyostiq committed Sep 29, 2021
1 parent 70a2a13 commit 8e9f178
Show file tree
Hide file tree
Showing 21 changed files with 198 additions and 66 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { MigrationInterface, QueryRunner } from 'typeorm';

export class AddPlanningUnitCalculationEvents1632901740965
implements MigrationInterface {
public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`
INSERT INTO api_event_kinds (id) values
('project.planningUnits.submitted/v1/alpha'),
('project.planningUnits.finished/v1/alpha'),
('project.planningUnits.failed/v1/alpha');
`);
}

public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(
`DELETE FROM api_event_kinds WHERE id = 'project.planningUnits.submitted/v1/alpha';`,
);
await queryRunner.query(
`DELETE FROM api_event_kinds WHERE id = 'project.planningUnits.finished/v1/alpha';`,
);
await queryRunner.query(
`DELETE FROM api_event_kinds WHERE id = 'project.planningUnits.failed/v1/alpha';`,
);
}
}
4 changes: 2 additions & 2 deletions api/apps/api/src/modules/analysis/analysis.module.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Module } from '@nestjs/common';
import { ApiEventsModule } from '@marxan-api/modules/api-events/api-events.module';
import { queueName } from '@marxan-jobs/planning-unit-geometry';
import { updateQueueName } from '@marxan-jobs/planning-unit-geometry';
import { PlanningUnitsModule } from '@marxan-api/modules/planning-units/planning-units.module';
import { ScenariosPlanningUnitModule } from '@marxan-api/modules/scenarios-planning-unit/scenarios-planning-unit.module';

Expand All @@ -20,7 +20,7 @@ import { UpdatePlanningUnitsEventsPort } from './providers/planning-units/update
ScenariosPlanningUnitModule,
PlanningUnitsModule,
QueueModule.register({
name: queueName,
name: updateQueueName,
}),
],
providers: [
Expand Down

This file was deleted.

This file was deleted.

18 changes: 12 additions & 6 deletions api/apps/api/src/modules/planning-units/planning-units.module.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
import { Module } from '@nestjs/common';
import { ProxyService } from '@marxan-api/modules/proxy/proxy.service';
import { PlanningUnitsController } from './planning-units.controller';
import { PlanningUnitsService } from './planning-units.service';
import {
PlanningUnitsService,
queueEventsProvider,
queueProvider,
} from './planning-units.service';
import { QueueModule } from '@marxan-api/modules/queue/queue.module';
import { ApiEventsModule } from '@marxan-api/modules/api-events';

@Module({
imports: [
QueueModule.register({
name: 'planning-units',
}),
imports: [QueueModule.register(), ApiEventsModule],
providers: [
PlanningUnitsService,
ProxyService,
queueProvider,
queueEventsProvider,
],
providers: [PlanningUnitsService, ProxyService],
exports: [PlanningUnitsService],
controllers: [PlanningUnitsController],
})
Expand Down
79 changes: 72 additions & 7 deletions api/apps/api/src/modules/planning-units/planning-units.service.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,80 @@
import { Injectable } from '@nestjs/common';
import { FactoryProvider, Inject, Injectable } from '@nestjs/common';

import { QueueService } from '@marxan-api/modules/queue/queue.service';
import { CreatePlanningUnitsDTO } from './dto/create.planning-units.dto';
import { Job, Queue, QueueEvents } from 'bullmq';
import { ApiEventsService } from '@marxan-api/modules/api-events';
import { API_EVENT_KINDS } from '@marxan/api-events';
import {
PlanningUnitsJob,
createQueueName,
} from '@marxan-jobs/planning-unit-geometry';
import { QueueBuilder, QueueEventsBuilder } from '@marxan-api/modules/queue';

export const queueToken = Symbol(`planning unit queue token`);
export const queueProvider: FactoryProvider<Queue<PlanningUnitsJob, void>> = {
provide: queueToken,
useFactory: (builder: QueueBuilder) => builder.buildQueue(createQueueName),
inject: [QueueBuilder],
};
export const queueEventsToken = Symbol(`planning units queue events token`);
export const queueEventsProvider: FactoryProvider<QueueEvents> = {
provide: queueEventsToken,
useFactory: (builder: QueueEventsBuilder) =>
builder.buildQueueEvents(createQueueName),
inject: [QueueEventsBuilder],
};

type CompletedEvent = { jobId: string; returnvalue: string };
type FailedEvent = { jobId: string; failedReason: string };

@Injectable()
export class PlanningUnitsService {
constructor(
private readonly queueService: QueueService<CreatePlanningUnitsDTO>,
) {}
@Inject(queueToken)
private readonly queue: Queue<PlanningUnitsJob, void>,
@Inject(queueEventsToken)
private readonly queueEvents: QueueEvents,
private readonly events: ApiEventsService,
) {
this.queueEvents.on(`completed`, (data, eventId) =>
this.onCompleted(data, eventId),
);
this.queueEvents.on(`failed`, (data, eventId) =>
this.onFailed(data, eventId),
);
}

public async create(jobDefinition: PlanningUnitsJob): Promise<void> {
const job = await this.queue.add('create-regular-pu', jobDefinition);
await this.events.createIfNotExists({
kind: API_EVENT_KINDS.project__planningUnits__submitted__v1__alpha,
topic: jobDefinition.projectId,
externalId: job.id,
});
}

private async onCompleted(data: CompletedEvent, eventId: string) {
const job: Job<PlanningUnitsJob> | undefined = await this.queue.getJob(
data.jobId,
);
if (!job) return;
const { projectId } = job.data;
await this.events.createIfNotExists({
kind: API_EVENT_KINDS.project__protectedAreas__finished__v1__alpha,
topic: projectId,
externalId: eventId,
});
}

public async create(creationOptions: CreatePlanningUnitsDTO): Promise<void> {
await this.queueService.queue.add('create-regular-pu', creationOptions);
private async onFailed(data: FailedEvent, eventId: string) {
const job: Job<PlanningUnitsJob> | undefined = await this.queue.getJob(
data.jobId,
);
if (!job) return;
const { projectId } = job.data;
await this.events.createIfNotExists({
kind: API_EVENT_KINDS.project__protectedAreas__finished__v1__alpha,
topic: projectId,
externalId: eventId,
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,9 @@ const eventToJobStatusMapping: Record<ValuesType<ProjectEvents>, JobStatus> = {
JobStatus.done,
[API_EVENT_KINDS.project__protectedAreas__submitted__v1__alpha]:
JobStatus.running,
[API_EVENT_KINDS.project__planningUnits__submitted__v1__alpha]:
JobStatus.running,
[API_EVENT_KINDS.project__planningUnits__finished__v1__alpha]: JobStatus.done,
[API_EVENT_KINDS.project__planningUnits__failed__v1__alpha]:
JobStatus.failure,
};
14 changes: 12 additions & 2 deletions api/apps/api/src/modules/projects/projects-crud.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,12 @@ export class ProjectsCrudService extends AppBaseService<
'creating planning unit job and assigning project to area',
);
await Promise.all([
this.planningUnitsService.create(createModel),
this.planningUnitsService.create({
...createModel,
planningUnitAreakm2: createModel.planningUnitAreakm2,
planningUnitGridShape: createModel.planningUnitGridShape,
projectId: model.id,
}),
this.planningAreasService.assignProject({
projectId: model.id,
planningAreaGeometryId: createModel.planningAreaId,
Expand All @@ -221,7 +226,12 @@ export class ProjectsCrudService extends AppBaseService<
createModel.planningAreaId)
) {
await Promise.all([
this.planningUnitsService.create(createModel),
this.planningUnitsService.create({
...createModel,
planningUnitAreakm2: createModel.planningUnitAreakm2,
planningUnitGridShape: createModel.planningUnitGridShape,
projectId: model.id,
}),
this.planningAreasService.assignProject({
projectId: model.id,
planningAreaGeometryId: createModel.planningAreaId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { ExpectBadRequest } from './assertions/expect-bad-request';
import { HasRelevantJobName } from './assertions/has-relevant-job-name';
import { HasExpectedJobDetails } from './assertions/has-expected-job-details';
import { tearDown } from '../utils/tear-down';
import { queueName } from '@marxan-jobs/planning-unit-geometry';
import { updateQueueName } from '@marxan-jobs/planning-unit-geometry';

let app: INestApplication;
let jwtToken: string;
Expand All @@ -23,7 +23,7 @@ beforeAll(async () => {
jwtToken = await GivenUserIsLoggedIn(app);
world = await createWorld(app, jwtToken);
await world.GivenScenarioPuDataExists();
queue = FakeQueue.getByName(queueName);
queue = FakeQueue.getByName(updateQueueName);
});

afterAll(async () => {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,29 @@
import { Logger } from '@nestjs/common';
import { Injectable, Logger } from '@nestjs/common';
import { Job } from 'bullmq';
import { geoprocessingConnections } from '@marxan-geoprocessing/ormconfig';
import { createConnection } from 'typeorm';
import { Connection } from 'typeorm';
import { validate } from 'class-validator';
import { InjectConnection } from '@nestjs/typeorm';
import { plainToClass } from 'class-transformer';

import {
PlanningUnitsJob,
PlanningUnitGridShape,
} from './dto/create.regular.planning-units.dto';
import { plainToClass } from 'class-transformer';
} from '@marxan-jobs/planning-unit-geometry';

const logger = new Logger('planning-units-job-processor');

@Injectable()
export class PlanningUnitsJobProcessor {
constructor(
@InjectConnection()
private readonly geoConnection: Connection,
) {}

async process(job: Pick<Job<PlanningUnitsJob>, 'data' | 'id' | 'name'>) {
await createPlanningUnitGridFromJobSpec(job, this.geoConnection);
}
}

/**
* @deprecated Workers and jobs should be move to the new functionality
* @description This function will take care of generating the regular-pu-grids in the area
Expand All @@ -27,6 +39,7 @@ const logger = new Logger('planning-units-job-processor');
*/
const createPlanningUnitGridFromJobSpec = async (
job: Pick<Job<PlanningUnitsJob>, 'data' | 'id' | 'name'>,
connection: Connection,
) => {
logger.debug(`Start planning-units processing for ${job.id}...`);
/**
Expand All @@ -41,7 +54,6 @@ const createPlanningUnitGridFromJobSpec = async (
}

if (job.name === 'create-regular-pu') {
const connection = await createConnection(geoprocessingConnections.default);
try {
let subquery: string;
const gridShape: { [value in PlanningUnitGridShape]?: string } = {
Expand Down Expand Up @@ -90,8 +102,6 @@ const createPlanningUnitGridFromJobSpec = async (
} catch (err) {
logger.error(err);
throw err;
} finally {
await connection.close();
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { PlanningUnitsController } from './planning-units.controller';
import { ShapefileService, FileService } from '@marxan/shapefile-converter';
import { PlanningUnitsService } from './planning-units.service';
import { WorkerModule } from '../worker';
import { PlanningUnitsJobProcessor } from './planning-units.job';

@Module({
imports: [TileModule, WorkerModule],
Expand All @@ -13,6 +14,7 @@ import { WorkerModule } from '../worker';
ShapefileService,
FileService,
PlanningUnitsService,
PlanningUnitsJobProcessor,
Logger,
],
controllers: [PlanningUnitsController],
Expand Down
Original file line number Diff line number Diff line change
@@ -1,42 +1,38 @@
import { Logger, Injectable } from '@nestjs/common';
import { Worker, QueueScheduler } from 'bullmq';
import { join } from 'path';
import * as config from 'config';
import { Injectable, Logger } from '@nestjs/common';
import { Worker } from 'bullmq';
import { WorkerBuilder } from '@marxan-geoprocessing/modules/worker';
import { PlanningUnitsJobProcessor } from './planning-units.job';
import {
createQueueName,
PlanningUnitsJob,
} from '@marxan-jobs/planning-unit-geometry';

/**
* @see https://docs.bullmq.io/guide/workers
*
* @deprecated Workers and jobs should be move to the new functionality
* @debt Bullmq is expected to be supported soon in the
* nest.js bull wrapper. In the meanwhile we are using Bullmq
* in the worker
*
**/
@Injectable()
export class PlanningUnitsProcessor {
private readonly queueName: string = 'planning-units';
private readonly logger: Logger = new Logger(PlanningUnitsProcessor.name);
public readonly worker: Worker = new Worker(
this.queueName,
join(__dirname, '/planning-units.job.ts'),
config.get('redisApi'),
);
private scheduler: QueueScheduler = new QueueScheduler(
this.queueName,
config.get('redisApi'),
);
private readonly worker: Worker<PlanningUnitsJob, void>;

constructor() {
this.logger.debug('worker');
this.worker.on('completed', async (job) => {
this.logger.debug(`Job finished ${JSON.stringify(job)}`);
constructor(
private readonly workerBuilder: WorkerBuilder,
processor: PlanningUnitsJobProcessor,
) {
this.worker = workerBuilder.build<PlanningUnitsJob, void>(
createQueueName,
processor,
);
this.worker.on(`completed`, (job) => {
this.logger.log(`Planning units job #${job.id} completed`);
});
this.worker.on(`failed`, (job) => {
this.logger.log(`Planning units job #${job.id} failed`);
});
}

public async onModuleDestroy(): Promise<void> {
await this.scheduler.close();
await this.scheduler.disconnect();
await this.worker.close();
await this.worker.disconnect();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { EventBus } from '@nestjs/cqrs';

import { WorkerBuilder } from '@marxan-geoprocessing/modules/worker';
import { API_EVENT_KINDS } from '@marxan/api-events';
import { queueName, JobInput } from '@marxan-jobs/planning-unit-geometry';
import { updateQueueName, JobInput } from '@marxan-jobs/planning-unit-geometry';

import { ScenarioPlanningUnitsInclusionProcessor } from './scenario-planning-units-inclusion-processor';
import { ApiEvent } from '../api-events';
Expand All @@ -18,7 +18,7 @@ export class ScenarioPlanningUnitsInclusionWorker {
private readonly eventBus: EventBus,
private readonly processor: ScenarioPlanningUnitsInclusionProcessor,
) {
this.#worker = wrapper.build(queueName, processor);
this.#worker = wrapper.build(updateQueueName, processor);
this.#worker.on('completed', ({ data }: Job<JobInput>) => {
this.eventBus.publish(
new ApiEvent(
Expand Down
Loading

0 comments on commit 8e9f178

Please sign in to comment.