diff --git a/src/lib/queue.js b/src/lib/queue.js new file mode 100644 index 0000000000..294f78b822 --- /dev/null +++ b/src/lib/queue.js @@ -0,0 +1,38 @@ +import Queue from 'bull'; + +const generateRedisConfig = () => { + if (process.env.VCAP_SERVICES) { + const { + 'aws-elasticache-redis': [{ + credentials: { + host, + port, + password, + }, + }], + } = JSON.parse(process.env.VCAP_SERVICES); + return { + host, + port, + // TLS needs to be set to an empty object for redis on cloud.gov + // eslint-disable-next-line no-empty-pattern + redisOpts: { redis: { password, tls: {} } }, + }; + } + const { REDIS_HOST: host, REDIS_PASS: password } = process.env; + return { + host, + port: (process.env.REDIS_PORT || 6379), + redisOpts: { redis: { password } }, + }; +}; + +const { + host, + port, + redisOpts, +} = generateRedisConfig(); + +export default function newQueue(queName) { + return new Queue(queName, `redis://${host}:${port}`, redisOpts); +} diff --git a/src/services/legacyreports.js b/src/services/legacyreports.js index 3c813f89cf..12c8a21eb7 100644 --- a/src/services/legacyreports.js +++ b/src/services/legacyreports.js @@ -1,7 +1,8 @@ import { Op } from 'sequelize'; import { userByEmail } from './users'; import { ActivityReport, ActivityReportCollaborator } from '../models'; -import { logger } from '../logger'; +import { auditLogger, logger } from '../logger'; +import newQueue from '../lib/queue'; /* * Returns all legacy reports that either: @@ -121,29 +122,53 @@ export const reconcileCollaborators = async (report) => { }; export default async function reconcileLegacyReports() { + logger.info('Starting legacy report reconciliation'); // Get all reports that might need reconciliation const reports = await getLegacyReports(); + logger.info(`found ${reports.length} reports that may need reconciliation`); // Array to help promises from reports that are getting reconciled - const updates = []; - try { - reports.forEach((report) => { - // if there is no author, try to reconcile the author - if (!report.userId) { - updates.push(reconcileAuthors(report)); - } - // if there is no approving manager, try to reconcile the approving manager - if (!report.approvingManagerId) { - updates.push(reconcileApprovingManagers(report)); - } - // if the report has collaborators, check if collaborators need reconcilliation. - if (report.imported.otherSpecialists !== '') { - updates.push(reconcileCollaborators(report)); - } - }); - // let all promises resolve - await Promise.all(updates); - } catch (err) { - logger.error(err); - throw err; + if (reports) { + const updates = []; + try { + reports.forEach((report) => { + // if there is no author, try to reconcile the author + if (!report.userId) { + updates.push(reconcileAuthors(report)); + } + // if there is no approving manager, try to reconcile the approving manager + if (!report.approvingManagerId) { + updates.push(reconcileApprovingManagers(report)); + } + // if the report has collaborators, check if collaborators need reconcilliation. + if (report.imported.otherSpecialists !== '') { + updates.push(reconcileCollaborators(report)); + } + }); + // let all promises resolve + await Promise.all(updates); + } catch (err) { + logger.error(err); + throw err; + } } + return 'done'; } + +export const reconciliationQueue = newQueue('reconcile'); + +// Checks if this job is already queued and adds it if it isn't +const populateReconciliationQueue = async () => { + try { + const depth = await reconciliationQueue.count(); + if (depth < 1) { + // To test, uncomment the following line and comment out the one below. + // that will make it run every minute instead of every day at 2 am. + // await reconciliationQueue.add('legacyReports', {}, { repeat: { cron: '* * * * *' } }); + await reconciliationQueue.add('legacyReports', {}, { repeat: { cron: '0 2 * * *' } }); + } + } catch (e) { + auditLogger.error(e); + } +}; + +populateReconciliationQueue(); diff --git a/src/services/scanQueue.js b/src/services/scanQueue.js index 57464c4c7a..cddf038fc2 100644 --- a/src/services/scanQueue.js +++ b/src/services/scanQueue.js @@ -1,40 +1,6 @@ -import Queue from 'bull'; - -const generateRedisConfig = () => { - if (process.env.VCAP_SERVICES) { - const { - 'aws-elasticache-redis': [{ - credentials: { - host, - port, - password, - }, - }], - } = JSON.parse(process.env.VCAP_SERVICES); - return { - host, - port, - // TLS needs to be set to an empty object for redis on cloud.gov - // eslint-disable-next-line no-empty-pattern - redisOpts: { redis: { password, tls: {} } }, - }; - } - const { REDIS_HOST: host, REDIS_PASS: password } = process.env; - return { - host, - port: (process.env.REDIS_PORT || 6379), - redisOpts: { redis: { password } }, - }; -}; - -const { - host, - port, - redisOpts, -} = generateRedisConfig(); - -const scanQueue = new Queue('scan', `redis://${host}:${port}`, redisOpts); +import newQueue from '../lib/queue'; +const scanQueue = newQueue('scan'); const addToScanQueue = (fileKey) => { const retries = process.env.FILE_SCAN_RETRIES || 5; const delay = process.env.FILE_SCAN_BACKOFF_DELAY || 10000; diff --git a/src/worker.js b/src/worker.js index 2c7e42658b..d23e025861 100644 --- a/src/worker.js +++ b/src/worker.js @@ -4,6 +4,7 @@ require('newrelic'); import {} from 'dotenv/config'; import throng from 'throng'; import { logger, auditLogger } from './logger'; +import reconcileLegacyReports, { reconciliationQueue } from './services/legacyreports'; import { scanQueue } from './services/scanQueue'; import processFile from './workers/files'; @@ -23,6 +24,15 @@ function start() { } }); scanQueue.process(maxJobsPerWorker, (job) => processFile(job.data.key)); + + reconciliationQueue.on('failed', (job, error) => auditLogger + .error(`${job.data.key}: Legacy report reconciliation failed with error ${error}`)); + reconciliationQueue.on('completed', () => logger + .info('Legacy report reconciliation completed successfully')); + reconciliationQueue.process('legacyReports', async (job) => { + logger.info(`starting ${job}`); + await reconcileLegacyReports(); + }); } // spawn workers and start them