Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

provider/aws: AWS Kinesis Stream support #2110

Merged
merged 2 commits into from
May 29, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion builtin/providers/aws/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ import (
"github.com/awslabs/aws-sdk-go/service/elasticache"
"github.com/awslabs/aws-sdk-go/service/elb"
"github.com/awslabs/aws-sdk-go/service/iam"
"github.com/awslabs/aws-sdk-go/service/kinesis"
"github.com/awslabs/aws-sdk-go/service/rds"
"github.com/awslabs/aws-sdk-go/service/route53"
"github.com/awslabs/aws-sdk-go/service/s3"
"github.com/awslabs/aws-sdk-go/service/sqs"
"github.com/awslabs/aws-sdk-go/service/sns"
"github.com/awslabs/aws-sdk-go/service/sqs"
)

type Config struct {
Expand All @@ -43,6 +44,7 @@ type AWSClient struct {
region string
rdsconn *rds.RDS
iamconn *iam.IAM
kinesisconn *kinesis.Kinesis
elasticacheconn *elasticache.ElastiCache
}

Expand Down Expand Up @@ -100,6 +102,9 @@ func (c *Config) Client() (interface{}, error) {
log.Println("[INFO] Initializing IAM Connection")
client.iamconn = iam.New(awsConfig)

log.Println("[INFO] Initializing Kinesis Connection")
client.kinesisconn = kinesis.New(awsConfig)

err := c.ValidateAccountId(client.iamconn)
if err != nil {
errs = append(errs, err)
Expand Down
1 change: 1 addition & 0 deletions builtin/providers/aws/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func Provider() terraform.ResourceProvider {
"aws_instance": resourceAwsInstance(),
"aws_internet_gateway": resourceAwsInternetGateway(),
"aws_key_pair": resourceAwsKeyPair(),
"aws_kinesis_stream": resourceAwsKinesisStream(),
"aws_launch_configuration": resourceAwsLaunchConfiguration(),
"aws_lb_cookie_stickiness_policy": resourceAwsLBCookieStickinessPolicy(),
"aws_main_route_table_association": resourceAwsMainRouteTableAssociation(),
Expand Down
156 changes: 156 additions & 0 deletions builtin/providers/aws/resource_aws_kinesis_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package aws

import (
"fmt"
"time"

"github.com/awslabs/aws-sdk-go/aws"
"github.com/awslabs/aws-sdk-go/aws/awserr"
"github.com/awslabs/aws-sdk-go/service/kinesis"
"github.com/hashicorp/terraform/helper/resource"
"github.com/hashicorp/terraform/helper/schema"
)

func resourceAwsKinesisStream() *schema.Resource {
return &schema.Resource{
Create: resourceAwsKinesisStreamCreate,
Read: resourceAwsKinesisStreamRead,
Delete: resourceAwsKinesisStreamDelete,

Schema: map[string]*schema.Schema{
"name": &schema.Schema{
Type: schema.TypeString,
Required: true,
ForceNew: true,
},

"shard_count": &schema.Schema{
Type: schema.TypeInt,
Required: true,
ForceNew: true,
},

"arn": &schema.Schema{
Type: schema.TypeString,
Optional: true,
Computed: true,
},
},
}
}

func resourceAwsKinesisStreamCreate(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).kinesisconn
sn := d.Get("name").(string)
createOpts := &kinesis.CreateStreamInput{
ShardCount: aws.Long(int64(d.Get("shard_count").(int))),
StreamName: aws.String(sn),
}

_, err := conn.CreateStream(createOpts)
if err != nil {
if awsErr, ok := err.(awserr.Error); ok {
return fmt.Errorf("[WARN] Error creating Kinesis Stream: \"%s\", code: \"%s\"", awsErr.Message(), awsErr.Code())
}
return err
}

stateConf := &resource.StateChangeConf{
Pending: []string{"CREATING"},
Target: "ACTIVE",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ha their states are so yelly!

Refresh: streamStateRefreshFunc(conn, sn),
Timeout: 5 * time.Minute,
Delay: 10 * time.Second,
MinTimeout: 3 * time.Second,
}

streamRaw, err := stateConf.WaitForState()
if err != nil {
return fmt.Errorf(
"Error waiting for Kinesis Stream (%s) to become active: %s",
sn, err)
}

s := streamRaw.(*kinesis.StreamDescription)
d.SetId(*s.StreamARN)
d.Set("arn", s.StreamARN)

return nil
}

func resourceAwsKinesisStreamRead(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).kinesisconn
describeOpts := &kinesis.DescribeStreamInput{
StreamName: aws.String(d.Get("name").(string)),
Limit: aws.Long(1),
}
resp, err := conn.DescribeStream(describeOpts)
if err != nil {
if awsErr, ok := err.(awserr.Error); ok {
if awsErr.Code() == "ResourceNotFoundException" {
d.SetId("")
return nil
}
return fmt.Errorf("[WARN] Error reading Kinesis Stream: \"%s\", code: \"%s\"", awsErr.Message(), awsErr.Code())
}
return err
}

s := resp.StreamDescription
d.Set("arn", *s.StreamARN)
d.Set("shard_count", len(s.Shards))

return nil
}

func resourceAwsKinesisStreamDelete(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).kinesisconn
sn := d.Get("name").(string)
_, err := conn.DeleteStream(&kinesis.DeleteStreamInput{
StreamName: aws.String(sn),
})

if err != nil {
return err
}

stateConf := &resource.StateChangeConf{
Pending: []string{"DELETING"},
Target: "DESTROYED",
Refresh: streamStateRefreshFunc(conn, sn),
Timeout: 5 * time.Minute,
Delay: 10 * time.Second,
MinTimeout: 3 * time.Second,
}

_, err = stateConf.WaitForState()
if err != nil {
return fmt.Errorf(
"Error waiting for Stream (%s) to be destroyed: %s",
sn, err)
}

d.SetId("")
return nil
}

func streamStateRefreshFunc(conn *kinesis.Kinesis, sn string) resource.StateRefreshFunc {
return func() (interface{}, string, error) {
describeOpts := &kinesis.DescribeStreamInput{
StreamName: aws.String(sn),
Limit: aws.Long(1),
}
resp, err := conn.DescribeStream(describeOpts)
if err != nil {
if awsErr, ok := err.(awserr.Error); ok {
if awsErr.Code() == "ResourceNotFoundException" {
return 42, "DESTROYED", nil
}
return nil, awsErr.Code(), err
}
return nil, "failed", err
}

return resp.StreamDescription, *resp.StreamDescription.StreamStatus, nil
}
}
108 changes: 108 additions & 0 deletions builtin/providers/aws/resource_aws_kinesis_stream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package aws

import (
"fmt"
"math/rand"
"strings"
"testing"
"time"

"github.com/awslabs/aws-sdk-go/aws"
"github.com/awslabs/aws-sdk-go/service/kinesis"
"github.com/hashicorp/terraform/helper/resource"
"github.com/hashicorp/terraform/terraform"
)

func TestAccKinesisStream_basic(t *testing.T) {
var stream kinesis.StreamDescription

resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckKinesisStreamDestroy,
Steps: []resource.TestStep{
resource.TestStep{
Config: testAccKinesisStreamConfig,
Check: resource.ComposeTestCheckFunc(
testAccCheckKinesisStreamExists("aws_kinesis_stream.test_stream", &stream),
testAccCheckAWSKinesisStreamAttributes(&stream),
),
},
},
})
}

