Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include additional kinesis-settings in dms endpoint #18750

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 55 additions & 8 deletions aws/resource_aws_dms_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,36 @@ func resourceAwsDmsEndpoint() *schema.Resource {
Optional: true,
ValidateFunc: validateArn,
},
"include_transaction_details": {
Type: schema.TypeBool,
Optional: true,
Default: false,
},
"include_partition_value": {
Type: schema.TypeBool,
Optional: true,
Default: false,
},
"partition_include_schema_table": {
Type: schema.TypeBool,
Optional: true,
Default: false,
},
"include_table_alter_operations": {
Type: schema.TypeBool,
Optional: true,
Default: false,
},
"include_control_details": {
Type: schema.TypeBool,
Optional: true,
Default: false,
},
"include_null_and_empty": {
Type: schema.TypeBool,
Optional: true,
Default: false,
},
},
},
},
Expand Down Expand Up @@ -367,9 +397,15 @@ func resourceAwsDmsEndpointCreate(d *schema.ResourceData, meta interface{}) erro
}
case "kinesis":
request.KinesisSettings = &dms.KinesisSettings{
MessageFormat: aws.String(d.Get("kinesis_settings.0.message_format").(string)),
ServiceAccessRoleArn: aws.String(d.Get("kinesis_settings.0.service_access_role_arn").(string)),
StreamArn: aws.String(d.Get("kinesis_settings.0.stream_arn").(string)),
MessageFormat: aws.String(d.Get("kinesis_settings.0.message_format").(string)),
ServiceAccessRoleArn: aws.String(d.Get("kinesis_settings.0.service_access_role_arn").(string)),
StreamArn: aws.String(d.Get("kinesis_settings.0.stream_arn").(string)),
IncludeTransactionDetails: aws.Bool(d.Get("kinesis_settings.0.include_transaction_details").(bool)),
IncludePartitionValue: aws.Bool(d.Get("kinesis_settings.0.include_partition_value").(bool)),
PartitionIncludeSchemaTable: aws.Bool(d.Get("kinesis_settings.0.partition_include_schema_table").(bool)),
IncludeTableAlterOperations: aws.Bool(d.Get("kinesis_settings.0.include_table_alter_operations").(bool)),
IncludeControlDetails: aws.Bool(d.Get("kinesis_settings.0.include_control_details").(bool)),
IncludeNullAndEmpty: aws.Bool(d.Get("kinesis_settings.0.include_null_and_empty").(bool)),
}
case "mongodb":
request.MongoDbSettings = &dms.MongoDbSettings{
Expand Down Expand Up @@ -594,8 +630,14 @@ func resourceAwsDmsEndpointUpdate(d *schema.ResourceData, meta interface{}) erro
// "An error occurred (InvalidParameterValueException) when calling the ModifyEndpoint
// operation: Message format cannot be modified for kinesis endpoints."
request.KinesisSettings = &dms.KinesisSettings{
ServiceAccessRoleArn: aws.String(d.Get("kinesis_settings.0.service_access_role_arn").(string)),
StreamArn: aws.String(d.Get("kinesis_settings.0.stream_arn").(string)),
ServiceAccessRoleArn: aws.String(d.Get("kinesis_settings.0.service_access_role_arn").(string)),
StreamArn: aws.String(d.Get("kinesis_settings.0.stream_arn").(string)),
IncludeTransactionDetails: aws.Bool(d.Get("kinesis_settings.0.include_transaction_details").(bool)),
IncludePartitionValue: aws.Bool(d.Get("kinesis_settings.0.include_partition_value").(bool)),
PartitionIncludeSchemaTable: aws.Bool(d.Get("kinesis_settings.0.partition_include_schema_table").(bool)),
IncludeTableAlterOperations: aws.Bool(d.Get("kinesis_settings.0.include_table_alter_operations").(bool)),
IncludeControlDetails: aws.Bool(d.Get("kinesis_settings.0.include_control_details").(bool)),
IncludeNullAndEmpty: aws.Bool(d.Get("kinesis_settings.0.include_null_and_empty").(bool)),
}
request.EngineName = aws.String(d.Get("engine_name").(string)) // Must be included (should be 'kinesis')
hasChanges = true
Expand Down Expand Up @@ -799,9 +841,14 @@ func flattenDmsKinesisSettings(settings *dms.KinesisSettings) []map[string]inter
}

