Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(lambda): Support S3 as onFailure destinations on MSK and SelfManagedKafka events #28010

Merged
merged 15 commits into from
Nov 15, 2023

Conversation

otaviomacedo
Copy link
Contributor

Summary

This PR will include following features

  • add destination(currently sns/sqs) for Kafka event sources (SelfManagedKafkaEventSource andManagedKafkaEventSource)
  • add a dedicated S3 destination to the kafka event sources (non-kafka event source doesn't support S3 as destination)

Backgrounds

Lambda Event Source Mapping (ESM) processes events from event sources in a sequence manner. However, a potential issue with this approach is that if a record is deemed a “poison pill”, it will be retried indefinitely until it is processed successfully or until the record expires. This can cause delays in processing others records in the queue. Additionally, for events that exceed the Lambda payload limit of 6 MB, they might be dropped as they cannot be processed.

Today, Lambda supports configuring an OnSuccess destination and OnFailure destination for asynchronous invocations. For stream-based event sources, such as Kinesis, and DynamoDB streams, Lambda supports configuring an OnFailure destination. Regarding SQS, SQS poller doesn’t support OnFailure destination, but SQS support Dead Letter Queue (DLQ) natively.

For CDK, Some event source mappings (events) can have onFailure destination through a DestinationConfig. Right now that supports only DynamoDB and Kinesis event sources and only SQS and SNS destinations.

Solution

  • Add a new s3onFailureDestination destination in s3-ofd.ts file
  • Add a new field onFailure to KafkaEventSourceProps that customer can use s3onFailureDestination to pass in.
  • Add a check in enrichMappingOptions for every event type against EventSourceMappingOptions.supportS3OFD to check if they support S3 as onFailure.

User Experience

import { Secret } from 'aws-cdk-lib/aws-secretsmanager';
+ import { ManagedKafkaEventSource, S3OnFailureDestination } from 'aws-cdk-lib/aws-lambda-event-sources';
import * as lambda from 'aws-cdk-lib/aws-lambda'
import { App, StackProps, Stack }  from 'aws-cdk-lib';
+ import { Bucket } from 'aws-cdk-lib/aws-s3';

export class CdkTestStack extends Stack {
  constructor(scope: App, id: string, props?: StackProps) {
    super(scope, id, props);

    const myFunction = new lambda.Function(this, 'myFunction', {
      runtime: lambda.Runtime.NODEJS_16_X,
      handler: 'index.handler',
      code: lambda.Code.fromInline('//handler_code_here'),
    });
    // Your MSK cluster arn
    const clusterArn = 'arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4';

    // The Kafka topic you want to subscribe to
    const topic = 'some-cool-topic';

    // The secret that allows access to your MSK cluster
    // You still have to make sure that it is associated with your cluster as described in the documentation
    const secret = new Secret(this, 'Secret', { secretName: 'AmazonMSK_KafkaSecret' });
    // Your bucket for Kafka's onFailure Destination
+    const bucket = Bucket.fromBucketName(this, 'BucketByName', 'my-bucket');
+    const s3ofd = new S3OnFailureDestination(bucket);
    myFunction.addEventSource(new ManagedKafkaEventSource({
      clusterArn,
      topic: topic,
      secret: secret,
      batchSize: 100, // default
      startingPosition: lambda.StartingPosition.TRIM_HORIZON,
+      onFailure: s3ofd,
    }));
  }
}

sample synth output

  myFunctionServiceRoleDefaultPolicyECBA61F7:
    Type: AWS::IAM::Policy
    Properties:
      PolicyDocument:
        Statement:
          - Action:
              - s3:Abort*
              - s3:DeleteObject*
              - s3:GetBucket*
              - s3:GetObject*
              - s3:List*
              - s3:PutObject
              - s3:PutObjectLegalHold
              - s3:PutObjectRetention
              - s3:PutObjectTagging
              - s3:PutObjectVersionTagging
            Effect: Allow
            Resource:
              - Fn::Join:
                  - ""
                  - - "arn:"
                    - Ref: AWS::Partition
                    - :s3:::my-bucket
              - Fn::Join:
                  - ""
                  - - "arn:"
                    - Ref: AWS::Partition
                    - :s3:::my-bucket/*
  myFunctionKafkaEventSourceCdkTestStackmyFunction09D44E80somecooltopic1B4580FF:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 100
      DestinationConfig:
        OnFailure:
          Destination:
            Fn::Join:
              - ""
              - - "arn:"
                - Ref: AWS::Partition
                - :s3:::my-bucket

To discuss

Change parent class name

The parent class of s3ofd is called IEventSourceDlq. DLQ stands for Dead Letter Queue, Which is very specific for SQS, we might want to work out a new naming for S3

  • Option 1:
    Rename the IEventSourceDlq to IEventSourceOfd (Ofd: OnFailureDestinaion)And create IEventSourceDlq again to extends from it for keeping backwards compatibility.
    For S3, Create a new class IEventSourceS3ofd. also extend from the parent class and will be implemented by our new S3 Destination class.

  • Option 2:
    Don’t create new parent classes, the new S3 Destination class just extend the original IEventSourceDlq Class. Like what is shown in the current code.

Where to add checking for s3 on failure desintation support

Currently in the commit, I was checking for s3ofd support in enrichMappingOptions Which will be called by all event source classes. Does this design make sense, or should we create a new dedicated function for chcecking?

Questions:

  • Check out the current UX

Closes #.


By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license

@aws-cdk-automation aws-cdk-automation requested a review from a team November 15, 2023 15:27
@github-actions github-actions bot added the p2 label Nov 15, 2023
@mergify mergify bot added the contribution/core This is a PR that came from AWS. label Nov 15, 2023
Copy link
Collaborator

@aws-cdk-automation aws-cdk-automation left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The pull request linter has failed. See the aws-cdk-automation comment below for failure reasons. If you believe this pull request should receive an exemption, please comment and provide a justification.

A comment requesting an exemption should contain the text Exemption Request. Additionally, if clarification is needed add Clarification Request to a comment.

@aws-cdk-automation aws-cdk-automation dismissed their stale review November 15, 2023 15:46

✅ Updated pull request passes all PRLinter validations. Dismissing previous PRLinter review.

Copy link
Contributor

mergify bot commented Nov 15, 2023

Thank you for contributing! Your pull request will be updated from main and then merged automatically (do not update manually, and be sure to allow changes to be pushed to your fork).

@aws-cdk-automation
Copy link
Collaborator

AWS CodeBuild CI Report

  • CodeBuild project: AutoBuildv2Project1C6BFA3F-wQm2hXv2jqQv
  • Commit ID: 4cdd909
  • Result: SUCCEEDED
  • Build Logs (available for 30 days)

Powered by github-codebuild-logs, available on the AWS Serverless Application Repository

@mergify mergify bot merged commit e789adc into main Nov 15, 2023
12 checks passed
@mergify mergify bot deleted the otaviom/lambda-msk-ofd branch November 15, 2023 18:50
Copy link
Contributor

mergify bot commented Nov 15, 2023

Thank you for contributing! Your pull request will be updated from main and then merged automatically (do not update manually, and be sure to allow changes to be pushed to your fork).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
contribution/core This is a PR that came from AWS. p2
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants