Add s3_backup_mode option in Firehose Redshift destination #1830

Merged

Changes from 2 commits
134 changes: 134 additions & 0 deletions aws/resource_aws_kinesis_firehose_delivery_stream.go
@@ -41,6 +41,59 @@ func cloudWatchLoggingOptionsSchema() *schema.Schema {
}
}

func S3BackupConfigurationSchema() *schema.Schema {
return &schema.Schema{
Type: schema.TypeSet,
Contributor Author:
@radeksimko I have a question here: when do we use TypeSet vs TypeList?
I see S3ConfigurationSchema has TypeList with MaxItems: 1. Is this significant only during type assertion?

Member:

Set is used when order doesn't matter; list is used in all other cases, typically with MaxItems: 1 for easier referencing, since we can predict the index, i.e. field_name.0.nested_field.

Member:

In other words it would be better to use TypeList here.

Can't we call this s3ConfigurationSchema and use it in "s3_configuration" too to reduce the repetition a bit?

Contributor Author:

> Can't we call this s3ConfigurationSchema and use it in "s3_configuration" too to reduce the repetition a bit?

That's what I was going for, and I noticed that the schema types are different.

Contributor Author:

> Set is used when order doesn't matter

@radeksimko Just curious. Why do we use TypeList here then? The order doesn't matter here, right?

Member:

Correct, it doesn't, but also there's really nothing to order 😃 as we only ever have a single item in the list and TypeList makes it more convenient for referencing purposes, i.e. field_name.0.nested_field instead of field_name.<computed-index>.nested_field.
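To illustrate the referencing point, a minimal sketch using a hypothetical `endpoint` block (not code from this PR):

```go
// Hypothetical nested block declared as TypeList with MaxItems: 1:
//
//	"endpoint": {
//	    Type:     schema.TypeList,
//	    MaxItems: 1,
//	    Optional: true,
//	    Elem: &schema.Resource{
//	        Schema: map[string]*schema.Schema{
//	            "url": {Type: schema.TypeString, Required: true},
//	        },
//	    },
//	},

// With TypeList the only element is always at index 0, so nested fields
// have a predictable address:
url := d.Get("endpoint.0.url").(string)

// With TypeSet the element index is a computed hash, so the set has to
// be unpacked rather than addressed by position:
items := d.Get("endpoint").(*schema.Set).List()
if len(items) > 0 {
	url = items[0].(map[string]interface{})["url"].(string)
}
```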

MaxItems: 1,
Optional: true,
Computed: true,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"bucket_arn": {
Type: schema.TypeString,
Required: true,
},

"buffer_size": {
Type: schema.TypeInt,
Optional: true,
Default: 5,
},

"buffer_interval": {
Type: schema.TypeInt,
Optional: true,
Default: 300,
},

"compression_format": {
Type: schema.TypeString,
Optional: true,
Default: "UNCOMPRESSED",
},

"kms_key_arn": {
Type: schema.TypeString,
Optional: true,
ValidateFunc: validateArn,
},

"role_arn": {
Type: schema.TypeString,
Required: true,
},

"prefix": {
Type: schema.TypeString,
Optional: true,
},

"cloudwatch_logging_options": cloudWatchLoggingOptionsSchema(),
},
},
}
}

func processingConfigurationSchema() *schema.Schema {
return &schema.Schema{
Type: schema.TypeList,
@@ -281,6 +334,22 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource {
Required: true,
},

"s3_backup_mode": {
Type: schema.TypeString,
Optional: true,
Default: "Disabled",
ValidateFunc: func(v interface{}, k string) (ws []string, errors []error) {
value := v.(string)
if value != "Disabled" && value != "Enabled" {
errors = append(errors, fmt.Errorf(
"%q must be one of 'Disabled', 'Enabled'", k))
}
return
},
},

"s3_backup_configuration": S3BackupConfigurationSchema(),

"retry_duration": {
Type: schema.TypeInt,
Optional: true,
@@ -467,6 +536,34 @@ func createS3Config(d *schema.ResourceData) *firehose.S3DestinationConfiguration
return configuration
}

func createS3BackupConfig(d *schema.ResourceData) *firehose.S3DestinationConfiguration {
Member:

Do you mind reducing the scope here? i.e. the function doesn't need access to the whole schema (all fields), all it needs is a single field, so we can pass it as map[string]interface{} I think?
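For instance, a sketch of that suggestion (hypothetical, not the code merged in this PR; it reuses the imports already present in this file):

```go
// The caller unpacks the nested block once and hands the expander only
// the single map it needs, instead of the whole *schema.ResourceData.
func expandS3BackupConfig(s3 map[string]interface{}) *firehose.S3DestinationConfiguration {
	return &firehose.S3DestinationConfiguration{
		BucketARN: aws.String(s3["bucket_arn"].(string)),
		RoleARN:   aws.String(s3["role_arn"].(string)),
		BufferingHints: &firehose.BufferingHints{
			IntervalInSeconds: aws.Int64(int64(s3["buffer_interval"].(int))),
			SizeInMBs:         aws.Int64(int64(s3["buffer_size"].(int))),
		},
	}
}

// Caller side, e.g. inside createRedshiftConfig:
//
//	backup := redshift["s3_backup_configuration"].(*schema.Set).List()
//	if len(backup) > 0 {
//	    configuration.S3BackupConfiguration = expandS3BackupConfig(backup[0].(map[string]interface{}))
//	}
```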

Member:

Also it is a convention throughout the codebase to call these functions "expanders" - i.e. expandS3BackupConfig

Contributor Author (@ApsOps), Oct 25, 2017:

@radeksimko We're not using "expanders" for createS3Config and createExtendedS3Config, a few lines above and below this snippet. Should I change those too?

Member:

I didn't notice that previously - good catch - we should certainly fix that, but I'd prefer to do it in a separate PR which may come afterwards to keep the context & LOC low here. 😉

redshiftConfig := d.Get("redshift_configuration").([]interface{})[0].(map[string]interface{})
config := redshiftConfig["s3_backup_configuration"].(*schema.Set).List()
if len(config) == 0 {
return nil
}

s3 := config[0].(map[string]interface{})

configuration := &firehose.S3DestinationConfiguration{
BucketARN: aws.String(s3["bucket_arn"].(string)),
RoleARN: aws.String(s3["role_arn"].(string)),
BufferingHints: &firehose.BufferingHints{
IntervalInSeconds: aws.Int64(int64(s3["buffer_interval"].(int))),
SizeInMBs: aws.Int64(int64(s3["buffer_size"].(int))),
},
Prefix: extractPrefixConfiguration(s3),
CompressionFormat: aws.String(s3["compression_format"].(string)),
EncryptionConfiguration: extractEncryptionConfiguration(s3),
}

if _, ok := s3["cloudwatch_logging_options"]; ok {
configuration.CloudWatchLoggingOptions = extractCloudWatchLoggingConfiguration(s3)
}

return configuration
}

func createExtendedS3Config(d *schema.ResourceData) *firehose.ExtendedS3DestinationConfiguration {
s3 := d.Get("extended_s3_configuration").([]interface{})[0].(map[string]interface{})

@@ -516,6 +613,35 @@ func updateS3Config(d *schema.ResourceData) *firehose.S3DestinationUpdate {
return configuration
}

func updateS3BackupConfig(d *schema.ResourceData) *firehose.S3DestinationUpdate {
redshiftConfig := d.Get("redshift_configuration").([]interface{})[0].(map[string]interface{})
config := redshiftConfig["s3_backup_configuration"].(*schema.Set).List()
if len(config) == 0 {
return nil
}

s3 := config[0].(map[string]interface{})

configuration := &firehose.S3DestinationUpdate{
BucketARN: aws.String(s3["bucket_arn"].(string)),
RoleARN: aws.String(s3["role_arn"].(string)),
BufferingHints: &firehose.BufferingHints{
IntervalInSeconds: aws.Int64(int64(s3["buffer_interval"].(int))),
SizeInMBs: aws.Int64(int64(s3["buffer_size"].(int))),
},
Prefix: extractPrefixConfiguration(s3),
CompressionFormat: aws.String(s3["compression_format"].(string)),
EncryptionConfiguration: extractEncryptionConfiguration(s3),
}

if _, ok := s3["cloudwatch_logging_options"]; ok {
configuration.CloudWatchLoggingOptions = extractCloudWatchLoggingConfiguration(s3)
}

return configuration
}

func updateExtendedS3Config(d *schema.ResourceData) *firehose.ExtendedS3DestinationUpdate {
s3 := d.Get("extended_s3_configuration").([]interface{})[0].(map[string]interface{})

@@ -657,6 +783,10 @@ func createRedshiftConfig(d *schema.ResourceData, s3Config *firehose.S3Destinati
if _, ok := redshift["cloudwatch_logging_options"]; ok {
configuration.CloudWatchLoggingOptions = extractCloudWatchLoggingConfiguration(redshift)
}
if s3BackupMode, ok := redshift["s3_backup_mode"]; ok {
configuration.S3BackupMode = aws.String(s3BackupMode.(string))
configuration.S3BackupConfiguration = createS3BackupConfig(d)
}

return configuration, nil
}
@@ -683,6 +813,10 @@ func updateRedshiftConfig(d *schema.ResourceData, s3Update *firehose.S3Destinati
if _, ok := redshift["cloudwatch_logging_options"]; ok {
configuration.CloudWatchLoggingOptions = extractCloudWatchLoggingConfiguration(redshift)
}
if s3BackupMode, ok := redshift["s3_backup_mode"]; ok {
configuration.S3BackupMode = aws.String(s3BackupMode.(string))
configuration.S3BackupUpdate = updateS3BackupConfig(d)
}

return configuration, nil
}
17 changes: 13 additions & 4 deletions aws/resource_aws_kinesis_firehose_delivery_stream_test.go

@@ -242,6 +242,7 @@ func TestAccAWSKinesisFirehoseDeliveryStream_RedshiftConfigUpdates(t *testing.T)
CopyCommand: &firehose.CopyCommand{
CopyOptions: aws.String("GZIP"),
},
S3BackupMode: aws.String("Enabled"),
}

resource.Test(t, resource.TestCase{
@@ -416,16 +417,19 @@ func testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(stream *firehose.Del
r := redshiftConfig.(*firehose.RedshiftDestinationDescription)
// Range over the Stream Destinations, looking for the matching Redshift
// destination
var match bool
var matchCopyOptions, matchS3BackupMode bool
for _, d := range stream.Destinations {
if d.RedshiftDestinationDescription != nil {
if *d.RedshiftDestinationDescription.CopyCommand.CopyOptions == *r.CopyCommand.CopyOptions {
match = true
matchCopyOptions = true
}
if *d.RedshiftDestinationDescription.S3BackupMode == *r.S3BackupMode {
matchS3BackupMode = true
}
}
}
if !match {
return fmt.Errorf("Mismatch Redshift CopyOptions, expected: %s, got: %s", r, stream.Destinations)
if !matchCopyOptions || !matchS3BackupMode {
return fmt.Errorf("Mismatch Redshift CopyOptions or S3BackupMode, expected: %s, got: %s", r, stream.Destinations)
}
}

@@ -890,6 +894,11 @@ resource "aws_kinesis_firehose_delivery_stream" "test_stream" {
cluster_jdbcurl = "jdbc:redshift://${aws_redshift_cluster.test_cluster.endpoint}/${aws_redshift_cluster.test_cluster.database_name}"
username = "testuser"
password = "T3stPass"
s3_backup_mode = "Enabled"
s3_backup_configuration {
role_arn = "${aws_iam_role.firehose.arn}"
bucket_arn = "${aws_s3_bucket.bucket.arn}"
}
data_table_name = "test-table"
copy_options = "GZIP"
data_table_columns = "test-col"
10 changes: 10 additions & 0 deletions website/docs/r/kinesis_firehose_delivery_stream.html.markdown

@@ -167,6 +167,14 @@ resource "aws_kinesis_firehose_delivery_stream" "test_stream" {
data_table_name = "test-table"
copy_options = "delimiter '|'" # the default delimiter
data_table_columns = "test-col"
s3_backup_mode = "Enabled"
s3_backup_configuration {
role_arn = "${aws_iam_role.firehose_role.arn}"
bucket_arn = "${aws_s3_bucket.bucket.arn}"
buffer_size = 15
buffer_interval = 300
compression_format = "GZIP"
}
}
}
```
@@ -239,6 +247,8 @@ The `redshift_configuration` object supports the following:
* `password` - (Required) The password for the username above.
* `retry_duration` - (Optional) The length of time during which Firehose retries delivery after a failure, starting from the initial request and including the first attempt. The default value is 3600 seconds (60 minutes). Firehose does not retry if the value of DurationInSeconds is 0 (zero) or if the first delivery attempt takes longer than the current value.
* `role_arn` - (Required) The arn of the role the stream assumes.
* `s3_backup_mode` - (Optional) The Amazon S3 backup mode. Valid values are `Disabled` and `Enabled`. Default value is `Disabled`.
* `s3_backup_configuration` - (Optional) The configuration for backup in Amazon S3. Required if `s3_backup_mode` is `Enabled`. Supports the same fields as the `s3_configuration` object.
* `data_table_name` - (Required) The name of the table in the redshift cluster that the s3 bucket will copy to.
* `copy_options` - (Optional) Copy options for copying the data from the s3 intermediate bucket into redshift, for example to change the default delimiter. For valid values, see the [AWS documentation](http://docs.aws.amazon.com/firehose/latest/APIReference/API_CopyCommand.html)
* `data_table_columns` - (Optional) The data table columns that will be targeted by the copy command.