Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add data source for Kinesis Firehose Delivery Stream. #18445

Merged
merged 7 commits into from
Sep 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/18445.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:new-data-source
aws_kinesis_firehose_delivery_stream
```
42 changes: 42 additions & 0 deletions aws/data_source_aws_kinesis_firehose_delivery_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package aws

import (
"fmt"

"github.com/aws/aws-sdk-go/aws"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/terraform-providers/terraform-provider-aws/aws/internal/service/firehose/finder"
)

func dataSourceAwsKinesisFirehoseDeliveryStream() *schema.Resource {
return &schema.Resource{
Read: dataSourceAwsKinesisFirehoseDeliveryStreamRead,
Schema: map[string]*schema.Schema{
"arn": {
Type: schema.TypeString,
Computed: true,
},
"name": {
Type: schema.TypeString,
Required: true,
},
},
}
}

func dataSourceAwsKinesisFirehoseDeliveryStreamRead(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).firehoseconn

sn := d.Get("name").(string)
output, err := finder.DeliveryStreamByName(conn, sn)

if err != nil {
return fmt.Errorf("error reading Kinesis Firehose Delivery Stream (%s): %w", sn, err)
}

d.SetId(aws.StringValue(output.DeliveryStreamARN))
d.Set("arn", output.DeliveryStreamARN)
d.Set("name", output.DeliveryStreamName)

return nil
}
115 changes: 115 additions & 0 deletions aws/data_source_aws_kinesis_firehose_delivery_stream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package aws

import (
"fmt"
"testing"

"github.com/aws/aws-sdk-go/service/firehose"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/acctest"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
)

func TestAccDataSourceAwsKinesisFirehoseDeliveryStream_basic(t *testing.T) {
rName := acctest.RandomWithPrefix("tf-acc-test")
dataSourceName := "data.aws_kinesis_firehose_delivery_stream.test"
resourceName := "aws_kinesis_firehose_delivery_stream.test"

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
ErrorCheck: testAccErrorCheck(t, firehose.EndpointsID),
Providers: testAccProviders,
CheckDestroy: testAccCheckKinesisFirehoseDeliveryStreamDestroy,
Steps: []resource.TestStep{
{
Config: testAccDataSourceAwsKinesisFirehoseDeliveryStreamConfigBasic(rName),
Check: resource.ComposeAggregateTestCheckFunc(
resource.TestCheckResourceAttrPair(resourceName, "arn", dataSourceName, "arn"),
resource.TestCheckResourceAttr(dataSourceName, "name", rName),
),
},
},
})
}

func testAccDataSourceAwsKinesisFirehoseDeliveryStreamConfigBasic(rName string) string {
return fmt.Sprintf(`
data "aws_partition" "current" {}

resource "aws_iam_role" "firehose" {
name = %[1]q

assume_role_policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Principal": {
"Service": "firehose.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
EOF
}

resource "aws_iam_role_policy" "firehose" {
name = %[1]q
role = aws_iam_role.firehose.id

policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Action": [
"s3:AbortMultipartUpload",
"s3:GetBucketLocation",
"s3:GetObject",
"s3:ListBucket",
"s3:ListBucketMultipartUploads",
"s3:PutObject"
],
"Resource": [
"${aws_s3_bucket.bucket.arn}",
"${aws_s3_bucket.bucket.arn}/*"
]
},
{
"Effect": "Allow",
"Action": [
"logs:putLogEvents"
],
"Resource": [
"arn:${data.aws_partition.current.partition}:logs::log-group:/aws/kinesisfirehose/*"
]
}
]
}
EOF
}

resource "aws_s3_bucket" "bucket" {
bucket = %[1]q
acl = "private"
}

resource "aws_kinesis_firehose_delivery_stream" "test" {
name = %[1]q
destination = "extended_s3"

extended_s3_configuration {
role_arn = aws_iam_role.firehose.arn
bucket_arn = aws_s3_bucket.bucket.arn
}
}

