Skip to content

Commit

Permalink
CR updates; refactor to service package and update test coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
anGie44 committed Mar 18, 2021
1 parent b39b230 commit 48bf260
Show file tree
Hide file tree
Showing 12 changed files with 529 additions and 429 deletions.
7 changes: 7 additions & 0 deletions .changelog/17149.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
```release-note:new-data-source
aws_kinesis_stream_consumer
```

```release-note:new-resource
aws_kinesis_stream_consumer
```
92 changes: 71 additions & 21 deletions aws/data_source_aws_kinesis_stream_consumer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package aws

import (
"fmt"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
)

Expand All @@ -9,49 +14,94 @@ func dataSourceAwsKinesisStreamConsumer() *schema.Resource {
Read: dataSourceAwsKinesisStreamConsumerRead,

Schema: map[string]*schema.Schema{
"name": {
Type: schema.TypeString,
Required: true,
},

"stream_arn": {
Type: schema.TypeString,
Required: true,
"arn": {
Type: schema.TypeString,
Optional: true,
Computed: true,
ValidateFunc: validateArn,
},

"arn": {
"creation_timestamp": {
Type: schema.TypeString,
Computed: true,
},

"creation_timestamp": {
Type: schema.TypeInt,
"name": {
Type: schema.TypeString,
Optional: true,
Computed: true,
},

"status": {
Type: schema.TypeString,
Computed: true,
},

"stream_arn": {
Type: schema.TypeString,
Required: true,
ValidateFunc: validateArn,
},
},
}
}

func dataSourceAwsKinesisStreamConsumerRead(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).kinesisconn
cn := d.Get("name").(string)
sa := d.Get("stream_arn").(string)

state, err := readKinesisStreamConsumerState(conn, cn, sa)
streamArn := d.Get("stream_arn").(string)

input := &kinesis.ListStreamConsumersInput{
StreamARN: aws.String(streamArn),
}

var results []*kinesis.Consumer

err := conn.ListStreamConsumersPages(input, func(page *kinesis.ListStreamConsumersOutput, lastPage bool) bool {
if page == nil {
return !lastPage
}

for _, consumer := range page.Consumers {
if consumer == nil {
continue
}

if v, ok := d.GetOk("name"); ok && v.(string) != aws.StringValue(consumer.ConsumerName) {
continue
}

if v, ok := d.GetOk("arn"); ok && v.(string) != aws.StringValue(consumer.ConsumerARN) {
continue
}

results = append(results, consumer)

}

return !lastPage
})

if err != nil {
return err
return fmt.Errorf("error listing Kinesis Stream Consumers: %w", err)
}

if len(results) == 0 {
return fmt.Errorf("no Kinesis Stream Consumer found matching criteria; try different search")
}
d.SetId(state.arn)
d.Set("arn", state.arn)
d.Set("name", cn)
d.Set("stream_arn", sa)
d.Set("status", state.status)
d.Set("creation_timestamp", state.creationTimestamp)

if len(results) > 1 {
return fmt.Errorf("multiple Kinesis Stream Consumers found matching criteria; try different search")
}

consumer := results[0]

d.SetId(aws.StringValue(consumer.ConsumerARN))
d.Set("arn", consumer.ConsumerARN)
d.Set("name", consumer.ConsumerName)
d.Set("status", consumer.ConsumerStatus)
d.Set("stream_arn", streamArn)
d.Set("creation_timestamp", aws.TimeValue(consumer.ConsumerCreationTimestamp).Format(time.RFC3339))

return nil
}
131 changes: 118 additions & 13 deletions aws/data_source_aws_kinesis_stream_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,32 +4,137 @@ import (
"fmt"
"testing"

"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/acctest"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
)

func TestAccAWSKinesisStreamConsumerDataSource_basic(t *testing.T) {
var stream kinesis.StreamDescription
var consumer kinesis.ConsumerDescription
config := createAccKinesisStreamConsumerConfig()
rName := acctest.RandomWithPrefix("tf-acc-test")
dataSourceName := "data.aws_kinesis_stream_consumer.test"
resourceName := "aws_kinesis_stream_consumer.test"
streamName := "aws_kinesis_stream.test"

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckKinesisStreamConsumerDestroy,
CheckDestroy: nil,
Steps: []resource.TestStep{
{
Config: config.data(),
Config: testAccAWSKinesisStreamConsumerDataSourceConfig(rName),
Check: resource.ComposeTestCheckFunc(
testAccCheckKinesisStreamExists(config.stream.getName(), &stream),
testAccCheckKinesisStreamConsumerExists(config, -1, &consumer),
resource.TestCheckResourceAttrSet(fmt.Sprintf("data.%s", config.getName()), "arn"),
resource.TestCheckResourceAttrSet(fmt.Sprintf("data.%s", config.getName()), "stream_arn"),
resource.TestCheckResourceAttr(fmt.Sprintf("data.%s", config.getName()), "name", config.getConsumerName()),
resource.TestCheckResourceAttr(fmt.Sprintf("data.%s", config.getName()), "status", "ACTIVE"),
resource.TestCheckResourceAttrSet(fmt.Sprintf("data.%s", config.getName()), "creation_timestamp"),
resource.TestCheckResourceAttrPair(dataSourceName, "arn", resourceName, "arn"),
resource.TestCheckResourceAttrPair(dataSourceName, "name", resourceName, "name"),
resource.TestCheckResourceAttrPair(dataSourceName, "stream_arn", streamName, "arn"),
resource.TestCheckResourceAttrSet(dataSourceName, "creation_timestamp"),
resource.TestCheckResourceAttrSet(dataSourceName, "status"),
),
},
},
})
}

