Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict';
const DataHubSingleton = require("/data-hub/5/datahub-singleton.sjs");
const datahub = DataHubSingleton.instance();
const hubUtils = require("/data-hub/5/impl/hub-utils.sjs");
const httpUtils = require("/data-hub/5/impl/http-utils.sjs");
const provLib = require("/data-hub/5/impl/prov.sjs");

Expand All @@ -14,19 +15,20 @@ function transform(context, params, content) {

let step = params['step'] ? xdmp.urlDecode(params['step']) : params['flow-name'] ? null : 1;
let stepObj = flow.steps[step];
if(!stepObj) {
if (!stepObj) {
datahub.debug.log({message: params, type: 'error'});
httpUtils.throwNotFoundWithArray(["Not Found", "The specified step "+ step + " is missing in " + flowName]);
httpUtils.throwNotFoundWithArray(["Not Found", "The specified step " + step + " is missing in " + flowName]);
}
if(! stepObj.stepDefinitionType.toLowerCase() === "ingestion"){
if (!stepObj.stepDefinitionType.toLowerCase() === "ingestion") {
datahub.debug.log({message: params, type: 'error'});
httpUtils.throwBadRequestWithArray(["Invalid Step Type", "The specified step "+ step + " is not an ingestion step"]);
httpUtils.throwBadRequestWithArray(["Invalid Step Type", "The specified step " + step + " is not an ingestion step"]);
}

let jobId = params["job-id"];
let options = params.options ? JSON.parse(params.options) : {};
options.permissions = options.permissions || "data-hub-common,read,data-hub-common,update";
Copy link
Preview

Copilot AI Jul 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The hardcoded permission string should be extracted to a constant or configuration to improve maintainability and make it easier to change permissions across the codebase.

Suggested change
options.permissions = options.permissions || "data-hub-common,read,data-hub-common,update";
options.permissions = options.permissions || DEFAULT_PERMISSIONS;

Copilot uses AI. Check for mistakes.


if(options.inputFileType && options.inputFileType.toLowerCase() === "csv") {
if (options.inputFileType && options.inputFileType.toLowerCase() === "csv") {
content = JSON.parse(content);
options.file = content.file;
// Wrap the JSON parsed from the CSV as a document node, as a step's main function expects content.value
Expand All @@ -38,35 +40,38 @@ function transform(context, params, content) {
options.fullOutput = true;
options.enableBatchOutput = "never";

let newContent = {};
newContent.uri=context.uri;
newContent.value=content;
let newContent = {
uri: context.uri,
value: content,
context: {
permissions: hubUtils.parsePermissions(options.permissions)
Copy link
Preview

Copilot AI Jul 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding input validation to ensure that hubUtils.parsePermissions() can handle the permissions string format properly, as malformed permission strings could cause runtime errors.

Copilot uses AI. Check for mistakes.

}
};

let flowContent = [];
flowContent.push(newContent);

let flowResponse = datahub.flow.runFlow(flowName, jobId, flowContent, options, step);
if (flowResponse.errors && flowResponse.errors.length) {
datahub.debug.log(flowResponse.errors[0]);
httpUtils.throwBadRequest(flowResponse.errors[0].stack);
}
let documents = flowResponse.documents;
if (documents && documents.length) {
Object.assign(context, documents[0].context);
}
let docs = [];
for (let doc of documents) {
delete doc.context;
if (!doc.value) {
datahub.debug.log({message: params, type: 'error'});
httpUtils.throwNotFoundWithArray(["Null Content", "The content was null in the flow " + flowName + " for " + doc.uri + "."]);
}
else {
docs.push(doc.value);
}
if (flowResponse.errors && flowResponse.errors.length) {
datahub.debug.log(flowResponse.errors[0]);
httpUtils.throwBadRequest(flowResponse.errors[0].stack);
}
let documents = flowResponse.documents;
if (documents && documents.length) {
Object.assign(context, documents[0].context);
}
let docs = [];
for (let doc of documents) {
delete doc.context;
if (!doc.value) {
datahub.debug.log({message: params, type: 'error'});
httpUtils.throwNotFoundWithArray(["Null Content", "The content was null in the flow " + flowName + " for " + doc.uri + "."]);
} else {
docs.push(doc.value);
}
provLib.getProvenanceWriteQueue().persist();
return Sequence.from(docs);
}
provLib.getProvenanceWriteQueue().persist();
return Sequence.from(docs);
}

exports.transform = transform;