Skip to content

Commit

Permalink
feat(lambda-event-sources): add support for batching window to sqs ev…
Browse files Browse the repository at this point in the history
…ent source
  • Loading branch information
stefanodesjo committed Mar 8, 2021
1 parent b3fba43 commit c520e5d
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 3 deletions.
1 change: 1 addition & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ const queue = new sqs.Queue(this, 'MyQueue', {

lambda.addEventSource(new SqsEventSource(queue, {
batchSize: 10, // default
maxBatchingWindow: Duration.minutes(5),
}));
```

Expand Down
31 changes: 28 additions & 3 deletions packages/@aws-cdk/aws-lambda-event-sources/lib/sqs.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import * as lambda from '@aws-cdk/aws-lambda';
import * as sqs from '@aws-cdk/aws-sqs';
import { Names } from '@aws-cdk/core';
import { Duration, Names, Token } from '@aws-cdk/core';

export interface SqsEventSourceProps {
/**
Expand All @@ -14,6 +14,16 @@ export interface SqsEventSourceProps {
*/
readonly batchSize?: number;

/**
* The maximum amount of time to gather records before invoking the function.
* Maximum of Duration.minutes(5)
*
* Valid Range: Minimum value of 0 minutes. Maximum value of 5 minutes.
*
* @default - no batching window. The lambda function will be invoked immediately with the records that are available.
*/
readonly maxBatchingWindow?: Duration;

/**
* If the SQS event source mapping should be enabled.
*
Expand All @@ -29,14 +39,29 @@ export class SqsEventSource implements lambda.IEventSource {
private _eventSourceMappingId?: string = undefined;

constructor(readonly queue: sqs.IQueue, private readonly props: SqsEventSourceProps = { }) {
if (this.props.batchSize !== undefined && (this.props.batchSize < 1 || this.props.batchSize > 10)) {
throw new Error(`Maximum batch size must be between 1 and 10 inclusive (given ${this.props.batchSize})`);
const maxBatchingWindow = this.props.maxBatchingWindow;
if (maxBatchingWindow !== undefined) {
if (queue.fifo) {
throw new Error('Batching window is not supported for FIFO queues');
}
if (maxBatchingWindow.toSeconds() > 300) {
throw new Error(`Maximum batching window must be 300 seconds or less (given ${maxBatchingWindow.toSeconds()})`);
}
}
if (this.props.batchSize !== undefined && !Token.isUnresolved(this.props.batchSize)) {
if (this.props.maxBatchingWindow !== undefined && (this.props.batchSize < 1 || this.props.batchSize > 10000)) {
throw new Error(`Maximum batch size must be between 1 and 10000 inclusive (given ${this.props.batchSize}) when batching window is specified.`);
}
if (this.props.maxBatchingWindow === undefined && (this.props.batchSize < 1 || this.props.batchSize > 10)) {
throw new Error(`Maximum batch size must be between 1 and 10 inclusive (given ${this.props.batchSize}) when batching window is not specified.`);
}
}
}

public bind(target: lambda.IFunction) {
const eventSourceMapping = target.addEventSourceMapping(`SqsEventSource:${Names.nodeUniqueId(this.queue.node)}`, {
batchSize: this.props.batchSize,
maxBatchingWindow: this.props.maxBatchingWindow,
enabled: this.props.enabled,
eventSourceArn: this.queue.queueArn,
});
Expand Down
135 changes: 135 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/test/test.sqs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,38 @@ export = {
test.done();
},

'unresolved batch size'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const q = new sqs.Queue(stack, 'Q');
const batchSize : number = 500;

// WHEN
fn.addEventSource(new sources.SqsEventSource(q, {
batchSize: cdk.Lazy.number({
produce() {
return batchSize;
},
}),
}));

// THEN
expect(stack).to(haveResource('AWS::Lambda::EventSourceMapping', {
'EventSourceArn': {
'Fn::GetAtt': [
'Q63C6E3AB',
'Arn',
],
},
'FunctionName': {
'Ref': 'Fn9270CBC0',
},
}));

test.done();
},

'fails if batch size is < 1'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
Expand Down Expand Up @@ -113,6 +145,109 @@ export = {
test.done();
},

'batch size is > 10 and batch window is defined'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const q = new sqs.Queue(stack, 'Q');

// WHEN
fn.addEventSource(new sources.SqsEventSource(q, {
batchSize: 1000,
maxBatchingWindow: cdk.Duration.minutes(5),
}));

// THEN
expect(stack).to(haveResource('AWS::Lambda::EventSourceMapping', {
'EventSourceArn': {
'Fn::GetAtt': [
'Q63C6E3AB',
'Arn',
],
},
'FunctionName': {
'Ref': 'Fn9270CBC0',
},
'BatchSize': 1000,
'MaximumBatchingWindowInSeconds': 300,
}));

test.done();
},

'fails if batch size is > 10000 and batch window is defined'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const q = new sqs.Queue(stack, 'Q');

// WHEN/THEN
test.throws(() => fn.addEventSource(new sources.SqsEventSource(q, {
batchSize: 11000,
maxBatchingWindow: cdk.Duration.minutes(5),
})), /Maximum batch size must be between 1 and 10000 inclusive \(given 11000\)/);

test.done();
},

'specific batch window'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const q = new sqs.Queue(stack, 'Q');

// WHEN
fn.addEventSource(new sources.SqsEventSource(q, {
maxBatchingWindow: cdk.Duration.minutes(5),
}));

// THEN
expect(stack).to(haveResource('AWS::Lambda::EventSourceMapping', {
'EventSourceArn': {
'Fn::GetAtt': [
'Q63C6E3AB',
'Arn',
],
},
'FunctionName': {
'Ref': 'Fn9270CBC0',
},
'MaximumBatchingWindowInSeconds': 300,
}));

test.done();
},

'fails if batch window defined for FIFO queue'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const q = new sqs.Queue(stack, 'Q', {
fifo: true,
});

// WHEN/THEN
test.throws(() => fn.addEventSource(new sources.SqsEventSource(q, {
maxBatchingWindow: cdk.Duration.minutes(5),
})), /Batching window is not supported for FIFO queues/);

test.done();
},

'fails if batch window is > 5'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const q = new sqs.Queue(stack, 'Q');

// WHEN/THEN
test.throws(() => fn.addEventSource(new sources.SqsEventSource(q, {
maxBatchingWindow: cdk.Duration.minutes(7),
})), /Maximum batching window must be 300 seconds or less \(given 420\)/);

test.done();
},

'contains eventSourceMappingId after lambda binding'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
Expand Down

0 comments on commit c520e5d

Please sign in to comment.