diff --git a/src/controllers/async/asyncquery.js b/src/controllers/async/asyncquery.js index 850ca344..f9507481 100644 --- a/src/controllers/async/asyncquery.js +++ b/src/controllers/async/asyncquery.js @@ -133,9 +133,9 @@ exports.getQueryResponse = async (jobID, logLevel = null) => { ); const response = Object.fromEntries(values); if (response.logs && logLevel) { - utils.filterForLogLevel(response, logLevel); + response.logs = utils.filterForLogLevel(response.logs, logLevel); } else if (response.logs && originalLogLevel) { - utils.filterForLogLevel(response, originalLogLevel); + response.logs = utils.filterForLogLevel(response.logs, originalLogLevel); } return response ? response : undefined; }); diff --git a/src/routes/v1/asyncquery_status.js b/src/routes/v1/asyncquery_status.js index ee4de9ce..ff23671d 100644 --- a/src/routes/v1/asyncquery_status.js +++ b/src/routes/v1/asyncquery_status.js @@ -31,76 +31,81 @@ class VCheckQueryStatus { //logger.info("query /query endpoint") try { debug(`checking query status of job ${req.params.id}`); - let by = req.data.options.by; - let job_id = req.params.id; + let jobID = req.params.id; let queryQueue; - if (redisClient.clientEnabled) { - if (job_id.startsWith("BT_")) { - queryQueue = getQueryQueue("bte_query_queue_by_team"); - } else if (job_id.startsWith("BA_")) { - queryQueue = getQueryQueue("bte_query_queue_by_api"); - } else { - queryQueue = getQueryQueue("bte_query_queue"); - } + if (!redisClient.clientEnabled) { + taskResponse({ error: "Redis service is unavailable" }, 503); } - if (queryQueue) { - let job = await queryQueue.getJobFromId(job_id); - if (job === null) { - return taskResponse(null, 404); - } - await queryQueue.isReady(); - const state = await job.getState(); - let logs = await queryQueue.getJobLogs(job_id); - logs = logs.logs.map(log => JSON.parse(log)); - let [status, description] = { - // convert to TRAPI states - completed: ["Completed", "The query has finished executing."], - failed: ["Failed", job.failedReason], - delayed: ["Queued", "The query is queued, but has been delayed."], - active: ["Running", "The query is currently being processed."], - waiting: ["Queued", "The query is waiting in the queue."], - paused: ["Queued", "The query is queued, but the queue is temporarily paused."], - stuck: ["Failed", "The query is stuck (if you see this, raise an issue)."], - null: ["Failed", "The query status is unknown, presumed failed (if you see this, raise an issue)."], - }[state]; - let progress = job._progress; - if (status === "Failed" && !req.endpoint.includes("asyncquery_response")) { - if (description.includes("Promise timed out")) { - // something might break when calculating process.env.JOB_TIMEOUT so wrap it in try catch - try { - return taskResponse({ - job_id, - status, - description: `Job was stopped after exceeding time limit of ${ - parseInt(process.env.JOB_TIMEOUT ?? (1000 * 60 * 5).toString()) / 1000 - }s`, - logs, - }); - } catch (e) { - return taskResponse({ job_id, status, description, logs }); - } - } - return taskResponse({ job_id, status, description, logs }); - } + if (jobID.startsWith("BT_")) { + queryQueue = getQueryQueue("bte_query_queue_by_team"); + } else if (jobID.startsWith("BA_")) { + queryQueue = getQueryQueue("bte_query_queue_by_api"); + } else { + queryQueue = getQueryQueue("bte_query_queue"); + } + + let job = await queryQueue.getJobFromId(jobID); + if (job === null) { + return taskResponse(null, 404); + } - // If done, just give response if using the response_url - if ((state === "completed" || state === "failed") && req.endpoint.includes("asyncquery_response")) { - let returnValue; - const storedResponse = await getQueryResponse(job_id, req.data.options.logLevel); + await queryQueue.isReady(); - if (!storedResponse.logs && logs) { - storedResponse.logs = logs; + const state = await job.getState(); + let progress = job._progress; + + let logs = await queryQueue.getJobLogs(jobID); + logs = logs.logs.map(log => JSON.parse(log)); + const originalLogLevel = JSON.parse(await redisClient.client.getTimeout(`asyncQueryResult:logLevel:${jobID}`)); + logs = utils.filterForLogLevel(logs, req.data.options.log_level ?? originalLogLevel); + + // convert to TRAPI states + let [status, description] = { + completed: ["Completed", "The query has finished executing."], + failed: ["Failed", job.failedReason], + delayed: ["Queued", "The query is queued, but has been delayed."], + active: ["Running", "The query is currently being processed."], + waiting: ["Queued", "The query is waiting in the queue."], + paused: ["Queued", "The query is queued, but the queue is temporarily paused."], + stuck: ["Failed", "The query is stuck (if you see this, raise an issue)."], + null: ["Failed", "The query status is unknown, presumed failed (if you see this, raise an issue)."], + }[state]; + + if (status === "Failed" && !req.endpoint.includes("asyncquery_response")) { + if (description.includes("Promise timed out")) { + // something might break when calculating process.env.JOB_TIMEOUT so wrap it in try catch + try { + return taskResponse({ + job_id: jobID, + status, + description: `Job was stopped after exceeding time limit of ${ + parseInt(process.env.JOB_TIMEOUT ?? (1000 * 60 * 5).toString()) / 1000 + }s`, + logs, + }); + } catch (e) { + return taskResponse({ job_id: jobID, status, description, logs }); } + } + return taskResponse({ job_id: jobID, status, description, logs }); + } - returnValue = storedResponse ? storedResponse : { error: "Response expired. Responses are kept 30 days." }; - return taskResponse(returnValue, returnValue.statusCode || 200); + // If done, just give response if using asyncquery_response + if ((state === "completed" || state === "failed") && req.endpoint.includes("asyncquery_response")) { + let returnValue; + const storedResponse = await getQueryResponse(jobID, req.data.options.log_level); + + if (storedResponse && !storedResponse.logs && logs) { + storedResponse.logs = logs; } - taskResponse({ job_id, status, progress, description, response_url: job.data.url, logs }, 200); - } else { - taskResponse({ error: "Redis service is unavailable" }, 503); + returnValue = storedResponse ? storedResponse : { error: "Response expired. Responses are kept 30 days." }; + return taskResponse(returnValue, returnValue.statusCode || 200); } + + // Otherwise respond for asyncquery_status + taskResponse({ job_id: jobID, status, progress, description, response_url: job.data.url, logs }, 200); } catch (error) { taskError(error); } diff --git a/src/routes/v1/query_v1.js b/src/routes/v1/query_v1.js index 912f1eaa..26ababe3 100644 --- a/src/routes/v1/query_v1.js +++ b/src/routes/v1/query_v1.js @@ -43,7 +43,7 @@ class V1RouteQuery { await handler.query(); const response = handler.getResponse(); - utils.filterForLogLevel(response, options.logLevel); + response.logs = utils.filterForLogLevel(response.logs, options.logLevel); return taskResponse(response); } catch (error) { return taskError(error); diff --git a/src/routes/v1/query_v1_by_api.js b/src/routes/v1/query_v1_by_api.js index 5e046430..a03eb908 100644 --- a/src/routes/v1/query_v1_by_api.js +++ b/src/routes/v1/query_v1_by_api.js @@ -51,7 +51,7 @@ class RouteQueryV1ByAPI { handler.setQueryGraph(queryGraph); await handler.query(); const response = handler.getResponse(); - utils.filterForLogLevel(response, options.logLevel); + response.logs = utils.filterForLogLevel(response.logs, options.logLevel); return taskResponse(response); } catch (error) { return taskError(error); diff --git a/src/routes/v1/query_v1_by_team.js b/src/routes/v1/query_v1_by_team.js index 43e12089..ab2baa1f 100644 --- a/src/routes/v1/query_v1_by_team.js +++ b/src/routes/v1/query_v1_by_team.js @@ -50,7 +50,7 @@ class RouteQueryV1ByTeam { handler.setQueryGraph(queryGraph); await handler.query(); const response = handler.getResponse(); - utils.filterForLogLevel(response, options.logLevel); + response.logs = utils.filterForLogLevel(response.logs, options.logLevel); return taskResponse(response); } catch (error) { return taskError(error); diff --git a/src/utils/common.js b/src/utils/common.js index 34e20152..70ecde3f 100644 --- a/src/utils/common.js +++ b/src/utils/common.js @@ -43,7 +43,7 @@ exports.stringIsAValidUrl = s => { } }; -exports.filterForLogLevel = (response, logLevel) => { +exports.filterForLogLevel = (logs, logLevel) => { const logLevels = { ERROR: 3, WARNING: 2, @@ -51,10 +51,11 @@ exports.filterForLogLevel = (response, logLevel) => { DEBUG: 0, }; if (logLevel && Object.keys(logLevels).includes(logLevel)) { - response.logs = response.logs.filter(log => { + logs = logs.filter(log => { return logLevels[log.level] >= logLevels[logLevel]; }); } + return logs; }; exports.methodNotAllowed = (req, res, next) => res.status(405).send();