func testAccCheckKinesisStreamExists(n string, stream *kinesis.StreamDescription) resource.TestCheckFunc {
return func(s *terraform.State) error {
rs, ok := s.RootModule().Resources[n]
if !ok {
return fmt.Errorf("Not found: %s", n)
}

if rs.Primary.ID == "" {
return fmt.Errorf("No Kinesis ID is set")
}

conn := testAccProvider.Meta().(*AWSClient).kinesisconn
describeOpts := &kinesis.DescribeStreamInput{
StreamName: aws.String(rs.Primary.Attributes["name"]),
Limit: aws.Long(1),
}
resp, err := conn.DescribeStream(describeOpts)
if err != nil {
return err
}

*stream = *resp.StreamDescription

return nil
}
}

func testAccCheckAWSKinesisStreamAttributes(stream *kinesis.StreamDescription) resource.TestCheckFunc {
return func(s *terraform.State) error {
if !strings.HasPrefix(*stream.StreamName, "terraform-kinesis-test") {
return fmt.Errorf("Bad Stream name: %s", *stream.StreamName)
}
for _, rs := range s.RootModule().Resources {
if rs.Type != "aws_kinesis_stream" {
continue
}
if *stream.StreamARN != rs.Primary.Attributes["arn"] {
return fmt.Errorf("Bad Stream ARN\n\t expected: %s\n\tgot: %s\n", rs.Primary.Attributes["arn"], *stream.StreamARN)
}
}
return nil
}
}

