Skip to content

Commit

Permalink
Merge pull request #258 from adhocteam/cm-60-add-nightly-reconcilliation
Browse files Browse the repository at this point in the history
Cm 60 add nightly reconcilliation
  • Loading branch information
dcmcand authored Mar 30, 2021
2 parents 8d514e2 + 8b99f94 commit 53922f8
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 58 deletions.
38 changes: 38 additions & 0 deletions src/lib/queue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import Queue from 'bull';

const generateRedisConfig = () => {
if (process.env.VCAP_SERVICES) {
const {
'aws-elasticache-redis': [{
credentials: {
host,
port,
password,
},
}],
} = JSON.parse(process.env.VCAP_SERVICES);
return {
host,
port,
// TLS needs to be set to an empty object for redis on cloud.gov
// eslint-disable-next-line no-empty-pattern
redisOpts: { redis: { password, tls: {} } },
};
}
const { REDIS_HOST: host, REDIS_PASS: password } = process.env;
return {
host,
port: (process.env.REDIS_PORT || 6379),
redisOpts: { redis: { password } },
};
};

const {
host,
port,
redisOpts,
} = generateRedisConfig();

export default function newQueue(queName) {
return new Queue(queName, `redis://${host}:${port}`, redisOpts);
}
69 changes: 47 additions & 22 deletions src/services/legacyreports.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { Op } from 'sequelize';
import { userByEmail } from './users';
import { ActivityReport, ActivityReportCollaborator } from '../models';
import { logger } from '../logger';
import { auditLogger, logger } from '../logger';
import newQueue from '../lib/queue';

/*
* Returns all legacy reports that either:
Expand Down Expand Up @@ -121,29 +122,53 @@ export const reconcileCollaborators = async (report) => {
};

export default async function reconcileLegacyReports() {
logger.info('Starting legacy report reconciliation');
// Get all reports that might need reconciliation
const reports = await getLegacyReports();
logger.info(`found ${reports.length} reports that may need reconciliation`);
// Array to help promises from reports that are getting reconciled
const updates = [];
try {
reports.forEach((report) => {
// if there is no author, try to reconcile the author
if (!report.userId) {
updates.push(reconcileAuthors(report));
}
// if there is no approving manager, try to reconcile the approving manager
if (!report.approvingManagerId) {
updates.push(reconcileApprovingManagers(report));
}
// if the report has collaborators, check if collaborators need reconcilliation.
if (report.imported.otherSpecialists !== '') {
updates.push(reconcileCollaborators(report));
}
});
// let all promises resolve
await Promise.all(updates);
} catch (err) {
logger.error(err);
throw err;
if (reports) {
const updates = [];
try {
reports.forEach((report) => {
// if there is no author, try to reconcile the author
if (!report.userId) {
updates.push(reconcileAuthors(report));
}
// if there is no approving manager, try to reconcile the approving manager
if (!report.approvingManagerId) {
updates.push(reconcileApprovingManagers(report));
}
// if the report has collaborators, check if collaborators need reconcilliation.
if (report.imported.otherSpecialists !== '') {
updates.push(reconcileCollaborators(report));
}
});
// let all promises resolve
await Promise.all(updates);
} catch (err) {
logger.error(err);
throw err;
}
}
return 'done';
}

export const reconciliationQueue = newQueue('reconcile');

// Checks if this job is already queued and adds it if it isn't
const populateReconciliationQueue = async () => {
try {
const depth = await reconciliationQueue.count();
if (depth < 1) {
// To test, uncomment the following line and comment out the one below.
// that will make it run every minute instead of every day at 2 am.
// await reconciliationQueue.add('legacyReports', {}, { repeat: { cron: '* * * * *' } });
await reconciliationQueue.add('legacyReports', {}, { repeat: { cron: '0 2 * * *' } });
}
} catch (e) {
auditLogger.error(e);
}
};

populateReconciliationQueue();
38 changes: 2 additions & 36 deletions src/services/scanQueue.js
Original file line number Diff line number Diff line change
@@ -1,40 +1,6 @@
import Queue from 'bull';

const generateRedisConfig = () => {
if (process.env.VCAP_SERVICES) {
const {
'aws-elasticache-redis': [{
credentials: {
host,
port,
password,
},
}],
} = JSON.parse(process.env.VCAP_SERVICES);
return {
host,
port,
// TLS needs to be set to an empty object for redis on cloud.gov
// eslint-disable-next-line no-empty-pattern
redisOpts: { redis: { password, tls: {} } },
};
}
const { REDIS_HOST: host, REDIS_PASS: password } = process.env;
return {
host,
port: (process.env.REDIS_PORT || 6379),
redisOpts: { redis: { password } },
};
};

const {
host,
port,
redisOpts,
} = generateRedisConfig();

const scanQueue = new Queue('scan', `redis://${host}:${port}`, redisOpts);
import newQueue from '../lib/queue';

const scanQueue = newQueue('scan');
const addToScanQueue = (fileKey) => {
const retries = process.env.FILE_SCAN_RETRIES || 5;
const delay = process.env.FILE_SCAN_BACKOFF_DELAY || 10000;
Expand Down
10 changes: 10 additions & 0 deletions src/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ require('newrelic');
import {} from 'dotenv/config';
import throng from 'throng';
import { logger, auditLogger } from './logger';
import reconcileLegacyReports, { reconciliationQueue } from './services/legacyreports';
import { scanQueue } from './services/scanQueue';
import processFile from './workers/files';

Expand All @@ -23,6 +24,15 @@ function start() {
}
});
scanQueue.process(maxJobsPerWorker, (job) => processFile(job.data.key));

reconciliationQueue.on('failed', (job, error) => auditLogger
.error(`${job.data.key}: Legacy report reconciliation failed with error ${error}`));
reconciliationQueue.on('completed', () => logger
.info('Legacy report reconciliation completed successfully'));
reconciliationQueue.process('legacyReports', async (job) => {
logger.info(`starting ${job}`);
await reconcileLegacyReports();
});
}

// spawn workers and start them
Expand Down

0 comments on commit 53922f8

Please sign in to comment.