Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

resource/aws_kinesis_firehose_delivery_stream: Prevent nil dereference crashes and ensure attributes are flattened correctly #4047

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
282 changes: 161 additions & 121 deletions aws/resource_aws_kinesis_firehose_delivery_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,38 +174,156 @@ func cloudwatchLoggingOptionsHash(v interface{}) int {
return hashcode.String(buf.String())
}

func flattenCloudwatchLoggingOptions(clo firehose.CloudWatchLoggingOptions) *schema.Set {
func flattenCloudwatchLoggingOptions(clo *firehose.CloudWatchLoggingOptions) *schema.Set {
if clo == nil {
return nil
}

cloudwatchLoggingOptions := map[string]interface{}{
"enabled": *clo.Enabled,
"enabled": aws.BoolValue(clo.Enabled),
}
if *clo.Enabled {
cloudwatchLoggingOptions["log_group_name"] = *clo.LogGroupName
cloudwatchLoggingOptions["log_stream_name"] = *clo.LogStreamName
if aws.BoolValue(clo.Enabled) {
cloudwatchLoggingOptions["log_group_name"] = aws.StringValue(clo.LogGroupName)
cloudwatchLoggingOptions["log_stream_name"] = aws.StringValue(clo.LogStreamName)
}
return schema.NewSet(cloudwatchLoggingOptionsHash, []interface{}{cloudwatchLoggingOptions})
}

func flattenFirehoseS3Configuration(s3 firehose.S3DestinationDescription) []interface{} {
s3Configuration := map[string]interface{}{
"role_arn": *s3.RoleARN,
"bucket_arn": *s3.BucketARN,
"buffer_size": *s3.BufferingHints.SizeInMBs,
"buffer_interval": *s3.BufferingHints.IntervalInSeconds,
"compression_format": *s3.CompressionFormat,
func flattenFirehoseElasticsearchConfiguration(description *firehose.ElasticsearchDestinationDescription) []map[string]interface{} {
if description == nil {
return []map[string]interface{}{}
}

m := map[string]interface{}{
"cloudwatch_logging_options": flattenCloudwatchLoggingOptions(description.CloudWatchLoggingOptions),
"domain_arn": aws.StringValue(description.DomainARN),
"role_arn": aws.StringValue(description.RoleARN),
"type_name": aws.StringValue(description.TypeName),
"index_name": aws.StringValue(description.IndexName),
"s3_backup_mode": aws.StringValue(description.S3BackupMode),
"index_rotation_period": aws.StringValue(description.IndexRotationPeriod),
"processing_configuration": flattenProcessingConfiguration(description.ProcessingConfiguration, aws.StringValue(description.RoleARN)),
}

if description.BufferingHints != nil {
m["buffering_interval"] = int(aws.Int64Value(description.BufferingHints.IntervalInSeconds))
m["buffering_size"] = int(aws.Int64Value(description.BufferingHints.SizeInMBs))
}

if description.RetryOptions != nil {
m["retry_duration"] = int(aws.Int64Value(description.RetryOptions.DurationInSeconds))
}

return []map[string]interface{}{m}
}

func flattenFirehoseExtendedS3Configuration(description *firehose.ExtendedS3DestinationDescription) []map[string]interface{} {
if description == nil {
return []map[string]interface{}{}
}

m := map[string]interface{}{
"bucket_arn": aws.StringValue(description.BucketARN),
"cloudwatch_logging_options": flattenCloudwatchLoggingOptions(description.CloudWatchLoggingOptions),
"compression_format": aws.StringValue(description.CompressionFormat),
"prefix": aws.StringValue(description.Prefix),
"processing_configuration": flattenProcessingConfiguration(description.ProcessingConfiguration, aws.StringValue(description.RoleARN)),
"role_arn": aws.StringValue(description.RoleARN),
"s3_backup_configuration": flattenFirehoseS3Configuration(description.S3BackupDescription),
"s3_backup_mode": aws.StringValue(description.S3BackupMode),
}

if description.BufferingHints != nil {
m["buffer_interval"] = int(aws.Int64Value(description.BufferingHints.IntervalInSeconds))
m["buffer_size"] = int(aws.Int64Value(description.BufferingHints.SizeInMBs))
}

if description.EncryptionConfiguration != nil && description.EncryptionConfiguration.KMSEncryptionConfig != nil {
m["kms_key_arn"] = aws.StringValue(description.EncryptionConfiguration.KMSEncryptionConfig.AWSKMSKeyARN)
}

return []map[string]interface{}{m}
}

func flattenFirehoseRedshiftConfiguration(description *firehose.RedshiftDestinationDescription, configuredPassword string) []map[string]interface{} {
if description == nil {
return []map[string]interface{}{}
}

m := map[string]interface{}{
"cloudwatch_logging_options": flattenCloudwatchLoggingOptions(description.CloudWatchLoggingOptions),
"cluster_jdbcurl": aws.StringValue(description.ClusterJDBCURL),
"password": configuredPassword,
"role_arn": aws.StringValue(description.RoleARN),
"s3_backup_configuration": flattenFirehoseS3Configuration(description.S3BackupDescription),
"s3_backup_mode": aws.StringValue(description.S3BackupMode),
"username": aws.StringValue(description.Username),
}

if description.CopyCommand != nil {
m["copy_options"] = aws.StringValue(description.CopyCommand.CopyOptions)
m["data_table_columns"] = aws.StringValue(description.CopyCommand.DataTableColumns)
m["data_table_name"] = aws.StringValue(description.CopyCommand.DataTableName)
}

if description.RetryOptions != nil {
m["retry_duration"] = int(aws.Int64Value(description.RetryOptions.DurationInSeconds))
}

return []map[string]interface{}{m}
}

func flattenFirehoseSplunkConfiguration(description *firehose.SplunkDestinationDescription) []map[string]interface{} {
if description == nil {
return []map[string]interface{}{}
}
m := map[string]interface{}{
"cloudwatch_logging_options": flattenCloudwatchLoggingOptions(description.CloudWatchLoggingOptions),
"hec_acknowledgment_timeout": int(aws.Int64Value(description.HECAcknowledgmentTimeoutInSeconds)),
"hec_endpoint_type": aws.StringValue(description.HECEndpointType),
"hec_endpoint": aws.StringValue(description.HECEndpoint),
"hec_token": aws.StringValue(description.HECToken),
"processing_configuration": flattenProcessingConfiguration(description.ProcessingConfiguration, ""),
"s3_backup_mode": aws.StringValue(description.S3BackupMode),
}

if description.RetryOptions != nil {
m["retry_duration"] = int(aws.Int64Value(description.RetryOptions.DurationInSeconds))
}

return []map[string]interface{}{m}
}

func flattenFirehoseS3Configuration(description *firehose.S3DestinationDescription) []map[string]interface{} {
if description == nil {
return []map[string]interface{}{}
}
if s3.CloudWatchLoggingOptions != nil {
s3Configuration["cloudwatch_logging_options"] = flattenCloudwatchLoggingOptions(*s3.CloudWatchLoggingOptions)

m := map[string]interface{}{
"bucket_arn": aws.StringValue(description.BucketARN),
"cloudwatch_logging_options": flattenCloudwatchLoggingOptions(description.CloudWatchLoggingOptions),
"compression_format": aws.StringValue(description.CompressionFormat),
"prefix": aws.StringValue(description.Prefix),
"role_arn": aws.StringValue(description.RoleARN),
}
if s3.EncryptionConfiguration.KMSEncryptionConfig != nil {
s3Configuration["kms_key_arn"] = *s3.EncryptionConfiguration.KMSEncryptionConfig.AWSKMSKeyARN

if description.BufferingHints != nil {
m["buffer_interval"] = int(aws.Int64Value(description.BufferingHints.IntervalInSeconds))
m["buffer_size"] = int(aws.Int64Value(description.BufferingHints.SizeInMBs))
}
if s3.Prefix != nil {
s3Configuration["prefix"] = *s3.Prefix

if description.EncryptionConfiguration != nil && description.EncryptionConfiguration.KMSEncryptionConfig != nil {
m["kms_key_arn"] = aws.StringValue(description.EncryptionConfiguration.KMSEncryptionConfig.AWSKMSKeyARN)
}
return []interface{}{s3Configuration}

return []map[string]interface{}{m}
}

func flattenProcessingConfiguration(pc firehose.ProcessingConfiguration, roleArn string) []map[string]interface{} {
func flattenProcessingConfiguration(pc *firehose.ProcessingConfiguration, roleArn string) []map[string]interface{} {
if pc == nil {
return []map[string]interface{}{}
}

processingConfiguration := make([]map[string]interface{}, 1)

// It is necessary to explicitly filter this out
Expand All @@ -220,11 +338,12 @@ func flattenProcessingConfiguration(pc firehose.ProcessingConfiguration, roleArn

processors := make([]interface{}, len(pc.Processors), len(pc.Processors))
for i, p := range pc.Processors {
t := *p.Type
t := aws.StringValue(p.Type)
parameters := make([]interface{}, 0)

for _, params := range p.Parameters {
name, value := *params.ParameterName, *params.ParameterValue
name := aws.StringValue(params.ParameterName)
value := aws.StringValue(params.ParameterValue)

if t == firehose.ProcessorTypeLambda {
// Ignore defaults
Expand All @@ -245,7 +364,7 @@ func flattenProcessingConfiguration(pc firehose.ProcessingConfiguration, roleArn
}
}
processingConfiguration[0] = map[string]interface{}{
"enabled": *pc.Enabled,
"enabled": aws.BoolValue(pc.Enabled),
"processors": processors,
}
return processingConfiguration
Expand All @@ -259,120 +378,41 @@ func flattenKinesisFirehoseDeliveryStream(d *schema.ResourceData, s *firehose.De
destination := s.Destinations[0]
if destination.RedshiftDestinationDescription != nil {
d.Set("destination", "redshift")
password := d.Get("redshift_configuration.0.password").(string)

redshiftConfiguration := map[string]interface{}{
"cluster_jdbcurl": *destination.RedshiftDestinationDescription.ClusterJDBCURL,
"role_arn": *destination.RedshiftDestinationDescription.RoleARN,
"username": *destination.RedshiftDestinationDescription.Username,
"password": password,
"data_table_name": *destination.RedshiftDestinationDescription.CopyCommand.DataTableName,
"copy_options": *destination.RedshiftDestinationDescription.CopyCommand.CopyOptions,
"data_table_columns": *destination.RedshiftDestinationDescription.CopyCommand.DataTableColumns,
"s3_backup_mode": *destination.RedshiftDestinationDescription.S3BackupMode,
"retry_duration": *destination.RedshiftDestinationDescription.RetryOptions.DurationInSeconds,
configuredPassword := d.Get("redshift_configuration.0.password").(string)
if err := d.Set("redshift_configuration", flattenFirehoseRedshiftConfiguration(destination.RedshiftDestinationDescription, configuredPassword)); err != nil {
return fmt.Errorf("error setting redshift_configuration: %s", err)
}

if v := destination.RedshiftDestinationDescription.CloudWatchLoggingOptions; v != nil {
redshiftConfiguration["cloudwatch_logging_options"] = flattenCloudwatchLoggingOptions(*v)
}

if v := destination.RedshiftDestinationDescription.S3BackupDescription; v != nil {
redshiftConfiguration["s3_backup_configuration"] = flattenFirehoseS3Configuration(*v)
if err := d.Set("s3_configuration", flattenFirehoseS3Configuration(destination.RedshiftDestinationDescription.S3DestinationDescription)); err != nil {
return fmt.Errorf("error setting s3_configuration: %s", err)
}

redshiftConfList := make([]map[string]interface{}, 1)
redshiftConfList[0] = redshiftConfiguration
d.Set("redshift_configuration", redshiftConfList)
d.Set("s3_configuration", flattenFirehoseS3Configuration(*destination.RedshiftDestinationDescription.S3DestinationDescription))

} else if destination.ElasticsearchDestinationDescription != nil {
d.Set("destination", "elasticsearch")

roleArn := *destination.ElasticsearchDestinationDescription.RoleARN
elasticsearchConfiguration := map[string]interface{}{
"buffering_interval": *destination.ElasticsearchDestinationDescription.BufferingHints.IntervalInSeconds,
"buffering_size": *destination.ElasticsearchDestinationDescription.BufferingHints.SizeInMBs,
"domain_arn": *destination.ElasticsearchDestinationDescription.DomainARN,
"role_arn": *destination.ElasticsearchDestinationDescription.RoleARN,
"type_name": *destination.ElasticsearchDestinationDescription.TypeName,
"index_name": *destination.ElasticsearchDestinationDescription.IndexName,
"s3_backup_mode": *destination.ElasticsearchDestinationDescription.S3BackupMode,
"retry_duration": *destination.ElasticsearchDestinationDescription.RetryOptions.DurationInSeconds,
"index_rotation_period": *destination.ElasticsearchDestinationDescription.IndexRotationPeriod,
if err := d.Set("elasticsearch_configuration", flattenFirehoseElasticsearchConfiguration(destination.ElasticsearchDestinationDescription)); err != nil {
return fmt.Errorf("error setting elasticsearch_configuration: %s", err)
}

if v := destination.ElasticsearchDestinationDescription.CloudWatchLoggingOptions; v != nil {
elasticsearchConfiguration["cloudwatch_logging_options"] = flattenCloudwatchLoggingOptions(*v)
if err := d.Set("s3_configuration", flattenFirehoseS3Configuration(destination.ElasticsearchDestinationDescription.S3DestinationDescription)); err != nil {
return fmt.Errorf("error setting s3_configuration: %s", err)
}

if v := destination.ElasticsearchDestinationDescription.ProcessingConfiguration; v != nil {
elasticsearchConfiguration["processing_configuration"] = flattenProcessingConfiguration(*v, roleArn)
}

elasticsearchConfList := make([]map[string]interface{}, 1)
elasticsearchConfList[0] = elasticsearchConfiguration
d.Set("elasticsearch_configuration", elasticsearchConfList)
d.Set("s3_configuration", flattenFirehoseS3Configuration(*destination.ElasticsearchDestinationDescription.S3DestinationDescription))
} else if destination.SplunkDestinationDescription != nil {
d.Set("destination", "splunk")

splunkConfiguration := map[string]interface{}{
"hec_acknowledgment_timeout": *destination.SplunkDestinationDescription.HECAcknowledgmentTimeoutInSeconds,
"hec_endpoint": *destination.SplunkDestinationDescription.HECEndpoint,
"hec_endpoint_type": *destination.SplunkDestinationDescription.HECEndpointType,
"hec_token": *destination.SplunkDestinationDescription.HECToken,
"s3_backup_mode": *destination.SplunkDestinationDescription.S3BackupMode,
"retry_duration": *destination.SplunkDestinationDescription.RetryOptions.DurationInSeconds,
if err := d.Set("splunk_configuration", flattenFirehoseSplunkConfiguration(destination.SplunkDestinationDescription)); err != nil {
return fmt.Errorf("error setting splunk_configuration: %s", err)
}

if v := destination.SplunkDestinationDescription.ProcessingConfiguration; v != nil {
splunkConfiguration["processing_configuration"] = v
if err := d.Set("s3_configuration", flattenFirehoseS3Configuration(destination.SplunkDestinationDescription.S3DestinationDescription)); err != nil {
return fmt.Errorf("error setting s3_configuration: %s", err)
}

if v := destination.SplunkDestinationDescription.CloudWatchLoggingOptions; v != nil {
splunkConfiguration["cloudwatch_logging_options"] = flattenCloudwatchLoggingOptions(*v)
}

splunkConfList := make([]map[string]interface{}, 1)
splunkConfList[0] = splunkConfiguration
d.Set("splunk_configuration", splunkConfList)
d.Set("s3_configuration", flattenFirehoseS3Configuration(*destination.SplunkDestinationDescription.S3DestinationDescription))
} else if d.Get("destination").(string) == "s3" {
d.Set("destination", "s3")
d.Set("s3_configuration", flattenFirehoseS3Configuration(*destination.S3DestinationDescription))
if err := d.Set("s3_configuration", flattenFirehoseS3Configuration(destination.S3DestinationDescription)); err != nil {
return fmt.Errorf("error setting s3_configuration: %s", err)
}
} else {
d.Set("destination", "extended_s3")

roleArn := *destination.ExtendedS3DestinationDescription.RoleARN
extendedS3Configuration := map[string]interface{}{
"buffer_interval": *destination.ExtendedS3DestinationDescription.BufferingHints.IntervalInSeconds,
"buffer_size": *destination.ExtendedS3DestinationDescription.BufferingHints.SizeInMBs,
"bucket_arn": *destination.ExtendedS3DestinationDescription.BucketARN,
"role_arn": roleArn,
"compression_format": *destination.ExtendedS3DestinationDescription.CompressionFormat,
"prefix": *destination.ExtendedS3DestinationDescription.Prefix,
"s3_backup_mode": *destination.ExtendedS3DestinationDescription.S3BackupMode,
"cloudwatch_logging_options": flattenCloudwatchLoggingOptions(*destination.ExtendedS3DestinationDescription.CloudWatchLoggingOptions),
}

if v := destination.ExtendedS3DestinationDescription.EncryptionConfiguration.KMSEncryptionConfig; v != nil {
extendedS3Configuration["kms_key_arn"] = *v.AWSKMSKeyARN
}

if v := destination.ExtendedS3DestinationDescription.ProcessingConfiguration; v != nil {
extendedS3Configuration["processing_configuration"] = flattenProcessingConfiguration(*v, roleArn)
}

if v := destination.ExtendedS3DestinationDescription.S3BackupDescription; v != nil {
extendedS3Configuration["s3_backup_configuration"] = flattenFirehoseS3Configuration(*v)
if err := d.Set("extended_s3_configuration", flattenFirehoseExtendedS3Configuration(destination.ExtendedS3DestinationDescription)); err != nil {
return fmt.Errorf("error setting extended_s3_configuration: %s", err)
}

extendedS3ConfList := make([]map[string]interface{}, 1)
extendedS3ConfList[0] = extendedS3Configuration
d.Set("extended_s3_configuration", extendedS3ConfList)
}
d.Set("destination_id", *destination.DestinationId)
d.Set("destination_id", destination.DestinationId)
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion aws/resource_aws_kinesis_firehose_delivery_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1234,7 +1234,7 @@ resource "aws_kinesis_firehose_delivery_stream" "test_stream" {
},
{
parameter_name = "BufferIntervalInSeconds"
parameter_value = 60
parameter_value = 120
}
]
}
Expand Down