diff --git a/apis/v1alpha1/ack-generate-metadata.yaml b/apis/v1alpha1/ack-generate-metadata.yaml index 4b5c604..7997350 100755 --- a/apis/v1alpha1/ack-generate-metadata.yaml +++ b/apis/v1alpha1/ack-generate-metadata.yaml @@ -1,5 +1,5 @@ ack_generate_info: - build_date: "2025-03-27T16:23:51Z" + build_date: "2025-03-28T01:43:12Z" build_hash: 980cb1e4734f673d16101cf55206b84ca639ec01 go_version: go1.24.1 version: v0.44.0 diff --git a/pkg/resource/table/hooks.go b/pkg/resource/table/hooks.go index 9e4aef9..42f2bf0 100644 --- a/pkg/resource/table/hooks.go +++ b/pkg/resource/table/hooks.go @@ -15,6 +15,7 @@ package table import ( "context" + "errors" "fmt" "strings" "time" @@ -232,17 +233,17 @@ func (rm *resourceManager) customUpdateTable( } return nil, err } - // case delta.DifferentAt("Spec.TableReplicas"): - // // Enabling replicas required streams enabled and StreamViewType to be NEW_AND_OLD_IMAGES - // // Version 2019.11.21 TableUpdate API requirement - // if !hasStreamSpecificationWithNewAndOldImages(desired) { - // msg := "table must have DynamoDB Streams enabled with StreamViewType set to NEW_AND_OLD_IMAGES for replica updates" - // rlog.Debug(msg) - // return nil, ackerr.NewTerminalError(errors.New(msg)) - // } - // if err := rm.syncReplicas(ctx, latest, desired); err != nil { - // return nil, err - // } + case delta.DifferentAt("Spec.TableReplicas"): + // Enabling replicas required streams enabled and StreamViewType to be NEW_AND_OLD_IMAGES + // Version 2019.11.21 TableUpdate API requirement + if !hasStreamSpecificationWithNewAndOldImages(desired) { + msg := "table must have DynamoDB Streams enabled with StreamViewType set to NEW_AND_OLD_IMAGES for replica updates" + rlog.Debug(msg) + return nil, ackerr.NewTerminalError(errors.New(msg)) + } + if err := rm.syncReplicas(ctx, latest, desired); err != nil { + return nil, err + } } } @@ -577,14 +578,14 @@ func customPreCompare( } } - // // Handle ReplicaUpdates API comparison - // if len(a.ko.Spec.TableReplicas) != len(b.ko.Spec.TableReplicas) { - // delta.Add("Spec.TableReplicas", a.ko.Spec.TableReplicas, b.ko.Spec.TableReplicas) - // } else if a.ko.Spec.TableReplicas != nil && b.ko.Spec.TableReplicas != nil { - // if !equalReplicaArrays(a.ko.Spec.TableReplicas, b.ko.Spec.TableReplicas) { - // delta.Add("Spec.TableReplicas", a.ko.Spec.TableReplicas, b.ko.Spec.TableReplicas) - // } - // } + // Handle ReplicaUpdates API comparison + if len(a.ko.Spec.TableReplicas) != len(b.ko.Spec.TableReplicas) { + delta.Add("Spec.TableReplicas", a.ko.Spec.TableReplicas, b.ko.Spec.TableReplicas) + } else if a.ko.Spec.TableReplicas != nil && b.ko.Spec.TableReplicas != nil { + if !equalReplicaArrays(a.ko.Spec.TableReplicas, b.ko.Spec.TableReplicas) { + delta.Add("Spec.TableReplicas", a.ko.Spec.TableReplicas, b.ko.Spec.TableReplicas) + } + } if a.ko.Spec.DeletionProtectionEnabled == nil { a.ko.Spec.DeletionProtectionEnabled = aws.Bool(false) diff --git a/pkg/resource/table/hooks_replica_updates.go b/pkg/resource/table/hooks_replica_updates.go new file mode 100644 index 0000000..f6cc180 --- /dev/null +++ b/pkg/resource/table/hooks_replica_updates.go @@ -0,0 +1,476 @@ +// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. + +package table + +import ( + "context" + + ackrtlog "github.com/aws-controllers-k8s/runtime/pkg/runtime/log" + "github.com/aws/aws-sdk-go-v2/aws" + svcsdk "github.com/aws/aws-sdk-go-v2/service/dynamodb" + svcsdktypes "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + + "github.com/aws-controllers-k8s/dynamodb-controller/apis/v1alpha1" + svcapitypes "github.com/aws-controllers-k8s/dynamodb-controller/apis/v1alpha1" +) + +// equalCreateReplicationGroupMemberActions compares two CreateReplicationGroupMemberAction objects +func equalCreateReplicationGroupMemberActions(a, b *v1alpha1.CreateReplicationGroupMemberAction) bool { + if !equalStrings(a.RegionName, b.RegionName) { + return false + } + if !equalStrings(a.KMSMasterKeyID, b.KMSMasterKeyID) { + return false + } + if !equalStrings(a.TableClassOverride, b.TableClassOverride) { + return false + } + if a.ProvisionedThroughputOverride != nil && b.ProvisionedThroughputOverride != nil { + if !equalInt64s(a.ProvisionedThroughputOverride.ReadCapacityUnits, b.ProvisionedThroughputOverride.ReadCapacityUnits) { + return false + } + } else if (a.ProvisionedThroughputOverride == nil) != (b.ProvisionedThroughputOverride == nil) { + return false + } + + return equalReplicaGlobalSecondaryIndexArrays(a.GlobalSecondaryIndexes, b.GlobalSecondaryIndexes) +} + +// equalReplicaGlobalSecondaryIndexes compares two ReplicaGlobalSecondaryIndex objects +func equalReplicaGlobalSecondaryIndexes( + a *v1alpha1.ReplicaGlobalSecondaryIndex, + b *v1alpha1.ReplicaGlobalSecondaryIndex, +) bool { + if !equalStrings(a.IndexName, b.IndexName) { + return false + } + + if a.ProvisionedThroughputOverride != nil && b.ProvisionedThroughputOverride != nil { + if !equalInt64s(a.ProvisionedThroughputOverride.ReadCapacityUnits, b.ProvisionedThroughputOverride.ReadCapacityUnits) { + return false + } + } else if (a.ProvisionedThroughputOverride == nil) != (b.ProvisionedThroughputOverride == nil) { + return false + } + + return true +} + +// equalReplicaGlobalSecondaryIndexArrays compares two arrays of ReplicaGlobalSecondaryIndex objects +func equalReplicaGlobalSecondaryIndexArrays( + a []*v1alpha1.ReplicaGlobalSecondaryIndex, + b []*v1alpha1.ReplicaGlobalSecondaryIndex, +) bool { + if len(a) != len(b) { + return false + } + + aGSIMap := make(map[string]*v1alpha1.ReplicaGlobalSecondaryIndex) + bGSIMap := make(map[string]*v1alpha1.ReplicaGlobalSecondaryIndex) + + for _, gsi := range a { + if gsi.IndexName != nil { + aGSIMap[*gsi.IndexName] = gsi + } + } + + for _, gsi := range b { + if gsi.IndexName != nil { + bGSIMap[*gsi.IndexName] = gsi + } + } + + for indexName, aGSI := range aGSIMap { + bGSI, exists := bGSIMap[indexName] + if !exists { + return false + } + + if !equalReplicaGlobalSecondaryIndexes(aGSI, bGSI) { + return false + } + } + + return true +} + +// equalReplicaArrays returns whether two CreateReplicationGroupMemberAction arrays are equal or not. +func equalReplicaArrays(a, b []*v1alpha1.CreateReplicationGroupMemberAction) bool { + if len(a) != len(b) { + return false + } + + aMap := make(map[string]*v1alpha1.CreateReplicationGroupMemberAction) + bMap := make(map[string]*v1alpha1.CreateReplicationGroupMemberAction) + + for _, replica := range a { + if replica.RegionName != nil { + aMap[*replica.RegionName] = replica + } + } + + for _, replica := range b { + if replica.RegionName != nil { + bMap[*replica.RegionName] = replica + } + } + + for regionName, aReplica := range aMap { + bReplica, exists := bMap[regionName] + if !exists { + return false + } + + if !equalCreateReplicationGroupMemberActions(aReplica, bReplica) { + return false + } + } + + for regionName := range bMap { + if _, exists := aMap[regionName]; !exists { + return false + } + } + + return true +} + +// createReplicaUpdate creates a ReplicationGroupUpdate for creating a new replica +func createReplicaUpdate(replica *v1alpha1.CreateReplicationGroupMemberAction) svcsdktypes.ReplicationGroupUpdate { + replicaUpdate := svcsdktypes.ReplicationGroupUpdate{} + createAction := &svcsdktypes.CreateReplicationGroupMemberAction{} + + if replica.RegionName != nil { + createAction.RegionName = aws.String(*replica.RegionName) + } + + if replica.KMSMasterKeyID != nil { + createAction.KMSMasterKeyId = aws.String(*replica.KMSMasterKeyID) + } + + if replica.TableClassOverride != nil { + createAction.TableClassOverride = svcsdktypes.TableClass(*replica.TableClassOverride) + } + + if replica.ProvisionedThroughputOverride != nil { + createAction.ProvisionedThroughputOverride = &svcsdktypes.ProvisionedThroughputOverride{} + if replica.ProvisionedThroughputOverride.ReadCapacityUnits != nil { + createAction.ProvisionedThroughputOverride.ReadCapacityUnits = replica.ProvisionedThroughputOverride.ReadCapacityUnits + } + } + + if replica.GlobalSecondaryIndexes != nil { + gsiList := []svcsdktypes.ReplicaGlobalSecondaryIndex{} + for _, gsi := range replica.GlobalSecondaryIndexes { + replicaGSI := svcsdktypes.ReplicaGlobalSecondaryIndex{} + if gsi.IndexName != nil { + replicaGSI.IndexName = gsi.IndexName + } + if gsi.ProvisionedThroughputOverride != nil { + replicaGSI.ProvisionedThroughputOverride = &svcsdktypes.ProvisionedThroughputOverride{} + if gsi.ProvisionedThroughputOverride.ReadCapacityUnits != nil { + replicaGSI.ProvisionedThroughputOverride.ReadCapacityUnits = gsi.ProvisionedThroughputOverride.ReadCapacityUnits + } + } + gsiList = append(gsiList, replicaGSI) + } + createAction.GlobalSecondaryIndexes = gsiList + } + + replicaUpdate.Create = createAction + return replicaUpdate +} + +// updateReplicaUpdate creates a ReplicationGroupUpdate for updating an existing replica +func updateReplicaUpdate(replica *v1alpha1.CreateReplicationGroupMemberAction) svcsdktypes.ReplicationGroupUpdate { + replicaUpdate := svcsdktypes.ReplicationGroupUpdate{} + updateAction := &svcsdktypes.UpdateReplicationGroupMemberAction{} + + if replica.RegionName != nil { + updateAction.RegionName = aws.String(*replica.RegionName) + // RegionName is required but doesn't count as a update + } + + if replica.KMSMasterKeyID != nil { + updateAction.KMSMasterKeyId = aws.String(*replica.KMSMasterKeyID) + } + + if replica.TableClassOverride != nil { + updateAction.TableClassOverride = svcsdktypes.TableClass(*replica.TableClassOverride) + } + + if replica.ProvisionedThroughputOverride != nil && + replica.ProvisionedThroughputOverride.ReadCapacityUnits != nil { + updateAction.ProvisionedThroughputOverride = &svcsdktypes.ProvisionedThroughputOverride{ + ReadCapacityUnits: replica.ProvisionedThroughputOverride.ReadCapacityUnits, + } + } + + // Only include GSIs that have provisioned throughput overrides + var gsisWithOverrides []svcsdktypes.ReplicaGlobalSecondaryIndex + for _, gsi := range replica.GlobalSecondaryIndexes { + if gsi.IndexName != nil && gsi.ProvisionedThroughputOverride != nil && + gsi.ProvisionedThroughputOverride.ReadCapacityUnits != nil { + gsisWithOverrides = append(gsisWithOverrides, svcsdktypes.ReplicaGlobalSecondaryIndex{ + IndexName: aws.String(*gsi.IndexName), + ProvisionedThroughputOverride: &svcsdktypes.ProvisionedThroughputOverride{ + ReadCapacityUnits: gsi.ProvisionedThroughputOverride.ReadCapacityUnits, + }, + }) + } + } + + if len(gsisWithOverrides) > 0 { + updateAction.GlobalSecondaryIndexes = gsisWithOverrides + } + + // Check if there are any actual updates to perform + // replica GSI updates are invalid updates since the GSI already exists on the source table + hasUpdates := updateAction.KMSMasterKeyId != nil || + updateAction.TableClassOverride != "" || + updateAction.ProvisionedThroughputOverride != nil || + len(updateAction.GlobalSecondaryIndexes) > 0 + + if hasUpdates { + replicaUpdate.Update = updateAction + return replicaUpdate + } + + // If no valid updates, return an empty ReplicationGroupUpdate + return svcsdktypes.ReplicationGroupUpdate{ + Update: nil, + } +} + +// deleteReplicaUpdate creates a ReplicationGroupUpdate for deleting an existing replica +func deleteReplicaUpdate(regionName string) svcsdktypes.ReplicationGroupUpdate { + return svcsdktypes.ReplicationGroupUpdate{ + Delete: &svcsdktypes.DeleteReplicationGroupMemberAction{ + RegionName: aws.String(regionName), + }, + } +} + +// hasStreamSpecificationWithNewAndOldImages checks if the table has DynamoDB Streams enabled +// with the stream containing both the new and the old images of the item. +func hasStreamSpecificationWithNewAndOldImages(r *resource) bool { + StreamEnabled := r.ko.Spec.StreamSpecification != nil && + r.ko.Spec.StreamSpecification.StreamEnabled != nil && + *r.ko.Spec.StreamSpecification.StreamEnabled + StreamViewType := r.ko.Spec.StreamSpecification != nil && + r.ko.Spec.StreamSpecification.StreamViewType != nil && + *r.ko.Spec.StreamSpecification.StreamViewType == "NEW_AND_OLD_IMAGES" + return StreamEnabled && StreamViewType +} + +// syncReplicas updates the replica configuration for a table +func (rm *resourceManager) syncReplicas( + ctx context.Context, + latest *resource, + desired *resource, +) (err error) { + rlog := ackrtlog.FromContext(ctx) + exit := rlog.Trace("rm.syncReplicas") + defer func() { + exit(err) + }() + + input, replicasInQueue, err := rm.newUpdateTableReplicaUpdatesOneAtATimePayload(ctx, latest, desired) + if err != nil { + return err + } + + // Call the UpdateTable API + _, err = rm.sdkapi.UpdateTable(ctx, input) + rm.metrics.RecordAPICall("UPDATE", "UpdateTable", err) + if err != nil { + return err + } + + // If there are more replicas to process, requeue + if replicasInQueue > 0 { + rlog.Debug("more replica updates pending, will requeue", + "table", *latest.ko.Spec.TableName, + "remaining_updates", replicasInQueue) + return requeueWaitWhileUpdating + } + + return nil +} + +// newUpdateTableReplicaUpdatesOneAtATimePayload creates the UpdateTable input payload for replica updates, +// processing only one replica at a time +func (rm *resourceManager) newUpdateTableReplicaUpdatesOneAtATimePayload( + ctx context.Context, + latest *resource, + desired *resource, +) (input *svcsdk.UpdateTableInput, replicasInQueue int, err error) { + rlog := ackrtlog.FromContext(ctx) + exit := rlog.Trace("rm.newUpdateTableReplicaUpdatesOneAtATimePayload") + defer func() { + exit(err) + }() + + createReplicas, updateReplicas, deleteRegions := computeReplicaupdatesDelta(latest, desired) + + input = &svcsdk.UpdateTableInput{ + TableName: aws.String(*desired.ko.Spec.TableName), + ReplicaUpdates: []svcsdktypes.ReplicationGroupUpdate{}, + } + + totalReplicasOperations := len(createReplicas) + len(updateReplicas) + len(deleteRegions) + replicasInQueue = totalReplicasOperations - 1 + + // Process replica updates in order: create, update, delete + // We'll only perform one replica action at a time + + if len(createReplicas) > 0 { + replica := *createReplicas[0] + if checkIfReplicasInProgress(latest.ko.Status.Replicas, *replica.RegionName) { + return nil, 0, requeueWaitReplicasActive + } + rlog.Debug("creating replica in region", "table", *desired.ko.Spec.TableName, "region", *replica.RegionName) + input.ReplicaUpdates = append(input.ReplicaUpdates, createReplicaUpdate(createReplicas[0])) + return input, replicasInQueue, nil + } + + if len(updateReplicas) > 0 { + replica := *updateReplicas[0] + if checkIfReplicasInProgress(latest.ko.Status.Replicas, *replica.RegionName) { + return nil, 0, requeueWaitReplicasActive + } + rlog.Debug("updating replica in region", "table", *desired.ko.Spec.TableName, "region", *replica.RegionName) + updateReplica := updateReplicaUpdate(updateReplicas[0]) + if updateReplica.Update == nil { + return nil, 0, requeueWaitReplicasActive + } + input.ReplicaUpdates = append(input.ReplicaUpdates, updateReplica) + return input, replicasInQueue, nil + } + + if len(deleteRegions) > 0 { + replica := deleteRegions[0] + if checkIfReplicasInProgress(latest.ko.Status.Replicas, replica) { + return nil, 0, requeueWaitReplicasActive + } + rlog.Debug("deleting replica in region", "table", *desired.ko.Spec.TableName, "region", replica) + input.ReplicaUpdates = append(input.ReplicaUpdates, deleteReplicaUpdate(deleteRegions[0])) + return input, replicasInQueue, nil + } + + return input, replicasInQueue, nil +} + +// computeReplicaupdatesDelta calculates the replica updates needed to reconcile the latest state with the desired state +// Returns three slices: replicas to create, replicas to update, and region names to delete +func computeReplicaupdatesDelta( + latest *resource, + desired *resource, +) ( + createReplicas []*v1alpha1.CreateReplicationGroupMemberAction, + updateReplicas []*v1alpha1.CreateReplicationGroupMemberAction, + deleteRegions []string, +) { + latestReplicas := make(map[string]*v1alpha1.CreateReplicationGroupMemberAction) + if latest.ko.Spec.TableReplicas != nil { + for _, replica := range latest.ko.Spec.TableReplicas { + if replica.RegionName != nil { + latestReplicas[*replica.RegionName] = replica + } + } + } + + desiredReplicas := make(map[string]*v1alpha1.CreateReplicationGroupMemberAction) + if desired != nil && desired.ko.Spec.TableReplicas != nil { + for _, replica := range desired.ko.Spec.TableReplicas { + if replica.RegionName != nil { + desiredReplicas[*replica.RegionName] = replica + } + } + } + + // Calculate replicas to create or update + for desiredRegion, desiredReplica := range desiredReplicas { + existingReplica, exists := latestReplicas[desiredRegion] + if !exists { + createReplicas = append(createReplicas, desiredReplica) + } else if !equalCreateReplicationGroupMemberActions(existingReplica, desiredReplica) { + updateReplicas = append(updateReplicas, desiredReplica) + } + } + + // Calculate regions to delete + for regionName := range latestReplicas { + if _, exists := desiredReplicas[regionName]; !exists { + deleteRegions = append(deleteRegions, regionName) + } + } + + return createReplicas, updateReplicas, deleteRegions +} + +func setTableReplicas(ko *svcapitypes.Table, replicas []svcsdktypes.ReplicaDescription) { + if len(replicas) > 0 { + tableReplicas := []*v1alpha1.CreateReplicationGroupMemberAction{} + for _, replica := range replicas { + replicaElem := &v1alpha1.CreateReplicationGroupMemberAction{} + if replica.RegionName != nil { + replicaElem.RegionName = replica.RegionName + } + if replica.KMSMasterKeyId != nil { + replicaElem.KMSMasterKeyID = replica.KMSMasterKeyId + } + if replica.ProvisionedThroughputOverride != nil { + replicaElem.ProvisionedThroughputOverride = &v1alpha1.ProvisionedThroughputOverride{ + ReadCapacityUnits: replica.ProvisionedThroughputOverride.ReadCapacityUnits, + } + } + if replica.GlobalSecondaryIndexes != nil { + gsiList := []*v1alpha1.ReplicaGlobalSecondaryIndex{} + for _, gsi := range replica.GlobalSecondaryIndexes { + gsiElem := &v1alpha1.ReplicaGlobalSecondaryIndex{ + IndexName: gsi.IndexName, + } + if gsi.ProvisionedThroughputOverride != nil { + gsiElem.ProvisionedThroughputOverride = &v1alpha1.ProvisionedThroughputOverride{ + ReadCapacityUnits: gsi.ProvisionedThroughputOverride.ReadCapacityUnits, + } + } + gsiList = append(gsiList, gsiElem) + } + replicaElem.GlobalSecondaryIndexes = gsiList + } + if replica.ReplicaTableClassSummary != nil && replica.ReplicaTableClassSummary.TableClass != "" { + replicaElem.TableClassOverride = aws.String(string(replica.ReplicaTableClassSummary.TableClass)) + } + tableReplicas = append(tableReplicas, replicaElem) + } + ko.Spec.TableReplicas = tableReplicas + } else { + ko.Spec.TableReplicas = nil + } +} + +func checkIfReplicasInProgress(ReplicaDescription []*svcapitypes.ReplicaDescription, regionName string) bool { + for _, replica := range ReplicaDescription { + if *replica.RegionName == regionName { + replicaStatus := replica.ReplicaStatus + if *replicaStatus == string(svcsdktypes.ReplicaStatusCreating) || *replicaStatus == string(svcsdktypes.ReplicaStatusDeleting) || *replicaStatus == string(svcsdktypes.ReplicaStatusUpdating) { + return true + } + } + } + + return false +} diff --git a/pkg/resource/table/sdk.go b/pkg/resource/table/sdk.go index b655910..e9fc221 100644 --- a/pkg/resource/table/sdk.go +++ b/pkg/resource/table/sdk.go @@ -440,6 +440,7 @@ func (rm *resourceManager) sdkFind( } else { ko.Spec.BillingMode = aws.String("PROVISIONED") } + setTableReplicas(ko, resp.Table.Replicas) if isTableCreating(&resource{ko}) { return &resource{ko}, requeueWaitWhileCreating } @@ -1016,6 +1017,22 @@ func (rm *resourceManager) sdkDelete( if isTableUpdating(r) { return nil, requeueWaitWhileUpdating } + + // If there are replicas, we need to remove them before deleting the table + if len(r.ko.Spec.TableReplicas) > 0 { + desired := &resource{ + ko: r.ko.DeepCopy(), + } + desired.ko.Spec.TableReplicas = nil + + err := rm.syncReplicas(ctx, r, desired) + if err != nil { + return nil, err + } + // Requeue to wait for replica removal to complete before attempting table deletion + // When syncReplicas returns an error other than requeue + return r, requeueWaitWhileDeleting + } input, err := rm.newDeleteRequestPayload(r) if err != nil { return nil, err diff --git a/templates/hooks/table/sdk_delete_pre_build_request.go.tpl b/templates/hooks/table/sdk_delete_pre_build_request.go.tpl index fa720f3..8726b1e 100644 --- a/templates/hooks/table/sdk_delete_pre_build_request.go.tpl +++ b/templates/hooks/table/sdk_delete_pre_build_request.go.tpl @@ -3,4 +3,20 @@ } if isTableUpdating(r) { return nil, requeueWaitWhileUpdating + } + + // If there are replicas, we need to remove them before deleting the table + if len(r.ko.Spec.TableReplicas) > 0 { + desired := &resource{ + ko: r.ko.DeepCopy(), + } + desired.ko.Spec.TableReplicas = nil + + err := rm.syncReplicas(ctx, r, desired) + if err != nil { + return nil, err + } + // Requeue to wait for replica removal to complete before attempting table deletion + // When syncReplicas returns an error other than requeue + return r, requeueWaitWhileDeleting } \ No newline at end of file diff --git a/templates/hooks/table/sdk_read_one_post_set_output.go.tpl b/templates/hooks/table/sdk_read_one_post_set_output.go.tpl index 4525c25..004a0d7 100644 --- a/templates/hooks/table/sdk_read_one_post_set_output.go.tpl +++ b/templates/hooks/table/sdk_read_one_post_set_output.go.tpl @@ -53,6 +53,7 @@ } else { ko.Spec.BillingMode = aws.String("PROVISIONED") } + setTableReplicas(ko, resp.Table.Replicas) if isTableCreating(&resource{ko}) { return &resource{ko}, requeueWaitWhileCreating } diff --git a/test/e2e/tests/test_table_replicas.py b/test/e2e/tests/test_table_replicas.py new file mode 100644 index 0000000..fe28043 --- /dev/null +++ b/test/e2e/tests/test_table_replicas.py @@ -0,0 +1,448 @@ +# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You may +# not use this file except in compliance with the License. A copy of the +# License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is distributed +# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +# express or implied. See the License for the specific language governing +# permissions and limitations under the License. + +"""Integration tests for the DynamoDB Table Replicas. +""" + +import logging +import time +from typing import Dict, Tuple + +import boto3 +import pytest +from acktest import tags +from acktest.k8s import resource as k8s +from acktest.resources import random_suffix_name +from e2e import (CRD_GROUP, CRD_VERSION, condition, + load_dynamodb_resource, service_marker, table) +from e2e.replacement_values import REPLACEMENT_VALUES +from acktest.k8s import condition + +RESOURCE_PLURAL = "tables" + +DELETE_WAIT_AFTER_SECONDS = 30 +MODIFY_WAIT_AFTER_SECONDS = 600 +REPLICA_WAIT_AFTER_SECONDS = 600 + +REPLICA_REGION_1 = "us-east-1" +REPLICA_REGION_2 = "eu-west-1" +REPLICA_REGION_3 = "eu-central-1" +REPLICA_REGION_4 = "ap-southeast-1" +REPLICA_REGION_5 = "eu-north-1" + + +def create_table_with_replicas(name: str, resource_template, regions=None): + if regions is None: + regions = [REPLICA_REGION_1, REPLICA_REGION_2] + + replacements = REPLACEMENT_VALUES.copy() + replacements["TABLE_NAME"] = name + replacements["REPLICA_REGION_1"] = regions[0] + replacements["REPLICA_REGION_2"] = regions[1] + + resource_data = load_dynamodb_resource( + resource_template, + additional_replacements=replacements, + ) + logging.debug(resource_data) + + # Create the k8s resource + ref = k8s.CustomResourceReference( + CRD_GROUP, CRD_VERSION, RESOURCE_PLURAL, + name, namespace="default", + ) + k8s.create_custom_resource(ref, resource_data) + cr = k8s.wait_resource_consumed_by_controller(ref) + + assert cr is not None + assert k8s.get_resource_exists(ref) + + return (ref, cr) + + +@pytest.fixture(scope="function") +def table_with_replicas(): + table_name = random_suffix_name("table-replicas", 32) + + (ref, res) = create_table_with_replicas( + table_name, + "table_with_replicas", + [REPLICA_REGION_1, REPLICA_REGION_2] + ) + + # Wait for table to be ACTIVE before proceeding + table.wait_until( + table_name, + table.status_matches("ACTIVE"), + timeout_seconds=REPLICA_WAIT_AFTER_SECONDS, + interval_seconds=30, + ) + + # Wait for initial replicas to be ACTIVE before yielding + table.wait_until( + table_name, + table.replicas_match([REPLICA_REGION_1, REPLICA_REGION_2]), + timeout_seconds=REPLICA_WAIT_AFTER_SECONDS, + interval_seconds=30, + ) + + yield (ref, res) + + + deleted = k8s.delete_custom_resource(ref) + assert deleted + +def create_table_with_invalid_replicas(name: str): + replacements = REPLACEMENT_VALUES.copy() + replacements["TABLE_NAME"] = name + replacements["REPLICA_REGION_1"] = REPLICA_REGION_1 + + resource_data = load_dynamodb_resource( + "table_with_replicas_invalid", + additional_replacements=replacements, + ) + logging.debug(resource_data) + + # Create the k8s resource + ref = k8s.CustomResourceReference( + CRD_GROUP, CRD_VERSION, RESOURCE_PLURAL, + name, namespace="default", + ) + k8s.create_custom_resource(ref, resource_data) + cr = k8s.wait_resource_consumed_by_controller(ref) + + assert cr is not None + assert k8s.get_resource_exists(ref) + + return (ref, cr) + + +@pytest.fixture(scope="function") +def table_with_invalid_replicas(): + table_name = random_suffix_name("table-invalid-replicas", 32) + + (ref, res) = create_table_with_invalid_replicas(table_name) + + yield (ref, res) + + # Delete the k8s resource if it still exists + if k8s.get_resource_exists(ref): + k8s.delete_custom_resource(ref) + time.sleep(DELETE_WAIT_AFTER_SECONDS) + + +@pytest.fixture(scope="function") +def table_replicas_gsi(): + table_name = random_suffix_name("table-replicas-gsi", 32) + replacements = REPLACEMENT_VALUES.copy() + replacements["TABLE_NAME"] = table_name + replacements["REPLICA_REGION_1"] = REPLICA_REGION_1 + replacements["REPLICA_REGION_2"] = REPLICA_REGION_2 + + resource_data = load_dynamodb_resource( + "table_with_gsi_and_replicas", + additional_replacements=replacements, + ) + + ref = k8s.CustomResourceReference( + CRD_GROUP, CRD_VERSION, RESOURCE_PLURAL, + table_name, namespace="default", + ) + k8s.create_custom_resource(ref, resource_data) + cr = k8s.wait_resource_consumed_by_controller(ref) + + table.wait_until( + table_name, + table.status_matches("ACTIVE"), + timeout_seconds=REPLICA_WAIT_AFTER_SECONDS, + interval_seconds=30, + ) + + yield (ref, cr) + + deleted = k8s.delete_custom_resource(ref) + time.sleep(DELETE_WAIT_AFTER_SECONDS) + assert deleted + +@service_marker +@pytest.mark.canary +class TestTableReplicas: + def table_exists(self, table_name: str) -> bool: + return table.get(table_name) is not None + + def test_create_table_with_replicas(self, table_with_replicas): + (_, res) = table_with_replicas + table_name = res["spec"]["tableName"] + + # Table should already be ACTIVE from fixture + assert table.get(table_name) is not None + + # Verify replicas exist and are ACTIVE + for region in [REPLICA_REGION_1, REPLICA_REGION_2]: + table.wait_until( + table_name, + table.replica_status_matches(region, "ACTIVE"), + timeout_seconds=REPLICA_WAIT_AFTER_SECONDS, + interval_seconds=30, + ) + + def test_add_replica(self, table_with_replicas): + (ref, res) = table_with_replicas + table_name = res["spec"]["tableName"] + + assert table.get(table_name) is not None + table.wait_until( + table_name, + table.status_matches("ACTIVE"), + timeout_seconds=REPLICA_WAIT_AFTER_SECONDS, + interval_seconds=30, + ) + + # Update replicas + cr = k8s.get_resource(ref) + cr["spec"]["tableReplicas"] = [ + {"regionName": REPLICA_REGION_3}, + {"regionName": REPLICA_REGION_4}, + {"regionName": REPLICA_REGION_5} + ] + k8s.patch_custom_resource(ref, cr) + table.wait_until( + table_name, + table.replicas_match( + [REPLICA_REGION_3, REPLICA_REGION_4, REPLICA_REGION_5]), + timeout_seconds=REPLICA_WAIT_AFTER_SECONDS, + interval_seconds=30, + ) + + # Verify all replicas are ACTIVE + for region in [REPLICA_REGION_3, REPLICA_REGION_4, REPLICA_REGION_5]: + table.wait_until( + table_name, + table.replica_status_matches(region, "ACTIVE"), + timeout_seconds=REPLICA_WAIT_AFTER_SECONDS, + interval_seconds=30, + ) + + def test_remove_replica(self, table_with_replicas): + (ref, res) = table_with_replicas + table_name = res["spec"]["tableName"] + + assert self.table_exists(table_name) + + table.wait_until( + table_name, + table.replicas_match([REPLICA_REGION_1, REPLICA_REGION_2]), + timeout_seconds=REPLICA_WAIT_AFTER_SECONDS, + interval_seconds=30, + ) + + for region in [REPLICA_REGION_1, REPLICA_REGION_2]: + table.wait_until( + table_name, + table.replica_status_matches(region, "ACTIVE"), + timeout_seconds=REPLICA_WAIT_AFTER_SECONDS, + interval_seconds=30, + ) + + cr = k8s.wait_resource_consumed_by_controller(ref) + current_replicas = table.get_replicas(table_name) + assert current_replicas is not None + assert len(current_replicas) >= 1 + + current_regions = [r["RegionName"] for r in current_replicas] + logging.info(f"Current replicas: {current_regions}") + + regions_to_keep = current_regions[:-1] + regions_to_remove = [current_regions[-1]] + + cr["spec"]["tableReplicas"] = [ + {"regionName": region} for region in regions_to_keep + ] + + k8s.patch_custom_resource(ref, cr) + + # Wait for the replica to be removed + table.wait_until( + table_name, + table.replicas_match(regions_to_keep), + timeout_seconds=REPLICA_WAIT_AFTER_SECONDS, + interval_seconds=30, + ) + + # Verify remaining replicas + replicas = table.get_replicas(table_name) + assert replicas is not None + assert len(replicas) == len(regions_to_keep) + + region_names = [r["RegionName"] for r in replicas] + for region in regions_to_keep: + assert region in region_names + for region in regions_to_remove: + assert region not in region_names + + def test_delete_table_with_replicas(self, table_with_replicas): + (ref, res) = table_with_replicas + + table_name = res["spec"]["tableName"] + assert self.table_exists(table_name) + + def test_terminal_condition_for_invalid_stream_specification(self, table_with_invalid_replicas): + (ref, res) = table_with_invalid_replicas + + table_name = res["spec"]["tableName"] + assert self.table_exists(table_name) + + max_wait_seconds = 120 + interval_seconds = 10 + start_time = time.time() + terminal_condition_found = False + + while time.time() - start_time < max_wait_seconds: + try: + condition.assert_type_status( + ref, + condition.CONDITION_TYPE_TERMINAL, + True) + + terminal_condition_found = True + cond = k8s.get_resource_condition( + ref, condition.CONDITION_TYPE_TERMINAL) + assert "table must have DynamoDB Streams enabled with StreamViewType set to NEW_AND_OLD_IMAGES" in cond[ + "message"] + break + except: + time.sleep(interval_seconds) + + assert terminal_condition_found, "Terminal condition was not set for invalid StreamSpecification" + + def test_staged_replicas_and_gsi_updates(self, table_replicas_gsi): + (ref, cr) = table_replicas_gsi + table_name = cr["spec"]["tableName"] + max_wait_seconds = REPLICA_WAIT_AFTER_SECONDS + interval_seconds = 30 + start_time = time.time() + + while time.time() - start_time < max_wait_seconds: + if self.table_exists(table_name): + break + time.sleep(interval_seconds) + assert self.table_exists(table_name) + + table.wait_until( + table_name, + table.gsi_matches([{ + "indexName": "GSI1", + "keySchema": [ + {"attributeName": "GSI1PK", "keyType": "HASH"}, + {"attributeName": "GSI1SK", "keyType": "RANGE"} + ], + "projection": { + "projectionType": "ALL" + } + }]), + timeout_seconds=REPLICA_WAIT_AFTER_SECONDS, + interval_seconds=30, + ) + + # Step 2: Update - add second GSI and two more replicas + cr = k8s.wait_resource_consumed_by_controller(ref) + + # Add attribute definition needed for GSI2 + cr["spec"]["attributeDefinitions"].append( + {"attributeName": "GSI2PK", "attributeType": "S"} + ) + + # Add GSI2 + cr["spec"]["globalSecondaryIndexes"].append({ + "indexName": "GSI2", + "keySchema": [ + {"attributeName": "GSI2PK", "keyType": "HASH"} + ], + "projection": { + "projectionType": "KEYS_ONLY" + } + }) + + # Add two more replicas + cr["spec"]["tableReplicas"] = [ + {"regionName": REPLICA_REGION_1, + "globalSecondaryIndexes": [{"indexName": "GSI1"}, {"indexName": "GSI2"}]}, + {"regionName": REPLICA_REGION_2, + "globalSecondaryIndexes": [{"indexName": "GSI1"}, {"indexName": "GSI2"}]}, + {"regionName": REPLICA_REGION_3, + "globalSecondaryIndexes": [{"indexName": "GSI1"}, {"indexName": "GSI2"}]} + ] + + # Update the resource + k8s.patch_custom_resource(ref, cr) + + # Wait for the new GSI to be created + table.wait_until( + table_name, + table.gsi_matches([ + { + "indexName": "GSI1", + "keySchema": [ + {"attributeName": "GSI1PK", "keyType": "HASH"}, + {"attributeName": "GSI1SK", "keyType": "RANGE"} + ], + "projection": { + "projectionType": "ALL" + } + }, + { + "indexName": "GSI2", + "keySchema": [ + {"attributeName": "GSI2PK", "keyType": "HASH"} + ], + "projection": { + "projectionType": "KEYS_ONLY" + } + } + ]), + timeout_seconds=REPLICA_WAIT_AFTER_SECONDS*2, + interval_seconds=30, + ) + + table.wait_until( + table_name, + table.replicas_match( + [REPLICA_REGION_1, REPLICA_REGION_2, REPLICA_REGION_3]), + timeout_seconds=REPLICA_WAIT_AFTER_SECONDS*2, + interval_seconds=30, + ) + + for region in [REPLICA_REGION_1, REPLICA_REGION_2, REPLICA_REGION_3]: + table.wait_until( + table_name, + table.replica_status_matches(region, "ACTIVE"), + timeout_seconds=REPLICA_WAIT_AFTER_SECONDS, + interval_seconds=30, + ) + + table_info = table.get(table_name) + assert "GlobalSecondaryIndexes" in table_info + assert len(table_info["GlobalSecondaryIndexes"]) == 2 + gsi_names = [gsi["IndexName"] + for gsi in table_info["GlobalSecondaryIndexes"]] + assert "GSI1" in gsi_names + assert "GSI2" in gsi_names + + replicas = table.get_replicas(table_name) + assert replicas is not None + assert len(replicas) == 3 + region_names = [r["RegionName"] for r in replicas] + assert REPLICA_REGION_1 in region_names + assert REPLICA_REGION_2 in region_names + assert REPLICA_REGION_3 in region_names + \ No newline at end of file