feat(marxan-run): dump pu solutions matrix to geodb 2
kgajowy committed Jul 19, 2021
1 parent 4b44d09 commit 469ad7b
Showing 17 changed files with 4,740 additions and 148 deletions.
1 change: 1 addition & 0 deletions api/apps/geoprocessing/src/app.module.ts
@@ -21,6 +21,7 @@ import { ScenariosModule } from '@marxan-geoprocessing/modules/scenarios/scenari
TypeOrmModule.forRoot({
...geoprocessingConnections.default,
keepConnectionAlive: true,
// logging: true,
}),
TypeOrmModule.forRoot({
...geoprocessingConnections.apiDB,
@@ -6,7 +6,8 @@ import { GeoOutputRepository } from './geo-output.repository';
import { MetadataArchiver } from './metadata/data-archiver.service';
import { MarxanDirectory } from '../../marxan-directory.service';
import { FileReader } from '../../file-reader';
import { SolutionsReaderService } from './solutions/solutions-reader.service';
import { SolutionsReaderService } from './solutions/output-file-parsing/solutions-reader.service';
import { PlanningUnitStateCalculatorService } from './solutions/solution-aggregation/planning-unit-state-calculator.service';
import { OutputScenariosPuDataGeoEntity } from '@marxan/marxan-output';
import { ScenariosPlanningUnitGeoEntity } from '@marxan/scenarios-planning-unit';

@@ -24,6 +25,7 @@ import { ScenariosPlanningUnitGeoEntity } from '@marxan/scenarios-planning-unit'
MarxanDirectory,
FileReader,
SolutionsReaderService,
PlanningUnitStateCalculatorService,
],
exports: [GeoOutputRepository],
})
@@ -1,25 +1,22 @@
import { readFileSync } from 'fs';

import { Repository } from 'typeorm';
import { EntityManager } from 'typeorm';
import { Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';

import { MarxanExecutionMetadataGeoEntity } from '@marxan/marxan-output';
import { InjectEntityManager } from '@nestjs/typeorm';

import { Workspace } from '../../../ports/workspace';
import { MetadataArchiver } from './metadata/data-archiver.service';
import { SolutionsReaderService } from './solutions/solutions-reader.service';
import { OutputScenariosPuDataGeoEntity } from '@marxan/marxan-output';
import { SolutionsReaderService } from './solutions/output-file-parsing/solutions-reader.service';
import { PlanningUnitStateCalculatorService } from './solutions/solution-aggregation/planning-unit-state-calculator.service';
import { PlanningUnitsState } from './solutions/planning-unit-state';
import { geoprocessingConnections } from '@marxan-geoprocessing/ormconfig';

@Injectable()
export class GeoOutputRepository {
constructor(
@InjectRepository(MarxanExecutionMetadataGeoEntity)
private readonly executionMetadataRepo: Repository<MarxanExecutionMetadataGeoEntity>,
@InjectRepository(OutputScenariosPuDataGeoEntity)
private readonly scenarioSolutionsOutputRepo: Repository<OutputScenariosPuDataGeoEntity>,
private readonly metadataArchiver: MetadataArchiver,
private readonly solutionsReader: SolutionsReaderService,
private readonly planningUnitsStateCalculator: PlanningUnitStateCalculatorService,
@InjectEntityManager(geoprocessingConnections.default)
private readonly entityManager: EntityManager,
) {}

async save(
@@ -36,56 +33,72 @@ export class GeoOutputRepository {
const solutionMatrix =
workspace.workingDirectory + `/output/output_solutionsmatrix.csv`;

const rowsStream = await this.solutionsReader.from(solutionMatrix);

return new Promise((resolve, reject) => {
// TODO open transaction
// // delete output_scenarios_pu_data for given scenarioId
// // insert new rows from file

rowsStream.on('data', async (data) => {
//
if (data.length === 0) {
return;
}

// does not skip first batch of headers? (empty array)
console.log(`--- save rows:`, data, data.length);
try {
this.scenarioSolutionsOutputRepo
.save(
data.map((row) => this.scenarioSolutionsOutputRepo.create(row)),
{
chunk: 10000,
},
)
.then((res) => {
console.log(`--- saved`, res.length);
})
.catch((error) => {
console.log(`--- errors`, error);
});
} catch (er) {
console.log(`--- why no return at all?`, er);
}
});
const solutionsStream = await this.solutionsReader.from(
solutionMatrix,
scenarioId,
);

rowsStream.on('error', reject);
const planningUnitsState: PlanningUnitsState = await this.planningUnitsStateCalculator.consume(
solutionsStream,
);

rowsStream.on('finish', async () => {
// // insert metadata
// console.log(planningUnitsState);

await this.executionMetadataRepo.save(
this.executionMetadataRepo.create({
scenarioId,
stdOutput: metaData.stdOutput.toString(),
stdError: metaData.stdErr?.toString(),
outputZip: readFileSync(outputArchivePath),
inputZip: readFileSync(inputArchivePath),
}),
);
resolve();
});
});
// return this.entityManager.transaction((transaction) => {
// return new Promise(async (resolve, reject) => {
// const onFinish = async () => {
// await transaction.save(
// transaction.create(MarxanExecutionMetadataGeoEntity, {
// scenarioId,
// stdOutput: metaData.stdOutput.toString(),
// stdError: metaData.stdErr?.toString(),
// outputZip: readFileSync(outputArchivePath),
// inputZip: readFileSync(inputArchivePath),
// }),
// );
// resolve();
// };
//
// let streamFinished = false;
// let count = 0;
// let finishedCount = 0;
//
// rowsStream.on('data', async (data) => {
// if (data.length === 0) {
// return;
// }
//
// try {
// count += 1;
// for (const r of data) {
// await transaction.query(
// `
// insert into output_scenarios_pu_data (value, scenario_pu_id, run_id)
// values (${r.value},'${r.spdId}', ${r.runId})`,
// );
// }
// finishedCount += 1;
//
// if (count === finishedCount && streamFinished) {
// onFinish();
// }
// } catch (error) {
// reject(error);
// }
// });
//
// rowsStream.on('error', (error) => {
// reject(error);
// });
//
// rowsStream.on('finish', async () => {
// streamFinished = true;
//
// if (count === finishedCount) {
// await onFinish();
// }
// });
// });
// });
}
}
@@ -0,0 +1 @@
export type PuToScenarioPu = Record<string, string>;
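For orientation, a minimal sketch of what this mapping holds once it is built from scenarios_pu_data (the ids below are made up): keys are Marxan planning unit ids, values are scenarios_pu_data row ids, matching how SolutionsReaderService assembles it further down.

// Hypothetical example values only - keys are Marxan puids, values are
// scenarios_pu_data.id rows, as built in SolutionsReaderService.from().
const exampleMapping: PuToScenarioPu = {
  '1': '3f2f87a2-0000-0000-0000-000000000001',
  '2': '3f2f87a2-0000-0000-0000-000000000002',
};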
@@ -0,0 +1,56 @@
import { Transform } from 'stronger-typed-streams';
import { TransformCallback } from 'stream';
import { PuToScenarioPu } from './pu-to-scenario-pu';
import { SolutionRowResult } from '../solution-row-result';

export class SolutionTransformer extends Transform<
string,
SolutionRowResult[]
> {
#headerAck = false;

constructor(private readonly solutionPuMapping: PuToScenarioPu) {
super({
objectMode: true,
});
}

_transform(
chunk: string,
encoding: BufferEncoding,
callback: TransformCallback,
) {
if (!this.#headerAck) {
this.#headerAck = true;
return callback(null, []);
}
const [solutionString, ...puValues] = chunk.split(',');
const solutionNumber = +solutionString.replace('S', '');

const results: (SolutionRowResult & {
raw: string;
puid: number;
})[] = puValues.map((puValue, index) => ({
value: +puValue === 1 ? 1 : 0,
runId: solutionNumber,
spdId: this.solutionPuMapping[`${index + 1}`],
raw: `index: ${index + 1} ; puValue: ${puValue}`,
puid: index + 1,
}));
for (const k of results) {
if (!k.spdId) {
console.log(JSON.stringify(this.solutionPuMapping));
return callback(
new Error(
'spd.id is missing for: ' +
k.raw +
'; mapping yields:' +
this.solutionPuMapping[`${k.puid}`],
),
);
}
}

callback(null, results);
}
}
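To make the transform concrete, here is a hedged usage sketch. It assumes the solutions matrix rows look like `S<runId>,<pu1>,<pu2>,...` preceded by a single header line (inferred from the parsing above, not confirmed elsewhere in this commit); the mapping ids are invented and the import paths assume the sketch sits next to the transformer in the output-file-parsing folder.

import { Readable } from 'stream';

import { SolutionTransformer } from './solution-row.transformer';
import { PuToScenarioPu } from './pu-to-scenario-pu';

// Hypothetical mapping: Marxan puid -> scenarios_pu_data.id (ids made up).
const mapping: PuToScenarioPu = { '1': 'spd-uuid-1', '2': 'spd-uuid-2' };

// Assumed row shape: header first, then "S<runId>,<pu1>,<pu2>,..." -
// here run 3 selects planning unit #1 and leaves #2 out.
const lines = Readable.from(['header', 'S3,1,0']);

lines.pipe(new SolutionTransformer(mapping)).on('data', (batch) => {
  // First batch is [] (header acknowledged); the second one is roughly:
  // [ { value: 1, runId: 3, spdId: 'spd-uuid-1', ... },
  //   { value: 0, runId: 3, spdId: 'spd-uuid-2', ... } ]
  console.log(batch);
});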
@@ -6,38 +6,37 @@ import { createReadStream } from 'fs';
import { createInterface } from 'readline';

import { SolutionTransformer } from './solution-row.transformer';
import { SolutionRowResult } from './solution-row-result';
import { SolutionRowResult } from '../solution-row-result';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { ScenariosPlanningUnitGeoEntity } from '@marxan/scenarios-planning-unit';

interface ReaderEvents {
data(rows: SolutionRowResult[]): void;

error(error: any): void;

finish(): void;
}
import { PuToScenarioPu } from '@marxan-geoprocessing/marxan-sandboxed-runner/adapters/solutions-output/geo-output/solutions/output-file-parsing/pu-to-scenario-pu';
import { SolutionsEvents } from '@marxan-geoprocessing/marxan-sandboxed-runner/adapters/solutions-output/geo-output/solutions/solutions-events';

@Injectable()
export class SolutionsReaderService {
constructor(
@InjectRepository(ScenariosPlanningUnitGeoEntity)
private readonly scenarioPuData: Repository<ScenariosPlanningUnitGeoEntity>,
) {
//
}

async from(file: string): Promise<TypedEmitter<ReaderEvents>> {
/**
     * get mapping of PU for given scenario
* <number, string> -> <puid, scenarios_pu_data.id>
*
*
*
*
*/

) {}

async from(
file: string,
scenarioId: string,
): Promise<TypedEmitter<SolutionsEvents>> {
const planningUnits = await this.scenarioPuData.findAndCount({
where: {
scenarioId,
},
select: ['id', 'planningUnitMarxanId'],
});
const mapping = planningUnits[0].reduce<PuToScenarioPu>(
(previousValue, pu) => {
previousValue[pu.planningUnitMarxanId] = pu.id;
return previousValue;
},
{},
);
const duplex: Duplex<SolutionRowResult, string> = new PassThrough({
objectMode: true,
});
@@ -46,22 +45,14 @@
crlfDelay: Infinity,
});

rl.on('line', (line) => {
rl.on(`line`, (line) => {
duplex.push(line);
});

rl.on('close', () => {
duplex.end();
});

// TODO could it be that silent fails come from there?
return duplex.pipe(
new SolutionTransformer({
0: '+>>>>>>>',
1: '-1-',
99: '-99-',
98: '-^.^-',
}),
);
console.log(`scenarioId`, scenarioId);
return duplex.pipe(new SolutionTransformer(mapping));
}
}
@@ -0,0 +1,6 @@
export interface PlanningUnitState {
values: boolean[];
usedCount: number;
}

export type PlanningUnitsState = Record<string, PlanningUnitState>;
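The PlanningUnitStateCalculatorService that fills this structure is part of the commit but not visible in this excerpt, so the following is only a hedged sketch of the assumed aggregation: fold every SolutionRowResult batch emitted by the reader into a per-planning-unit record keyed by spdId, tracking one flag per run plus how many runs selected the unit.

import { EventEmitter } from 'events';

import { PlanningUnitsState } from './planning-unit-state';

// Sketch only - the real PlanningUnitStateCalculatorService is not shown here.
// Assumed input: batches of { value: 0 | 1; runId: number; spdId: string }.
type Row = { value: 0 | 1; runId: number; spdId: string };

export const consumeSketch = (
  solutionsStream: EventEmitter,
): Promise<PlanningUnitsState> =>
  new Promise((resolve, reject) => {
    const state: PlanningUnitsState = {};
    solutionsStream.on('data', (rows: Row[]) => {
      for (const row of rows) {
        const pu = (state[row.spdId] ??= { values: [], usedCount: 0 });
        pu.values[row.runId - 1] = row.value === 1; // one flag per run
        if (row.value === 1) pu.usedCount += 1; // runs that picked this pu
      }
    });
    solutionsStream.on('error', reject);
    solutionsStream.on('finish', () => resolve(state));
  });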

This file was deleted.