data "aws_kinesis_firehose_delivery_stream" "test" {
name = aws_kinesis_firehose_delivery_stream.test.name
}
`, rName)
}
48 changes: 48 additions & 0 deletions aws/internal/service/firehose/finder/finder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package finder

import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/firehose"
"github.com/hashicorp/aws-sdk-go-base/tfawserr"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
"github.com/terraform-providers/terraform-provider-aws/aws/internal/tfresource"
)

func DeliveryStreamByName(conn *firehose.Firehose, name string) (*firehose.DeliveryStreamDescription, error) {
input := &firehose.DescribeDeliveryStreamInput{
DeliveryStreamName: aws.String(name),
}

output, err := conn.DescribeDeliveryStream(input)

if tfawserr.ErrCodeEquals(err, firehose.ErrCodeResourceNotFoundException) {
return nil, &resource.NotFoundError{
LastError: err,
LastRequest: input,
}
}

if err != nil {
return nil, err
}

if output == nil || output.DeliveryStreamDescription == nil {
return nil, tfresource.NewEmptyResultError(input)
}

return output.DeliveryStreamDescription, nil
}

func DeliveryStreamEncryptionConfigurationByName(conn *firehose.Firehose, name string) (*firehose.DeliveryStreamEncryptionConfiguration, error) {
output, err := DeliveryStreamByName(conn, name)

if err != nil {
return nil, err
}

if output.DeliveryStreamEncryptionConfiguration == nil {
return nil, tfresource.NewEmptyResultError(nil)
}

return output.DeliveryStreamEncryptionConfiguration, nil
}
41 changes: 41 additions & 0 deletions aws/internal/service/firehose/waiter/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package waiter

import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/firehose"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
"github.com/terraform-providers/terraform-provider-aws/aws/internal/service/firehose/finder"
"github.com/terraform-providers/terraform-provider-aws/aws/internal/tfresource"
)

func DeliveryStreamStatus(conn *firehose.Firehose, name string) resource.StateRefreshFunc {
return func() (interface{}, string, error) {
output, err := finder.DeliveryStreamByName(conn, name)

if tfresource.NotFound(err) {
return nil, "", nil
}

if err != nil {
return nil, "", err
}

return output, aws.StringValue(output.DeliveryStreamStatus), nil
}
}

func DeliveryStreamEncryptionConfigurationStatus(conn *firehose.Firehose, name string) resource.StateRefreshFunc {
return func() (interface{}, string, error) {
output, err := finder.DeliveryStreamEncryptionConfigurationByName(conn, name)

if tfresource.NotFound(err) {
return nil, "", nil
}

if err != nil {
return nil, "", err
}

return output, aws.StringValue(output.Status), nil
}
}
103 changes: 103 additions & 0 deletions aws/internal/service/firehose/waiter/waiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package waiter

import (
"fmt"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/firehose"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
"github.com/terraform-providers/terraform-provider-aws/aws/internal/tfresource"
)

const (
DeliveryStreamCreatedTimeout = 20 * time.Minute
DeliveryStreamDeletedTimeout = 20 * time.Minute

DeliveryStreamEncryptionEnabledTimeout = 10 * time.Minute
DeliveryStreamEncryptionDisabledTimeout = 10 * time.Minute
)

func DeliveryStreamCreated(conn *firehose.Firehose, name string) (*firehose.DeliveryStreamDescription, error) {
stateConf := &resource.StateChangeConf{
Pending: []string{firehose.DeliveryStreamStatusCreating},
Target: []string{firehose.DeliveryStreamStatusActive},
Refresh: DeliveryStreamStatus(conn, name),
Timeout: DeliveryStreamCreatedTimeout,
}

outputRaw, err := stateConf.WaitForState()

if output, ok := outputRaw.(*firehose.DeliveryStreamDescription); ok {
if status, failureDescription := aws.StringValue(output.DeliveryStreamStatus), output.FailureDescription; status == firehose.DeliveryStreamStatusCreatingFailed && failureDescription != nil {
tfresource.SetLastError(err, fmt.Errorf("%s: %s", aws.StringValue(failureDescription.Type), aws.StringValue(failureDescription.Details)))
}

return output, err
}

return nil, err
}

func DeliveryStreamDeleted(conn *firehose.Firehose, name string) (*firehose.DeliveryStreamDescription, error) {
stateConf := &resource.StateChangeConf{
Pending: []string{firehose.DeliveryStreamStatusDeleting},
Target: []string{},
Refresh: DeliveryStreamStatus(conn, name),
Timeout: DeliveryStreamDeletedTimeout,
}

outputRaw, err := stateConf.WaitForState()

if output, ok := outputRaw.(*firehose.DeliveryStreamDescription); ok {
if status, failureDescription := aws.StringValue(output.DeliveryStreamStatus), output.FailureDescription; status == firehose.DeliveryStreamStatusDeletingFailed && failureDescription != nil {
tfresource.SetLastError(err, fmt.Errorf("%s: %s", aws.StringValue(failureDescription.Type), aws.StringValue(failureDescription.Details)))
}

return output, err
}

return nil, err
}

func DeliveryStreamEncryptionEnabled(conn *firehose.Firehose, name string) (*firehose.DeliveryStreamEncryptionConfiguration, error) {
stateConf := &resource.StateChangeConf{
Pending: []string{firehose.DeliveryStreamEncryptionStatusEnabling},
Target: []string{firehose.DeliveryStreamEncryptionStatusEnabled},
Refresh: DeliveryStreamEncryptionConfigurationStatus(conn, name),
Timeout: DeliveryStreamEncryptionEnabledTimeout,
}

outputRaw, err := stateConf.WaitForState()

if output, ok := outputRaw.(*firehose.DeliveryStreamEncryptionConfiguration); ok {
if status, failureDescription := aws.StringValue(output.Status), output.FailureDescription; status == firehose.DeliveryStreamEncryptionStatusEnablingFailed && failureDescription != nil {
tfresource.SetLastError(err, fmt.Errorf("%s: %s", aws.StringValue(failureDescription.Type), aws.StringValue(failureDescription.Details)))
}

return output, err
}

return nil, err
}

func DeliveryStreamEncryptionDisabled(conn *firehose.Firehose, name string) (*firehose.DeliveryStreamEncryptionConfiguration, error) {
stateConf := &resource.StateChangeConf{
Pending: []string{firehose.DeliveryStreamEncryptionStatusDisabling},
Target: []string{firehose.DeliveryStreamEncryptionStatusDisabled},
Refresh: DeliveryStreamEncryptionConfigurationStatus(conn, name),
Timeout: DeliveryStreamEncryptionDisabledTimeout,
}

outputRaw, err := stateConf.WaitForState()

if output, ok := outputRaw.(*firehose.DeliveryStreamEncryptionConfiguration); ok {
if status, failureDescription := aws.StringValue(output.Status), output.FailureDescription; status == firehose.DeliveryStreamEncryptionStatusDisablingFailed && failureDescription != nil {
tfresource.SetLastError(err, fmt.Errorf("%s: %s", aws.StringValue(failureDescription.Type), aws.StringValue(failureDescription.Details)))
}

return output, err
}

return nil, err
}
1 change: 1 addition & 0 deletions aws/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ func Provider() *schema.Provider {
"aws_internet_gateway": dataSourceAwsInternetGateway(),
"aws_iot_endpoint": dataSourceAwsIotEndpoint(),
"aws_ip_ranges": dataSourceAwsIPRanges(),
"aws_kinesis_firehose_delivery_stream": dataSourceAwsKinesisFirehoseDeliveryStream(),
"aws_kinesis_stream": dataSourceAwsKinesisStream(),
"aws_kinesis_stream_consumer": dataSourceAwsKinesisStreamConsumer(),
"aws_kms_alias": dataSourceAwsKmsAlias(),
Expand Down
Loading