From 0d9cfdeaf77b5271c93218459608bb82eddb74a4 Mon Sep 17 00:00:00 2001 From: Tim Coulson Date: Wed, 29 Jan 2020 15:36:50 +0000 Subject: [PATCH] add dms elasticsearch target --- aws/resource_aws_dms_endpoint.go | 76 ++++++++++++++++ aws/resource_aws_dms_endpoint_test.go | 100 ++++++++++++++++++++++ website/docs/r/dms_endpoint.html.markdown | 3 +- 3 files changed, 178 insertions(+), 1 deletion(-) diff --git a/aws/resource_aws_dms_endpoint.go b/aws/resource_aws_dms_endpoint.go index 7aadf3eb708..9c086e3494c 100644 --- a/aws/resource_aws_dms_endpoint.go +++ b/aws/resource_aws_dms_endpoint.go @@ -69,6 +69,7 @@ func resourceAwsDmsEndpoint() *schema.Resource { "db2", "docdb", "dynamodb", + "elasticsearch", "mariadb", "mongodb", "mysql", @@ -217,6 +218,41 @@ func resourceAwsDmsEndpoint() *schema.Resource { }, }, }, + "elasticsearch_settings": { + Type: schema.TypeList, + Optional: true, + MaxItems: 1, + DiffSuppressFunc: func(k, old, new string, d *schema.ResourceData) bool { + if old == "1" && new == "0" { + return true + } + return false + }, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "service_access_role_arn": { + Type: schema.TypeString, + Required: true, + ForceNew: true, + }, + "endpoint_uri": { + Type: schema.TypeString, + Required: true, + ForceNew: true, + }, + "error_retry_duration": { + Type: schema.TypeInt, + Optional: true, + Default: "300", + }, + "full_load_error_percentage": { + Type: schema.TypeInt, + Optional: true, + Default: "10", + }, + }, + }, + }, }, } } @@ -270,6 +306,13 @@ func resourceAwsDmsEndpointCreate(d *schema.ResourceData, meta interface{}) erro BucketName: aws.String(d.Get("s3_settings.0.bucket_name").(string)), CompressionType: aws.String(d.Get("s3_settings.0.compression_type").(string)), } + case "elasticsearch": + request.ElasticsearchSettings = &dms.ElasticsearchSettings{ + ServiceAccessRoleArn: aws.String(d.Get("elasticsearch_settings.0.service_access_role_arn").(string)), + EndpointUri: aws.String(d.Get("elasticsearch_settings.0.endpoint_uri").(string)), + ErrorRetryDuration: aws.Int64(int64(d.Get("elasticsearch_settings.0.error_retry_duration").(int))), + FullLoadErrorPercentage: aws.Int64(int64(d.Get("elasticsearch_settings.0.full_load_error_percentage").(int))), + } default: request.Password = aws.String(d.Get("password").(string)) request.Port = aws.Int64(int64(d.Get("port").(int))) @@ -477,6 +520,20 @@ func resourceAwsDmsEndpointUpdate(d *schema.ResourceData, meta interface{}) erro request.EngineName = aws.String(d.Get("engine_name").(string)) // Must be included (should be 's3') hasChanges = true } + case "elasticsearch": + if d.HasChange("elasticsearch_settings.0.endpoint_uri") || + d.HasChange("elasticsearch_settings.0.error_retry_duration") || + d.HasChange("elasticsearch_settings.0.full_load_error_percentage") || + d.HasChange("elasticsearch_settings.0.service_access_role_arn") { + request.ElasticsearchSettings = &dms.ElasticsearchSettings{ + ServiceAccessRoleArn: aws.String(d.Get("elasticsearch_settings.0.service_access_role_arn").(string)), + EndpointUri: aws.String(d.Get("elasticsearch_settings.0.endpoint_uri").(string)), + ErrorRetryDuration: aws.Int64(int64(d.Get("elasticsearch_settings.0.error_retry_duration").(int))), + FullLoadErrorPercentage: aws.Int64(int64(d.Get("elasticsearch_settings.0.full_load_error_percentage").(int))), + } + request.EngineName = aws.String(d.Get("engine_name").(string)) + hasChanges = true + } default: if d.HasChange("database_name") { request.DatabaseName = aws.String(d.Get("database_name").(string)) @@ -567,6 +624,10 @@ func resourceAwsDmsEndpointSetState(d *schema.ResourceData, endpoint *dms.Endpoi if err := d.Set("s3_settings", flattenDmsS3Settings(endpoint.S3Settings)); err != nil { return fmt.Errorf("Error setting s3_settings for DMS: %s", err) } + case "elasticsearch": + if err := d.Set("elasticsearch_settings", flattenDmsElasticsearchSettings(endpoint.ElasticsearchSettings)); err != nil { + return fmt.Errorf("Error setting elasticsearch for DMS: %s", err) + } default: d.Set("database_name", endpoint.DatabaseName) d.Set("extra_connection_attributes", endpoint.ExtraConnectionAttributes) @@ -615,3 +676,18 @@ func flattenDmsS3Settings(settings *dms.S3Settings) []map[string]interface{} { return []map[string]interface{}{m} } + +func flattenDmsElasticsearchSettings(settings *dms.ElasticsearchSettings) []map[string]interface{} { + if settings == nil { + return []map[string]interface{}{} + } + + m := map[string]interface{}{ + "service_access_role_arn": aws.StringValue(settings.ServiceAccessRoleArn), + "endpoint_uri": aws.StringValue(settings.EndpointUri), + "full_load_error_percentage": aws.Int64Value(settings.FullLoadErrorPercentage), + "error_retry_duration": aws.Int64Value(settings.ErrorRetryDuration), + } + + return []map[string]interface{}{m} +} diff --git a/aws/resource_aws_dms_endpoint_test.go b/aws/resource_aws_dms_endpoint_test.go index c2d11a14a4d..a43aebe815c 100644 --- a/aws/resource_aws_dms_endpoint_test.go +++ b/aws/resource_aws_dms_endpoint_test.go @@ -96,6 +96,37 @@ func TestAccAwsDmsEndpoint_S3(t *testing.T) { }) } +func TestAccAwsDmsEndpoint_Elasticsearch(t *testing.T) { + resourceName := "aws_dms_endpoint.dms_endpoint" + randId := acctest.RandString(8) + "-elasticsearch" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: dmsEndpointDestroy, + Steps: []resource.TestStep{ + { + Config: dmsEndpointElasticsearchConfig(randId), + Check: resource.ComposeTestCheckFunc( + checkDmsEndpointExists(resourceName), + resource.TestCheckResourceAttr(resourceName, "elasticsearch_settings.#", "1"), + resource.TestCheckResourceAttr(resourceName, "elasticsearch_settings.0.endpoint_uri", "search-estest.us-west-2.es.amazonaws.com"), + resource.TestCheckResourceAttr(resourceName, "elasticsearch_settings.0.full_load_error_percentage", "10"), + resource.TestCheckResourceAttr(resourceName, "elasticsearch_settings.0.error_retry_duration", "300"), + ), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + ImportStateVerifyIgnore: []string{"password"}, + }, + // No update step due to: + // InvalidParameterCombinationException: Elasticsearch endpoint cant be modified. + }, + }) +} + func TestAccAwsDmsEndpoint_DynamoDb(t *testing.T) { resourceName := "aws_dms_endpoint.dms_endpoint" randId := acctest.RandString(8) + "-dynamodb" @@ -623,6 +654,75 @@ EOF `, randId) } +func dmsEndpointElasticsearchConfig(randId string) string { + return fmt.Sprintf(` +resource "aws_dms_endpoint" "dms_endpoint" { + endpoint_id = "tf-test-dms-endpoint-%[1]s" + endpoint_type = "target" + engine_name = "elasticsearch" + ssl_mode = "none" + + tags = { + Name = "tf-test-elasticsearch-endpoint-%[1]s" + Update = "to-update" + Remove = "to-remove" + } + + elasticsearch_settings { + service_access_role_arn = "${aws_iam_role.iam_role.arn}" + endpoint_uri = "search-estest.us-west-2.es.amazonaws.com" + full_load_error_percentage = 10 + error_retry_duration = 300 + } + + depends_on = ["aws_iam_role_policy.dms_elasticsearch_access"] +} + +resource "aws_iam_role" "iam_role" { + name = "tf-test-iam-elasticsearch-role-%[1]s" + + assume_role_policy = <