Skip to content

Commit

Permalink
Merge pull request #4251 from terraform-providers/f-aws_kinesis_firehose_delivery_stream-redshift-processing_configuration
Browse files Browse the repository at this point in the history

resource/aws_kinesis_firehose_delivery_stream: Support Redshift processing_configuration
  • Loading branch information
bflad authored Apr 23, 2018
2 parents 6d35e25 + 4b60308 commit 274035c
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 3 deletions.
9 changes: 9 additions & 0 deletions aws/resource_aws_kinesis_firehose_delivery_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ func flattenFirehoseRedshiftConfiguration(description *firehose.RedshiftDestinat
"cloudwatch_logging_options": flattenCloudwatchLoggingOptions(description.CloudWatchLoggingOptions),
"cluster_jdbcurl": aws.StringValue(description.ClusterJDBCURL),
"password": configuredPassword,
"processing_configuration": flattenProcessingConfiguration(description.ProcessingConfiguration, aws.StringValue(description.RoleARN)),
"role_arn": aws.StringValue(description.RoleARN),
"s3_backup_configuration": flattenFirehoseS3Configuration(description.S3BackupDescription),
"s3_backup_mode": aws.StringValue(description.S3BackupMode),
Expand Down Expand Up @@ -590,6 +591,8 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource {
Sensitive: true,
},

"processing_configuration": processingConfigurationSchema(),

"role_arn": {
Type: schema.TypeString,
Required: true,
Expand Down Expand Up @@ -1129,6 +1132,9 @@ func createRedshiftConfig(d *schema.ResourceData, s3Config *firehose.S3Destinati
if _, ok := redshift["cloudwatch_logging_options"]; ok {
configuration.CloudWatchLoggingOptions = extractCloudWatchLoggingConfiguration(redshift)
}
if _, ok := redshift["processing_configuration"]; ok {
configuration.ProcessingConfiguration = extractProcessingConfiguration(redshift)
}
if s3BackupMode, ok := redshift["s3_backup_mode"]; ok {
configuration.S3BackupMode = aws.String(s3BackupMode.(string))
configuration.S3BackupConfiguration = expandS3BackupConfig(d.Get("redshift_configuration").([]interface{})[0].(map[string]interface{}))
Expand Down Expand Up @@ -1159,6 +1165,9 @@ func updateRedshiftConfig(d *schema.ResourceData, s3Update *firehose.S3Destinati
if _, ok := redshift["cloudwatch_logging_options"]; ok {
configuration.CloudWatchLoggingOptions = extractCloudWatchLoggingConfiguration(redshift)
}
if _, ok := redshift["processing_configuration"]; ok {
configuration.ProcessingConfiguration = extractProcessingConfiguration(redshift)
}
if s3BackupMode, ok := redshift["s3_backup_mode"]; ok {
configuration.S3BackupMode = aws.String(s3BackupMode.(string))
configuration.S3BackupUpdate = updateS3BackupConfig(d.Get("redshift_configuration").([]interface{})[0].(map[string]interface{}))
Expand Down
41 changes: 38 additions & 3 deletions aws/resource_aws_kinesis_firehose_delivery_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,16 +290,35 @@ func TestAccAWSKinesisFirehoseDeliveryStream_RedshiftConfigUpdates(t *testing.T)
var stream firehose.DeliveryStreamDescription

ri := acctest.RandInt()
rString := acctest.RandString(8)
funcName := fmt.Sprintf("aws_kinesis_firehose_delivery_stream_test_%s", rString)
policyName := fmt.Sprintf("tf_acc_policy_%s", rString)
roleName := fmt.Sprintf("tf_acc_role_%s", rString)
preConfig := fmt.Sprintf(testAccKinesisFirehoseDeliveryStreamConfig_RedshiftBasic,
ri, ri, ri, ri, ri)
postConfig := fmt.Sprintf(testAccKinesisFirehoseDeliveryStreamConfig_RedshiftUpdates,
ri, ri, ri, ri, ri)
postConfig := testAccFirehoseAWSLambdaConfigBasic(funcName, policyName, roleName) +
fmt.Sprintf(testAccKinesisFirehoseDeliveryStreamConfig_RedshiftUpdates,
ri, ri, ri, ri, ri)

updatedRedshiftConfig := &firehose.RedshiftDestinationDescription{
CopyCommand: &firehose.CopyCommand{
CopyOptions: aws.String("GZIP"),
},
S3BackupMode: aws.String("Enabled"),
ProcessingConfiguration: &firehose.ProcessingConfiguration{
Enabled: aws.Bool(true),
Processors: []*firehose.Processor{
&firehose.Processor{
Type: aws.String("Lambda"),
Parameters: []*firehose.ProcessorParameter{
&firehose.ProcessorParameter{
ParameterName: aws.String("LambdaArn"),
ParameterValue: aws.String("valueNotTested"),
},
},
},
},
},
}

resource.Test(t, resource.TestCase{
Expand Down Expand Up @@ -558,7 +577,7 @@ func testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(stream *firehose.Del
r := redshiftConfig.(*firehose.RedshiftDestinationDescription)
// Range over the Stream Destinations, looking for the matching Redshift
// destination
var matchCopyOptions, matchS3BackupMode bool
var matchCopyOptions, matchS3BackupMode, processingConfigMatch bool
for _, d := range stream.Destinations {
if d.RedshiftDestinationDescription != nil {
if *d.RedshiftDestinationDescription.CopyCommand.CopyOptions == *r.CopyCommand.CopyOptions {
Expand All @@ -567,11 +586,17 @@ func testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(stream *firehose.Del
if *d.RedshiftDestinationDescription.S3BackupMode == *r.S3BackupMode {
matchS3BackupMode = true
}
if r.ProcessingConfiguration != nil && d.RedshiftDestinationDescription.ProcessingConfiguration != nil {
processingConfigMatch = len(r.ProcessingConfiguration.Processors) == len(d.RedshiftDestinationDescription.ProcessingConfiguration.Processors)
}
}
}
if !matchCopyOptions || !matchS3BackupMode {
return fmt.Errorf("Mismatch Redshift CopyOptions or S3BackupMode, expected: %s, got: %s", r, stream.Destinations)
}
if !processingConfigMatch {
return fmt.Errorf("Mismatch Redshift ProcessingConfiguration.Processors count, expected: %s, got: %s", r, stream.Destinations)
}
}

if elasticsearchConfig != nil {
Expand Down Expand Up @@ -1177,6 +1202,16 @@ resource "aws_kinesis_firehose_delivery_stream" "test_stream" {
data_table_name = "test-table"
copy_options = "GZIP"
data_table_columns = "test-col"
processing_configuration = [{
enabled = false,
processors = [{
type = "Lambda"
parameters = [{
parameter_name = "LambdaArn"
parameter_value = "${aws_lambda_function.lambda_function_test.arn}:$LATEST"
}]
}]
}]
}
}`

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ The `redshift_configuration` object supports the following:
* `copy_options` - (Optional) Copy options for copying the data from the S3 intermediate bucket into Redshift, for example to change the default delimiter. For valid values, see the [AWS documentation](http://docs.aws.amazon.com/firehose/latest/APIReference/API_CopyCommand.html)
* `data_table_columns` - (Optional) The data table columns that will be targeted by the copy command.
* `cloudwatch_logging_options` - (Optional) The CloudWatch Logging Options for the delivery stream. More details are given below
* `processing_configuration` - (Optional) The data processing configuration. More details are given below.

The `elasticsearch_configuration` object supports the following:

Expand Down

0 comments on commit 274035c

Please sign in to comment.