Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OC-911: ARI ingest dry runs #694

Merged
merged 3 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions api/serverless-config-default.yml
Original file line number Diff line number Diff line change
Expand Up @@ -493,14 +493,19 @@ functions:
method: GET
cors: true
# Integrations
incrementalAriIngest:
incrementalAriIngestHttp:
handler: src/components/integration/routes.incrementalAriIngest
timeout: 900
events:
- schedule:
rate: cron(0 5 ? * TUE *) # Every Tuesday at 5 a.m.
enabled: ${self:custom.scheduledAriIngestEnabled.${opt:stage}, false}
- http:
adwearing-jisc marked this conversation as resolved.
Show resolved Hide resolved
path: ${self:custom.versions.v1}/integrations/ari/incremental
method: POST
cors: true
# Commented out - for the time being ARI ingests will just be manually triggered.
# incrementalAriIngestScheduled:
# handler: src/components/integration/controller.incrementalAriIngest
# timeout: 900
# events:
# - schedule:
# rate: cron(0 5 ? * TUE *) # Every Tuesday at 5 a.m.
# enabled: ${self:custom.scheduledAriIngestEnabled.${opt:stage}, false}
11 changes: 9 additions & 2 deletions api/src/components/integration/__tests__/ari.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -396,9 +396,16 @@ describe('ARI import processes', () => {
test('Incremental import endpoint requires API key', async () => {
const triggerImport = await testUtils.agent.post('/integrations/ari/incremental');

expect(triggerImport.status).toEqual(401);
expect(triggerImport.status).toEqual(400);
expect(triggerImport.body).toMatchObject({
message: "Please provide a valid 'apiKey'."
message: [
{
keyword: 'required',
params: {
missingProperty: 'apiKey'
}
}
]
});
});

Expand Down
19 changes: 18 additions & 1 deletion api/src/components/integration/ariUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ export const detectChangesToARIPublication = (
return somethingChanged ? changes : false;
};

export const handleIncomingARI = async (question: I.ARIQuestion): Promise<I.HandledARI> => {
export const handleIncomingARI = async (question: I.ARIQuestion, dryRun?: boolean): Promise<I.HandledARI> => {
// Validate question ID.
// Quite random criteria for now - value is typed as a number which
// stops us checking the type. May be revisited later.
Expand Down Expand Up @@ -237,6 +237,14 @@ export const handleIncomingARI = async (question: I.ARIQuestion): Promise<I.Hand

// If the ARI has not been ingested previously, a new research problem is created.
if (!existingPublication) {
if (dryRun) {
return {
...baseReturnObject,
actionTaken: 'create',
success: true
};
}

const newPublication = await publicationService.create(
{ ...mappedData, type: 'PROBLEM', conflictOfInterestStatus: false },
user,
Expand Down Expand Up @@ -287,6 +295,15 @@ export const handleIncomingARI = async (question: I.ARIQuestion): Promise<I.Hand

if (changes) {
console.log(`Changes found when handling ARI ${question.questionId}`, changes);

if (dryRun) {
return {
...baseReturnObject,
actionTaken: 'update',
success: true
};
}

// Data differs from what is in octopus, so update the publication.
// Unlike manually created publications, these just have 1 version that
// updates in-place so that we don't pollute datacite with lots of version DOIs.
Expand Down
35 changes: 24 additions & 11 deletions api/src/components/integration/controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,42 @@ import * as response from 'lib/response';
export const incrementalAriIngest = async (
event: I.APIRequest | I.EventBridgeEvent<'Scheduled Event', string>
): Promise<I.JSONResponse> => {
// Check if a process is currently running.
const lastLog = await ingestLogService.getMostRecentLog('ARI', true);

if (lastLog && !lastLog.end) {
return response.json(202, {
message: 'Cancelling ingest. Either an import is already in progress or the last import failed.'
});
}
const triggeredByHttp = event && 'headers' in event;

// This can also be triggered on a schedule, in which case we don't need to check for an API key,
// so only check for the API key if the event is an API request.
if (event && 'headers' in event) {
if (triggeredByHttp) {
const apiKey = event.queryStringParameters?.apiKey;

if (apiKey !== process.env.TRIGGER_ARI_INGEST_API_KEY) {
return response.json(401, { message: "Please provide a valid 'apiKey'." });
}
}

// Check if a process is currently running.
const lastLog = await ingestLogService.getMostRecentLog('ARI', true);
const dryRun = triggeredByHttp ? !!event.queryStringParameters?.dryRun : false;
const dryRunMessages: string[] = [];

if (lastLog && !lastLog.end) {
if (dryRun) {
dryRunMessages.push(
'This run would have been cancelled because another run is currently in progress. However, the run has still been simulated.'
);
} else {
return response.json(202, {
message: 'Cancelling ingest. Either an import is already in progress or the last import failed.'
});
}
}

try {
const ingestResult = await integrationService.incrementalAriIngest();
const ingestResult = await integrationService.incrementalAriIngest(dryRun);

return response.json(200, ingestResult);
return response.json(
200,
dryRunMessages.length ? { messages: [...dryRunMessages, ingestResult] } : ingestResult
);
} catch (error) {
console.log(error);

Expand Down
8 changes: 7 additions & 1 deletion api/src/components/integration/routes.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
import middy from '@middy/core';

import * as integrationController from 'integration/controller';
import * as integrationSchema from 'integration/schema';
import * as middleware from 'middleware';

export const incrementalAriIngest = integrationController.incrementalAriIngest;
export const incrementalAriIngest = middy(integrationController.incrementalAriIngest)
.use(middleware.doNotWaitForEmptyEventLoop({ runOnError: true, runOnBefore: true, runOnAfter: true }))
.use(middleware.validator(integrationSchema.incrementalAriIngestHttp, 'queryStringParameters'));
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import * as I from 'interface';

const incrementalAriIngestHttpSchema: I.Schema = {
type: 'object',
properties: {
apiKey: {
type: 'string'
},
dryRun: {
type: 'boolean'
}
},
additionalProperties: false,
required: ['apiKey']
};

export default incrementalAriIngestHttpSchema;
1 change: 1 addition & 0 deletions api/src/components/integration/schema/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export { default as incrementalAriIngestHttp } from './incrementalAriIngestHttp';
43 changes: 33 additions & 10 deletions api/src/components/integration/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import * as ingestLogService from 'ingestLog/service';
* - It encounters an ARI with dateUpdated before the start time of the most
* recent successful ingest (if this start time is available).
*/
export const incrementalAriIngest = async (): Promise<string> => {
export const incrementalAriIngest = async (dryRun: boolean): Promise<string> => {
const start = new Date();
const MAX_UNCHANGED_STREAK = 5;
// Get most start time of last successful run to help us know when to stop.
Expand All @@ -26,7 +26,12 @@ export const incrementalAriIngest = async (): Promise<string> => {
const mostRecentStart = mostRecentLog?.start;

// Log start time.
const log = await ingestLogService.create('ARI');
let logId: string | null = null;

if (!dryRun) {
const log = await ingestLogService.create('ARI');
logId = log.id;
}

// Count sequential unchanged ARIs so that we can stop when the streak hits MAX_UNCHANGED_STREAK.
let unchangedStreak = 0;
Expand Down Expand Up @@ -57,7 +62,7 @@ export const incrementalAriIngest = async (): Promise<string> => {

if (!pageAri.isArchived) {
// Create, update, or skip this ARI as appropriate.
const handle = await ariUtils.handleIncomingARI(pageAri);
const handle = await ariUtils.handleIncomingARI(pageAri, dryRun);
checkedCount++;

if (handle.unrecognisedDepartment) {
Expand All @@ -81,21 +86,32 @@ export const incrementalAriIngest = async (): Promise<string> => {
} else {
// This was not a skip so reset unchangedStreak counter.
unchangedStreak = 0;
// Log action taken.
console.log(`ARI ${pageAri.questionId} handled successfully with action: ${handle.actionTaken}`);

if (!dryRun) {
// Log action taken.
console.log(
`ARI ${pageAri.questionId} handled successfully with action: ${handle.actionTaken}`
);
}

// Artificial delay to avoid hitting datacite rate limits with publication creates/updates.
// https://support.datacite.org/docs/is-there-a-rate-limit-for-making-requests-against-the-datacite-apis
if (handle.actionTaken === 'create') {
createdCount++;

// Datacite is hit twice, to initialise DOI and get publication ID, then update DOI with data.
await new Promise((resolve) => setTimeout(resolve, 1000));
if (!dryRun) {
await new Promise((resolve) => setTimeout(resolve, 1000));
}
}

if (handle.actionTaken === 'update') {
updatedCount++;

// Datacite is hit once, to update the DOI with changes.
await new Promise((resolve) => setTimeout(resolve, 500));
if (!dryRun) {
await new Promise((resolve) => setTimeout(resolve, 500));
}
}
}
}
Expand All @@ -110,17 +126,24 @@ export const incrementalAriIngest = async (): Promise<string> => {
const end = new Date();
// Get duration in seconds to the nearest 1st decimal place.
const durationSeconds = Math.round((end.getTime() - start.getTime()) / 100) / 10;
await ingestLogService.setEndTime(log.id, end);

if (!dryRun && logId) {
await ingestLogService.setEndTime(logId, end);
}

await email.incrementalAriIngestReport({
checkedCount,
durationSeconds,
createdCount,
updatedCount,
unrecognisedDepartments: Array.from(unrecognisedDepartments).sort(),
unrecognisedTopics: Array.from(unrecognisedTopics).sort()
unrecognisedTopics: Array.from(unrecognisedTopics).sort(),
dryRun
});

const writeCount = createdCount + updatedCount;

return `Update complete. Updated ${writeCount} publication${writeCount !== 1 ? 's' : ''}.`;
const preamble = dryRun ? 'Dry run complete. Would have updated' : 'Update complete. Updated';

return `${preamble} ${writeCount} publication${writeCount !== 1 ? 's' : ''}.`;
};
47 changes: 29 additions & 18 deletions api/src/lib/email.ts
Original file line number Diff line number Diff line change
Expand Up @@ -928,17 +928,30 @@ export const incrementalAriIngestReport = async (options: {
updatedCount: number;
unrecognisedDepartments: string[];
unrecognisedTopics: string[];
dryRun: boolean;
}): Promise<void> => {
const { createdCount, dryRun, updatedCount } = options;
const cleanDepartments = options.unrecognisedDepartments.map((department) => Helpers.getSafeHTML(department));
const cleanTopics = options.unrecognisedTopics.map((topic) => Helpers.getSafeHTML(topic));
const intro = `Incremental ARI import ${dryRun ? 'dry ' : ''}run completed.`;
const timingInfo =
`Duration: ${options.durationSeconds} seconds.` +
(dryRun && (createdCount || updatedCount)
? ` A real run would have taken a minimum of ${
createdCount + updatedCount / 2
} additional seconds due to datacite API rate limits while creating/updating publications.`
: '');
const detailsPrefix = `The ${dryRun ? 'simulated ' : ''}results of this run are as follows.`;
const html = `
<html>
<body>
<p>Incremental ARI import run completed in ${options.durationSeconds} seconds.</p>
<p>${intro}</p>
<p>${timingInfo}</p>
<p>${detailsPrefix}</p>
<ul>
<li>ARIs checked: ${options.checkedCount}</li><li>Publications created: ${
options.createdCount
}</li><li>Publications updated: ${options.updatedCount}</li>
<li>ARIs checked: ${options.checkedCount}</li>
<li>Publications created: ${createdCount}</li>
<li>Publications updated: ${updatedCount}</li>
${
cleanDepartments.length
? '<li>Unrecognised departments: <ul><li>' +
Expand All @@ -956,20 +969,18 @@ export const incrementalAriIngestReport = async (options: {
</html>
`;
const text = `
Incremental ARI ingest run completed in ${options.durationSeconds} seconds.
ARIs checked: ${options.checkedCount}.
Publications created: ${options.createdCount}.
Publications updated: ${options.updatedCount}.
${
options.unrecognisedDepartments.length
? 'Unrecognised departments: "' + options.unrecognisedDepartments.join('", "') + '".'
: ''
}
${
options.unrecognisedTopics.length
? 'Unrecognised topics: "' + options.unrecognisedTopics.join('", "') + '".'
: ''
}`;
${intro}
adwearing-jisc marked this conversation as resolved.
Show resolved Hide resolved
${timingInfo}
${detailsPrefix}
ARIs checked: ${options.checkedCount}.
Publications created: ${options.createdCount}.
Publications updated: ${options.updatedCount}.
${
options.unrecognisedDepartments.length
? 'Unrecognised departments: "' + options.unrecognisedDepartments.join('", "') + '".'
: ''
}
${options.unrecognisedTopics.length ? 'Unrecognised topics: "' + options.unrecognisedTopics.join('", "') + '".' : ''}`;
await send({
html,
text,
Expand Down