Skip to content

Commit

Permalink
feat(marxan-run): dump pu solutions matrix to geodb 2
Browse files Browse the repository at this point in the history
  • Loading branch information
kgajowy committed Jul 20, 2021
1 parent 4b44d09 commit 4e96245
Show file tree
Hide file tree
Showing 18 changed files with 402 additions and 164 deletions.
20 changes: 10 additions & 10 deletions api/apps/api/test/fixtures/test-geodata.sql
Original file line number Diff line number Diff line change
Expand Up @@ -84,16 +84,16 @@ select id, 1 as cost from scenarios_pu_data where scenario_id = '$scenario';

----Fake outputs
--- Fake output_scenarios_pu_data
WITH RECURSIVE nums (n) AS (
SELECT 1
UNION ALL
SELECT n+1 FROM nums WHERE n+1 <= 10
)
INSERT INTO output_scenarios_pu_data
(run_id, scenario_pu_id, value)
SELECT n as run_id, scenarios_pu_data.id, round(random()) as value
FROM nums, scenarios_pu_data
where scenarios_pu_data.scenario_id='$scenario';
--WITH RECURSIVE nums (n) AS (
-- SELECT 1
-- UNION ALL
-- SELECT n+1 FROM nums WHERE n+1 <= 10
--)
--INSERT INTO output_scenarios_pu_data
--(run_id, scenario_pu_id, value)
--SELECT n as run_id, scenarios_pu_data.id, round(random()) as value
--FROM nums, scenarios_pu_data
--where scenarios_pu_data.scenario_id='$scenario';

--- Fake output_scenarios_features_data
WITH RECURSIVE nums (n) AS (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import { GeoOutputRepository } from './geo-output.repository';
import { MetadataArchiver } from './metadata/data-archiver.service';
import { MarxanDirectory } from '../../marxan-directory.service';
import { FileReader } from '../../file-reader';
import { SolutionsReaderService } from './solutions/solutions-reader.service';
import { SolutionsReaderService } from './solutions/output-file-parsing/solutions-reader.service';
import { PlanningUnitStateCalculatorService } from './solutions/solution-aggregation/planning-unit-state-calculator.service';
import { OutputScenariosPuDataGeoEntity } from '@marxan/marxan-output';
import { ScenariosPlanningUnitGeoEntity } from '@marxan/scenarios-planning-unit';

Expand All @@ -24,6 +25,7 @@ import { ScenariosPlanningUnitGeoEntity } from '@marxan/scenarios-planning-unit'
MarxanDirectory,
FileReader,
SolutionsReaderService,
PlanningUnitStateCalculatorService,
],
exports: [GeoOutputRepository],
})
Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,27 @@
import { readFileSync } from 'fs';

