Skip to content
This repository has been archived by the owner on Apr 13, 2023. It is now read-only.

perf: partial failures for restHook Lambda #579

Merged
merged 2 commits into from
Mar 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cloudformation/subscriptions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ Resources:
KmsMasterKeyId: !Ref SubscriptionsKey
RedrivePolicy:
deadLetterTargetArn: !GetAtt RestHookDLQ.Arn
maxReceiveCount: 3
maxReceiveCount: 2

RestHookDLQ:
Type: AWS::SQS::Queue
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
"fhir-works-on-aws-routing": "6.3.0",
"fhir-works-on-aws-search-es": "3.9.2",
"lodash": "^4.17.21",
"p-settle": "^4.1.1",
"serverless-http": "^2.7.0",
"tslib": "^2.3.1",
"yargs": "^16.2.0"
Expand All @@ -69,7 +70,7 @@
"jsonwebtoken": "^8.5.1",
"prettier": "^2.4.1",
"qs": "^6.10.1",
"serverless": "2.64.1",
"serverless": "2.68.0",
"serverless-bundle": "^4.4.0",
"serverless-offline": "^8.2.0",
"serverless-step-functions": "^3.1.1",
Expand Down
7 changes: 6 additions & 1 deletion serverless.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,9 @@ functions:
startingPosition: LATEST

subscriptionsMatcher:
timeout: 60
timeout: 20
memorySize: !If [isDev, 512, 1024]
reservedConcurrency: !If [isDev, 10, 200]
runtime: nodejs14.x
description: 'Match ddb events against active Subscriptions and emit notifications'
role: SubscriptionsMatcherLambdaRole
Expand Down Expand Up @@ -230,6 +232,9 @@ functions:
- sqs:
arn:
!GetAtt RestHookQueue.Arn
batchSize: 50
maximumBatchingWindow: 10
functionResponseType: ReportBatchItemFailures

stepFunctions:
stateMachines:
Expand Down
1 change: 0 additions & 1 deletion src/subscriptions/restHookLambda/allowListUtil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ export async function getAllowListInfo({
enableMultitenancy: boolean;
}): Promise<{ [key: string]: AllowListInfo }> {
const originalAllowList = await getAllowListedSubscriptionEndpoints();
logger.debug(originalAllowList);
if (!enableMultitenancy) {
return { [SINGLE_TENANT_ALLOW_LIST_KEY]: extractAllowListInfo(originalAllowList) };
}
Expand Down
49 changes: 39 additions & 10 deletions src/subscriptions/restHookLambda/restHook.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,11 @@ describe('Single tenant: Rest hook notification', () => {
test('Empty POST notification is sent when channelPayload is null', async () => {
await expect(
restHookHandler.sendRestHookNotification(getEvent({ channelPayload: null as any }), allowListPromise),
).resolves.toEqual([{ message: 'POST Successful' }]);
).resolves.toMatchInlineSnapshot(`
Object {
"batchItemFailures": Array [],
}
`);
expect(axios.post).toHaveBeenCalledWith('https://fake-end-point-1', null, {
headers: { 'header-name-1': ' header-value-1', testKey: 'testValue' },
});
Expand All @@ -85,7 +89,11 @@ describe('Single tenant: Rest hook notification', () => {
getEvent({ endpoint: 'https://fake-end-point-2-something' }),
allowListPromise,
),
).resolves.toEqual([{ message: 'PUT Successful' }]);
).resolves.toMatchInlineSnapshot(`
Object {
"batchItemFailures": Array [],
}
`);
expect(axios.put).toHaveBeenCalledWith('https://fake-end-point-2-something/Patient/1234567', null, {
headers: { 'header-name-2': ' header-value-2', testKey: 'testValue' },
});
Expand All @@ -100,7 +108,11 @@ describe('Single tenant: Rest hook notification', () => {
}),
allowListPromise,
),
).resolves.toEqual([{ message: 'PUT Successful' }]);
).resolves.toMatchInlineSnapshot(`
Object {
"batchItemFailures": Array [],
}
`);
expect(axios.put).toHaveBeenCalledWith('https://fake-end-point-2-something/Patient/1234567', null, {
headers: { 'header-name-2': ' header-value-2-something' },
});
Expand All @@ -112,7 +124,11 @@ describe('Single tenant: Rest hook notification', () => {
getEvent({ endpoint: 'https://fake-end-point-2-something', channelHeader: ['testKey'] }),
allowListPromise,
),
).resolves.toEqual([{ message: 'PUT Successful' }]);
).resolves.toMatchInlineSnapshot(`
Object {
"batchItemFailures": Array [],
}
`);
expect(axios.put).toHaveBeenCalledWith('https://fake-end-point-2-something/Patient/1234567', null, {
headers: { 'header-name-2': ' header-value-2', testKey: '' },
});
Expand All @@ -124,14 +140,27 @@ describe('Single tenant: Rest hook notification', () => {
getEvent({ endpoint: 'https://fake-end-point-3' }),
allowListPromise,
),
).rejects.toThrow(new Error('Endpoint https://fake-end-point-3 is not allow listed.'));
).resolves.toMatchInlineSnapshot(`
Object {
"batchItemFailures": Array [
Object {
"itemIdentifier": "fake-message-id",
},
],
}
`);
});

