Skip to content

Commit

Permalink
fix: Tag changes do not cause updates to firehose destinations
Browse files Browse the repository at this point in the history
Fixes #25396
  • Loading branch information
mbamber committed Aug 23, 2022
1 parent 6a21eb5 commit 604e2e9
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 73 deletions.
3 changes: 3 additions & 0 deletions .changelog/26451.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
resource/aws_kinesis_firehose_delivery_stream: Updating tags no longer causes an uncessary update
```
149 changes: 76 additions & 73 deletions internal/service/firehose/delivery_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2721,92 +2721,95 @@ func resourceDeliveryStreamUpdate(d *schema.ResourceData, meta interface{}) erro
conn := meta.(*conns.AWSClient).FirehoseConn

sn := d.Get("name").(string)
updateInput := &firehose.UpdateDestinationInput{
DeliveryStreamName: aws.String(sn),
CurrentDeliveryStreamVersionId: aws.String(d.Get("version_id").(string)),
DestinationId: aws.String(d.Get("destination_id").(string)),
}

if d.Get("destination").(string) == destinationTypeExtendedS3 {
extendedS3Config := updateExtendedS3Config(d)
updateInput.ExtendedS3DestinationUpdate = extendedS3Config
} else {
s3Config := updateS3Config(d)
if d.HasChangesExcept("tags", "tags_all") {
updateInput := &firehose.UpdateDestinationInput{
DeliveryStreamName: aws.String(sn),
CurrentDeliveryStreamVersionId: aws.String(d.Get("version_id").(string)),
DestinationId: aws.String(d.Get("destination_id").(string)),
}

if d.Get("destination").(string) == destinationTypeS3 {
updateInput.S3DestinationUpdate = s3Config
} else if d.Get("destination").(string) == destinationTypeElasticsearch {
esUpdate, err := updateElasticsearchConfig(d, s3Config)
if err != nil {
return err
}
updateInput.ElasticsearchDestinationUpdate = esUpdate
} else if d.Get("destination").(string) == destinationTypeRedshift {
// Redshift does not currently support ErrorOutputPrefix,
// which is set to the empty string within "updateS3Config",
// thus we must remove it here to avoid an InvalidArgumentException.
if s3Config != nil {
s3Config.ErrorOutputPrefix = nil
}
rc, err := updateRedshiftConfig(d, s3Config)
if err != nil {
return err
}
updateInput.RedshiftDestinationUpdate = rc
} else if d.Get("destination").(string) == destinationTypeSplunk {
rc, err := updateSplunkConfig(d, s3Config)
if err != nil {
return err
}
updateInput.SplunkDestinationUpdate = rc
} else if d.Get("destination").(string) == destinationTypeHTTPEndpoint {
rc, err := updateHTTPEndpointConfig(d, s3Config)
if err != nil {
return err
if d.Get("destination").(string) == destinationTypeExtendedS3 {
extendedS3Config := updateExtendedS3Config(d)
updateInput.ExtendedS3DestinationUpdate = extendedS3Config
} else {
s3Config := updateS3Config(d)

if d.Get("destination").(string) == destinationTypeS3 {
updateInput.S3DestinationUpdate = s3Config
} else if d.Get("destination").(string) == destinationTypeElasticsearch {
esUpdate, err := updateElasticsearchConfig(d, s3Config)
if err != nil {
return err
}
updateInput.ElasticsearchDestinationUpdate = esUpdate
} else if d.Get("destination").(string) == destinationTypeRedshift {
// Redshift does not currently support ErrorOutputPrefix,
// which is set to the empty string within "updateS3Config",
// thus we must remove it here to avoid an InvalidArgumentException.
if s3Config != nil {
s3Config.ErrorOutputPrefix = nil
}
rc, err := updateRedshiftConfig(d, s3Config)
if err != nil {
return err
}
updateInput.RedshiftDestinationUpdate = rc
} else if d.Get("destination").(string) == destinationTypeSplunk {
rc, err := updateSplunkConfig(d, s3Config)
if err != nil {
return err
}
updateInput.SplunkDestinationUpdate = rc
} else if d.Get("destination").(string) == destinationTypeHTTPEndpoint {
rc, err := updateHTTPEndpointConfig(d, s3Config)
if err != nil {
return err
}
updateInput.HttpEndpointDestinationUpdate = rc
}
updateInput.HttpEndpointDestinationUpdate = rc
}
}

err := resource.Retry(propagationTimeout, func() *resource.RetryError {
_, err := conn.UpdateDestination(updateInput)
if err != nil {
// Access was denied when calling Glue. Please ensure that the role specified in the data format conversion configuration has the necessary permissions.
if tfawserr.ErrMessageContains(err, firehose.ErrCodeInvalidArgumentException, "Access was denied") {
return resource.RetryableError(err)
}
err := resource.Retry(propagationTimeout, func() *resource.RetryError {
_, err := conn.UpdateDestination(updateInput)
if err != nil {
// Access was denied when calling Glue. Please ensure that the role specified in the data format conversion configuration has the necessary permissions.
if tfawserr.ErrMessageContains(err, firehose.ErrCodeInvalidArgumentException, "Access was denied") {
return resource.RetryableError(err)
}

if tfawserr.ErrMessageContains(err, firehose.ErrCodeInvalidArgumentException, "is not authorized to") {
return resource.RetryableError(err)
}
if tfawserr.ErrMessageContains(err, firehose.ErrCodeInvalidArgumentException, "is not authorized to") {
return resource.RetryableError(err)
}

if tfawserr.ErrMessageContains(err, firehose.ErrCodeInvalidArgumentException, "Please make sure the role specified in VpcConfiguration has permissions") {
return resource.RetryableError(err)
}
if tfawserr.ErrMessageContains(err, firehose.ErrCodeInvalidArgumentException, "Please make sure the role specified in VpcConfiguration has permissions") {
return resource.RetryableError(err)
}

// InvalidArgumentException: Verify that the IAM role has access to the Elasticsearch domain.
if tfawserr.ErrMessageContains(err, firehose.ErrCodeInvalidArgumentException, "Verify that the IAM role has access") {
return resource.RetryableError(err)
}
// InvalidArgumentException: Verify that the IAM role has access to the Elasticsearch domain.
if tfawserr.ErrMessageContains(err, firehose.ErrCodeInvalidArgumentException, "Verify that the IAM role has access") {
return resource.RetryableError(err)
}

if tfawserr.ErrMessageContains(err, firehose.ErrCodeInvalidArgumentException, "Firehose is unable to assume role") {
return resource.RetryableError(err)
}
if tfawserr.ErrMessageContains(err, firehose.ErrCodeInvalidArgumentException, "Firehose is unable to assume role") {
return resource.RetryableError(err)
}

return resource.NonRetryableError(err)
}
return resource.NonRetryableError(err)
}

return nil
})
return nil
})

if tfresource.TimedOut(err) {
_, err = conn.UpdateDestination(updateInput)
}
if tfresource.TimedOut(err) {
_, err = conn.UpdateDestination(updateInput)
}

if err != nil {
return fmt.Errorf(
"Error Updating Kinesis Firehose Delivery Stream: \"%s\"\n%s",
sn, err)
if err != nil {
return fmt.Errorf(
"Error Updating Kinesis Firehose Delivery Stream: \"%s\"\n%s",
sn, err)
}
}

if d.HasChange("tags_all") {
Expand Down
96 changes: 96 additions & 0 deletions internal/service/firehose/delivery_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1106,6 +1106,45 @@ func TestAccFirehoseDeliveryStream_redshiftUpdates(t *testing.T) {
})
}

func TestAccFirehoseDeliveryStream_tagUpdates(t *testing.T) {
var stream firehose.DeliveryStreamDescription
rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
resourceName := "aws_kinesis_firehose_delivery_stream.test"

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { acctest.PreCheck(t) },
ErrorCheck: acctest.ErrorCheck(t, firehose.EndpointsID),
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories,
CheckDestroy: testAccCheckDeliveryStreamDestroy,
Steps: []resource.TestStep{
{
Config: testAccDeliveryStreamConfig_tag(rName),
Check: resource.ComposeTestCheckFunc(
testAccCheckDeliveryStreamExists(resourceName, &stream),
testAccCheckDeliveryStreamAttributes(&stream, nil, nil, nil, nil, nil, nil),
resource.TestCheckResourceAttr(resourceName, "tags.%", "1"),
resource.TestCheckResourceAttr(resourceName, "tags.Key", "Value"),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{"redshift_configuration.0.password"},
},
{
Config: testAccDeliveryStreamConfig_tagUpdates(rName),
Check: resource.ComposeTestCheckFunc(
testAccCheckDeliveryStreamExists(resourceName, &stream),
testAccCheckDeliveryStreamAttributes(&stream, nil, nil, nil, nil, nil, nil),
resource.TestCheckResourceAttr(resourceName, "tags.%", "1"),
resource.TestCheckResourceAttr(resourceName, "tags.Key", "Value2"),
),
},
},
})
}

func TestAccFirehoseDeliveryStream_splunkUpdates(t *testing.T) {
var stream firehose.DeliveryStreamDescription
rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
Expand Down Expand Up @@ -3100,6 +3139,63 @@ resource "aws_kinesis_firehose_delivery_stream" "test" {
`, rName))
}

