From d8c4a87d3ae8d966120154088b99f984c94b7e90 Mon Sep 17 00:00:00 2001
From: Aman
Date: Fri, 6 Oct 2017 18:36:48 +0530
Subject: [PATCH 1/4] Add s3_backup_mode option in Firehose Redshift destination

---
 ...ce_aws_kinesis_firehose_delivery_stream.go | 134 ++++++++++++++++++
 ...s_kinesis_firehose_delivery_stream_test.go |  17 ++-
 2 files changed, 147 insertions(+), 4 deletions(-)

diff --git a/aws/resource_aws_kinesis_firehose_delivery_stream.go b/aws/resource_aws_kinesis_firehose_delivery_stream.go
index 4d673f6b8f9..8474e7042ac 100644
--- a/aws/resource_aws_kinesis_firehose_delivery_stream.go
+++ b/aws/resource_aws_kinesis_firehose_delivery_stream.go
@@ -41,6 +41,59 @@ func cloudWatchLoggingOptionsSchema() *schema.Schema {
   }
 }
 
+func S3BackupConfigurationSchema() *schema.Schema {
+  return &schema.Schema{
+    Type: schema.TypeSet,
+    MaxItems: 1,
+    Optional: true,
+    Computed: true,
+    Elem: &schema.Resource{
+      Schema: map[string]*schema.Schema{
+        "bucket_arn": {
+          Type: schema.TypeString,
+          Required: true,
+        },
+
+        "buffer_size": {
+          Type: schema.TypeInt,
+          Optional: true,
+          Default: 5,
+        },
+
+        "buffer_interval": {
+          Type: schema.TypeInt,
+          Optional: true,
+          Default: 300,
+        },
+
+        "compression_format": {
+          Type: schema.TypeString,
+          Optional: true,
+          Default: "UNCOMPRESSED",
+        },
+
+        "kms_key_arn": {
+          Type: schema.TypeString,
+          Optional: true,
+          ValidateFunc: validateArn,
+        },
+
+        "role_arn": {
+          Type: schema.TypeString,
+          Required: true,
+        },
+
+        "prefix": {
+          Type: schema.TypeString,
+          Optional: true,
+        },
+
+        "cloudwatch_logging_options": cloudWatchLoggingOptionsSchema(),
+      },
+    },
+  }
+}
+
 func processingConfigurationSchema() *schema.Schema {
   return &schema.Schema{
     Type: schema.TypeList,
@@ -281,6 +334,22 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource {
               Required: true,
             },
 
+            "s3_backup_mode": {
+              Type: schema.TypeString,
+              Optional: true,
+              Default: "Disabled",
+              ValidateFunc: func(v interface{}, k string) (ws []string, errors []error) {
+                value := v.(string)
+                if value != "Disabled" && value != "Enabled" {
+                  errors = append(errors, fmt.Errorf(
+                    "%q must be one of 'Disabled', 'Enabled'", k))
+                }
+                return
+              },
+            },
+
+            "s3_backup_configuration": S3BackupConfigurationSchema(),
+
             "retry_duration": {
               Type: schema.TypeInt,
              Optional: true,
@@ -467,6 +536,34 @@ func createS3Config(d *schema.ResourceData) *firehose.S3DestinationConfiguration
   return configuration
 }
 
+func createS3BackupConfig(d *schema.ResourceData) *firehose.S3DestinationConfiguration {
+  redshiftConfig := d.Get("redshift_configuration").([]interface{})[0].(map[string]interface{})
+  config := redshiftConfig["s3_backup_configuration"].(*schema.Set).List()
+  if len(config) == 0 {
+    return nil
+  }
+
+  s3 := config[0].(map[string]interface{})
+
+  configuration := &firehose.S3DestinationConfiguration{
+    BucketARN: aws.String(s3["bucket_arn"].(string)),
+    RoleARN: aws.String(s3["role_arn"].(string)),
+    BufferingHints: &firehose.BufferingHints{
+      IntervalInSeconds: aws.Int64(int64(s3["buffer_interval"].(int))),
+      SizeInMBs: aws.Int64(int64(s3["buffer_size"].(int))),
+    },
+    Prefix: extractPrefixConfiguration(s3),
+    CompressionFormat: aws.String(s3["compression_format"].(string)),
+    EncryptionConfiguration: extractEncryptionConfiguration(s3),
+  }
+
+  if _, ok := s3["cloudwatch_logging_options"]; ok {
+    configuration.CloudWatchLoggingOptions = extractCloudWatchLoggingConfiguration(s3)
+  }
+
+  return configuration
+}
+
 func createExtendedS3Config(d *schema.ResourceData) *firehose.ExtendedS3DestinationConfiguration {
   s3 := d.Get("extended_s3_configuration").([]interface{})[0].(map[string]interface{})
 
@@ -516,6 +613,35 @@ func updateS3Config(d *schema.ResourceData) *firehose.S3DestinationUpdate {
   return configuration
 }
 
+func updateS3BackupConfig(d *schema.ResourceData) *firehose.S3DestinationUpdate {
+  redshiftConfig := d.Get("redshift_configuration").([]interface{})[0].(map[string]interface{})
+  config := redshiftConfig["s3_backup_configuration"].(*schema.Set).List()
+  if len(config) == 0 {
+    return nil
+  }
+
+  s3 := config[0].(map[string]interface{})
+
+  configuration := &firehose.S3DestinationUpdate{
+    BucketARN: aws.String(s3["bucket_arn"].(string)),
+    RoleARN: aws.String(s3["role_arn"].(string)),
+    BufferingHints: &firehose.BufferingHints{
+      IntervalInSeconds: aws.Int64((int64)(s3["buffer_interval"].(int))),
+      SizeInMBs: aws.Int64((int64)(s3["buffer_size"].(int))),
+    },
+    Prefix: extractPrefixConfiguration(s3),
+    CompressionFormat: aws.String(s3["compression_format"].(string)),
+    EncryptionConfiguration: extractEncryptionConfiguration(s3),
+    CloudWatchLoggingOptions: extractCloudWatchLoggingConfiguration(s3),
+  }
+
+  if _, ok := s3["cloudwatch_logging_options"]; ok {
+    configuration.CloudWatchLoggingOptions = extractCloudWatchLoggingConfiguration(s3)
+  }
+
+  return configuration
+}
+
 func updateExtendedS3Config(d *schema.ResourceData) *firehose.ExtendedS3DestinationUpdate {
   s3 := d.Get("extended_s3_configuration").([]interface{})[0].(map[string]interface{})
 
@@ -657,6 +783,10 @@ func createRedshiftConfig(d *schema.ResourceData, s3Config *firehose.S3Destinati
   if _, ok := redshift["cloudwatch_logging_options"]; ok {
     configuration.CloudWatchLoggingOptions = extractCloudWatchLoggingConfiguration(redshift)
   }
+  if s3BackupMode, ok := redshift["s3_backup_mode"]; ok {
+    configuration.S3BackupMode = aws.String(s3BackupMode.(string))
+    configuration.S3BackupConfiguration = createS3BackupConfig(d)
+  }
 
   return configuration, nil
 }
@@ -683,6 +813,10 @@ func updateRedshiftConfig(d *schema.ResourceData, s3Update *firehose.S3Destinati
   if _, ok := redshift["cloudwatch_logging_options"]; ok {
     configuration.CloudWatchLoggingOptions = extractCloudWatchLoggingConfiguration(redshift)
   }
+  if s3BackupMode, ok := redshift["s3_backup_mode"]; ok {
+    configuration.S3BackupMode = aws.String(s3BackupMode.(string))
+    configuration.S3BackupUpdate = updateS3BackupConfig(d)
+  }
 
   return configuration, nil
 }
diff --git a/aws/resource_aws_kinesis_firehose_delivery_stream_test.go b/aws/resource_aws_kinesis_firehose_delivery_stream_test.go
index 7ecf40afeef..a75f523a46c 100644
--- a/aws/resource_aws_kinesis_firehose_delivery_stream_test.go
+++ b/aws/resource_aws_kinesis_firehose_delivery_stream_test.go
@@ -242,6 +242,7 @@ func TestAccAWSKinesisFirehoseDeliveryStream_RedshiftConfigUpdates(t *testing.T)
     CopyCommand: &firehose.CopyCommand{
       CopyOptions: aws.String("GZIP"),
     },
+    S3BackupMode: aws.String("Enabled"),
   }
 
   resource.Test(t, resource.TestCase{
@@ -416,16 +417,19 @@ func testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(stream *firehose.Del
     r := redshiftConfig.(*firehose.RedshiftDestinationDescription)
     // Range over the Stream Destinations, looking for the matching Redshift
     // destination
-    var match bool
+    var matchCopyOptions, matchS3BackupMode bool
     for _, d := range stream.Destinations {
       if d.RedshiftDestinationDescription != nil {
         if *d.RedshiftDestinationDescription.CopyCommand.CopyOptions == *r.CopyCommand.CopyOptions {
-          match = true
+          matchCopyOptions = true
+        }
+        if *d.RedshiftDestinationDescription.S3BackupMode == *r.S3BackupMode {
+          matchS3BackupMode = true
         }
       }
     }
-    if !match {
-      return fmt.Errorf("Mismatch Redshift CopyOptions, expected: %s, got: %s", r, stream.Destinations)
+    if !matchCopyOptions || !matchS3BackupMode {
+      return fmt.Errorf("Mismatch Redshift CopyOptions or S3BackupMode, expected: %s, got: %s", r, stream.Destinations)
     }
   }
 
@@ -890,6 +894,11 @@ resource "aws_kinesis_firehose_delivery_stream" "test_stream" {
     cluster_jdbcurl = "jdbc:redshift://${aws_redshift_cluster.test_cluster.endpoint}/${aws_redshift_cluster.test_cluster.database_name}"
     username = "testuser"
     password = "T3stPass"
+    s3_backup_mode = "Enabled"
+    s3_backup_configuration {
+      role_arn = "${aws_iam_role.firehose.arn}"
+      bucket_arn = "${aws_s3_bucket.bucket.arn}"
+    }
     data_table_name = "test-table"
     copy_options = "GZIP"
     data_table_columns = "test-col"

From dba66017d2c39fa8b20a9d3b20cd128a5a0455d9 Mon Sep 17 00:00:00 2001
From: Aman
Date: Fri, 6 Oct 2017 18:53:17 +0530
Subject: [PATCH 2/4] Update docs for Firehose Redshift destination configuration

---
 .../r/kinesis_firehose_delivery_stream.html.markdown | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/website/docs/r/kinesis_firehose_delivery_stream.html.markdown b/website/docs/r/kinesis_firehose_delivery_stream.html.markdown
index 751ac73ec84..a1ac987c99f 100644
--- a/website/docs/r/kinesis_firehose_delivery_stream.html.markdown
+++ b/website/docs/r/kinesis_firehose_delivery_stream.html.markdown
@@ -167,6 +167,14 @@ resource "aws_kinesis_firehose_delivery_stream" "test_stream" {
     data_table_name = "test-table"
     copy_options = "delimiter '|'" # the default delimiter
     data_table_columns = "test-col"
+    s3_backup_mode = "Enabled"
+    s3_backup_configuration {
+      role_arn = "${aws_iam_role.firehose_role.arn}"
+      bucket_arn = "${aws_s3_bucket.bucket.arn}"
+      buffer_size = 15
+      buffer_interval = 300
+      compression_format = "GZIP"
+    }
   }
 }
 ```
@@ -239,6 +247,8 @@ The `redshift_configuration` object supports the following:
 * `password` - (Required) The password for the username above.
 * `retry_duration` - (Optional) The length of time during which Firehose retries delivery after a failure, starting from the initial request and including the first attempt. The default value is 3600 seconds (60 minutes). Firehose does not retry if the value of DurationInSeconds is 0 (zero) or if the first delivery attempt takes longer than the current value.
 * `role_arn` - (Required) The arn of the role the stream assumes.
+* `s3_backup_mode` - (Optional) The Amazon S3 backup mode. Valid values are `Disabled` and `Enabled`. Default value is `Disabled`.
+* `s3_backup_configuration` - (Optional) The configuration for backup in Amazon S3. Required if `s3_backup_mode` is `Enabled`. Supports the same fields as `s3_configuration` object.
 * `data_table_name` - (Required) The name of the table in the redshift cluster that the s3 bucket will copy to.
 * `copy_options` - (Optional) Copy options for copying the data from the s3 intermediate bucket into redshift, for example to change the default delimiter. For valid values, see the [AWS documentation](http://docs.aws.amazon.com/firehose/latest/APIReference/API_CopyCommand.html)
 * `data_table_columns` - (Optional) The data table columns that will be targeted by the copy command.
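The docs above state that `s3_backup_mode` only accepts `Disabled` and `Enabled`; the ValidateFunc added in PATCH 1 enforces exactly that, and it is case-sensitive. Here is a minimal, standalone Go sketch of the same check; the `validateS3BackupMode` wrapper and the `main` driver are illustrative additions, not part of the patch (in the patch the check is an inline closure in the schema):

```go
package main

import "fmt"

// validateS3BackupMode reproduces the ValidateFunc attached to "s3_backup_mode"
// in PATCH 1: only the exact strings "Disabled" and "Enabled" pass validation.
func validateS3BackupMode(v interface{}, k string) (ws []string, errors []error) {
	value := v.(string)
	if value != "Disabled" && value != "Enabled" {
		errors = append(errors, fmt.Errorf(
			"%q must be one of 'Disabled', 'Enabled'", k))
	}
	return
}

func main() {
	// The comparison is case-sensitive, so "enabled" and "" are rejected.
	for _, mode := range []string{"Disabled", "Enabled", "enabled", ""} {
		_, errs := validateS3BackupMode(mode, "s3_backup_mode")
		fmt.Printf("%q -> %d validation error(s)\n", mode, len(errs))
	}
}
```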
From 766a25ad87f6e03f17bb01a199f548aa23fc9fd8 Mon Sep 17 00:00:00 2001
From: Aman
Date: Wed, 25 Oct 2017 20:31:42 +0530
Subject: [PATCH 3/4] Changes as per review comments

---
 ...ce_aws_kinesis_firehose_delivery_stream.go | 71 +++----------------
 1 file changed, 10 insertions(+), 61 deletions(-)

diff --git a/aws/resource_aws_kinesis_firehose_delivery_stream.go b/aws/resource_aws_kinesis_firehose_delivery_stream.go
index 8474e7042ac..37fd52bc494 100644
--- a/aws/resource_aws_kinesis_firehose_delivery_stream.go
+++ b/aws/resource_aws_kinesis_firehose_delivery_stream.go
@@ -41,12 +41,11 @@ func cloudWatchLoggingOptionsSchema() *schema.Schema {
   }
 }
 
-func S3BackupConfigurationSchema() *schema.Schema {
+func S3ConfigurationSchema() *schema.Schema {
   return &schema.Schema{
-    Type: schema.TypeSet,
+    Type: schema.TypeList,
     MaxItems: 1,
     Optional: true,
-    Computed: true,
     Elem: &schema.Resource{
       Schema: map[string]*schema.Schema{
         "bucket_arn": {
           Type: schema.TypeString,
           Required: true,
         },
@@ -204,55 +203,7 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource {
         },
       },
 
-      "s3_configuration": {
-        Type: schema.TypeList,
-        Optional: true,
-        MaxItems: 1,
-        Elem: &schema.Resource{
-          Schema: map[string]*schema.Schema{
-            "bucket_arn": {
-              Type: schema.TypeString,
-              Required: true,
-            },
-
-            "buffer_size": {
-              Type: schema.TypeInt,
-              Optional: true,
-              Default: 5,
-            },
-
-            "buffer_interval": {
-              Type: schema.TypeInt,
-              Optional: true,
-              Default: 300,
-            },
-
-            "compression_format": {
-              Type: schema.TypeString,
-              Optional: true,
-              Default: "UNCOMPRESSED",
-            },
-
-            "kms_key_arn": {
-              Type: schema.TypeString,
-              Optional: true,
-              ValidateFunc: validateArn,
-            },
-
-            "role_arn": {
-              Type: schema.TypeString,
-              Required: true,
-            },
-
-            "prefix": {
-              Type: schema.TypeString,
-              Optional: true,
-            },
-
-            "cloudwatch_logging_options": cloudWatchLoggingOptionsSchema(),
-          },
-        },
-      },
+      "s3_configuration": S3ConfigurationSchema(),
 
       "extended_s3_configuration": {
         Type: schema.TypeList,
@@ -348,7 +299,7 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource {
               },
             },
 
-            "s3_backup_configuration": S3BackupConfigurationSchema(),
+            "s3_backup_configuration": S3ConfigurationSchema(),
 
             "retry_duration": {
               Type: schema.TypeInt,
@@ -536,9 +487,8 @@ func createS3Config(d *schema.ResourceData) *firehose.S3DestinationConfiguration
   return configuration
 }
 
-func createS3BackupConfig(d *schema.ResourceData) *firehose.S3DestinationConfiguration {
-  redshiftConfig := d.Get("redshift_configuration").([]interface{})[0].(map[string]interface{})
-  config := redshiftConfig["s3_backup_configuration"].(*schema.Set).List()
+func expandS3BackupConfig(d map[string]interface{}) *firehose.S3DestinationConfiguration {
+  config := d["s3_backup_configuration"].([]interface{})
   if len(config) == 0 {
     return nil
   }
@@ -613,9 +563,8 @@ func updateS3Config(d *schema.ResourceData) *firehose.S3DestinationUpdate {
   return configuration
 }
 
-func updateS3BackupConfig(d *schema.ResourceData) *firehose.S3DestinationUpdate {
-  redshiftConfig := d.Get("redshift_configuration").([]interface{})[0].(map[string]interface{})
-  config := redshiftConfig["s3_backup_configuration"].(*schema.Set).List()
+func updateS3BackupConfig(d map[string]interface{}) *firehose.S3DestinationUpdate {
+  config := d["s3_backup_configuration"].([]interface{})
   if len(config) == 0 {
     return nil
   }
@@ -785,7 +734,7 @@ func createRedshiftConfig(d *schema.ResourceData, s3Config *firehose.S3Destinati
   }
   if s3BackupMode, ok := redshift["s3_backup_mode"]; ok {
     configuration.S3BackupMode = aws.String(s3BackupMode.(string))
-    configuration.S3BackupConfiguration = createS3BackupConfig(d)
+    configuration.S3BackupConfiguration = expandS3BackupConfig(d.Get("redshift_configuration").([]interface{})[0].(map[string]interface{}))
   }
 
   return configuration, nil
@@ -815,7 +764,7 @@ func updateRedshiftConfig(d *schema.ResourceData, s3Update *firehose.S3Destinati
   if _, ok := redshift["cloudwatch_logging_options"]; ok {
     configuration.CloudWatchLoggingOptions = extractCloudWatchLoggingConfiguration(redshift)
   }
   if s3BackupMode, ok := redshift["s3_backup_mode"]; ok {
     configuration.S3BackupMode = aws.String(s3BackupMode.(string))
-    configuration.S3BackupUpdate = updateS3BackupConfig(d)
+    configuration.S3BackupUpdate = updateS3BackupConfig(d.Get("redshift_configuration").([]interface{})[0].(map[string]interface{}))
   }
 
   return configuration, nil

From 7d7c6e69cab01ddec306f6ba1a9a720e03fa0a1e Mon Sep 17 00:00:00 2001
From: Aman
Date: Wed, 25 Oct 2017 20:39:17 +0530
Subject: [PATCH 4/4] Don't export a private function

---
 aws/resource_aws_kinesis_firehose_delivery_stream.go | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/aws/resource_aws_kinesis_firehose_delivery_stream.go b/aws/resource_aws_kinesis_firehose_delivery_stream.go
index 37fd52bc494..0f03af99f0e 100644
--- a/aws/resource_aws_kinesis_firehose_delivery_stream.go
+++ b/aws/resource_aws_kinesis_firehose_delivery_stream.go
@@ -41,7 +41,7 @@ func cloudWatchLoggingOptionsSchema() *schema.Schema {
   }
 }
 
-func S3ConfigurationSchema() *schema.Schema {
+func s3ConfigurationSchema() *schema.Schema {
   return &schema.Schema{
     Type: schema.TypeList,
     MaxItems: 1,
     Optional: true,
@@ -203,7 +203,7 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource {
         },
       },
 
-      "s3_configuration": S3ConfigurationSchema(),
+      "s3_configuration": s3ConfigurationSchema(),
 
       "extended_s3_configuration": {
         Type: schema.TypeList,
@@ -299,7 +299,7 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource {
               },
             },
 
-            "s3_backup_configuration": S3ConfigurationSchema(),
+            "s3_backup_configuration": s3ConfigurationSchema(),
 
             "retry_duration": {
               Type: schema.TypeInt,
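
On the shape of the helpers after the review changes in PATCH 3 and PATCH 4: the expand/update functions now receive the already-extracted `redshift_configuration` map and return `nil` when the optional `s3_backup_configuration` block is absent, so the backup settings are simply omitted from the Firehose request. A minimal, dependency-free Go sketch of that pattern follows; the `backupConfig` type and `expandBackupConfig` name are illustrative stand-ins, not the provider's or the AWS SDK's types:

```go
package main

import "fmt"

// backupConfig stands in for firehose.S3DestinationConfiguration in this sketch.
type backupConfig struct {
	BucketARN string
	RoleARN   string
}

// expandBackupConfig mirrors the pattern used by expandS3BackupConfig in PATCH 3:
// read the nested block out of the parent map, return nil if it is absent,
// otherwise build the struct from the single allowed element (MaxItems: 1).
func expandBackupConfig(parent map[string]interface{}) *backupConfig {
	raw, ok := parent["s3_backup_configuration"].([]interface{})
	if !ok || len(raw) == 0 {
		return nil
	}
	block := raw[0].(map[string]interface{})
	return &backupConfig{
		BucketARN: block["bucket_arn"].(string),
		RoleARN:   block["role_arn"].(string),
	}
}

func main() {
	withBackup := map[string]interface{}{
		"s3_backup_configuration": []interface{}{
			map[string]interface{}{
				"bucket_arn": "arn:aws:s3:::backup-bucket",
				"role_arn":   "arn:aws:iam::123456789012:role/firehose",
			},
		},
	}
	withoutBackup := map[string]interface{}{}

	fmt.Printf("with block:    %+v\n", expandBackupConfig(withBackup))
	fmt.Printf("without block: %v\n", expandBackupConfig(withoutBackup)) // nil -> field left unset
}
```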