diff --git a/builtin/providers/aws/resource_aws_kinesis_stream.go b/builtin/providers/aws/resource_aws_kinesis_stream.go index 1d70d847640a..ed731f1657d1 100644 --- a/builtin/providers/aws/resource_aws_kinesis_stream.go +++ b/builtin/providers/aws/resource_aws_kinesis_stream.go @@ -46,6 +46,13 @@ func resourceAwsKinesisStream() *schema.Resource { }, }, + "shard_level_metrics": &schema.Schema{ + Type: schema.TypeSet, + Optional: true, + Elem: &schema.Schema{Type: schema.TypeString}, + Set: schema.HashString, + }, + "arn": &schema.Schema{ Type: schema.TypeString, Optional: true, @@ -110,6 +117,9 @@ func resourceAwsKinesisStreamUpdate(d *schema.ResourceData, meta interface{}) er if err := setKinesisRetentionPeriod(conn, d); err != nil { return err } + if err := updateKinesisShardLevelMetrics(conn, d); err != nil { + return err + } return resourceAwsKinesisStreamRead(d, meta) } @@ -134,6 +144,10 @@ func resourceAwsKinesisStreamRead(d *schema.ResourceData, meta interface{}) erro d.Set("shard_count", state.shardCount) d.Set("retention_period", state.retentionPeriod) + if len(state.shardLevelMetrics) > 0 { + d.Set("shard_level_metrics", state.shardLevelMetrics) + } + // set tags describeTagsOpts := &kinesis.ListTagsForStreamInput{ StreamName: aws.String(sn), @@ -212,30 +226,74 @@ func setKinesisRetentionPeriod(conn *kinesis.Kinesis, d *schema.ResourceData) er } } - stateConf := &resource.StateChangeConf{ - Pending: []string{"UPDATING"}, - Target: []string{"ACTIVE"}, - Refresh: streamStateRefreshFunc(conn, sn), - Timeout: 5 * time.Minute, - Delay: 10 * time.Second, - MinTimeout: 3 * time.Second, + if err := waitForKinesisToBeActive(conn, sn); err != nil { + return err } - _, err := stateConf.WaitForState() - if err != nil { - return fmt.Errorf( - "Error waiting for Kinesis Stream (%s) to become active: %s", - sn, err) + return nil +} + +func updateKinesisShardLevelMetrics(conn *kinesis.Kinesis, d *schema.ResourceData) error { + sn := d.Get("name").(string) + + o, n := d.GetChange("shard_level_metrics") + if o == nil { + o = new(schema.Set) + } + if n == nil { + n = new(schema.Set) + } + + os := o.(*schema.Set) + ns := n.(*schema.Set) + + disableMetrics := os.Difference(ns) + if disableMetrics.Len() != 0 { + metrics := disableMetrics.List() + log.Printf("[DEBUG] Disabling shard level metrics %v for stream %s", metrics, sn) + + props := &kinesis.DisableEnhancedMonitoringInput{ + StreamName: aws.String(sn), + ShardLevelMetrics: expandStringList(metrics), + } + + _, err := conn.DisableEnhancedMonitoring(props) + if err != nil { + return fmt.Errorf("Failure to disable shard level metrics for stream %s: %s", sn, err) + } + if err := waitForKinesisToBeActive(conn, sn); err != nil { + return err + } + } + + enabledMetrics := ns.Difference(os) + if enabledMetrics.Len() != 0 { + metrics := enabledMetrics.List() + log.Printf("[DEBUG] Enabling shard level metrics %v for stream %s", metrics, sn) + + props := &kinesis.EnableEnhancedMonitoringInput{ + StreamName: aws.String(sn), + ShardLevelMetrics: expandStringList(metrics), + } + + _, err := conn.EnableEnhancedMonitoring(props) + if err != nil { + return fmt.Errorf("Failure to enable shard level metrics for stream %s: %s", sn, err) + } + if err := waitForKinesisToBeActive(conn, sn); err != nil { + return err + } } return nil } type kinesisStreamState struct { - arn string - status string - shardCount int - retentionPeriod int64 + arn string + status string + shardCount int + retentionPeriod int64 + shardLevelMetrics []string } func readKinesisStreamState(conn *kinesis.Kinesis, sn string) (kinesisStreamState, error) { @@ -249,6 +307,7 @@ func readKinesisStreamState(conn *kinesis.Kinesis, sn string) (kinesisStreamStat state.status = aws.StringValue(page.StreamDescription.StreamStatus) state.shardCount += len(openShards(page.StreamDescription.Shards)) state.retentionPeriod = aws.Int64Value(page.StreamDescription.RetentionPeriodHours) + state.shardLevelMetrics = flattenKinesisShardLevelMetrics(page.StreamDescription.EnhancedMonitoring) return !last }) return state, err @@ -271,6 +330,25 @@ func streamStateRefreshFunc(conn *kinesis.Kinesis, sn string) resource.StateRefr } } +func waitForKinesisToBeActive(conn *kinesis.Kinesis, sn string) error { + stateConf := &resource.StateChangeConf{ + Pending: []string{"UPDATING"}, + Target: []string{"ACTIVE"}, + Refresh: streamStateRefreshFunc(conn, sn), + Timeout: 5 * time.Minute, + Delay: 10 * time.Second, + MinTimeout: 3 * time.Second, + } + + _, err := stateConf.WaitForState() + if err != nil { + return fmt.Errorf( + "Error waiting for Kinesis Stream (%s) to become active: %s", + sn, err) + } + return nil +} + // See http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-using-sdk-java-resharding-merge.html func openShards(shards []*kinesis.Shard) []*kinesis.Shard { var open []*kinesis.Shard diff --git a/builtin/providers/aws/resource_aws_kinesis_stream_test.go b/builtin/providers/aws/resource_aws_kinesis_stream_test.go index 626f949f681e..974761182984 100644 --- a/builtin/providers/aws/resource_aws_kinesis_stream_test.go +++ b/builtin/providers/aws/resource_aws_kinesis_stream_test.go @@ -116,6 +116,52 @@ func TestAccAWSKinesisStream_retentionPeriod(t *testing.T) { }) } +func TestAccAWSKinesisStream_shardLevelMetrics(t *testing.T) { + var stream kinesis.StreamDescription + + ri := rand.New(rand.NewSource(time.Now().UnixNano())).Int() + config := fmt.Sprintf(testAccKinesisStreamConfig, ri) + allConfig := fmt.Sprintf(testAccKinesisStreamConfigAllShardLevelMetrics, ri) + singleConfig := fmt.Sprintf(testAccKinesisStreamConfigSingleShardLevelMetric, ri) + + resource.Test(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: testAccCheckKinesisStreamDestroy, + Steps: []resource.TestStep{ + resource.TestStep{ + Config: config, + Check: resource.ComposeTestCheckFunc( + testAccCheckKinesisStreamExists("aws_kinesis_stream.test_stream", &stream), + testAccCheckAWSKinesisStreamAttributes(&stream), + resource.TestCheckResourceAttr( + "aws_kinesis_stream.test_stream", "shard_level_metrics.#", ""), + ), + }, + + resource.TestStep{ + Config: allConfig, + Check: resource.ComposeTestCheckFunc( + testAccCheckKinesisStreamExists("aws_kinesis_stream.test_stream", &stream), + testAccCheckAWSKinesisStreamAttributes(&stream), + resource.TestCheckResourceAttr( + "aws_kinesis_stream.test_stream", "shard_level_metrics.#", "7"), + ), + }, + + resource.TestStep{ + Config: singleConfig, + Check: resource.ComposeTestCheckFunc( + testAccCheckKinesisStreamExists("aws_kinesis_stream.test_stream", &stream), + testAccCheckAWSKinesisStreamAttributes(&stream), + resource.TestCheckResourceAttr( + "aws_kinesis_stream.test_stream", "shard_level_metrics.#", "1"), + ), + }, + }, + }) +} + func testAccCheckKinesisStreamExists(n string, stream *kinesis.StreamDescription) resource.TestCheckFunc { return func(s *terraform.State) error { rs, ok := s.RootModule().Resources[n] @@ -227,3 +273,35 @@ resource "aws_kinesis_stream" "test_stream" { } } ` + +var testAccKinesisStreamConfigAllShardLevelMetrics = ` +resource "aws_kinesis_stream" "test_stream" { + name = "terraform-kinesis-test-%d" + shard_count = 2 + tags { + Name = "tf-test" + } + shard_level_metrics = [ + "IncomingBytes", + "IncomingRecords", + "OutgoingBytes", + "OutgoingRecords", + "WriteProvisionedThroughputExceeded", + "ReadProvisionedThroughputExceeded", + "IteratorAgeMilliseconds" + ] +} +` + +var testAccKinesisStreamConfigSingleShardLevelMetric = ` +resource "aws_kinesis_stream" "test_stream" { + name = "terraform-kinesis-test-%d" + shard_count = 2 + tags { + Name = "tf-test" + } + shard_level_metrics = [ + "IncomingBytes" + ] +} +` diff --git a/builtin/providers/aws/structure.go b/builtin/providers/aws/structure.go index 41464a5d3762..cdedb27ca062 100644 --- a/builtin/providers/aws/structure.go +++ b/builtin/providers/aws/structure.go @@ -21,6 +21,7 @@ import ( "github.com/aws/aws-sdk-go/service/elasticbeanstalk" elasticsearch "github.com/aws/aws-sdk-go/service/elasticsearchservice" "github.com/aws/aws-sdk-go/service/elb" + "github.com/aws/aws-sdk-go/service/kinesis" "github.com/aws/aws-sdk-go/service/lambda" "github.com/aws/aws-sdk-go/service/rds" "github.com/aws/aws-sdk-go/service/redshift" @@ -1005,6 +1006,17 @@ func flattenAsgEnabledMetrics(list []*autoscaling.EnabledMetric) []string { return strs } +func flattenKinesisShardLevelMetrics(list []*kinesis.EnhancedMetrics) []string { + if len(list) == 0 { + return []string{} + } + strs := make([]string, 0, len(list[0].ShardLevelMetrics)) + for _, s := range list[0].ShardLevelMetrics { + strs = append(strs, *s) + } + return strs +} + func flattenApiGatewayStageKeys(keys []*string) []map[string]interface{} { stageKeys := make([]map[string]interface{}, 0, len(keys)) for _, o := range keys { diff --git a/builtin/providers/aws/structure_test.go b/builtin/providers/aws/structure_test.go index 0ac0a73dcfae..937411af1651 100644 --- a/builtin/providers/aws/structure_test.go +++ b/builtin/providers/aws/structure_test.go @@ -11,6 +11,7 @@ import ( "github.com/aws/aws-sdk-go/service/ec2" "github.com/aws/aws-sdk-go/service/elasticache" "github.com/aws/aws-sdk-go/service/elb" + "github.com/aws/aws-sdk-go/service/kinesis" "github.com/aws/aws-sdk-go/service/rds" "github.com/aws/aws-sdk-go/service/redshift" "github.com/aws/aws-sdk-go/service/route53" @@ -839,6 +840,27 @@ func TestFlattenAsgEnabledMetrics(t *testing.T) { } } +func TestFlattenKinesisShardLevelMetrics(t *testing.T) { + expanded := []*kinesis.EnhancedMetrics{ + &kinesis.EnhancedMetrics{ + ShardLevelMetrics: []*string{ + aws.String("IncomingBytes"), + aws.String("IncomingRecords"), + }, + }, + } + result := flattenKinesisShardLevelMetrics(expanded) + if len(result) != 2 { + t.Fatalf("expected result had %d elements, but got %d", 2, len(result)) + } + if result[0] != "IncomingBytes" { + t.Fatalf("expected element 0 to be IncomingBytes, but was %s", result[0]) + } + if result[1] != "IncomingRecords" { + t.Fatalf("expected element 0 to be IncomingRecords, but was %s", result[1]) + } +} + func TestFlattenSecurityGroups(t *testing.T) { cases := []struct { ownerId *string diff --git a/website/source/docs/providers/aws/r/kinesis_stream.html.markdown b/website/source/docs/providers/aws/r/kinesis_stream.html.markdown index 90220bffb84c..1ae13bfcfb2d 100644 --- a/website/source/docs/providers/aws/r/kinesis_stream.html.markdown +++ b/website/source/docs/providers/aws/r/kinesis_stream.html.markdown @@ -20,6 +20,10 @@ resource "aws_kinesis_stream" "test_stream" { name = "terraform-kinesis-test" shard_count = 1 retention_period = 48 + shard_level_metrics = [ + "IncomingBytes", + "OutgoingBytes" + ] tags { Environment = "test" } @@ -36,6 +40,7 @@ AWS account and region the Stream is created in. Amazon has guidlines for specifying the Stream size that should be referenced when creating a Kinesis stream. See [Amazon Kinesis Streams][2] for more. * `retention_period` - (Optional) Length of time data records are accessible after they are added to the stream. The maximum value of a stream's retention period is 168 hours. Minimum value is 24. Default is 24. +* `shard_level_metrics` - (Optional) A list of shard-level CloudWatch metrics which can be enabled for the stream. See [Monitoring with CloudWatch][3] for more. Note that the value ALL should not be used; instead you should provide an explicit list of metrics you wish to enable. * `tags` - (Optional) A mapping of tags to assign to the resource. ## Attributes Reference @@ -48,3 +53,4 @@ when creating a Kinesis stream. See [Amazon Kinesis Streams][2] for more. [1]: https://aws.amazon.com/documentation/kinesis/ [2]: https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html +[3]: https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-cloudwatch.html