func testAccCheckKinesisStreamDestroy(s *terraform.State) error {
for _, rs := range s.RootModule().Resources {
if rs.Type != "aws_kinesis_stream" {
continue
}
conn := testAccProvider.Meta().(*AWSClient).kinesisconn
describeOpts := &kinesis.DescribeStreamInput{
StreamName: aws.String(rs.Primary.Attributes["name"]),
Limit: aws.Long(1),
}
resp, err := conn.DescribeStream(describeOpts)
if err == nil {
if resp.StreamDescription != nil && *resp.StreamDescription.StreamStatus != "DELETING" {
return fmt.Errorf("Error: Stream still exists")
}
}

return nil

}

return nil
}

var testAccKinesisStreamConfig = fmt.Sprintf(`
resource "aws_kinesis_stream" "test_stream" {
name = "terraform-kinesis-test-%d"
shard_count = 1
}
`, rand.New(rand.NewSource(time.Now().UnixNano())).Int())
44 changes: 44 additions & 0 deletions website/source/docs/providers/aws/r/kinesis_stream.html.markdown
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
---
layout: "aws"
page_title: "AWS: aws_kinesis_stream"
sidebar_current: "docs-aws-resource-kinesis-stream"
description: |-
Provides a AWS Kinesis Stream
---

# aws\_kinesis\_stream

Provides a Kinesis Stream resource. Amazon Kinesis is a managed service that
scales elastically for real-time processing of streaming big data.

For more details, see the [Amazon Kinesis Documentation][1].

## Example Usage

```
resource "aws_kinesis_stream" "test_stream" {
name = "terraform-kinesis-test"
shard_count = 1
}
```

## Argument Reference

The following arguments are supported:

* `name` - (Required) A name to identify the stream. This is unique to the
AWS account and region the Stream is created in.
* `shard_count` – (Required) The number of shards that the stream will use.
Amazon has guidlines for specifying the Stream size that should be referenced
when creating a Kinesis stream. See [Amazon Kinesis Streams][2] for more.

## Attributes Reference

* `id` - The unique Stream id
* `name` - The unique Stream name (same as `id`)
* `shard_count` - The count of Shards for this Stream
* `arn` - The Amazon Resource Name (ARN) specifying the Stream


[1]: http://aws.amazon.com/documentation/kinesis/
[2]: http://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html
4 changes: 4 additions & 0 deletions website/source/layouts/aws.erb
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@
<a href="/docs/providers/aws/r/iam_user_policy.html">aws_iam_user_policy</a>
</li>

<li<%= sidebar_current("docs-aws-resource-kinesis-stream") %>>
<a href="/docs/providers/aws/r/kinesis_stream.html">aws_kinesis_stream</a>
</li>

<li<%= sidebar_current("docs-aws-resource-instance") %>>
<a href="/docs/providers/aws/r/instance.html">aws_instance</a>
</li>
Expand Down