Skip to content

Commit

Permalink
feat(lambda-event-sources): expose enabled
Browse files Browse the repository at this point in the history
Specifically, it enables this flag in the props of which resources use
an underlying Cfn EventSourceMapping, which carries the Enabled flag
itself.
These are currently: SQS, DynamoDb Streams and Kinesis.

Closes #5750
  • Loading branch information
wtho committed Oct 21, 2020
1 parent 36fea28 commit 3292d00
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 0 deletions.
3 changes: 3 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ behavior:
* __receiveMessageWaitTime__: Will determine [long
poll](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-long-polling.html)
duration. The default value is 20 seconds.
* __enabled__: If If the SQS event source mapping should be enabled. The default is true.

```ts
import * as sqs from '@aws-cdk/aws-sqs';
Expand Down Expand Up @@ -145,6 +146,7 @@ and add it to your Lambda function. The following parameters will impact Amazon
* __parallelizationFactor__: The number of batches to concurrently process on each shard.
* __retryAttempts__: The maximum number of times a record should be retried in the event of failure.
* __startingPosition__: Will determine where to being consumption, either at the most recent ('LATEST') record or the oldest record ('TRIM_HORIZON'). 'TRIM_HORIZON' will ensure you process all available data, while 'LATEST' will ignore all records that arrived prior to attaching the event source.
* __enabled__: If If the DynamoDB Streams event source mapping should be enabled. The default is true.
```ts
import * as dynamodb from '@aws-cdk/aws-dynamodb';
Expand Down Expand Up @@ -188,6 +190,7 @@ behavior:
* __parallelizationFactor__: The number of batches to concurrently process on each shard.
* __retryAttempts__: The maximum number of times a record should be retried in the event of failure.
* __startingPosition__: Will determine where to being consumption, either at the most recent ('LATEST') record or the oldest record ('TRIM_HORIZON'). 'TRIM_HORIZON' will ensure you process all available data, while 'LATEST' will ignore all records that arrived prior to attaching the event source.
* __enabled__: If If the DynamoDB Streams event source mapping should be enabled. The default is true.
```ts
import * as lambda from '@aws-cdk/aws-lambda';
Expand Down
8 changes: 8 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/lib/sqs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ export interface SqsEventSourceProps {
* @default 10
*/
readonly batchSize?: number;

/**
* If the SQS event source mapping should be enabled.
*
* @default true
*/
readonly enabled?: boolean;
}

/**
Expand All @@ -29,6 +36,7 @@ export class SqsEventSource implements lambda.IEventSource {
public bind(target: lambda.IFunction) {
const eventSourceMapping = target.addEventSourceMapping(`SqsEventSource:${this.queue.node.uniqueId}`, {
batchSize: this.props.batchSize,
enabled: this.props.enabled,
eventSourceArn: this.queue.queueArn,
});
this._eventSourceMappingId = eventSourceMapping.eventSourceMappingId;
Expand Down
8 changes: 8 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/lib/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@ export interface StreamEventSourceProps {
* @default Duration.seconds(0)
*/
readonly maxBatchingWindow?: Duration;

/**
* If the stream event source mapping should be enabled.
*
* @default true
*/
readonly enabled?: boolean;
}

/**
Expand All @@ -99,6 +106,7 @@ export abstract class StreamEventSource implements lambda.IEventSource {
retryAttempts: this.props.retryAttempts,
parallelizationFactor: this.props.parallelizationFactor,
onFailure: this.props.onFailure,
enabled: this.props.enabled,
};
}
}
36 changes: 36 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/test/test.dynamo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -596,4 +596,40 @@ export = {

test.done();
},

'event source disabled'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const table = new dynamodb.Table(stack, 'T', {
partitionKey: {
name: 'id',
type: dynamodb.AttributeType.STRING,
},
stream: dynamodb.StreamViewType.NEW_IMAGE,
});

// WHEN
fn.addEventSource(new sources.DynamoEventSource(table, {
startingPosition: lambda.StartingPosition.LATEST,
enabled: false,
}));

//THEN
expect(stack).to(haveResource('AWS::Lambda::EventSourceMapping', {
'EventSourceArn': {
'Fn::GetAtt': [
'TD925BC7E',
'StreamArn',
],
},
'FunctionName': {
'Ref': 'Fn9270CBC0',
},
'Enabled': false,
'StartingPosition': 'LATEST',
}));

test.done();
},
};
30 changes: 30 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/test/test.kinesis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -210,4 +210,34 @@ export = {
test.throws(() => eventSource.eventSourceMappingId, /KinesisEventSource is not yet bound to an event source mapping/);
test.done();
},

'event source disabled'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const stream = new kinesis.Stream(stack, 'S');
const eventSource = new sources.KinesisEventSource(stream, {
startingPosition: lambda.StartingPosition.LATEST,
enabled: false,
});

// WHEN
fn.addEventSource(eventSource);

// THEN
expect(stack).to(haveResource('AWS::Lambda::EventSourceMapping', {
'EventSourceArn': {
'Fn::GetAtt': [
'S509448A1',
'Arn',
],
},
'FunctionName': {
'Ref': 'Fn9270CBC0',
},
'Enabled': false,
'StartingPosition': 'LATEST',
}));
test.done();
},
};
28 changes: 28 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/test/test.sqs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,4 +138,32 @@ export = {
test.throws(() => eventSource.eventSourceMappingId, /SqsEventSource is not yet bound to an event source mapping/);
test.done();
},

'event source disabled'(test: Test) {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const q = new sqs.Queue(stack, 'Q');

// WHEN
fn.addEventSource(new sources.SqsEventSource(q, {
enabled: false,
}));

// THEN
expect(stack).to(haveResource('AWS::Lambda::EventSourceMapping', {
'EventSourceArn': {
'Fn::GetAtt': [
'Q63C6E3AB',
'Arn',
],
},
'FunctionName': {
'Ref': 'Fn9270CBC0',
},
'Enabled': false,
}));

test.done();
},
};

0 comments on commit 3292d00

Please sign in to comment.