
MARXAN-1616-scheduled-geodata-cleanup #1164

Merged: 7 commits, Jul 12, 2022
4 changes: 4 additions & 0 deletions ENV_VARS.md
@@ -92,6 +92,10 @@ applications.
on which the Express daemon of the Geoprocessing service will listen.
If running the API on the same host as the Geoprocessing application, you
need to modify at least one.
* `CLEANUP_CRON_INTERVAL` (string, optional, default is `0 0-23/6 * * *`):
String in cron notation for the interval at which the cleanup will be
triggered. The default value runs the cleanup every 6 hours.


### PostgreSQL service - Geoprocessing database

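For reference, a minimal sketch of how a deployment could override this interval. It is illustrative only: in the apps the value flows through the config files and AppConfig shown further down, and the fallback constant simply mirrors the documented default.

// Hedged sketch: reading CLEANUP_CRON_INTERVAL with the documented default.
// The direct process.env read is for illustration; the services resolve the value via AppConfig.
const DEFAULT_CLEANUP_CRON = '0 0-23/6 * * *'; // minute 0 of hours 0, 6, 12 and 18

const cleanupCronInterval: string =
  process.env.CLEANUP_CRON_INTERVAL ?? DEFAULT_CLEANUP_CRON;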
7 changes: 3 additions & 4 deletions api/apps/api/test/utils/event-bus.test.utils.spec.ts
@@ -1,7 +1,6 @@
import { FixtureType } from '@marxan/utils/tests/fixture-type';
import { CqrsModule, EventBus, IEvent } from '@nestjs/cqrs';
import { Test } from '@nestjs/testing';
import { delay } from '../../../geoprocessing/test/utils';
import { EventBusTestUtils } from './event-bus.test.utils';

class FirstEventType implements IEvent {
@@ -78,7 +77,7 @@ const getFixtures = async () => {
new Promise<void>(async (resolve) => {
eventBus.publish(new SecondEventType('Wrong event class'));

await delay(500);
await new Promise((resolve) => setTimeout(resolve, 500));

eventBus.publish(new FirstEventType('Expected event class'));

@@ -97,13 +96,13 @@ const getFixtures = async () => {
new Promise<void>(async (resolve) => {
eventBus.publish(new SecondEventType('Wrong event class'));

await delay(500);
await new Promise((resolve) => setTimeout(resolve, 500));

eventBus.publish(
new FirstEventType('Expected event class but wrong id'),
);

await delay(500);
await new Promise((resolve) => setTimeout(resolve, 500));

eventBus.publish(new FirstEventType('Foo'));

@@ -56,5 +56,8 @@
"maxAgeForOrphanUploads": "UPLOADS_PLANNING_AREAS_MAX_AGE"
}
}
},
"cleanupCronJobSettings": {
"interval": "CLEANUP_CRON_INTERVAL"
}
}
3 changes: 3 additions & 0 deletions api/apps/geoprocessing/config/default.json
@@ -52,5 +52,8 @@
},
"marxan": {
"bin": "/opt/marxan-geoprocessing/apps/geoprocessing/src/marxan/marxan-linux-4-0-6"
},
"cleanupCronJobSettings": {
"interval": "0 0-23/6 * * *"
}
}
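Taken together, the two config additions follow the usual node-config layering: default.json supplies the fallback expression, and custom-environment-variables.json lets CLEANUP_CRON_INTERVAL override it at runtime. A minimal sketch of how the key resolves, assuming the standard config package semantics that AppConfig wraps:

import * as config from 'config';

// Resolves to the value of CLEANUP_CRON_INTERVAL when that env var is set,
// otherwise to the "0 0-23/6 * * *" fallback from default.json.
const interval: string = config.get('cleanupCronJobSettings.interval');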
4 changes: 4 additions & 0 deletions api/apps/geoprocessing/src/app.module.ts
@@ -1,5 +1,6 @@
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { ScheduleModule } from '@nestjs/schedule';
import { geoprocessingConnections } from './ormconfig';
import { AppController } from './app.controller';
import { AppService } from './app.service';
@@ -21,6 +22,7 @@ import { ImportModule } from '@marxan-geoprocessing/import/import.module';
import { PingController } from '@marxan-geoprocessing/modules/ping/ping.controller';
import { LegacyProjectImportModule } from './legacy-project-import/legacy-project-import.module';
import { UnusedResourcesCleanUpModule } from './modules/unused-resources-cleanup/unused-resources-cleanup.module';
import { CleanupTasksModule } from './modules/cleanup-tasks/cleanup-tasks.module';

@Module({
imports: [
@@ -50,6 +52,8 @@ import { UnusedResourcesCleanUpModule } from './modules/unused-resources-cleanup/unused-resources-cleanup.module';
ImportModule,
LegacyProjectImportModule,
UnusedResourcesCleanUpModule,
ScheduleModule.forRoot(),
CleanupTasksModule,
],
controllers: [AppController, PingController],
providers: [AppService],
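ScheduleModule.forRoot() registers the NestJS scheduler once for the whole application, which is what lets @Cron-decorated methods in imported modules such as CleanupTasksModule be discovered and run. A minimal, self-contained sketch of the pattern it enables; the class name and logging here are illustrative only:

import { Injectable, Logger } from '@nestjs/common';
import { Cron } from '@nestjs/schedule';

@Injectable()
export class ExampleScheduledTask {
  private readonly logger = new Logger(ExampleScheduledTask.name);

  // Same shape as the cleanup job below: fires at minute 0 of every 6th hour.
  @Cron('0 0-23/6 * * *')
  handleInterval(): void {
    this.logger.debug('Scheduled handler fired');
  }
}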
@@ -0,0 +1,17 @@
import { MigrationInterface, QueryRunner } from 'typeorm';

export class CreateProjectGeodataCleanupPreparation1657182655233
implements MigrationInterface {
public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`
CREATE TABLE project_geodata_cleanup_preparation (
project_id uuid PRIMARY KEY
);`);
}

public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`
DROP TABLE project_geodata_cleanup_preparation;
`);
}
}
@@ -0,0 +1,17 @@
import { MigrationInterface, QueryRunner } from 'typeorm';

export class CreateScenarioGeodataCleanupPreparation1657182662302
implements MigrationInterface {
public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`
CREATE TABLE scenario_geodata_cleanup_preparation (
scenario_id uuid PRIMARY KEY
);`);
}

public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`
DROP TABLE scenario_geodata_cleanup_preparation;
`);
}
}
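These two single-column tables act as staging areas: the cron job copies the authoritative project and scenario id sets from the API database into them, so the geoprocessing database can remove dangling rows with a plain anti-join instead of a cross-database query. As a read-only illustration of that anti-join (a hypothetical helper, not part of this PR):

import { EntityManager } from 'typeorm';

// Counts scenarios_pu_data rows that the cleanup job would treat as dangling,
// i.e. rows whose scenario_id is missing from the staging table.
export async function countDanglingScenarioPuRows(
  geoEntityManager: EntityManager,
): Promise<{ scenario_id: string; dangling_rows: string }[]> {
  return geoEntityManager.query(
    `SELECT spd.scenario_id, count(*) AS dangling_rows
       FROM scenarios_pu_data spd
      WHERE spd.scenario_id NOT IN (
        SELECT sgcp.scenario_id FROM scenario_geodata_cleanup_preparation sgcp
      )
      GROUP BY spd.scenario_id;`,
  );
}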
@@ -0,0 +1,46 @@
import { BlmPartialResultEntity } from '@marxan-geoprocessing/marxan-sandboxed-runner/adapters-blm/blm-partial-results.geo.entity';
import { geoprocessingConnections } from '@marxan-geoprocessing/ormconfig';
import { ProjectsPuEntity } from '@marxan-jobs/planning-unit-geometry';
import { BlmFinalResultEntity } from '@marxan/blm-calibration';
import { ScenarioFeaturesData } from '@marxan/features';
import { GeoFeatureGeometry } from '@marxan/geofeatures';
import {
MarxanExecutionMetadataGeoEntity,
OutputScenariosPuDataGeoEntity,
} from '@marxan/marxan-output';
import { PlanningArea } from '@marxan/planning-area-repository/planning-area.geo.entity';
import { ProtectedArea } from '@marxan/protected-areas';
import {
ScenariosPlanningUnitGeoEntity,
ScenariosPuPaDataGeo,
} from '@marxan/scenarios-planning-unit';
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { UnusedResourcesCleanUpModule } from '../unused-resources-cleanup/unused-resources-cleanup.module';
import { CleanupTasksService } from './cleanup-tasks.service';

@Module({
imports: [
TypeOrmModule.forFeature([], geoprocessingConnections.apiDB),
TypeOrmModule.forFeature(
[
BlmFinalResultEntity,
BlmPartialResultEntity,
ScenariosPuPaDataGeo,
ScenariosPlanningUnitGeoEntity,
OutputScenariosPuDataGeoEntity,
PlanningArea,
ProtectedArea,
GeoFeatureGeometry,
ProjectsPuEntity,
ScenarioFeaturesData,
MarxanExecutionMetadataGeoEntity,
],
geoprocessingConnections.default,
),
UnusedResourcesCleanUpModule,
],
providers: [CleanupTasksService],
exports: [CleanupTasksService],
})
export class CleanupTasksModule {}
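Registering the geo entities with TypeOrmModule.forFeature on the default (geoprocessing) connection also makes their repositories injectable inside this module. A hedged sketch, assuming the default connection; the reader class is hypothetical:

import { Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { PlanningArea } from '@marxan/planning-area-repository/planning-area.geo.entity';

@Injectable()
export class PlanningAreaReader {
  constructor(
    @InjectRepository(PlanningArea)
    private readonly planningAreas: Repository<PlanningArea>,
  ) {}

  countAll(): Promise<number> {
    return this.planningAreas.count();
  }
}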
@@ -0,0 +1,172 @@
import { geoprocessingConnections } from '@marxan-geoprocessing/ormconfig';
import { Injectable, Logger } from '@nestjs/common';
import { Cron } from '@nestjs/schedule';
import { InjectEntityManager } from '@nestjs/typeorm';
import { EntityManager } from 'typeorm';
import { CleanupTasks } from './cleanup-tasks';
import { chunk } from 'lodash';
import { AppConfig } from '@marxan-geoprocessing/utils/config.utils';

const CHUNK_SIZE_FOR_BATCH_DB_OPERATIONS = 1000;
const cronJobInterval: string = AppConfig.get(
'cleanupCronJobSettings.interval',
);
@Injectable()
export class CleanupTasksService implements CleanupTasks {
private readonly logger = new Logger(CleanupTasksService.name);
constructor(
@InjectEntityManager(geoprocessingConnections.apiDB)
private readonly apiEntityManager: EntityManager,
@InjectEntityManager(geoprocessingConnections.default)
private readonly geoEntityManager: EntityManager,
) {}

async cleanupProjectsDanglingData() {
// Find all existing projects in API DB and return an array of their IDs
const projectIds = await this.apiEntityManager
.createQueryBuilder()
.from('projects', 'p')
.select(['id'])
.getRawMany()
.then((result) => result.map((i) => i.id))
.catch((error) => {
throw new Error(error);
});

// Start cleaning up process inside transaction
await this.geoEntityManager.transaction(async (entityManager) => {
// Truncate the intermediate table so that no stale projectId is left from a previous run
await entityManager.query(
`TRUNCATE TABLE project_geodata_cleanup_preparation;`,
);

// Insert ids in batches into the intermediate table for further processing
for (const [, projectIdsChunk] of chunk(
projectIds,
CHUNK_SIZE_FOR_BATCH_DB_OPERATIONS,
).entries()) {
await entityManager.insert(
'project_geodata_cleanup_preparation',
projectIdsChunk.map((projectId: string) => ({
project_id: projectId,
})),
);
}

// For every related entity, delete rows whose projectId is not present in the
// intermediate table of known project ids
await entityManager.query(
`DELETE FROM planning_areas pa
WHERE pa.project_id IS NOT NULL
AND pa.project_id NOT IN (
SELECT pgcp.project_id FROM project_geodata_cleanup_preparation pgcp
);`,
);
await entityManager.query(
`DELETE FROM wdpa
WHERE wdpa.project_id IS NOT NULL
AND wdpa.project_id NOT IN (
SELECT pgcp.project_id FROM project_geodata_cleanup_preparation pgcp
);`,
);
await entityManager.query(
`DELETE FROM projects_pu ppu
WHERE ppu.project_id IS NOT NULL
AND ppu.project_id NOT IN (
SELECT pgcp.project_id FROM project_geodata_cleanup_preparation pgcp
);`,
);
await this.apiEntityManager.query(
`DELETE FROM features_data fd
WHERE fd.project_id IS NOT NULL
AND fd.project_id NOT IN (
SELECT p.id FROM projects p
);`,
);

await entityManager.query(
`TRUNCATE TABLE project_geodata_cleanup_preparation;`,
);
});
}

async cleanupScenariosDanglingData() {
const scenarioIds = await this.apiEntityManager
.createQueryBuilder()
.from('scenarios', 's')
.select(['id'])
.getRawMany()
.then((result) => result.map((i) => i.id))
.catch((error) => {
throw new Error(error);
});

await this.geoEntityManager.transaction(async (entityManager) => {
await entityManager.query(
`TRUNCATE TABLE scenario_geodata_cleanup_preparation;`,
);

for (const [, scenarioIdsChunk] of chunk(
scenarioIds,
CHUNK_SIZE_FOR_BATCH_DB_OPERATIONS,
).entries()) {
await entityManager.insert(
'scenario_geodata_cleanup_preparation',
scenarioIdsChunk.map((scenarioId: string) => ({
scenario_id: scenarioId,
})),
);
}

await entityManager.query(
`DELETE FROM scenario_features_data sfd
WHERE sfd.scenario_id IS NOT NULL
AND sfd.scenario_id NOT IN (
SELECT sgcp.scenario_id FROM scenario_geodata_cleanup_preparation sgcp
);`,
);
await entityManager.query(
`DELETE FROM blm_final_results bfr
WHERE bfr.scenario_id IS NOT NULL
AND bfr.scenario_id NOT IN (
SELECT sgcp.scenario_id FROM scenario_geodata_cleanup_preparation sgcp
);`,
);
await entityManager.query(
`DELETE FROM blm_partial_results bpr
WHERE bpr.scenario_id IS NOT NULL
AND bpr.scenario_id NOT IN (
SELECT sgcp.scenario_id FROM scenario_geodata_cleanup_preparation sgcp
);`,
);
await entityManager.query(
`DELETE FROM marxan_execution_metadata mem
WHERE mem.scenarioId IS NOT NULL
AND mem.scenarioId NOT IN (
SELECT sgcp.scenario_id FROM scenario_geodata_cleanup_preparation sgcp
);`,
);
await entityManager.query(
`DELETE FROM scenarios_pu_data spd
WHERE spd.scenario_id IS NOT NULL
AND spd.scenario_id NOT IN (
SELECT sgcp.scenario_id FROM scenario_geodata_cleanup_preparation sgcp
);`,
);

await entityManager.query(
`TRUNCATE TABLE scenario_geodata_cleanup_preparation;`,
);
});
}

@Cron(cronJobInterval)
async handleCron() {
this.logger.debug(
'Preparing to clean dangling geo data for projects/scenarios',
);

await this.cleanupProjectsDanglingData();
await this.cleanupScenariosDanglingData();
}
}
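Outside the schedule, the job can also be exercised directly, for example in a smoke test or a one-off maintenance script, by resolving the service from a Nest application context and calling its handler. A hedged sketch; the import paths are assumed relative to the geoprocessing app's src root:

import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { CleanupTasksService } from './modules/cleanup-tasks/cleanup-tasks.service';

// Runs one cleanup pass immediately instead of waiting for the cron trigger.
async function runGeodataCleanupOnce(): Promise<void> {
  const app = await NestFactory.createApplicationContext(AppModule);
  try {
    await app.get(CleanupTasksService).handleCron();
  } finally {
    await app.close();
  }
}

runGeodataCleanupOnce();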
@@ -0,0 +1,3 @@
export abstract class CleanupTasks {
abstract handleCron(): Promise<void>;
}
@@ -9,7 +9,7 @@ import { Injectable } from '@nestjs/common';
import { InjectEntityManager, InjectRepository } from '@nestjs/typeorm';
import { EntityManager, In, Repository } from 'typeorm';

type ProjectUnusedResourcesData = { projectCustomFeaturesIds: string[] };
export type ProjectUnusedResourcesData = { projectCustomFeaturesIds: string[] };

@Injectable()
export class ProjectUnusedResources
@@ -7,5 +7,6 @@ import { UnusedResourcesCleanupWorker } from './unused-resources-cleanup.worker'
@Module({
imports: [WorkerModule, UnusedResourcesModule],
providers: [UnusedResourcesCleanupWorker, UnusedResourcesCleanupProcessor],
exports: [UnusedResourcesModule],
})
export class UnusedResourcesCleanUpModule {}
2 changes: 2 additions & 0 deletions api/package.json
@@ -51,6 +51,7 @@
"@nestjs/jwt": "^7.2.0",
"@nestjs/passport": "^7.1.5",
"@nestjs/platform-express": "7.6.18",
"@nestjs/schedule": "^2.0.1",
"@nestjs/swagger": "^4.8.0",
"@nestjs/typeorm": "^7.1.5",
"@pgtyped/cli": "0.11.0",
@@ -114,6 +115,7 @@
"@types/config": "^0.0.38",
"@types/cookie-parser": "^1.4.2",
"@types/cors": "^2.8.9",
"@types/cron": "^2.0.0",
"@types/express": "^4.17.8",
"@types/faker": "^5.1.5",
"@types/fs-extra": "9.0.11",