Skip to content

Commit

Permalink
feat(lambda-event-sources): add rootCACertificate to `SelfManagedKa…
Browse files Browse the repository at this point in the history
…fkaEventSource` (#21422)

Co-authored-by: @abks90 <alexander.backes@codecentric.de>
Co-authored-by: @ccoltx <59687842+ccoltx@users.noreply.github.com>

----

### Description
In AWS its possible to configure a self hosted Kafka as an eventsource for a Lambda, described [here](https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html#services-smaa-topic-add).

Optional its possible to choose to reference the root certificate (CA) secret in the field encryption (see below screenshot `4.h)`).

However in CDK in the [SelfManagedKafkaEventSourceProps](https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_lambda_event_sources.SelfManagedKafkaEventSourceProps.html#secret) of [SelfManagedKafkaEventSource](https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_lambda_event_sources.SelfManagedKafkaEventSource.html) its not possible to specify that encryption secret. Its only possible to specify a secret for the also optional authentication field.

This PR add the possibly to specify the encryption secret to reference a root ca for self signed certificates in  [SelfManagedKafkaEventSource](https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_lambda_event_sources.SelfManagedKafkaEventSource.html).

_Screenshot of docs explaining how to set secret for root ca (field encryption)_
![adding encryption kafka trigger](https://user-images.githubusercontent.com/7139697/182471206-0302fc41-1169-492c-ace6-6b66338b07d2.png)
_Screenshot of aws console, where the field encryption is visible while adding a self hosted kafka as lambda eventsource trigger_
![lambda kafka trigger screenshot](https://user-images.githubusercontent.com/7139697/182472746-4e06b432-0d58-48e0-8fb9-f4e762ade002.png)

## Technical Approach

We started by adding a test (extending the `kafka.test.ts`) which allows to pass an optional property `encryption` of type secret.
In case this property is set, we add it to the [sourceAccessConfiguration](https://docs.aws.amazon.com/lambda/latest/dg/API_SourceAccessConfiguration.html) in the [AWS::Lambda::EventSourceMapping](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#aws-resource-lambda-eventsourcemapping-syntax).

As a small bonus we also added the missing static variable for `SERVER_ROOT_CA_CERTIFICATE` in the class `SourceAccessConfigurationType` with the value taken from this [documented list of possible values](https://docs.aws.amazon.com/lambda/latest/dg/API_SourceAccessConfiguration.html)


### All Submissions:

* [x] Have you followed the guidelines in our [Contributing guide?](https://github.com/aws/aws-cdk/blob/main/CONTRIBUTING.md)

### Adding new Unconventional Dependencies:

* [ ] This PR adds new unconventional dependencies following the process described [here](https://github.com/aws/aws-cdk/blob/main/CONTRIBUTING.md/#adding-new-unconventional-dependencies)

### New Features

* [x] Have you added the new feature to an [integration test](https://github.com/aws/aws-cdk/blob/main/INTEGRATION_TESTS.md)? No, as there was no integration test there, but we extended the regular unit tests.
	* [x] Did you use `yarn integ` to deploy the infrastructure and generate the snapshot (i.e. `yarn integ` without `--dry-run`)?

*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
  • Loading branch information
WtfJoke authored Aug 5, 2022
1 parent 86e396a commit 82a597a
Show file tree
Hide file tree
Showing 11 changed files with 690 additions and 1 deletion.
4 changes: 4 additions & 0 deletions packages/@aws-cdk/aws-lambda-event-sources/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -261,13 +261,17 @@ const topic = 'some-cool-topic';
// The secret that allows access to your self hosted Kafka cluster
declare const secret: Secret;

// (Optional) The secret containing the root CA certificate that your Kafka brokers use for TLS encryption
declare const encryption: Secret;

declare const myFunction: lambda.Function;
myFunction.addEventSource(new SelfManagedKafkaEventSource({
bootstrapServers: bootstrapServers,
topic: topic,
secret: secret,
batchSize: 100, // default
startingPosition: lambda.StartingPosition.TRIM_HORIZON,
encryption: encryption // optional
}));
```

Expand Down
17 changes: 16 additions & 1 deletion packages/@aws-cdk/aws-lambda-event-sources/lib/kafka.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { StreamEventSource, BaseStreamEventSourceProps } from './stream';
/**
* Properties for a Kafka event source
*/
export interface KafkaEventSourceProps extends BaseStreamEventSourceProps{
export interface KafkaEventSourceProps extends BaseStreamEventSourceProps {
/**
* The Kafka topic to subscribe to
*/
Expand Down Expand Up @@ -94,6 +94,14 @@ export interface SelfManagedKafkaEventSourceProps extends KafkaEventSourceProps
* @default AuthenticationMethod.SASL_SCRAM_512_AUTH
*/
readonly authenticationMethod?: AuthenticationMethod

/**
* The secret with the root CA certificate used by your Kafka brokers for TLS encryption
* This field is required if your Kafka brokers use certificates signed by a private CA
*
* @default - none
*/
readonly rootCACertificate?: secretsmanager.Secret;
}

/**
Expand Down Expand Up @@ -231,6 +239,13 @@ export class SelfManagedKafkaEventSource extends StreamEventSource {
sourceAccessConfigurations.push({ type: authType, uri: this.innerProps.secret.secretArn });
}

if (this.innerProps.rootCACertificate !== undefined) {
sourceAccessConfigurations.push({
type: lambda.SourceAccessConfigurationType.SERVER_ROOT_CA_CERTIFICATE,
uri: this.innerProps.rootCACertificate.secretArn,
});
}

if (this.innerProps.vpcSubnets !== undefined && this.innerProps.securityGroup !== undefined) {
sourceAccessConfigurations.push({
type: lambda.SourceAccessConfigurationType.VPC_SECURITY_GROUP,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import * as lambda from '@aws-cdk/aws-lambda';
import * as secretsmanager from '@aws-cdk/aws-secretsmanager';
import * as cdk from '@aws-cdk/core';
import * as integ from '@aws-cdk/integ-tests';
import { AuthenticationMethod, SelfManagedKafkaEventSource } from '../lib';
import { TestFunction } from './test-function';

class KafkaSelfManagedEventSourceTest extends cdk.Stack {
constructor(scope: cdk.App, id: string) {
super(scope, id);

const dummyCertString = `-----BEGIN CERTIFICATE-----
MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw
cmUuiAii9R0=
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb
c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg==
-----END CERTIFICATE-----"
`;

const dummyPrivateKey = `-----BEGIN ENCRYPTED PRIVATE KEY-----
zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA==
-----END ENCRYPTED PRIVATE KEY-----`;

const fn = new TestFunction(this, 'F');
const rootCASecret = new secretsmanager.Secret(this, 'S', {
secretObjectValue: {
certificate: cdk.SecretValue.unsafePlainText(dummyCertString),
},
});
const clientCertificatesSecret = new secretsmanager.Secret(this, 'SC', {
secretObjectValue: {
certificate: cdk.SecretValue.unsafePlainText(dummyCertString),
privateKey: cdk.SecretValue.unsafePlainText(dummyPrivateKey),
},
});
rootCASecret.grantRead(fn);
clientCertificatesSecret.grantRead(fn);

const bootstrapServers = [
'my-self-hosted-kafka-broker-1:9092',
'my-self-hosted-kafka-broker-2:9092',
'my-self-hosted-kafka-broker-3:9092',
];

fn.addEventSource(
new SelfManagedKafkaEventSource({
bootstrapServers,
topic: 'my-test-topic',
secret: clientCertificatesSecret,
authenticationMethod: AuthenticationMethod.CLIENT_CERTIFICATE_TLS_AUTH,
rootCACertificate: rootCASecret,
startingPosition: lambda.StartingPosition.TRIM_HORIZON,
}),
);
}
}

const app = new cdk.App();
const stack = new KafkaSelfManagedEventSourceTest(
app,
'lambda-event-source-kafka-self-managed',
);
new integ.IntegTest(app, 'LambdaEventSourceKafkaSelfManagedTest', {
testCases: [stack],
});
app.synth();
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"version":"20.0.0"}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"version": "20.0.0",
"testCases": {
"LambdaEventSourceKafkaSelfManagedTest/DefaultTest": {
"stacks": [
"lambda-event-source-kafka-self-managed"
],
"assertionStack": "LambdaEventSourceKafkaSelfManagedTestDefaultTestDeployAssertAF78BD0F"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
{
"Resources": {
"FServiceRole3AC82EE1": {
"Type": "AWS::IAM::Role",
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
}
}
],
"Version": "2012-10-17"
},
"ManagedPolicyArns": [
{
"Fn::Join": [
"",
[
"arn:",
{
"Ref": "AWS::Partition"
},
":iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
]
]
}
]
}
},
"FServiceRoleDefaultPolicy17A19BFA": {
"Type": "AWS::IAM::Policy",
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": [
"secretsmanager:DescribeSecret",
"secretsmanager:GetSecretValue"
],
"Effect": "Allow",
"Resource": [
{
"Ref": "S509448A1"
},
{
"Ref": "SC0855C491"
}
]
}
],
"Version": "2012-10-17"
},
"PolicyName": "FServiceRoleDefaultPolicy17A19BFA",
"Roles": [
{
"Ref": "FServiceRole3AC82EE1"
}
]
}
},
"FC4345940": {
"Type": "AWS::Lambda::Function",
"Properties": {
"Code": {
"ZipFile": "exports.handler = async function handler(event) {\n console.log('event:', JSON.stringify(event, undefined, 2));\n return { event };\n}"
},
"Role": {
"Fn::GetAtt": [
"FServiceRole3AC82EE1",
"Arn"
]
},
"Handler": "index.handler",
"Runtime": "nodejs14.x"
},
"DependsOn": [
"FServiceRoleDefaultPolicy17A19BFA",
"FServiceRole3AC82EE1"
]
},
"FKafkaEventSource838c4d5ff3c99c1a617120adfca83e5bmytesttopic1E7A7798": {
"Type": "AWS::Lambda::EventSourceMapping",
"Properties": {
"FunctionName": {
"Ref": "FC4345940"
},
"BatchSize": 100,
"SelfManagedEventSource": {
"Endpoints": {
"KafkaBootstrapServers": [
"my-self-hosted-kafka-broker-1:9092",
"my-self-hosted-kafka-broker-2:9092",
"my-self-hosted-kafka-broker-3:9092"
]
}
},
"SourceAccessConfigurations": [
{
"Type": "CLIENT_CERTIFICATE_TLS_AUTH",
"URI": {
"Ref": "SC0855C491"
}
},
{
"Type": "SERVER_ROOT_CA_CERTIFICATE",
"URI": {
"Ref": "S509448A1"
}
}
],
"StartingPosition": "TRIM_HORIZON",
"Topics": [
"my-test-topic"
]
}
},
"S509448A1": {
"Type": "AWS::SecretsManager::Secret",
"Properties": {
"SecretString": "{\"certificate\":\"-----BEGIN CERTIFICATE-----\\nMIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw\\ncmUuiAii9R0=\\n-----END CERTIFICATE-----\\n-----BEGIN CERTIFICATE-----\\nMIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb\\nc8PH3PSoAaRwMMgOSA2ALJvbRz8mpg==\\n-----END CERTIFICATE-----\\\"\\n\"}"
},
"UpdateReplacePolicy": "Delete",
"DeletionPolicy": "Delete"
},
"SC0855C491": {
"Type": "AWS::SecretsManager::Secret",
"Properties": {
"SecretString": "{\"certificate\":\"-----BEGIN CERTIFICATE-----\\nMIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw\\ncmUuiAii9R0=\\n-----END CERTIFICATE-----\\n-----BEGIN CERTIFICATE-----\\nMIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb\\nc8PH3PSoAaRwMMgOSA2ALJvbRz8mpg==\\n-----END CERTIFICATE-----\\\"\\n\",\"privateKey\":\"-----BEGIN ENCRYPTED PRIVATE KEY-----\\nzp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA==\\n-----END ENCRYPTED PRIVATE KEY-----\"}"
},
"UpdateReplacePolicy": "Delete",
"DeletionPolicy": "Delete"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
{
"version": "20.0.0",
"artifacts": {
"Tree": {
"type": "cdk:tree",
"properties": {
"file": "tree.json"
}
},
"lambda-event-source-kafka-self-managed": {
"type": "aws:cloudformation:stack",
"environment": "aws://unknown-account/unknown-region",
"properties": {
"templateFile": "lambda-event-source-kafka-self-managed.template.json",
"validateOnSynth": false
},
"metadata": {
"/lambda-event-source-kafka-self-managed/F/ServiceRole/Resource": [
{
"type": "aws:cdk:logicalId",
"data": "FServiceRole3AC82EE1"
}
],
"/lambda-event-source-kafka-self-managed/F/ServiceRole/DefaultPolicy/Resource": [
{
"type": "aws:cdk:logicalId",
"data": "FServiceRoleDefaultPolicy17A19BFA"
}
],
"/lambda-event-source-kafka-self-managed/F/Resource": [
{
"type": "aws:cdk:logicalId",
"data": "FC4345940"
}
],
"/lambda-event-source-kafka-self-managed/F/KafkaEventSource:838c4d5ff3c99c1a617120adfca83e5b:my-test-topic/Resource": [
{
"type": "aws:cdk:logicalId",
"data": "FKafkaEventSource838c4d5ff3c99c1a617120adfca83e5bmytesttopic1E7A7798"
}
],
"/lambda-event-source-kafka-self-managed/S/Resource": [
{
"type": "aws:cdk:logicalId",
"data": "S509448A1"
}
],
"/lambda-event-source-kafka-self-managed/SC/Resource": [
{
"type": "aws:cdk:logicalId",
"data": "SC0855C491"
}
]
},
"displayName": "lambda-event-source-kafka-self-managed"
},
"LambdaEventSourceKafkaSelfManagedTestDefaultTestDeployAssertAF78BD0F": {
"type": "aws:cloudformation:stack",
"environment": "aws://unknown-account/unknown-region",
"properties": {
"templateFile": "LambdaEventSourceKafkaSelfManagedTestDefaultTestDeployAssertAF78BD0F.template.json",
"validateOnSynth": false
},
"displayName": "LambdaEventSourceKafkaSelfManagedTest/DefaultTest/DeployAssert"
}
}
}
Loading

0 comments on commit 82a597a

Please sign in to comment.