Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

provider/aws: Add aws_kinesis_stream data source #13562

Merged
merged 1 commit into from
Apr 20, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions builtin/providers/aws/data_source_aws_kinesis_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package aws

import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/hashicorp/terraform/helper/schema"
)

// dataSourceAwsKinesisStream returns the schema and read function for the
// aws_kinesis_stream data source, which looks up an existing Kinesis
// stream by name. All attributes except "name" are computed.
func dataSourceAwsKinesisStream() *schema.Resource {
	return &schema.Resource{
		Read: dataSourceAwsKinesisStreamRead,

		// Element types in the map literal are elided (gofmt -s); the map's
		// declared value type *schema.Schema already supplies them.
		Schema: map[string]*schema.Schema{
			// Name of the stream to look up; the only required argument.
			"name": {
				Type:     schema.TypeString,
				Required: true,
			},

			"arn": {
				Type:     schema.TypeString,
				Computed: true,
			},

			// Stream creation time, exposed as a Unix timestamp in seconds.
			"creation_timestamp": {
				Type:     schema.TypeInt,
				Computed: true,
			},

			"status": {
				Type:     schema.TypeString,
				Computed: true,
			},

			// Hours that records remain accessible after being added.
			"retention_period": {
				Type:     schema.TypeInt,
				Computed: true,
			},

			// Ids of shards in the OPEN state.
			"open_shards": {
				Type:     schema.TypeSet,
				Computed: true,
				Elem:     &schema.Schema{Type: schema.TypeString},
				Set:      schema.HashString,
			},

			// Ids of shards in the CLOSED state.
			"closed_shards": {
				Type:     schema.TypeSet,
				Computed: true,
				Elem:     &schema.Schema{Type: schema.TypeString},
				Set:      schema.HashString,
			},

			// Enabled shard-level CloudWatch metrics.
			"shard_level_metrics": {
				Type:     schema.TypeSet,
				Computed: true,
				Elem:     &schema.Schema{Type: schema.TypeString},
				Set:      schema.HashString,
			},

			"tags": {
				Type:     schema.TypeMap,
				Computed: true,
			},
		},
	}
}

// dataSourceAwsKinesisStreamRead looks up the named Kinesis stream and
// populates the data source attributes from its description and its tags.
func dataSourceAwsKinesisStreamRead(d *schema.ResourceData, meta interface{}) error {
	conn := meta.(*AWSClient).kinesisconn
	name := d.Get("name").(string)

	stream, err := readKinesisStreamState(conn, name)
	if err != nil {
		return err
	}

	// The stream's ARN doubles as the data source id.
	d.SetId(stream.arn)
	d.Set("arn", stream.arn)
	d.Set("name", name)
	d.Set("open_shards", stream.openShards)
	d.Set("closed_shards", stream.closedShards)
	d.Set("status", stream.status)
	d.Set("creation_timestamp", stream.creationTimestamp)
	d.Set("retention_period", stream.retentionPeriod)
	d.Set("shard_level_metrics", stream.shardLevelMetrics)

	// Tags are not part of the stream description and require a
	// separate ListTagsForStream call.
	tagsOut, err := conn.ListTagsForStream(&kinesis.ListTagsForStreamInput{
		StreamName: aws.String(name),
	})
	if err != nil {
		return err
	}
	d.Set("tags", tagsToMapKinesis(tagsOut.Tags))

	return nil
}
94 changes: 94 additions & 0 deletions builtin/providers/aws/data_source_aws_kinesis_stream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package aws

import (
"fmt"
"testing"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/hashicorp/terraform/helper/acctest"
"github.com/hashicorp/terraform/helper/resource"
)

