Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(lambda-event-sources): batching window for sqs event source #11724

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ const queue = new sqs.Queue(this, 'MyQueue', {

lambda.addEventSource(new SqsEventSource(queue, {
batchSize: 10, // default
maxBatchingWindow: Duration.minutes(5),
}));
```

Expand Down
32 changes: 29 additions & 3 deletions packages/@aws-cdk/aws-lambda-event-sources/lib/sqs.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import * as lambda from '@aws-cdk/aws-lambda';
import * as sqs from '@aws-cdk/aws-sqs';
import { Names } from '@aws-cdk/core';
import { Duration, Names, Token } from '@aws-cdk/core';

export interface SqsEventSourceProps {
/**
Expand All @@ -14,6 +14,16 @@ export interface SqsEventSourceProps {
*/
readonly batchSize?: number;

/**
* The maximum amount of time to gather records before invoking the function.
* Maximum of Duration.minutes(5)
*
* Valid Range: Minimum value of 0 minutes. Maximum value of 5 minutes.
*
* @default Duration.seconds(0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to say something like this -

Suggested change
* @default Duration.seconds(0)
* @default - no batching window. The lambda function will be invoked immediately with the records that are available.

*/
readonly maxBatchingWindow?: Duration;

/**
* If the SQS event source mapping should be enabled.
*
Expand All @@ -29,14 +39,30 @@ export class SqsEventSource implements lambda.IEventSource {
private _eventSourceMappingId?: string = undefined;

constructor(readonly queue: sqs.IQueue, private readonly props: SqsEventSourceProps = { }) {
if (this.props.batchSize !== undefined && (this.props.batchSize < 1 || this.props.batchSize > 10)) {
throw new Error(`Maximum batch size must be between 1 and 10 inclusive (given ${this.props.batchSize})`);
const maxBatchingWindow = this.props.maxBatchingWindow;
if (maxBatchingWindow !== undefined) {
if (queue.fifo) {
throw new Error('Batching window is not supported for FIFO queues');
}
if (maxBatchingWindow.toSeconds() < 0 || maxBatchingWindow.toSeconds() > 300) {
throw new Error(`Maximum batching window must be between 0 and 300 seconds (given ${maxBatchingWindow.toSeconds()})`);
Comment on lines +47 to +48
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAIK, it's not possible to construct a Duration that is less than 0 seconds.

Suggested change
if (maxBatchingWindow.toSeconds() < 0 || maxBatchingWindow.toSeconds() > 300) {
throw new Error(`Maximum batching window must be between 0 and 300 seconds (given ${maxBatchingWindow.toSeconds()})`);
if (maxBatchingWindow.toSeconds() > 300) {
throw new Error(`Maximum batching window must be 300 seconds or less (given ${maxBatchingWindow.toSeconds()})`);

}
}

if (this.props.batchSize !== undefined && !Token.isUnresolved(this.props.batchSize)) {
if (this.props.maxBatchingWindow !== undefined && (this.props.batchSize < 1 || this.props.batchSize > 10000)) {
throw new Error(`Maximum batch size must be between 1 and 10000 inclusive (given ${this.props.batchSize}) when batching window is specified.`);
}
if (this.props.maxBatchingWindow === undefined && (this.props.batchSize < 1 || this.props.batchSize > 10)) {
throw new Error(`Maximum batch size must be between 1 and 10 inclusive (given ${this.props.batchSize}) when batching window is not specified.`);
}
}
}

public bind(target: lambda.IFunction) {
const eventSourceMapping = target.addEventSourceMapping(`SqsEventSource:${Names.nodeUniqueId(this.queue.node)}`, {
batchSize: this.props.batchSize,
maxBatchingWindow: this.props.maxBatchingWindow,
enabled: this.props.enabled,
eventSourceArn: this.queue.queueArn,
});
Expand Down
135 changes: 135 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/test/test.sqs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,38 @@ export = {
test.done();
},

'unresolved batch size'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const q = new sqs.Queue(stack, 'Q');
let batchSize : number;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let batchSize : number;
const batchSize: number = 500;


// WHEN
fn.addEventSource(new sources.SqsEventSource(q, {
batchSize: cdk.Lazy.number({
produce() {
return batchSize;
},
}),
}));

// THEN
expect(stack).to(haveResource('AWS::Lambda::EventSourceMapping', {
'EventSourceArn': {
'Fn::GetAtt': [
'Q63C6E3AB',
'Arn',
],
},
'FunctionName': {
'Ref': 'Fn9270CBC0',
},
}));

test.done();
},

'fails if batch size is < 1'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
Expand Down Expand Up @@ -113,6 +145,109 @@ export = {
test.done();
},

'batch size is > 10 and batch window is defined'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const q = new sqs.Queue(stack, 'Q');

// WHEN
fn.addEventSource(new sources.SqsEventSource(q, {
batchSize: 1000,
maxBatchingWindow: cdk.Duration.minutes(5),
}));

// THEN
expect(stack).to(haveResource('AWS::Lambda::EventSourceMapping', {
'EventSourceArn': {
'Fn::GetAtt': [
'Q63C6E3AB',
'Arn',
],
},
'FunctionName': {
'Ref': 'Fn9270CBC0',
},
'BatchSize': 1000,
'MaximumBatchingWindowInSeconds': 300,
}));

test.done();
},

'fails if batch size is > 10000 and batch window is defined'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const q = new sqs.Queue(stack, 'Q');

// WHEN/THEN
test.throws(() => fn.addEventSource(new sources.SqsEventSource(q, {
batchSize: 11000,
maxBatchingWindow: cdk.Duration.minutes(5),
})), /Maximum batch size must be between 1 and 10000 inclusive \(given 11000\)/);

test.done();
},

'specific batch window'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const q = new sqs.Queue(stack, 'Q');

// WHEN
fn.addEventSource(new sources.SqsEventSource(q, {
maxBatchingWindow: cdk.Duration.minutes(5),
}));

// THEN
expect(stack).to(haveResource('AWS::Lambda::EventSourceMapping', {
'EventSourceArn': {
'Fn::GetAtt': [
'Q63C6E3AB',
'Arn',
],
},
'FunctionName': {
'Ref': 'Fn9270CBC0',
},
'MaximumBatchingWindowInSeconds': 300,
}));

test.done();
},

'fails if batch window defined for FIFO queue'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const q = new sqs.Queue(stack, 'Q', {
fifo: true,
});

// WHEN/THEN
test.throws(() => fn.addEventSource(new sources.SqsEventSource(q, {
maxBatchingWindow: cdk.Duration.minutes(5),
})), /Batching window is not supported for FIFO queues/);

test.done();
},

'fails if batch window is > 5'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const q = new sqs.Queue(stack, 'Q');

// WHEN/THEN
test.throws(() => fn.addEventSource(new sources.SqsEventSource(q, {
maxBatchingWindow: cdk.Duration.minutes(7),
})), /Maximum batching window must be between 0 and 300 seconds \(given 420\)/);

test.done();
},

'contains eventSourceMappingId after lambda binding'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
Expand Down