diff --git a/packages/@aws-cdk/aws-lambda-event-sources/README.md b/packages/@aws-cdk/aws-lambda-event-sources/README.md index eb4f206c8f3c9..9c97da615eccc 100644 --- a/packages/@aws-cdk/aws-lambda-event-sources/README.md +++ b/packages/@aws-cdk/aws-lambda-event-sources/README.md @@ -53,6 +53,8 @@ behavior: * __receiveMessageWaitTime__: Will determine [long poll](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-long-polling.html) duration. The default value is 20 seconds. +* __batchSize__: Determines how many records are buffered before invoking your lambda function. +* __maxBatchingWindow__: The maximum amount of time to gather records before invoking the lambda. This increases the likelihood of a full batch at the cost of delayed processing. * __enabled__: If the SQS event source mapping should be enabled. The default is true. ```ts @@ -67,6 +69,7 @@ const queue = new sqs.Queue(this, 'MyQueue', { lambda.addEventSource(new SqsEventSource(queue, { batchSize: 10, // default + maxBatchingWindow: Duration.minutes(5), })); ``` @@ -212,7 +215,7 @@ myFunction.addEventSource(new KinesisEventSource(stream, { You can write Lambda functions to process data either from [Amazon MSK](https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html) or a [self managed Kafka](https://docs.aws.amazon.com/lambda/latest/dg/kafka-smaa.html) cluster. The following code sets up Amazon MSK as an event source for a lambda function. Credentials will need to be configured to access the -MSK cluster, as described in [Username/Password authentication](https://docs.aws.amazon.com/msk/latest/developerguide/msk-password.html). +MSK cluster, as described in [Username/Password authentication](https://docs.aws.amazon.com/msk/latest/developerguide/msk-password.html). ```ts import * as lambda from '@aws-cdk/aws-lambda'; diff --git a/packages/@aws-cdk/aws-lambda-event-sources/lib/sqs.ts b/packages/@aws-cdk/aws-lambda-event-sources/lib/sqs.ts index 88cc1682f2be5..7eba04d2f0649 100644 --- a/packages/@aws-cdk/aws-lambda-event-sources/lib/sqs.ts +++ b/packages/@aws-cdk/aws-lambda-event-sources/lib/sqs.ts @@ -1,6 +1,6 @@ import * as lambda from '@aws-cdk/aws-lambda'; import * as sqs from '@aws-cdk/aws-sqs'; -import { Names } from '@aws-cdk/core'; +import { Duration, Names, Token } from '@aws-cdk/core'; export interface SqsEventSourceProps { /** @@ -14,6 +14,15 @@ export interface SqsEventSourceProps { */ readonly batchSize?: number; + /** + * The maximum amount of time to gather records before invoking the function. + * + * Valid Range: Minimum value of 0 minutes. Maximum value of 5 minutes. + * + * @default - no batching window. The lambda function will be invoked immediately with the records that are available. + */ + readonly maxBatchingWindow?: Duration; + /** * If the SQS event source mapping should be enabled. * @@ -29,14 +38,35 @@ export class SqsEventSource implements lambda.IEventSource { private _eventSourceMappingId?: string = undefined; constructor(readonly queue: sqs.IQueue, private readonly props: SqsEventSourceProps = { }) { - if (this.props.batchSize !== undefined && (this.props.batchSize < 1 || this.props.batchSize > 10)) { - throw new Error(`Maximum batch size must be between 1 and 10 inclusive (given ${this.props.batchSize})`); + const maxBatchingWindow = this.props.maxBatchingWindow; + if (maxBatchingWindow !== undefined) { + if (queue.fifo) { + throw new Error('Batching window is not supported for FIFO queues'); + } + if (maxBatchingWindow.toSeconds() > 300) { + throw new Error(`Maximum batching window must be 300 seconds or less (given ${maxBatchingWindow.toHumanString()})`); + } + } + if (this.props.batchSize !== undefined && !Token.isUnresolved(this.props.batchSize)) { + if ( + this.props.maxBatchingWindow !== undefined && !Token.isUnresolved(this.props.maxBatchingWindow) && + (this.props.batchSize < 1 || this.props.batchSize > 10000) + ) { + throw new Error(`Maximum batch size must be between 1 and 10000 inclusive (given ${this.props.batchSize}) when batching window is specified.`); + } + if ( + (this.props.maxBatchingWindow === undefined || Token.isUnresolved(this.props.maxBatchingWindow)) && + (this.props.batchSize < 1 || this.props.batchSize > 10) + ) { + throw new Error(`Maximum batch size must be between 1 and 10 inclusive (given ${this.props.batchSize}) when batching window is not specified.`); + } } } public bind(target: lambda.IFunction) { const eventSourceMapping = target.addEventSourceMapping(`SqsEventSource:${Names.nodeUniqueId(this.queue.node)}`, { batchSize: this.props.batchSize, + maxBatchingWindow: this.props.maxBatchingWindow, enabled: this.props.enabled, eventSourceArn: this.queue.queueArn, }); diff --git a/packages/@aws-cdk/aws-lambda-event-sources/test/test.sqs.ts b/packages/@aws-cdk/aws-lambda-event-sources/test/test.sqs.ts index fd02dda47a304..a93e577d66ffe 100644 --- a/packages/@aws-cdk/aws-lambda-event-sources/test/test.sqs.ts +++ b/packages/@aws-cdk/aws-lambda-event-sources/test/test.sqs.ts @@ -85,6 +85,38 @@ export = { test.done(); }, + 'unresolved batch size'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const q = new sqs.Queue(stack, 'Q'); + const batchSize : number = 500; + + // WHEN + fn.addEventSource(new sources.SqsEventSource(q, { + batchSize: cdk.Lazy.number({ + produce() { + return batchSize; + }, + }), + })); + + // THEN + expect(stack).to(haveResource('AWS::Lambda::EventSourceMapping', { + 'EventSourceArn': { + 'Fn::GetAtt': [ + 'Q63C6E3AB', + 'Arn', + ], + }, + 'FunctionName': { + 'Ref': 'Fn9270CBC0', + }, + })); + + test.done(); + }, + 'fails if batch size is < 1'(test: Test) { // GIVEN const stack = new cdk.Stack(); @@ -113,6 +145,109 @@ export = { test.done(); }, + 'batch size is > 10 and batch window is defined'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const q = new sqs.Queue(stack, 'Q'); + + // WHEN + fn.addEventSource(new sources.SqsEventSource(q, { + batchSize: 1000, + maxBatchingWindow: cdk.Duration.minutes(5), + })); + + // THEN + expect(stack).to(haveResource('AWS::Lambda::EventSourceMapping', { + 'EventSourceArn': { + 'Fn::GetAtt': [ + 'Q63C6E3AB', + 'Arn', + ], + }, + 'FunctionName': { + 'Ref': 'Fn9270CBC0', + }, + 'BatchSize': 1000, + 'MaximumBatchingWindowInSeconds': 300, + })); + + test.done(); + }, + + 'fails if batch size is > 10000 and batch window is defined'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const q = new sqs.Queue(stack, 'Q'); + + // WHEN/THEN + test.throws(() => fn.addEventSource(new sources.SqsEventSource(q, { + batchSize: 11000, + maxBatchingWindow: cdk.Duration.minutes(5), + })), /Maximum batch size must be between 1 and 10000 inclusive \(given 11000\)/); + + test.done(); + }, + + 'specific batch window'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const q = new sqs.Queue(stack, 'Q'); + + // WHEN + fn.addEventSource(new sources.SqsEventSource(q, { + maxBatchingWindow: cdk.Duration.minutes(5), + })); + + // THEN + expect(stack).to(haveResource('AWS::Lambda::EventSourceMapping', { + 'EventSourceArn': { + 'Fn::GetAtt': [ + 'Q63C6E3AB', + 'Arn', + ], + }, + 'FunctionName': { + 'Ref': 'Fn9270CBC0', + }, + 'MaximumBatchingWindowInSeconds': 300, + })); + + test.done(); + }, + + 'fails if batch window defined for FIFO queue'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const q = new sqs.Queue(stack, 'Q', { + fifo: true, + }); + + // WHEN/THEN + test.throws(() => fn.addEventSource(new sources.SqsEventSource(q, { + maxBatchingWindow: cdk.Duration.minutes(5), + })), /Batching window is not supported for FIFO queues/); + + test.done(); + }, + + 'fails if batch window is > 5'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const q = new sqs.Queue(stack, 'Q'); + + // WHEN/THEN + test.throws(() => fn.addEventSource(new sources.SqsEventSource(q, { + maxBatchingWindow: cdk.Duration.minutes(7), + })), /Maximum batching window must be 300 seconds or less \(given 420\)/); + + test.done(); + }, + 'contains eventSourceMappingId after lambda binding'(test: Test) { // GIVEN const stack = new cdk.Stack();