Skip to content

Commit

Permalink
feat(aws-lambda-event-sources): add support for batching window to sq…
Browse files Browse the repository at this point in the history
…s event source
  • Loading branch information
SeekerWing committed Dec 19, 2020
1 parent c046ccf commit 992fe2c
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 9 deletions.
20 changes: 15 additions & 5 deletions packages/@aws-cdk/aws-lambda-event-sources/lib/sqs.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import * as lambda from '@aws-cdk/aws-lambda';
import * as sqs from '@aws-cdk/aws-sqs';
import { Duration, Names } from '@aws-cdk/core';
import { Duration, Names, Token } from '@aws-cdk/core';

export interface SqsEventSourceProps {
/**
Expand All @@ -15,7 +15,7 @@ export interface SqsEventSourceProps {
readonly batchSize?: number;

/**
* TThe maximum amount of time to gather records before invoking the function.
* The maximum amount of time to gather records before invoking the function.
* Maximum of Duration.minutes(5)
*
* Valid Range: Minimum value of 0 minutes. Maximum value of 5 minutes.
Expand All @@ -39,12 +39,22 @@ export class SqsEventSource implements lambda.IEventSource {
private _eventSourceMappingId?: string = undefined;

constructor(readonly queue: sqs.IQueue, private readonly props: SqsEventSourceProps = { }) {
if (this.props.batchSize !== undefined) {
const maxBatchingWindow = this.props.maxBatchingWindow;
if (maxBatchingWindow !== undefined) {
if (queue.fifo) {
throw new Error('Batching window is not supported for FIFO queues');
}
if (maxBatchingWindow.toSeconds() < 0 || maxBatchingWindow.toSeconds() > 300) {
throw new Error(`Maximum batching window must be between 0 and 300 seconds (given ${maxBatchingWindow.toSeconds()})`);
}
}

if (this.props.batchSize !== undefined && !Token.isUnresolved(this.props.batchSize)) {
if (this.props.maxBatchingWindow !== undefined && (this.props.batchSize < 1 || this.props.batchSize > 10000)) {
throw new Error(`Maximum batch size must be between 1 and 10000 inclusive (given ${this.props.batchSize})`);
throw new Error(`Maximum batch size must be between 1 and 10000 inclusive (given ${this.props.batchSize}) when batching window is specified.`);
}
if (this.props.maxBatchingWindow === undefined && (this.props.batchSize < 1 || this.props.batchSize > 10)) {
throw new Error(`Maximum batch size must be between 1 and 10 inclusive (given ${this.props.batchSize})`);
throw new Error(`Maximum batch size must be between 1 and 10 inclusive (given ${this.props.batchSize}) when batching window is not specified.`);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,7 @@
"FunctionName": {
"Ref": "FC4345940"
},
"BatchSize": 5,
"MaximumBatchingWindowInSeconds": 180
"BatchSize": 5
}
},
"Q63C6E3AB": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ class SqsEventSourceTest extends cdk.Stack {

fn.addEventSource(new SqsEventSource(queue, {
batchSize: 5,
maxBatchingWindow: cdk.Duration.minutes(3),
}));
}
}
Expand Down
50 changes: 49 additions & 1 deletion packages/@aws-cdk/aws-lambda-event-sources/test/test.sqs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,38 @@ export = {
test.done();
},

'unresolved batch size'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const q = new sqs.Queue(stack, 'Q');
let batchSize : number;

// WHEN
fn.addEventSource(new sources.SqsEventSource(q, {
batchSize: cdk.Lazy.number({
produce() {
return batchSize;
},
}),
}));

// THEN
expect(stack).to(haveResource('AWS::Lambda::EventSourceMapping', {
'EventSourceArn': {
'Fn::GetAtt': [
'Q63C6E3AB',
'Arn',
],
},
'FunctionName': {
'Ref': 'Fn9270CBC0',
},
}));

test.done();
},

'fails if batch size is < 1'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
Expand Down Expand Up @@ -186,6 +218,22 @@ export = {
test.done();
},

'fails if batch window defined for FIFO queue'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const q = new sqs.Queue(stack, 'Q', {
fifo: true,
});

// WHEN/THEN
test.throws(() => fn.addEventSource(new sources.SqsEventSource(q, {
maxBatchingWindow: cdk.Duration.minutes(5),
})), /Batching window is not supported for FIFO queues/);

test.done();
},

'fails if batch window is > 5'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
Expand All @@ -195,7 +243,7 @@ export = {
// WHEN/THEN
test.throws(() => fn.addEventSource(new sources.SqsEventSource(q, {
maxBatchingWindow: cdk.Duration.minutes(7),
})), /maxBatchingWindow cannot be over 300 seconds, got 420/);
})), /Maximum batching window must be between 0 and 300 seconds \(given 420\)/);

test.done();
},
Expand Down

0 comments on commit 992fe2c

Please sign in to comment.