Skip to content

Commit

Permalink
feat(lambda-event-sources): add support for batching window to sqs ev…
Browse files Browse the repository at this point in the history
…ent source
  • Loading branch information
stefanodesjo committed Mar 25, 2021
1 parent 63f78a6 commit 2876e3b
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 6 deletions.
5 changes: 4 additions & 1 deletion packages/@aws-cdk/aws-lambda-event-sources/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ behavior:
* __receiveMessageWaitTime__: Will determine [long
poll](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-long-polling.html)
duration. The default value is 20 seconds.
* __batchSize__: Determines how many records are buffered before invoking your lambda function.
* __maxBatchingWindow__: The maximum amount of time to gather records before invoking the lambda. This increases the likelihood of a full batch at the cost of delayed processing.
* __enabled__: If the SQS event source mapping should be enabled. The default is true.

```ts
Expand All @@ -67,6 +69,7 @@ const queue = new sqs.Queue(this, 'MyQueue', {

lambda.addEventSource(new SqsEventSource(queue, {
batchSize: 10, // default
maxBatchingWindow: Duration.minutes(5),
}));
```

Expand Down Expand Up @@ -212,7 +215,7 @@ myFunction.addEventSource(new KinesisEventSource(stream, {
You can write Lambda functions to process data either from [Amazon MSK](https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html) or a [self managed Kafka](https://docs.aws.amazon.com/lambda/latest/dg/kafka-smaa.html) cluster.

The following code sets up Amazon MSK as an event source for a lambda function. Credentials will need to be configured to access the
MSK cluster, as described in [Username/Password authentication](https://docs.aws.amazon.com/msk/latest/developerguide/msk-password.html).
MSK cluster, as described in [Username/Password authentication](https://docs.aws.amazon.com/msk/latest/developerguide/msk-password.html).

```ts
import * as lambda from '@aws-cdk/aws-lambda';
Expand Down
29 changes: 26 additions & 3 deletions packages/@aws-cdk/aws-lambda-event-sources/lib/sqs.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import * as lambda from '@aws-cdk/aws-lambda';
import * as sqs from '@aws-cdk/aws-sqs';
import { Names } from '@aws-cdk/core';
import { Duration, Names, Token } from '@aws-cdk/core';

export interface SqsEventSourceProps {
/**
Expand All @@ -14,6 +14,15 @@ export interface SqsEventSourceProps {
*/
readonly batchSize?: number;

/**
* The maximum amount of time to gather records before invoking the function.
*
* Valid Range: Minimum value of 0 minutes. Maximum value of 5 minutes.
*
* @default - no batching window. The lambda function will be invoked immediately with the records that are available.
*/
readonly maxBatchingWindow?: Duration;

/**
* If the SQS event source mapping should be enabled.
*
Expand All @@ -29,14 +38,28 @@ export class SqsEventSource implements lambda.IEventSource {
private _eventSourceMappingId?: string = undefined;

constructor(readonly queue: sqs.IQueue, private readonly props: SqsEventSourceProps = { }) {
if (this.props.batchSize !== undefined && (this.props.batchSize < 1 || this.props.batchSize > 10)) {
throw new Error(`Maximum batch size must be between 1 and 10 inclusive (given ${this.props.batchSize})`);
if (this.props.maxBatchingWindow !== undefined) {
if (queue.fifo) {
throw new Error('Batching window is not supported for FIFO queues');
}
if (!this.props.maxBatchingWindow.isUnresolved() && this.props.maxBatchingWindow.toSeconds() > 300) {
throw new Error(`Maximum batching window must be 300 seconds or less (given ${this.props.maxBatchingWindow.toHumanString()})`);
}
}
if (this.props.batchSize !== undefined && !Token.isUnresolved(this.props.batchSize)) {
if (this.props.maxBatchingWindow !== undefined && (this.props.batchSize < 1 || this.props.batchSize > 10000)) {
throw new Error(`Maximum batch size must be between 1 and 10000 inclusive (given ${this.props.batchSize}) when batching window is specified.`);
}
if (this.props.maxBatchingWindow === undefined && (this.props.batchSize < 1 || this.props.batchSize > 10)) {
throw new Error(`Maximum batch size must be between 1 and 10 inclusive (given ${this.props.batchSize}) when batching window is not specified.`);
}
}
}

public bind(target: lambda.IFunction) {
const eventSourceMapping = target.addEventSourceMapping(`SqsEventSource:${Names.nodeUniqueId(this.queue.node)}`, {
batchSize: this.props.batchSize,
maxBatchingWindow: this.props.maxBatchingWindow,
enabled: this.props.enabled,
eventSourceArn: this.queue.queueArn,
});
Expand Down
113 changes: 111 additions & 2 deletions packages/@aws-cdk/aws-lambda-event-sources/test/test.sqs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,30 @@ export = {
test.done();
},

'unresolved batch size'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const q = new sqs.Queue(stack, 'Q');
const batchSize : number = 500;

// WHEN
fn.addEventSource(new sources.SqsEventSource(q, {
batchSize: cdk.Lazy.number({
produce() {
return batchSize;
},
}),
}));

// THEN
expect(stack).to(haveResource('AWS::Lambda::EventSourceMapping', {
'BatchSize': 500,
}));

test.done();
},

'fails if batch size is < 1'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
Expand All @@ -94,7 +118,7 @@ export = {
// WHEN/THEN
test.throws(() => fn.addEventSource(new sources.SqsEventSource(q, {
batchSize: 0,
})), /Maximum batch size must be between 1 and 10 inclusive \(given 0\)/);
})), /Maximum batch size must be between 1 and 10 inclusive \(given 0\) when batching window is not specified\./);

test.done();
},
Expand All @@ -108,7 +132,92 @@ export = {
// WHEN/THEN
test.throws(() => fn.addEventSource(new sources.SqsEventSource(q, {
batchSize: 11,
})), /Maximum batch size must be between 1 and 10 inclusive \(given 11\)/);
})), /Maximum batch size must be between 1 and 10 inclusive \(given 11\) when batching window is not specified\./);

test.done();
},

'batch size is > 10 and batch window is defined'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const q = new sqs.Queue(stack, 'Q');

// WHEN
fn.addEventSource(new sources.SqsEventSource(q, {
batchSize: 1000,
maxBatchingWindow: cdk.Duration.minutes(5),
}));

// THEN
expect(stack).to(haveResource('AWS::Lambda::EventSourceMapping', {
'BatchSize': 1000,
'MaximumBatchingWindowInSeconds': 300,
}));

test.done();
},

'fails if batch size is > 10000 and batch window is defined'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const q = new sqs.Queue(stack, 'Q');

// WHEN/THEN
test.throws(() => fn.addEventSource(new sources.SqsEventSource(q, {
batchSize: 11000,
maxBatchingWindow: cdk.Duration.minutes(5),
})), /Maximum batch size must be between 1 and 10000 inclusive/i);

test.done();
},

'specific batch window'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const q = new sqs.Queue(stack, 'Q');

// WHEN
fn.addEventSource(new sources.SqsEventSource(q, {
maxBatchingWindow: cdk.Duration.minutes(5),
}));

// THEN
expect(stack).to(haveResource('AWS::Lambda::EventSourceMapping', {
'MaximumBatchingWindowInSeconds': 300,
}));

test.done();
},

'fails if batch window defined for FIFO queue'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const q = new sqs.Queue(stack, 'Q', {
fifo: true,
});

// WHEN/THEN
test.throws(() => fn.addEventSource(new sources.SqsEventSource(q, {
maxBatchingWindow: cdk.Duration.minutes(5),
})), /Batching window is not supported for FIFO queues/);

test.done();
},

'fails if batch window is > 5'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const q = new sqs.Queue(stack, 'Q');

// WHEN/THEN
test.throws(() => fn.addEventSource(new sources.SqsEventSource(q, {
maxBatchingWindow: cdk.Duration.minutes(7),
})), /Maximum batching window must be 300 seconds or less/i);

test.done();
},
Expand Down

0 comments on commit 2876e3b

Please sign in to comment.