diff --git a/aws/resource_aws_dynamodb_table.go b/aws/resource_aws_dynamodb_table.go index 83e71b6b9a3..9773c4aaef6 100644 --- a/aws/resource_aws_dynamodb_table.go +++ b/aws/resource_aws_dynamodb_table.go @@ -279,6 +279,18 @@ func resourceAwsDynamoDbTable() *schema.Resource { }, }, }, + "replica": { + Type: schema.TypeSet, + Optional: true, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "region_name": { + Type: schema.TypeString, + Required: true, + }, + }, + }, + }, }, } } @@ -418,6 +430,12 @@ func resourceAwsDynamoDbTableCreate(d *schema.ResourceData, meta interface{}) er } } + if _, ok := d.GetOk("replica"); ok { + if err := createDynamoDbReplicas(d.Id(), d.Get("replica").(*schema.Set).List(), conn); err != nil { + return fmt.Errorf("error enabled DynamoDB Table (%s) replicas: %s", d.Id(), err) + } + } + return resourceAwsDynamoDbTableRead(d, meta) } @@ -595,9 +613,88 @@ func resourceAwsDynamoDbTableUpdate(d *schema.ResourceData, meta interface{}) er } } + if d.HasChange("replica") { + if err := updateDynamoDbReplica(d, conn); err != nil { + return fmt.Errorf("error updating DynamoDB Table (%s) replica: %s", d.Id(), err) + } + } + return resourceAwsDynamoDbTableRead(d, meta) } +func updateDynamoDbReplica(d *schema.ResourceData, conn *dynamodb.DynamoDB) error { + oRaw, nRaw := d.GetChange("replica") + o := oRaw.(*schema.Set) + n := nRaw.(*schema.Set) + + removed := o.Difference(n).List() + added := n.Difference(o).List() + + for _, replicaRaw := range added { + m, ok := replicaRaw.(map[string]interface{}) + + if !ok { + continue + } + + regionName := m["region_name"].(string) + + replicaUpdate := &dynamodb.ReplicationGroupUpdate{ + Create: &dynamodb.CreateReplicationGroupMemberAction{ + RegionName: aws.String(regionName), + }, + } + + input := &dynamodb.UpdateTableInput{ + TableName: aws.String(d.Id()), + ReplicaUpdates: []*dynamodb.ReplicationGroupUpdate{replicaUpdate}, + } + + _, err := conn.UpdateTable(input) + + if err != nil { + return fmt.Errorf("error adding DynamoDB Table (%s) replica (%s): %w", d.Id(), regionName, err) + } + + if err := waitForDynamoDbReplicaUpdateToBeCompleted(d.Id(), regionName, 20*time.Minute, conn); err != nil { + return fmt.Errorf("error waiting for DynamoDB Table (%s) replica (%s) to update: %s", d.Id(), regionName, err) + } + } + + for _, replicaRaw := range removed { + m, ok := replicaRaw.(map[string]interface{}) + + if !ok { + continue + } + + regionName := m["region_name"].(string) + + replicaUpdate := &dynamodb.ReplicationGroupUpdate{ + Delete: &dynamodb.DeleteReplicationGroupMemberAction{ + RegionName: aws.String(regionName), + }, + } + + input := &dynamodb.UpdateTableInput{ + TableName: aws.String(d.Id()), + ReplicaUpdates: []*dynamodb.ReplicationGroupUpdate{replicaUpdate}, + } + + _, err := conn.UpdateTable(input) + + if err != nil { + return fmt.Errorf("error adding DynamoDB Table (%s) replica (%s): %w", d.Id(), regionName, err) + } + + if err := waitForDynamoDbReplicaDeleteToBeCompleted(d.Id(), regionName, 20*time.Minute, conn); err != nil { + return fmt.Errorf("error waiting for DynamoDB Table (%s) replica (%s) to update: %s", d.Id(), regionName, err) + } + } + + return nil +} + func resourceAwsDynamoDbTableRead(d *schema.ResourceData, meta interface{}) error { conn := meta.(*AWSClient).dynamodbconn @@ -652,6 +749,12 @@ func resourceAwsDynamoDbTableDelete(d *schema.ResourceData, meta interface{}) er log.Printf("[DEBUG] DynamoDB delete table: %s", d.Id()) + if replicas := d.Get("replica").(*schema.Set).List(); len(replicas) > 0 { + if err := deleteDynamoDbReplicas(d.Id(), replicas, conn); err != nil { + return fmt.Errorf("error deleting DynamoDB Table (%s) replicas: %s", d.Id(), err) + } + } + err := deleteAwsDynamoDbTable(d.Id(), conn) if err != nil { if isAWSErr(err, dynamodb.ErrCodeResourceNotFoundException, "Requested resource not found: Table: ") { @@ -702,6 +805,52 @@ func deleteAwsDynamoDbTable(tableName string, conn *dynamodb.DynamoDB) error { return err } +func deleteDynamoDbReplicas(tableName string, replicas []interface{}, conn *dynamodb.DynamoDB) error { + for _, replica := range replicas { + var ops []*dynamodb.ReplicationGroupUpdate + if regionName, ok := replica.(map[string]interface{})["region_name"]; ok { + ops = append(ops, &dynamodb.ReplicationGroupUpdate{ + Delete: &dynamodb.DeleteReplicationGroupMemberAction{ + RegionName: aws.String(regionName.(string)), + }, + }) + + input := &dynamodb.UpdateTableInput{ + TableName: aws.String(tableName), + ReplicaUpdates: ops, + } + + log.Printf("[DEBUG] Deleting DynamoDB Replicas to %v", input) + + err := resource.Retry(20*time.Minute, func() *resource.RetryError { + _, err := conn.UpdateTable(input) + if err != nil { + if isAWSErr(err, "ThrottlingException", "") { + return resource.RetryableError(err) + } + if isAWSErr(err, dynamodb.ErrCodeLimitExceededException, "can be created, updated, or deleted simultaneously") { + return resource.RetryableError(err) + } + + return resource.NonRetryableError(err) + } + return nil + }) + if isResourceTimeoutError(err) { + _, err = conn.UpdateTable(input) + } + if err != nil { + return fmt.Errorf("Error deleting DynamoDB Replicas status: %s", err) + } + + if err := waitForDynamoDbReplicaDeleteToBeCompleted(tableName, regionName.(string), 20*time.Minute, conn); err != nil { + return fmt.Errorf("Error waiting for DynamoDB replica delete: %s", err) + } + } + } + return nil +} + func waitForDynamodbTableDeletion(conn *dynamodb.DynamoDB, tableName string, timeout time.Duration) error { stateConf := &resource.StateChangeConf{ Pending: []string{ @@ -738,6 +887,133 @@ func waitForDynamodbTableDeletion(conn *dynamodb.DynamoDB, tableName string, tim return err } +func waitForDynamoDbReplicaUpdateToBeCompleted(tableName string, region string, timeout time.Duration, conn *dynamodb.DynamoDB) error { + stateConf := &resource.StateChangeConf{ + Pending: []string{ + dynamodb.ReplicaStatusCreating, + dynamodb.ReplicaStatusUpdating, + dynamodb.ReplicaStatusDeleting, + }, + Target: []string{ + dynamodb.ReplicaStatusActive, + }, + Timeout: timeout, + Refresh: func() (interface{}, string, error) { + result, err := conn.DescribeTable(&dynamodb.DescribeTableInput{ + TableName: aws.String(tableName), + }) + if err != nil { + return 42, "", err + } + log.Printf("[DEBUG] DynamoDB replicas: %s", result.Table.Replicas) + + var targetReplica *dynamodb.ReplicaDescription + + for _, replica := range result.Table.Replicas { + if aws.StringValue(replica.RegionName) == region { + targetReplica = replica + break + } + } + + if targetReplica == nil { + return nil, dynamodb.ReplicaStatusCreating, nil + } + + return result, aws.StringValue(targetReplica.ReplicaStatus), nil + }, + } + _, err := stateConf.WaitForState() + + return err +} + +func waitForDynamoDbReplicaDeleteToBeCompleted(tableName string, region string, timeout time.Duration, conn *dynamodb.DynamoDB) error { + stateConf := &resource.StateChangeConf{ + Pending: []string{ + dynamodb.ReplicaStatusCreating, + dynamodb.ReplicaStatusUpdating, + dynamodb.ReplicaStatusDeleting, + dynamodb.ReplicaStatusActive, + }, + Target: []string{""}, + Timeout: timeout, + Refresh: func() (interface{}, string, error) { + result, err := conn.DescribeTable(&dynamodb.DescribeTableInput{ + TableName: aws.String(tableName), + }) + if err != nil { + return 42, "", err + } + + log.Printf("[DEBUG] all replicas for waiting: %s", result.Table.Replicas) + var targetReplica *dynamodb.ReplicaDescription + + for _, replica := range result.Table.Replicas { + if aws.StringValue(replica.RegionName) == region { + targetReplica = replica + break + } + } + + if targetReplica == nil { + return nil, "", nil + } + + return result, aws.StringValue(targetReplica.ReplicaStatus), nil + }, + } + _, err := stateConf.WaitForState() + + return err +} + +func createDynamoDbReplicas(tableName string, replicas []interface{}, conn *dynamodb.DynamoDB) error { + for _, replica := range replicas { + var ops []*dynamodb.ReplicationGroupUpdate + if regionName, ok := replica.(map[string]interface{})["region_name"]; ok { + ops = append(ops, &dynamodb.ReplicationGroupUpdate{ + Create: &dynamodb.CreateReplicationGroupMemberAction{ + RegionName: aws.String(regionName.(string)), + }, + }) + + input := &dynamodb.UpdateTableInput{ + TableName: aws.String(tableName), + ReplicaUpdates: ops, + } + + log.Printf("[DEBUG] Updating DynamoDB Replicas to %v", input) + + err := resource.Retry(20*time.Minute, func() *resource.RetryError { + _, err := conn.UpdateTable(input) + if err != nil { + if isAWSErr(err, "ThrottlingException", "") { + return resource.RetryableError(err) + } + if isAWSErr(err, dynamodb.ErrCodeLimitExceededException, "can be created, updated, or deleted simultaneously") { + return resource.RetryableError(err) + } + + return resource.NonRetryableError(err) + } + return nil + }) + if isResourceTimeoutError(err) { + _, err = conn.UpdateTable(input) + } + if err != nil { + return fmt.Errorf("Error updating DynamoDB Replicas status: %s", err) + } + + if err := waitForDynamoDbReplicaUpdateToBeCompleted(tableName, regionName.(string), 20*time.Minute, conn); err != nil { + return fmt.Errorf("Error waiting for DynamoDB replica update: %s", err) + } + } + } + return nil +} + func updateDynamoDbTimeToLive(tableName string, ttlList []interface{}, conn *dynamodb.DynamoDB) error { ttlMap := ttlList[0].(map[string]interface{}) diff --git a/aws/resource_aws_dynamodb_table_test.go b/aws/resource_aws_dynamodb_table_test.go index 21b37ad009e..2815d6422d1 100644 --- a/aws/resource_aws_dynamodb_table_test.go +++ b/aws/resource_aws_dynamodb_table_test.go @@ -12,6 +12,7 @@ import ( "github.com/aws/aws-sdk-go/service/dynamodb" "github.com/hashicorp/terraform-plugin-sdk/helper/acctest" "github.com/hashicorp/terraform-plugin-sdk/helper/resource" + "github.com/hashicorp/terraform-plugin-sdk/helper/schema" "github.com/hashicorp/terraform-plugin-sdk/terraform" ) @@ -1382,6 +1383,110 @@ func testAccCheckAWSDynamoDbTableDisappears(table *dynamodb.DescribeTableOutput) } } +func TestAccAWSDynamoDbTable_Replica(t *testing.T) { + var conf dynamodb.DescribeTableOutput + var providers []*schema.Provider + alternateRegionDataSourceName := "data.aws_region.alternate" + resourceName := "aws_dynamodb_table.test" + tableName := acctest.RandomWithPrefix("TerraformTestTable-") + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { + testAccPreCheck(t) + testAccMultipleRegionsPreCheck(t) + testAccAlternateRegionPreCheck(t) + }, + ProviderFactories: testAccProviderFactories(&providers), + CheckDestroy: testAccCheckAWSDynamoDbTableDestroy, + Steps: []resource.TestStep{ + { + Config: testAccAWSDynamoDbReplicaUpdates(tableName), + Check: resource.ComposeTestCheckFunc( + testAccCheckInitialAWSDynamoDbTableExists(resourceName, &conf), + resource.TestCheckResourceAttr(resourceName, "name", tableName), + resource.TestCheckResourceAttr(resourceName, "hash_key", "TestTableHashKey"), + resource.TestCheckResourceAttr(resourceName, "attribute.2990477658.name", "TestTableHashKey"), + resource.TestCheckResourceAttr(resourceName, "attribute.2990477658.type", "S"), + resource.TestCheckResourceAttr(resourceName, "replica.#", "1"), + resource.TestCheckResourceAttrPair(resourceName, "replica.0.region_name", alternateRegionDataSourceName, "name"), + ), + }, + { + Config: testAccAWSDynamoDbReplicaUpdates(tableName), + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + }, + { + Config: testAccAWSDynamoDbReplicaDeletes(tableName), + Check: resource.ComposeTestCheckFunc( + testAccCheckInitialAWSDynamoDbTableExists(resourceName, &conf), + resource.TestCheckResourceAttr(resourceName, "name", tableName), + resource.TestCheckResourceAttr(resourceName, "hash_key", "TestTableHashKey"), + resource.TestCheckResourceAttr(resourceName, "attribute.2990477658.name", "TestTableHashKey"), + resource.TestCheckResourceAttr(resourceName, "attribute.2990477658.type", "S"), + resource.TestCheckResourceAttr(resourceName, "hash_key", "TestTableHashKey"), + resource.TestCheckResourceAttr(resourceName, "replica.#", "0"), + ), + }, + { + Config: testAccAWSDynamoDbReplicaUpdates(tableName), + Check: resource.ComposeTestCheckFunc( + testAccCheckInitialAWSDynamoDbTableExists(resourceName, &conf), + resource.TestCheckResourceAttr(resourceName, "name", tableName), + resource.TestCheckResourceAttr(resourceName, "hash_key", "TestTableHashKey"), + resource.TestCheckResourceAttr(resourceName, "attribute.2990477658.name", "TestTableHashKey"), + resource.TestCheckResourceAttr(resourceName, "attribute.2990477658.type", "S"), + resource.TestCheckResourceAttr(resourceName, "replica.#", "1"), + resource.TestCheckResourceAttrPair(resourceName, "replica.0.region_name", alternateRegionDataSourceName, "name"), + ), + }, + }, + }) +} + +func testAccAWSDynamoDbReplicaUpdates(rName string) string { + return testAccAlternateRegionProviderConfig() + fmt.Sprintf(` +data "aws_region" "alternate" { + provider = "aws.alternate" +} + +resource "aws_dynamodb_table" "test" { + name = %[1]q + hash_key = "TestTableHashKey" + billing_mode = "PAY_PER_REQUEST" + stream_enabled = true + stream_view_type = "NEW_AND_OLD_IMAGES" + + attribute { + name = "TestTableHashKey" + type = "S" + } + + replica { + region_name = %[2]q + } +} +`, rName, testAccGetAlternateRegion()) +} + +func testAccAWSDynamoDbReplicaDeletes(rName string) string { + return fmt.Sprintf(` +resource "aws_dynamodb_table" "test" { + name = "%s" + hash_key = "TestTableHashKey" + billing_mode = "PAY_PER_REQUEST" + stream_enabled = true + stream_view_type = "NEW_AND_OLD_IMAGES" + + attribute { + name = "TestTableHashKey" + type = "S" + } +} +`, rName) +} + func dynamoDbGetGSIIndex(gsiList *[]*dynamodb.GlobalSecondaryIndexDescription, target string) int { for idx, gsiObject := range *gsiList { if *gsiObject.IndexName == target { diff --git a/aws/structure.go b/aws/structure.go index a2e8cfda8f5..a4d2146993a 100644 --- a/aws/structure.go +++ b/aws/structure.go @@ -4256,6 +4256,55 @@ func diffDynamoDbGSI(oldGsi, newGsi []interface{}, billingMode string) (ops []*d return } +func diffDynamoDbReplicas(oldReplica, newReplica []interface{}) (ops []*dynamodb.ReplicationGroupUpdate, e error) { + // Transform slices into maps + oldReplicas := make(map[string]interface{}) + for _, replicaData := range oldReplica { + m := replicaData.(map[string]interface{}) + oldReplicas[m["region_name"].(string)] = m + } + newReplicas := make(map[string]interface{}) + for _, replicaData := range newReplica { + m := replicaData.(map[string]interface{}) + newReplicas[m["region_name"].(string)] = m + } + + for _, data := range newReplica { + newMap := data.(map[string]interface{}) + newName := newMap["region_name"].(string) + + if _, exists := oldReplicas[newName]; !exists { + m := data.(map[string]interface{}) + regionName := m["region_name"].(string) + + ops = append(ops, &dynamodb.ReplicationGroupUpdate{ + Create: &dynamodb.CreateReplicationGroupMemberAction{ + RegionName: aws.String(regionName), + }, + }) + } + } + + for _, data := range oldReplicas { + oldMap := data.(map[string]interface{}) + oldName := oldMap["region_name"].(string) + + _, exists := newReplicas[oldName] + if exists { + // newMap := newData.(map[string]interface{}) + // regionName := newMap["region"].(string) + } else { + regionName := oldName + ops = append(ops, &dynamodb.ReplicationGroupUpdate{ + Delete: &dynamodb.DeleteReplicationGroupMemberAction{ + RegionName: aws.String(regionName), + }, + }) + } + } + return +} + func stripCapacityAttributes(in map[string]interface{}) (map[string]interface{}, error) { mapCopy, err := copystructure.Copy(in) if err != nil { @@ -4423,6 +4472,20 @@ func flattenAwsDynamoDbTableResource(d *schema.ResourceData, table *dynamodb.Tab return err } + replicaList := make([]map[string]interface{}, 0, len(table.Replicas)) + for _, replicaObject := range table.Replicas { + replica := map[string]interface{}{ + "region_name": aws.StringValue(replicaObject.RegionName), + } + + replicaList = append(replicaList, replica) + } + + err = d.Set("replica", replicaList) + if err != nil { + return err + } + d.Set("arn", table.TableArn) return nil