Feat/planning area grid custom piece importer #887
Merged
aciddaute merged 2 commits into develop from feat/planning-area-grid-custom-piece-importer on Mar 3, 2022
142 changes: 142 additions & 0 deletions
...pps/geoprocessing/src/import/pieces-importers/planning-area-custom-grid.piece-importer.ts
import { geoprocessingConnections } from '@marxan-geoprocessing/ormconfig';
import { ClonePiece, ImportJobInput, ImportJobOutput } from '@marxan/cloning';
import {
  ComponentLocationSnapshot,
  ResourceKind,
} from '@marxan/cloning/domain';
import { FileRepository } from '@marxan/files-repository';
import { PlanningUnitGridShape } from '@marxan/scenarios-planning-unit';
import { Injectable, Logger } from '@nestjs/common';
import { InjectEntityManager } from '@nestjs/typeorm';
import { isLeft } from 'fp-ts/lib/Either';
import { Readable, Transform } from 'stream';
import { EntityManager } from 'typeorm';
import { ParseOne } from 'unzipper';
import {
  ImportPieceProcessor,
  PieceImportProvider,
} from '../pieces/import-piece-processor';

@Injectable()
@PieceImportProvider()
export class PlanningAreaCustomGridPieceImporter
  implements ImportPieceProcessor {
  constructor(
    private readonly fileRepository: FileRepository,
    @InjectEntityManager(geoprocessingConnections.default)
    private readonly geoprocessingEntityManager: EntityManager,
    private readonly logger: Logger,
  ) {
    this.logger.setContext(PlanningAreaCustomGridPieceImporter.name);
  }

  isSupported(piece: ClonePiece, kind: ResourceKind): boolean {
    return (
      piece === ClonePiece.PlanningAreaGridCustom &&
      kind === ResourceKind.Project
    );
  }

  async run(input: ImportJobInput): Promise<ImportJobOutput> {
    const { uris, importResourceId, piece } = input;
    if (uris.length !== 1) {
      const errorMessage = `uris array has an unexpected amount of elements: ${uris.length}`;
      this.logger.error(errorMessage);
      throw new Error(errorMessage);
    }
    const [planningAreaCustomGridLocation] = uris;

    const readableOrError = await this.fileRepository.get(
      planningAreaCustomGridLocation.uri,
    );
    if (isLeft(readableOrError)) {
      const errorMessage = `File with piece data for ${piece}/${importResourceId} is not available at ${planningAreaCustomGridLocation.uri}`;
      this.logger.error(errorMessage);
      throw new Error(errorMessage);
    }

    await this.processGridFile(
      readableOrError.right,
      planningAreaCustomGridLocation,
      importResourceId,
    );

    return {
      importId: input.importId,
      componentId: input.componentId,
      importResourceId,
      componentResourceId: input.componentResourceId,
      piece: input.piece,
    };
  }

  // Parses a line of the form `<puId>,[<byte>,<byte>,...]`, where the byte
  // array is the EWKB serialization of the planning unit geometry.
  private parseGridFileLine(pu: string) {
    const regex = /^(\d+),(\[(\d+,)*\d+\])$/;
    const result = regex.exec(pu);

    if (result) {
      const [_, puId, geom] = result;
      return { puId: parseInt(puId, 10), geom: JSON.parse(geom) as number[] };
    }
    throw new Error('unknown line format');
  }

  private async processGridFile(
    zipStream: Readable,
    planningAreaCustomGridLocation: ComponentLocationSnapshot,
    projectId: string,
  ) {
    return new Promise<void>((resolve, reject) => {
      // Stream chunks rarely end exactly at a line boundary, so the trailing
      // partial line of each chunk is carried over into the next one.
      let lastChunkIncompletedData = '';

      zipStream
        .pipe(ParseOne(new RegExp(planningAreaCustomGridLocation.relativePath)))
        .pipe(
          new Transform({
            writableObjectMode: true,
            transform: async (chunk, encoding, callback) => {
              const data = lastChunkIncompletedData + chunk.toString();
              const lastNewLineIndex = data.lastIndexOf('\n');
              if (lastNewLineIndex === -1) {
                lastChunkIncompletedData = data;
                callback();
                return;
              }
              const processableData = data.substring(0, lastNewLineIndex);
              lastChunkIncompletedData = data.substring(lastNewLineIndex + 1);
              try {
                const geomPUs = processableData
                  .split('\n')
                  .map(this.parseGridFileLine);

                const values = geomPUs
                  .map((pu, index) => `($1,$2,ST_GeomFromEWKB($${index + 3}))`)
                  .join(',');

                const buffers = geomPUs.map((pu) => Buffer.from(pu.geom));
                await this.geoprocessingEntityManager.query(
                  `
                    INSERT INTO planning_units_geom (type,project_id,the_geom)
                    VALUES ${values}
                  `,
                  [PlanningUnitGridShape.FromShapefile, projectId, ...buffers],
                );

                callback();
              } catch (err) {
                callback(err as Error);
              }
            },
          }),
        )
        .on('finish', () => {
          if (lastChunkIncompletedData) {
            reject(new Error('could not parse grid file'));
          } else {
            resolve();
          }
        })
        .on('error', (err) => {
          reject(err);
        });
    });
  }
}
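For reference, each line of the grid file pairs a planning unit id with its geometry serialized as a JSON array of EWKB bytes. A standalone sketch of the format that parseGridFileLine accepts (the byte values below are illustrative, not a valid EWKB polygon):

// Hypothetical line: puId 1, followed by a JSON array of EWKB bytes.
const line = '1,[1,3,0,0,32,230,16,0,0]';
const match = /^(\d+),(\[(\d+,)*\d+\])$/.exec(line);
if (match) {
  const puId = parseInt(match[1], 10); // => 1
  const geom = JSON.parse(match[2]) as number[]; // => [1, 3, 0, 0, 32, 230, 16, 0, 0]
  const ewkb = Buffer.from(geom); // becomes the bytea parameter for ST_GeomFromEWKB
}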
223 changes: 223 additions & 0 deletions
...processing/test/integration/clonning/planning-area-custom-grid.piece-importer.e2e-spec.ts
import { geoprocessingConnections } from '@marxan-geoprocessing/ormconfig';
import { PlanningUnitsGeom } from '@marxan-jobs/planning-unit-geometry';
import { ImportJobInput, ImportJobOutput } from '@marxan/cloning';
import {
  ArchiveLocation,
  ClonePiece,
  ResourceKind,
} from '@marxan/cloning/domain';
import { ClonePieceUrisResolver } from '@marxan/cloning/infrastructure/clone-piece-data';
import { FileRepository, FileRepositoryModule } from '@marxan/files-repository';
import { FixtureType } from '@marxan/utils/tests/fixture-type';
import { Logger } from '@nestjs/common';
import { Test } from '@nestjs/testing';
import { getRepositoryToken, TypeOrmModule } from '@nestjs/typeorm';
import * as archiver from 'archiver';
import { isLeft } from 'fp-ts/lib/Either';
import { Repository } from 'typeorm';
import { v4 } from 'uuid';
import { PlanningAreaCustomGridPieceImporter } from '../../../src/import/pieces-importers/planning-area-custom-grid.piece-importer';

let fixtures: FixtureType<typeof getFixtures>;

describe(PlanningAreaCustomGridPieceImporter, () => {
  beforeEach(async () => {
    fixtures = await getFixtures();
  }, 10_000);

  afterEach(async () => {
    await fixtures?.cleanUp();
  });

  it('should fail when planning area grid file is missing in uris array', async () => {
    const input = fixtures.GivenJobInputWithoutUris();
    await fixtures
      .WhenPieceImporterIsInvoked(input)
      .ThenAnUrisArrayErrorShouldBeThrown();
  });

  it('should fail when the file cannot be retrieved from file repo', async () => {
    const archiveLocation = fixtures.GivenNoGridFileIsAvailable();
    const input = fixtures.GivenJobInput(archiveLocation);
    await fixtures
      .WhenPieceImporterIsInvoked(input)
      .ThenADataNotAvailableErrorShouldBeThrown();
  });

  it('should fail when file content is invalid', async () => {
    const archiveLocation = await fixtures.GivenInvalidGridFileFormat();
    const input = fixtures.GivenJobInput(archiveLocation);
    await fixtures
      .WhenPieceImporterIsInvoked(input)
      .ThenGridFormatErrorShouldBeThrown();
  });

  it('should throw if insert operation fails', async () => {
    const archiveLocation = await fixtures.GivenInvalidGridFileContent();
    const input = fixtures.GivenJobInput(archiveLocation);
    await fixtures
      .WhenPieceImporterIsInvoked(input)
      .ThenInsertErrorShouldBeThrown();
  });

  it('should insert geometries successfully when file is valid', async () => {
    const archiveLocation = await fixtures.GivenValidGridFile();
    const input = fixtures.GivenJobInput(archiveLocation);
    await fixtures
      .WhenPieceImporterIsInvoked(input)
      .ThenPlanningUnitsGeometriesShouldBeInserted();
  });
});

const getFixtures = async () => {
  const sandbox = await Test.createTestingModule({
    imports: [
      TypeOrmModule.forRoot({
        ...geoprocessingConnections.default,
        keepConnectionAlive: true,
        logging: false,
      }),
      TypeOrmModule.forFeature([PlanningUnitsGeom]),
      FileRepositoryModule,
    ],
    providers: [
      PlanningAreaCustomGridPieceImporter,
      { provide: Logger, useValue: { error: () => {}, setContext: () => {} } },
    ],
  }).compile();

  await sandbox.init();
  const projectId = v4();
  const sut = sandbox.get(PlanningAreaCustomGridPieceImporter);
  const fileRepository = sandbox.get(FileRepository);
  const puGeomRepo = sandbox.get<Repository<PlanningUnitsGeom>>(
    getRepositoryToken(PlanningUnitsGeom),
  );
  return {
    cleanUp: async () => {
      await puGeomRepo.delete({ projectId });
    },
    GivenNoGridFileIsAvailable: () => {
      return new ArchiveLocation('not found');
    },
    // Missing closing bracket: parseGridFileLine should reject this line.
    GivenInvalidGridFileFormat: async (): Promise<ArchiveLocation> => {
      const archive = archiver(`zip`, {
        zlib: { level: 9 },
      });

      const [{ relativePath }] = ClonePieceUrisResolver.resolveFor(
        ClonePiece.PlanningAreaGridCustom,
        'planning area custom grid relative path',
      );

      const invalidGridFile = '1,[1,2,4\n';
      archive.append(invalidGridFile, {
        name: relativePath,
      });

      const saveFile = fileRepository.save(archive);
      archive.finalize();
      const uriOrError = await saveFile;

      if (isLeft(uriOrError)) throw new Error('could not save file');
      return new ArchiveLocation(uriOrError.right);
    },
    // Four planning units with valid EWKB polygon geometries.
    GivenValidGridFile: async () => {
      const validGridFile =
        '1,[1,3,0,0,32,230,16,0,0,1,0,0,0,6,0,0,0,210,1,41,148,71,222,49,64,65,105,159,209,218,16,43,192,56,82,9,145,4,221,49,64,26,22,15,148,128,17,43,192,204,204,204,204,204,220,49,64,51,51,51,51,51,19,43,192,58,152,177,244,227,220,49,64,30,31,62,250,112,20,43,192,47,57,246,128,0,221,49,64,247,216,244,65,170,20,43,192,210,1,41,148,71,222,49,64,65,105,159,209,218,16,43,192]\n2,[1,3,0,0,32,230,16,0,0,1,0,0,0,6,0,0,0,104,104,118,46,178,112,51,64,180,80,228,111,31,222,49,192,145,194,89,190,194,106,51,64,180,80,228,111,31,222,49,192,233,215,125,56,114,106,51,64,68,237,244,189,154,222,49,192,254,131,129,231,222,107,51,64,95,139,79,1,48,222,49,192,4,222,2,9,138,111,51,64,141,102,44,154,206,222,49,192,104,104,118,46,178,112,51,64,180,80,228,111,31,222,49,192]\n3,[1,3,0,0,32,230,16,0,0,1,0,0,0,4,0,0,0,26,94,3,137,61,189,51,64,89,116,60,154,153,39,45,192,217,233,181,223,174,189,51,64,155,15,104,202,190,40,45,192,85,199,245,173,87,189,51,64,170,187,53,170,74,39,45,192,26,94,3,137,61,189,51,64,89,116,60,154,153,39,45,192]\n4,[1,3,0,0,32,230,16,0,0,1,0,0,0,5,0,0,0,92,233,206,197,9,244,53,64,191,209,176,188,205,35,48,192,162,85,95,148,69,246,53,64,191,209,176,188,205,35,48,192,31,137,94,70,177,244,53,64,169,131,245,127,14,35,48,192,142,141,34,203,39,243,53,64,11,67,162,206,174,34,48,192,92,233,206,197,9,244,53,64,191,209,176,188,205,35,48,192]\n';

      const archive = archiver(`zip`, {
        zlib: { level: 9 },
      });
      const [{ relativePath }] = ClonePieceUrisResolver.resolveFor(
        ClonePiece.PlanningAreaGridCustom,
        'planning area custom grid relative path',
      );
      archive.append(validGridFile, {
        name: relativePath,
      });

      const saveFile = fileRepository.save(archive);
      archive.finalize();
      const uriOrError = await saveFile;

      if (isLeft(uriOrError)) throw new Error('could not save file');
      return new ArchiveLocation(uriOrError.right);
    },
    // Well-formed line, but the byte array is not valid EWKB: the INSERT should fail.
    GivenInvalidGridFileContent: async (): Promise<ArchiveLocation> => {
      const archive = archiver(`zip`, {
        zlib: { level: 9 },
      });

      const [{ relativePath }] = ClonePieceUrisResolver.resolveFor(
        ClonePiece.PlanningAreaGridCustom,
        'planning area custom grid relative path',
      );

      const invalidGridFile = '1,[1,2,4]\n';
      archive.append(invalidGridFile, {
        name: relativePath,
      });

      const saveFile = fileRepository.save(archive);
      archive.finalize();
      const uriOrError = await saveFile;

      if (isLeft(uriOrError)) throw new Error('could not save file');
      return new ArchiveLocation(uriOrError.right);
    },
    GivenJobInput: (archiveLocation: ArchiveLocation): ImportJobInput => {
      const [uri] = ClonePieceUrisResolver.resolveFor(
        ClonePiece.PlanningAreaGridCustom,
        archiveLocation.value,
      );
      return {
        componentId: v4(),
        componentResourceId: v4(),
        importId: v4(),
        importResourceId: projectId,
        piece: ClonePiece.PlanningAreaGridCustom,
        resourceKind: ResourceKind.Project,
        uris: [uri.toSnapshot()],
      };
    },
    GivenJobInputWithoutUris: (): ImportJobInput => {
      return {
        componentId: v4(),
        componentResourceId: v4(),
        importId: v4(),
        importResourceId: projectId,
        piece: ClonePiece.PlanningAreaGridCustom,
        resourceKind: ResourceKind.Project,
        uris: [],
      };
    },
    WhenPieceImporterIsInvoked: (input: ImportJobInput) => {
      return {
        ThenInsertErrorShouldBeThrown: async () => {
          await expect(sut.run(input)).rejects.toThrow(
            /WKB structure does not match expected size/gi,
          );
        },
        ThenAnUrisArrayErrorShouldBeThrown: async () => {
          await expect(sut.run(input)).rejects.toThrow(/uris/gi);
        },
        ThenADataNotAvailableErrorShouldBeThrown: async () => {
          await expect(sut.run(input)).rejects.toThrow(
            /File with piece data for/gi,
          );
        },
        ThenGridFormatErrorShouldBeThrown: async () => {
          await expect(sut.run(input)).rejects.toThrow(/unknown line format/gi);
        },
        ThenPlanningUnitsGeometriesShouldBeInserted: async () => {
          const result = await sut.run(input);
          const geoms = await puGeomRepo.find({
            projectId: result.importResourceId,
          });
          expect(geoms).toHaveLength(4);
        },
      };
    },
  };
};
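As a sanity check on the fixture above: the geometries are raw EWKB byte arrays, and the leading bytes of the first one decode as expected. A small sketch using Node's Buffer (not part of the PR):

// Leading bytes of the first fixture geometry: [1,3,0,0,32,230,16,0,0,1,0,0,0,6,0,0,0,...]
const head = Buffer.from([1, 3, 0, 0, 32, 230, 16, 0, 0, 1, 0, 0, 0, 6, 0, 0, 0]);
head.readUInt8(0); // 1 -> little-endian byte order
head.readUInt32LE(1) & 0x0fffffff; // 3 -> Polygon
(head.readUInt32LE(1) & 0x20000000) !== 0; // true -> the SRID flag is set
head.readUInt32LE(5); // 4326 -> WGS 84
head.readUInt32LE(9); // 1 -> one ring
head.readUInt32LE(13); // 6 -> six points in the ring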
A quick note on this, and in general as you progress through imports of geo data, which in some real cases may span tens of thousands of records times a few columns: let's consider both PostgreSQL's limits on query parameters (IIRC it depends on the version, but it's easy to hit at the 10k scale, I think) and the efficiency of TypeORM at composing queries with "high" (for some value of high) numbers of params.
My general suggestion would be to open a transaction (if not in one already) and split the inserts into batches, sizing each batch according to the number of parameters needed per row so that it stays safely within PostgreSQL's limits.
I also suggest always adding a short note about this in a comment next to the batching logic, so that whoever updates the query next (for example, to add more params) can take the limit into account and see whether the batch length needs to shrink accordingly (e.g. if the number of params needed for each row doubles, halve the batch size).
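A minimal sketch of what that batching could look like, assuming TypeORM's EntityManager.transaction and an already-parsed rows array; the helper name and batch size are illustrative, not part of this PR:

import { EntityManager } from 'typeorm';
import { PlanningUnitGridShape } from '@marxan/scenarios-planning-unit';

// PostgreSQL's extended query protocol allows at most 65535 bind parameters
// per statement; each row below adds one parameter (its EWKB buffer) on top
// of the two shared ones ($1: grid shape, $2: project id). If more per-row
// parameters are ever added, shrink the batch accordingly (e.g. doubling the
// params per row should roughly halve the batch size).
const BATCH_SIZE = 1000; // illustrative; anything safely below the cap works

async function insertInBatches(
  em: EntityManager,
  projectId: string,
  rows: { puId: number; geom: number[] }[],
): Promise<void> {
  await em.transaction(async (tx) => {
    for (let i = 0; i < rows.length; i += BATCH_SIZE) {
      const batch = rows.slice(i, i + BATCH_SIZE);
      const values = batch
        .map((_, index) => `($1,$2,ST_GeomFromEWKB($${index + 3}))`)
        .join(',');
      await tx.query(
        `INSERT INTO planning_units_geom (type,project_id,the_geom) VALUES ${values}`,
        [
          PlanningUnitGridShape.FromShapefile,
          projectId,
          ...batch.map((row) => Buffer.from(row.geom)),
        ],
      );
    }
  });
}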
Wrapping the insert operations inside a transaction makes total sense 👍. However, since we are already working with streams, aren't the inserts already being made in batches (one per chunk)?
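One caveat on the question above: Transform chunks are sized in bytes (the stream's highWaterMark), not in rows, so the number of rows, and hence bind parameters, per chunk-level INSERT is not bounded by the stream itself. Rough, illustrative arithmetic only (the highWaterMark and line length are assumptions):

const chunkBytes = 16 * 1024; // default highWaterMark for byte streams; unzipper may differ
const avgBytesPerLine = 100; // short geometries mean many lines per chunk
const rowsPerChunk = Math.floor(chunkBytes / avgBytesPerLine); // ~163 rows
const paramsPerInsert = rowsPerChunk + 2; // one buffer per row plus two shared params
// Far below PostgreSQL's 65535-parameter cap here, but nothing enforces that
// invariant, so an explicit row-count bound (as suggested above) is safer.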