Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(lambda): event-source maxBatchingWindow property #4260

Merged
merged 19 commits into from
Sep 30, 2019
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion packages/@aws-cdk/aws-lambda-event-sources/lib/dynamodb.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import dynamodb = require('@aws-cdk/aws-dynamodb');
import lambda = require('@aws-cdk/aws-lambda');
import { Duration } from "@aws-cdk/core";

export interface DynamoEventSourceProps {
nija-at marked this conversation as resolved.
Show resolved Hide resolved
/**
Expand All @@ -17,6 +18,14 @@ export interface DynamoEventSourceProps {
* Where to begin consuming the DynamoDB stream.
*/
readonly startingPosition: lambda.StartingPosition;

/**
* The maximum amount of time to gather records before invoking the function.
* Maximum of Duration.minutes(5)
*
* @default Duration.seconds(0)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I couldn't find documentation on the default, but I'm assuming that since this is a new feature, the previous behavior is the default.

*/
readonly maximumBatchingWindow?: Duration;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use terser form max instead of maximum everywhere.

}

/**
Expand All @@ -37,7 +46,8 @@ export class DynamoEventSource implements lambda.IEventSource {
target.addEventSourceMapping(`DynamoDBEventSource:${this.table.node.uniqueId}`, {
batchSize: this.props.batchSize || 100,
eventSourceArn: this.table.tableStreamArn,
startingPosition: this.props.startingPosition
startingPosition: this.props.startingPosition,
maximumBatchingWindow: this.props.maximumBatchingWindow,
});

this.table.grantStreamRead(target);
Expand Down
10 changes: 10 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/lib/kinesis.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import kinesis = require('@aws-cdk/aws-kinesis');
import lambda = require('@aws-cdk/aws-lambda');
import {Duration} from "@aws-cdk/core";

export interface KinesisEventSourceProps {
/**
Expand All @@ -17,6 +18,14 @@ export interface KinesisEventSourceProps {
* Where to begin consuming the Kinesis stream.
*/
readonly startingPosition: lambda.StartingPosition;

/**
* The maximum amount of time to gather records before invoking the function.
* Maximum of Duration.minutes(5)
*
* @default Duration.seconds(0)
*/
readonly maximumBatchingWindow?: Duration;
}

/**
Expand All @@ -34,6 +43,7 @@ export class KinesisEventSource implements lambda.IEventSource {
batchSize: this.props.batchSize || 100,
startingPosition: this.props.startingPosition,
eventSourceArn: this.stream.streamArn,
maximumBatchingWindow: this.props.maximumBatchingWindow,
});

this.stream.grantRead(target);
Expand Down
59 changes: 59 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/test/test.dynamo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,4 +172,63 @@ export = {

test.done();
},

'specific maximumBatchingWindow'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const table = new dynamodb.Table(stack, 'T', {
partitionKey: {
name: 'id',
type: dynamodb.AttributeType.STRING
},
stream: dynamodb.StreamViewType.NEW_IMAGE
});

// WHEN
fn.addEventSource(new sources.DynamoEventSource(table, {
maximumBatchingWindow: cdk.Duration.minutes(2),
startingPosition: lambda.StartingPosition.LATEST
}));

// THEN
expect(stack).to(haveResource('AWS::Lambda::EventSourceMapping', {
"EventSourceArn": {
"Fn::GetAtt": [
"TD925BC7E",
"StreamArn"
]
},
"FunctionName": {
"Ref": "Fn9270CBC0"
},
"MaximumBatchingWindowInSeconds": 120,
"StartingPosition": "LATEST"
}));

test.done();
},

'throws if maximumBatchingWindow > 300 seconds'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const table = new dynamodb.Table(stack, 'T', {
partitionKey: {
name: 'id',
type: dynamodb.AttributeType.STRING
},
stream: dynamodb.StreamViewType.NEW_IMAGE
});

// THEN
test.throws(() =>
fn.addEventSource(new sources.DynamoEventSource(table, {
maximumBatchingWindow: cdk.Duration.seconds(301),
startingPosition: lambda.StartingPosition.LATEST
})), /maximumBatchingWindow cannot be over 300 seconds/);

test.done();
},

};
30 changes: 30 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/test/test.kinesis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,4 +123,34 @@ export = {

test.done();
},

'specific maximumBatchingWindow'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const stream = new kinesis.Stream(stack, 'S');

// WHEN
fn.addEventSource(new sources.KinesisEventSource(stream, {
maximumBatchingWindow: cdk.Duration.minutes(2),
startingPosition: lambda.StartingPosition.LATEST
}));

// THEN
expect(stack).to(haveResource('AWS::Lambda::EventSourceMapping', {
"EventSourceArn": {
"Fn::GetAtt": [
"S509448A1",
"Arn"
]
},
"FunctionName": {
"Ref": "Fn9270CBC0"
},
"MaximumBatchingWindowInSeconds": 120,
"StartingPosition": "LATEST"
}));

test.done();
},
};
15 changes: 14 additions & 1 deletion packages/@aws-cdk/aws-lambda/lib/event-source-mapping.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,15 @@ export interface EventSourceMappingOptions {
*
* @default - Required for Amazon Kinesis and Amazon DynamoDB Streams sources.
*/
readonly startingPosition?: StartingPosition
readonly startingPosition?: StartingPosition;

/**
* The maximum amount of time to gather records before invoking the function.
* Maximum of Duration.minutes(5)
*
* @default Duration.seconds(0)
*/
readonly maximumBatchingWindow?: cdk.Duration;
}

export interface EventSourceMappingProps extends EventSourceMappingOptions {
Expand All @@ -63,12 +71,17 @@ export class EventSourceMapping extends Resource {
constructor(scope: cdk.Construct, id: string, props: EventSourceMappingProps) {
super(scope, id);

if (props.maximumBatchingWindow && props.maximumBatchingWindow.toSeconds() > 300) {
throw new Error(`maximumBatchingWindow cannot be over 300 seconds, got ${props.maximumBatchingWindow.toSeconds()}`);
}

new CfnEventSourceMapping(this, 'Resource', {
batchSize: props.batchSize,
enabled: props.enabled,
eventSourceArn: props.eventSourceArn,
functionName: props.target.functionName,
startingPosition: props.startingPosition,
maximumBatchingWindowInSeconds: props.maximumBatchingWindow && props.maximumBatchingWindow.toSeconds(),
});
}
}
Expand Down