diff --git a/packages/@aws-cdk/aws-lambda-event-sources/README.md b/packages/@aws-cdk/aws-lambda-event-sources/README.md index d2e13fcdb3184..47d2a7edeb736 100644 --- a/packages/@aws-cdk/aws-lambda-event-sources/README.md +++ b/packages/@aws-cdk/aws-lambda-event-sources/README.md @@ -130,35 +130,48 @@ CloudWatch. ### DynamoDB Streams You can write Lambda functions to process change events from a DynamoDB Table. An event is emitted to a DynamoDB stream (if configured) whenever a write (Put, Delete, Update) -operation is performed against the table. See [Using AWS Lambda with Amazon DynamoDB](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html) for more information. +operation is performed against the table. See [Using AWS Lambda with Amazon DynamoDB](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html) for more information about configuring Lambda function event sources with DynamoDB. To process events with a Lambda function, first create or update a DynamoDB table and enable a `stream` specification. Then, create a `DynamoEventSource` and add it to your Lambda function. The following parameters will impact Amazon DynamoDB's polling behavior: * __batchSize__: Determines how many records are buffered before invoking your lambda function - could impact your function's memory usage (if too high) and ability to keep up with incoming data velocity (if too low). -* __startingPosition__: Will determine where to being consumption, either at the most recent ('LATEST') record or the oldest record ('TRIM_HORIZON'). 'TRIM_HORIZON' will ensure you process all available data, while 'LATEST' will ignore all records that arrived prior to attaching the event source. +* __bisectBatchOnError__: If a batch encounters an error, this will cause the batch to be split in two and have each new smaller batch retried, allowing the records in error to be isolated. +* __maxBatchingWindow__: The maximum amount of time to gather records before invoking the lambda. 
This increases the likelihood of a full batch at the cost of delayed processing. +* __maxRecordAge__: The maximum age of a record that will be sent to the function for processing. Records that exceed the max age will be treated as failures. +* __onFailure__: In the event a record fails after all retries or if the record age has exceeded the configured value, the record will be sent to the SQS queue or SNS topic that is specified here. +* __parallelizationFactor__: The number of batches to concurrently process on each shard. +* __retryAttempts__: The maximum number of times a record should be retried in the event of failure. +* __startingPosition__: Will determine where to begin consumption, either at the most recent ('LATEST') record or the oldest record ('TRIM_HORIZON'). 'TRIM_HORIZON' will ensure you process all available data, while 'LATEST' will ignore all records that arrived prior to attaching the event source. ```ts import dynamodb = require('@aws-cdk/aws-dynamodb'); import lambda = require('@aws-cdk/aws-lambda'); -import { DynamoEventSource } from '@aws-cdk/aws-lambda-event-sources'; +import sqs = require('@aws-cdk/aws-sqs'); +import { DynamoEventSource, SqsDlq } from '@aws-cdk/aws-lambda-event-sources'; const table = new dynamodb.Table(..., { partitionKey: ..., stream: dynamodb.StreamViewType.NEW_IMAGE // make sure stream is configured }); +const deadLetterQueue = new sqs.Queue(this, 'deadLetterQueue'); + const function = new lambda.Function(...); function.addEventSource(new DynamoEventSource(table, { - startingPosition: lambda.StartingPosition.TRIM_HORIZON + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + batchSize: 5, + bisectBatchOnError: true, + onFailure: new SqsDlq(deadLetterQueue), + retryAttempts: 10 })); ``` ### Kinesis -You can write Lambda functions to process streaming data in Amazon Kinesis Streams. For more information about Amazon SQS, see [Amazon Kinesis -Service](https://aws.amazon.com/kinesis/data-streams/). 
To view a sample event, -see [Amazon SQS Event](https://docs.aws.amazon.com/lambda/latest/dg/eventsources.html#eventsources-kinesis-streams). +You can write Lambda functions to process streaming data in Amazon Kinesis Streams. For more information about Amazon Kinesis, see [Amazon Kinesis +Service](https://aws.amazon.com/kinesis/data-streams/). To learn more about configuring Lambda function event sources with Kinesis and view a sample event, +see [Amazon Kinesis Event](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html). To set up Amazon Kinesis as an event source for AWS Lambda, you first create or update an Amazon Kinesis stream and select custom values for the @@ -166,7 +179,13 @@ event source parameters. The following parameters will impact Amazon Kinesis's p behavior: * __batchSize__: Determines how many records are buffered before invoking your lambda function - could impact your function's memory usage (if too high) and ability to keep up with incoming data velocity (if too low). -* __startingPosition__: Will determine where to being consumption, either at the most recent ('LATEST') record or the oldest record ('TRIM_HORIZON'). 'TRIM_HORIZON' will ensure you process all available data, while 'LATEST' will ignore all records that arrived prior to attaching the event source. +* __bisectBatchOnError__: If a batch encounters an error, this will cause the batch to be split in two and have each new smaller batch retried, allowing the records in error to be isolated. +* __maxBatchingWindow__: The maximum amount of time to gather records before invoking the lambda. This increases the likelihood of a full batch at the cost of possibly delaying processing. +* __maxRecordAge__: The maximum age of a record that will be sent to the function for processing. Records that exceed the max age will be treated as failures. 
+* __onFailure__: In the event a record fails and consumes all retries, the record will be sent to the SQS queue or SNS topic that is specified here. +* __parallelizationFactor__: The number of batches to concurrently process on each shard. +* __retryAttempts__: The maximum number of times a record should be retried in the event of failure. +* __startingPosition__: Will determine where to begin consumption, either at the most recent ('LATEST') record or the oldest record ('TRIM_HORIZON'). 'TRIM_HORIZON' will ensure you process all available data, while 'LATEST' will ignore all records that arrived prior to attaching the event source. ```ts import lambda = require('@aws-cdk/aws-lambda'); diff --git a/packages/@aws-cdk/aws-lambda-event-sources/lib/index.ts b/packages/@aws-cdk/aws-lambda-event-sources/lib/index.ts index de08ac31022e8..19253a743cae8 100644 --- a/packages/@aws-cdk/aws-lambda-event-sources/lib/index.ts +++ b/packages/@aws-cdk/aws-lambda-event-sources/lib/index.ts @@ -3,5 +3,7 @@ export * from './dynamodb'; export * from './kinesis'; export * from './s3'; export * from './sns'; +export * from './sns-dlq'; export * from './stream'; export * from './sqs'; +export * from './sqs-dlq'; diff --git a/packages/@aws-cdk/aws-lambda-event-sources/lib/sns-dlq.ts b/packages/@aws-cdk/aws-lambda-event-sources/lib/sns-dlq.ts new file mode 100644 index 0000000000000..ddac2c080f6a8 --- /dev/null +++ b/packages/@aws-cdk/aws-lambda-event-sources/lib/sns-dlq.ts @@ -0,0 +1,21 @@ +import { DlqDestinationConfig, IEventSourceDlq, IEventSourceMapping, IFunction } from "@aws-cdk/aws-lambda"; +import * as sns from '@aws-cdk/aws-sns'; + +/** + * An SNS dead letter queue destination configuration for a Lambda event source + */ +export class SnsDlq implements IEventSourceDlq { + constructor(private readonly topic: sns.ITopic) { + } + + /** + * Returns a destination configuration for the DLQ + */ + public bind(_target: IEventSourceMapping, targetHandler: IFunction): DlqDestinationConfig { + 
this.topic.grantPublish(targetHandler); + + return { + destination: this.topic.topicArn + }; + } +} diff --git a/packages/@aws-cdk/aws-lambda-event-sources/lib/sqs-dlq.ts b/packages/@aws-cdk/aws-lambda-event-sources/lib/sqs-dlq.ts new file mode 100644 index 0000000000000..173c82472271b --- /dev/null +++ b/packages/@aws-cdk/aws-lambda-event-sources/lib/sqs-dlq.ts @@ -0,0 +1,21 @@ +import { DlqDestinationConfig, IEventSourceDlq, IEventSourceMapping, IFunction } from "@aws-cdk/aws-lambda"; +import * as sqs from '@aws-cdk/aws-sqs'; + +/** + * An SQS dead letter queue destination configuration for a Lambda event source + */ +export class SqsDlq implements IEventSourceDlq { + constructor(private readonly queue: sqs.IQueue) { + } + + /** + * Returns a destination configuration for the DLQ + */ + public bind(_target: IEventSourceMapping, targetHandler: IFunction): DlqDestinationConfig { + this.queue.grantSendMessages(targetHandler); + + return { + destination: this.queue.queueArn + }; + } +} diff --git a/packages/@aws-cdk/aws-lambda-event-sources/lib/stream.ts b/packages/@aws-cdk/aws-lambda-event-sources/lib/stream.ts index baf06993e74b8..cb8e7f51f4c4d 100644 --- a/packages/@aws-cdk/aws-lambda-event-sources/lib/stream.ts +++ b/packages/@aws-cdk/aws-lambda-event-sources/lib/stream.ts @@ -1,5 +1,5 @@ import * as lambda from '@aws-cdk/aws-lambda'; -import {Duration} from '@aws-cdk/core'; +import { Duration } from '@aws-cdk/core'; /** * The set of properties for event sources that follow the streaming model, @@ -21,6 +21,50 @@ export interface StreamEventSourceProps { */ readonly batchSize?: number; + /** + * If the function returns an error, split the batch in two and retry. + * + * @default false + */ + readonly bisectBatchOnError?: boolean; + + /** + * An Amazon SQS queue or Amazon SNS topic destination for discarded records. 
+ * + * @default discarded records are ignored + */ + readonly onFailure?: lambda.IEventSourceDlq; + + /** + * The maximum age of a record that Lambda sends to a function for processing. + * Valid Range: + * * Minimum value of 60 seconds + * * Maximum value of 7 days + * + * @default Duration.days(7) + */ + readonly maxRecordAge?: Duration; + + /** + * Maximum number of retry attempts + * Valid Range: + * * Minimum value of 0 + * * Maximum value of 10000 + * + * @default 10000 + */ + readonly retryAttempts?: number; + + /** + * The number of batches to process from each shard concurrently. + * Valid Range: + * * Minimum value of 1 + * * Maximum value of 10 + * + * @default 1 + */ + readonly parallelizationFactor?: number; + /** * Where to begin consuming the stream. */ @@ -48,8 +92,13 @@ export abstract class StreamEventSource implements lambda.IEventSource { return { ...options, batchSize: this.props.batchSize || 100, + bisectBatchOnError: this.props.bisectBatchOnError, startingPosition: this.props.startingPosition, maxBatchingWindow: this.props.maxBatchingWindow, + maxRecordAge: this.props.maxRecordAge, + retryAttempts: this.props.retryAttempts, + parallelizationFactor: this.props.parallelizationFactor, + onFailure: this.props.onFailure }; } } diff --git a/packages/@aws-cdk/aws-lambda-event-sources/test/integ.kinesiswithdlq.expected.json b/packages/@aws-cdk/aws-lambda-event-sources/test/integ.kinesiswithdlq.expected.json new file mode 100644 index 0000000000000..5439a5515bbd4 --- /dev/null +++ b/packages/@aws-cdk/aws-lambda-event-sources/test/integ.kinesiswithdlq.expected.json @@ -0,0 +1,155 @@ +{ + "Resources": { + "FServiceRole3AC82EE1": { + "Type": "AWS::IAM::Role", + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": "sts:AssumeRole", + "Effect": "Allow", + "Principal": { + "Service": "lambda.amazonaws.com" + } + } + ], + "Version": "2012-10-17" + }, + "ManagedPolicyArns": [ + { + "Fn::Join": [ + "", + [ + "arn:", + { + "Ref": 
"AWS::Partition" + }, + ":iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" + ] + ] + } + ] + } + }, + "FServiceRoleDefaultPolicy17A19BFA": { + "Type": "AWS::IAM::Policy", + "Properties": { + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "sqs:SendMessage", + "sqs:GetQueueAttributes", + "sqs:GetQueueUrl" + ], + "Effect": "Allow", + "Resource": { + "Fn::GetAtt": [ + "Q63C6E3AB", + "Arn" + ] + } + }, + { + "Action": [ + "kinesis:DescribeStream", + "kinesis:GetRecords", + "kinesis:GetShardIterator" + ], + "Effect": "Allow", + "Resource": { + "Fn::GetAtt": [ + "S509448A1", + "Arn" + ] + } + } + ], + "Version": "2012-10-17" + }, + "PolicyName": "FServiceRoleDefaultPolicy17A19BFA", + "Roles": [ + { + "Ref": "FServiceRole3AC82EE1" + } + ] + } + }, + "FC4345940": { + "Type": "AWS::Lambda::Function", + "Properties": { + "Code": { + "ZipFile": "exports.handler = async function handler(event) {\n // tslint:disable-next-line:no-console\n console.log('event:', JSON.stringify(event, undefined, 2));\n throw new Error();\n}" + }, + "Handler": "index.handler", + "Role": { + "Fn::GetAtt": [ + "FServiceRole3AC82EE1", + "Arn" + ] + }, + "Runtime": "nodejs10.x" + }, + "DependsOn": [ + "FServiceRoleDefaultPolicy17A19BFA", + "FServiceRole3AC82EE1" + ] + }, + "FKinesisEventSourcelambdaeventsourcekinesiswithdlqSD357FCB87EEA8CB4": { + "Type": "AWS::Lambda::EventSourceMapping", + "Properties": { + "EventSourceArn": { + "Fn::GetAtt": [ + "S509448A1", + "Arn" + ] + }, + "FunctionName": { + "Ref": "FC4345940" + }, + "BatchSize": 100, + "DestinationConfig": { + "OnFailure": { + "Destination": { + "Fn::GetAtt": [ + "Q63C6E3AB", + "Arn" + ] + } + } + }, + "MaximumRetryAttempts": 0, + "StartingPosition": "TRIM_HORIZON" + } + }, + "S509448A1": { + "Type": "AWS::Kinesis::Stream", + "Properties": { + "ShardCount": 1, + "RetentionPeriodHours": 24 + } + }, + "Q63C6E3AB": { + "Type": "AWS::SQS::Queue" + } + }, + "Outputs": { + "InputKinesisStreamName": { + "Value": { + "Ref": 
"S509448A1" + } + }, + "DlqSqsQueueUrl": { + "Value": { + "Ref": "Q63C6E3AB" + } + }, + "FunctionArn": { + "Value":{ + "Fn::GetAtt":["FC4345940","Arn"] + } + } + }, + "Parameters": { + } +} \ No newline at end of file diff --git a/packages/@aws-cdk/aws-lambda-event-sources/test/integ.kinesiswithdlq.ts b/packages/@aws-cdk/aws-lambda-event-sources/test/integ.kinesiswithdlq.ts new file mode 100644 index 0000000000000..97c470104c8f3 --- /dev/null +++ b/packages/@aws-cdk/aws-lambda-event-sources/test/integ.kinesiswithdlq.ts @@ -0,0 +1,47 @@ +import * as kinesis from '@aws-cdk/aws-kinesis'; +import * as lambda from '@aws-cdk/aws-lambda'; +import * as sqs from '@aws-cdk/aws-sqs'; +import { App, CfnOutput, Stack } from "@aws-cdk/core"; +import { KinesisEventSource, SqsDlq } from '../lib'; + +/* + * Stack verification steps: + * * aws kinesis put-record --stream-name --partition-key 123 --data testdata + * * aws sqs receive-message --queue-url --max-number-of-messages 1 --query 'Messages[0].Body' + * The last command should return a string that contains the Lambda function ARN in it. 
+ */ + +async function handler(event: any) { + // tslint:disable-next-line:no-console + console.log('event:', JSON.stringify(event, undefined, 2)); + throw new Error(); +} + +class KinesisWithDLQTest extends Stack { + constructor(scope: App, id: string) { + super(scope, id); + + const fn = new lambda.Function(this, 'F', { + runtime: lambda.Runtime.NODEJS_10_X, + handler: 'index.handler', + code: lambda.Code.fromInline(`exports.handler = ${handler.toString()}`) + }); + new CfnOutput(this, 'FunctionArn', { value: fn.functionArn }); + + const stream = new kinesis.Stream(this, 'S'); + new CfnOutput(this, 'InputKinesisStreamName', { value: stream.streamName }); + + const dlq = new sqs.Queue(this, 'Q'); + new CfnOutput(this, 'DlqSqsQueueUrl', { value: dlq.queueUrl }); + + fn.addEventSource(new KinesisEventSource(stream, { + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + onFailure: new SqsDlq(dlq), + retryAttempts: 0, + })); + } +} + +const app = new App(); +new KinesisWithDLQTest(app, 'lambda-event-source-kinesis-with-dlq'); +app.synth(); diff --git a/packages/@aws-cdk/aws-lambda-event-sources/test/test.dynamo.ts b/packages/@aws-cdk/aws-lambda-event-sources/test/test.dynamo.ts index 437cbd556ae64..9c2de1e436e5c 100644 --- a/packages/@aws-cdk/aws-lambda-event-sources/test/test.dynamo.ts +++ b/packages/@aws-cdk/aws-lambda-event-sources/test/test.dynamo.ts @@ -1,6 +1,7 @@ import { expect, haveResource } from '@aws-cdk/assert'; import * as dynamodb from '@aws-cdk/aws-dynamodb'; import * as lambda from '@aws-cdk/aws-lambda'; +import * as sqs from '@aws-cdk/aws-sqs'; import * as cdk from '@aws-cdk/core'; import { Test } from 'nodeunit'; import * as sources from '../lib'; @@ -273,4 +274,326 @@ export = { test.done(); }, + 'specific retryAttempts'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const table = new dynamodb.Table(stack, 'T', { + partitionKey: { + name: 'id', + type: dynamodb.AttributeType.STRING + 
}, + stream: dynamodb.StreamViewType.NEW_IMAGE + }); + + // WHEN + fn.addEventSource(new sources.DynamoEventSource(table, { + retryAttempts: 10, + startingPosition: lambda.StartingPosition.LATEST + })); + + // THEN + expect(stack).to(haveResource('AWS::Lambda::EventSourceMapping', { + "EventSourceArn": { + "Fn::GetAtt": [ + "TD925BC7E", + "StreamArn" + ] + }, + "FunctionName": { + "Ref": "Fn9270CBC0" + }, + "MaximumRetryAttempts": 10, + "StartingPosition": "LATEST" + })); + + test.done(); + }, + + 'fails if retryAttempts < 0'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const table = new dynamodb.Table(stack, 'T', { + partitionKey: { + name: 'id', + type: dynamodb.AttributeType.STRING + }, + stream: dynamodb.StreamViewType.NEW_IMAGE + }); + + // THEN + test.throws(() => + fn.addEventSource(new sources.DynamoEventSource(table, { + retryAttempts: -1, + startingPosition: lambda.StartingPosition.LATEST + })), /retryAttempts must be between 0 and 10000 inclusive, got -1/); + + test.done(); + }, + + 'fails if retryAttempts > 10000'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const table = new dynamodb.Table(stack, 'T', { + partitionKey: { + name: 'id', + type: dynamodb.AttributeType.STRING + }, + stream: dynamodb.StreamViewType.NEW_IMAGE + }); + + // THEN + test.throws(() => + fn.addEventSource(new sources.DynamoEventSource(table, { + retryAttempts: 10001, + startingPosition: lambda.StartingPosition.LATEST + })), /retryAttempts must be between 0 and 10000 inclusive, got 10001/); + + test.done(); + }, + + 'specific bisectBatchOnFunctionError'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const table = new dynamodb.Table(stack, 'T', { + partitionKey: { + name: 'id', + type: dynamodb.AttributeType.STRING + }, + stream: dynamodb.StreamViewType.NEW_IMAGE + }); + + // WHEN + fn.addEventSource(new 
sources.DynamoEventSource(table, { + bisectBatchOnError: true, + startingPosition: lambda.StartingPosition.LATEST + })); + + // THEN + expect(stack).to(haveResource('AWS::Lambda::EventSourceMapping', { + "EventSourceArn": { + "Fn::GetAtt": [ + "TD925BC7E", + "StreamArn" + ] + }, + "FunctionName": { + "Ref": "Fn9270CBC0" + }, + "BisectBatchOnFunctionError": true, + "StartingPosition": "LATEST" + })); + + test.done(); + }, + + 'specific parallelizationFactor'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const table = new dynamodb.Table(stack, 'T', { + partitionKey: { + name: 'id', + type: dynamodb.AttributeType.STRING + }, + stream: dynamodb.StreamViewType.NEW_IMAGE + }); + + // WHEN + fn.addEventSource(new sources.DynamoEventSource(table, { + parallelizationFactor: 5, + startingPosition: lambda.StartingPosition.LATEST + })); + + // THEN + expect(stack).to(haveResource('AWS::Lambda::EventSourceMapping', { + "EventSourceArn": { + "Fn::GetAtt": [ + "TD925BC7E", + "StreamArn" + ] + }, + "FunctionName": { + "Ref": "Fn9270CBC0" + }, + "ParallelizationFactor": 5, + "StartingPosition": "LATEST" + })); + + test.done(); + }, + + 'fails if parallelizationFactor < 1'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const table = new dynamodb.Table(stack, 'T', { + partitionKey: { + name: 'id', + type: dynamodb.AttributeType.STRING + }, + stream: dynamodb.StreamViewType.NEW_IMAGE + }); + + // THEN + test.throws(() => + fn.addEventSource(new sources.DynamoEventSource(table, { + parallelizationFactor: 0, + startingPosition: lambda.StartingPosition.LATEST + })), /parallelizationFactor must be between 1 and 10 inclusive, got 0/); + + test.done(); + }, + + 'fails if parallelizationFactor > 10'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const table = new dynamodb.Table(stack, 'T', { + partitionKey: { + name: 'id', 
+ type: dynamodb.AttributeType.STRING + }, + stream: dynamodb.StreamViewType.NEW_IMAGE + }); + + // THEN + test.throws(() => + fn.addEventSource(new sources.DynamoEventSource(table, { + parallelizationFactor: 11, + startingPosition: lambda.StartingPosition.LATEST + })), /parallelizationFactor must be between 1 and 10 inclusive, got 11/); + + test.done(); + }, + + 'specific maxRecordAge'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const table = new dynamodb.Table(stack, 'T', { + partitionKey: { + name: 'id', + type: dynamodb.AttributeType.STRING + }, + stream: dynamodb.StreamViewType.NEW_IMAGE + }); + + // WHEN + fn.addEventSource(new sources.DynamoEventSource(table, { + maxRecordAge: cdk.Duration.seconds(100), + startingPosition: lambda.StartingPosition.LATEST + })); + + // THEN + expect(stack).to(haveResource('AWS::Lambda::EventSourceMapping', { + "EventSourceArn": { + "Fn::GetAtt": [ + "TD925BC7E", + "StreamArn" + ] + }, + "FunctionName": { + "Ref": "Fn9270CBC0" + }, + "MaximumRecordAgeInSeconds": 100, + "StartingPosition": "LATEST" + })); + + test.done(); + }, + + 'fails if maxRecordAge < 60 seconds'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const table = new dynamodb.Table(stack, 'T', { + partitionKey: { + name: 'id', + type: dynamodb.AttributeType.STRING + }, + stream: dynamodb.StreamViewType.NEW_IMAGE + }); + + // THEN + test.throws(() => + fn.addEventSource(new sources.DynamoEventSource(table, { + maxRecordAge: cdk.Duration.seconds(59), + startingPosition: lambda.StartingPosition.LATEST + })), /maxRecordAge must be between 60 seconds and 7 days inclusive/); + + test.done(); + }, + + 'fails if maxRecordAge > 7 days'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const table = new dynamodb.Table(stack, 'T', { + partitionKey: { + name: 'id', + type: dynamodb.AttributeType.STRING + }, + 
stream: dynamodb.StreamViewType.NEW_IMAGE + }); + + // THEN + test.throws(() => + fn.addEventSource(new sources.DynamoEventSource(table, { + maxRecordAge: cdk.Duration.seconds(604801), + startingPosition: lambda.StartingPosition.LATEST + })), /maxRecordAge must be between 60 seconds and 7 days inclusive/); + + test.done(); + }, + + 'specific destinationConfig'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const queue = new sqs.Queue(stack, 'Queue'); + const table = new dynamodb.Table(stack, 'T', { + partitionKey: { + name: 'id', + type: dynamodb.AttributeType.STRING + }, + stream: dynamodb.StreamViewType.NEW_IMAGE + }); + + // WHEN + fn.addEventSource(new sources.DynamoEventSource(table, { + onFailure: new sources.SqsDlq(queue), + startingPosition: lambda.StartingPosition.LATEST + })); + + // THEN + expect(stack).to(haveResource('AWS::Lambda::EventSourceMapping', { + "EventSourceArn": { + "Fn::GetAtt": [ + "TD925BC7E", + "StreamArn" + ] + }, + "FunctionName": { + "Ref": "Fn9270CBC0" + }, + "DestinationConfig": { + "OnFailure": { + "Destination": { + "Fn::GetAtt": [ + "Queue4A7E3555", + "Arn" + ] + } + + } + }, + "StartingPosition": "LATEST" + })); + + test.done(); + }, }; diff --git a/packages/@aws-cdk/aws-lambda/lib/dlq.ts b/packages/@aws-cdk/aws-lambda/lib/dlq.ts new file mode 100644 index 0000000000000..2f2c52448a90d --- /dev/null +++ b/packages/@aws-cdk/aws-lambda/lib/dlq.ts @@ -0,0 +1,22 @@ +import { IEventSourceMapping } from './event-source-mapping'; +import { IFunction } from './function-base'; + +/** + * A destination configuration + */ +export interface DlqDestinationConfig { + /** + * The Amazon Resource Name (ARN) of the destination resource + */ + readonly destination: string; +} + +/** + * A DLQ for an event source + */ +export interface IEventSourceDlq { + /** + * Returns the DLQ destination config of the DLQ + */ + bind(target: IEventSourceMapping, targetHandler: IFunction): 
DlqDestinationConfig; +} diff --git a/packages/@aws-cdk/aws-lambda/lib/event-source-mapping.ts b/packages/@aws-cdk/aws-lambda/lib/event-source-mapping.ts index d4f7f3f4c9012..a32a8ffd9b23b 100644 --- a/packages/@aws-cdk/aws-lambda/lib/event-source-mapping.ts +++ b/packages/@aws-cdk/aws-lambda/lib/event-source-mapping.ts @@ -1,4 +1,5 @@ import * as cdk from '@aws-cdk/core'; +import { IEventSourceDlq } from './dlq'; import { IFunction } from './function-base'; import { CfnEventSourceMapping } from './lambda.generated'; @@ -21,6 +22,20 @@ export interface EventSourceMappingOptions { */ readonly batchSize?: number; + /** + * If the function returns an error, split the batch in two and retry. + * + * @default false + */ + readonly bisectBatchOnError?: boolean; + + /** + * An Amazon SQS queue or Amazon SNS topic destination for discarded records. + * + * @default discarded records are ignored + */ + readonly onFailure?: IEventSourceDlq; + /** * Set to false to disable the event source upon creation. * @@ -45,8 +60,42 @@ export interface EventSourceMappingOptions { * @default Duration.seconds(0) */ readonly maxBatchingWindow?: cdk.Duration; + + /** + * The maximum age of a record that Lambda sends to a function for processing. + * Valid Range: + * * Minimum value of 60 seconds + * * Maximum value of 7 days + * + * @default Duration.days(7) + */ + readonly maxRecordAge?: cdk.Duration; + + /** + * The maximum number of times to retry when the function returns an error. + * + * Valid Range: + * * Minimum value of 0 + * * Maximum value of 10000 + * + * @default 10000 + */ + readonly retryAttempts?: number; + + /** + * The number of batches to process from each shard concurrently. + * Valid Range: + * * Minimum value of 1 + * * Maximum value of 10 + * + * @default 1 + */ + readonly parallelizationFactor?: number; } +/** + * Properties for declaring a new event source mapping. 
+ */ export interface EventSourceMappingProps extends EventSourceMappingOptions { /** * The target AWS Lambda function. @@ -54,6 +103,18 @@ export interface EventSourceMappingProps extends EventSourceMappingOptions { readonly target: IFunction; } +/** + * Represents an event source mapping for a lambda function. + * @see https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html + */ +export interface IEventSourceMapping extends cdk.IResource { + /** + * The identifier for this EventSourceMapping + * @attribute + */ + readonly eventSourceMappingId: string; +} + /** * Defines a Lambda EventSourceMapping resource. * @@ -66,11 +127,18 @@ export interface EventSourceMappingProps extends EventSourceMappingOptions { * The `SqsEventSource` class will automatically create the mapping, and will also * modify the Lambda's execution role so it can consume messages from the queue. */ -export class EventSourceMapping extends cdk.Resource { +export class EventSourceMapping extends cdk.Resource implements IEventSourceMapping { + /** - * The identifier for this EventSourceMapping - * @attribute + * Import an event source into this stack from its event source id. 
*/ + public static fromEventSourceMappingId(scope: cdk.Construct, id: string, eventSourceMappingId: string): IEventSourceMapping { + class Import extends cdk.Resource implements IEventSourceMapping { + public readonly eventSourceMappingId = eventSourceMappingId; + } + return new Import(scope, id); + } + public readonly eventSourceMappingId: string; constructor(scope: cdk.Construct, id: string, props: EventSourceMappingProps) { @@ -80,13 +148,38 @@ export class EventSourceMapping extends cdk.Resource { throw new Error(`maxBatchingWindow cannot be over 300 seconds, got ${props.maxBatchingWindow.toSeconds()}`); } + if (props.maxRecordAge && (props.maxRecordAge.toSeconds() < 60 || props.maxRecordAge.toDays({integral: false}) > 7)) { + throw new Error('maxRecordAge must be between 60 seconds and 7 days inclusive'); + } + + if (props.retryAttempts && (props.retryAttempts < 0 || props.retryAttempts > 10000)) { + throw new Error(`retryAttempts must be between 0 and 10000 inclusive, got ${props.retryAttempts}`); + } + + if ((props.parallelizationFactor || props.parallelizationFactor === 0) && (props.parallelizationFactor < 1 || props.parallelizationFactor > 10)) { + throw new Error(`parallelizationFactor must be between 1 and 10 inclusive, got ${props.parallelizationFactor}`); + } + + let destinationConfig; + + if (props.onFailure) { + destinationConfig = { + onFailure: props.onFailure.bind(this, props.target) + }; + } + const cfnEventSourceMapping = new CfnEventSourceMapping(this, 'Resource', { batchSize: props.batchSize, + bisectBatchOnFunctionError: props.bisectBatchOnError, + destinationConfig, enabled: props.enabled, eventSourceArn: props.eventSourceArn, functionName: props.target.functionName, startingPosition: props.startingPosition, - maximumBatchingWindowInSeconds: props.maxBatchingWindow && props.maxBatchingWindow.toSeconds(), + maximumBatchingWindowInSeconds: props.maxBatchingWindow?.toSeconds(), + maximumRecordAgeInSeconds: props.maxRecordAge?.toSeconds(), + 
maximumRetryAttempts: props.retryAttempts, + parallelizationFactor: props.parallelizationFactor }); this.eventSourceMappingId = cfnEventSourceMapping.ref; } diff --git a/packages/@aws-cdk/aws-lambda/lib/index.ts b/packages/@aws-cdk/aws-lambda/lib/index.ts index f0a693da65974..b494e924c604a 100644 --- a/packages/@aws-cdk/aws-lambda/lib/index.ts +++ b/packages/@aws-cdk/aws-lambda/lib/index.ts @@ -1,4 +1,5 @@ export * from './alias'; +export * from './dlq'; export * from './function-base'; export * from './function'; export * from './layers'; diff --git a/packages/@aws-cdk/aws-lambda/package.json b/packages/@aws-cdk/aws-lambda/package.json index 5f2dad1b83358..abc8d32710418 100644 --- a/packages/@aws-cdk/aws-lambda/package.json +++ b/packages/@aws-cdk/aws-lambda/package.json @@ -164,7 +164,6 @@ "props-default-doc:@aws-cdk/aws-lambda.CodeConfig.inlineCode", "props-default-doc:@aws-cdk/aws-lambda.CodeConfig.s3Location", "docs-public-apis:@aws-cdk/aws-lambda.EventSourceMappingOptions", - "docs-public-apis:@aws-cdk/aws-lambda.EventSourceMappingProps", "props-default-doc:@aws-cdk/aws-lambda.FunctionAttributes.role", "props-default-doc:@aws-cdk/aws-lambda.FunctionAttributes.securityGroup", "props-default-doc:@aws-cdk/aws-lambda.FunctionAttributes.securityGroupId", diff --git a/packages/@aws-cdk/aws-lambda/test/test.event-source-mapping.ts b/packages/@aws-cdk/aws-lambda/test/test.event-source-mapping.ts new file mode 100644 index 0000000000000..3de9bbfeb5dab --- /dev/null +++ b/packages/@aws-cdk/aws-lambda/test/test.event-source-mapping.ts @@ -0,0 +1,155 @@ +import * as cdk from '@aws-cdk/core'; +import { Test } from 'nodeunit'; +import { Code, EventSourceMapping, Function, Runtime } from '../lib'; + +export = { + 'throws if maxBatchingWindow > 300 seconds'(test: Test) { + const stack = new cdk.Stack(); + const fn = new Function(stack, 'fn', { + handler: 'index.handler', + code: Code.fromInline(`exports.handler = \${handler.toString()}`), + runtime: Runtime.NODEJS_10_X + }); 
+ + test.throws(() => + new EventSourceMapping( + stack, + 'test', + { + target: fn, + eventSourceArn: '', + maxBatchingWindow: cdk.Duration.seconds(301) + }), /maxBatchingWindow cannot be over 300 seconds/); + + test.done(); + }, + 'throws if maxRecordAge is below 60 seconds'(test: Test) { + const stack = new cdk.Stack(); + const fn = new Function(stack, 'fn', { + handler: 'index.handler', + code: Code.fromInline(`exports.handler = \${handler.toString()}`), + runtime: Runtime.NODEJS_10_X + }); + + test.throws(() => + new EventSourceMapping( + stack, + 'test', + { + target: fn, + eventSourceArn: '', + maxRecordAge: cdk.Duration.seconds(59) + }), /maxRecordAge must be between 60 seconds and 7 days inclusive/); + + test.done(); + }, + 'throws if maxRecordAge is over 7 days'(test: Test) { + const stack = new cdk.Stack(); + const fn = new Function(stack, 'fn', { + handler: 'index.handler', + code: Code.fromInline(`exports.handler = \${handler.toString()}`), + runtime: Runtime.NODEJS_10_X + }); + + test.throws(() => + new EventSourceMapping( + stack, + 'test', + { + target: fn, + eventSourceArn: '', + maxRecordAge: cdk.Duration.seconds(604801) + }), /maxRecordAge must be between 60 seconds and 7 days inclusive/); + + test.done(); + }, + 'throws if retryAttempts is negative'(test: Test) { + const stack = new cdk.Stack(); + const fn = new Function(stack, 'fn', { + handler: 'index.handler', + code: Code.fromInline(`exports.handler = \${handler.toString()}`), + runtime: Runtime.NODEJS_10_X + }); + + test.throws(() => + new EventSourceMapping( + stack, + 'test', + { + target: fn, + eventSourceArn: '', + retryAttempts: -1 + }), /retryAttempts must be between 0 and 10000 inclusive, got -1/); + + test.done(); + }, + 'throws if retryAttempts is over 10000'(test: Test) { + const stack = new cdk.Stack(); + const fn = new Function(stack, 'fn', { + handler: 'index.handler', + code: Code.fromInline(`exports.handler = \${handler.toString()}`), + runtime: Runtime.NODEJS_10_X + }); + + 
test.throws(() => + new EventSourceMapping( + stack, + 'test', + { + target: fn, + eventSourceArn: '', + retryAttempts: 10001 + }), /retryAttempts must be between 0 and 10000 inclusive, got 10001/); + + test.done(); + }, + 'throws if parallelizationFactor is below 1'(test: Test) { + const stack = new cdk.Stack(); + const fn = new Function(stack, 'fn', { + handler: 'index.handler', + code: Code.fromInline(`exports.handler = \${handler.toString()}`), + runtime: Runtime.NODEJS_10_X + }); + + test.throws(() => + new EventSourceMapping( + stack, + 'test', + { + target: fn, + eventSourceArn: '', + parallelizationFactor: 0 + }), /parallelizationFactor must be between 1 and 10 inclusive, got 0/); + + test.done(); + }, + 'throws if parallelizationFactor is over 10'(test: Test) { + const stack = new cdk.Stack(); + const fn = new Function(stack, 'fn', { + handler: 'index.handler', + code: Code.fromInline(`exports.handler = \${handler.toString()}`), + runtime: Runtime.NODEJS_10_X + }); + + test.throws(() => + new EventSourceMapping( + stack, + 'test', + { + target: fn, + eventSourceArn: '', + parallelizationFactor: 11 + }), /parallelizationFactor must be between 1 and 10 inclusive, got 11/); + + test.done(); + }, + + 'import event source mapping'(test: Test) { + const stack = new cdk.Stack(undefined, undefined, { stackName: 'test-stack' }); + const imported = EventSourceMapping.fromEventSourceMappingId(stack, 'imported', '14e0db71-5d35-4eb5-b481-8945cf9d10c2'); + + test.equals(imported.eventSourceMappingId, '14e0db71-5d35-4eb5-b481-8945cf9d10c2'); + test.equals(imported.stack.stackName, 'test-stack'); + test.done(); + }, +};
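Review note: the range checks that `EventSourceMapping` now performs (retryAttempts 0-10000, parallelizationFactor 1-10, maxRecordAge between 60 seconds and 7 days) can be sketched as plain, dependency-free functions. The helper names below are hypothetical; they only mirror the bounds and error messages used in this diff, including the truthiness-based guards (`retryAttempts: 0` skips the check but is valid anyway, while `parallelizationFactor` needs the explicit `=== 0` case because 0 is falsy but out of range).

```typescript
// Hypothetical, dependency-free sketch of the validation in this diff.

function validateRetryAttempts(retryAttempts?: number): void {
  // 0 is falsy and skips the check, which is harmless: 0 is a valid value.
  if (retryAttempts && (retryAttempts < 0 || retryAttempts > 10000)) {
    throw new Error(`retryAttempts must be between 0 and 10000 inclusive, got ${retryAttempts}`);
  }
}

function validateParallelizationFactor(factor?: number): void {
  // 0 is falsy but invalid, hence the explicit `=== 0` branch from the diff.
  if ((factor || factor === 0) && (factor < 1 || factor > 10)) {
    throw new Error(`parallelizationFactor must be between 1 and 10 inclusive, got ${factor}`);
  }
}

function validateMaxRecordAgeSeconds(seconds?: number): void {
  const sevenDays = 7 * 24 * 60 * 60; // 604800 seconds
  if (seconds !== undefined && (seconds < 60 || seconds > sevenDays)) {
    throw new Error('maxRecordAge must be between 60 seconds and 7 days inclusive');
  }
}
```

These are the same boundary cases the new unit tests in `test.event-source-mapping.ts` exercise (59 s, 604801 s, -1, 10001, 0, 11).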
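For reviewers unfamiliar with `bisectBatchOnError`: the idea is that when a batch fails, it is split in two and each half retried, so a single poison record is isolated in roughly log2(batchSize) invocations instead of failing the whole batch on every retry. The recursive sketch below is a conceptual illustration only, not the Lambda service's implementation; all names are invented.

```typescript
// Conceptual sketch (not the Lambda service implementation) of how
// bisecting a failing batch isolates the records in error.

type Invoke<T> = (batch: T[]) => boolean; // true = the invocation succeeded

function bisectProcess<T>(batch: T[], invoke: Invoke<T>, failed: T[]): void {
  if (batch.length === 0) { return; }
  if (invoke(batch)) { return; }            // whole batch succeeded
  if (batch.length === 1) {                 // cannot split further: record fails
    failed.push(batch[0]);                  // in CDK terms: handed to onFailure
    return;
  }
  const mid = Math.ceil(batch.length / 2);  // split in two and retry each half
  bisectProcess(batch.slice(0, mid), invoke, failed);
  bisectProcess(batch.slice(mid), invoke, failed);
}
```

For example, with a batch of five records where only record 3 makes the function throw, the recursion retries `[1,2,3]` and `[4,5]`, then `[1,2]` and `[3]`, and only record 3 ends up in `failed`.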