// TestAccAWSKinesisStreamDataSource verifies the aws_kinesis_stream data
// source against a real stream, both in its initial state and after an
// out-of-band resharding operation.
func TestAccAWSKinesisStreamDataSource(t *testing.T) {
	var stream kinesis.StreamDescription

	sn := fmt.Sprintf("terraform-kinesis-test-%d", acctest.RandInt())
	config := fmt.Sprintf(testAccCheckAwsKinesisStreamDataSourceConfig, sn)

	// updateShardCount reshards the stream from 2 to 3 shards outside of
	// Terraform, which closes the original shards and opens new ones.
	updateShardCount := func() {
		conn := testAccProvider.Meta().(*AWSClient).kinesisconn
		_, err := conn.UpdateShardCount(&kinesis.UpdateShardCountInput{
			ScalingType:      aws.String(kinesis.ScalingTypeUniformScaling),
			StreamName:       aws.String(sn),
			TargetShardCount: aws.Int64(3),
		})
		if err != nil {
			t.Fatalf("Error calling UpdateShardCount: %s", err)
		}
		if err := waitForKinesisToBeActive(conn, sn); err != nil {
			t.Fatal(err)
		}
	}

	// checkDataSource builds the assertion list shared by both steps;
	// only the expected open/closed shard counts differ between them.
	checkDataSource := func(openShards, closedShards string) resource.TestCheckFunc {
		ds := "data.aws_kinesis_stream.test_stream"
		return resource.ComposeTestCheckFunc(
			testAccCheckKinesisStreamExists("aws_kinesis_stream.test_stream", &stream),
			resource.TestCheckResourceAttrSet(ds, "arn"),
			resource.TestCheckResourceAttr(ds, "name", sn),
			resource.TestCheckResourceAttr(ds, "status", "ACTIVE"),
			resource.TestCheckResourceAttr(ds, "open_shards.#", openShards),
			resource.TestCheckResourceAttr(ds, "closed_shards.#", closedShards),
			resource.TestCheckResourceAttr(ds, "shard_level_metrics.#", "2"),
			resource.TestCheckResourceAttr(ds, "retention_period", "72"),
			resource.TestCheckResourceAttrSet(ds, "creation_timestamp"),
			resource.TestCheckResourceAttr(ds, "tags.Name", "tf-test"),
		)
	}

	resource.Test(t, resource.TestCase{
		PreCheck:     func() { testAccPreCheck(t) },
		Providers:    testAccProviders,
		CheckDestroy: testAccCheckKinesisStreamDestroy,
		Steps: []resource.TestStep{
			{
				Config: config,
				Check:  checkDataSource("2", "0"),
			},
			{
				// Resharding 2 -> 3 uniformly closes all 2 original shards
				// and, via intermediate splits/merges, leaves 4 closed ones.
				Config:    config,
				PreConfig: updateShardCount,
				Check:     checkDataSource("3", "4"),
			},
		},
	})
}

// Terraform configuration for the data source acceptance test; %s is
// substituted with a randomized stream name. shard_count changes are
// ignored so the out-of-band UpdateShardCount performed by the test does
// not produce a diff on the second step.
var testAccCheckAwsKinesisStreamDataSourceConfig = `
resource "aws_kinesis_stream" "test_stream" {
name = "%s"
shard_count = 2
retention_period = 72
tags {
Name = "tf-test"
}
shard_level_metrics = [
"IncomingBytes",
"OutgoingBytes"
]
lifecycle {
ignore_changes = ["shard_count"]
}
}

data "aws_kinesis_stream" "test_stream" {
name = "${aws_kinesis_stream.test_stream.name}"
}
`
1 change: 1 addition & 0 deletions builtin/providers/aws/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ func Provider() terraform.ResourceProvider {
"aws_eip": dataSourceAwsEip(),
"aws_elb_hosted_zone_id": dataSourceAwsElbHostedZoneId(),
"aws_elb_service_account": dataSourceAwsElbServiceAccount(),
"aws_kinesis_stream": dataSourceAwsKinesisStream(),
"aws_iam_account_alias": dataSourceAwsIamAccountAlias(),
"aws_iam_policy_document": dataSourceAwsIamPolicyDocument(),
"aws_iam_server_certificate": dataSourceAwsIAMServerCertificate(),
Expand Down
45 changes: 33 additions & 12 deletions builtin/providers/aws/resource_aws_kinesis_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,10 @@ func resourceAwsKinesisStreamCreate(d *schema.ResourceData, meta interface{}) er
sn, err)
}

s := streamRaw.(kinesisStreamState)
s := streamRaw.(*kinesisStreamState)
d.SetId(s.arn)
d.Set("arn", s.arn)
d.Set("shard_count", s.shardCount)
d.Set("shard_count", len(s.openShards))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why change this to OpenShards?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no actual change in behavior here, the value of s.shardCount before was the number of open shards. And since I'm now storing the actual list of open shard ids, we can derive the shard count from it rather than storing it separately.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it :)


return resourceAwsKinesisStreamUpdate(d, meta)
}
Expand Down Expand Up @@ -141,7 +141,7 @@ func resourceAwsKinesisStreamRead(d *schema.ResourceData, meta interface{}) erro

}
d.Set("arn", state.arn)
d.Set("shard_count", state.shardCount)
d.Set("shard_count", len(state.openShards))
d.Set("retention_period", state.retentionPeriod)

