diff --git a/builtin/providers/aws/data_source_aws_kinesis_stream.go b/builtin/providers/aws/data_source_aws_kinesis_stream.go
new file mode 100644
index 000000000000..ebc843d11c8a
--- /dev/null
+++ b/builtin/providers/aws/data_source_aws_kinesis_stream.go
@@ -0,0 +1,95 @@
+package aws
+
+import (
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/service/kinesis"
+	"github.com/hashicorp/terraform/helper/schema"
+)
+
+func dataSourceAwsKinesisStream() *schema.Resource {
+	return &schema.Resource{
+		Read: dataSourceAwsKinesisStreamRead,
+
+		Schema: map[string]*schema.Schema{
+			"name": &schema.Schema{
+				Type:     schema.TypeString,
+				Required: true,
+			},
+
+			"arn": &schema.Schema{
+				Type:     schema.TypeString,
+				Computed: true,
+			},
+
+			"creation_timestamp": &schema.Schema{
+				Type:     schema.TypeInt,
+				Computed: true,
+			},
+
+			"status": &schema.Schema{
+				Type:     schema.TypeString,
+				Computed: true,
+			},
+
+			"retention_period": &schema.Schema{
+				Type:     schema.TypeInt,
+				Computed: true,
+			},
+
+			"open_shards": &schema.Schema{
+				Type:     schema.TypeSet,
+				Computed: true,
+				Elem:     &schema.Schema{Type: schema.TypeString},
+				Set:      schema.HashString,
+			},
+
+			"closed_shards": &schema.Schema{
+				Type:     schema.TypeSet,
+				Computed: true,
+				Elem:     &schema.Schema{Type: schema.TypeString},
+				Set:      schema.HashString,
+			},
+
+			"shard_level_metrics": &schema.Schema{
+				Type:     schema.TypeSet,
+				Computed: true,
+				Elem:     &schema.Schema{Type: schema.TypeString},
+				Set:      schema.HashString,
+			},
+
+			"tags": &schema.Schema{
+				Type:     schema.TypeMap,
+				Computed: true,
+			},
+		},
+	}
+}
+
+func dataSourceAwsKinesisStreamRead(d *schema.ResourceData, meta interface{}) error {
+	conn := meta.(*AWSClient).kinesisconn
+	sn := d.Get("name").(string)
+
+	state, err := readKinesisStreamState(conn, sn)
+	if err != nil {
+		return err
+	}
+	d.SetId(state.arn)
+	d.Set("arn", state.arn)
+	d.Set("name", sn)
+	d.Set("open_shards", state.openShards)
+	d.Set("closed_shards", state.closedShards)
+	d.Set("status", state.status)
+	d.Set("creation_timestamp", state.creationTimestamp)
+	d.Set("retention_period", state.retentionPeriod)
+	d.Set("shard_level_metrics", state.shardLevelMetrics)
+
+	tags, err := conn.ListTagsForStream(&kinesis.ListTagsForStreamInput{
+		StreamName: aws.String(sn),
+	})
+	if err != nil {
+		return err
+	}
+	d.Set("tags", tagsToMapKinesis(tags.Tags))
+
+	return nil
+}
diff --git a/builtin/providers/aws/data_source_aws_kinesis_stream_test.go b/builtin/providers/aws/data_source_aws_kinesis_stream_test.go
new file mode 100644
index 000000000000..815724ae8937
--- /dev/null
+++ b/builtin/providers/aws/data_source_aws_kinesis_stream_test.go
@@ -0,0 +1,94 @@
+package aws
+
+import (
+	"fmt"
+	"testing"
+
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/service/kinesis"
+	"github.com/hashicorp/terraform/helper/acctest"
+	"github.com/hashicorp/terraform/helper/resource"
+)
+
+func TestAccAWSKinesisStreamDataSource(t *testing.T) {
+	var stream kinesis.StreamDescription
+
+	sn := fmt.Sprintf("terraform-kinesis-test-%d", acctest.RandInt())
+	config := fmt.Sprintf(testAccCheckAwsKinesisStreamDataSourceConfig, sn)
+
+	updateShardCount := func() {
+		conn := testAccProvider.Meta().(*AWSClient).kinesisconn
+		_, err := conn.UpdateShardCount(&kinesis.UpdateShardCountInput{
+			ScalingType:      aws.String(kinesis.ScalingTypeUniformScaling),
+			StreamName:       aws.String(sn),
+			TargetShardCount: aws.Int64(3),
+		})
+		if err != nil {
+			t.Fatalf("Error calling UpdateShardCount: %s", err)
+		}
+		if err := waitForKinesisToBeActive(conn, sn); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	resource.Test(t, resource.TestCase{
+		PreCheck:     func() { testAccPreCheck(t) },
+		Providers:    testAccProviders,
+		CheckDestroy: testAccCheckKinesisStreamDestroy,
+		Steps: []resource.TestStep{
+			{
+				Config: config,
+				Check: resource.ComposeTestCheckFunc(
+					testAccCheckKinesisStreamExists("aws_kinesis_stream.test_stream", &stream),
+					resource.TestCheckResourceAttrSet("data.aws_kinesis_stream.test_stream", "arn"),
+					resource.TestCheckResourceAttr("data.aws_kinesis_stream.test_stream", "name", sn),
+					resource.TestCheckResourceAttr("data.aws_kinesis_stream.test_stream", "status", "ACTIVE"),
+					resource.TestCheckResourceAttr("data.aws_kinesis_stream.test_stream", "open_shards.#", "2"),
+					resource.TestCheckResourceAttr("data.aws_kinesis_stream.test_stream", "closed_shards.#", "0"),
+					resource.TestCheckResourceAttr("data.aws_kinesis_stream.test_stream", "shard_level_metrics.#", "2"),
+					resource.TestCheckResourceAttr("data.aws_kinesis_stream.test_stream", "retention_period", "72"),
+					resource.TestCheckResourceAttrSet("data.aws_kinesis_stream.test_stream", "creation_timestamp"),
+					resource.TestCheckResourceAttr("data.aws_kinesis_stream.test_stream", "tags.Name", "tf-test"),
+				),
+			},
+			{
+				Config:    config,
+				PreConfig: updateShardCount,
+				Check: resource.ComposeTestCheckFunc(
+					testAccCheckKinesisStreamExists("aws_kinesis_stream.test_stream", &stream),
+					resource.TestCheckResourceAttrSet("data.aws_kinesis_stream.test_stream", "arn"),
+					resource.TestCheckResourceAttr("data.aws_kinesis_stream.test_stream", "name", sn),
+					resource.TestCheckResourceAttr("data.aws_kinesis_stream.test_stream", "status", "ACTIVE"),
+					resource.TestCheckResourceAttr("data.aws_kinesis_stream.test_stream", "open_shards.#", "3"),
+					resource.TestCheckResourceAttr("data.aws_kinesis_stream.test_stream", "closed_shards.#", "4"),
+					resource.TestCheckResourceAttr("data.aws_kinesis_stream.test_stream", "shard_level_metrics.#", "2"),
+					resource.TestCheckResourceAttr("data.aws_kinesis_stream.test_stream", "retention_period", "72"),
+					resource.TestCheckResourceAttrSet("data.aws_kinesis_stream.test_stream", "creation_timestamp"),
+					resource.TestCheckResourceAttr("data.aws_kinesis_stream.test_stream", "tags.Name", "tf-test"),
+				),
+			},
+		},
+	})
+}
+
+var testAccCheckAwsKinesisStreamDataSourceConfig = `
+resource "aws_kinesis_stream" "test_stream" {
+  name = "%s"
+  shard_count = 2
+  retention_period = 72
+  tags {
+    Name = "tf-test"
+  }
+  shard_level_metrics = [
+    "IncomingBytes",
+    "OutgoingBytes"
+  ]
+  lifecycle {
+    ignore_changes = ["shard_count"]
+  }
+}
+
+data "aws_kinesis_stream" "test_stream" {
+  name = "${aws_kinesis_stream.test_stream.name}"
+}
+`
diff --git a/builtin/providers/aws/provider.go b/builtin/providers/aws/provider.go
index 744eb21ad9ad..3fa234f81dce 100644
--- a/builtin/providers/aws/provider.go
+++ b/builtin/providers/aws/provider.go
@@ -172,6 +172,7 @@ func Provider() terraform.ResourceProvider {
 			"aws_eip":                      dataSourceAwsEip(),
 			"aws_elb_hosted_zone_id":       dataSourceAwsElbHostedZoneId(),
 			"aws_elb_service_account":      dataSourceAwsElbServiceAccount(),
+			"aws_kinesis_stream":           dataSourceAwsKinesisStream(),
 			"aws_iam_account_alias":        dataSourceAwsIamAccountAlias(),
 			"aws_iam_policy_document":      dataSourceAwsIamPolicyDocument(),
 			"aws_iam_server_certificate":   dataSourceAwsIAMServerCertificate(),
diff --git a/builtin/providers/aws/resource_aws_kinesis_stream.go b/builtin/providers/aws/resource_aws_kinesis_stream.go
index ed731f1657d1..2a59d3449607 100644
--- a/builtin/providers/aws/resource_aws_kinesis_stream.go
+++ b/builtin/providers/aws/resource_aws_kinesis_stream.go
@@ -95,10 +95,10 @@ func resourceAwsKinesisStreamCreate(d *schema.ResourceData, meta interface{}) er
 			sn, err)
 	}
 
