Add s3_backup_mode option in Firehose Redshift destination #1830

Merged
181 changes: 132 additions & 49 deletions aws/resource_aws_kinesis_firehose_delivery_stream.go
@@ -41,6 +41,58 @@ func cloudWatchLoggingOptionsSchema() *schema.Schema {
}
}

func s3ConfigurationSchema() *schema.Schema {
return &schema.Schema{
Type: schema.TypeList,
MaxItems: 1,
Optional: true,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"bucket_arn": {
Type: schema.TypeString,
Required: true,
},

"buffer_size": {
Type: schema.TypeInt,
Optional: true,
Default: 5,
},

"buffer_interval": {
Type: schema.TypeInt,
Optional: true,
Default: 300,
},

"compression_format": {
Type: schema.TypeString,
Optional: true,
Default: "UNCOMPRESSED",
},

"kms_key_arn": {
Type: schema.TypeString,
Optional: true,
ValidateFunc: validateArn,
},

"role_arn": {
Type: schema.TypeString,
Required: true,
},

"prefix": {
Type: schema.TypeString,
Optional: true,
},

"cloudwatch_logging_options": cloudWatchLoggingOptionsSchema(),
},
},
}
}

func processingConfigurationSchema() *schema.Schema {
return &schema.Schema{
Type: schema.TypeList,
@@ -151,55 +203,7 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource {
},
},

"s3_configuration": {
Type: schema.TypeList,
Optional: true,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"bucket_arn": {
Type: schema.TypeString,
Required: true,
},

"buffer_size": {
Type: schema.TypeInt,
Optional: true,
Default: 5,
},

"buffer_interval": {
Type: schema.TypeInt,
Optional: true,
Default: 300,
},

"compression_format": {
Type: schema.TypeString,
Optional: true,
Default: "UNCOMPRESSED",
},

"kms_key_arn": {
Type: schema.TypeString,
Optional: true,
ValidateFunc: validateArn,
},

"role_arn": {
Type: schema.TypeString,
Required: true,
},

"prefix": {
Type: schema.TypeString,
Optional: true,
},

"cloudwatch_logging_options": cloudWatchLoggingOptionsSchema(),
},
},
},
"s3_configuration": s3ConfigurationSchema(),

"extended_s3_configuration": {
Type: schema.TypeList,
@@ -281,6 +285,22 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource {
Required: true,
},

"s3_backup_mode": {
Type: schema.TypeString,
Optional: true,
Default: "Disabled",
ValidateFunc: func(v interface{}, k string) (ws []string, errors []error) {
value := v.(string)
if value != "Disabled" && value != "Enabled" {
errors = append(errors, fmt.Errorf(
"%q must be one of 'Disabled', 'Enabled'", k))
}
return
},
},

"s3_backup_configuration": s3ConfigurationSchema(),

"retry_duration": {
Type: schema.TypeInt,
Optional: true,
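As an aside, the inline `ValidateFunc` on `s3_backup_mode` above could also be written with the `StringInSlice` helper from `github.com/hashicorp/terraform/helper/validation` — a sketch assuming that package is vendored in this provider version, not part of this diff:

```go
// Equivalent schema entry using the validation helper instead of a
// hand-rolled ValidateFunc; the false argument makes the match case-sensitive.
"s3_backup_mode": {
	Type:         schema.TypeString,
	Optional:     true,
	Default:      "Disabled",
	ValidateFunc: validation.StringInSlice([]string{"Disabled", "Enabled"}, false),
},
```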
@@ -467,6 +487,33 @@ func createS3Config(d *schema.ResourceData) *firehose.S3DestinationConfiguration
return configuration
}

// expandS3BackupConfig builds the S3 backup destination configuration from the
// parent destination's attribute map, returning nil when no
// s3_backup_configuration block is present.
func expandS3BackupConfig(d map[string]interface{}) *firehose.S3DestinationConfiguration {
config := d["s3_backup_configuration"].([]interface{})
if len(config) == 0 {
return nil
}

s3 := config[0].(map[string]interface{})

configuration := &firehose.S3DestinationConfiguration{
BucketARN: aws.String(s3["bucket_arn"].(string)),
RoleARN: aws.String(s3["role_arn"].(string)),
BufferingHints: &firehose.BufferingHints{
IntervalInSeconds: aws.Int64(int64(s3["buffer_interval"].(int))),
SizeInMBs: aws.Int64(int64(s3["buffer_size"].(int))),
},
Prefix: extractPrefixConfiguration(s3),
CompressionFormat: aws.String(s3["compression_format"].(string)),
EncryptionConfiguration: extractEncryptionConfiguration(s3),
}

if _, ok := s3["cloudwatch_logging_options"]; ok {
configuration.CloudWatchLoggingOptions = extractCloudWatchLoggingConfiguration(s3)
}

return configuration
}

func createExtendedS3Config(d *schema.ResourceData) *firehose.ExtendedS3DestinationConfiguration {
s3 := d.Get("extended_s3_configuration").([]interface{})[0].(map[string]interface{})

@@ -516,6 +563,34 @@ func updateS3Config(d *schema.ResourceData) *firehose.S3DestinationUpdate {
return configuration
}

// updateS3BackupConfig builds the S3 backup destination update from the parent
// destination's attribute map, returning nil when no s3_backup_configuration
// block is present.
func updateS3BackupConfig(d map[string]interface{}) *firehose.S3DestinationUpdate {
config := d["s3_backup_configuration"].([]interface{})
if len(config) == 0 {
return nil
}

s3 := config[0].(map[string]interface{})

configuration := &firehose.S3DestinationUpdate{
BucketARN: aws.String(s3["bucket_arn"].(string)),
RoleARN: aws.String(s3["role_arn"].(string)),
BufferingHints: &firehose.BufferingHints{
IntervalInSeconds: aws.Int64((int64)(s3["buffer_interval"].(int))),
SizeInMBs: aws.Int64((int64)(s3["buffer_size"].(int))),
},
Prefix: extractPrefixConfiguration(s3),
CompressionFormat: aws.String(s3["compression_format"].(string)),
EncryptionConfiguration: extractEncryptionConfiguration(s3),
CloudWatchLoggingOptions: extractCloudWatchLoggingConfiguration(s3),
}

if _, ok := s3["cloudwatch_logging_options"]; ok {
configuration.CloudWatchLoggingOptions = extractCloudWatchLoggingConfiguration(s3)
}

return configuration
}

func updateExtendedS3Config(d *schema.ResourceData) *firehose.ExtendedS3DestinationUpdate {
s3 := d.Get("extended_s3_configuration").([]interface{})[0].(map[string]interface{})

@@ -657,6 +732,10 @@ func createRedshiftConfig(d *schema.ResourceData, s3Config *firehose.S3Destinati
if _, ok := redshift["cloudwatch_logging_options"]; ok {
configuration.CloudWatchLoggingOptions = extractCloudWatchLoggingConfiguration(redshift)
}
if s3BackupMode, ok := redshift["s3_backup_mode"]; ok {
configuration.S3BackupMode = aws.String(s3BackupMode.(string))
configuration.S3BackupConfiguration = expandS3BackupConfig(redshift)
}

return configuration, nil
}
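Note that nothing above enforces the documented rule (see the website change below) that `s3_backup_configuration` must be present when `s3_backup_mode` is `"Enabled"`; as written, an incomplete config is only rejected by the Firehose API at apply time. A hypothetical fail-fast guard inside `createRedshiftConfig` might look like this — a sketch, not code from this PR:

```go
// Hypothetical guard: report a missing backup block locally rather than
// letting the CreateDeliveryStream API call fail later.
if mode, ok := redshift["s3_backup_mode"]; ok && mode.(string) == "Enabled" {
	if expandS3BackupConfig(redshift) == nil {
		return nil, fmt.Errorf("s3_backup_configuration is required when s3_backup_mode is Enabled")
	}
}
```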
@@ -683,6 +762,10 @@ func updateRedshiftConfig(d *schema.ResourceData, s3Update *firehose.S3Destinati
if _, ok := redshift["cloudwatch_logging_options"]; ok {
configuration.CloudWatchLoggingOptions = extractCloudWatchLoggingConfiguration(redshift)
}
if s3BackupMode, ok := redshift["s3_backup_mode"]; ok {
configuration.S3BackupMode = aws.String(s3BackupMode.(string))
configuration.S3BackupUpdate = updateS3BackupConfig(redshift)
}

return configuration, nil
}
17 changes: 13 additions & 4 deletions aws/resource_aws_kinesis_firehose_delivery_stream_test.go
@@ -242,6 +242,7 @@ func TestAccAWSKinesisFirehoseDeliveryStream_RedshiftConfigUpdates(t *testing.T)
CopyCommand: &firehose.CopyCommand{
CopyOptions: aws.String("GZIP"),
},
S3BackupMode: aws.String("Enabled"),
}

resource.Test(t, resource.TestCase{
@@ -416,16 +417,19 @@ func testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(stream *firehose.Del
r := redshiftConfig.(*firehose.RedshiftDestinationDescription)
// Range over the Stream Destinations, looking for the matching Redshift
// destination
var matchCopyOptions, matchS3BackupMode bool
for _, d := range stream.Destinations {
if d.RedshiftDestinationDescription != nil {
if *d.RedshiftDestinationDescription.CopyCommand.CopyOptions == *r.CopyCommand.CopyOptions {
matchCopyOptions = true
}
if *d.RedshiftDestinationDescription.S3BackupMode == *r.S3BackupMode {
matchS3BackupMode = true
}
}
}
if !matchCopyOptions || !matchS3BackupMode {
return fmt.Errorf("Mismatch Redshift CopyOptions or S3BackupMode, expected: %s, got: %s", r, stream.Destinations)
}
}

@@ -890,6 +894,11 @@ resource "aws_kinesis_firehose_delivery_stream" "test_stream" {
cluster_jdbcurl = "jdbc:redshift://${aws_redshift_cluster.test_cluster.endpoint}/${aws_redshift_cluster.test_cluster.database_name}"
username = "testuser"
password = "T3stPass"
s3_backup_mode = "Enabled"
s3_backup_configuration {
role_arn = "${aws_iam_role.firehose.arn}"
bucket_arn = "${aws_s3_bucket.bucket.arn}"
}
data_table_name = "test-table"
copy_options = "GZIP"
data_table_columns = "test-col"
10 changes: 10 additions & 0 deletions website/docs/r/kinesis_firehose_delivery_stream.html.markdown
@@ -167,6 +167,14 @@ resource "aws_kinesis_firehose_delivery_stream" "test_stream" {
data_table_name = "test-table"
copy_options = "delimiter '|'" # the default delimiter
data_table_columns = "test-col"
s3_backup_mode = "Enabled"
s3_backup_configuration {
role_arn = "${aws_iam_role.firehose_role.arn}"
bucket_arn = "${aws_s3_bucket.bucket.arn}"
buffer_size = 15
buffer_interval = 300
compression_format = "GZIP"
}
}
}
```
@@ -239,6 +247,8 @@ The `redshift_configuration` object supports the following:
* `password` - (Required) The password for the username above.
* `retry_duration` - (Optional) The length of time during which Firehose retries delivery after a failure, starting from the initial request and including the first attempt. The default value is 3600 seconds (60 minutes). Firehose does not retry if the value of DurationInSeconds is 0 (zero) or if the first delivery attempt takes longer than the current value.
* `role_arn` - (Required) The ARN of the role the stream assumes.
* `s3_backup_mode` - (Optional) The Amazon S3 backup mode. Valid values are `Disabled` and `Enabled`. Default value is `Disabled`.
* `s3_backup_configuration` - (Optional) The configuration for backup in Amazon S3. Required if `s3_backup_mode` is `Enabled`. Supports the same fields as the `s3_configuration` object.
* `data_table_name` - (Required) The name of the table in the redshift cluster that the s3 bucket will copy to.
* `copy_options` - (Optional) Copy options for copying the data from the s3 intermediate bucket into redshift, for example to change the default delimiter. For valid values, see the [AWS documentation](http://docs.aws.amazon.com/firehose/latest/APIReference/API_CopyCommand.html)
* `data_table_columns` - (Optional) The data table columns that will be targeted by the copy command.