m := map[string]interface{}{
"message_format": aws.StringValue(settings.MessageFormat),
"service_access_role_arn": aws.StringValue(settings.ServiceAccessRoleArn),
"stream_arn": aws.StringValue(settings.StreamArn),
"message_format": aws.StringValue(settings.MessageFormat),
"service_access_role_arn": aws.StringValue(settings.ServiceAccessRoleArn),
"stream_arn": aws.StringValue(settings.StreamArn),
"include_transaction_details": aws.BoolValue(settings.IncludeTransactionDetails),
"include_partition_value": aws.BoolValue(settings.IncludePartitionValue),
"partition_include_schema_table": aws.BoolValue(settings.PartitionIncludeSchemaTable),
"include_control_details": aws.BoolValue(settings.IncludeControlDetails),
"include_null_and_empty": aws.BoolValue(settings.IncludeNullAndEmpty),
}

return []map[string]interface{}{m}
Expand Down
12 changes: 12 additions & 0 deletions aws/resource_aws_dms_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,12 @@ func TestAccAwsDmsEndpoint_Kinesis(t *testing.T) {
resource.TestCheckResourceAttr(resourceName, "kinesis_settings.#", "1"),
resource.TestCheckResourceAttr(resourceName, "kinesis_settings.0.message_format", "json"),
resource.TestCheckResourceAttrPair(resourceName, "kinesis_settings.0.stream_arn", "aws_kinesis_stream.stream1", "arn"),
resource.TestCheckResourceAttr(resourceName, "kinesis_settings.0.include_transaction_details", "false"),
resource.TestCheckResourceAttr(resourceName, "kinesis_settings.0.include_partition_value", "false"),
resource.TestCheckResourceAttr(resourceName, "kinesis_settings.0.partition_include_schema_table", "false"),
resource.TestCheckResourceAttr(resourceName, "kinesis_settings.0.include_table_alter_operations", "false"),
resource.TestCheckResourceAttr(resourceName, "kinesis_settings.0.include_control_details", "false"),
resource.TestCheckResourceAttr(resourceName, "kinesis_settings.0.include_null_and_empty", "false"),
),
},
{
Expand All @@ -407,6 +413,12 @@ func TestAccAwsDmsEndpoint_Kinesis(t *testing.T) {
resource.TestCheckResourceAttr(resourceName, "kinesis_settings.#", "1"),
resource.TestCheckResourceAttr(resourceName, "kinesis_settings.0.message_format", "json"),
resource.TestCheckResourceAttrPair(resourceName, "kinesis_settings.0.stream_arn", "aws_kinesis_stream.stream2", "arn"),
resource.TestCheckResourceAttr(resourceName, "kinesis_settings.0.include_transaction_details", "false"),
resource.TestCheckResourceAttr(resourceName, "kinesis_settings.0.include_partition_value", "false"),
resource.TestCheckResourceAttr(resourceName, "kinesis_settings.0.partition_include_schema_table", "false"),
resource.TestCheckResourceAttr(resourceName, "kinesis_settings.0.include_table_alter_operations", "false"),
resource.TestCheckResourceAttr(resourceName, "kinesis_settings.0.include_control_details", "false"),
resource.TestCheckResourceAttr(resourceName, "kinesis_settings.0.include_null_and_empty", "false"),
),
},
},
Expand Down
6 changes: 6 additions & 0 deletions website/docs/r/dms_endpoint.html.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ The `kinesis_settings` configuration block supports the following arguments:
* `message_format` - (Optional) Output format for the records created. Defaults to `json`. Valid values are `json` and `json_unformatted` (a single line with no tab).
* `service_access_role_arn` - (Optional) Amazon Resource Name (ARN) of the IAM Role with permissions to write to the Kinesis data stream.
* `stream_arn` - (Optional) Amazon Resource Name (ARN) of the Kinesis data stream.
* `include_transaction_details` - (Optional) Provides detailed transaction information from the source database. The default is `false`.
* `include_partition_value` - (Optional) Shows the partition value within the Kinesis message output, unless the partition type is schema-table-type. The default is `false`.
* `partition_include_schema_table` - (Optional) Prefixes schema and table names to partition values, when the partition type is primary-key-type. The default is `false`.
* `include_table_alter_operations` - (Optional) Includes any data definition language (DDL) operations that change the table in the control data. The default is `false`.
* `include_control_details` - (Optional) Shows detailed control information for table definition, column definition, and table and column changes in the Kinesis message output. The default is `false`.
* `include_null_and_empty` - (Optional) Include NULL and empty columns in the target. The default is `false`.

### mongodb_settings Arguments

Expand Down