Skip to content

Commit 8ef4a47

Browse files
authored
Merge pull request #1674 from devops-arch-cloud/purohmid-terraform-poison-kill-serverless
new pattern - Kinesis Poison Pill Pattern
2 parents 18bb5c6 + 3247063 commit 8ef4a47

22 files changed

+575
-0
lines changed
+149
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
# Kinesis Poison Pill Pattern
2+
3+
This pattern demonstrates how to handle a Lambda consumer failure when reading from a Kinesis Data Stream with Terraform
4+
5+
Without proper handling of failure when working with Kinesis Data Streams, an iterator will get stuck and the only way for the data to clear the stream is for it to **Age Out** beyond the trim horizon.
6+
7+
This will ultimately create wasteful invocations of a Lambda, wasted CPU cycles in a container and worst of all the downstream consumers will not get the data they need.
8+
9+
Learn more about this pattern at Serverless Land Patterns: << Add the live URL here >>
10+
11+
**Important**: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example.
12+
13+
## Requirements
14+
15+
- [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources.
16+
- [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured
17+
- [Git Installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git)
18+
- [Node and NPM](https://nodejs.org/en/download/) installed
19+
- [The Go Programming Langage](https://go.dev/doc/install) must be installed in order to build the Lambda
20+
- [Terraform](https://www.terraform.io) must be installed in order to provision infrastructure
21+
22+
## Deployment Instructions
23+
24+
1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository:
25+
```
26+
git clone https://github.com/aws-samples/serverless-patterns
27+
```
28+
2. Change directory to the lambda module directory:
29+
```
30+
cd terraform-kinesis-poison-pill/modules/lambda
31+
```
32+
3. Install the project dependencies
33+
```
34+
npm install
35+
```
36+
4. Build lambda using make command
37+
```
38+
make
39+
```
40+
5. Change directory to the pattern directory:
41+
```
42+
cd ../..
43+
44+
6. Deploy the stack to your default AWS account and region using terraform
45+
```
46+
terraform init
47+
terraform plan
48+
terraform apply
49+
```
50+
51+
## How it works
52+
53+
This pattern is designed to highlight how to handle failure when processing records in a Lambda. The `Kinesis Event Source` turns a Lambda into a Data Consumer which will process a JSON event that looks like this in Go
54+
55+
```golang
56+
type SampleEvent struct {
57+
FieldA string `json:"fieldA"`
58+
FieldB string `json:"fieldB"`
59+
FieldC int `json:"fieldC"`
60+
}
61+
```
62+
63+
And like this in JSON
64+
65+
```javascript
66+
// defintion
67+
{
68+
fieldA: string;
69+
fieldB: string;
70+
fieldC: number
71+
}
72+
73+
// example
74+
{
75+
"fieldA": "ABC",
76+
"fieldB": "EFG",
77+
"fieldC": 123
78+
}
79+
```
80+
81+
Once the pattern is deployed to AWS, you will have the following resources created with the described capabilities
82+
83+
- Kinesis Data Stream where the data will be "sourced"
84+
- Lambda Consumer written in Golang that will read from Kinesis in an attempt to process the records
85+
- SQS Queue where failed Kinesis records will be put
86+
87+
## Testing
88+
89+
90+
### Testing Success
91+
92+
93+
```bash
94+
aws kinesis put-record \
95+
--stream-name kinesis-stream \
96+
--data 'eyJmaWVsZEEiOiAiQUJDIiwgImZpZWxkQiI6ICJFRkciLCAiZmllbGRDIjogMTIzIH0K' \
97+
--partition-key key --region us-west-1
98+
```
99+
100+
This command will publish a JSON payload that will successfully be Unmarshalled in the Lambda.
101+
102+
### Testing Failure
103+
104+
```bash
105+
aws kinesis put-record \
106+
--stream-name kinesis-stream \
107+
--data 'eyJmaWVsZEEiOiAiQUJDIiwgImZpZWxkQiI6ICJFRkciLCAiZmllbGRDIjogIjEyMyIgfQo=' \
108+
--partition-key key --region us-west-1
109+
```
110+
111+
This command will publish a JSON payload that will not be Unmarshalled successfully in the Lambda. `FieldC` will be a `string` instead of a `number`
112+
113+
Based upon the definition in the terraform code, this will be attempted 5 times and will be bisected out of batch should more records be in the read.
114+
115+
### Inspecting in the AWS Console
116+
117+
- Cloudwatch - for Logs
118+
119+
Go to Cloudwatch log group /aws/lambda/kinesis_stream
120+
You should see two kind of log event for success and failure
121+
For Success event shows as
122+
```
123+
level=info msg="All good, see you later
124+
```
125+
For failure event shows as
126+
```
127+
json: cannot unmarshal string into Go struct field SampleEvent.fieldC of type int: UnmarshalTypeError
128+
```
129+
130+
- SQS - for the failed message
131+
- Kinesis - DataViewer looking at the TRIM_HORIZON
132+
133+
## Cleanup
134+
135+
1. Delete the stack
136+
```bash
137+
terraform destroy
138+
```
139+
140+
## Documentation
141+
142+
- [AWS Using Lambda with Kinesis](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html)
143+
- [AWS Lambda Failure-Handling Features](https://www.amazonaws.cn/en/new/2019/aws-lambda-supports-failure-handling-features-for-kinesis-and-dynamodb-event-sources/)
144+
145+
---
146+
147+
Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
148+
149+
SPDX-License-Identifier: MIT-0
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
{
2+
"title": "Kinesis Poison Pill Pattern",
3+
"description": "Lambda consumer failure when reading from a Kinesis Data Stream with Terraform.",
4+
"language": "",
5+
"level": "200",
6+
"framework": "Terraform",
7+
"introBox": {
8+
"headline": "How it works",
9+
"text": [
10+
"This pattern demonstrates how to handle a Lambda consumer failure when reading from a Kinesis Data Stream with terraform."
11+
]
12+
},
13+
"gitHub": {
14+
"template": {
15+
"repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/terraform-kinesis-poison-pill",
16+
"templateURL": "serverless-patterns/terraform-kinesis-poison-pill",
17+
"projectFolder": "terraform-kinesis-poison-pill",
18+
"templateFile": "terraform-kinesis-poison-pill/main.tf"
19+
}
20+
},
21+
"resources": {
22+
"bullets": [{
23+
"text": "AWS Using Lambda with Kinesis",
24+
"link": "https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html"
25+
}
26+
]
27+
},
28+
"deploy": {
29+
"text": [
30+
"terraform init",
31+
"terraform plan",
32+
"terraform apply"
33+
]
34+
},
35+
"testing": {
36+
"text": [
37+
"See the README in the GitHub repo for detailed testing instructions."
38+
]
39+
},
40+
"cleanup": {
41+
"text": [
42+
"terraform destroy",
43+
"terraform show"
44+
]
45+
},
46+
"authors": [{
47+
"name": "Mitesh Purohit",
48+
"image": "",
49+
"bio": "Sr Solution Architect, AWS",
50+
"linkedin": "https://www.linkedin.com/in/mitup/"
51+
},
52+
{
53+
"name": "Naresh Rajaram",
54+
"image": "",
55+
"bio": "Cloud Infrastructure Architect, AWS",
56+
"linkedin": "https://www.linkedin.com/in/naresh-rajaram-25bb106/"
57+
}
58+
]
59+
}

terraform-kinesis-poison-pill/main.tf

+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# This is required to get the AWS region via ${data.aws_region.current}.
2+
#data "aws_region" "current" {
3+
#}
4+
5+
provider "aws" {
6+
region = var.region
7+
8+
}
9+
10+
# --- main.tf ---
11+
12+
# Create Amazon S3 Destination Bucket
13+
module "kinesis-stream" {
14+
source = "./modules/kinesis-stream"
15+
}
16+
17+
module "sqs_queue" {
18+
source = "./modules/sqs"
19+
}
20+
21+
# Create Data Transformation Lambda
22+
module "kinesis_stream_lambda" {
23+
source = "./modules/lambda"
24+
kinesis_stream_arn = module.kinesis-stream.kinesis_stream_arn
25+
sqs_arn = module.sqs_queue.sqs_arn
26+
27+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
//Create kinesis stream
2+
3+
resource "aws_kinesis_stream" "kinesis-stream" {
4+
name = "kinesis-stream"
5+
shard_count = var.shard_count
6+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
//Output Kinesis Stream ARN
2+
output "kinesis_stream_arn" {
3+
value = aws_kinesis_stream.kinesis-stream.arn
4+
description = "kinesis_stream_arn"
5+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
// shard count
2+
variable "shard_count" {
3+
default = 1
4+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
kinesis_stream: main.go
2+
export GOPROXY=direct
3+
GOOS=linux GOARCH=amd64 go build -o $(CURDIR)/bin/kinesis_stream
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
module kinesis-sample
2+
3+
go 1.19
4+
5+
require github.com/aws/aws-lambda-go v1.36.0
6+
7+
require (
8+
github.com/sirupsen/logrus v1.9.0 // indirect
9+
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect
10+
)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
github.com/aws/aws-lambda-go v1.36.0 h1:NWBWBJgavrQOjF1uKDG5D7Qs5y5o75HcrjfA16Hwfak=
2+
github.com/aws/aws-lambda-go v1.36.0/go.mod h1:jwFe2KmMsHmffA1X2R09hH6lFzJQxzI8qK17ewzbQMM=
3+
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
4+
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
5+
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
6+
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
7+
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
8+
github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0=
9+
github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
10+
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
11+
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
12+
github.com/stretchr/testify v1.7.2 h1:4jaiDzPyXQvSd7D0EjG45355tLlV3VOECpq10pLC+8s=
13+
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ=
14+
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
15+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
16+
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
17+
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
{
2+
"Version": "2012-10-17",
3+
"Statement": [
4+
{
5+
"Action": [
6+
"logs:CreateLogGroup",
7+
"logs:CreateLogStream",
8+
"logs:PutLogEvents",
9+
"kinesis:*",
10+
"sqs:*"
11+
],
12+
"Effect": "Allow",
13+
"Resource": ["*"]
14+
},
15+
{
16+
"Action": [
17+
"lambda:InvokeFunction",
18+
"lambda:GetFunctionConfiguration"
19+
],
20+
"Effect": "Allow",
21+
"Resource": ["${kinesis_strem_lambda_arn}"]
22+
}
23+
]
24+
}
25+
26+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
{
2+
"Version": "2012-10-17",
3+
"Statement": [
4+
{
5+
"Effect": "Allow",
6+
"Principal": {
7+
"Service": "lambda.amazonaws.com"
8+
},
9+
"Action": "sts:AssumeRole"
10+
}
11+
]
12+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
7+
"github.com/aws/aws-lambda-go/events"
8+
log "github.com/sirupsen/logrus"
9+
10+
"github.com/aws/aws-lambda-go/lambda"
11+
)
12+
13+
func handleRequest(ctx context.Context, e events.KinesisEvent) error {
14+
// Kinesis gives a batch of records, let's loop through them
15+
for _, r := range e.Records {
16+
event := &SampleEvent{}
17+
err := json.Unmarshal(r.Kinesis.Data, event)
18+
19+
// if unmarshal fails, we can't do much so let's return an error
20+
// the batch fails but due to `bisectBatchOnError` the batch
21+
// will get split up until the failing item is pulled out
22+
if err != nil {
23+
log.WithFields(log.Fields{
24+
"err": err,
25+
}).Error("error encountered")
26+
27+
return err
28+
}
29+
}
30+
31+
log.Info("All good, see you later")
32+
33+
return nil
34+
}
35+
36+
func main() {
37+
lambda.Start(handleRequest)
38+
}

0 commit comments

Comments
 (0)