Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(lambda-event-sources): support for batching window to sqs event source #13406

Merged
merged 3 commits into from
Mar 26, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion packages/@aws-cdk/aws-lambda-event-sources/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ behavior:
* __receiveMessageWaitTime__: Will determine [long
poll](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-long-polling.html)
duration. The default value is 20 seconds.
* __batchSize__: Determines how many records are buffered before invoking your lambda function.
* __maxBatchingWindow__: The maximum amount of time to gather records before invoking the lambda. This increases the likelihood of a full batch at the cost of delayed processing.
* __enabled__: If the SQS event source mapping should be enabled. The default is true.

```ts
Expand All @@ -67,6 +69,7 @@ const queue = new sqs.Queue(this, 'MyQueue', {

lambda.addEventSource(new SqsEventSource(queue, {
batchSize: 10, // default
maxBatchingWindow: Duration.minutes(5),
stefanodesjo marked this conversation as resolved.
Show resolved Hide resolved
}));
```

Expand Down Expand Up @@ -212,7 +215,7 @@ myFunction.addEventSource(new KinesisEventSource(stream, {
You can write Lambda functions to process data either from [Amazon MSK](https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html) or a [self managed Kafka](https://docs.aws.amazon.com/lambda/latest/dg/kafka-smaa.html) cluster.

The following code sets up Amazon MSK as an event source for a lambda function. Credentials will need to be configured to access the
MSK cluster, as described in [Username/Password authentication](https://docs.aws.amazon.com/msk/latest/developerguide/msk-password.html).
MSK cluster, as described in [Username/Password authentication](https://docs.aws.amazon.com/msk/latest/developerguide/msk-password.html).

```ts
import * as lambda from '@aws-cdk/aws-lambda';
Expand Down
29 changes: 26 additions & 3 deletions packages/@aws-cdk/aws-lambda-event-sources/lib/sqs.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import * as lambda from '@aws-cdk/aws-lambda';
import * as sqs from '@aws-cdk/aws-sqs';
import { Names } from '@aws-cdk/core';
import { Duration, Names, Token } from '@aws-cdk/core';

export interface SqsEventSourceProps {
/**
Expand All @@ -14,6 +14,15 @@ export interface SqsEventSourceProps {
*/
readonly batchSize?: number;

/**
* The maximum amount of time to gather records before invoking the function.
*
* Valid Range: Minimum value of 0 minutes. Maximum value of 5 minutes.
*
* @default - no batching window. The lambda function will be invoked immediately with the records that are available.
*/
readonly maxBatchingWindow?: Duration;
stefanodesjo marked this conversation as resolved.
Show resolved Hide resolved

/**
* If the SQS event source mapping should be enabled.
*
Expand All @@ -29,14 +38,28 @@ export class SqsEventSource implements lambda.IEventSource {
private _eventSourceMappingId?: string = undefined;

constructor(readonly queue: sqs.IQueue, private readonly props: SqsEventSourceProps = { }) {
if (this.props.batchSize !== undefined && (this.props.batchSize < 1 || this.props.batchSize > 10)) {
throw new Error(`Maximum batch size must be between 1 and 10 inclusive (given ${this.props.batchSize})`);
if (this.props.maxBatchingWindow !== undefined) {
if (queue.fifo) {
throw new Error('Batching window is not supported for FIFO queues');
}
if (!this.props.maxBatchingWindow.isUnresolved() && this.props.maxBatchingWindow.toSeconds() > 300) {
throw new Error(`Maximum batching window must be 300 seconds or less (given ${this.props.maxBatchingWindow.toHumanString()})`);
}
}
if (this.props.batchSize !== undefined && !Token.isUnresolved(this.props.batchSize)) {
stefanodesjo marked this conversation as resolved.
Show resolved Hide resolved
if (this.props.maxBatchingWindow !== undefined && (this.props.batchSize < 1 || this.props.batchSize > 10000)) {
throw new Error(`Maximum batch size must be between 1 and 10000 inclusive (given ${this.props.batchSize}) when batching window is specified.`);
}
if (this.props.maxBatchingWindow === undefined && (this.props.batchSize < 1 || this.props.batchSize > 10)) {
throw new Error(`Maximum batch size must be between 1 and 10 inclusive (given ${this.props.batchSize}) when batching window is not specified.`);
}
}
}

public bind(target: lambda.IFunction) {
const eventSourceMapping = target.addEventSourceMapping(`SqsEventSource:${Names.nodeUniqueId(this.queue.node)}`, {
batchSize: this.props.batchSize,
maxBatchingWindow: this.props.maxBatchingWindow,
enabled: this.props.enabled,
eventSourceArn: this.queue.queueArn,
});
Expand Down
113 changes: 111 additions & 2 deletions packages/@aws-cdk/aws-lambda-event-sources/test/test.sqs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,30 @@ export = {
test.done();
},

'unresolved batch size'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const q = new sqs.Queue(stack, 'Q');
const batchSize : number = 500;

// WHEN
fn.addEventSource(new sources.SqsEventSource(q, {
batchSize: cdk.Lazy.number({
produce() {
return batchSize;
},
}),
}));

// THEN
expect(stack).to(haveResource('AWS::Lambda::EventSourceMapping', {
'BatchSize': 500,
}));

test.done();
},

'fails if batch size is < 1'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
Expand All @@ -94,7 +118,7 @@ export = {
// WHEN/THEN
test.throws(() => fn.addEventSource(new sources.SqsEventSource(q, {
batchSize: 0,
})), /Maximum batch size must be between 1 and 10 inclusive \(given 0\)/);
})), /Maximum batch size must be between 1 and 10 inclusive \(given 0\) when batching window is not specified\./);

test.done();
},
Expand All @@ -108,7 +132,92 @@ export = {
// WHEN/THEN
test.throws(() => fn.addEventSource(new sources.SqsEventSource(q, {
batchSize: 11,
})), /Maximum batch size must be between 1 and 10 inclusive \(given 11\)/);
})), /Maximum batch size must be between 1 and 10 inclusive \(given 11\) when batching window is not specified\./);

test.done();
},

'batch size is > 10 and batch window is defined'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const q = new sqs.Queue(stack, 'Q');

// WHEN
fn.addEventSource(new sources.SqsEventSource(q, {
batchSize: 1000,
maxBatchingWindow: cdk.Duration.minutes(5),
}));

// THEN
expect(stack).to(haveResource('AWS::Lambda::EventSourceMapping', {
'BatchSize': 1000,
'MaximumBatchingWindowInSeconds': 300,
}));

test.done();
},

'fails if batch size is > 10000 and batch window is defined'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const q = new sqs.Queue(stack, 'Q');

// WHEN/THEN
test.throws(() => fn.addEventSource(new sources.SqsEventSource(q, {
batchSize: 11000,
maxBatchingWindow: cdk.Duration.minutes(5),
})), /Maximum batch size must be between 1 and 10000 inclusive/i);

test.done();
},

'specific batch window'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const q = new sqs.Queue(stack, 'Q');

// WHEN
fn.addEventSource(new sources.SqsEventSource(q, {
maxBatchingWindow: cdk.Duration.minutes(5),
}));

// THEN
expect(stack).to(haveResource('AWS::Lambda::EventSourceMapping', {
'MaximumBatchingWindowInSeconds': 300,
}));

test.done();
},

'fails if batch window defined for FIFO queue'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const q = new sqs.Queue(stack, 'Q', {
fifo: true,
});

// WHEN/THEN
test.throws(() => fn.addEventSource(new sources.SqsEventSource(q, {
maxBatchingWindow: cdk.Duration.minutes(5),
})), /Batching window is not supported for FIFO queues/);

test.done();
},

'fails if batch window is > 5'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const q = new sqs.Queue(stack, 'Q');

// WHEN/THEN
test.throws(() => fn.addEventSource(new sources.SqsEventSource(q, {
maxBatchingWindow: cdk.Duration.minutes(7),
})), /Maximum batching window must be 300 seconds or less/i);

test.done();
},
Expand Down