if len(state.shardLevelMetrics) > 0 {
Expand Down Expand Up @@ -290,23 +290,27 @@ func updateKinesisShardLevelMetrics(conn *kinesis.Kinesis, d *schema.ResourceDat

// kinesisStreamState holds the subset of a Kinesis stream description
// that this provider tracks (populated by readKinesisStreamState).
type kinesisStreamState struct {
arn string
creationTimestamp int64 // stream creation time as a Unix timestamp, in seconds
status string // stream status, e.g. ACTIVE
shardCount int // NOTE(review): callers now appear to derive the count from len(openShards) — confirm this field is still read
retentionPeriod int64 // hours that records remain accessible
openShards []string // ids of shards in the OPEN state
closedShards []string // ids of shards in the CLOSED state
shardLevelMetrics []string // enabled shard-level CloudWatch metrics
}

func readKinesisStreamState(conn *kinesis.Kinesis, sn string) (kinesisStreamState, error) {
func readKinesisStreamState(conn *kinesis.Kinesis, sn string) (*kinesisStreamState, error) {
describeOpts := &kinesis.DescribeStreamInput{
StreamName: aws.String(sn),
}

var state kinesisStreamState
state := &kinesisStreamState{}
err := conn.DescribeStreamPages(describeOpts, func(page *kinesis.DescribeStreamOutput, last bool) (shouldContinue bool) {
state.arn = aws.StringValue(page.StreamDescription.StreamARN)
state.creationTimestamp = aws.TimeValue(page.StreamDescription.StreamCreationTimestamp).Unix()
state.status = aws.StringValue(page.StreamDescription.StreamStatus)
state.shardCount += len(openShards(page.StreamDescription.Shards))
state.retentionPeriod = aws.Int64Value(page.StreamDescription.RetentionPeriodHours)
state.openShards = append(state.openShards, flattenShards(openShards(page.StreamDescription.Shards))...)
state.closedShards = append(state.closedShards, flattenShards(closedShards(page.StreamDescription.Shards))...)
state.shardLevelMetrics = flattenKinesisShardLevelMetrics(page.StreamDescription.EnhancedMonitoring)
return !last
})
Expand Down Expand Up @@ -349,14 +353,31 @@ func waitForKinesisToBeActive(conn *kinesis.Kinesis, sn string) error {
return nil
}

// openShards returns the shards of the stream that are currently open,
// i.e. have no ending sequence number. See
// http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-using-sdk-java-resharding-merge.html
func openShards(shards []*kinesis.Shard) []*kinesis.Shard {
	return filterShards(shards, true)
}

// closedShards returns the shards of the stream that have been closed by
// resharding, i.e. have an ending sequence number; delegates to
// filterShards with open=false.
func closedShards(shards []*kinesis.Shard) []*kinesis.Shard {
return filterShards(shards, false)
}

// See http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-using-sdk-java-resharding-merge.html
func filterShards(shards []*kinesis.Shard, open bool) []*kinesis.Shard {
res := make([]*kinesis.Shard, 0, len(shards))
for _, s := range shards {
if s.SequenceNumberRange.EndingSequenceNumber == nil {
open = append(open, s)
if open && s.SequenceNumberRange.EndingSequenceNumber == nil {
res = append(res, s)
} else if !open && s.SequenceNumberRange.EndingSequenceNumber != nil {
res = append(res, s)
}
}
return res
}

return open
func flattenShards(shards []*kinesis.Shard) []string {
res := make([]string, len(shards))
for i, s := range shards {
res[i] = aws.StringValue(s.ShardId)
}
return res
}
45 changes: 45 additions & 0 deletions website/source/docs/providers/aws/d/kinesis_stream.html.markdown
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
---
layout: "aws"
page_title: "AWS: aws_kinesis_stream"
sidebar_current: "docs-aws-datasource-kinesis-stream"
description: |-
Provides a Kinesis Stream data source.
---

# aws\_kinesis\_stream

Use this data source to get information about a Kinesis Stream for use in other
resources.

For more details, see the [Amazon Kinesis Documentation][1].

## Example Usage

```
data "aws_kinesis_stream" "stream" {
name = "stream-name"
}
```

## Argument Reference

* `name` - (Required) The name of the Kinesis Stream.

## Attributes Reference

`id` is set to the Amazon Resource Name (ARN) of the Kinesis Stream. In addition, the following attributes
are exported:

* `arn` - The Amazon Resource Name (ARN) of the Kinesis Stream (same as id).
* `name` - The name of the Kinesis Stream.
* `creation_timestamp` - The approximate UNIX timestamp at which the stream was created.
* `status` - The current status of the stream. The stream status is one of CREATING, DELETING, ACTIVE, or UPDATING.
* `retention_period` - Length of time (in hours) data records are accessible after they are added to the stream.
* `open_shards` - The list of shard ids in the OPEN state. See [Shard State][2] for more.
* `closed_shards` - The list of shard ids in the CLOSED state. See [Shard State][2] for more.
* `shard_level_metrics` - A list of shard-level CloudWatch metrics which are enabled for the stream. See [Monitoring with CloudWatch][3] for more.
* `tags` - A mapping of tags assigned to the stream.

[1]: https://aws.amazon.com/documentation/kinesis/
[2]: https://docs.aws.amazon.com/streams/latest/dev/kinesis-using-sdk-java-after-resharding.html#kinesis-using-sdk-java-resharding-data-routing
[3]: https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-cloudwatch.html
3 changes: 3 additions & 0 deletions website/source/layouts/aws.erb
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@
<li<%= sidebar_current("docs-aws-datasource-elb-service-account") %>>
<a href="/docs/providers/aws/d/elb_service_account.html">aws_elb_service_account</a>
</li>
<li<%= sidebar_current("docs-aws-datasource-kinesis-stream") %>>
<a href="/docs/providers/aws/d/kinesis_stream.html">aws_kinesis_stream</a>
</li>
<li<%= sidebar_current("docs-aws-datasource-iam-account-alias") %>>
<a href="/docs/providers/aws/d/iam_account_alias.html">aws_iam_account_alias</a>
</li>
Expand Down