feat(lambda-event-sources): failure handling for stream event sources (#5929)

Co-authored-by: Niranjan Jayakar <nija@amazon.com>

closes #5236
xerofun authored Mar 16, 2020
1 parent a75f711 commit 5028009
Showing 13 changed files with 921 additions and 14 deletions.
35 changes: 27 additions & 8 deletions packages/@aws-cdk/aws-lambda-event-sources/README.md
@@ -130,43 +130,62 @@ CloudWatch.
### DynamoDB Streams
You can write Lambda functions to process change events from a DynamoDB Table. An event is emitted to a DynamoDB stream (if configured) whenever a write (Put, Delete, Update)
operation is performed against the table. See [Using AWS Lambda with Amazon DynamoDB](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html) for more information about configuring Lambda function event sources with DynamoDB.
To process events with a Lambda function, first create or update a DynamoDB table and enable a `stream` specification. Then, create a `DynamoEventSource`
and add it to your Lambda function. The following parameters will impact Amazon DynamoDB's polling behavior:
* __batchSize__: Determines how many records are buffered before invoking your Lambda function. Setting this too high could increase your function's memory usage; setting it too low could hurt its ability to keep up with the incoming data velocity.
* __bisectBatchOnError__: If a batch encounters an error, this will cause the batch to be split in two, with each new, smaller batch retried separately, allowing the records in error to be isolated.
* __maxBatchingWindow__: The maximum amount of time to gather records before invoking the Lambda function. This increases the likelihood of a full batch at the cost of delayed processing.
* __maxRecordAge__: The maximum age of a record that will be sent to the function for processing. Records that exceed the max age will be treated as failures.
* __onFailure__: In the event a record fails after all retries, or if the record age has exceeded the configured value, the record will be sent to the SQS queue or SNS topic that is specified here.
* __parallelizationFactor__: The number of batches to concurrently process on each shard.
* __retryAttempts__: The maximum number of times a record should be retried in the event of failure.
* __startingPosition__: Determines where to begin consumption, either at the most recent ('LATEST') record or the oldest record ('TRIM_HORIZON'). 'TRIM_HORIZON' will ensure you process all available data, while 'LATEST' will ignore all records that arrived prior to attaching the event source.
```ts
import dynamodb = require('@aws-cdk/aws-dynamodb');
import lambda = require('@aws-cdk/aws-lambda');
import sqs = require('@aws-cdk/aws-sqs');
import { DynamoEventSource, SqsDlq } from '@aws-cdk/aws-lambda-event-sources';

const table = new dynamodb.Table(..., {
partitionKey: ...,
stream: dynamodb.StreamViewType.NEW_IMAGE // make sure stream is configured
});

const deadLetterQueue = new sqs.Queue(this, 'deadLetterQueue');

const fn = new lambda.Function(...);
fn.addEventSource(new DynamoEventSource(table, {
startingPosition: lambda.StartingPosition.TRIM_HORIZON,
batchSize: 5,
bisectBatchOnError: true,
onFailure: new SqsDlq(deadLetterQueue),
retryAttempts: 10
}));
```
### Kinesis
You can write Lambda functions to process streaming data in Amazon Kinesis Streams. For more information about Amazon Kinesis, see [Amazon Kinesis
Service](https://aws.amazon.com/kinesis/data-streams/). To learn more about configuring Lambda function event sources with Kinesis and to view a sample event,
see [Amazon Kinesis Event](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html).
To set up Amazon Kinesis as an event source for AWS Lambda, you
first create or update an Amazon Kinesis stream and select custom values for the
event source parameters. The following parameters will impact Amazon Kinesis's polling
behavior:
* __batchSize__: Determines how many records are buffered before invoking your Lambda function. Setting this too high could increase your function's memory usage; setting it too low could hurt its ability to keep up with the incoming data velocity.
* __bisectBatchOnError__: If a batch encounters an error, this will cause the batch to be split in two, with each new, smaller batch retried separately, allowing the records in error to be isolated.
* __maxBatchingWindow__: The maximum amount of time to gather records before invoking the Lambda function. This increases the likelihood of a full batch at the cost of possibly delayed processing.
* __maxRecordAge__: The maximum age of a record that will be sent to the function for processing. Records that exceed the max age will be treated as failures.
* __onFailure__: In the event a record fails and consumes all retries, the record will be sent to the SQS queue or SNS topic that is specified here.
* __parallelizationFactor__: The number of batches to concurrently process on each shard.
* __retryAttempts__: The maximum number of times a record should be retried in the event of failure.
* __startingPosition__: Determines where to begin consumption, either at the most recent ('LATEST') record or the oldest record ('TRIM_HORIZON'). 'TRIM_HORIZON' will ensure you process all available data, while 'LATEST' will ignore all records that arrived prior to attaching the event source.
```ts
import lambda = require('@aws-cdk/aws-lambda');
```
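The Kinesis example above is truncated by the diff view. As a minimal complete sketch mirroring the DynamoDB example (the construct IDs and function configuration here are illustrative assumptions, not content from the original file):

```ts
import kinesis = require('@aws-cdk/aws-kinesis');
import lambda = require('@aws-cdk/aws-lambda');
import sqs = require('@aws-cdk/aws-sqs');
import { KinesisEventSource, SqsDlq } from '@aws-cdk/aws-lambda-event-sources';

// Illustrative construct IDs; substitute your own.
const stream = new kinesis.Stream(this, 'Stream');
const deadLetterQueue = new sqs.Queue(this, 'DeadLetterQueue');

const fn = new lambda.Function(this, 'Fn', {
  runtime: lambda.Runtime.NODEJS_10_X,
  handler: 'index.handler',
  code: lambda.Code.fromAsset('lambda'),
});

fn.addEventSource(new KinesisEventSource(stream, {
  startingPosition: lambda.StartingPosition.TRIM_HORIZON, // process all available records
  batchSize: 100,                         // records buffered per invocation
  bisectBatchOnError: true,               // split failing batches to isolate bad records
  onFailure: new SqsDlq(deadLetterQueue), // discarded records go to the SQS queue
  retryAttempts: 10
}));
```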
2 changes: 2 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/lib/index.ts
@@ -3,5 +3,7 @@ export * from './dynamodb';
export * from './kinesis';
export * from './s3';
export * from './sns';
export * from './sns-dlq';
export * from './stream';
export * from './sqs';
export * from './sqs-dlq';
21 changes: 21 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/lib/sns-dlq.ts
@@ -0,0 +1,21 @@
import { DlqDestinationConfig, IEventSourceDlq, IEventSourceMapping, IFunction } from "@aws-cdk/aws-lambda";
import * as sns from '@aws-cdk/aws-sns';

/**
* An SNS dead letter queue destination configuration for a Lambda event source
*/
export class SnsDlq implements IEventSourceDlq {
constructor(private readonly topic: sns.ITopic) {
}

/**
* Returns a destination configuration for the DLQ
*/
public bind(_target: IEventSourceMapping, targetHandler: IFunction): DlqDestinationConfig {
this.topic.grantPublish(targetHandler);

return {
destination: this.topic.topicArn
};
}
}
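As a usage sketch (the topic, function, and table below are illustrative assumptions, not part of this commit), `SnsDlq` plugs into a stream event source the same way `SqsDlq` does; `bind()` is invoked by the event source mapping, grants `sns:Publish` to the target function, and returns the topic ARN as the destination:

```ts
import sns = require('@aws-cdk/aws-sns');
import lambda = require('@aws-cdk/aws-lambda');
import { DynamoEventSource, SnsDlq } from '@aws-cdk/aws-lambda-event-sources';

// Illustrative: send records that exhaust their retries to an SNS topic.
// `fn` and `table` are assumed to be an existing Function and stream-enabled Table.
const failureTopic = new sns.Topic(this, 'FailureTopic');

fn.addEventSource(new DynamoEventSource(table, {
  startingPosition: lambda.StartingPosition.LATEST,
  onFailure: new SnsDlq(failureTopic), // grants sns:Publish via bind()
  retryAttempts: 5
}));
```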
21 changes: 21 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/lib/sqs-dlq.ts
@@ -0,0 +1,21 @@
import { DlqDestinationConfig, IEventSourceDlq, IEventSourceMapping, IFunction } from "@aws-cdk/aws-lambda";
import * as sqs from '@aws-cdk/aws-sqs';

/**
* An SQS dead letter queue destination configuration for a Lambda event source
*/
export class SqsDlq implements IEventSourceDlq {
constructor(private readonly queue: sqs.IQueue) {
}

/**
* Returns a destination configuration for the DLQ
*/
public bind(_target: IEventSourceMapping, targetHandler: IFunction): DlqDestinationConfig {
this.queue.grantSendMessages(targetHandler);

return {
destination: this.queue.queueArn
};
}
}
51 changes: 50 additions & 1 deletion packages/@aws-cdk/aws-lambda-event-sources/lib/stream.ts
@@ -1,5 +1,5 @@
import * as lambda from '@aws-cdk/aws-lambda';
import {Duration} from '@aws-cdk/core';
import { Duration } from '@aws-cdk/core';

/**
* The set of properties for event sources that follow the streaming model,
@@ -21,6 +21,50 @@ export interface StreamEventSourceProps {
*/
readonly batchSize?: number;

/**
* If the function returns an error, split the batch in two and retry.
*
* @default false
*/
readonly bisectBatchOnError?: boolean;

/**
* An Amazon SQS queue or Amazon SNS topic destination for discarded records.
*
* @default discarded records are ignored
*/
readonly onFailure?: lambda.IEventSourceDlq;

/**
* The maximum age of a record that Lambda sends to a function for processing.
* Valid Range:
* * Minimum value of 60 seconds
* * Maximum value of 7 days
*
* @default Duration.days(7)
*/
readonly maxRecordAge?: Duration;

/**
* Maximum number of retry attempts
* Valid Range:
* * Minimum value of 0
* * Maximum value of 10000
*
* @default 10000
*/
readonly retryAttempts?: number;

/**
* The number of batches to process from each shard concurrently.
* Valid Range:
* * Minimum value of 1
* * Maximum value of 10
*
* @default 1
*/
readonly parallelizationFactor?: number;

/**
* Where to begin consuming the stream.
*/
@@ -48,8 +92,13 @@ export abstract class StreamEventSource implements lambda.IEventSource {
return {
...options,
batchSize: this.props.batchSize || 100,
bisectBatchOnError: this.props.bisectBatchOnError,
startingPosition: this.props.startingPosition,
maxBatchingWindow: this.props.maxBatchingWindow,
maxRecordAge: this.props.maxRecordAge,
retryAttempts: this.props.retryAttempts,
parallelizationFactor: this.props.parallelizationFactor,
onFailure: this.props.onFailure
};
}
}
@@ -0,0 +1,155 @@
{
"Resources": {
"FServiceRole3AC82EE1": {
"Type": "AWS::IAM::Role",
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
}
}
],
"Version": "2012-10-17"
},
"ManagedPolicyArns": [
{
"Fn::Join": [
"",
[
"arn:",
{
"Ref": "AWS::Partition"
},
":iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
]
]
}
]
}
},
"FServiceRoleDefaultPolicy17A19BFA": {
"Type": "AWS::IAM::Policy",
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": [
"sqs:SendMessage",
"sqs:GetQueueAttributes",
"sqs:GetQueueUrl"
],
"Effect": "Allow",
"Resource": {
"Fn::GetAtt": [
"Q63C6E3AB",
"Arn"
]
}
},
{
"Action": [
"kinesis:DescribeStream",
"kinesis:GetRecords",
"kinesis:GetShardIterator"
],
"Effect": "Allow",
"Resource": {
"Fn::GetAtt": [
"S509448A1",
"Arn"
]
}
}
],
"Version": "2012-10-17"
},
"PolicyName": "FServiceRoleDefaultPolicy17A19BFA",
"Roles": [
{
"Ref": "FServiceRole3AC82EE1"
}
]
}
},
"FC4345940": {
"Type": "AWS::Lambda::Function",
"Properties": {
"Code": {
"ZipFile": "exports.handler = async function handler(event) {\n // tslint:disable-next-line:no-console\n console.log('event:', JSON.stringify(event, undefined, 2));\n throw new Error();\n}"
},
"Handler": "index.handler",
"Role": {
"Fn::GetAtt": [
"FServiceRole3AC82EE1",
"Arn"
]
},
"Runtime": "nodejs10.x"
},
"DependsOn": [
"FServiceRoleDefaultPolicy17A19BFA",
"FServiceRole3AC82EE1"
]
},
"FKinesisEventSourcelambdaeventsourcekinesiswithdlqSD357FCB87EEA8CB4": {
"Type": "AWS::Lambda::EventSourceMapping",
"Properties": {
"EventSourceArn": {
"Fn::GetAtt": [
"S509448A1",
"Arn"
]
},
"FunctionName": {
"Ref": "FC4345940"
},
"BatchSize": 100,
"DestinationConfig": {
"OnFailure": {
"Destination": {
"Fn::GetAtt": [
"Q63C6E3AB",
"Arn"
]
}
}
},
"MaximumRetryAttempts": 0,
"StartingPosition": "TRIM_HORIZON"
}
},
"S509448A1": {
"Type": "AWS::Kinesis::Stream",
"Properties": {
"ShardCount": 1,
"RetentionPeriodHours": 24
}
},
"Q63C6E3AB": {
"Type": "AWS::SQS::Queue"
}
},
"Outputs": {
"InputKinesisStreamName": {
"Value": {
"Ref": "S509448A1"
}
},
"DlqSqsQueueUrl": {
"Value": {
"Ref": "Q63C6E3AB"
}
},
"FunctionArn": {
"Value": {
"Fn::GetAtt": [
"FC4345940",
"Arn"
]
}
}
},
"Parameters": {
}
}
@@ -0,0 +1,47 @@
import * as kinesis from '@aws-cdk/aws-kinesis';
import * as lambda from '@aws-cdk/aws-lambda';
import * as sqs from '@aws-cdk/aws-sqs';
import { App, CfnOutput, Stack } from "@aws-cdk/core";
import { KinesisEventSource, SqsDlq } from '../lib';

/*
* Stack verification steps:
* * aws kinesis put-record --stream-name <value of stack output: InputKinesisStreamName> --partition-key 123 --data testdata
* * aws sqs receive-message --queue-url <value of stack output: DlqSqsQueueUrl> --max-number-of-messages 1 --query 'Messages[0].Body'
* The last command should return a string that contains the Lambda function ARN in it.
*/

async function handler(event: any) {
// tslint:disable-next-line:no-console
console.log('event:', JSON.stringify(event, undefined, 2));
throw new Error();
}

class KinesisWithDLQTest extends Stack {
constructor(scope: App, id: string) {
super(scope, id);

const fn = new lambda.Function(this, 'F', {
runtime: lambda.Runtime.NODEJS_10_X,
handler: 'index.handler',
code: lambda.Code.fromInline(`exports.handler = ${handler.toString()}`)
});
new CfnOutput(this, 'FunctionArn', { value: fn.functionArn });

const stream = new kinesis.Stream(this, 'S');
new CfnOutput(this, 'InputKinesisStreamName', { value: stream.streamName });

const dlq = new sqs.Queue(this, 'Q');
new CfnOutput(this, 'DlqSqsQueueUrl', { value: dlq.queueUrl });

fn.addEventSource(new KinesisEventSource(stream, {
startingPosition: lambda.StartingPosition.TRIM_HORIZON,
onFailure: new SqsDlq(dlq),
retryAttempts: 0,
}));
}
}

const app = new App();
new KinesisWithDLQTest(app, 'lambda-event-source-kinesis-with-dlq');
app.synth();
