Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

r/aws_lambda_event_source_mapping: Add MSK as an event source #14746

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/14746.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
resource/aws_lambda_event_source_mapping: Add `topics` attribute to support Amazon MSK as an event source
```
18 changes: 16 additions & 2 deletions aws/resource_aws_lambda_event_source_mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ func resourceAwsLambdaEventSourceMapping() *schema.Resource {
ForceNew: true,
ValidateFunc: validation.IsRFC3339Time,
},
"topics": {
ewbankkit marked this conversation as resolved.
Show resolved Hide resolved
Type: schema.TypeSet,
Optional: true,
ForceNew: true,
Elem: &schema.Schema{Type: schema.TypeString},
Set: schema.HashString,
ewbankkit marked this conversation as resolved.
Show resolved Hide resolved
},
"batch_size": {
Type: schema.TypeInt,
Optional: true,
Expand Down Expand Up @@ -205,6 +212,11 @@ func resourceAwsLambdaEventSourceMappingCreate(d *schema.ResourceData, meta inte
t, _ := time.Parse(time.RFC3339, startingPositionTimestamp.(string))
params.StartingPositionTimestamp = aws.Time(t)
}

if topics, ok := d.GetOk("topics"); ok && topics.(*schema.Set).Len() > 0 {
params.Topics = expandStringSet(topics.(*schema.Set))
}

if parallelizationFactor, ok := d.GetOk("parallelization_factor"); ok {
params.ParallelizationFactor = aws.Int64(int64(parallelizationFactor.(int)))
}
Expand Down Expand Up @@ -253,9 +265,8 @@ func resourceAwsLambdaEventSourceMappingCreate(d *schema.ResourceData, meta inte
return fmt.Errorf("Error creating Lambda event source mapping: %s", err)
}

// No error
d.Set("uuid", eventSourceMappingConfiguration.UUID)
ewbankkit marked this conversation as resolved.
Show resolved Hide resolved
d.SetId(aws.StringValue(eventSourceMappingConfiguration.UUID))

return resourceAwsLambdaEventSourceMappingRead(d, meta)
}

Expand Down Expand Up @@ -298,6 +309,9 @@ func resourceAwsLambdaEventSourceMappingRead(d *schema.ResourceData, meta interf
if err := d.Set("destination_config", flattenLambdaEventSourceMappingDestinationConfig(eventSourceMappingConfiguration.DestinationConfig)); err != nil {
return fmt.Errorf("error setting destination_config: %s", err)
}
if err := d.Set("topics", flattenStringSet(eventSourceMappingConfiguration.Topics)); err != nil {
return fmt.Errorf("error setting topics: %s", err)
ewbankkit marked this conversation as resolved.
Show resolved Hide resolved
}

state := aws.StringValue(eventSourceMappingConfiguration.State)

Expand Down
153 changes: 153 additions & 0 deletions aws/resource_aws_lambda_event_source_mapping_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func TestAccAWSLambdaEventSourceMapping_kinesis_basic(t *testing.T) {
resource.TestCheckResourceAttrPair(resourceName, "function_arn", functionResourceNameUpdated, "arn"),
resource.TestCheckResourceAttrPair(resourceName, "event_source_arn", eventSourceResourceName, "arn"),
resource.TestCheckResourceAttr(resourceName, "starting_position", "TRIM_HORIZON"),
resource.TestCheckResourceAttr(resourceName, "topics.#", "0"),
),
},
},
Expand Down Expand Up @@ -621,6 +622,39 @@ func TestAccAWSLambdaEventSourceMapping_KinesisDestinationConfig(t *testing.T) {
})
}

func TestAccAWSLambdaEventSourceMapping_MSK(t *testing.T) {
var v lambda.EventSourceMappingConfiguration
resourceName := "aws_lambda_event_source_mapping.test"
eventSourceResourceName := "aws_msk_cluster.test"
rName := acctest.RandomWithPrefix("tf-acc-test")

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckLambdaEventSourceMappingDestroy,
Steps: []resource.TestStep{
{
Config: testAccAWSLambdaEventSourceMappingConfigMsk(rName),
Check: resource.ComposeTestCheckFunc(
testAccCheckAwsLambdaEventSourceMappingExists(resourceName, &v),
testAccCheckAWSLambdaEventSourceMappingAttributes(&v),
resource.TestCheckResourceAttrPair(resourceName, "event_source_arn", eventSourceResourceName, "arn"),
testAccCheckResourceAttrRfc3339(resourceName, "last_modified"),
resource.TestCheckNoResourceAttr(resourceName, "starting_position"),
resource.TestCheckResourceAttr(resourceName, "topics.#", "1"),
resource.TestCheckTypeSetElemAttr(resourceName, "topics.*", "test"),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{"enabled", "starting_position"},
},
},
})
}

func testAccCheckAWSLambdaEventSourceMappingIsBeingDisabled(conf *lambda.EventSourceMappingConfiguration) resource.TestCheckFunc {
return func(s *terraform.State) error {
conn := testAccProvider.Meta().(*AWSClient).lambdaconn
Expand Down Expand Up @@ -1527,3 +1561,122 @@ resource "aws_lambda_event_source_mapping" "test" {
}
`, batchWindow))
}