func TestAccAWSKinesisStreamConsumerDataSource_Name(t *testing.T) {
rName := acctest.RandomWithPrefix("tf-acc-test")
dataSourceName := "data.aws_kinesis_stream_consumer.test"
resourceName := "aws_kinesis_stream_consumer.test"
streamName := "aws_kinesis_stream.test"

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: nil,
Steps: []resource.TestStep{
{
Config: testAccAWSKinesisStreamConsumerDataSourceConfigName(rName),
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttrPair(dataSourceName, "arn", resourceName, "arn"),
resource.TestCheckResourceAttrPair(dataSourceName, "name", resourceName, "name"),
resource.TestCheckResourceAttrPair(dataSourceName, "stream_arn", streamName, "arn"),
resource.TestCheckResourceAttrSet(dataSourceName, "creation_timestamp"),
resource.TestCheckResourceAttrSet(dataSourceName, "status"),
),
},
},
})
}

func TestAccAWSKinesisStreamConsumerDataSource_Arn(t *testing.T) {
rName := acctest.RandomWithPrefix("tf-acc-test")
dataSourceName := "data.aws_kinesis_stream_consumer.test"
resourceName := "aws_kinesis_stream_consumer.test"
streamName := "aws_kinesis_stream.test"

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: nil,
Steps: []resource.TestStep{
{
Config: testAccAWSKinesisStreamConsumerDataSourceConfigArn(rName),
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttrPair(dataSourceName, "arn", resourceName, "arn"),
resource.TestCheckResourceAttrPair(dataSourceName, "name", resourceName, "name"),
resource.TestCheckResourceAttrPair(dataSourceName, "stream_arn", streamName, "arn"),
resource.TestCheckResourceAttrSet(dataSourceName, "creation_timestamp"),
resource.TestCheckResourceAttrSet(dataSourceName, "status"),
),
},
},
})
}

func testAccAWSKinesisStreamConsumerDataSourceBaseConfig(rName string) string {
return fmt.Sprintf(`
resource "aws_kinesis_stream" "test" {
name = %q
shard_count = 2
}
`, rName)
}

func testAccAWSKinesisStreamConsumerDataSourceConfig(rName string) string {
return composeConfig(
testAccAWSKinesisStreamConsumerDataSourceBaseConfig(rName),
fmt.Sprintf(`
data "aws_kinesis_stream_consumer" "test" {
stream_arn = aws_kinesis_stream_consumer.test.stream_arn
}
resource "aws_kinesis_stream_consumer" "test" {
name = %q
stream_arn = aws_kinesis_stream.test.arn
}
`, rName))
}

