Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(lambda): event-source maxBatchingWindow property #4260

Merged
merged 19 commits into from
Sep 30, 2019
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 9 additions & 23 deletions packages/@aws-cdk/aws-lambda-event-sources/lib/dynamodb.ts
Original file line number Diff line number Diff line change
@@ -1,29 +1,17 @@
import dynamodb = require('@aws-cdk/aws-dynamodb');
import lambda = require('@aws-cdk/aws-lambda');
import {StreamEventSource, StreamEventSourceProps} from './stream';

export interface DynamoEventSourceProps {
/**
* The largest number of records that AWS Lambda will retrieve from your event
* source at the time of invoking your function. Your function receives an
* event with all the retrieved records.
*
* Valid Range: Minimum value of 1. Maximum value of 1000.
*
* @default 100
*/
readonly batchSize?: number;

/**
* Where to begin consuming the DynamoDB stream.
*/
readonly startingPosition: lambda.StartingPosition;
export interface DynamoEventSourceProps extends StreamEventSourceProps {
}

/**
* Use an Amazon DynamoDB stream as an event source for AWS Lambda.
*/
export class DynamoEventSource implements lambda.IEventSource {
constructor(private readonly table: dynamodb.Table, private readonly props: DynamoEventSourceProps) {
export class DynamoEventSource extends StreamEventSource {
constructor(private readonly table: dynamodb.Table, props: DynamoEventSourceProps) {
super(props);

if (this.props.batchSize !== undefined && (this.props.batchSize < 1 || this.props.batchSize > 1000)) {
throw new Error(`Maximum batch size must be between 1 and 1000 inclusive (given ${this.props.batchSize})`);
}
Expand All @@ -34,11 +22,9 @@ export class DynamoEventSource implements lambda.IEventSource {
throw new Error(`DynamoDB Streams must be enabled on the table ${this.table.node.path}`);
}

target.addEventSourceMapping(`DynamoDBEventSource:${this.table.node.uniqueId}`, {
batchSize: this.props.batchSize || 100,
eventSourceArn: this.table.tableStreamArn,
startingPosition: this.props.startingPosition
});
target.addEventSourceMapping(`DynamoDBEventSource:${this.table.node.uniqueId}`,
this.enrichMappingOptions({eventSourceArn: this.table.tableStreamArn})
);

this.table.grantStreamRead(target);
dynamodb.Table.grantListStreams(target);
Expand Down
34 changes: 10 additions & 24 deletions packages/@aws-cdk/aws-lambda-event-sources/lib/kinesis.ts
Original file line number Diff line number Diff line change
@@ -1,41 +1,27 @@
import kinesis = require('@aws-cdk/aws-kinesis');
import lambda = require('@aws-cdk/aws-lambda');
import {StreamEventSource, StreamEventSourceProps} from './stream';

export interface KinesisEventSourceProps {
/**
* The largest number of records that AWS Lambda will retrieve from your event
* source at the time of invoking your function. Your function receives an
* event with all the retrieved records.
*
* Valid Range: Minimum value of 1. Maximum value of 10000.
*
* @default 100
*/
readonly batchSize?: number;

/**
* Where to begin consuming the Kinesis stream.
*/
readonly startingPosition: lambda.StartingPosition;
export interface KinesisEventSourceProps extends StreamEventSourceProps {
}

/**
* Use an Amazon Kinesis stream as an event source for AWS Lambda.
*/
export class KinesisEventSource implements lambda.IEventSource {
constructor(readonly stream: kinesis.IStream, private readonly props: KinesisEventSourceProps) {
export class KinesisEventSource extends StreamEventSource {
constructor(readonly stream: kinesis.IStream, props: KinesisEventSourceProps) {
super(props);

if (this.props.batchSize !== undefined && (this.props.batchSize < 1 || this.props.batchSize > 10000)) {
throw new Error(`Maximum batch size must be between 1 and 10000 inclusive (given ${this.props.batchSize})`);
}
}

public bind(target: lambda.IFunction) {
target.addEventSourceMapping(`KinesisEventSource:${this.stream.node.uniqueId}`, {
batchSize: this.props.batchSize || 100,
startingPosition: this.props.startingPosition,
eventSourceArn: this.stream.streamArn,
});
target.addEventSourceMapping(`KinesisEventSource:${this.stream.node.uniqueId}`,
this.enrichMappingOptions({eventSourceArn: this.stream.streamArn})
);

this.stream.grantRead(target);
}
}
}
52 changes: 52 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/lib/stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import lambda = require('@aws-cdk/aws-lambda');
import {Duration} from '@aws-cdk/core';

/**
* @internal
*/
export interface StreamEventSourceProps {
/**
* The largest number of records that AWS Lambda will retrieve from your event
* source at the time of invoking your function. Your function receives an
* event with all the retrieved records.
*
* Valid Range: Minimum value of 1. Maximum value of 10000.
*
* @default 100
*/
readonly batchSize?: number;

/**
* Where to begin consuming the stream.
*/
readonly startingPosition: lambda.StartingPosition;

/**
* The maximum amount of time to gather records before invoking the function.
* Maximum of Duration.minutes(5)
*
* @default Duration.seconds(0)
*/
readonly maxBatchingWindow?: Duration;
}

/**
* Use an stream as an event source for AWS Lambda.
*
* @internal
*/
export abstract class StreamEventSource implements lambda.IEventSource {
protected constructor(protected readonly props: StreamEventSourceProps) {
}

public abstract bind(_target: lambda.IFunction): void;

protected enrichMappingOptions(options: lambda.EventSourceMappingOptions): lambda.EventSourceMappingOptions {
return {
...options,
batchSize: this.props.batchSize || 100,
startingPosition: this.props.startingPosition,
maxBatchingWindow: this.props.maxBatchingWindow,
};
}
}
59 changes: 59 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/test/test.dynamo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,4 +172,63 @@ export = {

test.done();
},

'specific maxBatchingWindow'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const table = new dynamodb.Table(stack, 'T', {
partitionKey: {
name: 'id',
type: dynamodb.AttributeType.STRING
},
stream: dynamodb.StreamViewType.NEW_IMAGE
});

// WHEN
fn.addEventSource(new sources.DynamoEventSource(table, {
maxBatchingWindow: cdk.Duration.minutes(2),
startingPosition: lambda.StartingPosition.LATEST
}));

// THEN
expect(stack).to(haveResource('AWS::Lambda::EventSourceMapping', {
"EventSourceArn": {
"Fn::GetAtt": [
"TD925BC7E",
"StreamArn"
]
},
"FunctionName": {
"Ref": "Fn9270CBC0"
},
"MaximumBatchingWindowInSeconds": 120,
"StartingPosition": "LATEST"
}));

test.done();
},

'throws if maxBatchingWindow > 300 seconds'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const table = new dynamodb.Table(stack, 'T', {
partitionKey: {
name: 'id',
type: dynamodb.AttributeType.STRING
},
stream: dynamodb.StreamViewType.NEW_IMAGE
});

// THEN
test.throws(() =>
fn.addEventSource(new sources.DynamoEventSource(table, {
maxBatchingWindow: cdk.Duration.seconds(301),
startingPosition: lambda.StartingPosition.LATEST
})), /maxBatchingWindow cannot be over 300 seconds/);

test.done();
},

};
30 changes: 30 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/test/test.kinesis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,4 +123,34 @@ export = {

test.done();
},

'specific maxBatchingWindow'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const stream = new kinesis.Stream(stack, 'S');

// WHEN
fn.addEventSource(new sources.KinesisEventSource(stream, {
maxBatchingWindow: cdk.Duration.minutes(2),
startingPosition: lambda.StartingPosition.LATEST
}));

// THEN
expect(stack).to(haveResource('AWS::Lambda::EventSourceMapping', {
"EventSourceArn": {
"Fn::GetAtt": [
"S509448A1",
"Arn"
]
},
"FunctionName": {
"Ref": "Fn9270CBC0"
},
"MaximumBatchingWindowInSeconds": 120,
"StartingPosition": "LATEST"
}));

test.done();
},
};
15 changes: 14 additions & 1 deletion packages/@aws-cdk/aws-lambda/lib/event-source-mapping.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,15 @@ export interface EventSourceMappingOptions {
*
* @default - Required for Amazon Kinesis and Amazon DynamoDB Streams sources.
*/
readonly startingPosition?: StartingPosition
readonly startingPosition?: StartingPosition;

/**
* The maximum amount of time to gather records before invoking the function.
* Maximum of Duration.minutes(5)
*
* @default Duration.seconds(0)
*/
readonly maxBatchingWindow?: cdk.Duration;
}

export interface EventSourceMappingProps extends EventSourceMappingOptions {
Expand All @@ -63,12 +71,17 @@ export class EventSourceMapping extends Resource {
constructor(scope: cdk.Construct, id: string, props: EventSourceMappingProps) {
super(scope, id);

if (props.maxBatchingWindow && props.maxBatchingWindow.toSeconds() > 300) {
throw new Error(`maxBatchingWindow cannot be over 300 seconds, got ${props.maxBatchingWindow.toSeconds()}`);
}

new CfnEventSourceMapping(this, 'Resource', {
batchSize: props.batchSize,
enabled: props.enabled,
eventSourceArn: props.eventSourceArn,
functionName: props.target.functionName,
startingPosition: props.startingPosition,
maximumBatchingWindowInSeconds: props.maxBatchingWindow && props.maxBatchingWindow.toSeconds(),
});
}
}
Expand Down