func testAccAWSLambdaEventSourceMappingConfigMsk(rName string) string {
return composeConfig(testAccAvailableAZsNoOptInConfig(), fmt.Sprintf(`
resource "aws_iam_role" "test" {
name = %[1]q

assume_role_policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Action": "sts:AssumeRole",
"Principal": {
"Service": "lambda.amazonaws.com"
},
"Effect": "Allow",
"Sid": ""
}
]
}
EOF
}

resource "aws_iam_policy" "test" {
name = %[1]q

policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kafka:DescribeCluster",
"kafka:GetBootstrapBrokers",
"ec2:CreateNetworkInterface",
"ec2:DeleteNetworkInterface",
"ec2:DescribeNetworkInterfaces",
"ec2:DescribeSecurityGroups",
"ec2:DescribeSubnets",
"ec2:DescribeVpcs",
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": "*"
}
]
}
EOF
}

resource "aws_iam_policy_attachment" "test" {
name = %[1]q
roles = [aws_iam_role.test.name]
policy_arn = aws_iam_policy.test.arn
}

resource "aws_vpc" "test" {
cidr_block = "192.168.0.0/22"

tags = {
Name = %[1]q
}
}

resource "aws_subnet" "test" {
count = 2

vpc_id = aws_vpc.test.id
cidr_block = cidrsubnet(aws_vpc.test.cidr_block, 2, count.index)
availability_zone = data.aws_availability_zones.available.names[count.index]

tags = {
Name = %[1]q
}
}

resource "aws_security_group" "test" {
name = %[1]q
vpc_id = aws_vpc.test.id

tags = {
Name = %[1]q
}
}

resource "aws_msk_cluster" "test" {
cluster_name = %[1]q
kafka_version = "2.2.1"
number_of_broker_nodes = 2

broker_node_group_info {
client_subnets = aws_subnet.test[*].id
ebs_volume_size = 10
instance_type = "kafka.t3.small"
security_groups = [aws_security_group.test.id]
}
}

resource "aws_lambda_function" "test" {
filename = "test-fixtures/lambdatest.zip"
function_name = %[1]q
role = aws_iam_role.test.arn
handler = "exports.example"
runtime = "nodejs12.x"
}

resource "aws_lambda_event_source_mapping" "test" {
batch_size = 100
event_source_arn = aws_msk_cluster.test.arn
enabled = true
function_name = aws_lambda_function.test.arn
topics = ["test"]

depends_on = [aws_iam_policy_attachment.test]
}
`, rName))
}
18 changes: 15 additions & 3 deletions website/docs/r/lambda_event_source_mapping.html.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,17 @@ resource "aws_lambda_event_source_mapping" "example" {
}
```

### Managed Streaming for Kafka (MSK)

```hcl
resource "aws_lambda_event_source_mapping" "example" {
event_source_arn = aws_msk_cluster.example.arn
function_name = aws_lambda_function.example.arn
topics = ["Example"]
starting_position = "TRIM_HORIZON"
}
```

### SQS

```hcl
Expand All @@ -46,17 +57,18 @@ resource "aws_lambda_event_source_mapping" "example" {

## Argument Reference

* `batch_size` - (Optional) The largest number of records that Lambda will retrieve from your event source at the time of invocation. Defaults to `100` for DynamoDB and Kinesis, `10` for SQS.
* `batch_size` - (Optional) The largest number of records that Lambda will retrieve from your event source at the time of invocation. Defaults to `100` for DynamoDB, Kinesis and MSK, `10` for SQS.
* `maximum_batching_window_in_seconds` - (Optional) The maximum amount of time to gather records before invoking the function, in seconds (between 0 and 300). Records will continue to buffer (or accumulate in the case of an SQS queue event source) until either `maximum_batching_window_in_seconds` expires or `batch_size` has been met. For streaming event sources, defaults to as soon as records are available in the stream. If the batch it reads from the stream/queue only has one record in it, Lambda only sends one record to the function.
* `event_source_arn` - (Required) The event source ARN - can be a Kinesis stream, DynamoDB stream, or SQS queue.
* `event_source_arn` - (Required) The event source ARN - can be a Kinesis stream, DynamoDB stream, SQS queue or MSK cluster.
* `enabled` - (Optional) Determines if the mapping will be enabled on creation. Defaults to `true`.
* `function_name` - (Required) The name or the ARN of the Lambda function that will be subscribing to events.
* `starting_position` - (Optional) The position in the stream where AWS Lambda should start reading. Must be one of `AT_TIMESTAMP` (Kinesis only), `LATEST` or `TRIM_HORIZON` if getting events from Kinesis or DynamoDB. Must not be provided if getting events from SQS. More information about these positions can be found in the [AWS DynamoDB Streams API Reference](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetShardIterator.html) and [AWS Kinesis API Reference](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#Kinesis-GetShardIterator-request-ShardIteratorType).
* `starting_position` - (Optional) The position in the stream where AWS Lambda should start reading. Must be one of `AT_TIMESTAMP` (Kinesis only), `LATEST` or `TRIM_HORIZON` if getting events from Kinesis, DynamoDB or MSK. Must not be provided if getting events from SQS. More information about these positions can be found in the [AWS DynamoDB Streams API Reference](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetShardIterator.html) and [AWS Kinesis API Reference](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#Kinesis-GetShardIterator-request-ShardIteratorType).
* `starting_position_timestamp` - (Optional) A timestamp in [RFC3339 format](https://tools.ietf.org/html/rfc3339#section-5.8) of the data record which to start reading when using `starting_position` set to `AT_TIMESTAMP`. If a record with this exact timestamp does not exist, the next later record is chosen. If the timestamp is older than the current trim horizon, the oldest available record is chosen.
* `parallelization_factor`: - (Optional) The number of batches to process from each shard concurrently. Only available for stream sources (DynamoDB and Kinesis). Minimum and default of 1, maximum of 10.
* `maximum_retry_attempts`: - (Optional) The maximum number of times to retry when the function returns an error. Only available for stream sources (DynamoDB and Kinesis). Minimum of 0, maximum and default of 10000.
* `maximum_record_age_in_seconds`: - (Optional) The maximum age of a record that Lambda sends to a function for processing. Only available for stream sources (DynamoDB and Kinesis). Minimum of 60, maximum and default of 604800.
* `bisect_batch_on_function_error`: - (Optional) If the function returns an error, split the batch in two and retry. Only available for stream sources (DynamoDB and Kinesis). Defaults to `false`.
* `topics` - (Optional) The name of the Kafka topics. Only available for MSK sources. A single topic name must be specified.
* `destination_config`: - (Optional) An Amazon SQS queue or Amazon SNS topic destination for failed records. Only available for stream sources (DynamoDB and Kinesis). Detailed below.

### destination_config Configuration Block
Expand Down