diff --git a/aws/resource_aws_dms_endpoint.go b/aws/resource_aws_dms_endpoint.go index cc67a6d5789..661582b4ed9 100644 --- a/aws/resource_aws_dms_endpoint.go +++ b/aws/resource_aws_dms_endpoint.go @@ -291,14 +291,33 @@ func resourceAwsDmsEndpoint() *schema.Resource { }, "bucket_name": { Type: schema.TypeString, - Optional: true, - Default: "", + Required: true, }, "compression_type": { Type: schema.TypeString, Optional: true, Default: "NONE", }, + "timestamp_column_name": { + Type: schema.TypeString, + Optional: true, + }, + "data_format": { + Type: schema.TypeString, + Optional: true, + }, + "parquet_version": { + Type: schema.TypeString, + Optional: true, + }, + "encryption_mode": { + Type: schema.TypeString, + Optional: true, + }, + "server_side_encryption_kms_key_id": { + Type: schema.TypeString, + Optional: true, + }, }, }, }, @@ -330,6 +349,15 @@ func resourceAwsDmsEndpoint() *schema.Resource { } } +func getStringRefOrNil(d *schema.ResourceData, key string) *string { + value, exists := d.GetOk(key) + if !exists { + return nil + } + valueString := value.(string) + return &valueString +} + func resourceAwsDmsEndpointCreate(d *schema.ResourceData, meta interface{}) error { conn := meta.(*AWSClient).dmsconn @@ -389,13 +417,18 @@ func resourceAwsDmsEndpointCreate(d *schema.ResourceData, meta interface{}) erro request.DatabaseName = aws.String(d.Get("database_name").(string)) case "s3": request.S3Settings = &dms.S3Settings{ - ServiceAccessRoleArn: aws.String(d.Get("s3_settings.0.service_access_role_arn").(string)), - ExternalTableDefinition: aws.String(d.Get("s3_settings.0.external_table_definition").(string)), - CsvRowDelimiter: aws.String(d.Get("s3_settings.0.csv_row_delimiter").(string)), - CsvDelimiter: aws.String(d.Get("s3_settings.0.csv_delimiter").(string)), - BucketFolder: aws.String(d.Get("s3_settings.0.bucket_folder").(string)), - BucketName: aws.String(d.Get("s3_settings.0.bucket_name").(string)), - CompressionType: aws.String(d.Get("s3_settings.0.compression_type").(string)), + ServiceAccessRoleArn: getStringRefOrNil(d, "s3_settings.0.service_access_role_arn"), + ExternalTableDefinition: getStringRefOrNil(d, "s3_settings.0.external_table_definition"), + CsvRowDelimiter: getStringRefOrNil(d, "s3_settings.0.csv_row_delimiter"), + CsvDelimiter: getStringRefOrNil(d, "s3_settings.0.csv_delimiter"), + BucketFolder: getStringRefOrNil(d, "s3_settings.0.bucket_folder"), + BucketName: getStringRefOrNil(d, "s3_settings.0.bucket_name"), + CompressionType: getStringRefOrNil(d, "s3_settings.0.compression_type"), + TimestampColumnName: getStringRefOrNil(d, "s3_settings.0.timestamp_column_name"), + DataFormat: getStringRefOrNil(d, "s3_settings.0.data_format"), + ParquetVersion: getStringRefOrNil(d, "s3_settings.0.parquet_version"), + EncryptionMode: getStringRefOrNil(d, "s3_settings.0.encryption_mode"), + ServerSideEncryptionKmsKeyId: getStringRefOrNil(d, "s3_settings.0.server_side_encryption_kms_key_id"), } default: request.Password = aws.String(d.Get("password").(string)) @@ -443,14 +476,13 @@ func resourceAwsDmsEndpointCreate(d *schema.ResourceData, meta interface{}) erro } d.SetId(d.Get("endpoint_id").(string)) + response, _ := describeEndpointResponse(conn, d) + log.Println("[DEBUG] DMS response from create endpoint:", response) return resourceAwsDmsEndpointRead(d, meta) } -func resourceAwsDmsEndpointRead(d *schema.ResourceData, meta interface{}) error { - conn := meta.(*AWSClient).dmsconn - ignoreTagsConfig := meta.(*AWSClient).IgnoreTagsConfig - - response, err := conn.DescribeEndpoints(&dms.DescribeEndpointsInput{ +func describeEndpointResponse(conn *dms.DatabaseMigrationService, d *schema.ResourceData) (*dms.DescribeEndpointsOutput, error) { + response, err2 := conn.DescribeEndpoints(&dms.DescribeEndpointsInput{ Filters: []*dms.Filter{ { Name: aws.String("endpoint-id"), @@ -458,6 +490,14 @@ func resourceAwsDmsEndpointRead(d *schema.ResourceData, meta interface{}) error }, }, }) + return response, err2 +} + +func resourceAwsDmsEndpointRead(d *schema.ResourceData, meta interface{}) error { + conn := meta.(*AWSClient).dmsconn + ignoreTagsConfig := meta.(*AWSClient).IgnoreTagsConfig + + response, err := describeEndpointResponse(conn, d) if err != nil { if dmserr, ok := err.(awserr.Error); ok && dmserr.Code() == "ResourceNotFoundFault" { log.Printf("[DEBUG] DMS Replication Endpoint %q Not Found", d.Id()) @@ -622,15 +662,23 @@ func resourceAwsDmsEndpointUpdate(d *schema.ResourceData, meta interface{}) erro if d.HasChanges( "s3_settings.0.service_access_role_arn", "s3_settings.0.external_table_definition", "s3_settings.0.csv_row_delimiter", "s3_settings.0.csv_delimiter", "s3_settings.0.bucket_folder", - "s3_settings.0.bucket_name", "s3_settings.0.compression_type") { + "s3_settings.0.bucket_name", "s3_settings.0.compression_type", + "s3_settings.0.timestamp_column_name", "s3_settings.0.data_format", "s3_settings.0.parquet_version", + "s3_settings.0.parquet_version", "s3_settings.0.encryption_mode", + "s3_settings.0.server_side_encryption_kms_key_id") { request.S3Settings = &dms.S3Settings{ - ServiceAccessRoleArn: aws.String(d.Get("s3_settings.0.service_access_role_arn").(string)), - ExternalTableDefinition: aws.String(d.Get("s3_settings.0.external_table_definition").(string)), - CsvRowDelimiter: aws.String(d.Get("s3_settings.0.csv_row_delimiter").(string)), - CsvDelimiter: aws.String(d.Get("s3_settings.0.csv_delimiter").(string)), - BucketFolder: aws.String(d.Get("s3_settings.0.bucket_folder").(string)), - BucketName: aws.String(d.Get("s3_settings.0.bucket_name").(string)), - CompressionType: aws.String(d.Get("s3_settings.0.compression_type").(string)), + ServiceAccessRoleArn: getStringRefOrNil(d, "s3_settings.0.service_access_role_arn"), + ExternalTableDefinition: getStringRefOrNil(d, "s3_settings.0.external_table_definition"), + CsvRowDelimiter: getStringRefOrNil(d, "s3_settings.0.csv_row_delimiter"), + CsvDelimiter: getStringRefOrNil(d, "s3_settings.0.csv_delimiter"), + BucketFolder: getStringRefOrNil(d, "s3_settings.0.bucket_folder"), + BucketName: getStringRefOrNil(d, "s3_settings.0.bucket_name"), + TimestampColumnName: getStringRefOrNil(d, "s3_settings.0.timestamp_column_name"), + CompressionType: getStringRefOrNil(d, "s3_settings.0.compression_type"), + DataFormat: getStringRefOrNil(d, "s3_settings.0.data_format"), + ParquetVersion: getStringRefOrNil(d, "s3_settings.0.parquet_version"), + EncryptionMode: getStringRefOrNil(d, "s3_settings.0.encryption_mode"), + ServerSideEncryptionKmsKeyId: getStringRefOrNil(d, "s3_settings.0.server_side_encryption_kms_key_id"), } request.EngineName = aws.String(d.Get("engine_name").(string)) // Must be included (should be 's3') hasChanges = true @@ -811,19 +859,27 @@ func flattenDmsMongoDbSettings(settings *dms.MongoDbSettings) []map[string]inter } func flattenDmsS3Settings(settings *dms.S3Settings) []map[string]interface{} { - if settings == nil { - return []map[string]interface{}{} - } - - m := map[string]interface{}{ - "service_access_role_arn": aws.StringValue(settings.ServiceAccessRoleArn), - "external_table_definition": aws.StringValue(settings.ExternalTableDefinition), - "csv_row_delimiter": aws.StringValue(settings.CsvRowDelimiter), - "csv_delimiter": aws.StringValue(settings.CsvDelimiter), - "bucket_folder": aws.StringValue(settings.BucketFolder), - "bucket_name": aws.StringValue(settings.BucketName), - "compression_type": aws.StringValue(settings.CompressionType), - } - - return []map[string]interface{}{m} + result := make([]map[string]interface{}, 0, 1) + + mapWithNils := map[string]interface{}{ + "service_access_role_arn": settings.ServiceAccessRoleArn, + "external_table_definition": settings.ExternalTableDefinition, + "csv_row_delimiter": settings.CsvRowDelimiter, + "csv_delimiter": settings.CsvDelimiter, + "bucket_folder": settings.BucketFolder, + "bucket_name": settings.BucketName, + "compression_type": settings.CompressionType, + "timestamp_column_name": settings.TimestampColumnName, + "data_format": settings.DataFormat, + "parquet_version": settings.ParquetVersion, + "encryption_mode": settings.EncryptionMode, + "server_side_encryption_kms_key_id": settings.ServerSideEncryptionKmsKeyId, + } + mapWithoutNills := make(map[string]interface{}) + for key := range mapWithNils { + if mapWithNils[key].(*string) != nil { + mapWithoutNills[key] = aws.StringValue(mapWithNils[key].(*string)) + } + } + return append(result, mapWithoutNills) } diff --git a/aws/resource_aws_dms_endpoint_test.go b/aws/resource_aws_dms_endpoint_test.go index b93ef43afd0..734c2a67bca 100644 --- a/aws/resource_aws_dms_endpoint_test.go +++ b/aws/resource_aws_dms_endpoint_test.go @@ -50,7 +50,9 @@ func TestAccAwsDmsEndpoint_basic(t *testing.T) { }) } -func TestAccAwsDmsEndpoint_S3(t *testing.T) { +type TestCheckFunc func(*terraform.State) error + +func TestAccAwsDmsEndpoint_S3_Csv(t *testing.T) { resourceName := "aws_dms_endpoint.dms_endpoint" randId := acctest.RandString(8) + "-s3" @@ -60,7 +62,8 @@ func TestAccAwsDmsEndpoint_S3(t *testing.T) { CheckDestroy: dmsEndpointDestroy, Steps: []resource.TestStep{ { - Config: dmsEndpointS3Config(randId), + PreventDiskCleanup: true, + Config: dmsEndpointS3Config(randId), Check: resource.ComposeTestCheckFunc( checkDmsEndpointExists(resourceName), resource.TestCheckResourceAttr(resourceName, "s3_settings.#", "1"), @@ -70,6 +73,8 @@ func TestAccAwsDmsEndpoint_S3(t *testing.T) { resource.TestCheckResourceAttr(resourceName, "s3_settings.0.bucket_folder", ""), resource.TestCheckResourceAttr(resourceName, "s3_settings.0.bucket_name", "bucket_name"), resource.TestCheckResourceAttr(resourceName, "s3_settings.0.compression_type", "NONE"), + resource.TestCheckNoResourceAttr(resourceName, "s3_settings.0.enable_statistics"), + resource.TestCheckResourceAttr(resourceName, "s3_settings.0.timestamp_column_name", ""), ), }, { @@ -79,7 +84,7 @@ func TestAccAwsDmsEndpoint_S3(t *testing.T) { ImportStateVerifyIgnore: []string{"password"}, }, { - Config: dmsEndpointS3ConfigUpdate(randId), + Config: dmsEndpointS3CsvConfigUpdate(randId), Check: resource.ComposeTestCheckFunc( checkDmsEndpointExists(resourceName), resource.TestCheckResourceAttr(resourceName, "extra_connection_attributes", "key=value;"), @@ -96,6 +101,59 @@ func TestAccAwsDmsEndpoint_S3(t *testing.T) { }) } +func TestAccAwsDmsEndpoint_S3_Parquet(t *testing.T) { + resourceName := "aws_dms_endpoint.dms_endpoint" + randId := acctest.RandString(8) + "-s3" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: dmsEndpointDestroy, + Steps: []resource.TestStep{ + { + Config: dmsEndpointS3Config(randId), + Check: resource.ComposeTestCheckFunc( + checkDmsEndpointExists(resourceName), + resource.TestCheckResourceAttr(resourceName, "s3_settings.#", "1"), + resource.TestCheckResourceAttr(resourceName, "s3_settings.0.bucket_folder", ""), + resource.TestCheckResourceAttr(resourceName, "s3_settings.0.bucket_name", "bucket_name"), + resource.TestCheckResourceAttr(resourceName, "s3_settings.0.compression_type", "NONE"), + resource.TestCheckNoResourceAttr(resourceName, "s3_settings.0.enable_statistics"), + resource.TestCheckResourceAttr(resourceName, "s3_settings.0.timestamp_column_name", ""), + resource.TestCheckNoResourceAttr(resourceName, "s3_settings.0.enable_statistics"), + resource.TestCheckResourceAttr(resourceName, "s3_settings.0.data_format", ""), + resource.TestCheckResourceAttr(resourceName, "s3_settings.0.parquet_version", ""), + resource.TestCheckResourceAttr(resourceName, "s3_settings.0.server_side_encryption_kms_key_id", ""), + ), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + ImportStateVerifyIgnore: []string{"password"}, + }, + { + Config: dmsEndpointS3ParquetConfigUpdate(randId), + Check: resource.ComposeTestCheckFunc( + checkDmsEndpointExists(resourceName), + resource.TestCheckResourceAttr(resourceName, "extra_connection_attributes", "key=value;"), + resource.TestCheckResourceAttr(resourceName, "s3_settings.#", "1"), + resource.TestCheckResourceAttr(resourceName, "s3_settings.0.external_table_definition", "new-external_table_definition"), + resource.TestCheckResourceAttr(resourceName, "s3_settings.0.bucket_folder", "new-bucket_folder"), + resource.TestCheckResourceAttr(resourceName, "s3_settings.0.bucket_name", "new-bucket_name"), + resource.TestCheckResourceAttr(resourceName, "s3_settings.0.compression_type", "GZIP"), + resource.TestCheckResourceAttr(resourceName, "s3_settings.0.timestamp_column_name", "some_timestamp_column_name"), + resource.TestCheckResourceAttr(resourceName, "s3_settings.0.data_format", "parquet"), + resource.TestCheckResourceAttr(resourceName, "s3_settings.0.parquet_version", "parquet-2-0"), + resource.TestCheckResourceAttr(resourceName, "s3_settings.0.encryption_mode", "SSE_S3"), + resource.TestCheckResourceAttrSet(resourceName, "s3_settings.0.data_format"), + resource.TestCheckResourceAttrSet(resourceName, "s3_settings.0.server_side_encryption_kms_key_id"), + ), + }, + }, + }) +} + func TestAccAwsDmsEndpoint_DynamoDb(t *testing.T) { resourceName := "aws_dms_endpoint.dms_endpoint" randId := acctest.RandString(8) + "-dynamodb" @@ -764,7 +822,7 @@ EOF `, randId) } -func dmsEndpointS3ConfigUpdate(randId string) string { +func dmsEndpointS3CsvConfigUpdate(randId string) string { return fmt.Sprintf(` resource "aws_dms_endpoint" "dms_endpoint" { endpoint_id = "tf-test-dms-endpoint-%[1]s" @@ -843,6 +901,83 @@ EOF `, randId) } +func dmsEndpointS3ParquetConfigUpdate(randId string) string { + return fmt.Sprintf(` +data "aws_kms_alias" "dms" { + name = "alias/aws/dms" +} +resource "aws_dms_endpoint" "dms_endpoint" { + endpoint_id = "tf-test-dms-endpoint-%[1]s" + endpoint_type = "target" + engine_name = "s3" + ssl_mode = "none" + extra_connection_attributes = "key=value;" + tags = { + Name = "tf-test-s3-endpoint-%[1]s" + Update = "updated" + Add = "added" + } + s3_settings { + service_access_role_arn = "${aws_iam_role.iam_role.arn}" + external_table_definition = "new-external_table_definition" + bucket_folder = "new-bucket_folder" + bucket_name = "new-bucket_name" + compression_type = "GZIP" + timestamp_column_name = "some_timestamp_column_name" + data_format = "parquet" + parquet_version = "parquet-2-0" + encryption_mode = "SSE_S3" + server_side_encryption_kms_key_id = "${data.aws_kms_alias.dms.target_key_arn}" + } +} +resource "aws_iam_role" "iam_role" { + name = "tf-test-iam-s3-role-%[1]s" + assume_role_policy = <