func testAccAWSKinesisStreamConsumerDataSourceConfigName(rName string) string {
return composeConfig(
testAccAWSKinesisStreamConsumerDataSourceBaseConfig(rName),
fmt.Sprintf(`
data "aws_kinesis_stream_consumer" "test" {
name = aws_kinesis_stream_consumer.test.name
stream_arn = aws_kinesis_stream_consumer.test.stream_arn
}
resource "aws_kinesis_stream_consumer" "test" {
name = %q
stream_arn = aws_kinesis_stream.test.arn
}
`, rName))
}

func testAccAWSKinesisStreamConsumerDataSourceConfigArn(rName string) string {
return composeConfig(
testAccAWSKinesisStreamConsumerDataSourceBaseConfig(rName),
fmt.Sprintf(`
data "aws_kinesis_stream_consumer" "test" {
arn = aws_kinesis_stream_consumer.test.arn
stream_arn = aws_kinesis_stream_consumer.test.stream_arn
}
resource "aws_kinesis_stream_consumer" "test" {
name = %q
stream_arn = aws_kinesis_stream.test.arn
}
`, rName))
}
25 changes: 25 additions & 0 deletions aws/internal/service/kinesis/finder/finder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package finder

import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
)

// StreamConsumerByARN returns the stream consumer corresponding to the specified ARN.
// Returns nil if no stream consumer is found.
func StreamConsumerByARN(conn *kinesis.Kinesis, arn string) (*kinesis.ConsumerDescription, error) {
input := &kinesis.DescribeStreamConsumerInput{
ConsumerARN: aws.String(arn),
}

output, err := conn.DescribeStreamConsumer(input)
if err != nil {
return nil, err
}

if output == nil {
return nil, nil
}

return output.ConsumerDescription, nil
}
30 changes: 30 additions & 0 deletions aws/internal/service/kinesis/waiter/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package waiter

import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
"github.com/terraform-providers/terraform-provider-aws/aws/internal/service/kinesis/finder"
)

const (
StreamConsumerStatusNotFound = "NotFound"
StreamConsumerStatusUnknown = "Unknown"
)

// StreamConsumerStatus fetches the StreamConsumer and its Status
func StreamConsumerStatus(conn *kinesis.Kinesis, arn string) resource.StateRefreshFunc {
return func() (interface{}, string, error) {
consumer, err := finder.StreamConsumerByARN(conn, arn)

if err != nil {
return nil, StreamConsumerStatusUnknown, err
}

if consumer == nil {
return nil, StreamConsumerStatusNotFound, nil
}

return consumer, aws.StringValue(consumer.ConsumerStatus), nil
}
}
49 changes: 49 additions & 0 deletions aws/internal/service/kinesis/waiter/waiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package waiter

import (
"time"

"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
)

const (
StreamConsumerCreatedTimeout = 5 * time.Minute
StreamConsumerDeletedTimeout = 5 * time.Minute
)

// StreamConsumerCreated waits for an Stream Consumer to return Active
func StreamConsumerCreated(conn *kinesis.Kinesis, arn string) (*kinesis.ConsumerDescription, error) {
stateConf := &resource.StateChangeConf{
Pending: []string{kinesis.ConsumerStatusCreating},
Target: []string{kinesis.ConsumerStatusActive},
Refresh: StreamConsumerStatus(conn, arn),
Timeout: StreamConsumerCreatedTimeout,
}

outputRaw, err := stateConf.WaitForState()

if v, ok := outputRaw.(*kinesis.ConsumerDescription); ok {
return v, err
}

return nil, err
}

// StreamConsumerDeleted waits for a Stream Consumer to be deleted
func StreamConsumerDeleted(conn *kinesis.Kinesis, arn string) (*kinesis.ConsumerDescription, error) {
stateConf := &resource.StateChangeConf{
Pending: []string{kinesis.ConsumerStatusDeleting},
Target: []string{},
Refresh: StreamConsumerStatus(conn, arn),
Timeout: StreamConsumerDeletedTimeout,
}

outputRaw, err := stateConf.WaitForState()

if v, ok := outputRaw.(*kinesis.ConsumerDescription); ok {
return v, err
}

return nil, err
}
Loading

0 comments on commit 48bf260

Please sign in to comment.