Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

r/aws_kinesis_firehose_delivery_stream - Send defaults for BufferIntervalInSeconds and BufferSizeInMBs #26964

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .changelog/26964.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
```release-note:enhancement
resource/aws_kinesis_firehose_delivery_stream: Send default value for BufferIntervalInSeconds or BufferSizeInMBs when only one
is set in a `processing_configuration`, removing the requirement to set both if only one is required to be changed.
```
113 changes: 78 additions & 35 deletions internal/service/firehose/delivery_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func processingConfigurationSchema() *schema.Schema {
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"parameters": {
Type: schema.TypeList,
Type: schema.TypeSet,
Optional: true,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
Expand Down Expand Up @@ -648,18 +648,9 @@ func flattenProcessingConfiguration(pc *firehose.ProcessingConfiguration, roleAr
return []map[string]interface{}{}
}

defaultLambdaParams := defaultProcessorParameters(roleArn)
processingConfiguration := make([]map[string]interface{}, 1)

// It is necessary to explicitly filter this out
// to prevent diffs during routine use and retain the ability
// to show diffs if any field has drifted
defaultLambdaParams := map[string]string{
"NumberOfRetries": "3",
"RoleArn": roleArn,
"BufferSizeInMBs": "3",
"BufferIntervalInSeconds": "60",
}

processors := make([]interface{}, len(pc.Processors))
for i, p := range pc.Processors {
t := aws.StringValue(p.Type)
Expand Down Expand Up @@ -1668,10 +1659,10 @@ func expandS3BackupConfig(d map[string]interface{}) *firehose.S3DestinationConfi

func createExtendedS3Config(d *schema.ResourceData) *firehose.ExtendedS3DestinationConfiguration {
s3 := d.Get("extended_s3_configuration").([]interface{})[0].(map[string]interface{})

roleArn := s3["role_arn"].(string)
configuration := &firehose.ExtendedS3DestinationConfiguration{
BucketARN: aws.String(s3["bucket_arn"].(string)),
RoleARN: aws.String(s3["role_arn"].(string)),
RoleARN: aws.String(roleArn),
BufferingHints: &firehose.BufferingHints{
IntervalInSeconds: aws.Int64(int64(s3["buffer_interval"].(int))),
SizeInMBs: aws.Int64(int64(s3["buffer_size"].(int))),
Expand All @@ -1683,7 +1674,8 @@ func createExtendedS3Config(d *schema.ResourceData) *firehose.ExtendedS3Destinat
}

if _, ok := s3["processing_configuration"]; ok {
configuration.ProcessingConfiguration = extractProcessingConfiguration(s3)
processingConfiguration := extractProcessingConfiguration(s3, defaultProcessorParameters(roleArn))
configuration.ProcessingConfiguration = processingConfiguration
}

if _, ok := s3["dynamic_partitioning_configuration"]; ok {
Expand Down Expand Up @@ -1762,9 +1754,15 @@ func updateS3BackupConfig(d map[string]interface{}) *firehose.S3DestinationUpdat
func updateExtendedS3Config(d *schema.ResourceData) *firehose.ExtendedS3DestinationUpdate {
s3 := d.Get("extended_s3_configuration").([]interface{})[0].(map[string]interface{})

roleArn := s3["role_arn"].(string)

defaultParams := defaultProcessorParameters(roleArn)

processingConfiguration := extractProcessingConfiguration(s3, defaultParams)

configuration := &firehose.ExtendedS3DestinationUpdate{
BucketARN: aws.String(s3["bucket_arn"].(string)),
RoleARN: aws.String(s3["role_arn"].(string)),
RoleARN: aws.String(roleArn),
BufferingHints: &firehose.BufferingHints{
IntervalInSeconds: aws.Int64((int64)(s3["buffer_interval"].(int))),
SizeInMBs: aws.Int64((int64)(s3["buffer_size"].(int))),
Expand All @@ -1775,7 +1773,7 @@ func updateExtendedS3Config(d *schema.ResourceData) *firehose.ExtendedS3Destinat
EncryptionConfiguration: extractEncryptionConfiguration(s3),
DataFormatConversionConfiguration: expandDataFormatConversionConfiguration(s3["data_format_conversion_configuration"].([]interface{})),
CloudWatchLoggingOptions: extractCloudWatchLoggingConfiguration(s3),
ProcessingConfiguration: extractProcessingConfiguration(s3),
ProcessingConfiguration: processingConfiguration,
}

if _, ok := s3["cloudwatch_logging_options"]; ok {
Expand Down Expand Up @@ -1992,8 +1990,8 @@ func extractDynamicPartitioningConfiguration(s3 map[string]interface{}) *firehos
return DynamicPartitioningConfiguration
}

func extractProcessingConfiguration(s3 map[string]interface{}) *firehose.ProcessingConfiguration {
config := s3["processing_configuration"].([]interface{})
func extractProcessingConfiguration(configMap map[string]interface{}, defaultParams map[string]string) *firehose.ProcessingConfiguration {
config := configMap["processing_configuration"].([]interface{})
if len(config) == 0 || config[0] == nil {
// It is possible to just pass nil here, but this seems to be the
// canonical form that AWS uses, and is less likely to produce diffs.
Expand All @@ -2007,16 +2005,17 @@ func extractProcessingConfiguration(s3 map[string]interface{}) *firehose.Process

return &firehose.ProcessingConfiguration{
Enabled: aws.Bool(processingConfiguration["enabled"].(bool)),
Processors: extractProcessors(processingConfiguration["processors"].([]interface{})),
Processors: extractProcessors(processingConfiguration["processors"].([]interface{}), defaultParams),
}
}

func extractProcessors(processingConfigurationProcessors []interface{}) []*firehose.Processor {
func extractProcessors(processingConfigurationProcessors []interface{}, defaultParams map[string]string) []*firehose.Processor {
processors := []*firehose.Processor{}

for _, processor := range processingConfigurationProcessors {
extractedProcessor := extractProcessor(processor.(map[string]interface{}))
if extractedProcessor != nil {
extractedProcessor = mergeDefaultProcessingParameters(extractedProcessor, defaultParams)
processors = append(processors, extractedProcessor)
}
}
Expand All @@ -2030,7 +2029,7 @@ func extractProcessor(processingConfigurationProcessor map[string]interface{}) *
if processorType != "" {
processor = &firehose.Processor{
Type: aws.String(processorType),
Parameters: extractProcessorParameters(processingConfigurationProcessor["parameters"].([]interface{})),
Parameters: extractProcessorParameters(processingConfigurationProcessor["parameters"].(*schema.Set).List()),
}
}
return processor
Expand Down Expand Up @@ -2124,12 +2123,13 @@ func createRedshiftConfig(d *schema.ResourceData, s3Config *firehose.S3Destinati

redshift := rl[0].(map[string]interface{})

roleArn := redshift["role_arn"].(string)
configuration := &firehose.RedshiftDestinationConfiguration{
ClusterJDBCURL: aws.String(redshift["cluster_jdbcurl"].(string)),
RetryOptions: extractRedshiftRetryOptions(redshift),
Password: aws.String(redshift["password"].(string)),
Username: aws.String(redshift["username"].(string)),
RoleARN: aws.String(redshift["role_arn"].(string)),
RoleARN: aws.String(roleArn),
CopyCommand: extractCopyCommandConfiguration(redshift),
S3Configuration: s3Config,
}
Expand All @@ -2138,7 +2138,7 @@ func createRedshiftConfig(d *schema.ResourceData, s3Config *firehose.S3Destinati
configuration.CloudWatchLoggingOptions = extractCloudWatchLoggingConfiguration(redshift)
}
if _, ok := redshift["processing_configuration"]; ok {
configuration.ProcessingConfiguration = extractProcessingConfiguration(redshift)
configuration.ProcessingConfiguration = extractProcessingConfiguration(redshift, defaultProcessorParameters(roleArn))
}
if s3BackupMode, ok := redshift["s3_backup_mode"]; ok {
configuration.S3BackupMode = aws.String(s3BackupMode.(string))
Expand All @@ -2157,12 +2157,13 @@ func updateRedshiftConfig(d *schema.ResourceData, s3Update *firehose.S3Destinati

redshift := rl[0].(map[string]interface{})

roleArn := redshift["role_arn"].(string)
configuration := &firehose.RedshiftDestinationUpdate{
ClusterJDBCURL: aws.String(redshift["cluster_jdbcurl"].(string)),
RetryOptions: extractRedshiftRetryOptions(redshift),
Password: aws.String(redshift["password"].(string)),
Username: aws.String(redshift["username"].(string)),
RoleARN: aws.String(redshift["role_arn"].(string)),
RoleARN: aws.String(roleArn),
CopyCommand: extractCopyCommandConfiguration(redshift),
S3Update: s3Update,
}
Expand All @@ -2171,7 +2172,7 @@ func updateRedshiftConfig(d *schema.ResourceData, s3Update *firehose.S3Destinati
configuration.CloudWatchLoggingOptions = extractCloudWatchLoggingConfiguration(redshift)
}
if _, ok := redshift["processing_configuration"]; ok {
configuration.ProcessingConfiguration = extractProcessingConfiguration(redshift)
configuration.ProcessingConfiguration = extractProcessingConfiguration(redshift, defaultProcessorParameters(roleArn))
}
if s3BackupMode, ok := redshift["s3_backup_mode"]; ok {
configuration.S3BackupMode = aws.String(s3BackupMode.(string))
Expand All @@ -2196,11 +2197,12 @@ func createElasticsearchConfig(d *schema.ResourceData, s3Config *firehose.S3Dest

es := esList[0].(map[string]interface{})

roleArn := es["role_arn"].(string)
config := &firehose.ElasticsearchDestinationConfiguration{
BufferingHints: extractBufferingHints(es),
IndexName: aws.String(es["index_name"].(string)),
RetryOptions: extractElasticsearchRetryOptions(es),
RoleARN: aws.String(es["role_arn"].(string)),
RoleARN: aws.String(roleArn),
TypeName: aws.String(es["type_name"].(string)),
S3Configuration: s3Config,
}
Expand All @@ -2218,7 +2220,7 @@ func createElasticsearchConfig(d *schema.ResourceData, s3Config *firehose.S3Dest
}

if _, ok := es["processing_configuration"]; ok {
config.ProcessingConfiguration = extractProcessingConfiguration(es)
config.ProcessingConfiguration = extractProcessingConfiguration(es, defaultProcessorParameters(roleArn))
}

if indexRotationPeriod, ok := es["index_rotation_period"]; ok {
Expand All @@ -2244,11 +2246,12 @@ func updateElasticsearchConfig(d *schema.ResourceData, s3Update *firehose.S3Dest

es := esList[0].(map[string]interface{})

roleArn := es["role_arn"].(string)
update := &firehose.ElasticsearchDestinationUpdate{
BufferingHints: extractBufferingHints(es),
IndexName: aws.String(es["index_name"].(string)),
RetryOptions: extractElasticsearchRetryOptions(es),
RoleARN: aws.String(es["role_arn"].(string)),
RoleARN: aws.String(roleArn),
TypeName: aws.String(es["type_name"].(string)),
S3Update: s3Update,
}
Expand All @@ -2266,7 +2269,7 @@ func updateElasticsearchConfig(d *schema.ResourceData, s3Update *firehose.S3Dest
}

if _, ok := es["processing_configuration"]; ok {
update.ProcessingConfiguration = extractProcessingConfiguration(es)
update.ProcessingConfiguration = extractProcessingConfiguration(es, defaultProcessorParameters(roleArn))
}

if indexRotationPeriod, ok := es["index_rotation_period"]; ok {
Expand Down Expand Up @@ -2295,7 +2298,7 @@ func createSplunkConfig(d *schema.ResourceData, s3Config *firehose.S3Destination
}

if _, ok := splunk["processing_configuration"]; ok {
configuration.ProcessingConfiguration = extractProcessingConfiguration(splunk)
configuration.ProcessingConfiguration = extractProcessingConfiguration(splunk, defaultProcessorParameters(""))
}

if _, ok := splunk["cloudwatch_logging_options"]; ok {
Expand Down Expand Up @@ -2327,7 +2330,7 @@ func updateSplunkConfig(d *schema.ResourceData, s3Update *firehose.S3Destination
}

if _, ok := splunk["processing_configuration"]; ok {
configuration.ProcessingConfiguration = extractProcessingConfiguration(splunk)
configuration.ProcessingConfiguration = extractProcessingConfiguration(splunk, defaultProcessorParameters(""))
}

if _, ok := splunk["cloudwatch_logging_options"]; ok {
Expand All @@ -2349,9 +2352,10 @@ func createHTTPEndpointConfig(d *schema.ResourceData, s3Config *firehose.S3Desti

HttpEndpoint := sl[0].(map[string]interface{})

roleArn := HttpEndpoint["role_arn"].(string)
configuration := &firehose.HttpEndpointDestinationConfiguration{
RetryOptions: extractHTTPEndpointRetryOptions(HttpEndpoint),
RoleARN: aws.String(HttpEndpoint["role_arn"].(string)),
RoleARN: aws.String(roleArn),
S3Configuration: s3Config,
}

Expand All @@ -2368,7 +2372,7 @@ func createHTTPEndpointConfig(d *schema.ResourceData, s3Config *firehose.S3Desti
configuration.BufferingHints = bufferingHints

if _, ok := HttpEndpoint["processing_configuration"]; ok {
configuration.ProcessingConfiguration = extractProcessingConfiguration(HttpEndpoint)
configuration.ProcessingConfiguration = extractProcessingConfiguration(HttpEndpoint, defaultProcessorParameters(roleArn))
}

if _, ok := HttpEndpoint["request_configuration"]; ok {
Expand All @@ -2394,9 +2398,10 @@ func updateHTTPEndpointConfig(d *schema.ResourceData, s3Update *firehose.S3Desti

HttpEndpoint := sl[0].(map[string]interface{})

roleArn := HttpEndpoint["role_arn"].(string)
configuration := &firehose.HttpEndpointDestinationUpdate{
RetryOptions: extractHTTPEndpointRetryOptions(HttpEndpoint),
RoleARN: aws.String(HttpEndpoint["role_arn"].(string)),
RoleARN: aws.String(roleArn),
S3Update: s3Update,
}

Expand All @@ -2413,7 +2418,7 @@ func updateHTTPEndpointConfig(d *schema.ResourceData, s3Update *firehose.S3Desti
configuration.BufferingHints = bufferingHints

if _, ok := HttpEndpoint["processing_configuration"]; ok {
configuration.ProcessingConfiguration = extractProcessingConfiguration(HttpEndpoint)
configuration.ProcessingConfiguration = extractProcessingConfiguration(HttpEndpoint, defaultProcessorParameters(roleArn))
}

if _, ok := HttpEndpoint["request_configuration"]; ok {
Expand Down Expand Up @@ -2965,3 +2970,41 @@ func expandDeliveryStreamEncryptionConfigurationInput(tfList []interface{}) *fir

return apiObject
}

func defaultProcessorParameters(roleArn string) map[string]string {
defaultParams := map[string]string{
"NumberOfRetries": "3",
"BufferSizeInMBs": "3",
"BufferIntervalInSeconds": "60",
}
if roleArn != "" {
defaultParams["RoleArn"] = roleArn
}
return defaultParams
}

func mergeDefaultProcessingParameters(processor *firehose.Processor, toMerge map[string]string) *firehose.Processor {
if aws.StringValue(processor.Type) != firehose.ProcessorTypeLambda {
return processor
}
params := processor.Parameters
for key, value := range toMerge {
found := false
for _, param := range processor.Parameters {
if key == aws.StringValue(param.ParameterName) {
found = true
continue
}
}
if !found {
params = append(params, &firehose.ProcessorParameter{
ParameterName: aws.String(key),
ParameterValue: aws.String(value),
})
}
}
return &firehose.Processor{
Type: processor.Type,
Parameters: params,
}
}
Loading