From 5f8208b985cc483c24549f607a4be6862b413df6 Mon Sep 17 00:00:00 2001 From: Josh Myers Date: Thu, 3 Aug 2017 23:34:55 +0100 Subject: [PATCH 1/2] Kinesis stream create backoff See: http://docs.aws.amazon.com/kinesis/latest/APIReference/API_CreateStream.html Similar to DynamoDB, only up to 5 kinesis streams are able to be in the create state simultaneously otherwise a LimitExceededException is thrown as can be seen below: ``` * aws_kinesis_stream.test_stream.6: [WARN] Error creating Kinesis Stream: "This request would have exceed the limit on the number of streams that can simultaneously be in CREATING or DELETING for the account XXXX. Limit: 5.", code: "LimitExceededException" * aws_kinesis_stream.test_stream[1]: 1 error(s) occurred ``` We create about 20 streams per environment (read: single Terraform run) and had been seeing these errors frequently. After speaking with AWS support, this is a hard limit (5) and they have no way of tuning per account. --- aws/resource_aws_kinesis_stream.go | 15 +++++--- aws/resource_aws_kinesis_stream_test.go | 51 +++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 5 deletions(-) diff --git a/aws/resource_aws_kinesis_stream.go b/aws/resource_aws_kinesis_stream.go index 4bf9a2b44a7..6acee0acd0b 100644 --- a/aws/resource_aws_kinesis_stream.go +++ b/aws/resource_aws_kinesis_stream.go @@ -106,14 +106,19 @@ func resourceAwsKinesisStreamCreate(d *schema.ResourceData, meta interface{}) er StreamName: aws.String(sn), } - _, err := conn.CreateStream(createOpts) - if err != nil { - if awsErr, ok := err.(awserr.Error); ok { - return fmt.Errorf("[WARN] Error creating Kinesis Stream: \"%s\", code: \"%s\"", awsErr.Message(), awsErr.Code()) + err := resource.Retry(5*time.Minute, func() *resource.RetryError { + _, err := conn.CreateStream(createOpts) + if isAWSErr(err, "LimitExceededException", "simultaneously be in CREATING or DELETING") { + return resource.RetryableError(err) } - return err + return resource.NonRetryableError(err) + }) + + if err != nil { + return fmt.Errorf("Unable to create stream: %s", err) } + // No error, wait for ACTIVE state stateConf := &resource.StateChangeConf{ Pending: []string{"CREATING"}, Target: []string{"ACTIVE"}, diff --git a/aws/resource_aws_kinesis_stream_test.go b/aws/resource_aws_kinesis_stream_test.go index 66b86879416..4865f291e9d 100644 --- a/aws/resource_aws_kinesis_stream_test.go +++ b/aws/resource_aws_kinesis_stream_test.go @@ -36,6 +36,45 @@ func TestAccAWSKinesisStream_basic(t *testing.T) { }) } +func TestAccAWSKinesisStream_createMultipleConcurrentStreams(t *testing.T) { + var stream kinesis.StreamDescription + + rInt := acctest.RandInt() + + resource.Test(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: testAccCheckKinesisStreamDestroy, + Steps: []resource.TestStep{ + { + Config: testAccKinesisStreamConfigConcurrent(rInt), + Check: resource.ComposeTestCheckFunc( + testAccCheckKinesisStreamExists("aws_kinesis_stream.test_stream.0", &stream), + testAccCheckKinesisStreamExists("aws_kinesis_stream.test_stream.1", &stream), + testAccCheckKinesisStreamExists("aws_kinesis_stream.test_stream.2", &stream), + testAccCheckKinesisStreamExists("aws_kinesis_stream.test_stream.3", &stream), + testAccCheckKinesisStreamExists("aws_kinesis_stream.test_stream.4", &stream), + testAccCheckKinesisStreamExists("aws_kinesis_stream.test_stream.5", &stream), + testAccCheckKinesisStreamExists("aws_kinesis_stream.test_stream.6", &stream), + testAccCheckKinesisStreamExists("aws_kinesis_stream.test_stream.7", &stream), + testAccCheckKinesisStreamExists("aws_kinesis_stream.test_stream.8", &stream), + testAccCheckKinesisStreamExists("aws_kinesis_stream.test_stream.9", &stream), + testAccCheckKinesisStreamExists("aws_kinesis_stream.test_stream.10", &stream), + testAccCheckKinesisStreamExists("aws_kinesis_stream.test_stream.11", &stream), + testAccCheckKinesisStreamExists("aws_kinesis_stream.test_stream.12", &stream), + testAccCheckKinesisStreamExists("aws_kinesis_stream.test_stream.13", &stream), + testAccCheckKinesisStreamExists("aws_kinesis_stream.test_stream.14", &stream), + testAccCheckKinesisStreamExists("aws_kinesis_stream.test_stream.15", &stream), + testAccCheckKinesisStreamExists("aws_kinesis_stream.test_stream.16", &stream), + testAccCheckKinesisStreamExists("aws_kinesis_stream.test_stream.17", &stream), + testAccCheckKinesisStreamExists("aws_kinesis_stream.test_stream.18", &stream), + testAccCheckKinesisStreamExists("aws_kinesis_stream.test_stream.19", &stream), + ), + }, + }, + }) +} + func TestAccAWSKinesisStream_encryptionWithoutKmsKeyThrowsError(t *testing.T) { rInt := acctest.RandInt() @@ -324,6 +363,18 @@ resource "aws_kinesis_stream" "test_stream" { }`, rInt) } +func testAccKinesisStreamConfigConcurrent(rInt int) string { + return fmt.Sprintf(` +resource "aws_kinesis_stream" "test_stream" { + count = 20 + name = "terraform-kinesis-test-%d-${count.index}" + shard_count = 2 + tags { + Name = "tf-test" + } +}`, rInt) +} + func testAccKinesisStreamConfigWithEncryptionAndNoKmsKey(rInt int) string { return fmt.Sprintf(` resource "aws_kinesis_stream" "test_stream" { From 7e692d352eb98480926f4640e7765406b2ae02c4 Mon Sep 17 00:00:00 2001 From: Josh Myers Date: Mon, 9 Oct 2017 10:46:07 +0100 Subject: [PATCH 2/2] Retry Kinesis Rate exceeded MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit I have spotted tests failing intermittently with the below error. Generally AWS API calls that get throttled, i.e. “Rate exceeded” throw a ThrottleException and are retried. It seems however that Kinesis has some discrepancies with other services in terms of what error is raised. Looks suspiciously related to: https://github.com/aws/aws-sdk-go/issues/1376 ``` * aws_kinesis_stream.test_stream.13: Unable to create stream: LimitExceededException: Rate exceeded for stream terraform-kinesis-test-3218280527529094251-13 under account 673337093959. status code: 400, request id: cf693bc0-a1a7-3610-9d3c-0dec79efd2d3 * aws_kinesis_stream.test_stream[4]: 1 error(s) occurred: ``` --- aws/resource_aws_kinesis_stream.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/aws/resource_aws_kinesis_stream.go b/aws/resource_aws_kinesis_stream.go index 6acee0acd0b..49ffaa4dcf5 100644 --- a/aws/resource_aws_kinesis_stream.go +++ b/aws/resource_aws_kinesis_stream.go @@ -111,6 +111,11 @@ func resourceAwsKinesisStreamCreate(d *schema.ResourceData, meta interface{}) er if isAWSErr(err, "LimitExceededException", "simultaneously be in CREATING or DELETING") { return resource.RetryableError(err) } + // AWS (un)helpfully raises LimitExceededException + // rather than ThrottlingException here + if isAWSErr(err, "LimitExceededException", "Rate exceeded for stream") { + return resource.RetryableError(err) + } return resource.NonRetryableError(err) })