Skip to content

Commit

Permalink
WIP Upload data to storage location from the worker
Browse files Browse the repository at this point in the history
  • Loading branch information
brollb committed Oct 10, 2019
1 parent 80872f4 commit 4154330
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 55 deletions.
80 changes: 28 additions & 52 deletions src/plugins/ExecuteJob/ExecuteJob.js
Original file line number Diff line number Diff line change
Expand Up @@ -629,58 +629,34 @@ define([
};

ExecuteJob.prototype.onDistOperationComplete = async function (node, fileHashes) {
const opName = this.getAttribute(node, 'name');
const resultTypes = await this.getResultTypes(fileHashes);
let nodeId = this.core.getPath(node),
outputMap = {},
outputs;


// Match the output names to the actual nodes
// Create an array of [name, node]
// For now, just match by type. Later we may use ports for input/outputs
// Store the results in the outgoing ports
return this.getOutputs(node)
.then(outputPorts => {
outputs = outputPorts.map(tuple => [tuple[0], tuple[2]]);
outputs.forEach(output => outputMap[output[0]] = output[1]);

// this should not be in directories -> flatten the data!
const hashes = outputs.map(tuple => { // [ name, node ]
let [name] = tuple;
let artifactHash = fileHashes[name];
return this.getContentHash(artifactHash, `outputs/${name}`);
});

return Q.all(hashes);
})
.then(hashes => {
// Create new metadata for each
hashes.forEach((hash, i) => {
var name = outputs[i][0],
dataType = resultTypes[name];

if (dataType) {
this.setAttribute(outputMap[name], 'type', dataType);
this.logger.info(`Setting ${nodeId} data type to ${dataType}`);
} else {
this.logger.warn(`No data type found for ${nodeId}`);
}

if (hash) {
this.setAttribute(outputMap[name], 'data', hash);
this.logger.info(`Setting ${nodeId} data to ${hash}`);
}
});

return this.onOperationComplete(node);
})
.catch(e => this.onOperationFail(node, `"${opName}" failed: ${e}`));
};

ExecuteJob.prototype.getResultTypes = async function (fileHashes) {
const mdHash = fileHashes['result-types'];
const hash = await this.getContentHashSafe(mdHash, 'result-types.json', ERROR.NO_TYPES_FILE);
const results = await this.getResults(fileHashes);
const nodeId = this.core.getPath(node);
const outputPorts = await this.getOutputs(node);
const outputs = outputPorts.map(tuple => [tuple[0], tuple[2]]);

for (let i = outputs.length; i--;) {
const [name, node] = outputs[i];
const {type, dataInfo} = results[name];

if (type) {
this.setAttribute(node, 'type', type);
this.logger.info(`Setting ${nodeId} data type to ${type}`);
} else {
this.logger.warn(`No data type found for ${nodeId}`);
}

if (dataInfo) {
this.setAttribute(node, 'data', dataInfo);
this.logger.info(`Setting ${nodeId} data to ${dataInfo}`);
}
}

return this.onOperationComplete(node);
};

ExecuteJob.prototype.getResults = async function (fileHashes) {
const mdHash = fileHashes['results'];
const hash = await this.getContentHashSafe(mdHash, 'results.json', ERROR.NO_TYPES_FILE);
return await this.blobClient.getObjectAsJSON(hash);
};

Expand Down
4 changes: 2 additions & 2 deletions src/plugins/GenerateJob/GenerateJob.js
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,8 @@ define([
resultPatterns: [STDOUT_FILE]
},
{
name: 'result-types',
resultPatterns: ['result-types.json']
name: 'results',
resultPatterns: ['results.json']
},
{
name: name + '-all-files',
Expand Down
16 changes: 15 additions & 1 deletion src/plugins/GenerateJob/templates/start.ejs
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,23 @@ requirejs([
job = spawn('python', ['main.py'], {detached: true});
job.stdout.on('data', onStdout);
job.stderr.on('data', onStderr);
job.on('close', code => {
job.on('close', async code => {
exitCode = code;
log('script finished w/ exit code:', code);
const results = JSON.parse(fs.readFileSync('result-types.json'));
if (exitCode === 0) {
const client = await Storage.getBackend('gme').getClient(logger);
const outputNames = Object.keys(results);

for (let i = outputNames.length; i--;) {
const filename = outputNames[i];
const contents = fs.readFileSync(`outputs/${filename}`);
const dataInfo = await client.putFile(filename, contents);
const type = results[filename];
results[filename] = {type, dataInfo};
}
}
fs.writeFileSync('results.json', JSON.stringify(results));
checkFinished();
});
})
Expand Down

0 comments on commit 4154330

Please sign in to comment.