import { Repository } from 'typeorm';
import { EntityManager, In } from 'typeorm';
import { Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';

import { MarxanExecutionMetadataGeoEntity } from '@marxan/marxan-output';
import { InjectEntityManager } from '@nestjs/typeorm';

import { Workspace } from '../../../ports/workspace';
import { MetadataArchiver } from './metadata/data-archiver.service';
import { SolutionsReaderService } from './solutions/solutions-reader.service';
import { OutputScenariosPuDataGeoEntity } from '@marxan/marxan-output';
import { SolutionsReaderService } from './solutions/output-file-parsing/solutions-reader.service';
import { PlanningUnitStateCalculatorService } from './solutions/solution-aggregation/planning-unit-state-calculator.service';
import { PlanningUnitsState } from './solutions/planning-unit-state';
import { geoprocessingConnections } from '@marxan-geoprocessing/ormconfig';
import {
MarxanExecutionMetadataGeoEntity,
OutputScenariosPuDataGeoEntity,
} from '@marxan/marxan-output';
import { readFileSync } from 'fs';

@Injectable()
export class GeoOutputRepository {
constructor(
@InjectRepository(MarxanExecutionMetadataGeoEntity)
private readonly executionMetadataRepo: Repository<MarxanExecutionMetadataGeoEntity>,
@InjectRepository(OutputScenariosPuDataGeoEntity)
private readonly scenarioSolutionsOutputRepo: Repository<OutputScenariosPuDataGeoEntity>,
private readonly metadataArchiver: MetadataArchiver,
private readonly solutionsReader: SolutionsReaderService,
private readonly planningUnitsStateCalculator: PlanningUnitStateCalculatorService,
@InjectEntityManager(geoprocessingConnections.default)
private readonly entityManager: EntityManager,
) {}

async save(
Expand All @@ -36,56 +38,38 @@ export class GeoOutputRepository {
const solutionMatrix =
workspace.workingDirectory + `/output/output_solutionsmatrix.csv`;

const rowsStream = await this.solutionsReader.from(solutionMatrix);

return new Promise((resolve, reject) => {
// TODO open transaction
// // delete output_scenarios_pu_data for given scenarioId
// // insert new rows from file
const solutionsStream = await this.solutionsReader.from(
solutionMatrix,
scenarioId,
);

rowsStream.on('data', async (data) => {
//
if (data.length === 0) {
return;
}

// does not skip first batch of headers? (empty array)
console.log(`--- save rows:`, data, data.length);
try {
this.scenarioSolutionsOutputRepo
.save(
data.map((row) => this.scenarioSolutionsOutputRepo.create(row)),
{
chunk: 10000,
},
)
.then((res) => {
console.log(`--- saved`, res.length);
})
.catch((error) => {
console.log(`--- errors`, error);
});
} catch (er) {
console.log(`--- why no return at all?`, er);
}
const planningUnitsState: PlanningUnitsState = await this.planningUnitsStateCalculator.consume(
solutionsStream,
);
return this.entityManager.transaction(async (transaction) => {
await transaction.delete(OutputScenariosPuDataGeoEntity, {
scenarioPuId: In(Object.keys(planningUnitsState)),
});

rowsStream.on('error', reject);

rowsStream.on('finish', async () => {
// // insert metadata

await this.executionMetadataRepo.save(
this.executionMetadataRepo.create({
scenarioId,
stdOutput: metaData.stdOutput.toString(),
stdError: metaData.stdErr?.toString(),
outputZip: readFileSync(outputArchivePath),
inputZip: readFileSync(inputArchivePath),
await Promise.all(
Object.entries(planningUnitsState).map(([scenarioPuId, data]) =>
transaction.insert(OutputScenariosPuDataGeoEntity, {
scenarioPuId,
values: data.values,
includedCount: data.usedCount,
}),
);
resolve();
});
),
);

await transaction.save(
transaction.create(MarxanExecutionMetadataGeoEntity, {
scenarioId,
stdOutput: metaData.stdOutput.toString(),
stdError: metaData.stdErr?.toString(),
outputZip: readFileSync(outputArchivePath),
inputZip: readFileSync(inputArchivePath),
}),
);
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export type PuToScenarioPu = Record<string, string>;
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import { Transform } from 'stronger-typed-streams';
import { TransformCallback } from 'stream';
import { PuToScenarioPu } from './pu-to-scenario-pu';
import { SolutionRowResult } from '../solution-row-result';

export class SolutionTransformer extends Transform<
string,
SolutionRowResult[]
> {
#headerAck = false;

constructor(private readonly solutionPuMapping: PuToScenarioPu) {
super({
objectMode: true,
});
}

_transform(
chunk: string,
encoding: BufferEncoding,
callback: TransformCallback,
) {
if (!this.#headerAck) {
this.#headerAck = true;
return callback(null, []);
}
const [solutionString, ...puValues] = chunk.split(',');
const solutionNumber = +solutionString.replace('S', '');

const results: (SolutionRowResult & {
raw: string;
puid: number;
})[] = puValues.map((puValue, index) => ({
value: +puValue === 1 ? 1 : 0,
runId: solutionNumber,
spdId: this.solutionPuMapping[`${index + 1}`],
raw: `index: ${index + 1} ; puValue: ${puValue}`,
puid: index + 1,
}));
for (const k of results) {
if (!k.spdId) {
return callback(
new Error(
'spd.id is missing for: ' +
k.raw +
'; mapping yields:' +
this.solutionPuMapping[`${k.puid}`],
),
);
}
}

callback(null, results);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,38 +6,37 @@ import { createReadStream } from 'fs';
import { createInterface } from 'readline';

import { SolutionTransformer } from './solution-row.transformer';
import { SolutionRowResult } from './solution-row-result';
import { SolutionRowResult } from '../solution-row-result';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { ScenariosPlanningUnitGeoEntity } from '@marxan/scenarios-planning-unit';

interface ReaderEvents {
data(rows: SolutionRowResult[]): void;

error(error: any): void;

finish(): void;
}
import { PuToScenarioPu } from '@marxan-geoprocessing/marxan-sandboxed-runner/adapters/solutions-output/geo-output/solutions/output-file-parsing/pu-to-scenario-pu';
import { SolutionsEvents } from '@marxan-geoprocessing/marxan-sandboxed-runner/adapters/solutions-output/geo-output/solutions/solutions-events';

@Injectable()
export class SolutionsReaderService {
constructor(
@InjectRepository(ScenariosPlanningUnitGeoEntity)
private readonly scenarioPuData: Repository<ScenariosPlanningUnitGeoEntity>,
) {
//
}

async from(file: string): Promise<TypedEmitter<ReaderEvents>> {
/**
* get maaping of PU for given scenario
* <number, string> -> <puid, scenarios_pu_data.id>
*
*
*
*
*/

) {}

async from(
file: string,
scenarioId: string,
): Promise<TypedEmitter<SolutionsEvents>> {
const planningUnits = await this.scenarioPuData.findAndCount({
where: {
scenarioId,
},
select: ['id', 'planningUnitMarxanId'],
});
const mapping = planningUnits[0].reduce<PuToScenarioPu>(
(previousValue, pu) => {
previousValue[pu.planningUnitMarxanId] = pu.id;
return previousValue;
},
{},
);
const duplex: Duplex<SolutionRowResult, string> = new PassThrough({
objectMode: true,
});
Expand All @@ -46,22 +45,13 @@ export class SolutionsReaderService {
crlfDelay: Infinity,
});

rl.on('line', (line) => {
rl.on(`line`, (line) => {
duplex.push(line);
});

rl.on('close', () => {
duplex.end();
});

// TODO could it be that silent fails come from there?
return duplex.pipe(
new SolutionTransformer({
0: '+>>>>>>>',
1: '-1-',
99: '-99-',
98: '-^.^-',
}),
);
return duplex.pipe(new SolutionTransformer(mapping));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
export interface PlanningUnitState {
values: boolean[];
usedCount: number;
}

export type PlanningUnitsState = Record<string, PlanningUnitState>;

This file was deleted.

Loading

0 comments on commit 4e96245

Please sign in to comment.