-	s := streamRaw.(kinesisStreamState)
+	s := streamRaw.(*kinesisStreamState)
 	d.SetId(s.arn)
 	d.Set("arn", s.arn)
-	d.Set("shard_count", s.shardCount)
+	d.Set("shard_count", len(s.openShards))
 
 	return resourceAwsKinesisStreamUpdate(d, meta)
 }
@@ -141,7 +141,7 @@ func resourceAwsKinesisStreamRead(d *schema.ResourceData, meta interface{}) erro
 	}
 
 	d.Set("arn", state.arn)
-	d.Set("shard_count", state.shardCount)
+	d.Set("shard_count", len(state.openShards))
 	d.Set("retention_period", state.retentionPeriod)
 
 	if len(state.shardLevelMetrics) > 0 {
@@ -290,23 +290,27 @@ func updateKinesisShardLevelMetrics(conn *kinesis.Kinesis, d *schema.ResourceDat
 
 type kinesisStreamState struct {
 	arn               string
+	creationTimestamp int64
 	status            string
-	shardCount        int
 	retentionPeriod   int64
+	openShards        []string
+	closedShards      []string
 	shardLevelMetrics []string
 }
 
-func readKinesisStreamState(conn *kinesis.Kinesis, sn string) (kinesisStreamState, error) {
+func readKinesisStreamState(conn *kinesis.Kinesis, sn string) (*kinesisStreamState, error) {
 	describeOpts := &kinesis.DescribeStreamInput{
 		StreamName: aws.String(sn),
 	}
 
-	var state kinesisStreamState
+	state := &kinesisStreamState{}
 	err := conn.DescribeStreamPages(describeOpts, func(page *kinesis.DescribeStreamOutput, last bool) (shouldContinue bool) {
 		state.arn = aws.StringValue(page.StreamDescription.StreamARN)
+		state.creationTimestamp = aws.TimeValue(page.StreamDescription.StreamCreationTimestamp).Unix()
 		state.status = aws.StringValue(page.StreamDescription.StreamStatus)
-		state.shardCount += len(openShards(page.StreamDescription.Shards))
 		state.retentionPeriod = aws.Int64Value(page.StreamDescription.RetentionPeriodHours)
+		state.openShards = append(state.openShards, flattenShards(openShards(page.StreamDescription.Shards))...)
+		state.closedShards = append(state.closedShards, flattenShards(closedShards(page.StreamDescription.Shards))...)
 		state.shardLevelMetrics = flattenKinesisShardLevelMetrics(page.StreamDescription.EnhancedMonitoring)
 		return !last
 	})
@@ -349,14 +353,31 @@ func waitForKinesisToBeActive(conn *kinesis.Kinesis, sn string) error {
 	return nil
 }
 
-// See http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-using-sdk-java-resharding-merge.html
 func openShards(shards []*kinesis.Shard) []*kinesis.Shard {
-	var open []*kinesis.Shard
+	return filterShards(shards, true)
+}
+
+func closedShards(shards []*kinesis.Shard) []*kinesis.Shard {
+	return filterShards(shards, false)
+}
+
+// See http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-using-sdk-java-resharding-merge.html
+func filterShards(shards []*kinesis.Shard, open bool) []*kinesis.Shard {
+	res := make([]*kinesis.Shard, 0, len(shards))
 	for _, s := range shards {
-		if s.SequenceNumberRange.EndingSequenceNumber == nil {
-			open = append(open, s)
+		if open && s.SequenceNumberRange.EndingSequenceNumber == nil {
+			res = append(res, s)
+		} else if !open && s.SequenceNumberRange.EndingSequenceNumber != nil {
+			res = append(res, s)
 		}
 	}
+	return res
+}
 
-	return open
+func flattenShards(shards []*kinesis.Shard) []string {
+	res := make([]string, len(shards))
+	for i, s := range shards {
+		res[i] = aws.StringValue(s.ShardId)
+	}
+	return res
 }
diff --git a/website/source/docs/providers/aws/d/kinesis_stream.html.markdown b/website/source/docs/providers/aws/d/kinesis_stream.html.markdown
new file mode 100644
index 000000000000..1784eca18631
--- /dev/null
+++ b/website/source/docs/providers/aws/d/kinesis_stream.html.markdown
@@ -0,0 +1,45 @@
+---
+layout: "aws"
+page_title: "AWS: aws_kinesis_stream"
+sidebar_current: "docs-aws-datasource-kinesis-stream"
+description: |-
+  Provides a Kinesis Stream data source.
+---
+
+# aws\_kinesis\_stream
+
+Use this data source to get information about a Kinesis Stream for use in other
+resources.
+
+For more details, see the [Amazon Kinesis Documentation][1].
+
+## Example Usage
+
+```
+data "aws_kinesis_stream" "stream" {
+  name = "stream-name"
+}
+```
+
+## Argument Reference
+
+* `name` - (Required) The name of the Kinesis Stream.
+
+## Attributes Reference
+
+`id` is set to the Amazon Resource Name (ARN) of the Kinesis Stream. In addition, the following attributes
+are exported:
+
+* `arn` - The Amazon Resource Name (ARN) of the Kinesis Stream (same as id).
+* `name` - The name of the Kinesis Stream.
+* `creation_timestamp` - The approximate UNIX timestamp that the stream was created.
+* `status` - The current status of the stream. The stream status is one of CREATING, DELETING, ACTIVE, or UPDATING.
+* `retention_period` - Length of time (in hours) data records are accessible after they are added to the stream.
+* `open_shards` - The list of shard ids in the OPEN state. See [Shard State][2] for more.
+* `closed_shards` - The list of shard ids in the CLOSED state. See [Shard State][2] for more.
+* `shard_level_metrics` - A list of shard-level CloudWatch metrics which are enabled for the stream. See [Monitoring with CloudWatch][3] for more.
+* `tags` - A mapping of tags assigned to the stream.
+
+[1]: https://aws.amazon.com/documentation/kinesis/
+[2]: https://docs.aws.amazon.com/streams/latest/dev/kinesis-using-sdk-java-after-resharding.html#kinesis-using-sdk-java-resharding-data-routing
+[3]: https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-cloudwatch.html
\ No newline at end of file
diff --git a/website/source/layouts/aws.erb b/website/source/layouts/aws.erb
index 73656110a0d3..b00307a19352 100644
--- a/website/source/layouts/aws.erb
+++ b/website/source/layouts/aws.erb
@@ -68,6 +68,9 @@
           <li<%= sidebar_current("docs-aws-datasource-elb-service-account") %>>
             <a href="/docs/providers/aws/d/elb_service_account.html">aws_elb_service_account</a>
           </li>
+          <li<%= sidebar_current("docs-aws-datasource-kinesis-stream") %>>
+            <a href="/docs/providers/aws/d/kinesis_stream.html">kinesis_stream</a>
+          </li>
           <li<%= sidebar_current("docs-aws-datasource-iam-account-alias") %>>
             <a href="/docs/providers/aws/d/iam_account_alias.html">aws_iam_account_alias</a>
           </li>