diff --git a/.changelog/34724.txt b/.changelog/34724.txt new file mode 100644 index 00000000000..46c34e9d4a7 --- /dev/null +++ b/.changelog/34724.txt @@ -0,0 +1,7 @@ +```release-note:enhancement +resource/aws_dms_endpoint: Add `postgres_settings` configuration block +``` + +```release-note:enhancement +data-source/aws_dms_endpoint: Add `postgres_settings` attribute +``` \ No newline at end of file diff --git a/internal/service/dms/endpoint.go b/internal/service/dms/endpoint.go index c79a0a76656..0d597410865 100644 --- a/internal/service/dms/endpoint.go +++ b/internal/service/dms/endpoint.go @@ -348,6 +348,80 @@ func ResourceEndpoint() *schema.Resource { Optional: true, ConflictsWith: []string{"secrets_manager_access_role_arn", "secrets_manager_arn"}, }, + "postgres_settings": { + Type: schema.TypeList, + Optional: true, + MaxItems: 1, + DiffSuppressFunc: verify.SuppressMissingOptionalConfigurationBlock, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "after_connect_script": { + Type: schema.TypeString, + Optional: true, + }, + "babelfish_database_name": { + Type: schema.TypeString, + Optional: true, + }, + "capture_ddls": { + Type: schema.TypeBool, + Optional: true, + }, + "database_mode": { + Type: schema.TypeString, + Optional: true, + }, + "ddl_artifacts_schema": { + Type: schema.TypeString, + Optional: true, + }, + "execute_timeout": { + Type: schema.TypeInt, + Optional: true, + }, + "fail_tasks_on_lob_truncation": { + Type: schema.TypeBool, + Optional: true, + }, + "heartbeat_enable": { + Type: schema.TypeBool, + Optional: true, + }, + "heartbeat_frequency": { + Type: schema.TypeInt, + Optional: true, + }, + "heartbeat_schema": { + Type: schema.TypeString, + Optional: true, + }, + "map_boolean_as_boolean": { + Type: schema.TypeBool, + Optional: true, + }, + "map_jsonb_as_clob": { + Type: schema.TypeBool, + Optional: true, + }, + "map_long_varchar_as": { + Type: schema.TypeString, + Optional: true, + }, + "max_file_size": { + Type: 
schema.TypeInt, + Optional: true, + }, + "plugin_name": { + Type: schema.TypeString, + Optional: true, + }, + "slot_name": { + Type: schema.TypeString, + Optional: true, + }, + }, + }, + }, "redis_settings": { Type: schema.TypeList, Optional: true, @@ -752,24 +826,27 @@ func resourceEndpointCreate(ctx context.Context, d *schema.ResourceData, meta in expandTopLevelConnectionInfo(d, input) } case engineNameAuroraPostgresql, engineNamePostgres: + settings := &dms.PostgreSQLSettings{} + if _, ok := d.GetOk("postgres_settings"); ok { + settings = expandPostgreSQLSettings(d.Get("postgres_settings").([]interface{})[0].(map[string]interface{})) + } + if _, ok := d.GetOk("secrets_manager_arn"); ok { - input.PostgreSQLSettings = &dms.PostgreSQLSettings{ - SecretsManagerAccessRoleArn: aws.String(d.Get("secrets_manager_access_role_arn").(string)), - SecretsManagerSecretId: aws.String(d.Get("secrets_manager_arn").(string)), - DatabaseName: aws.String(d.Get("database_name").(string)), - } + settings.SecretsManagerAccessRoleArn = aws.String(d.Get("secrets_manager_access_role_arn").(string)) + settings.SecretsManagerSecretId = aws.String(d.Get("secrets_manager_arn").(string)) + settings.DatabaseName = aws.String(d.Get("database_name").(string)) } else { - input.PostgreSQLSettings = &dms.PostgreSQLSettings{ - Username: aws.String(d.Get("username").(string)), - Password: aws.String(d.Get("password").(string)), - ServerName: aws.String(d.Get("server_name").(string)), - Port: aws.Int64(int64(d.Get("port").(int))), - DatabaseName: aws.String(d.Get("database_name").(string)), - } + settings.Username = aws.String(d.Get("username").(string)) + settings.Password = aws.String(d.Get("password").(string)) + settings.ServerName = aws.String(d.Get("server_name").(string)) + settings.Port = aws.Int64(int64(d.Get("port").(int))) + settings.DatabaseName = aws.String(d.Get("database_name").(string)) // Set connection info in top-level namespace as well expandTopLevelConnectionInfo(d, input) } + + 
input.PostgreSQLSettings = settings case engineNameDynamoDB: input.DynamoDbSettings = &dms.DynamoDbSettings{ ServiceAccessRoleArn: aws.String(d.Get("service_access_role").(string)), @@ -1476,6 +1553,9 @@ func resourceEndpointSetState(d *schema.ResourceData, endpoint *dms.Endpoint) er } else { flattenTopLevelConnectionInfo(d, endpoint) } + if err := d.Set("postgres_settings", flattenPostgreSQLSettings(endpoint.PostgreSQLSettings)); err != nil { + return fmt.Errorf("setting postgres_settings: %w", err) + } case engineNameDynamoDB: if endpoint.DynamoDbSettings != nil { d.Set("service_access_role", endpoint.DynamoDbSettings.ServiceAccessRoleArn) @@ -1636,9 +1716,6 @@ func stopEndpointReplicationTasks(ctx context.Context, conn *dms.DatabaseMigrati switch aws.StringValue(task.Status) { case replicationTaskStatusRunning: err := stopReplicationTask(ctx, rtID, conn) - if tfawserr.ErrCodeEquals(err, dms.ErrCodeInvalidResourceStateFault) { - continue - } if err != nil { return stoppedTasks, err @@ -2064,6 +2141,124 @@ func flattenRedshiftSettings(settings *dms.RedshiftSettings) []map[string]interf return []map[string]interface{}{m} } +func expandPostgreSQLSettings(tfMap map[string]interface{}) *dms.PostgreSQLSettings { + if tfMap == nil { + return nil + } + + apiObject := &dms.PostgreSQLSettings{} + + if v, ok := tfMap["after_connect_script"].(string); ok && v != "" { + apiObject.AfterConnectScript = aws.String(v) + } + if v, ok := tfMap["babelfish_database_name"].(string); ok && v != "" { + apiObject.BabelfishDatabaseName = aws.String(v) + } + if v, ok := tfMap["capture_ddls"].(bool); ok { + apiObject.CaptureDdls = aws.Bool(v) + } + if v, ok := tfMap["database_mode"].(string); ok && v != "" { + apiObject.DatabaseMode = aws.String(v) + } + if v, ok := tfMap["ddl_artifacts_schema"].(string); ok && v != "" { + apiObject.DdlArtifactsSchema = aws.String(v) + } + if v, ok := tfMap["execute_timeout"].(int); ok { + apiObject.ExecuteTimeout = aws.Int64(int64(v)) + } + if v, ok := 
tfMap["fail_tasks_on_lob_truncation"].(bool); ok { + apiObject.FailTasksOnLobTruncation = aws.Bool(v) + } + if v, ok := tfMap["heartbeat_enable"].(bool); ok { + apiObject.HeartbeatEnable = aws.Bool(v) + } + if v, ok := tfMap["heartbeat_frequency"].(int); ok { + apiObject.HeartbeatFrequency = aws.Int64(int64(v)) + } + if v, ok := tfMap["heartbeat_schema"].(string); ok && v != "" { + apiObject.HeartbeatSchema = aws.String(v) + } + if v, ok := tfMap["map_boolean_as_boolean"].(bool); ok { + apiObject.MapBooleanAsBoolean = aws.Bool(v) + } + if v, ok := tfMap["map_jsonb_as_clob"].(bool); ok { + apiObject.MapJsonbAsClob = aws.Bool(v) + } + if v, ok := tfMap["map_long_varchar_as"].(string); ok && v != "" { + apiObject.MapLongVarcharAs = aws.String(v) + } + if v, ok := tfMap["max_file_size"].(int); ok { + apiObject.MaxFileSize = aws.Int64(int64(v)) + } + if v, ok := tfMap["plugin_name"].(string); ok && v != "" { + apiObject.PluginName = aws.String(v) + } + if v, ok := tfMap["slot_name"].(string); ok && v != "" { + apiObject.SlotName = aws.String(v) + } + + return apiObject +} + +func flattenPostgreSQLSettings(apiObject *dms.PostgreSQLSettings) []map[string]interface{} { + if apiObject == nil { + return nil + } + + tfMap := map[string]interface{}{} + + if v := apiObject.AfterConnectScript; v != nil { + tfMap["after_connect_script"] = aws.StringValue(v) + } + if v := apiObject.BabelfishDatabaseName; v != nil { + tfMap["babelfish_database_name"] = aws.StringValue(v) + } + if v := apiObject.CaptureDdls; v != nil { + tfMap["capture_ddls"] = aws.BoolValue(v) + } + if v := apiObject.DatabaseMode; v != nil { + tfMap["database_mode"] = aws.StringValue(v) + } + if v := apiObject.DdlArtifactsSchema; v != nil { + tfMap["ddl_artifacts_schema"] = aws.StringValue(v) + } + if v := apiObject.ExecuteTimeout; v != nil { + tfMap["execute_timeout"] = aws.Int64Value(v) + } + if v := apiObject.FailTasksOnLobTruncation; v != nil { + tfMap["fail_tasks_on_lob_truncation"] = aws.BoolValue(v) + } + if 
v := apiObject.HeartbeatEnable; v != nil { + tfMap["heartbeat_enable"] = aws.BoolValue(v) + } + if v := apiObject.HeartbeatFrequency; v != nil { + tfMap["heartbeat_frequency"] = aws.Int64Value(v) + } + if v := apiObject.HeartbeatSchema; v != nil { + tfMap["heartbeat_schema"] = aws.StringValue(v) + } + if v := apiObject.MapBooleanAsBoolean; v != nil { + tfMap["map_boolean_as_boolean"] = aws.BoolValue(v) + } + if v := apiObject.MapJsonbAsClob; v != nil { + tfMap["map_jsonb_as_clob"] = aws.BoolValue(v) + } + if v := apiObject.MapLongVarcharAs; v != nil { + tfMap["map_long_varchar_as"] = aws.StringValue(v) + } + if v := apiObject.MaxFileSize; v != nil { + tfMap["max_file_size"] = aws.Int64Value(v) + } + if v := apiObject.PluginName; v != nil { + tfMap["plugin_name"] = aws.StringValue(v) + } + if v := apiObject.SlotName; v != nil { + tfMap["slot_name"] = aws.StringValue(v) + } + + return []map[string]interface{}{tfMap} +} + func expandS3Settings(tfMap map[string]interface{}) *dms.S3Settings { if tfMap == nil { return nil diff --git a/internal/service/dms/endpoint_data_source.go b/internal/service/dms/endpoint_data_source.go index c6d4c3f95a4..94ddeb07a50 100644 --- a/internal/service/dms/endpoint_data_source.go +++ b/internal/service/dms/endpoint_data_source.go @@ -240,6 +240,78 @@ func DataSourceEndpoint() *schema.Resource { Type: schema.TypeInt, Computed: true, }, + "postgres_settings": { + Type: schema.TypeList, + Computed: true, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "after_connect_script": { + Type: schema.TypeString, + Computed: true, + }, + "babelfish_database_name": { + Type: schema.TypeString, + Computed: true, + }, + "capture_ddls": { + Type: schema.TypeBool, + Computed: true, + }, + "database_mode": { + Type: schema.TypeString, + Computed: true, + }, + "ddl_artifacts_schema": { + Type: schema.TypeString, + Computed: true, + }, + "execute_timeout": { + Type: schema.TypeInt, + Computed: true, + }, + "fail_tasks_on_lob_truncation": { + 
Type: schema.TypeBool, + Computed: true, + }, + "heartbeat_enable": { + Type: schema.TypeBool, + Computed: true, + }, + "heartbeat_frequency": { + Type: schema.TypeInt, + Computed: true, + }, + "heartbeat_schema": { + Type: schema.TypeString, + Computed: true, + }, + "map_boolean_as_boolean": { + Type: schema.TypeBool, + Computed: true, + }, + "map_jsonb_as_clob": { + Type: schema.TypeBool, + Computed: true, + }, + "map_long_varchar_as": { + Type: schema.TypeString, + Computed: true, + }, + "max_file_size": { + Type: schema.TypeInt, + Computed: true, + }, + "plugin_name": { + Type: schema.TypeString, + Computed: true, + }, + "slot_name": { + Type: schema.TypeString, + Computed: true, + }, + }, + }, + }, "redis_settings": { Type: schema.TypeList, Computed: true, diff --git a/internal/service/dms/endpoint_test.go b/internal/service/dms/endpoint_test.go index 2401e7f01c4..8dc8d0d08d1 100644 --- a/internal/service/dms/endpoint_test.go +++ b/internal/service/dms/endpoint_test.go @@ -1349,6 +1349,69 @@ func TestAccDMSEndpoint_PostgreSQL_kmsKey(t *testing.T) { }) } +func TestAccDMSEndpoint_PostgreSQL_settings_source(t *testing.T) { + ctx := acctest.Context(t) + resourceName := "aws_dms_endpoint.test" + rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix) + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { acctest.PreCheck(ctx, t) }, + ErrorCheck: acctest.ErrorCheck(t, dms.EndpointsID), + ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories, + CheckDestroy: testAccCheckEndpointDestroy(ctx), + Steps: []resource.TestStep{ + { + Config: testAccEndpointConfig_postgreSQLSourceSettings(rName), + Check: resource.ComposeAggregateTestCheckFunc( + testAccCheckEndpointExists(ctx, resourceName), + resource.TestCheckResourceAttr(resourceName, "postgres_settings.#", "1"), + resource.TestCheckResourceAttr(resourceName, "postgres_settings.0.after_connect_script", "SET search_path TO pg_catalog,public;"), + resource.TestCheckResourceAttr(resourceName, 
"postgres_settings.0.capture_ddls", "true"), + resource.TestCheckResourceAttr(resourceName, "postgres_settings.0.ddl_artifacts_schema", "true"), + resource.TestCheckResourceAttr(resourceName, "postgres_settings.0.execute_timeout", "100"), + resource.TestCheckResourceAttr(resourceName, "postgres_settings.0.fail_tasks_on_lob_truncation", "false"), + resource.TestCheckResourceAttr(resourceName, "postgres_settings.0.heartbeat_enable", "true"), + resource.TestCheckResourceAttr(resourceName, "postgres_settings.0.heartbeat_frequency", "5"), + resource.TestCheckResourceAttr(resourceName, "postgres_settings.0.heartbeat_schema", "test"), + resource.TestCheckResourceAttr(resourceName, "postgres_settings.0.map_boolean_as_boolean", "true"), + resource.TestCheckResourceAttr(resourceName, "postgres_settings.0.map_jsonb_as_clob", "true"), + resource.TestCheckResourceAttr(resourceName, "postgres_settings.0.map_long_varchar_as", "wstring"), + resource.TestCheckResourceAttr(resourceName, "postgres_settings.0.max_file_size", "1024"), + resource.TestCheckResourceAttr(resourceName, "postgres_settings.0.plugin_name", "pglogical"), + resource.TestCheckResourceAttr(resourceName, "postgres_settings.0.slot_name", "test"), + ), + }, + }, + }) +} + +func TestAccDMSEndpoint_PostgreSQL_settings_target(t *testing.T) { + ctx := acctest.Context(t) + resourceName := "aws_dms_endpoint.test" + rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix) + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { acctest.PreCheck(ctx, t) }, + ErrorCheck: acctest.ErrorCheck(t, dms.EndpointsID), + ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories, + CheckDestroy: testAccCheckEndpointDestroy(ctx), + Steps: []resource.TestStep{ + { + Config: testAccEndpointConfig_postgreSQLTargetSettings(rName), + Check: resource.ComposeAggregateTestCheckFunc( + testAccCheckEndpointExists(ctx, resourceName), + resource.TestCheckResourceAttr(resourceName, "postgres_settings.#", "1"), + 
resource.TestCheckResourceAttr(resourceName, "postgres_settings.0.after_connect_script", "SET search_path TO pg_catalog,public;"), + resource.TestCheckResourceAttr(resourceName, "postgres_settings.0.babelfish_database_name", "babelfish"), + resource.TestCheckResourceAttr(resourceName, "postgres_settings.0.database_mode", "babelfish"), + resource.TestCheckResourceAttr(resourceName, "postgres_settings.0.execute_timeout", "100"), + resource.TestCheckResourceAttr(resourceName, "postgres_settings.0.max_file_size", "1024"), + ), + }, + }, + }) +} + func TestAccDMSEndpoint_SQLServer_basic(t *testing.T) { ctx := acctest.Context(t) resourceName := "aws_dms_endpoint.test" @@ -3800,6 +3863,65 @@ resource "aws_dms_endpoint" "test" { `, rName) } +func testAccEndpointConfig_postgreSQLSourceSettings(rName string) string { + return fmt.Sprintf(` +resource "aws_dms_endpoint" "test" { + endpoint_id = %[1]q + endpoint_type = "source" + engine_name = "postgres" + server_name = "tftest" + port = 5432 + username = "tftest" + password = "tftest" + database_name = "tftest" + ssl_mode = "require" + extra_connection_attributes = "" + + postgres_settings { + after_connect_script = "SET search_path TO pg_catalog,public;" + capture_ddls = true + ddl_artifacts_schema = true + execute_timeout = 100 + fail_tasks_on_lob_truncation = false + heartbeat_enable = true + heartbeat_frequency = 5 + heartbeat_schema = "test" + map_boolean_as_boolean = true + map_jsonb_as_clob = true + map_long_varchar_as = "wstring" + max_file_size = 1024 + plugin_name = "pglogical" + slot_name = "test" + } +} +`, rName) +} + +func testAccEndpointConfig_postgreSQLTargetSettings(rName string) string { + return fmt.Sprintf(` +resource "aws_dms_endpoint" "test" { + endpoint_id = %[1]q + endpoint_type = "target" + engine_name = "postgres" + server_name = "tftest" + port = 5432 + username = "tftest" + password = "tftest" + database_name = "tftest" + ssl_mode = "require" + extra_connection_attributes = "" + + postgres_settings 
{ + after_connect_script = "SET search_path TO pg_catalog,public;" + babelfish_database_name = "babelfish" + database_mode = "babelfish" + execute_timeout = 100 + max_file_size = 1024 + } +} +`, rName) +} + func testAccEndpointConfig_sqlServer(rName string) string { return fmt.Sprintf(` resource "aws_dms_endpoint" "test" { diff --git a/internal/service/dms/replication_task.go b/internal/service/dms/replication_task.go index 74dee1dae40..aeb3ae87f0f 100644 --- a/internal/service/dms/replication_task.go +++ b/internal/service/dms/replication_task.go @@ -387,7 +387,7 @@ func startReplicationTask(ctx context.Context, conn *dms.DatabaseMigrationServic } func stopReplicationTask(ctx context.Context, id string, conn *dms.DatabaseMigrationService) error { - log.Printf("[DEBUG] Stopping DMS Replication Task: (%s)", id) + log.Printf("[DEBUG] Stopping DMS Replication Task: %s", id) task, err := FindReplicationTaskByID(ctx, conn, id) if err != nil { @@ -402,6 +402,10 @@ func stopReplicationTask(ctx context.Context, id string, conn *dms.DatabaseMigra ReplicationTaskArn: task.ReplicationTaskArn, }) + if tfawserr.ErrMessageContains(err, dms.ErrCodeInvalidResourceStateFault, "is currently not running") { + return nil + } + if err != nil { return fmt.Errorf("stopping DMS Replication Task (%s): %w", id, err) } diff --git a/website/docs/r/dms_endpoint.html.markdown b/website/docs/r/dms_endpoint.html.markdown index 77903a417a4..191b0af2070 100644 --- a/website/docs/r/dms_endpoint.html.markdown +++ b/website/docs/r/dms_endpoint.html.markdown @@ -58,6 +58,7 @@ The following arguments are optional: * `kinesis_settings` - (Optional) Configuration block for Kinesis settings. See below. * `mongodb_settings` - (Optional) Configuration block for MongoDB settings. See below. * `password` - (Optional) Password to be used to login to the endpoint database. +* `postgres_settings` - (Optional) Configuration block for Postgres settings. See below. 
* `pause_replication_tasks` - (Optional) Whether to pause associated running replication tasks, regardless if they are managed by Terraform, prior to modifying the endpoint. Only tasks paused by the resource will be restarted after the modification completes. Default is `false`. * `port` - (Optional) Port used by the endpoint database. * `redshift_settings` - (Optional) Configuration block for Redshift settings. See below. @@ -127,6 +128,27 @@ The following arguments are optional: * `extract_doc_id` - (Optional) Document ID. Use this setting when `nesting_level` is set to `none`. Default is `false`. * `nesting_level` - (Optional) Specifies either document or table mode. Default is `none`. Valid values are `one` (table mode) and `none` (document mode). +### postgres_settings + +-> Additional information can be found in the [Using PostgreSQL as a Source for AWS DMS documentation](https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Source.PostgreSQL.html). + +* `after_connect_script` - (Optional) For use with change data capture (CDC) only, this attribute has AWS DMS bypass foreign keys and user triggers to reduce the time it takes to bulk load data. +* `babelfish_database_name` - (Optional) The Babelfish for Aurora PostgreSQL database name for the endpoint. +* `capture_ddls` - (Optional) To capture DDL events, AWS DMS creates various artifacts in the PostgreSQL database when the task starts. +* `database_mode` - (Optional) Specifies the default behavior of the replication's handling of PostgreSQL-compatible endpoints that require some additional configuration, such as Babelfish endpoints. +* `ddl_artifacts_schema` - (Optional) Sets the schema in which the operational DDL database artifacts are created. Default is `public`. +* `execute_timeout` - (Optional) Sets the client statement timeout for the PostgreSQL instance, in seconds. Default value is `60`. 
+* `fail_tasks_on_lob_truncation` - (Optional) When set to `true`, this value causes a task to fail if the actual size of a LOB column is greater than the specified `LobMaxSize`. Default is `false`. +* `heartbeat_enable` - (Optional) The write-ahead log (WAL) heartbeat feature mimics a dummy transaction. By doing this, it prevents idle logical replication slots from holding onto old WAL logs, which can result in storage full situations on the source. +* `heartbeat_frequency` - (Optional) Sets the WAL heartbeat frequency (in minutes). Default value is `5`. +* `heartbeat_schema` - (Optional) Sets the schema in which the heartbeat artifacts are created. Default value is `public`. +* `map_boolean_as_boolean` - (Optional) You can use PostgreSQL endpoint settings to map a boolean as a boolean from your PostgreSQL source to an Amazon Redshift target. Default value is `false`. +* `map_jsonb_as_clob` - (Optional) When `true`, DMS migrates JSONB values as CLOB. +* `map_long_varchar_as` - (Optional) Specifies how to map LONG values. Valid values are `wstring`, `clob`, and `nclob`. +* `max_file_size` - (Optional) Specifies the maximum size (in KB) of any .csv file used to transfer data to PostgreSQL. Default is `32,768 KB`. +* `plugin_name` - (Optional) Specifies the plugin to use to create a replication slot. Valid values: `pglogical`, `test_decoding`. +* `slot_name` - (Optional) Sets the name of a previously created logical replication slot for a CDC load of the PostgreSQL source instance. + ### redis_settings -> Additional information can be found in the [Using Redis as a target for AWS Database Migration Service](https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Target.Redis.html).