From 2f7e1613be503278cfbcd75b422a2a0132dea079 Mon Sep 17 00:00:00 2001 From: mg Date: Wed, 20 Nov 2019 07:47:07 -0600 Subject: [PATCH] resource/aws_lambda_event_source_mapping: Add maximum_batching_window_in_seconds argument (#10051) Output from acceptance testing: ``` --- PASS: TestAccAWSLambdaEventSourceMapping_sqs_withFunctionName (59.92s) --- PASS: TestAccAWSLambdaEventSourceMapping_kinesis_disappears (70.86s) --- PASS: TestAccAWSLambdaEventSourceMapping_StartingPositionTimestamp (72.83s) --- PASS: TestAccAWSLambdaEventSourceMapping_kinesis_removeBatchSize (84.38s) --- PASS: TestAccAWSLambdaEventSourceMapping_kinesis_basic (84.60s) --- PASS: TestAccAWSLambdaEventSourceMapping_BatchWindow (85.49s) --- PASS: TestAccAWSLambdaEventSourceMapping_sqsDisappears (109.06s) --- PASS: TestAccAWSLambdaEventSourceMapping_sqs_basic (122.73s) --- PASS: TestAccAWSLambdaEventSourceMapping_changesInEnabledAreDetected (132.31s) ``` --- ...esource_aws_lambda_event_source_mapping.go | 18 ++++++- ...ce_aws_lambda_event_source_mapping_test.go | 52 +++++++++++++++++++ .../lambda_event_source_mapping.html.markdown | 1 + 3 files changed, 70 insertions(+), 1 deletion(-) diff --git a/aws/resource_aws_lambda_event_source_mapping.go b/aws/resource_aws_lambda_event_source_mapping.go index 55ee33d3008..0285455eb06 100644 --- a/aws/resource_aws_lambda_event_source_mapping.go +++ b/aws/resource_aws_lambda_event_source_mapping.go @@ -96,6 +96,10 @@ func resourceAwsLambdaEventSourceMapping() *schema.Resource { Optional: true, Default: true, }, + "maximum_batching_window_in_seconds": { + Type: schema.TypeInt, + Optional: true, + }, "function_arn": { Type: schema.TypeString, Computed: true, @@ -144,6 +148,10 @@ func resourceAwsLambdaEventSourceMappingCreate(d *schema.ResourceData, meta inte params.BatchSize = aws.Int64(int64(batchSize.(int))) } + if batchWindow, ok := d.GetOk("maximum_batching_window_in_seconds"); ok { + params.MaximumBatchingWindowInSeconds = aws.Int64(int64(batchWindow.(int))) + } + if startingPosition, ok := d.GetOk("starting_position"); ok { params.StartingPosition = aws.String(startingPosition.(string)) } @@ -210,6 +218,7 @@ func resourceAwsLambdaEventSourceMappingRead(d *schema.ResourceData, meta interf } d.Set("batch_size", eventSourceMappingConfiguration.BatchSize) + d.Set("maximum_batching_window_in_seconds", eventSourceMappingConfiguration.MaximumBatchingWindowInSeconds) d.Set("event_source_arn", eventSourceMappingConfiguration.EventSourceArn) d.Set("function_arn", eventSourceMappingConfiguration.FunctionArn) d.Set("last_modified", eventSourceMappingConfiguration.LastModified) @@ -278,7 +287,14 @@ func resourceAwsLambdaEventSourceMappingUpdate(d *schema.ResourceData, meta inte Enabled: aws.Bool(d.Get("enabled").(bool)), } - err := resource.Retry(5*time.Minute, func() *resource.RetryError { + // AWS API will fail if this parameter is set (even as default value) for sqs event source. Ideally this should be implemented in GO SDK or AWS API itself. + eventSourceArn, err := arn.Parse(d.Get("event_source_arn").(string)) + + if err == nil && eventSourceArn.Service != "sqs" { + params.MaximumBatchingWindowInSeconds = aws.Int64(int64(d.Get("maximum_batching_window_in_seconds").(int))) + } + + err = resource.Retry(5*time.Minute, func() *resource.RetryError { _, err := conn.UpdateEventSourceMapping(params) if err != nil { if isAWSErr(err, lambda.ErrCodeInvalidParameterValueException, "") || diff --git a/aws/resource_aws_lambda_event_source_mapping_test.go b/aws/resource_aws_lambda_event_source_mapping_test.go index 899227cb4bb..03d9bd3055b 100644 --- a/aws/resource_aws_lambda_event_source_mapping_test.go +++ b/aws/resource_aws_lambda_event_source_mapping_test.go @@ -311,6 +311,45 @@ func TestAccAWSLambdaEventSourceMapping_StartingPositionTimestamp(t *testing.T) }) } +func TestAccAWSLambdaEventSourceMapping_BatchWindow(t *testing.T) { + var conf lambda.EventSourceMappingConfiguration + rName := acctest.RandomWithPrefix("tf-acc-test") + resourceName := "aws_lambda_event_source_mapping.test" + batchWindow := int64(5) + batchWindowUpdate := int64(10) + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: testAccCheckLambdaEventSourceMappingDestroy, + Steps: []resource.TestStep{ + { + Config: testAccAWSLambdaEventSourceMappingConfigKinesisBatchWindow(rName, batchWindow), + Check: resource.ComposeTestCheckFunc( + testAccCheckAwsLambdaEventSourceMappingExists(resourceName, &conf), + resource.TestCheckResourceAttr(resourceName, "maximum_batching_window_in_seconds", strconv.Itoa(int(batchWindow))), + ), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + ImportStateVerifyIgnore: []string{ + "starting_position", + "starting_position_timestamp", + }, + }, + { + Config: testAccAWSLambdaEventSourceMappingConfigKinesisBatchWindow(rName, batchWindowUpdate), + Check: resource.ComposeTestCheckFunc( + testAccCheckAwsLambdaEventSourceMappingExists(resourceName, &conf), + resource.TestCheckResourceAttr(resourceName, "maximum_batching_window_in_seconds", strconv.Itoa(int(batchWindowUpdate))), + ), + }, + }, + }) +} + func testAccCheckAWSLambdaEventSourceMappingIsBeingDisabled(conf *lambda.EventSourceMappingConfiguration) resource.TestCheckFunc { return func(s *terraform.State) error { conn := testAccProvider.Meta().(*AWSClient).lambdaconn @@ -554,6 +593,19 @@ resource "aws_lambda_event_source_mapping" "test" { `, startingPositionTimestamp) } +func testAccAWSLambdaEventSourceMappingConfigKinesisBatchWindow(rName string, batchWindow int64) string { + return testAccAWSLambdaEventSourceMappingConfigKinesisBase(rName) + fmt.Sprintf(` +resource "aws_lambda_event_source_mapping" "test" { + batch_size = 100 + maximum_batching_window_in_seconds = %v + enabled = true + event_source_arn = "${aws_kinesis_stream.test.arn}" + function_name = "${aws_lambda_function.test.arn}" + starting_position = "TRIM_HORIZON" +} +`, batchWindow) +} + func testAccAWSLambdaEventSourceMappingConfig_kinesis(roleName, policyName, attName, streamName, funcName, uFuncName string) string { return fmt.Sprintf(` diff --git a/website/docs/r/lambda_event_source_mapping.html.markdown b/website/docs/r/lambda_event_source_mapping.html.markdown index 0577d69b9af..c0d52369f83 100644 --- a/website/docs/r/lambda_event_source_mapping.html.markdown +++ b/website/docs/r/lambda_event_source_mapping.html.markdown @@ -47,6 +47,7 @@ resource "aws_lambda_event_source_mapping" "example" { ## Argument Reference * `batch_size` - (Optional) The largest number of records that Lambda will retrieve from your event source at the time of invocation. Defaults to `100` for DynamoDB and Kinesis, `10` for SQS. +* `maximum_batching_window_in_seconds` - (Optional) The maximum amount of time to gather records before invoking the function, in seconds. Records will continue to buffer until either `maximum_batching_window_in_seconds` expires or `batch_size` has been met. Defaults to as soon as records are available in the stream. If the batch it reads from the stream only has one record in it, Lambda only sends one record to the function. * `event_source_arn` - (Required) The event source ARN - can be a Kinesis stream, DynamoDB stream, or SQS queue. * `enabled` - (Optional) Determines if the mapping will be enabled on creation. Defaults to `true`. * `function_name` - (Required) The name or the ARN of the Lambda function that will be subscribing to events.