From b8226c5444353b210ad1a84b5dfead6aa5f98db8 Mon Sep 17 00:00:00 2001 From: Bryant Biggs Date: Tue, 14 Sep 2021 08:14:44 -0500 Subject: [PATCH 01/11] chore: update Kafka endpoint settings to include all attributes --- aws/resource_aws_dms_endpoint.go | 162 ++++++++++++++++++++++++-- aws/resource_aws_dms_endpoint_test.go | 122 +++++++++---------- 2 files changed, 214 insertions(+), 70 deletions(-) diff --git a/aws/resource_aws_dms_endpoint.go b/aws/resource_aws_dms_endpoint.go index 324c0a479744..0c3501287246 100644 --- a/aws/resource_aws_dms_endpoint.go +++ b/aws/resource_aws_dms_endpoint.go @@ -120,6 +120,83 @@ func resourceAwsDmsEndpoint() *schema.Resource { Required: true, ValidateFunc: validation.NoZeroValues, }, + "include_control_details": { + Type: schema.TypeBool, + Optional: true, + Default: false, + }, + "include_null_and_empty": { + Type: schema.TypeBool, + Optional: true, + Default: false, + }, + "include_partition_value": { + Type: schema.TypeBool, + Optional: true, + Default: false, + }, + "include_table_alter_operations": { + Type: schema.TypeBool, + Optional: true, + Default: false, + }, + "include_transaction_details": { + Type: schema.TypeBool, + Optional: true, + Default: false, + }, + "message_format": { + Type: schema.TypeString, + Optional: true, + Default: dms.MessageFormatValueJson, + ValidateFunc: validation.StringInSlice(dms.MessageFormatValue_Values(), false), + }, + "message_max_bytes": { + Type: schema.TypeInt, + Optional: true, + Default: 1000000, + }, + "no_hex_prefix": { + Type: schema.TypeBool, + Optional: true, + }, + "partition_include_schema_table": { + Type: schema.TypeBool, + Optional: true, + Default: false, + }, + "sasl_password": { + Type: schema.TypeString, + Optional: true, + }, + "sasl_username": { + Type: schema.TypeString, + Optional: true, + }, + "security_protocol": { + Type: schema.TypeString, + Optional: true, + ValidateFunc: validation.StringInSlice(dms.KafkaSecurityProtocol_Values(), false), + }, + "ssl_ca_certificate_arn": { + Type: schema.TypeString, + Optional: true, + ValidateFunc: validateArn, + }, + "ssl_client_certificate_arn": { + Type: schema.TypeString, + Optional: true, + ValidateFunc: validateArn, + }, + "ssl_client_key_arn": { + Type: schema.TypeString, + Optional: true, + ValidateFunc: validateArn, + }, + "ssl_client_key_password": { + Type: schema.TypeString, + Optional: true, + }, "topic": { Type: schema.TypeString, Optional: true, @@ -221,9 +298,10 @@ func resourceAwsDmsEndpoint() *schema.Resource { Elem: &schema.Resource{ Schema: map[string]*schema.Schema{ "service_access_role_arn": { - Type: schema.TypeString, - Optional: true, - Default: "", + Type: schema.TypeString, + Optional: true, + Default: "", + ValidateFunc: validateArn, }, "external_table_definition": { Type: schema.TypeString, @@ -344,8 +422,24 @@ func resourceAwsDmsEndpointCreate(d *schema.ResourceData, meta interface{}) erro } case "kafka": request.KafkaSettings = &dms.KafkaSettings{ - Broker: aws.String(d.Get("kafka_settings.0.broker").(string)), - Topic: aws.String(d.Get("kafka_settings.0.topic").(string)), + Broker: aws.String(d.Get("kafka_settings.0.broker").(string)), + Topic: aws.String(d.Get("kafka_settings.0.topic").(string)), + MessageFormat: aws.String(d.Get("kafka_settings.0.message_format").(string)), + IncludeTransactionDetails: aws.Bool(d.Get("kafka_settings.0.include_transaction_details").(bool)), + IncludePartitionValue: aws.Bool(d.Get("kafka_settings.0.include_partition_value").(bool)), + PartitionIncludeSchemaTable: 
aws.Bool(d.Get("kafka_settings.0.partition_include_schema_table").(bool)), + IncludeTableAlterOperations: aws.Bool(d.Get("kafka_settings.0.include_table_alter_operations").(bool)), + IncludeControlDetails: aws.Bool(d.Get("kafka_settings.0.include_control_details").(bool)), + MessageMaxBytes: aws.Int64(int64(d.Get("kafka_settings.0.message_max_bytes").(int))), + IncludeNullAndEmpty: aws.Bool(d.Get("kafka_settings.0.include_null_and_empty").(bool)), + SecurityProtocol: aws.String(d.Get("kafka_settings.0.security_protocol").(string)), + SslClientCertificateArn: aws.String(d.Get("kafka_settings.0.ssl_client_certificate_arn").(string)), + SslClientKeyArn: aws.String(d.Get("kafka_settings.0.ssl_client_key_arn").(string)), + SslClientKeyPassword: aws.String(d.Get("kafka_settings.0.ssl_client_key_password").(string)), + SslCaCertificateArn: aws.String(d.Get("kafka_settings.0.ssl_ca_certificate_arn").(string)), + SaslUsername: aws.String(d.Get("kafka_settings.0.sasl_username").(string)), + SaslPassword: aws.String(d.Get("kafka_settings.0.sasl_password").(string)), + NoHexPrefix: aws.Bool(d.Get("kafka_settings.0.no_hex_prefix").(bool)), } case "kinesis": request.KinesisSettings = &dms.KinesisSettings{ @@ -573,10 +667,42 @@ func resourceAwsDmsEndpointUpdate(d *schema.ResourceData, meta interface{}) erro case "kafka": if d.HasChanges( "kafka_settings.0.broker", - "kafka_settings.0.topic") { + "kafka_settings.0.topic", + "kafka_settings.0.message_format", + "kafka_settings.0.include_transaction_details", + "kafka_settings.0.include_partition_value", + "kafka_settings.0.partition_include_schema_table", + "kafka_settings.0.include_table_alter_operations", + "kafka_settings.0.include_control_details", + "kafka_settings.0.message_max_bytes", + "kafka_settings.0.include_null_and_empty", + "kafka_settings.0.security_protocol", + "kafka_settings.0.ssl_client_certificate_arn", + "kafka_settings.0.ssl_client_key_arn", + "kafka_settings.0.ssl_client_key_password", + "kafka_settings.0.ssl_ca_certificate_arn", + "kafka_settings.0.sasl_username", + "kafka_settings.0.sasl_password", + "kafka_settings.0.no_hex_prefix") { request.KafkaSettings = &dms.KafkaSettings{ - Broker: aws.String(d.Get("kafka_settings.0.broker").(string)), - Topic: aws.String(d.Get("kafka_settings.0.topic").(string)), + Broker: aws.String(d.Get("kafka_settings.0.broker").(string)), + Topic: aws.String(d.Get("kafka_settings.0.topic").(string)), + MessageFormat: aws.String(d.Get("kafka_settings.0.message_format").(string)), + IncludeTransactionDetails: aws.Bool(d.Get("kafka_settings.0.include_transaction_details").(bool)), + IncludePartitionValue: aws.Bool(d.Get("kafka_settings.0.include_partition_value").(bool)), + PartitionIncludeSchemaTable: aws.Bool(d.Get("kafka_settings.0.partition_include_schema_table").(bool)), + IncludeTableAlterOperations: aws.Bool(d.Get("kafka_settings.0.include_table_alter_operations").(bool)), + IncludeControlDetails: aws.Bool(d.Get("kafka_settings.0.include_control_details").(bool)), + MessageMaxBytes: aws.Int64(int64(d.Get("kafka_settings.0.message_max_bytes").(int))), + IncludeNullAndEmpty: aws.Bool(d.Get("kafka_settings.0.include_null_and_empty").(bool)), + SecurityProtocol: aws.String(d.Get("kafka_settings.0.security_protocol").(string)), + SslClientCertificateArn: aws.String(d.Get("kafka_settings.0.ssl_client_certificate_arn").(string)), + SslClientKeyArn: aws.String(d.Get("kafka_settings.0.ssl_client_key_arn").(string)), + SslClientKeyPassword: 
aws.String(d.Get("kafka_settings.0.ssl_client_key_password").(string)), + SslCaCertificateArn: aws.String(d.Get("kafka_settings.0.ssl_ca_certificate_arn").(string)), + SaslUsername: aws.String(d.Get("kafka_settings.0.sasl_username").(string)), + SaslPassword: aws.String(d.Get("kafka_settings.0.sasl_password").(string)), + NoHexPrefix: aws.Bool(d.Get("kafka_settings.0.no_hex_prefix").(bool)), } request.EngineName = aws.String(d.Get("engine_name").(string)) hasChanges = true @@ -788,8 +914,24 @@ func flattenDmsKafkaSettings(settings *dms.KafkaSettings) []map[string]interface } m := map[string]interface{}{ - "broker": aws.StringValue(settings.Broker), - "topic": aws.StringValue(settings.Topic), + "broker": aws.StringValue(settings.Broker), + "topic": aws.StringValue(settings.Topic), + "message_format": aws.StringValue(settings.MessageFormat), + "include_transaction_details": aws.BoolValue(settings.IncludeTransactionDetails), + "include_partition_value": aws.BoolValue(settings.IncludePartitionValue), + "partition_include_schema_table": aws.BoolValue(settings.PartitionIncludeSchemaTable), + "include_table_alter_operations": aws.BoolValue(settings.IncludeTableAlterOperations), + "include_control_details": aws.BoolValue(settings.IncludeControlDetails), + "message_max_bytes": aws.Int64Value(settings.MessageMaxBytes), + "include_null_and_empty": aws.BoolValue(settings.IncludeNullAndEmpty), + "security_protocol": aws.StringValue(settings.SecurityProtocol), + "ssl_client_certificate_arn": aws.StringValue(settings.SslClientCertificateArn), + "ssl_client_key_arn": aws.StringValue(settings.SslClientKeyArn), + "ssl_client_key_password": aws.StringValue(settings.SslClientKeyPassword), + "ssl_ca_certificate_arn": aws.StringValue(settings.SslCaCertificateArn), + "sasl_username": aws.StringValue(settings.SaslUsername), + "sasl_password": aws.StringValue(settings.SaslPassword), + "no_hex_prefix": aws.BoolValue(settings.NoHexPrefix), } return []map[string]interface{}{m} diff --git a/aws/resource_aws_dms_endpoint_test.go b/aws/resource_aws_dms_endpoint_test.go index 50dec074b846..05877195a3e0 100644 --- a/aws/resource_aws_dms_endpoint_test.go +++ b/aws/resource_aws_dms_endpoint_test.go @@ -302,13 +302,9 @@ func TestAccAwsDmsEndpoint_Elasticsearch_FullLoadErrorPercentage(t *testing.T) { }) } -func TestAccAwsDmsEndpoint_Kafka_Broker(t *testing.T) { +func TestAccAwsDmsEndpoint_Kafka(t *testing.T) { resourceName := "aws_dms_endpoint.test" - rName := acctest.RandomWithPrefix("tf-acc-test") - brokerPrefix := "ec2-12-345-678-901" - brokerService := "compute-1" - brokerPort1 := 2345 - brokerPort2 := 3456 + randId := acctest.RandString(8) + "-kafka" resource.ParallelTest(t, resource.TestCase{ PreCheck: func() { testAccPreCheck(t) }, @@ -317,12 +313,23 @@ func TestAccAwsDmsEndpoint_Kafka_Broker(t *testing.T) { CheckDestroy: dmsEndpointDestroy, Steps: []resource.TestStep{ { - Config: dmsEndpointKafkaConfigBroker(rName, brokerPrefix, brokerService, brokerPort1), + Config: dmsEndpointKafkaConfig(randId), Check: resource.ComposeTestCheckFunc( checkDmsEndpointExists(resourceName), resource.TestCheckResourceAttr(resourceName, "kafka_settings.#", "1"), - testAccCheckResourceAttrHostnameWithPort(resourceName, "kafka_settings.0.broker", brokerService, brokerPrefix, brokerPort1), resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.topic", "kafka-default-topic"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.message_format", "JSON"), + resource.TestCheckResourceAttr(resourceName, 
"kafka_settings.0.include_transaction_details", "false"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.include_partition_value", "false"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.partition_include_schema_table", "false"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.include_table_alter_operations", "false"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.include_control_details", "false"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.message_max_bytes", "1000000"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.include_null_and_empty", "false"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.security_protocol", "plaintext"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.sasl_username", "tftest"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.sasl_password", "tftest"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.no_hex_prefix", "false"), ), }, { @@ -332,48 +339,23 @@ func TestAccAwsDmsEndpoint_Kafka_Broker(t *testing.T) { ImportStateVerifyIgnore: []string{"password"}, }, { - Config: dmsEndpointKafkaConfigBroker(rName, brokerPrefix, brokerService, brokerPort2), - Check: resource.ComposeTestCheckFunc( - checkDmsEndpointExists(resourceName), - resource.TestCheckResourceAttr(resourceName, "kafka_settings.#", "1"), - testAccCheckResourceAttrHostnameWithPort(resourceName, "kafka_settings.0.broker", brokerService, brokerPrefix, brokerPort2), - resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.topic", "kafka-default-topic"), - ), - }, - }, - }) -} - -func TestAccAwsDmsEndpoint_Kafka_Topic(t *testing.T) { - resourceName := "aws_dms_endpoint.test" - rName := acctest.RandomWithPrefix("tf-acc-test") - - resource.ParallelTest(t, resource.TestCase{ - PreCheck: func() { testAccPreCheck(t) }, - ErrorCheck: testAccErrorCheck(t, dms.EndpointsID), - Providers: testAccProviders, - CheckDestroy: dmsEndpointDestroy, - Steps: []resource.TestStep{ - { - Config: dmsEndpointKafkaConfigTopic(rName, "topic1"), + Config: dmsEndpointKafkaConfigUpdate(randId), Check: resource.ComposeTestCheckFunc( checkDmsEndpointExists(resourceName), resource.TestCheckResourceAttr(resourceName, "kafka_settings.#", "1"), resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.topic", "topic1"), - ), - }, - { - ResourceName: resourceName, - ImportState: true, - ImportStateVerify: true, - ImportStateVerifyIgnore: []string{"password"}, - }, - { - Config: dmsEndpointKafkaConfigTopic(rName, "topic2"), - Check: resource.ComposeTestCheckFunc( - checkDmsEndpointExists(resourceName), - resource.TestCheckResourceAttr(resourceName, "kafka_settings.#", "1"), - resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.topic", "topic2"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.message_format", "JSON_UNFORMATTED"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.include_transaction_details", "true"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.include_partition_value", "true"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.partition_include_schema_table", "true"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.include_table_alter_operations", "true"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.include_control_details", "true"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.message_max_bytes", 
"500000"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.include_null_and_empty", "true"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.security_protocol", "sasl-ssl"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.sasl_username", "tftest-new"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.sasl_password", "tftest-new"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.no_hex_prefix", "true"), ), }, }, @@ -1162,38 +1144,58 @@ resource "aws_dms_endpoint" "test" { `, rName, fullLoadErrorPercentage)) } -func dmsEndpointKafkaConfigBroker(rName, brokerPrefix, brokerServiceName string, brokerPort int) string { +func dmsEndpointKafkaConfig(randId string) string { return fmt.Sprintf(` data "aws_partition" "current" {} resource "aws_dms_endpoint" "test" { - endpoint_id = %[1]q - endpoint_type = "target" - engine_name = "kafka" + endpoint_id = "tf-test-dms-endpoint-%[1]s" + endpoint_type = "target" + engine_name = "kafka" + ssl_mode = "none" + extra_connection_attributes = "" kafka_settings { - # example kafka broker: "ec2-12-345-678-901.compute-1.amazonaws.com:2345" - broker = "%[2]s.%[3]s.${data.aws_partition.current.dns_suffix}:%[4]d" + broker = "ec2-12-345-678-901.compute-1.${data.aws_partition.current.dns_suffix}:2345" + include_null_and_empty = false + security_protocol = "plaintext" + sasl_username = "tftest" + sasl_password = "tftest" + no_hex_prefix = false } } -`, rName, brokerPrefix, brokerServiceName, brokerPort) +`, randId) } -func dmsEndpointKafkaConfigTopic(rName string, topic string) string { +func dmsEndpointKafkaConfigUpdate(randId string) string { return fmt.Sprintf(` data "aws_partition" "current" {} resource "aws_dms_endpoint" "test" { - endpoint_id = %[1]q - endpoint_type = "target" - engine_name = "kafka" + endpoint_id = "tf-test-dms-endpoint-%[1]s" + endpoint_type = "target" + engine_name = "kafka" + ssl_mode = "none" + extra_connection_attributes = "" kafka_settings { - broker = "ec2-12-345-678-901.compute-1.${data.aws_partition.current.dns_suffix}:2345" - topic = %[2]q + broker = "ec2-12-345-678-901.compute-1.${data.aws_partition.current.dns_suffix}:2345" + topic = "topic1" + message_format = "JSON_UNFORMATTED" + include_transaction_details = true + include_partition_value = true + partition_include_schema_table = true + include_table_alter_operations = true + include_control_details = true + message_max_bytes = 500000 + include_null_and_empty = true + security_protocol = "sasl-ssl" + sasl_username = "tftest-new" + sasl_password = "tftest-new" + no_hex_prefix = true } } -`, rName, topic) +`, randId) } func dmsEndpointKinesisConfig(randId string) string { From 054507454a650bab3f7ea31618f49c179c55f648 Mon Sep 17 00:00:00 2001 From: Bryant Biggs Date: Tue, 14 Sep 2021 14:26:51 -0500 Subject: [PATCH 02/11] chore: update website documentation --- website/docs/r/dms_endpoint.html.markdown | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/website/docs/r/dms_endpoint.html.markdown b/website/docs/r/dms_endpoint.html.markdown index 16f1cf32f763..f823968fec20 100644 --- a/website/docs/r/dms_endpoint.html.markdown +++ b/website/docs/r/dms_endpoint.html.markdown @@ -87,6 +87,22 @@ The `elasticsearch_settings` configuration block supports the following argument The `kafka_settings` configuration block supports the following arguments: * `broker` - (Required) Kafka broker location. Specify in the form broker-hostname-or-ip:port. 
+* `include_control_details` - (Optional) Shows detailed control information for table definition, column definition, and table and column changes in the Kafka message output. The default is `false`.
+* `include_null_and_empty` - (Optional) Include NULL and empty columns for records migrated to the endpoint. The default is `false`.
+* `include_partition_value` - (Optional) Shows the partition value within the Kafka message output unless the partition type is `schema-table-type`. The default is `false`.
+* `include_table_alter_operations` - (Optional) Includes any data definition language (DDL) operations that change the table in the control data, such as `rename-table`, `drop-table`, `add-column`, `drop-column`, and `rename-column`. The default is `false`.
+* `include_transaction_details` - (Optional) Provides detailed transaction information from the source database. This information includes a commit timestamp, a log position, and values for `transaction_id`, previous `transaction_id`, and `transaction_record_id` (the record offset within a transaction). The default is `false`.
+* `message_format` - (Optional) The output format for the records created on the endpoint. The message format is `JSON` (default) or `JSON_UNFORMATTED` (a single line with no tab).
+* `message_max_bytes` - (Optional) The maximum size in bytes for records created on the endpoint. The default is `1,000,000`.
+* `no_hex_prefix` - (Optional) Set this optional parameter to `true` to avoid adding a `0x` prefix to raw data in hexadecimal format. For example, by default, AWS DMS adds a `0x` prefix to the LOB column type in hexadecimal format when moving from an Oracle source to a Kafka target. Use the `no_hex_prefix` endpoint setting to enable migration of RAW data type columns without adding the `0x` prefix.
+* `partition_include_schema_table` - (Optional) Prefixes schema and table names to partition values, when the partition type is `primary-key-type`. Doing this increases data distribution among Kafka partitions. For example, suppose that a SysBench schema has thousands of tables and each table has only a limited range for a primary key. In this case, the same primary key is sent from thousands of tables to the same partition, which causes throttling. The default is `false`.
+* `sasl_password` - (Optional) The secure password you created when you first set up your MSK cluster to validate a client identity and make an encrypted connection between server and client using SASL-SSL authentication.
+* `sasl_username` - (Optional) The secure user name you created when you first set up your MSK cluster to validate a client identity and make an encrypted connection between server and client using SASL-SSL authentication.
+* `security_protocol` - (Optional) Set secure connection to a Kafka target endpoint using Transport Layer Security (TLS). Options include `plaintext`, `ssl-encryption`, `ssl-authentication`, and `sasl-ssl`. `sasl-ssl` requires `sasl_username` and `sasl_password`.
+* `ssl_ca_certificate_arn` - (Optional) The Amazon Resource Name (ARN) for the private certificate authority (CA) certificate that AWS DMS uses to securely connect to your Kafka target endpoint.
+* `ssl_client_certificate_arn` - (Optional) The Amazon Resource Name (ARN) of the client certificate used to securely connect to a Kafka target endpoint.
+* `ssl_client_key_arn` - (Optional) The Amazon Resource Name (ARN) for the client private key used to securely connect to a Kafka target endpoint.
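+
+For example, several of these arguments combine (together with the `topic` argument listed below) into a SASL-SSL target endpoint as in the following sketch. The broker address, topic, and credentials are illustrative placeholders rather than working values:
+
+```hcl
+resource "aws_dms_endpoint" "example" {
+  endpoint_id   = "example-kafka-target"
+  endpoint_type = "target"
+  engine_name   = "kafka"
+
+  kafka_settings {
+    broker                         = "b-1.example.kafka.us-east-1.amazonaws.com:9092" # placeholder broker address
+    topic                          = "topic1"
+    message_format                 = "JSON_UNFORMATTED"
+    include_transaction_details    = true
+    include_partition_value        = true
+    partition_include_schema_table = true
+    message_max_bytes              = 500000
+    security_protocol              = "sasl-ssl" # requires sasl_username and sasl_password
+    sasl_username                  = "example-user"     # placeholder credential
+    sasl_password                  = "example-password" # placeholder credential
+  }
+}
+```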
+* `ssl_client_key_password` - (Optional) The password for the client private key used to securely connect to a Kafka target endpoint. * `topic` - (Optional) Kafka topic for migration. Defaults to `kafka-default-topic`. ### kinesis_settings Arguments From e869bb5c18b5278712935832f78fe0da22110f07 Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Thu, 7 Oct 2021 13:46:03 -0400 Subject: [PATCH 03/11] Add CHANGELOG entry. --- .changelog/20904.txt | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 .changelog/20904.txt diff --git a/.changelog/20904.txt b/.changelog/20904.txt new file mode 100644 index 000000000000..61729cc2a53a --- /dev/null +++ b/.changelog/20904.txt @@ -0,0 +1,3 @@ +```release-note:enhancement +resource/aws_dms_endpoint: Add `message_format`, `include_transaction_details`, `include_partition_value`, `partition_include_schema_table`, `include_table_alter_operations`, `include_control_details`, `message_max_bytes`, `include_null_and_empty`, `security_protocol`, `ssl_client_certificate_arn`, `ssl_client_key_arn`, `ssl_client_key_password`, `ssl_ca_certificate_arn`, `sasl_username`, `sasl_password` and `no_hex_prefix` arguments to `kafka_settings` configuration block +``` \ No newline at end of file From 4797ddace019fb043a2cc1fae127a6c3a886e9ae Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Thu, 7 Oct 2021 13:50:26 -0400 Subject: [PATCH 04/11] r/aws_dms_endpoint: Alphabetic order for structure fields. --- aws/resource_aws_dms_endpoint.go | 40 ++++++++++++++++---------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/aws/resource_aws_dms_endpoint.go b/aws/resource_aws_dms_endpoint.go index 0c3501287246..af051ef34fa2 100644 --- a/aws/resource_aws_dms_endpoint.go +++ b/aws/resource_aws_dms_endpoint.go @@ -423,23 +423,23 @@ func resourceAwsDmsEndpointCreate(d *schema.ResourceData, meta interface{}) erro case "kafka": request.KafkaSettings = &dms.KafkaSettings{ Broker: aws.String(d.Get("kafka_settings.0.broker").(string)), - Topic: aws.String(d.Get("kafka_settings.0.topic").(string)), - MessageFormat: aws.String(d.Get("kafka_settings.0.message_format").(string)), - IncludeTransactionDetails: aws.Bool(d.Get("kafka_settings.0.include_transaction_details").(bool)), + IncludeControlDetails: aws.Bool(d.Get("kafka_settings.0.include_control_details").(bool)), + IncludeNullAndEmpty: aws.Bool(d.Get("kafka_settings.0.include_null_and_empty").(bool)), IncludePartitionValue: aws.Bool(d.Get("kafka_settings.0.include_partition_value").(bool)), - PartitionIncludeSchemaTable: aws.Bool(d.Get("kafka_settings.0.partition_include_schema_table").(bool)), IncludeTableAlterOperations: aws.Bool(d.Get("kafka_settings.0.include_table_alter_operations").(bool)), - IncludeControlDetails: aws.Bool(d.Get("kafka_settings.0.include_control_details").(bool)), + IncludeTransactionDetails: aws.Bool(d.Get("kafka_settings.0.include_transaction_details").(bool)), + MessageFormat: aws.String(d.Get("kafka_settings.0.message_format").(string)), MessageMaxBytes: aws.Int64(int64(d.Get("kafka_settings.0.message_max_bytes").(int))), - IncludeNullAndEmpty: aws.Bool(d.Get("kafka_settings.0.include_null_and_empty").(bool)), + NoHexPrefix: aws.Bool(d.Get("kafka_settings.0.no_hex_prefix").(bool)), + PartitionIncludeSchemaTable: aws.Bool(d.Get("kafka_settings.0.partition_include_schema_table").(bool)), + SaslPassword: aws.String(d.Get("kafka_settings.0.sasl_password").(string)), + SaslUsername: aws.String(d.Get("kafka_settings.0.sasl_username").(string)), SecurityProtocol: 
aws.String(d.Get("kafka_settings.0.security_protocol").(string)), + SslCaCertificateArn: aws.String(d.Get("kafka_settings.0.ssl_ca_certificate_arn").(string)), SslClientCertificateArn: aws.String(d.Get("kafka_settings.0.ssl_client_certificate_arn").(string)), SslClientKeyArn: aws.String(d.Get("kafka_settings.0.ssl_client_key_arn").(string)), SslClientKeyPassword: aws.String(d.Get("kafka_settings.0.ssl_client_key_password").(string)), - SslCaCertificateArn: aws.String(d.Get("kafka_settings.0.ssl_ca_certificate_arn").(string)), - SaslUsername: aws.String(d.Get("kafka_settings.0.sasl_username").(string)), - SaslPassword: aws.String(d.Get("kafka_settings.0.sasl_password").(string)), - NoHexPrefix: aws.Bool(d.Get("kafka_settings.0.no_hex_prefix").(bool)), + Topic: aws.String(d.Get("kafka_settings.0.topic").(string)), } case "kinesis": request.KinesisSettings = &dms.KinesisSettings{ @@ -686,23 +686,23 @@ func resourceAwsDmsEndpointUpdate(d *schema.ResourceData, meta interface{}) erro "kafka_settings.0.no_hex_prefix") { request.KafkaSettings = &dms.KafkaSettings{ Broker: aws.String(d.Get("kafka_settings.0.broker").(string)), - Topic: aws.String(d.Get("kafka_settings.0.topic").(string)), - MessageFormat: aws.String(d.Get("kafka_settings.0.message_format").(string)), - IncludeTransactionDetails: aws.Bool(d.Get("kafka_settings.0.include_transaction_details").(bool)), + IncludeControlDetails: aws.Bool(d.Get("kafka_settings.0.include_control_details").(bool)), + IncludeNullAndEmpty: aws.Bool(d.Get("kafka_settings.0.include_null_and_empty").(bool)), IncludePartitionValue: aws.Bool(d.Get("kafka_settings.0.include_partition_value").(bool)), - PartitionIncludeSchemaTable: aws.Bool(d.Get("kafka_settings.0.partition_include_schema_table").(bool)), IncludeTableAlterOperations: aws.Bool(d.Get("kafka_settings.0.include_table_alter_operations").(bool)), - IncludeControlDetails: aws.Bool(d.Get("kafka_settings.0.include_control_details").(bool)), + IncludeTransactionDetails: aws.Bool(d.Get("kafka_settings.0.include_transaction_details").(bool)), + MessageFormat: aws.String(d.Get("kafka_settings.0.message_format").(string)), MessageMaxBytes: aws.Int64(int64(d.Get("kafka_settings.0.message_max_bytes").(int))), - IncludeNullAndEmpty: aws.Bool(d.Get("kafka_settings.0.include_null_and_empty").(bool)), + NoHexPrefix: aws.Bool(d.Get("kafka_settings.0.no_hex_prefix").(bool)), + PartitionIncludeSchemaTable: aws.Bool(d.Get("kafka_settings.0.partition_include_schema_table").(bool)), + SaslPassword: aws.String(d.Get("kafka_settings.0.sasl_password").(string)), + SaslUsername: aws.String(d.Get("kafka_settings.0.sasl_username").(string)), SecurityProtocol: aws.String(d.Get("kafka_settings.0.security_protocol").(string)), + SslCaCertificateArn: aws.String(d.Get("kafka_settings.0.ssl_ca_certificate_arn").(string)), SslClientCertificateArn: aws.String(d.Get("kafka_settings.0.ssl_client_certificate_arn").(string)), SslClientKeyArn: aws.String(d.Get("kafka_settings.0.ssl_client_key_arn").(string)), SslClientKeyPassword: aws.String(d.Get("kafka_settings.0.ssl_client_key_password").(string)), - SslCaCertificateArn: aws.String(d.Get("kafka_settings.0.ssl_ca_certificate_arn").(string)), - SaslUsername: aws.String(d.Get("kafka_settings.0.sasl_username").(string)), - SaslPassword: aws.String(d.Get("kafka_settings.0.sasl_password").(string)), - NoHexPrefix: aws.Bool(d.Get("kafka_settings.0.no_hex_prefix").(bool)), + Topic: aws.String(d.Get("kafka_settings.0.topic").(string)), } request.EngineName = 
aws.String(d.Get("engine_name").(string)) hasChanges = true From 0fa9ac9ba329c260a88195ebf7e013d02b279970 Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Thu, 7 Oct 2021 14:30:57 -0400 Subject: [PATCH 05/11] Add and use 'internal/service/dms/finder'. --- aws/internal/service/dms/finder/finder.go | 43 ++++++++++++++++++++++ aws/resource_aws_dms_endpoint.go | 44 ++++++++++++----------- aws/resource_aws_dms_endpoint_test.go | 36 +++++++++---------- 3 files changed, 85 insertions(+), 38 deletions(-) create mode 100644 aws/internal/service/dms/finder/finder.go diff --git a/aws/internal/service/dms/finder/finder.go b/aws/internal/service/dms/finder/finder.go new file mode 100644 index 000000000000..5c7b423cab8d --- /dev/null +++ b/aws/internal/service/dms/finder/finder.go @@ -0,0 +1,43 @@ +package finder + +import ( + "github.com/aws/aws-sdk-go/aws" + dms "github.com/aws/aws-sdk-go/service/databasemigrationservice" + "github.com/hashicorp/aws-sdk-go-base/tfawserr" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource" + "github.com/terraform-providers/terraform-provider-aws/aws/internal/tfresource" +) + +func EndpointByID(conn *dms.DatabaseMigrationService, id string) (*dms.Endpoint, error) { + input := &dms.DescribeEndpointsInput{ + Filters: []*dms.Filter{ + { + Name: aws.String("endpoint-id"), + Values: aws.StringSlice([]string{id}), + }, + }, + } + + output, err := conn.DescribeEndpoints(input) + + if tfawserr.ErrCodeEquals(err, dms.ErrCodeResourceNotFoundFault) { + return nil, &resource.NotFoundError{ + LastError: err, + LastRequest: input, + } + } + + if err != nil { + return nil, err + } + + if output == nil || len(output.Endpoints) == 0 || output.Endpoints[0] == nil { + return nil, tfresource.NewEmptyResultError(input) + } + + if count := len(output.Endpoints); count > 1 { + return nil, tfresource.NewTooManyResultsError(count, input) + } + + return output.Endpoints[0], nil +} diff --git a/aws/resource_aws_dms_endpoint.go b/aws/resource_aws_dms_endpoint.go index af051ef34fa2..5e5f0f165603 100644 --- a/aws/resource_aws_dms_endpoint.go +++ b/aws/resource_aws_dms_endpoint.go @@ -8,13 +8,15 @@ import ( "time" "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/awserr" dms "github.com/aws/aws-sdk-go/service/databasemigrationservice" + "github.com/hashicorp/aws-sdk-go-base/tfawserr" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation" "github.com/terraform-providers/terraform-provider-aws/aws/internal/keyvaluetags" tfdms "github.com/terraform-providers/terraform-provider-aws/aws/internal/service/dms" + "github.com/terraform-providers/terraform-provider-aws/aws/internal/service/dms/finder" + "github.com/terraform-providers/terraform-provider-aws/aws/internal/tfresource" ) func resourceAwsDmsEndpoint() *schema.Resource { @@ -545,24 +547,20 @@ func resourceAwsDmsEndpointRead(d *schema.ResourceData, meta interface{}) error defaultTagsConfig := meta.(*AWSClient).DefaultTagsConfig ignoreTagsConfig := meta.(*AWSClient).IgnoreTagsConfig - response, err := conn.DescribeEndpoints(&dms.DescribeEndpointsInput{ - Filters: []*dms.Filter{ - { - Name: aws.String("endpoint-id"), - Values: []*string{aws.String(d.Id())}, // Must use d.Id() to work with import. 
- }, - }, - }) + endpoint, err := finder.EndpointByID(conn, d.Id()) + + if !d.IsNewResource() && tfresource.NotFound(err) { + log.Printf("[WARN] DMS Endpoint (%s) not found, removing from state", d.Id()) + d.SetId("") + return nil + } + if err != nil { - if dmserr, ok := err.(awserr.Error); ok && dmserr.Code() == "ResourceNotFoundFault" { - log.Printf("[DEBUG] DMS Replication Endpoint %q Not Found", d.Id()) - d.SetId("") - return nil - } - return err + return fmt.Errorf("error reading DMS Endpoint (%s): %w", d.Id(), err) } - err = resourceAwsDmsEndpointSetState(d, response.Endpoints[0]) + err = resourceAwsDmsEndpointSetState(d, endpoint) + if err != nil { return err } @@ -570,7 +568,7 @@ func resourceAwsDmsEndpointRead(d *schema.ResourceData, meta interface{}) error tags, err := keyvaluetags.DatabasemigrationserviceListTags(conn, d.Get("endpoint_arn").(string)) if err != nil { - return fmt.Errorf("error listing tags for DMS Endpoint (%s): %s", d.Get("endpoint_arn").(string), err) + return fmt.Errorf("error listing tags for DMS Endpoint (%s): %w", d.Get("endpoint_arn").(string), err) } tags = tags.IgnoreAws().IgnoreConfig(ignoreTagsConfig) @@ -820,13 +818,19 @@ func resourceAwsDmsEndpointUpdate(d *schema.ResourceData, meta interface{}) erro func resourceAwsDmsEndpointDelete(d *schema.ResourceData, meta interface{}) error { conn := meta.(*AWSClient).dmsconn - request := &dms.DeleteEndpointInput{ + log.Printf("[DEBUG] Deleting DMS Endpoint: (%s)", d.Id()) + _, err := conn.DeleteEndpoint(&dms.DeleteEndpointInput{ EndpointArn: aws.String(d.Get("endpoint_arn").(string)), + }) + + if tfawserr.ErrCodeEquals(err, dms.ErrCodeResourceNotFoundFault) { + return nil } - log.Printf("[DEBUG] DMS delete endpoint: %#v", request) + if err != nil { + return fmt.Errorf("error deleting DMS Endpoint (%s): %w", d.Id(), err) + } - _, err := conn.DeleteEndpoint(request) return err } diff --git a/aws/resource_aws_dms_endpoint_test.go b/aws/resource_aws_dms_endpoint_test.go index 05877195a3e0..a6ae1055ea42 100644 --- a/aws/resource_aws_dms_endpoint_test.go +++ b/aws/resource_aws_dms_endpoint_test.go @@ -5,11 +5,12 @@ import ( "regexp" "testing" - "github.com/aws/aws-sdk-go/aws" dms "github.com/aws/aws-sdk-go/service/databasemigrationservice" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/acctest" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource" "github.com/hashicorp/terraform-plugin-sdk/v2/terraform" + "github.com/terraform-providers/terraform-provider-aws/aws/internal/service/dms/finder" + "github.com/terraform-providers/terraform-provider-aws/aws/internal/tfresource" ) func TestAccAwsDmsEndpoint_basic(t *testing.T) { @@ -556,15 +557,24 @@ func TestAccAwsDmsEndpoint_Db2(t *testing.T) { } func dmsEndpointDestroy(s *terraform.State) error { + conn := testAccProvider.Meta().(*AWSClient).dmsconn + for _, rs := range s.RootModule().Resources { if rs.Type != "aws_dms_endpoint" { continue } - err := checkDmsEndpointExists(rs.Primary.ID) - if err == nil { - return fmt.Errorf("Found an endpoint that was not destroyed: %s", rs.Primary.ID) + _, err := finder.EndpointByID(conn, rs.Primary.ID) + + if tfresource.NotFound(err) { + continue + } + + if err != nil { + return err } + + return fmt.Errorf("DMS Endpoint %s still exists", rs.Primary.ID) } return nil @@ -578,25 +588,15 @@ func checkDmsEndpointExists(n string) resource.TestCheckFunc { } if rs.Primary.ID == "" { - return fmt.Errorf("No ID is set") + return fmt.Errorf("No DMS Endpoint ID is set") } conn := testAccProvider.Meta().(*AWSClient).dmsconn - 
resp, err := conn.DescribeEndpoints(&dms.DescribeEndpointsInput{ - Filters: []*dms.Filter{ - { - Name: aws.String("endpoint-id"), - Values: []*string{aws.String(rs.Primary.ID)}, - }, - }, - }) - if err != nil { - return fmt.Errorf("DMS endpoint error: %v", err) - } + _, err := finder.EndpointByID(conn, rs.Primary.ID) - if resp.Endpoints == nil { - return fmt.Errorf("DMS endpoint not found") + if err != nil { + return err } return nil From b19cb215a6bdcb9127700c76c6c11e27697e3ebe Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Thu, 7 Oct 2021 14:46:29 -0400 Subject: [PATCH 06/11] 'dmsEndpointDestroy' -> 'testAccCheckAWSDmsEndpointDestroy'. --- aws/resource_aws_dms_endpoint_test.go | 30 +++++++++++++-------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/aws/resource_aws_dms_endpoint_test.go b/aws/resource_aws_dms_endpoint_test.go index a6ae1055ea42..e6c4a126a24a 100644 --- a/aws/resource_aws_dms_endpoint_test.go +++ b/aws/resource_aws_dms_endpoint_test.go @@ -21,7 +21,7 @@ func TestAccAwsDmsEndpoint_basic(t *testing.T) { PreCheck: func() { testAccPreCheck(t) }, ErrorCheck: testAccErrorCheck(t, dms.EndpointsID), Providers: testAccProviders, - CheckDestroy: dmsEndpointDestroy, + CheckDestroy: testAccCheckAWSDmsEndpointDestroy, Steps: []resource.TestStep{ { Config: dmsEndpointBasicConfig(randId), @@ -61,7 +61,7 @@ func TestAccAwsDmsEndpoint_S3(t *testing.T) { PreCheck: func() { testAccPreCheck(t) }, ErrorCheck: testAccErrorCheck(t, dms.EndpointsID), Providers: testAccProviders, - CheckDestroy: dmsEndpointDestroy, + CheckDestroy: testAccCheckAWSDmsEndpointDestroy, Steps: []resource.TestStep{ { Config: dmsEndpointS3Config(randId), @@ -114,7 +114,7 @@ func TestAccAwsDmsEndpoint_S3_ExtraConnectionAttributes(t *testing.T) { PreCheck: func() { testAccPreCheck(t) }, ErrorCheck: testAccErrorCheck(t, dms.EndpointsID), Providers: testAccProviders, - CheckDestroy: dmsEndpointDestroy, + CheckDestroy: testAccCheckAWSDmsEndpointDestroy, Steps: []resource.TestStep{ { Config: dmsEndpointS3ExtraConnectionAttributesConfig(randId), @@ -141,7 +141,7 @@ func TestAccAwsDmsEndpoint_DynamoDb(t *testing.T) { PreCheck: func() { testAccPreCheck(t) }, ErrorCheck: testAccErrorCheck(t, dms.EndpointsID), Providers: testAccProviders, - CheckDestroy: dmsEndpointDestroy, + CheckDestroy: testAccCheckAWSDmsEndpointDestroy, Steps: []resource.TestStep{ { Config: dmsEndpointDynamoDbConfig(randId), @@ -174,7 +174,7 @@ func TestAccAwsDmsEndpoint_Elasticsearch(t *testing.T) { PreCheck: func() { testAccPreCheck(t) }, ErrorCheck: testAccErrorCheck(t, dms.EndpointsID), Providers: testAccProviders, - CheckDestroy: dmsEndpointDestroy, + CheckDestroy: testAccCheckAWSDmsEndpointDestroy, Steps: []resource.TestStep{ { Config: dmsEndpointElasticsearchConfig(rName), @@ -208,7 +208,7 @@ func TestAccAwsDmsEndpoint_Elasticsearch_ExtraConnectionAttributes(t *testing.T) PreCheck: func() { testAccPreCheck(t) }, ErrorCheck: testAccErrorCheck(t, dms.EndpointsID), Providers: testAccProviders, - CheckDestroy: dmsEndpointDestroy, + CheckDestroy: testAccCheckAWSDmsEndpointDestroy, Steps: []resource.TestStep{ { Config: dmsEndpointElasticsearchExtraConnectionAttributesConfig(rName), @@ -235,7 +235,7 @@ func TestAccAwsDmsEndpoint_Elasticsearch_ErrorRetryDuration(t *testing.T) { PreCheck: func() { testAccPreCheck(t) }, ErrorCheck: testAccErrorCheck(t, dms.EndpointsID), Providers: testAccProviders, - CheckDestroy: dmsEndpointDestroy, + CheckDestroy: testAccCheckAWSDmsEndpointDestroy, Steps: []resource.TestStep{ { Config: 
dmsEndpointElasticsearchConfigErrorRetryDuration(rName, 60), @@ -273,7 +273,7 @@ func TestAccAwsDmsEndpoint_Elasticsearch_FullLoadErrorPercentage(t *testing.T) { PreCheck: func() { testAccPreCheck(t) }, ErrorCheck: testAccErrorCheck(t, dms.EndpointsID), Providers: testAccProviders, - CheckDestroy: dmsEndpointDestroy, + CheckDestroy: testAccCheckAWSDmsEndpointDestroy, Steps: []resource.TestStep{ { Config: dmsEndpointElasticsearchConfigFullLoadErrorPercentage(rName, 1), @@ -311,7 +311,7 @@ func TestAccAwsDmsEndpoint_Kafka(t *testing.T) { PreCheck: func() { testAccPreCheck(t) }, ErrorCheck: testAccErrorCheck(t, dms.EndpointsID), Providers: testAccProviders, - CheckDestroy: dmsEndpointDestroy, + CheckDestroy: testAccCheckAWSDmsEndpointDestroy, Steps: []resource.TestStep{ { Config: dmsEndpointKafkaConfig(randId), @@ -371,7 +371,7 @@ func TestAccAwsDmsEndpoint_Kinesis(t *testing.T) { PreCheck: func() { testAccPreCheck(t) }, ErrorCheck: testAccErrorCheck(t, dms.EndpointsID), Providers: testAccProviders, - CheckDestroy: dmsEndpointDestroy, + CheckDestroy: testAccCheckAWSDmsEndpointDestroy, Steps: []resource.TestStep{ { Config: dmsEndpointKinesisConfig(randId), @@ -409,7 +409,7 @@ func TestAccAwsDmsEndpoint_MongoDb(t *testing.T) { PreCheck: func() { testAccPreCheck(t) }, ErrorCheck: testAccErrorCheck(t, dms.EndpointsID), Providers: testAccProviders, - CheckDestroy: dmsEndpointDestroy, + CheckDestroy: testAccCheckAWSDmsEndpointDestroy, Steps: []resource.TestStep{ { Config: dmsEndpointMongoDbConfig(randId), @@ -439,7 +439,7 @@ func TestAccAwsDmsEndpoint_MongoDb_Update(t *testing.T) { PreCheck: func() { testAccPreCheck(t) }, ErrorCheck: testAccErrorCheck(t, dms.EndpointsID), Providers: testAccProviders, - CheckDestroy: dmsEndpointDestroy, + CheckDestroy: testAccCheckAWSDmsEndpointDestroy, Steps: []resource.TestStep{ { Config: dmsEndpointMongoDbConfig(randId), @@ -484,7 +484,7 @@ func TestAccAwsDmsEndpoint_DocDB(t *testing.T) { PreCheck: func() { testAccPreCheck(t) }, ErrorCheck: testAccErrorCheck(t, dms.EndpointsID), Providers: testAccProviders, - CheckDestroy: dmsEndpointDestroy, + CheckDestroy: testAccCheckAWSDmsEndpointDestroy, Steps: []resource.TestStep{ { Config: dmsEndpointDocDBConfig(randId), @@ -524,7 +524,7 @@ func TestAccAwsDmsEndpoint_Db2(t *testing.T) { PreCheck: func() { testAccPreCheck(t) }, ErrorCheck: testAccErrorCheck(t, dms.EndpointsID), Providers: testAccProviders, - CheckDestroy: dmsEndpointDestroy, + CheckDestroy: testAccCheckAWSDmsEndpointDestroy, Steps: []resource.TestStep{ { Config: dmsEndpointDb2Config(randId), @@ -556,7 +556,7 @@ func TestAccAwsDmsEndpoint_Db2(t *testing.T) { }) } -func dmsEndpointDestroy(s *terraform.State) error { +func testAccCheckAWSDmsEndpointDestroy(s *terraform.State) error { conn := testAccProvider.Meta().(*AWSClient).dmsconn for _, rs := range s.RootModule().Resources { From 35336c919f6270958d81de21e934a65f10125a0d Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Thu, 7 Oct 2021 14:47:26 -0400 Subject: [PATCH 07/11] 'checkDmsEndpointExists' -> 'testAccCheckAWSDmsEndpointExists'. 
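
The renamed helper follows the provider's `testAccCheck*` naming convention for
acceptance-test check functions. As a sketch of how the exists and destroy checks
are wired into a test case (the resource address and config helper shown are the
ones used by the Kafka test in this change set):

    resource.ParallelTest(t, resource.TestCase{
        PreCheck:     func() { testAccPreCheck(t) },
        ErrorCheck:   testAccErrorCheck(t, dms.EndpointsID),
        Providers:    testAccProviders,
        CheckDestroy: testAccCheckAWSDmsEndpointDestroy,
        Steps: []resource.TestStep{
            {
                Config: dmsEndpointKafkaConfig(randId),
                Check:  testAccCheckAWSDmsEndpointExists("aws_dms_endpoint.test"),
            },
        },
    })
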
--- aws/resource_aws_dms_endpoint_test.go | 50 +++++++++++++-------------- 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/aws/resource_aws_dms_endpoint_test.go b/aws/resource_aws_dms_endpoint_test.go index e6c4a126a24a..acde058cf4f9 100644 --- a/aws/resource_aws_dms_endpoint_test.go +++ b/aws/resource_aws_dms_endpoint_test.go @@ -26,7 +26,7 @@ func TestAccAwsDmsEndpoint_basic(t *testing.T) { { Config: dmsEndpointBasicConfig(randId), Check: resource.ComposeTestCheckFunc( - checkDmsEndpointExists(resourceName), + testAccCheckAWSDmsEndpointExists(resourceName), resource.TestCheckResourceAttrSet(resourceName, "endpoint_arn"), ), }, @@ -39,7 +39,7 @@ func TestAccAwsDmsEndpoint_basic(t *testing.T) { { Config: dmsEndpointBasicConfigUpdate(randId), Check: resource.ComposeTestCheckFunc( - checkDmsEndpointExists(resourceName), + testAccCheckAWSDmsEndpointExists(resourceName), resource.TestCheckResourceAttr(resourceName, "database_name", "tf-test-dms-db-updated"), resource.TestCheckResourceAttr(resourceName, "extra_connection_attributes", "extra"), resource.TestCheckResourceAttr(resourceName, "password", "tftestupdate"), @@ -66,7 +66,7 @@ func TestAccAwsDmsEndpoint_S3(t *testing.T) { { Config: dmsEndpointS3Config(randId), Check: resource.ComposeTestCheckFunc( - checkDmsEndpointExists(resourceName), + testAccCheckAWSDmsEndpointExists(resourceName), resource.TestCheckResourceAttr(resourceName, "s3_settings.#", "1"), resource.TestCheckResourceAttr(resourceName, "s3_settings.0.external_table_definition", ""), resource.TestCheckResourceAttr(resourceName, "s3_settings.0.csv_row_delimiter", "\\n"), @@ -90,7 +90,7 @@ func TestAccAwsDmsEndpoint_S3(t *testing.T) { { Config: dmsEndpointS3ConfigUpdate(randId), Check: resource.ComposeTestCheckFunc( - checkDmsEndpointExists(resourceName), + testAccCheckAWSDmsEndpointExists(resourceName), resource.TestMatchResourceAttr(resourceName, "extra_connection_attributes", regexp.MustCompile(`key=value;`)), resource.TestCheckResourceAttr(resourceName, "s3_settings.#", "1"), resource.TestCheckResourceAttr(resourceName, "s3_settings.0.external_table_definition", "new-external_table_definition"), @@ -119,7 +119,7 @@ func TestAccAwsDmsEndpoint_S3_ExtraConnectionAttributes(t *testing.T) { { Config: dmsEndpointS3ExtraConnectionAttributesConfig(randId), Check: resource.ComposeTestCheckFunc( - checkDmsEndpointExists(resourceName), + testAccCheckAWSDmsEndpointExists(resourceName), resource.TestMatchResourceAttr(resourceName, "extra_connection_attributes", regexp.MustCompile(`dataFormat=parquet;`)), ), }, @@ -146,7 +146,7 @@ func TestAccAwsDmsEndpoint_DynamoDb(t *testing.T) { { Config: dmsEndpointDynamoDbConfig(randId), Check: resource.ComposeTestCheckFunc( - checkDmsEndpointExists(resourceName), + testAccCheckAWSDmsEndpointExists(resourceName), resource.TestCheckResourceAttrSet(resourceName, "endpoint_arn"), ), }, @@ -159,7 +159,7 @@ func TestAccAwsDmsEndpoint_DynamoDb(t *testing.T) { { Config: dmsEndpointDynamoDbConfigUpdate(randId), Check: resource.ComposeTestCheckFunc( - checkDmsEndpointExists(resourceName), + testAccCheckAWSDmsEndpointExists(resourceName), ), }, }, @@ -179,7 +179,7 @@ func TestAccAwsDmsEndpoint_Elasticsearch(t *testing.T) { { Config: dmsEndpointElasticsearchConfig(rName), Check: resource.ComposeTestCheckFunc( - checkDmsEndpointExists(resourceName), + testAccCheckAWSDmsEndpointExists(resourceName), resource.TestCheckResourceAttr(resourceName, "elasticsearch_settings.#", "1"), testAccCheckResourceAttrRegionalHostname(resourceName, 
"elasticsearch_settings.0.endpoint_uri", "es", "search-estest"), resource.TestCheckResourceAttr(resourceName, "elasticsearch_settings.0.full_load_error_percentage", "10"), @@ -213,7 +213,7 @@ func TestAccAwsDmsEndpoint_Elasticsearch_ExtraConnectionAttributes(t *testing.T) { Config: dmsEndpointElasticsearchExtraConnectionAttributesConfig(rName), Check: resource.ComposeTestCheckFunc( - checkDmsEndpointExists(resourceName), + testAccCheckAWSDmsEndpointExists(resourceName), resource.TestCheckResourceAttr(resourceName, "extra_connection_attributes", "errorRetryDuration=400;"), ), }, @@ -240,7 +240,7 @@ func TestAccAwsDmsEndpoint_Elasticsearch_ErrorRetryDuration(t *testing.T) { { Config: dmsEndpointElasticsearchConfigErrorRetryDuration(rName, 60), Check: resource.ComposeTestCheckFunc( - checkDmsEndpointExists(resourceName), + testAccCheckAWSDmsEndpointExists(resourceName), resource.TestCheckResourceAttr(resourceName, "elasticsearch_settings.#", "1"), resource.TestCheckResourceAttr(resourceName, "elasticsearch_settings.0.error_retry_duration", "60"), ), @@ -256,7 +256,7 @@ func TestAccAwsDmsEndpoint_Elasticsearch_ErrorRetryDuration(t *testing.T) { // { // Config: dmsEndpointElasticsearchConfigErrorRetryDuration(rName, 120), // Check: resource.ComposeTestCheckFunc( - // checkDmsEndpointExists(resourceName), + // testAccCheckAWSDmsEndpointExists(resourceName), // resource.TestCheckResourceAttr(resourceName, "elasticsearch_settings.#", "1"), // resource.TestCheckResourceAttr(resourceName, "elasticsearch_settings.0.error_retry_duration", "120"), // ), @@ -278,7 +278,7 @@ func TestAccAwsDmsEndpoint_Elasticsearch_FullLoadErrorPercentage(t *testing.T) { { Config: dmsEndpointElasticsearchConfigFullLoadErrorPercentage(rName, 1), Check: resource.ComposeTestCheckFunc( - checkDmsEndpointExists(resourceName), + testAccCheckAWSDmsEndpointExists(resourceName), resource.TestCheckResourceAttr(resourceName, "elasticsearch_settings.#", "1"), resource.TestCheckResourceAttr(resourceName, "elasticsearch_settings.0.full_load_error_percentage", "1"), ), @@ -294,7 +294,7 @@ func TestAccAwsDmsEndpoint_Elasticsearch_FullLoadErrorPercentage(t *testing.T) { // { // Config: dmsEndpointElasticsearchConfigFullLoadErrorPercentage(rName, 2), // Check: resource.ComposeTestCheckFunc( - // checkDmsEndpointExists(resourceName), + // testAccCheckAWSDmsEndpointExists(resourceName), // resource.TestCheckResourceAttr(resourceName, "elasticsearch_settings.#", "1"), // resource.TestCheckResourceAttr(resourceName, "elasticsearch_settings.0.full_load_error_percentage", "2"), // ), @@ -316,7 +316,7 @@ func TestAccAwsDmsEndpoint_Kafka(t *testing.T) { { Config: dmsEndpointKafkaConfig(randId), Check: resource.ComposeTestCheckFunc( - checkDmsEndpointExists(resourceName), + testAccCheckAWSDmsEndpointExists(resourceName), resource.TestCheckResourceAttr(resourceName, "kafka_settings.#", "1"), resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.topic", "kafka-default-topic"), resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.message_format", "JSON"), @@ -342,7 +342,7 @@ func TestAccAwsDmsEndpoint_Kafka(t *testing.T) { { Config: dmsEndpointKafkaConfigUpdate(randId), Check: resource.ComposeTestCheckFunc( - checkDmsEndpointExists(resourceName), + testAccCheckAWSDmsEndpointExists(resourceName), resource.TestCheckResourceAttr(resourceName, "kafka_settings.#", "1"), resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.topic", "topic1"), resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.message_format", 
"JSON_UNFORMATTED"), @@ -376,7 +376,7 @@ func TestAccAwsDmsEndpoint_Kinesis(t *testing.T) { { Config: dmsEndpointKinesisConfig(randId), Check: resource.ComposeTestCheckFunc( - checkDmsEndpointExists(resourceName), + testAccCheckAWSDmsEndpointExists(resourceName), resource.TestCheckResourceAttr(resourceName, "kinesis_settings.#", "1"), resource.TestCheckResourceAttr(resourceName, "kinesis_settings.0.message_format", "json"), resource.TestCheckResourceAttrPair(resourceName, "kinesis_settings.0.stream_arn", "aws_kinesis_stream.stream1", "arn"), @@ -391,7 +391,7 @@ func TestAccAwsDmsEndpoint_Kinesis(t *testing.T) { { Config: dmsEndpointKinesisConfigUpdate(randId), Check: resource.ComposeTestCheckFunc( - checkDmsEndpointExists(resourceName), + testAccCheckAWSDmsEndpointExists(resourceName), resource.TestCheckResourceAttr(resourceName, "kinesis_settings.#", "1"), resource.TestCheckResourceAttr(resourceName, "kinesis_settings.0.message_format", "json"), resource.TestCheckResourceAttrPair(resourceName, "kinesis_settings.0.stream_arn", "aws_kinesis_stream.stream2", "arn"), @@ -414,7 +414,7 @@ func TestAccAwsDmsEndpoint_MongoDb(t *testing.T) { { Config: dmsEndpointMongoDbConfig(randId), Check: resource.ComposeTestCheckFunc( - checkDmsEndpointExists(resourceName), + testAccCheckAWSDmsEndpointExists(resourceName), resource.TestCheckResourceAttrSet(resourceName, "endpoint_arn"), ), }, @@ -444,14 +444,14 @@ func TestAccAwsDmsEndpoint_MongoDb_Update(t *testing.T) { { Config: dmsEndpointMongoDbConfig(randId), Check: resource.ComposeTestCheckFunc( - checkDmsEndpointExists(resourceName), + testAccCheckAWSDmsEndpointExists(resourceName), resource.TestCheckResourceAttrSet(resourceName, "endpoint_arn"), ), }, { Config: dmsEndpointMongoDbConfigUpdate(randId), Check: resource.ComposeTestCheckFunc( - checkDmsEndpointExists(resourceName), + testAccCheckAWSDmsEndpointExists(resourceName), resource.TestCheckResourceAttr(resourceName, "server_name", "tftest-new-server_name"), resource.TestCheckResourceAttr(resourceName, "port", "27018"), resource.TestCheckResourceAttr(resourceName, "username", "tftest-new-username"), @@ -489,7 +489,7 @@ func TestAccAwsDmsEndpoint_DocDB(t *testing.T) { { Config: dmsEndpointDocDBConfig(randId), Check: resource.ComposeTestCheckFunc( - checkDmsEndpointExists(resourceName), + testAccCheckAWSDmsEndpointExists(resourceName), resource.TestCheckResourceAttrSet(resourceName, "endpoint_arn"), ), }, @@ -502,7 +502,7 @@ func TestAccAwsDmsEndpoint_DocDB(t *testing.T) { { Config: dmsEndpointDocDBConfigUpdate(randId), Check: resource.ComposeTestCheckFunc( - checkDmsEndpointExists(resourceName), + testAccCheckAWSDmsEndpointExists(resourceName), resource.TestCheckResourceAttr(resourceName, "database_name", "tf-test-dms-db-updated"), resource.TestCheckResourceAttr(resourceName, "extra_connection_attributes", "extra"), resource.TestCheckResourceAttr(resourceName, "password", "tftestupdate"), @@ -529,7 +529,7 @@ func TestAccAwsDmsEndpoint_Db2(t *testing.T) { { Config: dmsEndpointDb2Config(randId), Check: resource.ComposeTestCheckFunc( - checkDmsEndpointExists(resourceName), + testAccCheckAWSDmsEndpointExists(resourceName), resource.TestCheckResourceAttrSet(resourceName, "endpoint_arn"), ), }, @@ -542,7 +542,7 @@ func TestAccAwsDmsEndpoint_Db2(t *testing.T) { { Config: dmsEndpointDb2ConfigUpdate(randId), Check: resource.ComposeTestCheckFunc( - checkDmsEndpointExists(resourceName), + testAccCheckAWSDmsEndpointExists(resourceName), resource.TestCheckResourceAttr(resourceName, "database_name", 
"tf-test-dms-db-updated"), resource.TestCheckResourceAttr(resourceName, "extra_connection_attributes", "extra"), resource.TestCheckResourceAttr(resourceName, "password", "tftestupdate"), @@ -580,7 +580,7 @@ func testAccCheckAWSDmsEndpointDestroy(s *terraform.State) error { return nil } -func checkDmsEndpointExists(n string) resource.TestCheckFunc { +func testAccCheckAWSDmsEndpointExists(n string) resource.TestCheckFunc { return func(s *terraform.State) error { rs, ok := s.RootModule().Resources[n] if !ok { From 84cc5bb4259bd1ce6ce5edafb8f9c842a355f665 Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Thu, 7 Oct 2021 17:00:11 -0400 Subject: [PATCH 08/11] Add 'expandDmsKafkaSettings'. --- aws/resource_aws_dms_endpoint.go | 215 ++++++++++++++++---------- aws/resource_aws_dms_endpoint_test.go | 45 +++--- 2 files changed, 156 insertions(+), 104 deletions(-) diff --git a/aws/resource_aws_dms_endpoint.go b/aws/resource_aws_dms_endpoint.go index 5e5f0f165603..d908f1c227d7 100644 --- a/aws/resource_aws_dms_endpoint.go +++ b/aws/resource_aws_dms_endpoint.go @@ -1,6 +1,7 @@ package aws import ( + "context" "fmt" "log" "regexp" @@ -10,6 +11,7 @@ import ( "github.com/aws/aws-sdk-go/aws" dms "github.com/aws/aws-sdk-go/service/databasemigrationservice" "github.com/hashicorp/aws-sdk-go-base/tfawserr" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/customdiff" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation" @@ -393,7 +395,10 @@ func resourceAwsDmsEndpoint() *schema.Resource { }, }, - CustomizeDiff: SetTagsDiff, + CustomizeDiff: customdiff.All( + resourceAwsDmsEndpointCustomizeDiff, + SetTagsDiff, + ), } } @@ -410,46 +415,26 @@ func resourceAwsDmsEndpointCreate(d *schema.ResourceData, meta interface{}) erro } switch d.Get("engine_name").(string) { - // if dynamodb then add required params - case "dynamodb": + case tfdms.EngineNameDynamodb: request.DynamoDbSettings = &dms.DynamoDbSettings{ ServiceAccessRoleArn: aws.String(d.Get("service_access_role").(string)), } - case "elasticsearch": + case tfdms.EngineNameElasticsearch: request.ElasticsearchSettings = &dms.ElasticsearchSettings{ ServiceAccessRoleArn: aws.String(d.Get("elasticsearch_settings.0.service_access_role_arn").(string)), EndpointUri: aws.String(d.Get("elasticsearch_settings.0.endpoint_uri").(string)), ErrorRetryDuration: aws.Int64(int64(d.Get("elasticsearch_settings.0.error_retry_duration").(int))), FullLoadErrorPercentage: aws.Int64(int64(d.Get("elasticsearch_settings.0.full_load_error_percentage").(int))), } - case "kafka": - request.KafkaSettings = &dms.KafkaSettings{ - Broker: aws.String(d.Get("kafka_settings.0.broker").(string)), - IncludeControlDetails: aws.Bool(d.Get("kafka_settings.0.include_control_details").(bool)), - IncludeNullAndEmpty: aws.Bool(d.Get("kafka_settings.0.include_null_and_empty").(bool)), - IncludePartitionValue: aws.Bool(d.Get("kafka_settings.0.include_partition_value").(bool)), - IncludeTableAlterOperations: aws.Bool(d.Get("kafka_settings.0.include_table_alter_operations").(bool)), - IncludeTransactionDetails: aws.Bool(d.Get("kafka_settings.0.include_transaction_details").(bool)), - MessageFormat: aws.String(d.Get("kafka_settings.0.message_format").(string)), - MessageMaxBytes: aws.Int64(int64(d.Get("kafka_settings.0.message_max_bytes").(int))), - NoHexPrefix: aws.Bool(d.Get("kafka_settings.0.no_hex_prefix").(bool)), - PartitionIncludeSchemaTable: 
aws.Bool(d.Get("kafka_settings.0.partition_include_schema_table").(bool)), - SaslPassword: aws.String(d.Get("kafka_settings.0.sasl_password").(string)), - SaslUsername: aws.String(d.Get("kafka_settings.0.sasl_username").(string)), - SecurityProtocol: aws.String(d.Get("kafka_settings.0.security_protocol").(string)), - SslCaCertificateArn: aws.String(d.Get("kafka_settings.0.ssl_ca_certificate_arn").(string)), - SslClientCertificateArn: aws.String(d.Get("kafka_settings.0.ssl_client_certificate_arn").(string)), - SslClientKeyArn: aws.String(d.Get("kafka_settings.0.ssl_client_key_arn").(string)), - SslClientKeyPassword: aws.String(d.Get("kafka_settings.0.ssl_client_key_password").(string)), - Topic: aws.String(d.Get("kafka_settings.0.topic").(string)), - } - case "kinesis": + case tfdms.EngineNameKafka: + request.KafkaSettings = expandDmsKafkaSettings(d.Get("kafka_settings").([]interface{})[0].(map[string]interface{})) + case tfdms.EngineNameKinesis: request.KinesisSettings = &dms.KinesisSettings{ MessageFormat: aws.String(d.Get("kinesis_settings.0.message_format").(string)), ServiceAccessRoleArn: aws.String(d.Get("kinesis_settings.0.service_access_role_arn").(string)), StreamArn: aws.String(d.Get("kinesis_settings.0.stream_arn").(string)), } - case "mongodb": + case tfdms.EngineNameMongodb: request.MongoDbSettings = &dms.MongoDbSettings{ Username: aws.String(d.Get("username").(string)), Password: aws.String(d.Get("password").(string)), @@ -472,7 +457,7 @@ func resourceAwsDmsEndpointCreate(d *schema.ResourceData, meta interface{}) erro request.ServerName = aws.String(d.Get("server_name").(string)) request.Port = aws.Int64(int64(d.Get("port").(int))) request.DatabaseName = aws.String(d.Get("database_name").(string)) - case "s3": + case tfdms.EngineNameS3: request.S3Settings = &dms.S3Settings{ BucketFolder: aws.String(d.Get("s3_settings.0.bucket_folder").(string)), BucketName: aws.String(d.Get("s3_settings.0.bucket_name").(string)), @@ -639,15 +624,15 @@ func resourceAwsDmsEndpointUpdate(d *schema.ResourceData, meta interface{}) erro } } - switch d.Get("engine_name").(string) { - case "dynamodb": + switch engineName := d.Get("engine_name").(string); engineName { + case tfdms.EngineNameDynamodb: if d.HasChange("service_access_role") { request.DynamoDbSettings = &dms.DynamoDbSettings{ ServiceAccessRoleArn: aws.String(d.Get("service_access_role").(string)), } hasChanges = true } - case "elasticsearch": + case tfdms.EngineNameElasticsearch: if d.HasChanges( "elasticsearch_settings.0.endpoint_uri", "elasticsearch_settings.0.error_retry_duration", @@ -659,53 +644,16 @@ func resourceAwsDmsEndpointUpdate(d *schema.ResourceData, meta interface{}) erro ErrorRetryDuration: aws.Int64(int64(d.Get("elasticsearch_settings.0.error_retry_duration").(int))), FullLoadErrorPercentage: aws.Int64(int64(d.Get("elasticsearch_settings.0.full_load_error_percentage").(int))), } - request.EngineName = aws.String(d.Get("engine_name").(string)) + request.EngineName = aws.String(engineName) hasChanges = true } - case "kafka": - if d.HasChanges( - "kafka_settings.0.broker", - "kafka_settings.0.topic", - "kafka_settings.0.message_format", - "kafka_settings.0.include_transaction_details", - "kafka_settings.0.include_partition_value", - "kafka_settings.0.partition_include_schema_table", - "kafka_settings.0.include_table_alter_operations", - "kafka_settings.0.include_control_details", - "kafka_settings.0.message_max_bytes", - "kafka_settings.0.include_null_and_empty", - "kafka_settings.0.security_protocol", - 
"kafka_settings.0.ssl_client_certificate_arn", - "kafka_settings.0.ssl_client_key_arn", - "kafka_settings.0.ssl_client_key_password", - "kafka_settings.0.ssl_ca_certificate_arn", - "kafka_settings.0.sasl_username", - "kafka_settings.0.sasl_password", - "kafka_settings.0.no_hex_prefix") { - request.KafkaSettings = &dms.KafkaSettings{ - Broker: aws.String(d.Get("kafka_settings.0.broker").(string)), - IncludeControlDetails: aws.Bool(d.Get("kafka_settings.0.include_control_details").(bool)), - IncludeNullAndEmpty: aws.Bool(d.Get("kafka_settings.0.include_null_and_empty").(bool)), - IncludePartitionValue: aws.Bool(d.Get("kafka_settings.0.include_partition_value").(bool)), - IncludeTableAlterOperations: aws.Bool(d.Get("kafka_settings.0.include_table_alter_operations").(bool)), - IncludeTransactionDetails: aws.Bool(d.Get("kafka_settings.0.include_transaction_details").(bool)), - MessageFormat: aws.String(d.Get("kafka_settings.0.message_format").(string)), - MessageMaxBytes: aws.Int64(int64(d.Get("kafka_settings.0.message_max_bytes").(int))), - NoHexPrefix: aws.Bool(d.Get("kafka_settings.0.no_hex_prefix").(bool)), - PartitionIncludeSchemaTable: aws.Bool(d.Get("kafka_settings.0.partition_include_schema_table").(bool)), - SaslPassword: aws.String(d.Get("kafka_settings.0.sasl_password").(string)), - SaslUsername: aws.String(d.Get("kafka_settings.0.sasl_username").(string)), - SecurityProtocol: aws.String(d.Get("kafka_settings.0.security_protocol").(string)), - SslCaCertificateArn: aws.String(d.Get("kafka_settings.0.ssl_ca_certificate_arn").(string)), - SslClientCertificateArn: aws.String(d.Get("kafka_settings.0.ssl_client_certificate_arn").(string)), - SslClientKeyArn: aws.String(d.Get("kafka_settings.0.ssl_client_key_arn").(string)), - SslClientKeyPassword: aws.String(d.Get("kafka_settings.0.ssl_client_key_password").(string)), - Topic: aws.String(d.Get("kafka_settings.0.topic").(string)), - } - request.EngineName = aws.String(d.Get("engine_name").(string)) + case tfdms.EngineNameKafka: + if d.HasChange("kafka_settings") { + request.KafkaSettings = expandDmsKafkaSettings(d.Get("kafka_settings").([]interface{})[0].(map[string]interface{})) + request.EngineName = aws.String(engineName) hasChanges = true } - case "kinesis": + case tfdms.EngineNameKinesis: if d.HasChanges( "kinesis_settings.0.service_access_role_arn", "kinesis_settings.0.stream_arn") { @@ -716,10 +664,10 @@ func resourceAwsDmsEndpointUpdate(d *schema.ResourceData, meta interface{}) erro ServiceAccessRoleArn: aws.String(d.Get("kinesis_settings.0.service_access_role_arn").(string)), StreamArn: aws.String(d.Get("kinesis_settings.0.stream_arn").(string)), } - request.EngineName = aws.String(d.Get("engine_name").(string)) // Must be included (should be 'kinesis') + request.EngineName = aws.String(engineName) hasChanges = true } - case "mongodb": + case tfdms.EngineNameMongodb: if d.HasChanges( "username", "password", "server_name", "port", "database_name", "mongodb_settings.0.auth_type", "mongodb_settings.0.auth_mechanism", "mongodb_settings.0.nesting_level", "mongodb_settings.0.extract_doc_id", @@ -739,7 +687,7 @@ func resourceAwsDmsEndpointUpdate(d *schema.ResourceData, meta interface{}) erro DocsToInvestigate: aws.String(d.Get("mongodb_settings.0.docs_to_investigate").(string)), AuthSource: aws.String(d.Get("mongodb_settings.0.auth_source").(string)), } - request.EngineName = aws.String(d.Get("engine_name").(string)) // Must be included (should be 'mongodb') + request.EngineName = aws.String(engineName) // Update connection info in 
top-level namespace as well request.Username = aws.String(d.Get("username").(string)) @@ -750,7 +698,7 @@ func resourceAwsDmsEndpointUpdate(d *schema.ResourceData, meta interface{}) erro hasChanges = true } - case "s3": + case tfdms.EngineNameS3: if d.HasChanges( "s3_settings.0.service_access_role_arn", "s3_settings.0.external_table_definition", "s3_settings.0.csv_row_delimiter", "s3_settings.0.csv_delimiter", "s3_settings.0.bucket_folder", @@ -771,7 +719,7 @@ func resourceAwsDmsEndpointUpdate(d *schema.ResourceData, meta interface{}) erro ServerSideEncryptionKmsKeyId: aws.String(d.Get("s3_settings.0.server_side_encryption_kms_key_id").(string)), ServiceAccessRoleArn: aws.String(d.Get("s3_settings.0.service_access_role_arn").(string)), } - request.EngineName = aws.String(d.Get("engine_name").(string)) // Must be included (should be 's3') + request.EngineName = aws.String(engineName) hasChanges = true } default: @@ -834,6 +782,33 @@ func resourceAwsDmsEndpointDelete(d *schema.ResourceData, meta interface{}) erro return err } +func resourceAwsDmsEndpointCustomizeDiff(_ context.Context, diff *schema.ResourceDiff, v interface{}) error { + switch engineName := diff.Get("engine_name").(string); engineName { + case tfdms.EngineNameElasticsearch: + if v, ok := diff.GetOk("elasticsearch_settings"); !ok || len(v.([]interface{})) == 0 || v.([]interface{})[0] == nil { + return fmt.Errorf("elasticsearch_settings must be set when engine_name = %q", engineName) + } + case tfdms.EngineNameKafka: + if v, ok := diff.GetOk("kafka_settings"); !ok || len(v.([]interface{})) == 0 || v.([]interface{})[0] == nil { + return fmt.Errorf("kafka_settings must be set when engine_name = %q", engineName) + } + case tfdms.EngineNameKinesis: + if v, ok := diff.GetOk("kinesis_settings"); !ok || len(v.([]interface{})) == 0 || v.([]interface{})[0] == nil { + return fmt.Errorf("kinesis_settings must be set when engine_name = %q", engineName) + } + case tfdms.EngineNameMongodb: + if v, ok := diff.GetOk("mongodb_settings"); !ok || len(v.([]interface{})) == 0 || v.([]interface{})[0] == nil { + return fmt.Errorf("mongodb_settings must be set when engine_name = %q", engineName) + } + case tfdms.EngineNameS3: + if v, ok := diff.GetOk("s3_settings"); !ok || len(v.([]interface{})) == 0 || v.([]interface{})[0] == nil { + return fmt.Errorf("s3_settings must be set when engine_name = %q", engineName) + } + } + + return nil +} + func resourceAwsDmsEndpointSetState(d *schema.ResourceData, endpoint *dms.Endpoint) error { d.SetId(aws.StringValue(endpoint.EndpointIdentifier)) @@ -912,6 +887,88 @@ func flattenDmsElasticsearchSettings(settings *dms.ElasticsearchSettings) []map[ return []map[string]interface{}{m} } +func expandDmsKafkaSettings(tfMap map[string]interface{}) *dms.KafkaSettings { + if tfMap == nil { + return nil + } + + apiObject := &dms.KafkaSettings{} + + if v, ok := tfMap["broker"].(string); ok && v != "" { + apiObject.Broker = aws.String(v) + } + + if v, ok := tfMap["include_control_details"].(bool); ok { + apiObject.IncludeControlDetails = aws.Bool(v) + } + + if v, ok := tfMap["include_null_and_empty"].(bool); ok { + apiObject.IncludeNullAndEmpty = aws.Bool(v) + } + + if v, ok := tfMap["include_partition_value"].(bool); ok { + apiObject.IncludePartitionValue = aws.Bool(v) + } + + if v, ok := tfMap["include_table_alter_operations"].(bool); ok { + apiObject.IncludeTableAlterOperations = aws.Bool(v) + } + + if v, ok := tfMap["include_transaction_details"].(bool); ok { + apiObject.IncludeTransactionDetails = aws.Bool(v) + } + + if 
v, ok := tfMap["message_format"].(string); ok && v != "" { + apiObject.MessageFormat = aws.String(v) + } + + if v, ok := tfMap["message_max_bytes"].(int); ok && v != 0 { + apiObject.MessageMaxBytes = aws.Int64(int64(v)) + } + + if v, ok := tfMap["no_hex_prefix"].(bool); ok { + apiObject.NoHexPrefix = aws.Bool(v) + } + + if v, ok := tfMap["partition_include_schema_table"].(bool); ok { + apiObject.PartitionIncludeSchemaTable = aws.Bool(v) + } + + if v, ok := tfMap["sasl_password"].(string); ok && v != "" { + apiObject.SaslPassword = aws.String(v) + } + + if v, ok := tfMap["sasl_username"].(string); ok && v != "" { + apiObject.SaslUsername = aws.String(v) + } + + if v, ok := tfMap["security_protocol"].(string); ok && v != "" { + apiObject.SecurityProtocol = aws.String(v) + } + + if v, ok := tfMap["ssl_ca_certificate_arn"].(string); ok && v != "" { + apiObject.SslCaCertificateArn = aws.String(v) + } + + if v, ok := tfMap["ssl_client_certificate_arn"].(string); ok && v != "" { + apiObject.SslClientCertificateArn = aws.String(v) + } + + if v, ok := tfMap["ssl_client_key_arn"].(string); ok && v != "" { + apiObject.SslClientKeyArn = aws.String(v) + } + + if v, ok := tfMap["ssl_client_key_password"].(string); ok && v != "" { + apiObject.SslClientKeyPassword = aws.String(v) + } + + if v, ok := tfMap["topic"].(string); ok && v != "" { + apiObject.Topic = aws.String(v) + } + + return apiObject +} + func flattenDmsKafkaSettings(settings *dms.KafkaSettings) []map[string]interface{} { if settings == nil { return []map[string]interface{}{} diff --git a/aws/resource_aws_dms_endpoint_test.go b/aws/resource_aws_dms_endpoint_test.go index acde058cf4f9..b2b82c4ce24a 100644 --- a/aws/resource_aws_dms_endpoint_test.go +++ b/aws/resource_aws_dms_endpoint_test.go @@ -304,8 +304,9 @@ func TestAccAwsDmsEndpoint_Elasticsearch_FullLoadErrorPercentage(t *testing.T) { } func TestAccAwsDmsEndpoint_Kafka(t *testing.T) { + domainName := testAccRandomSubdomain() + rName := acctest.RandomWithPrefix("tf-acc-test") resourceName := "aws_dms_endpoint.test" - randId := acctest.RandString(8) + "-kafka" resource.ParallelTest(t, resource.TestCase{ PreCheck: func() { testAccPreCheck(t) }, @@ -314,7 +315,7 @@ func TestAccAwsDmsEndpoint_Kafka(t *testing.T) { CheckDestroy: testAccCheckAWSDmsEndpointDestroy, Steps: []resource.TestStep{ { - Config: dmsEndpointKafkaConfig(randId), + Config: dmsEndpointKafkaConfig(rName, domainName), Check: resource.ComposeTestCheckFunc( testAccCheckAWSDmsEndpointExists(resourceName), resource.TestCheckResourceAttr(resourceName, "kafka_settings.#", "1"), @@ -340,7 +341,7 @@ func TestAccAwsDmsEndpoint_Kafka(t *testing.T) { ImportStateVerifyIgnore: []string{"password"}, }, { - Config: dmsEndpointKafkaConfigUpdate(randId), + Config: dmsEndpointKafkaConfigUpdate(rName, domainName), Check: resource.ComposeTestCheckFunc( testAccCheckAWSDmsEndpointExists(resourceName), resource.TestCheckResourceAttr(resourceName, "kafka_settings.#", "1"), @@ -1144,42 +1145,36 @@ resource "aws_dms_endpoint" "test" { `, rName, fullLoadErrorPercentage)) } -func dmsEndpointKafkaConfig(randId string) string { +func dmsEndpointKafkaConfig(rName, domainName string) string { return fmt.Sprintf(` -data "aws_partition" "current" {} - resource "aws_dms_endpoint" "test" { - endpoint_id = "tf-test-dms-endpoint-%[1]s" - endpoint_type = "target" - engine_name = "kafka" - ssl_mode = "none" - extra_connection_attributes = "" + endpoint_id = %[1]q + endpoint_type = "target" + engine_name = "kafka" + ssl_mode = "none" kafka_settings { - broker = 
"ec2-12-345-678-901.compute-1.${data.aws_partition.current.dns_suffix}:2345" + broker = "%[2]s:2345" include_null_and_empty = false security_protocol = "plaintext" - sasl_username = "tftest" - sasl_password = "tftest" + // sasl_username = "tftest" + // sasl_password = "tftest" no_hex_prefix = false } } -`, randId) +`, rName, domainName) } -func dmsEndpointKafkaConfigUpdate(randId string) string { +func dmsEndpointKafkaConfigUpdate(rName, domainName string) string { return fmt.Sprintf(` -data "aws_partition" "current" {} - resource "aws_dms_endpoint" "test" { - endpoint_id = "tf-test-dms-endpoint-%[1]s" - endpoint_type = "target" - engine_name = "kafka" - ssl_mode = "none" - extra_connection_attributes = "" + endpoint_id = %[1]q + endpoint_type = "target" + engine_name = "kafka" + ssl_mode = "none" kafka_settings { - broker = "ec2-12-345-678-901.compute-1.${data.aws_partition.current.dns_suffix}:2345" + broker = "%[2]s:2345" topic = "topic1" message_format = "JSON_UNFORMATTED" include_transaction_details = true @@ -1195,7 +1190,7 @@ resource "aws_dms_endpoint" "test" { no_hex_prefix = true } } -`, randId) +`, rName, domainName) } func dmsEndpointKinesisConfig(randId string) string { From 461b157e9ad01183cf808c363c2bfa39a488fdd5 Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Thu, 7 Oct 2021 17:30:14 -0400 Subject: [PATCH 09/11] Add 'internal/service/dms/waiter'. --- aws/internal/service/dms/enum.go | 4 + aws/internal/service/dms/waiter/status.go | 25 +++++ aws/internal/service/dms/waiter/waiter.go | 30 ++++++ aws/resource_aws_dms_endpoint.go | 114 +++++++++++++++++----- aws/resource_aws_dms_endpoint_test.go | 48 +++++---- 5 files changed, 175 insertions(+), 46 deletions(-) create mode 100644 aws/internal/service/dms/waiter/status.go create mode 100644 aws/internal/service/dms/waiter/waiter.go diff --git a/aws/internal/service/dms/enum.go b/aws/internal/service/dms/enum.go index 61960847a280..559122fe2b2c 100644 --- a/aws/internal/service/dms/enum.go +++ b/aws/internal/service/dms/enum.go @@ -1,5 +1,9 @@ package dms +const ( + EndpointStatusDeleting = "deleting" +) + const ( EngineNameAurora = "aurora" EngineNameAuroraPostgresql = "aurora-postgresql" diff --git a/aws/internal/service/dms/waiter/status.go b/aws/internal/service/dms/waiter/status.go new file mode 100644 index 000000000000..145b3494c4c8 --- /dev/null +++ b/aws/internal/service/dms/waiter/status.go @@ -0,0 +1,25 @@ +package waiter + +import ( + "github.com/aws/aws-sdk-go/aws" + dms "github.com/aws/aws-sdk-go/service/databasemigrationservice" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource" + "github.com/terraform-providers/terraform-provider-aws/aws/internal/service/dms/finder" + "github.com/terraform-providers/terraform-provider-aws/aws/internal/tfresource" +) + +func EndpointStatus(conn *dms.DatabaseMigrationService, id string) resource.StateRefreshFunc { + return func() (interface{}, string, error) { + output, err := finder.EndpointByID(conn, id) + + if tfresource.NotFound(err) { + return nil, "", nil + } + + if err != nil { + return nil, "", err + } + + return output, aws.StringValue(output.Status), nil + } +} diff --git a/aws/internal/service/dms/waiter/waiter.go b/aws/internal/service/dms/waiter/waiter.go new file mode 100644 index 000000000000..6227b5d1baa4 --- /dev/null +++ b/aws/internal/service/dms/waiter/waiter.go @@ -0,0 +1,30 @@ +package waiter + +import ( + "time" + + dms "github.com/aws/aws-sdk-go/service/databasemigrationservice" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource" + 
tfdms "github.com/terraform-providers/terraform-provider-aws/aws/internal/service/dms" +) + +const ( + EndpointDeletedTimeout = 5 * time.Minute +) + +func EndpointDeleted(conn *dms.DatabaseMigrationService, id string) (*dms.Endpoint, error) { + stateConf := &resource.StateChangeConf{ + Pending: []string{tfdms.EndpointStatusDeleting}, + Target: []string{}, + Refresh: EndpointStatus(conn, id), + Timeout: EndpointDeletedTimeout, + } + + outputRaw, err := stateConf.WaitForState() + + if output, ok := outputRaw.(*dms.Endpoint); ok { + return output, err + } + + return nil, err +} diff --git a/aws/resource_aws_dms_endpoint.go b/aws/resource_aws_dms_endpoint.go index d908f1c227d7..2a23e6ba52c6 100644 --- a/aws/resource_aws_dms_endpoint.go +++ b/aws/resource_aws_dms_endpoint.go @@ -18,6 +18,7 @@ import ( "github.com/terraform-providers/terraform-provider-aws/aws/internal/keyvaluetags" tfdms "github.com/terraform-providers/terraform-provider-aws/aws/internal/service/dms" "github.com/terraform-providers/terraform-provider-aws/aws/internal/service/dms/finder" + "github.com/terraform-providers/terraform-provider-aws/aws/internal/service/dms/waiter" "github.com/terraform-providers/terraform-provider-aws/aws/internal/tfresource" ) @@ -779,6 +780,12 @@ func resourceAwsDmsEndpointDelete(d *schema.ResourceData, meta interface{}) erro return fmt.Errorf("error deleting DMS Endpoint (%s): %w", d.Id(), err) } + _, err = waiter.EndpointDeleted(conn, d.Id()) + + if err != nil { + return fmt.Errorf("error waiting for DMS Endpoint (%s) delete: %w", d.Id(), err) + } + return err } @@ -832,8 +839,12 @@ func resourceAwsDmsEndpointSetState(d *schema.ResourceData, endpoint *dms.Endpoi return fmt.Errorf("Error setting elasticsearch for DMS: %s", err) } case "kafka": - if err := d.Set("kafka_settings", flattenDmsKafkaSettings(endpoint.KafkaSettings)); err != nil { - return fmt.Errorf("Error setting kafka_settings for DMS: %s", err) + if endpoint.KafkaSettings != nil { + if err := d.Set("kafka_settings", []interface{}{flattenDmsKafkaSettings(endpoint.KafkaSettings)}); err != nil { + return fmt.Errorf("error setting kafka_settings: %w", err) + } + } else { + d.Set("kafka_settings", nil) } case "kinesis": if err := d.Set("kinesis_settings", flattenDmsKinesisSettings(endpoint.KinesisSettings)); err != nil { @@ -969,33 +980,86 @@ func expandDmsKafkaSettings(tfMap map[string]interface{}) *dms.KafkaSettings { return apiObject } -func flattenDmsKafkaSettings(settings *dms.KafkaSettings) []map[string]interface{} { - if settings == nil { - return []map[string]interface{}{} +func flattenDmsKafkaSettings(apiObject *dms.KafkaSettings) map[string]interface{} { + if apiObject == nil { + return nil } - m := map[string]interface{}{ - "broker": aws.StringValue(settings.Broker), - "topic": aws.StringValue(settings.Topic), - "message_format": aws.StringValue(settings.MessageFormat), - "include_transaction_details": aws.BoolValue(settings.IncludeTransactionDetails), - "include_partition_value": aws.BoolValue(settings.IncludePartitionValue), - "partition_include_schema_table": aws.BoolValue(settings.PartitionIncludeSchemaTable), - "include_table_alter_operations": aws.BoolValue(settings.IncludeTableAlterOperations), - "include_control_details": aws.BoolValue(settings.IncludeControlDetails), - "message_max_bytes": aws.Int64Value(settings.MessageMaxBytes), - "include_null_and_empty": aws.BoolValue(settings.IncludeNullAndEmpty), - "security_protocol": aws.StringValue(settings.SecurityProtocol), - "ssl_client_certificate_arn": 
aws.StringValue(settings.SslClientCertificateArn), - "ssl_client_key_arn": aws.StringValue(settings.SslClientKeyArn), - "ssl_client_key_password": aws.StringValue(settings.SslClientKeyPassword), - "ssl_ca_certificate_arn": aws.StringValue(settings.SslCaCertificateArn), - "sasl_username": aws.StringValue(settings.SaslUsername), - "sasl_password": aws.StringValue(settings.SaslPassword), - "no_hex_prefix": aws.BoolValue(settings.NoHexPrefix), + tfMap := map[string]interface{}{} + + if v := apiObject.Broker; v != nil { + tfMap["broker"] = aws.StringValue(v) } - return []map[string]interface{}{m} + if v := apiObject.IncludeControlDetails; v != nil { + tfMap["include_control_details"] = aws.BoolValue(v) + } + + if v := apiObject.IncludeNullAndEmpty; v != nil { + tfMap["include_null_and_empty"] = aws.BoolValue(v) + } + + if v := apiObject.IncludePartitionValue; v != nil { + tfMap["include_partition_value"] = aws.BoolValue(v) + } + + if v := apiObject.IncludeTableAlterOperations; v != nil { + tfMap["include_table_alter_operations"] = aws.BoolValue(v) + } + + if v := apiObject.IncludeTransactionDetails; v != nil { + tfMap["include_transaction_details"] = aws.BoolValue(v) + } + + if v := apiObject.MessageFormat; v != nil { + tfMap["message_format"] = aws.StringValue(v) + } + + if v := apiObject.MessageMaxBytes; v != nil { + tfMap["message_max_bytes"] = aws.Int64Value(v) + } + + if v := apiObject.NoHexPrefix; v != nil { + tfMap["no_hex_prefix"] = aws.BoolValue(v) + } + + if v := apiObject.PartitionIncludeSchemaTable; v != nil { + tfMap["partition_include_schema_table"] = aws.BoolValue(v) + } + + if v := apiObject.SaslPassword; v != nil { + tfMap["sasl_password"] = aws.StringValue(v) + } + + if v := apiObject.SaslUsername; v != nil { + tfMap["sasl_username"] = aws.StringValue(v) + } + + if v := apiObject.SecurityProtocol; v != nil { + tfMap["security_protocol"] = aws.StringValue(v) + } + + if v := apiObject.SslCaCertificateArn; v != nil { + tfMap["ssl_ca_certificate_arn"] = aws.StringValue(v) + } + + if v := apiObject.SslClientCertificateArn; v != nil { + tfMap["ssl_client_certificate_arn"] = aws.StringValue(v) + } + + if v := apiObject.SslClientKeyArn; v != nil { + tfMap["ssl_client_key_arn"] = aws.StringValue(v) + } + + if v := apiObject.SslClientKeyPassword; v != nil { + tfMap["ssl_client_key_password"] = aws.StringValue(v) + } + + if v := apiObject.Topic; v != nil { + tfMap["topic"] = aws.StringValue(v) + } + + return tfMap } func flattenDmsKinesisSettings(settings *dms.KinesisSettings) []map[string]interface{} { diff --git a/aws/resource_aws_dms_endpoint_test.go b/aws/resource_aws_dms_endpoint_test.go index b2b82c4ce24a..3881b827cb25 100644 --- a/aws/resource_aws_dms_endpoint_test.go +++ b/aws/resource_aws_dms_endpoint_test.go @@ -319,19 +319,23 @@ func TestAccAwsDmsEndpoint_Kafka(t *testing.T) { Check: resource.ComposeTestCheckFunc( testAccCheckAWSDmsEndpointExists(resourceName), resource.TestCheckResourceAttr(resourceName, "kafka_settings.#", "1"), - resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.topic", "kafka-default-topic"), - resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.message_format", "JSON"), - resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.include_transaction_details", "false"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.include_control_details", "false"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.include_null_and_empty", "false"), resource.TestCheckResourceAttr(resourceName, 
"kafka_settings.0.include_partition_value", "false"), - resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.partition_include_schema_table", "false"), resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.include_table_alter_operations", "false"), - resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.include_control_details", "false"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.include_transaction_details", "false"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.message_format", "json"), resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.message_max_bytes", "1000000"), - resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.include_null_and_empty", "false"), - resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.security_protocol", "plaintext"), - resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.sasl_username", "tftest"), - resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.sasl_password", "tftest"), resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.no_hex_prefix", "false"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.partition_include_schema_table", "false"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.sasl_password", ""), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.sasl_username", ""), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.security_protocol", "plaintext"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.ssl_ca_certificate_arn", ""), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.ssl_client_certificate_arn", ""), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.ssl_client_key_arn", ""), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.ssl_client_key_password", ""), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.topic", "kafka-default-topic"), ), }, { @@ -345,19 +349,23 @@ func TestAccAwsDmsEndpoint_Kafka(t *testing.T) { Check: resource.ComposeTestCheckFunc( testAccCheckAWSDmsEndpointExists(resourceName), resource.TestCheckResourceAttr(resourceName, "kafka_settings.#", "1"), - resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.topic", "topic1"), - resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.message_format", "JSON_UNFORMATTED"), - resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.include_transaction_details", "true"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.include_control_details", "true"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.include_null_and_empty", "true"), resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.include_partition_value", "true"), - resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.partition_include_schema_table", "true"), resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.include_table_alter_operations", "true"), - resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.include_control_details", "true"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.include_transaction_details", "true"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.message_format", "json-unformatted"), resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.message_max_bytes", "500000"), - resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.include_null_and_empty", "true"), - 
resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.security_protocol", "sasl-ssl"), - resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.sasl_username", "tftest-new"), - resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.sasl_password", "tftest-new"), resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.no_hex_prefix", "true"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.partition_include_schema_table", "true"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.sasl_password", "tftest-new"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.sasl_username", "tftest-new"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.security_protocol", "sasl-ssl"), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.ssl_ca_certificate_arn", ""), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.ssl_client_certificate_arn", ""), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.ssl_client_key_arn", ""), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.ssl_client_key_password", ""), + resource.TestCheckResourceAttr(resourceName, "kafka_settings.0.topic", "topic1"), ), }, }, @@ -1157,8 +1165,6 @@ resource "aws_dms_endpoint" "test" { broker = "%[2]s:2345" include_null_and_empty = false security_protocol = "plaintext" - // sasl_username = "tftest" - // sasl_password = "tftest" no_hex_prefix = false } } @@ -1176,7 +1182,7 @@ resource "aws_dms_endpoint" "test" { kafka_settings { broker = "%[2]s:2345" topic = "topic1" - message_format = "JSON_UNFORMATTED" + message_format = "json-unformatted" include_transaction_details = true include_partition_value = true partition_include_schema_table = true From b81a9ed1ff9d72852286e4dc08eff5f4f3f925bc Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Thu, 7 Oct 2021 17:45:04 -0400 Subject: [PATCH 10/11] r/aws_dms_endpoint: 'sasl_password' and 'ssl_client_key_password' are Sensitive. --- aws/resource_aws_dms_endpoint.go | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/aws/resource_aws_dms_endpoint.go b/aws/resource_aws_dms_endpoint.go index 2a23e6ba52c6..d2a29d2f3c25 100644 --- a/aws/resource_aws_dms_endpoint.go +++ b/aws/resource_aws_dms_endpoint.go @@ -171,8 +171,9 @@ func resourceAwsDmsEndpoint() *schema.Resource { Default: false, }, "sasl_password": { - Type: schema.TypeString, - Optional: true, + Type: schema.TypeString, + Optional: true, + Sensitive: true, }, "sasl_username": { Type: schema.TypeString, @@ -199,8 +200,9 @@ func resourceAwsDmsEndpoint() *schema.Resource { ValidateFunc: validateArn, }, "ssl_client_key_password": { - Type: schema.TypeString, - Optional: true, + Type: schema.TypeString, + Optional: true, + Sensitive: true, }, "topic": { Type: schema.TypeString, @@ -840,7 +842,11 @@ func resourceAwsDmsEndpointSetState(d *schema.ResourceData, endpoint *dms.Endpoi } case "kafka": if endpoint.KafkaSettings != nil { - if err := d.Set("kafka_settings", []interface{}{flattenDmsKafkaSettings(endpoint.KafkaSettings)}); err != nil { + // SASL password isn't returned in API. Propagate state value. 
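+			// Without this propagation the refreshed state would hold an empty
+			// password and every subsequent plan would show a perpetual diff
+			// against the configured value.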
+ tfMap := flattenDmsKafkaSettings(endpoint.KafkaSettings) + tfMap["sasl_password"] = d.Get("kafka_settings.0.sasl_password").(string) + + if err := d.Set("kafka_settings", []interface{}{tfMap}); err != nil { return fmt.Errorf("error setting kafka_settings: %w", err) } } else { From 883516f2a814baf62a713861493893e8c092dbf7 Mon Sep 17 00:00:00 2001 From: Kit Ewbank Date: Thu, 7 Oct 2021 17:54:30 -0400 Subject: [PATCH 11/11] Comment out 'testAccCheckResourceAttrHostnameWithPort' as it's currently unused. --- aws/provider_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/aws/provider_test.go b/aws/provider_test.go index bae5d844fce8..8d0ff54808cb 100644 --- a/aws/provider_test.go +++ b/aws/provider_test.go @@ -356,6 +356,7 @@ func testAccCheckResourceAttrRegionalReverseDnsService(resourceName, attributeNa } } +/* // testAccCheckResourceAttrHostnameWithPort ensures the Terraform state regexp matches a formatted DNS hostname with prefix, partition DNS suffix, and given port func testAccCheckResourceAttrHostnameWithPort(resourceName, attributeName, serviceName, hostnamePrefix string, port int) resource.TestCheckFunc { return func(s *terraform.State) error { @@ -365,6 +366,7 @@ func testAccCheckResourceAttrHostnameWithPort(resourceName, attributeName, servi return resource.TestCheckResourceAttr(resourceName, attributeName, hostname)(s) } } +*/ // testAccCheckResourceAttrPrivateDnsName ensures the Terraform state exactly matches a private DNS name //