Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(lambda-event-sources): tumbling window #13412

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
0b1e74c
feat(aws-lambda): Add tumblingWindowInSeconds to Lambda Event Source
msimpsonnz Mar 5, 2021
bb48223
feat(aws-lambda-event-sources): Add tumblingWindowInSeconds option
msimpsonnz Mar 5, 2021
bc3249e
feat(aws-lambda-event-sources): Add tumblingWindowInSeconds option
msimpsonnz Mar 7, 2021
f2a3a2e
Merge branch 'msimpsonnz/feature-lambda-tumblingwindow' of https://gi…
msimpsonnz Mar 7, 2021
5a09e1e
feat(aws-lambda-event-sources): Add tumblingWindowInSeconds option
msimpsonnz Mar 7, 2021
0f3c135
Update packages/@aws-cdk/aws-lambda/lib/event-source-mapping.ts
msimpsonnz Mar 22, 2021
366df98
Update packages/@aws-cdk/aws-lambda-event-sources/test/test.dynamo.ts
msimpsonnz Mar 22, 2021
40a6e73
feat(aws-lambda):tumbling window
msimpsonnz Mar 22, 2021
870cf07
feat(aws-lambda):tumbling window
msimpsonnz Mar 24, 2021
6b2cad6
feat(aws-lambda):tumbling window
msimpsonnz Mar 24, 2021
9587fa5
feat(aws-lambda):tumbling window
msimpsonnz Mar 24, 2021
9180137
feat(aws-lambda):tumbling window
msimpsonnz Mar 24, 2021
5fc02f3
feat(aws-lambda):tumbling window
msimpsonnz Mar 24, 2021
a3c502f
Merge branch 'msimpsonnz/feature-lambda-tumblingwindow' of https://gi…
msimpsonnz Mar 24, 2021
ee764c7
Merge branch 'master' into msimpsonnz/feature-lambda-tumblingwindow
msimpsonnz Mar 26, 2021
adc53c4
feat(aws-lambda):tumbling window
msimpsonnz Mar 26, 2021
8281900
Merge branch 'master' into msimpsonnz/feature-lambda-tumblingwindow
msimpsonnz Mar 26, 2021
28f1297
feat(aws-lambda):tumbling windows
msimpsonnz Mar 26, 2021
e5a4e4d
Merge branch 'master' into msimpsonnz/feature-lambda-tumblingwindow
mergify[bot] Mar 26, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ and add it to your Lambda function. The following parameters will impact Amazon
* __parallelizationFactor__: The number of batches to concurrently process on each shard.
* __retryAttempts__: The maximum number of times a record should be retried in the event of failure.
* __startingPosition__: Will determine where to being consumption, either at the most recent ('LATEST') record or the oldest record ('TRIM_HORIZON'). 'TRIM_HORIZON' will ensure you process all available data, while 'LATEST' will ignore all records that arrived prior to attaching the event source.
* __tumblingWindow__: The duration in seconds of a processing window when using streams.
* __enabled__: If the DynamoDB Streams event source mapping should be enabled. The default is true.

