Feat/planning area grid custom piece importer #887

Merged — merged 2 commits on Mar 3, 2022
@@ -6,6 +6,7 @@ import { PlanningAreaGadmPieceImporter } from './planning-area-gadm.piece-import
import { PlanningAreaCustomPieceImporter } from './planning-area-custom.piece-importer';
import { ProjectMetadataPieceImporter } from './project-metadata.piece-importer';
import { ScenarioMetadataPieceImporter } from './scenario-metadata.piece-importer';
import { PlanningAreaCustomGridPieceImporter } from './planning-area-custom-grid.piece-importer';

@Module({
imports: [
@@ -17,6 +18,7 @@ import { ScenarioMetadataPieceImporter } from './scenario-metadata.piece-importe
ScenarioMetadataPieceImporter,
PlanningAreaGadmPieceImporter,
PlanningAreaCustomPieceImporter,
PlanningAreaCustomGridPieceImporter,
{ provide: Logger, useClass: Logger, scope: Scope.TRANSIENT },
],
})
@@ -0,0 +1,142 @@
import { geoprocessingConnections } from '@marxan-geoprocessing/ormconfig';
import { ClonePiece, ImportJobInput, ImportJobOutput } from '@marxan/cloning';
import {
ComponentLocationSnapshot,
ResourceKind,
} from '@marxan/cloning/domain';
import { FileRepository } from '@marxan/files-repository';
import { PlanningUnitGridShape } from '@marxan/scenarios-planning-unit';
import { Injectable, Logger } from '@nestjs/common';
import { InjectEntityManager } from '@nestjs/typeorm';
import { isLeft } from 'fp-ts/lib/Either';
import { Readable, Transform } from 'stream';
import { EntityManager } from 'typeorm';
import { ParseOne } from 'unzipper';
import {
ImportPieceProcessor,
PieceImportProvider,
} from '../pieces/import-piece-processor';

@Injectable()
@PieceImportProvider()
export class PlanningAreaCustomGridPieceImporter
implements ImportPieceProcessor {
constructor(
private readonly fileRepository: FileRepository,
@InjectEntityManager(geoprocessingConnections.default)
private readonly geoprocessingEntityManager: EntityManager,
private readonly logger: Logger,
) {
this.logger.setContext(PlanningAreaCustomGridPieceImporter.name);
}

isSupported(piece: ClonePiece, kind: ResourceKind): boolean {
return (
piece === ClonePiece.PlanningAreaGridCustom &&
kind === ResourceKind.Project
);
}

async run(input: ImportJobInput): Promise<ImportJobOutput> {
const { uris, importResourceId, piece } = input;
if (uris.length !== 1) {
const errorMessage = `uris array has an unexpected number of elements: ${uris.length}`;
this.logger.error(errorMessage);
throw new Error(errorMessage);
}
const [planningAreaCustomGridLocation] = uris;

const readableOrError = await this.fileRepository.get(
planningAreaCustomGridLocation.uri,
);
if (isLeft(readableOrError)) {
const errorMessage = `File with piece data for ${piece}/${importResourceId} is not available at ${planningAreaCustomGridLocation.uri}`;
this.logger.error(errorMessage);
throw new Error(errorMessage);
}

await this.processGridFile(
readableOrError.right,
planningAreaCustomGridLocation,
importResourceId,
);

return {
importId: input.importId,
componentId: input.componentId,
importResourceId,
componentResourceId: input.componentResourceId,
piece: input.piece,
};
}

private parseGridFileLine(pu: string) {
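// Each line is expected to look like "<puId>,[<comma-separated EWKB byte values>]",
// e.g. "1,[1,3,0,0,32,...]": the bracketed part is parsed as a JSON number array and
// later passed to ST_GeomFromEWKB as a Buffer.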
const regex = /^(\d+),(\[(\d+,)*\d+\])$/gi;
const result = regex.exec(pu);

if (result) {
const [_, puId, geom] = result;
return { puId: parseInt(puId), geom: JSON.parse(geom) as number[] };
}
throw new Error('unknown line format');
}

private async processGridFile(
zipStream: Readable,
planningAreaCustomGridLocation: ComponentLocationSnapshot,
projectId: string,
) {
return new Promise<void>((resolve, reject) => {
let lastChunkIncompletedData = '';

zipStream
.pipe(ParseOne(new RegExp(planningAreaCustomGridLocation.relativePath)))
.pipe(
new Transform({
writableObjectMode: true,
transform: async (chunk, encoding, callback) => {
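// Lines may be split across chunk boundaries: prepend the partial line carried
// over from the previous chunk, and keep any trailing partial line for the next one.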
const data = lastChunkIncompletedData + chunk.toString();
const lastNewLineIndex = data.lastIndexOf('\n');
if (lastNewLineIndex === -1) {
lastChunkIncompletedData = data;
callback();
return;
}
const processableData = data.substring(0, lastNewLineIndex);
lastChunkIncompletedData = data.substring(lastNewLineIndex + 1);
try {
const geomPUs = processableData
.split('\n')
.map(this.parseGridFileLine);

const values = geomPUs
.map((pu, index) => `($1,$2,ST_GeomFromEWKB($${index + 3}))`)
.join(',');

const buffers = geomPUs.map((pu) => Buffer.from(pu.geom));
await this.geoprocessingEntityManager.query(
`
INSERT INTO planning_units_geom (type,project_id,the_geom)
VALUES ${values}
`,
[PlanningUnitGridShape.FromShapefile, projectId, ...buffers],
);

Member commented:

Quick note on this and, in general, as you progress through imports of geo data, which in some real cases may span tens of thousands of records times a few columns: let's consider both PostgreSQL's limit on query parameters (IIRC it depends on the version, but it is easy to hit at the 10k scale, I think) and the efficiency of TypeORM at composing queries with "high" (for some value of "high") numbers of params.

My general suggestion would be to open a transaction (if not in one already) and split the inserts into batches, setting the batch size according to the number of parameters needed per row so that we stay safely within PostgreSQL's limits.

I also suggest always adding a short note about this in a comment next to the batching logic, so that whoever updates the query next and, for example, adds more params can take that into account and see whether the batch size needs to be reduced accordingly (e.g. if doubling the number of params needed for each row, halve the batch size).

Contributor replied:

Wrapping the insert operations inside a transaction makes total sense 👍. However, as we are already working with streams, the inserts are being made in batches (chunks), right?

callback();
} catch (err) {
callback(err as Error);
}
},
}),
)
.on('finish', () => {
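// Any data still buffered here means the last line never ended with a newline,
// so the grid file is treated as malformed.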
!lastChunkIncompletedData
? resolve()
: reject(new Error('could not parse grid file'));
})
.on('error', (err) => {
reject(err);
});
});
}
}
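For reference, here is a minimal sketch of the batching-inside-a-transaction approach suggested in the review thread above. It is illustrative only and not part of this PR: `insertGridInBatches`, `MAX_QUERY_PARAMS`, and the 30,000-parameter margin below PostgreSQL's 65535 bind-parameter protocol limit are assumptions, and the per-row parameter count mirrors the query used by this importer.

```ts
import { EntityManager } from 'typeorm';
import { PlanningUnitGridShape } from '@marxan/scenarios-planning-unit';

// Stay well below PostgreSQL's wire-protocol limit of 65535 bind parameters per
// statement. Batches are sized conservatively as if every row carried all three
// parameters (type, project_id, the_geom), even though type and project_id are
// shared by the whole statement here; adding params per row shrinks the batch.
const MAX_QUERY_PARAMS = 30_000;
const PARAMS_PER_ROW = 3;
const BATCH_SIZE = Math.floor(MAX_QUERY_PARAMS / PARAMS_PER_ROW);

async function insertGridInBatches(
  entityManager: EntityManager,
  projectId: string,
  geomPUs: { puId: number; geom: number[] }[],
): Promise<void> {
  await entityManager.transaction(async (manager) => {
    for (let i = 0; i < geomPUs.length; i += BATCH_SIZE) {
      const batch = geomPUs.slice(i, i + BATCH_SIZE);
      // Reuse $1 (shape type) and $2 (project id); each row gets its own geometry param.
      const values = batch
        .map((_, index) => `($1,$2,ST_GeomFromEWKB($${index + 3}))`)
        .join(',');
      await manager.query(
        `INSERT INTO planning_units_geom (type,project_id,the_geom) VALUES ${values}`,
        [
          PlanningUnitGridShape.FromShapefile,
          projectId,
          ...batch.map((pu) => Buffer.from(pu.geom)),
        ],
      );
    }
  });
}
```

One point relevant to the reply above: stream chunks are sized in bytes, not rows, so processing chunk by chunk does not by itself bound the number of bind parameters a single INSERT may need.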
@@ -0,0 +1,223 @@
import { geoprocessingConnections } from '@marxan-geoprocessing/ormconfig';
import { PlanningUnitsGeom } from '@marxan-jobs/planning-unit-geometry';
import { ImportJobInput, ImportJobOutput } from '@marxan/cloning';
import {
ArchiveLocation,
ClonePiece,
ResourceKind,
} from '@marxan/cloning/domain';
import { ClonePieceUrisResolver } from '@marxan/cloning/infrastructure/clone-piece-data';
import { FileRepository, FileRepositoryModule } from '@marxan/files-repository';
import { FixtureType } from '@marxan/utils/tests/fixture-type';
import { Logger } from '@nestjs/common';
import { Test } from '@nestjs/testing';
import { getRepositoryToken, TypeOrmModule } from '@nestjs/typeorm';
import * as archiver from 'archiver';
import { isLeft } from 'fp-ts/lib/Either';
import { Repository } from 'typeorm';
import { v4 } from 'uuid';
import { PlanningAreaCustomGridPieceImporter } from '../../../src/import/pieces-importers/planning-area-custom-grid.piece-importer';

let fixtures: FixtureType<typeof getFixtures>;

describe(PlanningAreaCustomGridPieceImporter, () => {
beforeEach(async () => {
fixtures = await getFixtures();
}, 10_000);

afterEach(async () => {
fixtures?.cleanUp();
});

it('should fail when planning area grid file is missing in uris array ', async () => {
const input = fixtures.GivenJobInputWithoutUris();
await fixtures
.WhenPieceImporterIsInvoked(input)
.ThenAnUrisArrayErrorShouldBeThrown();
});

it('should fail when the file cannot be retrieved from file repo', async () => {
const archiveLocation = fixtures.GivenNoGridFileIsAvailable();
const input = fixtures.GivenJobInput(archiveLocation);
await fixtures
.WhenPieceImporterIsInvoked(input)
.ThenADataNotAvailableErrorShouldBeThrown();
});

it('should fail when file content is invalid', async () => {
const archiveLocation = await fixtures.GivenInvalidGridFileFormat();
const input = fixtures.GivenJobInput(archiveLocation);
await fixtures
.WhenPieceImporterIsInvoked(input)
.ThenGridFormatErrorShouldBeThrown();
});

it('should throw if insert operation fails', async () => {
const archiveLocation = await fixtures.GivenInvalidGridFileContent();
const input = fixtures.GivenJobInput(archiveLocation);
await fixtures
.WhenPieceImporterIsInvoked(input)
.ThenInsertErrorShouldBeThrown();
});

it('should insert geometries successfully when file is valid', async () => {
const archiveLocation = await fixtures.GivenValidGridFile();
const input = fixtures.GivenJobInput(archiveLocation);
await fixtures
.WhenPieceImporterIsInvoked(input)
.ThenPlanningUnitsGeometriesShouldBeInserted();
});
});

const getFixtures = async () => {
const sandbox = await Test.createTestingModule({
imports: [
TypeOrmModule.forRoot({
...geoprocessingConnections.default,
keepConnectionAlive: true,
logging: false,
}),
TypeOrmModule.forFeature([PlanningUnitsGeom]),
FileRepositoryModule,
],
providers: [
PlanningAreaCustomGridPieceImporter,
{ provide: Logger, useValue: { error: () => {}, setContext: () => {} } },
],
}).compile();

await sandbox.init();
const projectId = v4();
const sut = sandbox.get(PlanningAreaCustomGridPieceImporter);
const fileRepository = sandbox.get(FileRepository);
const puGeomRepo = sandbox.get<Repository<PlanningUnitsGeom>>(
getRepositoryToken(PlanningUnitsGeom),
);
return {
cleanUp: async () => {
await puGeomRepo.delete({ projectId });
},
GivenNoGridFileIsAvailable: () => {
return new ArchiveLocation('not found');
},
GivenInvalidGridFileFormat: async (): Promise<ArchiveLocation> => {
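// Missing closing bracket, so parseGridFileLine should throw 'unknown line format'.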
const archive = archiver(`zip`, {
zlib: { level: 9 },
});

const [{ relativePath }] = ClonePieceUrisResolver.resolveFor(
ClonePiece.PlanningAreaGridCustom,
'planning area custom grid relative path',
);

const invalidGridFile = '1,[1,2,4\n';
archive.append(invalidGridFile, {
name: relativePath,
});

const saveFile = fileRepository.save(archive);
archive.finalize();
const uriOrError = await saveFile;

if (isLeft(uriOrError)) throw new Error('could not save file');
return new ArchiveLocation(uriOrError.right);
},
GivenValidGridFile: async () => {
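// Four lines, each with a planning unit id and the byte values of an EWKB polygon;
// the importer is expected to insert four geometries.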
const validGridFile =
'1,[1,3,0,0,32,230,16,0,0,1,0,0,0,6,0,0,0,210,1,41,148,71,222,49,64,65,105,159,209,218,16,43,192,56,82,9,145,4,221,49,64,26,22,15,148,128,17,43,192,204,204,204,204,204,220,49,64,51,51,51,51,51,19,43,192,58,152,177,244,227,220,49,64,30,31,62,250,112,20,43,192,47,57,246,128,0,221,49,64,247,216,244,65,170,20,43,192,210,1,41,148,71,222,49,64,65,105,159,209,218,16,43,192]\n2,[1,3,0,0,32,230,16,0,0,1,0,0,0,6,0,0,0,104,104,118,46,178,112,51,64,180,80,228,111,31,222,49,192,145,194,89,190,194,106,51,64,180,80,228,111,31,222,49,192,233,215,125,56,114,106,51,64,68,237,244,189,154,222,49,192,254,131,129,231,222,107,51,64,95,139,79,1,48,222,49,192,4,222,2,9,138,111,51,64,141,102,44,154,206,222,49,192,104,104,118,46,178,112,51,64,180,80,228,111,31,222,49,192]\n3,[1,3,0,0,32,230,16,0,0,1,0,0,0,4,0,0,0,26,94,3,137,61,189,51,64,89,116,60,154,153,39,45,192,217,233,181,223,174,189,51,64,155,15,104,202,190,40,45,192,85,199,245,173,87,189,51,64,170,187,53,170,74,39,45,192,26,94,3,137,61,189,51,64,89,116,60,154,153,39,45,192]\n4,[1,3,0,0,32,230,16,0,0,1,0,0,0,5,0,0,0,92,233,206,197,9,244,53,64,191,209,176,188,205,35,48,192,162,85,95,148,69,246,53,64,191,209,176,188,205,35,48,192,31,137,94,70,177,244,53,64,169,131,245,127,14,35,48,192,142,141,34,203,39,243,53,64,11,67,162,206,174,34,48,192,92,233,206,197,9,244,53,64,191,209,176,188,205,35,48,192]\n';

const archive = archiver(`zip`, {
zlib: { level: 9 },
});
const [{ relativePath }] = ClonePieceUrisResolver.resolveFor(
ClonePiece.PlanningAreaGridCustom,
'planning area custom grid relative path',
);
archive.append(validGridFile, {
name: relativePath,
});

const saveFile = fileRepository.save(archive);
archive.finalize();
const uriOrError = await saveFile;

if (isLeft(uriOrError)) throw new Error('could not save file');
return new ArchiveLocation(uriOrError.right);
},
GivenInvalidGridFileContent: async (): Promise<ArchiveLocation> => {
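// The line parses, but the byte array is not valid EWKB, so the INSERT itself
// should fail with a WKB error.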
const archive = archiver(`zip`, {
zlib: { level: 9 },
});

const [{ relativePath }] = ClonePieceUrisResolver.resolveFor(
ClonePiece.PlanningAreaGridCustom,
'planning area custom grid relative path',
);

const invalidGridFile = '1,[1,2,4]\n';
archive.append(invalidGridFile, {
name: relativePath,
});

const saveFile = fileRepository.save(archive);
archive.finalize();
const uriOrError = await saveFile;

if (isLeft(uriOrError)) throw new Error('could not save file');
return new ArchiveLocation(uriOrError.right);
},
GivenJobInput: (archiveLocation: ArchiveLocation): ImportJobInput => {
const [uri] = ClonePieceUrisResolver.resolveFor(
ClonePiece.PlanningAreaGridCustom,
archiveLocation.value,
);
return {
componentId: v4(),
componentResourceId: v4(),
importId: v4(),
importResourceId: projectId,
piece: ClonePiece.PlanningAreaGridCustom,
resourceKind: ResourceKind.Project,
uris: [uri.toSnapshot()],
};
},
GivenJobInputWithoutUris: (): ImportJobInput => {
return {
componentId: v4(),
componentResourceId: v4(),
importId: v4(),
importResourceId: projectId,
piece: ClonePiece.PlanningAreaGridCustom,
resourceKind: ResourceKind.Project,
uris: [],
};
},
WhenPieceImporterIsInvoked: (input: ImportJobInput) => {
return {
ThenInsertErrorShouldBeThrown: async () => {
await expect(sut.run(input)).rejects.toThrow(
/WKB structure does not match expected size/gi,
);
},
ThenAnUrisArrayErrorShouldBeThrown: async () => {
await expect(sut.run(input)).rejects.toThrow(/uris/gi);
},
ThenADataNotAvailableErrorShouldBeThrown: async () => {
await expect(sut.run(input)).rejects.toThrow(
/File with piece data for/gi,
);
},
ThenGridFormatErrorShouldBeThrown: async () => {
await expect(sut.run(input)).rejects.toThrow(/unknown line format/gi);
},
ThenPlanningUnitsGeometriesShouldBeInserted: async () => {
const result = await sut.run(input);
const geoms = await puGeomRepo.find({
projectId: result.importResourceId,
});
expect(geoms).toHaveLength(4);
},
};
},
};
};
4 changes: 4 additions & 0 deletions api/libs/cloning/src/domain/component-location.ts
@@ -15,4 +15,8 @@ export class ComponentLocation extends TinyType {
) {
super();
}

toSnapshot(): ComponentLocationSnapshot {
return { uri: this.uri, relativePath: this.relativePath };
}
}