diff --git a/bl-app-wait.js b/bl-app-wait.js
index c07c032..52ad86c 100755
--- a/bl-app-wait.js
+++ b/bl-app-wait.js
@@ -5,28 +5,23 @@
 const commander = require('commander');
 const util = require('./util');
 const request = require('request-promise-native');
-commander
-    .usage('[options] <task_id>')
-    .option('-i, --id <task_id>', 'id of task to wait for')
-    .parse(process.argv);
-try {
-    if(commander.args.length > 0) commander.id = commander.id || commander.args[0];
-    if(!commander.id) throw new Error("please specify task id");
-} catch(err) {
-    console.error(err.toString());
-    process.exit(1);
-}
+let program = new commander.Command();
+program
+    .storeOptionsAsProperties(true)
+    .argument('<task-id>', 'Id of the task to wait for')
+    .parse();
+
+let taskId = program.args[0];
 util.loadJwt().then(jwt => {
     let headers = { "Authorization": "Bearer " + jwt };
-    request.get({ url: config.api.amaretti+"/task?find=" + JSON.stringify({_id: commander.id}), headers, json: true}).then(body=>{
-        if (body.tasks.length == 0) throw new Error("no tasks found with id " + commander.id);
-        util.waitForFinish(headers, body.tasks[0], process.stdout.isTTY, async err => {
-            if (err) throw err;
-            console.log("(done waiting)");
+    request.get({ url: config.api.amaretti+"/task?find=" + JSON.stringify({_id: taskId}), headers, json: true})
+        .then(async (body) => {
+            if (body.tasks.length == 0) throw new Error("no tasks found with id " + taskId);
+            await util.waitForFinish(headers, body.tasks[0], process.stdout.isTTY);
+            console.error("(done waiting)");
+        }).catch(err => {
+            console.error(err.message);
         });
-    }).catch(err=>{
-        console.error(err.message);
-    });
 });
diff --git a/bl-bids-upload.js b/bl-bids-upload.js
index 748a6a9..b3b87f4 100755
--- a/bl-bids-upload.js
+++ b/bl-bids-upload.js
@@ -140,12 +140,7 @@ util.loadJwt().then(async jwt => {
         }}).then(body=>{
             let task = body.task;
             console.log("Waiting for upload task to be ready...");
-            return new Promise((resolve, reject)=>{
-                util.waitForFinish(headers, task, true, err=>{
-                    if(err) return reject(err);
-                    resolve(task);
-                });
-            });
+            return util.waitForFinish(headers, task, true);
         });
     }
diff --git a/bl-data-upload.js b/bl-data-upload.js
index 1b5fa63..fc2411d 100755
--- a/bl-data-upload.js
+++ b/bl-data-upload.js
@@ -1,253 +1,175 @@
 #!/usr/bin/env node
-const request = require('request-promise-native');
-const axios = require('axios');
-const config = require('./config');
 const fs = require('fs');
-const async = require('async');
-const archiver = require('archiver');
-const jsonwebtoken = require('jsonwebtoken');
 const commander = require('commander');
+const archiver = require('archiver');
+const FormData = require('form-data');
+const config = require('./config');
 const util = require('./util');
-commander
-    .option('-p, --project <projectid>', 'project id to upload dataset to')
-    .option('-d, --datatype <datatype>', 'datatype of uploaded dataset')
-    .option('--datatype_tag <datatype_tag>', 'add a datatype tag to the uploaded dataset', util.collect, [])
-    .option('-n, --desc <description>', 'description of uploaded dataset')
-    .option('-s, --subject <subject>', '(metadata) subject of the uploaded dataset')
+let program = new commander.Command();
+program
    .requiredOption('-p, --project <projectid>', 'project id to upload dataset to')
+    .requiredOption('-d, --datatype <datatype>', 'datatype of uploaded dataset')
+    .requiredOption('-s, --subject <subject>', '(metadata) subject of the uploaded dataset')
+    .option('--datatype_tag <datatype_tag>', 'add a datatype tag to the uploaded dataset', util.collect, [])
     .option('-e, --session <session>', '(metadata) session of the uploaded dataset')
     .option('-r, --run <run>', '(metadata) run of the uploaded dataset')
     .option('-t, --tag <tag>', 'add a tag to the uploaded dataset', util.collect, [])
-    .option('-m, --meta <metadata-filename>', 'file path for (sidecar).json containing additional metadata')
-    .option('-j, --json', 'output uploaded dataset information in json format');
-
-//TODO..
-//.option('--force', 'force the dataset to be uploaded, even if no validator is present')
-
-function getcliopt(key) {
-    let match = commander.options.find(option=>{
-        return(option.short == key || option.long == key);
-    });
-    return match;
-}
-
-//parse individual user-inputted files
-//TODO - file id could collide with cli options.
-let fileList = {};
-let new_argv = [];
-for(let i = 0;i < process.argv.length; ++i) {
-    let arg = process.argv[i];
-    if(arg.indexOf("--") === 0 && arg != "--help" && !getcliopt(arg)) {
-        fileList[arg.substring(2)] = process.argv[i+1];
-        i++; //skip
-    } else {
-        new_argv.push(arg);
-    }
-}
-commander.parse(new_argv);
-
-try {
-    if(!commander.project) throw new Error("Please specify project (-p) to upload data to");
-    if(!commander.datatype) throw new Error("Please specify datatype (-d) of the object");
-    if(!commander.subject) throw new Error("Please specify subject name (-s)");
-} catch(err) {
-    console.error(err.toString());
-    process.exit(1);
-}
-
-util.loadJwt().then(jwt => {
-    let headers = { "Authorization": "Bearer " + jwt };
-    let opts = {
-        datatype: commander.datatype,
-        project: commander.project,
-        files: fileList,
-        desc: commander.desc,
-
-        datatype_tags: commander.datatype_tag,
-        subject: commander.subject,
-        session: commander.session,
-        tags: commander.tag,
-        run: commander.run,
-        json: commander.json,
-    }
+    .option('-m, --meta <metadata-filename>', 'path for a sidecar .json file containing additional metadata', util.ensureArgFileJSONRead)
+    .option('-n, --desc <description>', 'description of uploaded dataset')
+    .option('-j, --json', 'output uploaded dataset information in json format')
+    .allowUnknownOption(true)
+    .exitOverride();
 
-    if (commander.meta) {
-        fs.stat(commander.meta, (err, stats) => {
-            if (err) throw err;
-            opts.meta = JSON.parse(fs.readFileSync(commander.meta, 'ascii')); //why ascii?
-            uploadDataset(headers, opts);
-        });
-    } else {
-        uploadDataset(headers, opts);
-    }
-});
+// Override commander's internal missing-option handler so a missing
+// required option surfaces as our own ArgError (this hooks a
+// commander-internal method, not public API).
+program.missingMandatoryOptionValue = (opt) => {
+    throw new util.ArgError(opt.flags);
+};
 
-async function uploadDataset(headers, options) {
+(async () => {
+    try {
+        program.parse();
+        let options = program.opts();
 
-    let files = options.files || {};
-    let desc = options.desc || '';
-    let datatype_tags = options.datatype_tags || [];
-    let tags = options.tags || [];
-    let metadata = options.meta || {};
-    let fileids = Object.keys(files);
+        util.http.authenticate();
 
-    if (options.subject) {
-        metadata.subject = options.subject;
-    }
-    if (options.session) {
-        metadata.session = options.session;
-    }
-    if (options.run) {
-        metadata.run = options.run;
-        tags.push("run-"+options.run); //let's add run to tag
-    }
-
-    let datatype = await util.getDatatype(headers, options.datatype);
-
-    let projects = await util.resolveProjects(headers, options.project);
-    if (projects.length == 0) throw new Error("project '" + options.project + "' not found");
-    if (projects.length > 1) throw new Error("multiple projects matching '"+projects.length);
-
-
-    //check to make sure user didn't set anything weird via command line
-    for(let id in fileList) {
-        let file = datatype.files.find(f=>f.id == id);
-        if(!file) {
-            console.error("Unknown parameter", "--"+id);
-            console.error("Please use the following file IDs for the specified datatype");
-            datatype.files.forEach(f=>{
-                console.log("--"+f.id, f.filename||f.dirname, f.desc||'')
-            });
-            process.exit(1);
-        }
-    }
+        const datatype = await util.getDatatype(undefined, options.datatype);
 
-    let archive = archiver('tar', { gzip: true });
-    let project = projects[0];
-
-    let instanceName = 'upload.'+project.group_id; //same for web ui upload
-    let instance = await util.findOrCreateInstance(headers, instanceName, {project});
-    archive.on('error', err=>{
-        throw new Error(err);
-    });
-
-    async.forEach(datatype.files, (file, next_file) => {
-        if (fileids.length > 0) {
-            let path = files[file.id] || files[file.filename||file.dirname]; //TODO - explain.
-            if (path) {
-                fs.stat(path, (err, stats) => {
-                    if (err) throw err;
-                    if (file.filename) {
-                        archive.file(path, { name: file.filename });
-                    } else {
-                        archive.directory(path, file.dirname);
-                    }
-                    next_file();
-                });
-            } else {
-                if (file.required) throw new Error("File '" + (file.filename||file.dirname) + "' is required for this datatype but was not provided");
-                next_file();
-            }
-        } else {
-            if (!options.json) console.log("Looking for " + (file.filename||file.dirname));
-            fs.stat(file.filename||file.dirname, (err,stats)=>{
-                if(err) {
-                    if (!file.dirname) throw err;
-                    fs.stat(file.dirname, (err, stats) => {
-                        if (err) throw new Error("unable to stat " + file.dirname + " ... Does the specified directory exist?");
-                        archive.directory(file.dirname, file.dirname);
-                        next_file();
-                    });
-                } else {
-                    archive.file(file.filename, { name: (file.filename||file.dirname) });
-                    next_file();
-                }
-            });
+        // Register one CLI option per file defined by the datatype
+        program.allowUnknownOption(false);
+        for (let file of datatype.files) {
+            const isDir = !file.filename && file.dirname;
+            const path = file.filename || file.dirname;
+            const label = file.desc ?? file.id + (isDir ? ' directory' : ' file');
+
+            program.option(`--${file.id} <${path}>`, label + (file.required ? ' (required)' : ''), util.ensureArgFileRead);
         }
-    }, err => {
-        if(err) throw err;
-        archive.finalize();
-
-        //submit noop to upload data
-        //warehouse dataset post api need a real task to submit from
-        axios.post(config.api.amaretti+"/task", {
+        // Re-parse now that the datatype file options are registered.
+        // Reset collect-style options first so the second pass does not
+        // append duplicate values on top of the first one.
+        program.setOptionValue('datatype_tag', []);
+        program.setOptionValue('tag', []);
+        program.parse();
+        options = program.opts();
+
+        // Fetch project
+        const projects = await util.resolveProjects(undefined, options.project);
+        if (projects.length == 0)
+            throw new util.UserError(`Project "${options.project}" not found.`);
+        if (projects.length > 1)
+            throw new util.UserError(
+                `There are ${projects.length} projects matching "${options.project}". ` +
+                `Please specify one.`
+            );
+        const project = projects[0];
+
+        let instanceName = `upload.${project.group_id}`;
+        let instance = await util.findOrCreateInstance(undefined, instanceName, { project });
+
+        console.error("Preparing to upload...");
+
+        // Submit a noop task; the warehouse upload API needs a real task to attach to
+        const taskRes = await util.http.post(`${config.api.amaretti}/task`, {
         instance_id: instance._id,
         name: instanceName,
         service: 'brainlife/app-noop',
-        config: {}, //must exists
-    }, {headers}).then(res=>{
-        let task = res.data.task;
-        if (!options.json) console.log("preparing to upload..");
-        util.waitForFinish(headers, task, !options.json, function(err) {
-            if(err) throw err;
-            if (!options.json) console.log("uploading data..");
-
-            //TODO - update to use axios, and use upload2 api that uses formdata/multipart
-            let req = request.post({url: config.api.amaretti+"/task/upload/"+task._id+"?p=upload/upload.tar.gz&untar=true", headers});
-            archive.pipe(req);
-
-            req.on('response', res=>{
-                if(res.statusCode != "200") throw new Error(res);
-
-                if (!options.json) console.log("data successfully uploaded. finalizing upload..");
-                axios.post(config.api.warehouse+'/dataset/finalize-upload', {
-                    task: task._id,
-                    datatype: datatype._id,
-                    subdir: "upload",
-
-                    fileids,
-
-                    //data object info
-                    datatype_tags,
-                    meta: metadata,
-                    tags,
-                    desc,
-
-                }, {headers}).then(res=>{
-                    if(res.data.validator_task) {
-                        if (!options.json) console.log("validating...");
-                        util.waitForFinish(headers, res.data.validator_task, !options.json, async (err, archive_task, datasets) => {
-                            if (err) {
-                                console.error("validation failed", err);
-                                process.exit(1);
-                            } else {
-                                if(!options.json) console.log("validator finished");
-                                if (task.product && !options.json) {
-                                    if (task.product.warnings && task.product.warnings.length > 0) {
-                                        task.product.warnings.forEach(warning => console.log("Warning: " + warning));
-                                    }
-                                }
-                                if(!options.json) {
-                                    console.log("successfully uploaded");
-                                    console.log("https://"+config.host+"/project/"+project._id+"#object:"+datasets[0]._id);
-                                } else {
-                                    //finally dump the dataset
-                                    console.log(JSON.stringify(datasets[0], null, 4));
-                                }
-                            }
-                        });
-                    } else {
-                        if(!options.json) console.log("no validator registered for this datatype. skipping validation");
-                        util.waitForArchivedDatasets(headers, 1, task, !options.json, (err, datasets)=>{
-                            if(err) throw err;
-                            if(!options.json) console.log("successfully uploaded. data object id:", datasets[0]._id);
-                            else {
-                                //finally dump the dataset
-                                console.log(JSON.stringify(datasets[0], null, 4));
-                            }
-                        })
-                    }
-                }).catch(err=>{
-                    if(err.response && err.response.data && err.response.data.message) console.log(err.response.data.message);
-                    else console.error(err);
-                });
-            });
-        });
-    }).catch(err=>{
-        console.error(err);
+        config: {},
     });
-    });
-}
+        const task = taskRes.data.task;
+        await util.waitForFinish(
+            undefined, task, !options.json
+        );
+
+        // Compress all the files into a tar.gz
+        let archive = archiver('tar', { gzip: true });
+
+        const output = fs.createWriteStream(`/tmp/bl-${task._id}.tar.gz`);
+        archive.pipe(output);
+        // @TODO for some reason, archiver and axios don't get together well
+        // so have to save to filesystem first and then upload
+
+        for (let file of datatype.files) {
+            const path = options[file.id];
+            if (path === undefined) {
+                if (file.required)
+                    throw new util.UserError(`File "${file.filename || file.dirname}" (--${file.id}) is required for this datatype but was not provided.`);
+                continue;
+            }
+            if (file.filename) {
+                archive.file(path, { name: file.filename });
+            } else {
+                archive.directory(path, file.dirname);
+            }
+        }
+
+        // archiver resolves finalize() before the output file is fully
+        // flushed, so wait for the stream to close before uploading
+        await new Promise((resolve, reject) => {
+            output.on('close', resolve);
+            output.on('error', reject);
+            archive.on('error', reject);
+            archive.finalize();
+        });
+
+        console.error("Sending data...");
+
+        const formData = new FormData({ autoDestroy: true });
+        formData.append('file', fs.createReadStream(`/tmp/bl-${task._id}.tar.gz`));
+        // formData.append('file', archive);
+        const formHeaders = formData.getHeaders();
+
+        await util.http.post(
+            `${config.api.amaretti}/task/upload2/${task._id}`,
+            formData,
+            {
+                params: {
+                    p: 'upload/upload.tar.gz',
+                    untar: true,
+                },
+                headers: formHeaders,
+                maxBodyLength: Infinity,
+            }
+        );
+
+        // clean up the temporary archive
+        fs.unlinkSync(`/tmp/bl-${task._id}.tar.gz`);
+
+        console.error("Data successfully sent to Brainlife. Finalizing upload...");
+
+        // Fold the CLI metadata flags into the sidecar metadata and tags,
+        // mirroring what the previous implementation did
+        const meta = options.meta ?? {};
+        meta.subject = options.subject;
+        if (options.session) meta.session = options.session;
+        if (options.run) {
+            meta.run = options.run;
+            options.tag.push(`run-${options.run}`);
+        }
+
+        const finalizeRes = await util.http.post(
+            `${config.api.warehouse}/dataset/finalize-upload`,
+            {
+                task: task._id,
+                datatype: datatype._id,
+                subdir: "upload",
+                fileids: datatype.files.filter(f => options[f.id] !== undefined).map(f => f.id),
+                datatype_tags: options.datatype_tag,
+                meta,
+                tags: options.tag,
+                desc: options.desc,
+            }
+        );
+
+        // Wait for validation to finish
+        if (finalizeRes.data.validator_task) {
+            console.error("Validating...");
+            const validatorTask = finalizeRes.data.validator_task;
+            const { datasets } = await util.waitForFinish(
+                undefined, validatorTask, !options.json
+            );
+
+            console.error();
+            console.error("Validator finished.");
+
+            console.error(`Successfully uploaded. Data object id: ${datasets[0]._id}`);
+            console.error(`https://${config.host}/project/${project._id}#object:${datasets[0]._id}`);
+
+            if (options.json) {
+                console.log(JSON.stringify(datasets[0]));
+            }
+        } else {
+            console.error("No validator registered for this datatype. Skipping validation...");
+            const { datasets } = await util.waitForArchivedDatasets(
+                undefined, 1, task, !options.json
+            );
+
+            console.error(`Successfully uploaded. Data object id: ${datasets[0]._id}`);
+            console.error(`https://${config.host}/project/${project._id}#object:${datasets[0]._id}`);
+
+            if (options.json) {
+                console.log(JSON.stringify(datasets[0]));
+            }
+        }
+    } catch (error) {
+        process.exit(
+            util.handleAppError(program, error)
+        );
+    }
+
+})();
diff --git a/bl-login.js b/bl-login.js
index 7a7ecac..5308814 100755
--- a/bl-login.js
+++ b/bl-login.js
@@ -25,14 +25,14 @@ async function login() {
     if(!password) password = readlineSync.question("password: ", {hideEchoBack: true});
 
     try {
-        const _jwt = await util.login({
+        const token = await util.login({
             ldap: commander.ldap,
             ttl: commander.ttl,
             username, password,
         });
-        let token = jwt.decode(_jwt);
-        let ttl = timediff(new Date(token.exp*1000), new Date());
+        let payload = jwt.decode(token);
+        let ttl = timediff(new Date(payload.exp*1000), new Date());
         let formattedTime = Object.keys(ttl).map(units => {
             let time = ttl[units];
             if (time == 0 || units == 'milliseconds') return '';
@@ -42,7 +42,6 @@
     } catch (err) {
         console.error(err.toString());
     }
-
 }
 
 login();
diff --git a/config.js b/config.js
index 029d40c..8a91568 100644
--- a/config.js
+++ b/config.js
@@ -3,12 +3,10 @@ exports.host = process.env.BLHOST || "brainlife.io";
 
 exports.api = {
     auth: "https://"+exports.host+"/api/auth",
-    amaretti: "https://"+exports.host+"/api/amaretti",
-    warehouse: "https://"+exports.host+"/api/warehouse",
-
-    event_ws: "wss://"+exports.host+"/api/event",
+    amaretti: "https://"+exports.host+"/api/amaretti",
+    warehouse: "https://"+exports.host+"/api/warehouse",
+    event_ws: "wss://"+exports.host+"/api/event",
 }
-exports.api.wf = exports.api.amaretti; //deprecated .. use api.amaretti
 
 exports.path = {
     jwt: process.env.HOME+"/.config/" + exports.host + "/.jwt",
diff --git a/package.json b/package.json
index ee59742..c50914a 100644
--- a/package.json
+++ b/package.json
@@ -16,8 +16,9 @@
     "axios": "^0.27.2",
     "bids-validator": "^1.9.4",
     "colors": "^1.4.0",
-    "commander": "^5",
+    "commander": "^9.4.0",
     "datetime-difference": "^1.0.2",
+    "form-data": "^4.0.0",
     "jsonwebtoken": "^8.5.1",
     "mkdirp": "^1.0.4",
     "readline-sync": "^1.4.10",
diff --git a/util.js b/util.js
index 906ff83..291e480 100644
--- a/util.js
+++ b/util.js
@@ -6,26 +6,95 @@ const axios = require('axios');
 const config = require('./config');
 const fs = require('fs');
 const jsonwebtoken = require('jsonwebtoken');
-const timeago = require('time-ago');
-const async = require('async');
-const tar = require('tar');
 const path = require('path');
 const mkdirp = require('mkdirp');
-const colors = require('colors');
 
-const delimiter = ',';
+class UserError extends Error {}
 
-exports.login = async function(opt) {
+class ArgError extends UserError {
+    constructor(arg) {
+        super();
+        this.arg = arg;
+    }
+}
+
+class ArgFileReadError extends ArgError {
+    constructor(arg, path) {
+        super(arg);
+        this.path = path;
+    }
+}
+
+exports.UserError = UserError;
+exports.ArgError = ArgError;
+exports.ArgFileReadError = ArgFileReadError;
+
+/**
+ * Validate that a path exists and is readable.
+ * Declared as a regular function: commander invokes argument parsers as a
+ * method of the Option instance, so `this.name()` resolves to the option
+ * name; an arrow function would not receive that binding.
+ * @param {string} path
+ * @returns {string} path
+ */
+exports.ensureArgFileRead = function (path) {
+    try {
+        fs.accessSync(path, fs.constants.R_OK);
+        return path;
+    } catch (error) {
+        throw new ArgFileReadError(this.name(), path);
+    }
+}
+
+/**
+ * Validator for a JSON file path
+ * @param {string} path
+ * @returns {object} parsed JSON content
+ */
+exports.ensureArgFileJSONRead = function (path) {
+    try {
+        fs.accessSync(path, fs.constants.R_OK);
+        return JSON.parse(fs.readFileSync(path, 'utf-8'));
+    } catch (error) {
+        throw new ArgFileReadError(this.name(), path);
+    }
+}
+
+/**
+ * Handles general errors such as argument and Axios communication failures
+ * @param {object} program commander command, used to print help
+ * @param {Error} error
+ * @returns exit code
+ */
+exports.handleAppError = (program, error) => {
+    if (error instanceof ArgFileReadError) {
+        console.error(`Error: argument '${error.arg}' path not readable: '${error.path}'`);
+    } else if (error instanceof ArgError) {
+        console.error(`Error: required argument '${error.arg}' not specified`);
+        console.error();
+        program.outputHelp();
+    } else if (error instanceof UserError) {
+        console.error(`Error: ${error.message}`);
+    } else if (error instanceof axios.AxiosError) {
+        // already reported by the response interceptor below
+    } else {
+        console.error(`Error: ${error.message ?? error}`);
+        console.trace(error);
+    }
+    return 1;
+}
+
+exports.login = async function (opt) {
     let url = config.api.auth;
-    if(opt.ldap) url += "/ldap/auth";
+    if (opt.ldap) url += "/ldap/auth";
     else url += "/local/auth";
 
     let jwt = null;
     try {
-        const res = await axios.post(url, {username: opt.username, password: opt.password, ttl: 1000*60*60*24*(opt.ttl || 1)});
-        if(res.status != 200) throw new Error(res.data.message);
+        const res = await axios.post(url, {
+            username: opt.username,
+            password: opt.password,
+            ttl: 1000 * 60 * 60 * 24 * (opt.ttl || 1)
+        });
+        if (res.status != 200) throw new Error(res.data.message);
         jwt = res.data.jwt;
     } catch (err) {
         throw new Error(err.response.data.message);
@@ -41,10 +110,12 @@ exports.login = async function(opt) {
     return jwt;
 }
 
-exports.refresh = async function(opt, headers) {
-    let url = config.api.auth+"/refresh";
-    let res = await axios.post(url, {ttl: 1000*60*60*24*(opt.ttl || 1)}, {headers});
-    if(res.status != 200) throw new Error("Error: " + res.data.message);
+exports.refresh = async function (opt, headers) {
+    let url = config.api.auth + "/refresh";
+    let res = await axios.post(url, {
+        ttl: 1000 * 60 * 60 * 24 * (opt.ttl || 1)
+    }, { headers });
+    if (res.status != 200) throw new Error("Error: " + res.data.message);
     let dirname = path.dirname(config.path.jwt);
     await mkdirp(dirname);
     fs.chmodSync(dirname, '700');
@@ -55,24 +126,26 @@ exports.refresh = async function(opt, headers) {
 
 /**
  * Load the user's jwt token
- * @returns {Promise}
+ * @returns {string}
  */
-exports.loadJwt = function() {
-    return new Promise((resolve, reject) => {
-        fs.stat(config.path.jwt, (err, stat) => {
-            if (err) {
-                return reject("Couldn't find your access token. Please try logging in by running 'bl login'");
-                process.exit(1);
-            }
-            let jwt = fs.readFileSync(config.path.jwt, "ascii").trim();
-            let dec = jsonwebtoken.decode(jwt);
-            if(!dec) return reject("Failed to decode you access token. Please try logging in by running 'bl login'");
-            if(dec.exp < Date.now()/1000) return reject("You access token is expired. Please try logging in by running 'bl login'.");
-
-            resolve(jwt);
-        });
-    });
+exports.loadJwtSync = () => {
+    try {
+        const jwtFile = config.path.jwt;
+        if (!fs.existsSync(jwtFile))
+            throw Error("Please log in first using 'bl login'.");
+        const jwt = fs.readFileSync(jwtFile, "ascii").trim();
+        const payload = jsonwebtoken.decode(jwt);
+        if (!payload)
+            throw Error("Failed to read your credentials. Please log in using 'bl login'.");
+        if (payload.exp < Date.now() / 1000)
+            throw Error("Your credentials have expired. Please log in using 'bl login'.");
+        return jwt;
+    } catch (error) {
+        console.error(error.message);
+        process.exit(1);
+    }
 }
 
+exports.loadJwt = async () => exports.loadJwtSync();
 
 exports.queryProfiles = function(headers, query, opt) {
     if(!query) query = {};
@@ -250,20 +323,21 @@ exports.resolveDatasets = function(headers, query, opt) {
     }
 }
 
-exports.queryProjects = async function(headers, query, opt) {
+exports.queryProjects = async (headers, query, opt) => {
     if(!query) query = {};
     if(!opt) opt = {};
-    
+
     let projectAdmin = null;
     let projectMember = null;
     let projectGuest = null;
     if (query.admin) projectAdmin = await exports.resolveProfiles(headers, query.admin);
     if (query.member) projectMember = await exports.resolveProfiles(headers, query.member);
-    if (query.guest) projectGuest = await exports.resolvePRofiles(headers, query.guest);
+    if (query.guest) projectGuest = await exports.resolveProfiles(headers, query.guest);
 
     let find = { removed: false }, andQueries = [], orQueries = [];
 
     if (query.id) {
-        if (!exports.isValidObjectId(query.id)) throw new Error('Not a valid object id: ' + query.id);
+        if (!exports.isValidObjectId(query.id))
+            throw new Error('Not a valid object id: ' + query.id);
         orQueries.push({ _id: query.id });
     }
     if (query.search) {
@@ -283,17 +357,21 @@ exports.queryProjects = async function(headers, query, opt) {
     if (orQueries.length > 0) andQueries.push({ $or: orQueries });
     if (andQueries.length > 0) find.$and = andQueries;
 
-    return request(config.api.warehouse + '/project', { headers, json: true,
-        qs: {
-            find: JSON.stringify(find),
-            sort: JSON.stringify({ name: 1 }),
-            skip: opt.skip || 0,
-            limit: opt.limit || 100
+    const res = await http.get(
+        `${config.api.warehouse}/project`,
+        {
+            headers,
+            params: {
+                find: JSON.stringify(find),
+                sort: JSON.stringify({ name: 1 }),
+                skip: opt.skip || 0,
+                limit: opt.limit || 100
+            }
         }
-    }).then(body=>{
-        //else if (res.statusCode != 200) return throw new Error(res.body.message);
-        return body.projects;
-    });
+    );
+
+    return res.data.projects;
 }
 
 exports.queryPubs = async function(headers, query, opt) {
@@ -437,21 +515,21 @@ exports.queryApps = async function(headers, query, opt) {
     });
 }
 
-exports.getDatatype = function(headers, query) {
-    return new Promise(async (resolve, reject) => {
-        const find = {};
-        if(exports.isValidObjectId(query)) find._id = query;
-        else find.name = query;
+exports.getDatatype = async function (headers, query) {
+    const find = {};
+    if (exports.isValidObjectId(query)) find._id = query;
+    else find.name = query;
 
-        axios.get(config.api.warehouse + '/datatype', { headers,
-            params: {
-                find: JSON.stringify(find),
-            }
-        }).then(res=>{;
-            if(res.data.datatypes.length == 0) return reject("no matching datatype:"+query);
-            return resolve(res.data.datatypes[0]);
-        });
+    const res = await http.get(config.api.warehouse + '/datatype', {
+        headers,
+        params: {
+            find: JSON.stringify(find),
+        }
     });
+
+    if (res.data.datatypes.length == 0)
+        throw Error(`The datatype ${query} was not found.`);
+    return res.data.datatypes[0];
 }
 
 //TODO get rid off this
@@ -538,125 +616,139 @@ exports.resolveResources = function(headers, query, opt) {
  * @param {string} options.desc
  * @returns {Promise}
  */
-exports.findOrCreateInstance = function(headers, instanceName, options) {
-    return new Promise((resolve, reject)=>{
-        // get instance that might already exist
-        var find = { name: instanceName };
-        options = options || {};
-
-        request({url: config.api.amaretti + "/instance?find=" + JSON.stringify(find), headers: headers, json: true}, (err, res, body) => {
-            if(err) return reject(err);
-            if(res.statusCode != 200) return reject(res.statusCode);
-            if(body.instances[0]) resolve(body.instances[0]);
-            else {
-                // need to create new instance
-                let body = { name: instanceName, desc: options.desc };
-                if (options.project) {
-                    body.config = { brainlife: true };
-                    body.group_id = options.project.group_id;
-                }
-                request.post({url: config.api.amaretti + "/instance", headers: headers, json: true, body,
-                }, function(err, res, body) {
-                    if (err) return reject(err);
-                    else if (res.statusCode != 200) {
-                        if (res.statusMessage == 'not member of the group you have specified') {
-                            return reject("There was an error during instance creation. Please log in again.");
-                        }
-                        else return reject(res.body.message);
-                    } else {
-                        resolve(body);
-                    }
-                });
-            }
-        });
-    });
+exports.findOrCreateInstance = async (headers, instanceName, options) => {
+    options = options || {};
+    // get an instance that might already exist
+    const find = { name: instanceName };
+    const res = await http.get(
+        `${config.api.amaretti}/instance`,
+        {
+            headers,
+            params: { find: JSON.stringify(find) }
+        }
+    );
+
+    if (res.data?.instances[0])
+        return res.data.instances[0];
+
+    // need to create a new instance
+    const body = { name: instanceName, desc: options.desc };
+    if (options.project) {
+        body.config = { brainlife: true };
+        body.group_id = options.project.group_id;
+    }
+    const createRes = await http.post(
+        `${config.api.amaretti}/instance`,
+        body,
+        { headers }
+    );
+    return createRes.data;
 }
 
 //Wait for datasets from task to be archived
-exports.waitForArchivedDatasets = function(headers, datasetCount, task, verbose, cb) {
-    request(config.api.warehouse+'/dataset', { json: true, headers, qs: {
-        find: JSON.stringify({'prov.task_id': task._id}),
-    } }, (err, res, body) => {
-        if (err) return cb(err);
-        if (res.statusCode != 200) return cb(res.body.message);
+exports.waitForArchivedDatasets = async (headers, datasetCount, task, verbose) => {
+    while (true) {
+        const res = await http.get(`${config.api.warehouse}/dataset`, {
+            headers,
+            params: {
+                find: JSON.stringify({ 'prov.task_id': task._id })
+            }
+        });
+
         let failed = false;
-        let stored_datasets = body.datasets.filter(dataset=>{
-            if(verbose) console.debug(new Date(), "object:", dataset._id, dataset.status, dataset.status_msg);
-            if(dataset.status == "failed") failed = true;
-            return (dataset.status == "stored");
+        const storedDatasets = res.data.datasets.filter((dataset) => {
+            if (verbose) console.error(`Object: ${dataset._id} ${dataset.status} ${dataset.status_msg}`);
+            if (dataset.status === 'failed') failed = true;
+            return dataset.status === 'stored';
         });
-        if(failed) return cb("failed to archive", null);
-        if(stored_datasets.length == datasetCount) {
-            //finished!
-            return cb(null, stored_datasets);
-        } else {
-            //if(verbose) console.log(stored_datasets.length+" of "+datasetCount+" datasets archived");
-            //not all datasets archived yet.. wait
-            return setTimeout(()=>{
-                exports.waitForArchivedDatasets(headers, datasetCount, task, verbose, cb);
-            }, 1000*5);
+
+        if (failed) throw Error('Failed to archive the dataset.');
+        if (storedDatasets.length == datasetCount) {
+            return { task, datasets: storedDatasets };
         }
-    });
+
+        await exports.sleep(5);
+    }
 }
 
-//wait for the task to finish
-//2nd parameter for cb will be set to archive_task for output
-exports.waitForFinish = function(headers, task, verbose, cb) {
-    var find = {_id: task._id};
-    axios.get(config.api.amaretti + "/task?find=" + JSON.stringify({_id: task._id}), {headers}).then(res=>{
-        if(res.status != 200) return cb(err);
-
-        //task might not yet be comitted to the db.. and length could be 0?
-        if(res.data.tasks.length == 1) {
-            let task = res.data.tasks[0];
-            if(verbose) console.debug(new Date, "task:", task._id, task.name, task.service, task.status, task.status_msg);
-            if (task.status == "finished") {
-                let datasetCount = 0;
-                if(task.config && task.config._outputs) {
-                    task.config._outputs.forEach(output=>{
-                        if(output.archive) datasetCount++;
-                    });
-                }
-
-                if(datasetCount == 0) return cb(null, null, []); //no output for this task.
-                if(verbose) console.log("waiting for output to be archived...")
-                if(task.name == "__dtv") {
-                    //check for validation result
-                    if(verbose) console.debug("loading product for __dtv", task._id);
-                    let params = {ids: [task._id]};
-                    axios.get(config.api.amaretti+"/task/product", {headers, params}).then(res=>{
-                        if(res.data.length == 0) return cb("couldn't find validation result");
-                        let product = res.data[0].product;
-                        if(verbose && product.warnings && product.warnings.length > 0) console.error("warnings", product.warnings);
-                        if(product.errors && product.errors.length > 0) return cb(product.errors);
-
-                        //now wait for the output to be archived
-                        exports.waitForArchivedDatasets(headers, datasetCount, task, verbose, (err, datasets)=>{
-                            return cb(err, task, datasets);
-                        });
-
-                    }).catch(err=>{
-                        return cb(err);
-                    });
-                } else {
-                    //datatype without validator should immediately archive
-                    exports.waitForArchivedDatasets(headers, datasetCount, task, verbose, (err, datasets)=>{
-                        return cb(err, task, datasets);
-                    });
+exports.waitForFinish = async (headers, task, verbose) => {
+    if (verbose) console.error();
+
+    let status;
+    while (true) {
+        const res = await http.get(
+            `${config.api.amaretti}/task`,
+            {
+                headers,
+                params: { find: JSON.stringify({ _id: task._id }) }
+            }
+        );
+
+        if (res.data.tasks.length == 1) {
+            const task = res.data.tasks[0];
+            if (verbose && status != task.status) {
+                console.error(`Task: ${task._id} ${task.name} ${task.service} ${task.status} ${task.status_msg}`);
+                status = task.status;
+            }
+
+            if (task.status === 'finished') {
+                if (verbose)
+                    console.error();
+
+                const datasetCount = task.config?._outputs?.reduce(
+                    (acc, output) => output.archive ? acc + 1 : acc, 0
+                ) ?? 0;
+
+                if (datasetCount == 0)
+                    return { task, datasets: [] };
+                if (verbose)
+                    console.error('Waiting for output to be archived...');
+
+                // If there is a validator, check for validation result
+                if (task.name == "__dtv") {
+                    if (verbose)
+                        console.error(`Loading product for __dtv: ${task._id}...`);
+
+                    const productRes = await http.get(
+                        `${config.api.amaretti}/task/product`,
+                        {
+                            headers,
+                            params: { ids: [task._id] }
+                        }
+                    );
+
+                    if (productRes.data.length == 0)
+                        throw Error('No validation result was found.');
+
+                    const product = productRes.data[0].product;
+                    if (verbose && product.warnings?.length > 0) {
+                        console.error(`Warnings:`);
+                        product.warnings.forEach(
+                            warning => console.error(`- ${warning}`)
+                        );
+                        console.error();
+                    }
+                    if (product.errors?.length > 0)
+                        throw Error(product.errors.join('\n'));
                 }
-                return;
+                // Wait for datasets to be archived
+                const { datasets } = await exports.waitForArchivedDatasets(
+                    headers, datasetCount, task, verbose
+                );
+
+                if (verbose) console.error();
+                return { task, datasets };
+
             } else if (task.status == "failed") {
-                return cb(task.status_msg, null);
+                throw Error(task.status_msg);
             }
         }
-        setTimeout(function() {
-            exports.waitForFinish(headers, task, verbose, cb);
-        }, 3000);
-    }).catch(err=>{
-        return cb(err, null);
-    });
+        await exports.sleep(5);
+    }
 }
 
 /**
@@ -789,3 +881,33 @@ exports.handleAxiosError = function(err) {
     }
     //console.error(err.config);
 }
+
+/**
+ * 😴
+ * @param {number} secs seconds to sleep
+ */
+exports.sleep = async (secs) =>
+    new Promise(resolve => setTimeout(resolve, secs * 1000));
+
+/**
+ * Shared http client that reports common errors and can authenticate on request
+ */
+const http = axios.create();
+http.authenticate = () => {
+    http.defaults.headers.common.Authorization = `Bearer ${exports.loadJwtSync()}`;
+}
+http.interceptors.response.use(
+    (response) => response,
+    (error) => {
+        if (error.response) {
+            if (error.response.status == 401) {
+                console.error(`You do not have permission to perform this action. Please check your credentials.`);
+            }
+        } else {
+            console.error(`There was an error communicating with Brainlife. Please try again later.`);
+        }
+        throw error;
+    }
+);
+exports.http = http;
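+
+/* Usage sketch (illustrative only, not exercised by the CLI itself): with
+ * the shared client above, a caller authenticates once and then issues
+ * plain axios-style requests; the response interceptor reports 401s and
+ * connectivity problems. The query below is an assumed example built from
+ * the warehouse endpoint that queryProjects() already uses, and would run
+ * inside an async function:
+ *
+ *     const util = require('./util');
+ *     const config = require('./config');
+ *
+ *     util.http.authenticate(); // load the saved JWT into default headers
+ *     const res = await util.http.get(`${config.api.warehouse}/project`, {
+ *         params: { find: JSON.stringify({ removed: false }) },
+ *     });
+ *     console.log(res.data.projects.map(p => p.name));
+ */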