```ts
Expand Down Expand Up @@ -192,6 +193,7 @@ behavior:
* __parallelizationFactor__: The number of batches to concurrently process on each shard.
* __retryAttempts__: The maximum number of times a record should be retried in the event of failure.
* __startingPosition__: Will determine where to being consumption, either at the most recent ('LATEST') record or the oldest record ('TRIM_HORIZON'). 'TRIM_HORIZON' will ensure you process all available data, while 'LATEST' will ignore all records that arrived prior to attaching the event source.
* __tumblingWindow__: The duration in seconds of a processing window when using streams.
* __enabled__: If the DynamoDB Streams event source mapping should be enabled. The default is true.

```ts
Expand Down
9 changes: 9 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/lib/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ export interface StreamEventSourceProps {
*/
readonly maxBatchingWindow?: Duration;

/**
* The size of the tumbling windows to group records sent to DynamoDB or Kinesis
* Valid Range: 0 - 15 minutes
*
* @default - None
*/
readonly tumblingWindow?: Duration;

/**
* If the stream event source mapping should be enabled.
*
Expand Down Expand Up @@ -106,6 +114,7 @@ export abstract class StreamEventSource implements lambda.IEventSource {
retryAttempts: this.props.retryAttempts,
parallelizationFactor: this.props.parallelizationFactor,
onFailure: this.props.onFailure,
tumblingWindow: this.props.tumblingWindow,
enabled: this.props.enabled,
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
"StreamArn"
]
},
"TumblingWindowInSeconds": 60,
"StartingPosition": "TRIM_HORIZON"
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class DynamoEventSourceTest extends cdk.Stack {
fn.addEventSource(new DynamoEventSource(queue, {
batchSize: 5,
startingPosition: lambda.StartingPosition.TRIM_HORIZON,
tumblingWindow: cdk.Duration.seconds(60),
}));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
"Arn"
]
},
"TumblingWindowInSeconds": 60,
"StartingPosition": "TRIM_HORIZON"
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class KinesisEventSourceTest extends cdk.Stack {

fn.addEventSource(new KinesisEventSource(stream, {
startingPosition: lambda.StartingPosition.TRIM_HORIZON,
tumblingWindow: cdk.Duration.seconds(60),
}));
}
}
Expand Down
27 changes: 27 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/test/test.dynamo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,33 @@ export = {
test.done();
},

'specific tumblingWindow'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const table = new dynamodb.Table(stack, 'T', {
partitionKey: {
name: 'id',
type: dynamodb.AttributeType.STRING,
},
stream: dynamodb.StreamViewType.NEW_IMAGE,
});

// WHEN
fn.addEventSource(new sources.DynamoEventSource(table, {
batchSize: 50,
startingPosition: lambda.StartingPosition.LATEST,
tumblingWindow: cdk.Duration.seconds(60),
}));

// THEN
expect(stack).to(haveResource('AWS::Lambda::EventSourceMapping', {
TumblingWindowInSeconds: 60,
}));

test.done();
},

'specific batch size'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
Expand Down
32 changes: 32 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/test/test.kinesis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,38 @@ export = {
test.done();
},

'specific tumblingWindowInSeconds'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const stream = new kinesis.Stream(stack, 'S');

// WHEN
fn.addEventSource(new sources.KinesisEventSource(stream, {
batchSize: 50,
startingPosition: lambda.StartingPosition.LATEST,
tumblingWindow: cdk.Duration.seconds(60),
}));

// THEN
expect(stack).to(haveResource('AWS::Lambda::EventSourceMapping', {
msimpsonnz marked this conversation as resolved.
Show resolved Hide resolved
'EventSourceArn': {
'Fn::GetAtt': [
'S509448A1',
'Arn',
],
},
'FunctionName': {
'Ref': 'Fn9270CBC0',
},
'BatchSize': 50,
'StartingPosition': 'LATEST',
'TumblingWindowInSeconds': 60,
}));

test.done();
},

'specific batch size'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
Expand Down
16 changes: 16 additions & 0 deletions packages/@aws-cdk/aws-lambda/lib/event-source-mapping.ts
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,17 @@ export interface EventSourceMappingOptions {
*/
readonly kafkaTopic?: string;

/**
* The size of the tumbling windows to group records sent to DynamoDB or Kinesis
*
* @see https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-windows
*
* Valid Range: 0 - 15 minutes
*
* @default - None
*/
readonly tumblingWindow?: cdk.Duration;

/**
* A list of host and port pairs that are the addresses of the Kafka brokers in a self managed "bootstrap" Kafka cluster
* that a Kafka client connects to initially to bootstrap itself.
Expand Down Expand Up @@ -269,6 +280,10 @@ export class EventSourceMapping extends cdk.Resource implements IEventSourceMapp
}
});

if (props.tumblingWindow && !cdk.Token.isUnresolved(props.tumblingWindow) && props.tumblingWindow.toSeconds() > 900) {
throw new Error(`tumblingWindow cannot be over 900 seconds, got ${props.tumblingWindow.toSeconds()}`);
}


let destinationConfig;

Expand Down Expand Up @@ -296,6 +311,7 @@ export class EventSourceMapping extends cdk.Resource implements IEventSourceMapp
maximumRetryAttempts: props.retryAttempts,
parallelizationFactor: props.parallelizationFactor,
topics: props.kafkaTopic !== undefined ? [props.kafkaTopic] : undefined,
tumblingWindowInSeconds: props.tumblingWindow?.toSeconds(),
sourceAccessConfigurations: props.sourceAccessConfigurations?.map((o) => {return { type: o.type.type, uri: o.uri };}),
selfManagedEventSource,
});
Expand Down
33 changes: 32 additions & 1 deletion packages/@aws-cdk/aws-lambda/test/event-source-mapping.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -261,4 +261,35 @@ describe('event source mapping', () => {
SelfManagedEventSource: { Endpoints: { KafkaBootstrapServers: kafkaBootstrapServers } },
});
});
});

test('throws if tumblingWindow > 900 seconds', () => {
const stack = new cdk.Stack();
const fn = new Function(stack, 'fn', {
handler: 'index.handler',
code: Code.fromInline('exports.handler = ${handler.toString()}'),
runtime: Runtime.NODEJS_10_X,
});

expect(() => new EventSourceMapping(stack, 'test', {
target: fn,
eventSourceArn: '',
tumblingWindow: cdk.Duration.seconds(901),
})).toThrow(/tumblingWindow cannot be over 900 seconds/);
});

test('accepts if tumblingWindow is a token', () => {
const stack = new cdk.Stack();
const fn = new Function(stack, 'fn', {
handler: 'index.handler',
code: Code.fromInline('exports.handler = ${handler.toString()}'),
runtime: Runtime.NODEJS_10_X,
});
const lazyDuration = cdk.Duration.seconds(cdk.Lazy.number({ produce: () => 60 }));

new EventSourceMapping(stack, 'test', {
target: fn,
eventSourceArn: '',
tumblingWindow: lazyDuration,
});
});
});