resource/aws_lambda_event_source_mapping: Add topics argument (#14746)
Output from acceptance testing in AWS Commercial:

```
--- PASS: TestAccAWSLambdaEventSourceMapping_sqs_withFunctionName (55.29s)
--- FAIL: TestAccAWSLambdaEventSourceMapping_sqs_basic (64.70s) # #14765
--- PASS: TestAccAWSLambdaEventSourceMapping_SQSBatchWindow (77.70s)
--- PASS: TestAccAWSLambdaEventSourceMapping_StartingPositionTimestamp (83.78s)
--- PASS: TestAccAWSLambdaEventSourceMapping_MaximumRecordAgeInSeconds (106.53s)
--- PASS: TestAccAWSLambdaEventSourceMapping_changesInEnabledAreDetected (107.34s)
--- PASS: TestAccAWSLambdaEventSourceMapping_BisectBatch (107.85s)
--- PASS: TestAccAWSLambdaEventSourceMapping_KinesisDestinationConfig (108.87s)
--- PASS: TestAccAWSLambdaEventSourceMapping_kinesis_removeBatchSize (109.37s)
--- PASS: TestAccAWSLambdaEventSourceMapping_disappears (112.05s)
--- PASS: TestAccAWSLambdaEventSourceMapping_KinesisBatchWindow (115.26s)
--- PASS: TestAccAWSLambdaEventSourceMapping_MaximumRetryAttempts (116.99s)
--- PASS: TestAccAWSLambdaEventSourceMapping_ParallelizationFactor (117.33s)
--- PASS: TestAccAWSLambdaEventSourceMapping_kinesis_basic (118.04s)
--- PASS: TestAccAWSLambdaEventSourceMapping_MaximumRetryAttemptsZero (120.57s)
--- PASS: TestAccAWSLambdaEventSourceMapping_MSK (1684.38s)
```
ewbankkit authored Feb 11, 2021
1 parent ef17ab4 commit 21520f5
Showing 4 changed files with 187 additions and 5 deletions.
3 changes: 3 additions & 0 deletions .changelog/14746.txt
@@ -0,0 +1,3 @@
```release-note:enhancement
resource/aws_lambda_event_source_mapping: Add `topics` attribute to support Amazon MSK as an event source
```
18 changes: 16 additions & 2 deletions aws/resource_aws_lambda_event_source_mapping.go
@@ -60,6 +60,13 @@ func resourceAwsLambdaEventSourceMapping() *schema.Resource {
ForceNew: true,
ValidateFunc: validation.IsRFC3339Time,
},
"topics": {
Type: schema.TypeSet,
Optional: true,
ForceNew: true,
Elem: &schema.Schema{Type: schema.TypeString},
Set: schema.HashString,
},
"batch_size": {
Type: schema.TypeInt,
Optional: true,
@@ -205,6 +212,11 @@ func resourceAwsLambdaEventSourceMappingCreate(d *schema.ResourceData, meta inte
t, _ := time.Parse(time.RFC3339, startingPositionTimestamp.(string))
params.StartingPositionTimestamp = aws.Time(t)
}

if topics, ok := d.GetOk("topics"); ok && topics.(*schema.Set).Len() > 0 {
params.Topics = expandStringSet(topics.(*schema.Set))
}

if parallelizationFactor, ok := d.GetOk("parallelization_factor"); ok {
params.ParallelizationFactor = aws.Int64(int64(parallelizationFactor.(int)))
}
@@ -253,9 +265,8 @@ func resourceAwsLambdaEventSourceMappingCreate(d *schema.ResourceData, meta inte
return fmt.Errorf("Error creating Lambda event source mapping: %s", err)
}

// No error
d.Set("uuid", eventSourceMappingConfiguration.UUID)
d.SetId(aws.StringValue(eventSourceMappingConfiguration.UUID))

return resourceAwsLambdaEventSourceMappingRead(d, meta)
}

@@ -298,6 +309,9 @@ func resourceAwsLambdaEventSourceMappingRead(d *schema.ResourceData, meta interf
if err := d.Set("destination_config", flattenLambdaEventSourceMappingDestinationConfig(eventSourceMappingConfiguration.DestinationConfig)); err != nil {
return fmt.Errorf("error setting destination_config: %s", err)
}
if err := d.Set("topics", flattenStringSet(eventSourceMappingConfiguration.Topics)); err != nil {
return fmt.Errorf("error setting topics: %s", err)
}

state := aws.StringValue(eventSourceMappingConfiguration.State)

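The `expandStringSet` and `flattenStringSet` helpers used in the hunks above are defined elsewhere in the provider (they are not part of this diff). As a rough illustration only, a minimal, self-contained sketch of their assumed behavior when converting the `topics` value between a `schema.Set` and the `[]*string` the AWS SDK expects — the `*Sketch` names are hypothetical stand-ins, not the provider's API:

```go
package main

import (
	"fmt"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
)

// expandStringSetSketch turns a schema.Set of strings into []*string,
// e.g. for the Topics field of a CreateEventSourceMapping request.
func expandStringSetSketch(configured *schema.Set) []*string {
	result := make([]*string, 0, configured.Len())
	for _, v := range configured.List() {
		result = append(result, aws.String(v.(string)))
	}
	return result
}

// flattenStringSetSketch turns []*string returned by the Lambda API back
// into a schema.Set so it can be written to state with d.Set("topics", ...).
func flattenStringSetSketch(list []*string) *schema.Set {
	s := schema.NewSet(schema.HashString, nil)
	for _, v := range list {
		s.Add(aws.StringValue(v))
	}
	return s
}

func main() {
	topics := schema.NewSet(schema.HashString, []interface{}{"test"})
	fmt.Println(aws.StringValue(expandStringSetSketch(topics)[0]))            // test
	fmt.Println(flattenStringSetSketch(expandStringSetSketch(topics)).List()) // [test]
}
```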
153 changes: 153 additions & 0 deletions aws/resource_aws_lambda_event_source_mapping_test.go
@@ -62,6 +62,7 @@ func TestAccAWSLambdaEventSourceMapping_kinesis_basic(t *testing.T) {
resource.TestCheckResourceAttrPair(resourceName, "function_arn", functionResourceNameUpdated, "arn"),
resource.TestCheckResourceAttrPair(resourceName, "event_source_arn", eventSourceResourceName, "arn"),
resource.TestCheckResourceAttr(resourceName, "starting_position", "TRIM_HORIZON"),
resource.TestCheckResourceAttr(resourceName, "topics.#", "0"),
),
},
},
@@ -621,6 +622,39 @@ func TestAccAWSLambdaEventSourceMapping_KinesisDestinationConfig(t *testing.T) {
})
}

func TestAccAWSLambdaEventSourceMapping_MSK(t *testing.T) {
var v lambda.EventSourceMappingConfiguration
resourceName := "aws_lambda_event_source_mapping.test"
eventSourceResourceName := "aws_msk_cluster.test"
rName := acctest.RandomWithPrefix("tf-acc-test")

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckLambdaEventSourceMappingDestroy,
Steps: []resource.TestStep{
{
Config: testAccAWSLambdaEventSourceMappingConfigMsk(rName),
Check: resource.ComposeTestCheckFunc(
testAccCheckAwsLambdaEventSourceMappingExists(resourceName, &v),
testAccCheckAWSLambdaEventSourceMappingAttributes(&v),
resource.TestCheckResourceAttrPair(resourceName, "event_source_arn", eventSourceResourceName, "arn"),
testAccCheckResourceAttrRfc3339(resourceName, "last_modified"),
resource.TestCheckNoResourceAttr(resourceName, "starting_position"),
resource.TestCheckResourceAttr(resourceName, "topics.#", "1"),
resource.TestCheckTypeSetElemAttr(resourceName, "topics.*", "test"),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{"enabled", "starting_position"},
},
},
})
}

func testAccCheckAWSLambdaEventSourceMappingIsBeingDisabled(conf *lambda.EventSourceMappingConfiguration) resource.TestCheckFunc {
return func(s *terraform.State) error {
conn := testAccProvider.Meta().(*AWSClient).lambdaconn
@@ -1527,3 +1561,122 @@ resource "aws_lambda_event_source_mapping" "test" {
}
`, batchWindow))
}

func testAccAWSLambdaEventSourceMappingConfigMsk(rName string) string {
return composeConfig(testAccAvailableAZsNoOptInConfig(), fmt.Sprintf(`
resource "aws_iam_role" "test" {
name = %[1]q
assume_role_policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Action": "sts:AssumeRole",
"Principal": {
"Service": "lambda.amazonaws.com"
},
"Effect": "Allow",
"Sid": ""
}
]
}
EOF
}
resource "aws_iam_policy" "test" {
name = %[1]q
policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kafka:DescribeCluster",
"kafka:GetBootstrapBrokers",
"ec2:CreateNetworkInterface",
"ec2:DeleteNetworkInterface",
"ec2:DescribeNetworkInterfaces",
"ec2:DescribeSecurityGroups",
"ec2:DescribeSubnets",
"ec2:DescribeVpcs",
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": "*"
}
]
}
EOF
}
resource "aws_iam_policy_attachment" "test" {
name = %[1]q
roles = [aws_iam_role.test.name]
policy_arn = aws_iam_policy.test.arn
}
resource "aws_vpc" "test" {
cidr_block = "192.168.0.0/22"
tags = {
Name = %[1]q
}
}
resource "aws_subnet" "test" {
count = 2
vpc_id = aws_vpc.test.id
cidr_block = cidrsubnet(aws_vpc.test.cidr_block, 2, count.index)
availability_zone = data.aws_availability_zones.available.names[count.index]
tags = {
Name = %[1]q
}
}
resource "aws_security_group" "test" {
name = %[1]q
vpc_id = aws_vpc.test.id
tags = {
Name = %[1]q
}
}
resource "aws_msk_cluster" "test" {
cluster_name = %[1]q
kafka_version = "2.2.1"
number_of_broker_nodes = 2
broker_node_group_info {
client_subnets = aws_subnet.test[*].id
ebs_volume_size = 10
instance_type = "kafka.t3.small"
security_groups = [aws_security_group.test.id]
}
}
resource "aws_lambda_function" "test" {
filename = "test-fixtures/lambdatest.zip"
function_name = %[1]q
role = aws_iam_role.test.arn
handler = "exports.example"
runtime = "nodejs12.x"
}
resource "aws_lambda_event_source_mapping" "test" {
batch_size = 100
event_source_arn = aws_msk_cluster.test.arn
enabled = true
function_name = aws_lambda_function.test.arn
topics = ["test"]
depends_on = [aws_iam_policy_attachment.test]
}
`, rName))
}
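The `composeConfig` and `testAccAvailableAZsNoOptInConfig` helpers used in the test configuration above are defined elsewhere in the provider's acceptance-test code. For illustration only, a minimal sketch of how `composeConfig` is assumed to work (the `composeConfigSketch` name is hypothetical):

```go
package main

import "strings"

// composeConfigSketch concatenates Terraform configuration fragments in order,
// so shared snippets (e.g. an availability-zones data source config) can be
// reused across acceptance test configurations.
func composeConfigSketch(fragments ...string) string {
	var b strings.Builder
	for _, f := range fragments {
		b.WriteString(f)
	}
	return b.String()
}
```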
18 changes: 15 additions & 3 deletions website/docs/r/lambda_event_source_mapping.html.markdown
@@ -35,6 +35,17 @@ resource "aws_lambda_event_source_mapping" "example" {
}
```

### Managed Streaming for Kafka (MSK)

```hcl
resource "aws_lambda_event_source_mapping" "example" {
event_source_arn = aws_msk_cluster.example.arn
function_name = aws_lambda_function.example.arn
topics = ["Example"]
starting_position = "TRIM_HORIZON"
}
```

### SQS

```hcl
@@ -46,17 +57,18 @@ resource "aws_lambda_event_source_mapping" "example" {

## Argument Reference

* `batch_size` - (Optional) The largest number of records that Lambda will retrieve from your event source at the time of invocation. Defaults to `100` for DynamoDB and Kinesis, `10` for SQS.
* `batch_size` - (Optional) The largest number of records that Lambda will retrieve from your event source at the time of invocation. Defaults to `100` for DynamoDB, Kinesis and MSK, `10` for SQS.
* `maximum_batching_window_in_seconds` - (Optional) The maximum amount of time to gather records before invoking the function, in seconds (between 0 and 300). Records will continue to buffer (or accumulate in the case of an SQS queue event source) until either `maximum_batching_window_in_seconds` expires or `batch_size` has been met. For streaming event sources, defaults to as soon as records are available in the stream. If the batch it reads from the stream/queue only has one record in it, Lambda only sends one record to the function.
* `event_source_arn` - (Required) The event source ARN - can be a Kinesis stream, DynamoDB stream, or SQS queue.
* `event_source_arn` - (Required) The event source ARN - can be a Kinesis stream, DynamoDB stream, SQS queue or MSK cluster.
* `enabled` - (Optional) Determines if the mapping will be enabled on creation. Defaults to `true`.
* `function_name` - (Required) The name or the ARN of the Lambda function that will be subscribing to events.
* `starting_position` - (Optional) The position in the stream where AWS Lambda should start reading. Must be one of `AT_TIMESTAMP` (Kinesis only), `LATEST` or `TRIM_HORIZON` if getting events from Kinesis or DynamoDB. Must not be provided if getting events from SQS. More information about these positions can be found in the [AWS DynamoDB Streams API Reference](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetShardIterator.html) and [AWS Kinesis API Reference](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#Kinesis-GetShardIterator-request-ShardIteratorType).
* `starting_position` - (Optional) The position in the stream where AWS Lambda should start reading. Must be one of `AT_TIMESTAMP` (Kinesis only), `LATEST` or `TRIM_HORIZON` if getting events from Kinesis, DynamoDB or MSK. Must not be provided if getting events from SQS. More information about these positions can be found in the [AWS DynamoDB Streams API Reference](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetShardIterator.html) and [AWS Kinesis API Reference](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#Kinesis-GetShardIterator-request-ShardIteratorType).
* `starting_position_timestamp` - (Optional) A timestamp in [RFC3339 format](https://tools.ietf.org/html/rfc3339#section-5.8) of the data record which to start reading when using `starting_position` set to `AT_TIMESTAMP`. If a record with this exact timestamp does not exist, the next later record is chosen. If the timestamp is older than the current trim horizon, the oldest available record is chosen.
* `parallelization_factor`: - (Optional) The number of batches to process from each shard concurrently. Only available for stream sources (DynamoDB and Kinesis). Minimum and default of 1, maximum of 10.
* `maximum_retry_attempts`: - (Optional) The maximum number of times to retry when the function returns an error. Only available for stream sources (DynamoDB and Kinesis). Minimum of 0, maximum and default of 10000.
* `maximum_record_age_in_seconds`: - (Optional) The maximum age of a record that Lambda sends to a function for processing. Only available for stream sources (DynamoDB and Kinesis). Minimum of 60, maximum and default of 604800.
* `bisect_batch_on_function_error`: - (Optional) If the function returns an error, split the batch in two and retry. Only available for stream sources (DynamoDB and Kinesis). Defaults to `false`.
* `topics` - (Optional) The name of the Kafka topics. Only available for MSK sources. A single topic name must be specified.
* `destination_config`: - (Optional) An Amazon SQS queue or Amazon SNS topic destination for failed records. Only available for stream sources (DynamoDB and Kinesis). Detailed below.

### destination_config Configuration Block
