diff --git a/marklogic-data-hub/src/main/resources/ml-modules/transforms/mlRunIngest.sjs b/marklogic-data-hub/src/main/resources/ml-modules/transforms/mlRunIngest.sjs index bab91dd3af..657eb8d67e 100644 --- a/marklogic-data-hub/src/main/resources/ml-modules/transforms/mlRunIngest.sjs +++ b/marklogic-data-hub/src/main/resources/ml-modules/transforms/mlRunIngest.sjs @@ -1,6 +1,7 @@ 'use strict'; const DataHubSingleton = require("/data-hub/5/datahub-singleton.sjs"); const datahub = DataHubSingleton.instance(); +const hubUtils = require("/data-hub/5/impl/hub-utils.sjs"); const httpUtils = require("/data-hub/5/impl/http-utils.sjs"); const provLib = require("/data-hub/5/impl/prov.sjs"); @@ -14,19 +15,20 @@ function transform(context, params, content) { let step = params['step'] ? xdmp.urlDecode(params['step']) : params['flow-name'] ? null : 1; let stepObj = flow.steps[step]; - if(!stepObj) { + if (!stepObj) { datahub.debug.log({message: params, type: 'error'}); - httpUtils.throwNotFoundWithArray(["Not Found", "The specified step "+ step + " is missing in " + flowName]); + httpUtils.throwNotFoundWithArray(["Not Found", "The specified step " + step + " is missing in " + flowName]); } - if(! stepObj.stepDefinitionType.toLowerCase() === "ingestion"){ + if (!stepObj.stepDefinitionType.toLowerCase() === "ingestion") { datahub.debug.log({message: params, type: 'error'}); - httpUtils.throwBadRequestWithArray(["Invalid Step Type", "The specified step "+ step + " is not an ingestion step"]); + httpUtils.throwBadRequestWithArray(["Invalid Step Type", "The specified step " + step + " is not an ingestion step"]); } let jobId = params["job-id"]; let options = params.options ? JSON.parse(params.options) : {}; + options.permissions = options.permissions || "data-hub-common,read,data-hub-common,update"; - if(options.inputFileType && options.inputFileType.toLowerCase() === "csv") { + if (options.inputFileType && options.inputFileType.toLowerCase() === "csv") { content = JSON.parse(content); options.file = content.file; // Wrap the JSON parsed from the CSV as a document node, as a step's main function expects content.value @@ -38,35 +40,38 @@ function transform(context, params, content) { options.fullOutput = true; options.enableBatchOutput = "never"; - let newContent = {}; - newContent.uri=context.uri; - newContent.value=content; + let newContent = { + uri: context.uri, + value: content, + context: { + permissions: hubUtils.parsePermissions(options.permissions) + } + }; let flowContent = []; flowContent.push(newContent); let flowResponse = datahub.flow.runFlow(flowName, jobId, flowContent, options, step); - if (flowResponse.errors && flowResponse.errors.length) { - datahub.debug.log(flowResponse.errors[0]); - httpUtils.throwBadRequest(flowResponse.errors[0].stack); - } - let documents = flowResponse.documents; - if (documents && documents.length) { - Object.assign(context, documents[0].context); - } - let docs = []; - for (let doc of documents) { - delete doc.context; - if (!doc.value) { - datahub.debug.log({message: params, type: 'error'}); - httpUtils.throwNotFoundWithArray(["Null Content", "The content was null in the flow " + flowName + " for " + doc.uri + "."]); - } - else { - docs.push(doc.value); - } + if (flowResponse.errors && flowResponse.errors.length) { + datahub.debug.log(flowResponse.errors[0]); + httpUtils.throwBadRequest(flowResponse.errors[0].stack); + } + let documents = flowResponse.documents; + if (documents && documents.length) { + Object.assign(context, documents[0].context); + } + let docs = []; + for (let doc of documents) { + delete doc.context; + if (!doc.value) { + datahub.debug.log({message: params, type: 'error'}); + httpUtils.throwNotFoundWithArray(["Null Content", "The content was null in the flow " + flowName + " for " + doc.uri + "."]); + } else { + docs.push(doc.value); } - provLib.getProvenanceWriteQueue().persist(); - return Sequence.from(docs); + } + provLib.getProvenanceWriteQueue().persist(); + return Sequence.from(docs); } exports.transform = transform;