func testAccDeliveryStreamConfig_tag(rName string) string {
return acctest.ConfigCompose(
testAccDeliveryStreamRedshiftConfigBase(rName),
fmt.Sprintf(`
resource "aws_kinesis_firehose_delivery_stream" "test" {
name = %[1]q
destination = "redshift"
s3_configuration {
role_arn = aws_iam_role.firehose.arn
bucket_arn = aws_s3_bucket.bucket.arn
}
redshift_configuration {
role_arn = aws_iam_role.firehose.arn
cluster_jdbcurl = "jdbc:redshift://${aws_redshift_cluster.test.endpoint}/${aws_redshift_cluster.test.database_name}"
username = "testuser"
password = "T3stPass"
data_table_name = "test-table"
}
tags = {
"Key" = "Value"
}
}
`, rName))
}

func testAccDeliveryStreamConfig_tagUpdates(rName string) string {
return acctest.ConfigCompose(
testAccLambdaBasicConfig(rName),
testAccDeliveryStreamRedshiftConfigBase(rName),
fmt.Sprintf(`
resource "aws_kinesis_firehose_delivery_stream" "test" {
name = %[1]q
destination = "redshift"
s3_configuration {
role_arn = aws_iam_role.firehose.arn
bucket_arn = aws_s3_bucket.bucket.arn
}
redshift_configuration {
role_arn = aws_iam_role.firehose.arn
cluster_jdbcurl = "jdbc:redshift://${aws_redshift_cluster.test.endpoint}/${aws_redshift_cluster.test.database_name}"
username = "testuser"
password = "T3stPass"
data_table_name = "test-table"
}
tags = {
"Key" = "Value2"
}
}
`, rName))
}

func testAccDeliveryStreamConfig_splunkBasic(rName string) string {
return acctest.ConfigCompose(
testAccDeliveryStreamBaseConfig(rName),
Expand Down

0 comments on commit 604e2e9

Please sign in to comment.