test('Error thrown when tenantID is passed in', async () => {
await expect(
restHookHandler.sendRestHookNotification(getEvent({ tenantId: 'tenant1' }), allowListPromise),
).rejects.toThrow(
new Error('This instance has multi-tenancy disabled, but the incoming request has a tenantId'),
);
await expect(restHookHandler.sendRestHookNotification(getEvent({ tenantId: 'tenant1' }), allowListPromise))
.resolves.toMatchInlineSnapshot(`
Object {
"batchItemFailures": Array [
Object {
"itemIdentifier": "fake-message-id",
},
],
}
`);
});
});
41 changes: 34 additions & 7 deletions src/subscriptions/restHookLambda/restHook.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,26 @@
import axios from 'axios';
import { makeLogger } from 'fhir-works-on-aws-interface';
import { SQSEvent } from 'aws-lambda';
import { SQSEvent, SQSBatchResponse } from 'aws-lambda';
import { SubscriptionNotification } from 'fhir-works-on-aws-search-es';
import { metricScope, Unit } from 'aws-embedded-metrics';
import https from 'https';
import pSettle from 'p-settle';
import ensureAsyncInit from '../../index';
import { AllowListInfo, getAllowListHeaders } from './allowListUtil';

const logger = makeLogger({ component: 'subscriptions' });

const httpsAgent = new https.Agent({
maxSockets: 100,
keepAlive: true,
});

const REQUEST_TIMEOUT = 5_000;
const MAX_NOTIFICATION_REQUESTS_CONCURRENCY = 10;

axios.defaults.httpsAgent = httpsAgent;
axios.defaults.timeout = REQUEST_TIMEOUT;

/**
* Merge headers from allow list and subscription resource
* If same header name is present from both sources, header value in subscription resource will be used
Expand Down Expand Up @@ -53,7 +66,7 @@ export default class RestHookHandler {
async sendRestHookNotification(
event: SQSEvent,
allowListPromise: Promise<{ [key: string]: AllowListInfo }>,
): Promise<any> {
): Promise<SQSBatchResponse> {
await ensureAsyncInit(allowListPromise);
const allowList = await allowListPromise;
const messages = event.Records.map((record: any): SubscriptionNotification => {
Expand All @@ -62,8 +75,9 @@ export default class RestHookHandler {
});
// Latency is reported before HTTP call since the external endpoint latency is out of our control.
await logLatencyMetric(messages);
const notificationPromises = messages.map((message: SubscriptionNotification) => {
const notificationPromiseFns = messages.map((message: SubscriptionNotification) => () => {
const { endpoint, channelHeader, channelPayload, matchedResource, tenantId } = message;

const allowListHeaders = getAllowListHeaders(allowList, endpoint, {
enableMultitenancy: this.enableMultitenancy,
tenantId,
Expand All @@ -76,9 +90,22 @@ export default class RestHookHandler {
}
return axios.post(endpoint, null, { headers });
});
const responses = (await Promise.all(notificationPromises)).map((response: any) => response.data);
logger.info('Subscription notifications sent.');
logger.debug(responses);
return responses;

const results = await pSettle(notificationPromiseFns, { concurrency: MAX_NOTIFICATION_REQUESTS_CONCURRENCY });

const failures = results.flatMap((settledPromise, i) => {
if (settledPromise.isRejected) {
logger.error(settledPromise.reason);
return [{ itemIdentifier: event.Records[i].messageId }];
}
return [];
});

logger.info(`Notifications sent: ${results.length - failures.length}`);
logger.info(`Failed notifications: ${failures.length}`);

return {
batchItemFailures: failures,
};
}
}
2 changes: 1 addition & 1 deletion tsconfig.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"compilerOptions": {
"target": "es2019", // Specify ECMAScript target version: 'ES3' (default), 'ES5', 'ES2015', 'ES2016', 'ES2017', 'ES2018', 'ES2019' or 'ESNEXT'.
"target": "es2020", // Specify ECMAScript target version: 'ES3' (default), 'ES5', 'ES2015', 'ES2016', 'ES2017', 'ES2018', 'ES2019' or 'ESNEXT'.
"moduleResolution": "node",
"strict": true, // // Enable all strict type-checking options.
"esModuleInterop": true, // Enables emit interoperability between CommonJS and ES Modules via creation of namespace objects for all imports. Implies 'allowSyntheticDefaultImports'.
Expand Down
Loading