Skip to content

Commit

Permalink
Fix #34819.
Browse files Browse the repository at this point in the history
  • Loading branch information
ewbankkit committed Apr 30, 2024
1 parent 1750f12 commit 59ec384
Showing 1 changed file with 65 additions and 74 deletions.
139 changes: 65 additions & 74 deletions internal/service/elasticache/replication_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -732,25 +732,20 @@ func resourceReplicationGroupUpdate(ctx context.Context, d *schema.ResourceData,
conn := meta.(*conns.AWSClient).ElastiCacheConn(ctx)

if d.HasChangesExcept("tags", "tags_all") {
if d.HasChanges(
"num_node_groups",
"replicas_per_node_group",
) {
err := modifyReplicationGroupShardConfiguration(ctx, conn, d)
if err != nil {
return sdkdiag.AppendErrorf(diags, "modifying ElastiCache Replication Group (%s) shard configuration: %s", d.Id(), err)
o, n := d.GetChange("num_cache_clusters")
oldCacheClusterCount, newCacheClusterCount := o.(int), n.(int)

if d.HasChanges("num_node_groups", "replicas_per_node_group") {
if err := modifyReplicationGroupShardConfiguration(ctx, conn, d); err != nil {
return sdkdiag.AppendFromErr(diags, err)
}
} else if d.HasChange("num_cache_clusters") {
o, n := d.GetChange("num_cache_clusters")
oldNumberCacheClusters := o.(int)
newNumberCacheClusters := n.(int)
if newNumberCacheClusters > oldNumberCacheClusters {
err := increaseReplicationGroupNumCacheClusters(ctx, conn, d.Id(), newNumberCacheClusters, d.Timeout(schema.TimeoutUpdate))
if err != nil {
return sdkdiag.AppendErrorf(diags, "increasing ElastiCache Replication size (%s) clusters: %s", d.Id(), err)
if newCacheClusterCount > oldCacheClusterCount {
if err := increaseReplicationGroupReplicaCount(ctx, conn, d.Id(), newCacheClusterCount, d.Timeout(schema.TimeoutUpdate)); err != nil {
return sdkdiag.AppendFromErr(diags, err)
}
} else if newNumberCacheClusters < oldNumberCacheClusters {
// deferring size in until after cluster modifications
} else if newCacheClusterCount < oldCacheClusterCount {
// Defer until after all other modifications are made.
}
}

Expand Down Expand Up @@ -922,40 +917,36 @@ func resourceReplicationGroupUpdate(ctx context.Context, d *schema.ResourceData,
}

if d.HasChanges("auth_token", "auth_token_update_strategy") {
params := &elasticache.ModifyReplicationGroupInput{
input := &elasticache.ModifyReplicationGroupInput{
ApplyImmediately: aws.Bool(true),
ReplicationGroupId: aws.String(d.Id()),
AuthTokenUpdateStrategy: aws.String(d.Get("auth_token_update_strategy").(string)),
AuthToken: aws.String(d.Get("auth_token").(string)),
AuthTokenUpdateStrategy: aws.String(d.Get("auth_token_update_strategy").(string)),
ReplicationGroupId: aws.String(d.Id()),
}

// tagging may cause this resource to not yet be available, so wait for it to be available
const (
delay = 0 * time.Second
)
_, err := waitReplicationGroupAvailable(ctx, conn, d.Id(), d.Timeout(schema.TimeoutUpdate), delay)
if err != nil {
return sdkdiag.AppendErrorf(diags, "waiting for ElastiCache Replication Group (%s) to update: %s", d.Id(), err)
if _, err := waitReplicationGroupAvailable(ctx, conn, d.Id(), d.Timeout(schema.TimeoutUpdate), delay); err != nil {
return sdkdiag.AppendErrorf(diags, "waiting for ElastiCache Replication Group (%s) update: %s", d.Id(), err)
}

_, err = conn.ModifyReplicationGroupWithContext(ctx, params)
_, err := conn.ModifyReplicationGroupWithContext(ctx, input)

if err != nil {
return sdkdiag.AppendErrorf(diags, "changing auth_token for ElastiCache Replication Group (%s): %s", d.Id(), err)
return sdkdiag.AppendErrorf(diags, "modifying ElastiCache Replication Group (%s) authentication: %s", d.Id(), err)
}
_, err = waitReplicationGroupAvailable(ctx, conn, d.Id(), d.Timeout(schema.TimeoutUpdate), delay)
if err != nil {
return sdkdiag.AppendErrorf(diags, "waiting for ElastiCache Replication Group (%s) auth_token change: %s", d.Id(), err)

if _, err := waitReplicationGroupAvailable(ctx, conn, d.Id(), d.Timeout(schema.TimeoutUpdate), delay); err != nil {
return sdkdiag.AppendErrorf(diags, "waiting for ElastiCache Replication Group (%s) update: %s", d.Id(), err)
}
}

if d.HasChange("num_cache_clusters") {
o, n := d.GetChange("num_cache_clusters")
oldNumberCacheClusters := o.(int)
newNumberCacheClusters := n.(int)
if oldNumberCacheClusters < newNumberCacheClusters {
err := decreaseReplicationGroupNumCacheClusters(ctx, conn, d.Id(), newNumberCacheClusters, d.Timeout(schema.TimeoutUpdate))
if err != nil {
return sdkdiag.AppendErrorf(diags, "decreasing ElastiCache Replication size (%s) clusters: %s", d.Id(), err)
if newCacheClusterCount < oldCacheClusterCount {
if err := decreaseReplicationGroupReplicaCount(ctx, conn, d.Id(), newCacheClusterCount, d.Timeout(schema.TimeoutUpdate)); err != nil {
return sdkdiag.AppendFromErr(diags, err)
}
}
}
Expand Down Expand Up @@ -1083,15 +1074,13 @@ func deleteReplicationGroup(ctx context.Context, replicationGroupID string, conn

func modifyReplicationGroupShardConfiguration(ctx context.Context, conn *elasticache.ElastiCache, d *schema.ResourceData) error {
if d.HasChange("num_node_groups") {
err := modifyReplicationGroupShardConfigurationNumNodeGroups(ctx, conn, d, "num_node_groups")
if err != nil {
if err := modifyReplicationGroupShardConfigurationNumNodeGroups(ctx, conn, d, "num_node_groups"); err != nil {
return err
}
}

if d.HasChange("replicas_per_node_group") {
err := modifyReplicationGroupShardConfigurationReplicasPerNodeGroup(ctx, conn, d, "replicas_per_node_group")
if err != nil {
if err := modifyReplicationGroupShardConfigurationReplicasPerNodeGroup(ctx, conn, d, "replicas_per_node_group"); err != nil {
return err
}
}
Expand All @@ -1101,121 +1090,123 @@ func modifyReplicationGroupShardConfiguration(ctx context.Context, conn *elastic

func modifyReplicationGroupShardConfigurationNumNodeGroups(ctx context.Context, conn *elasticache.ElastiCache, d *schema.ResourceData, argument string) error {
o, n := d.GetChange(argument)
oldNumNodeGroups := o.(int)
newNumNodeGroups := n.(int)
oldNodeGroupCount, newNodeGroupCount := o.(int), n.(int)

input := &elasticache.ModifyReplicationGroupShardConfigurationInput{
ApplyImmediately: aws.Bool(true),
NodeGroupCount: aws.Int64(int64(newNumNodeGroups)),
NodeGroupCount: aws.Int64(int64(newNodeGroupCount)),
ReplicationGroupId: aws.String(d.Id()),
}

if oldNumNodeGroups > newNumNodeGroups {
if oldNodeGroupCount > newNodeGroupCount {
// Node Group IDs are 1 indexed: 0001 through 0015
// Loop from highest old ID until we reach highest new ID
nodeGroupsToRemove := []string{}
for i := oldNumNodeGroups; i > newNumNodeGroups; i-- {
for i := oldNodeGroupCount; i > newNodeGroupCount; i-- {
nodeGroupID := fmt.Sprintf("%04d", i)
nodeGroupsToRemove = append(nodeGroupsToRemove, nodeGroupID)
}
input.NodeGroupsToRemove = aws.StringSlice(nodeGroupsToRemove)
}

log.Printf("[DEBUG] Modifying ElastiCache Replication Group (%s) shard configuration: %s", d.Id(), input)
_, err := conn.ModifyReplicationGroupShardConfigurationWithContext(ctx, input)

if err != nil {
return fmt.Errorf("modifying ElastiCache Replication Group shard configuration: %w", err)
return fmt.Errorf("modifying ElastiCache Replication Group (%s) shard configuration: %w", d.Id(), err)
}

const (
delay = 30 * time.Second
)
_, err = waitReplicationGroupAvailable(ctx, conn, d.Id(), d.Timeout(schema.TimeoutUpdate), delay)
if err != nil {
return fmt.Errorf("waiting for ElastiCache Replication Group (%s) shard reconfiguration completion: %w", d.Id(), err)
if _, err := waitReplicationGroupAvailable(ctx, conn, d.Id(), d.Timeout(schema.TimeoutUpdate), delay); err != nil {
return fmt.Errorf("waiting for ElastiCache Replication Group (%s) update: %w", d.Id(), err)
}

return nil
}

func modifyReplicationGroupShardConfigurationReplicasPerNodeGroup(ctx context.Context, conn *elasticache.ElastiCache, d *schema.ResourceData, argument string) error {
o, n := d.GetChange(argument)
oldReplicas := o.(int)
newReplicas := n.(int)
oldReplicaCount, newReplicaCount := o.(int), n.(int)

if newReplicas > oldReplicas {
if newReplicaCount > oldReplicaCount {
input := &elasticache.IncreaseReplicaCountInput{
ApplyImmediately: aws.Bool(true),
NewReplicaCount: aws.Int64(int64(newReplicas)),
NewReplicaCount: aws.Int64(int64(newReplicaCount)),
ReplicationGroupId: aws.String(d.Id()),
}

_, err := conn.IncreaseReplicaCountWithContext(ctx, input)

if err != nil {
return fmt.Errorf("adding ElastiCache Replication Group (%s) replicas: %w", d.Id(), err)
return fmt.Errorf("increasing ElastiCache Replication Group (%s) replica count (%d): %w", d.Id(), newReplicaCount, err)
}

const (
delay = 30 * time.Second
)
_, err = waitReplicationGroupAvailable(ctx, conn, d.Id(), d.Timeout(schema.TimeoutUpdate), delay)
if err != nil {
return fmt.Errorf("waiting for ElastiCache Replication Group (%s) replica addition: %w", d.Id(), err)
if _, err := waitReplicationGroupAvailable(ctx, conn, d.Id(), d.Timeout(schema.TimeoutUpdate), delay); err != nil {
return fmt.Errorf("waiting for ElastiCache Replication Group (%s) update: %w", d.Id(), err)
}
} else {
} else if newReplicaCount < oldReplicaCount {
input := &elasticache.DecreaseReplicaCountInput{
ApplyImmediately: aws.Bool(true),
NewReplicaCount: aws.Int64(int64(newReplicas)),
NewReplicaCount: aws.Int64(int64(newReplicaCount)),
ReplicationGroupId: aws.String(d.Id()),
}

_, err := conn.DecreaseReplicaCountWithContext(ctx, input)

if err != nil {
return fmt.Errorf("removing ElastiCache Replication Group (%s) replicas: %w", d.Id(), err)
return fmt.Errorf("decreasing ElastiCache Replication Group (%s) replica count (%d): %w", d.Id(), newReplicaCount, err)
}

const (
delay = 30 * time.Second
)
_, err = waitReplicationGroupAvailable(ctx, conn, d.Id(), d.Timeout(schema.TimeoutUpdate), delay)
if err != nil {
return fmt.Errorf("waiting for ElastiCache Replication Group (%s) replica removal: %w", d.Id(), err)
if _, err := waitReplicationGroupAvailable(ctx, conn, d.Id(), d.Timeout(schema.TimeoutUpdate), delay); err != nil {
return fmt.Errorf("waiting for ElastiCache Replication Group (%s) update: %w", d.Id(), err)
}
}

return nil
}

func increaseReplicationGroupNumCacheClusters(ctx context.Context, conn *elasticache.ElastiCache, replicationGroupID string, newNumberCacheClusters int, timeout time.Duration) error {
func increaseReplicationGroupReplicaCount(ctx context.Context, conn *elasticache.ElastiCache, replicationGroupID string, newReplicaCount int, timeout time.Duration) error {
input := &elasticache.IncreaseReplicaCountInput{
ApplyImmediately: aws.Bool(true),
NewReplicaCount: aws.Int64(int64(newNumberCacheClusters - 1)),
NewReplicaCount: aws.Int64(int64(newReplicaCount - 1)),
ReplicationGroupId: aws.String(replicationGroupID),
}

_, err := conn.IncreaseReplicaCountWithContext(ctx, input)

if err != nil {
return fmt.Errorf("adding ElastiCache Replication Group (%s) replicas: %w", replicationGroupID, err)
return fmt.Errorf("increasing ElastiCache Replication Group (%s) replica count (%d): %w", replicationGroupID, newReplicaCount-1, err)
}

_, err = waitReplicationGroupMemberClustersAvailable(ctx, conn, replicationGroupID, timeout)
if err != nil {
return fmt.Errorf("waiting for ElastiCache Replication Group (%s) replica addition: %w", replicationGroupID, err)
if _, err := waitReplicationGroupMemberClustersAvailable(ctx, conn, replicationGroupID, timeout); err != nil {
return fmt.Errorf("waiting for ElastiCache Replication Group (%s) member cluster update: %w", replicationGroupID, err)
}

return nil
}

func decreaseReplicationGroupNumCacheClusters(ctx context.Context, conn *elasticache.ElastiCache, replicationGroupID string, newNumberCacheClusters int, timeout time.Duration) error {
func decreaseReplicationGroupReplicaCount(ctx context.Context, conn *elasticache.ElastiCache, replicationGroupID string, newReplicaCount int, timeout time.Duration) error {
input := &elasticache.DecreaseReplicaCountInput{
ApplyImmediately: aws.Bool(true),
NewReplicaCount: aws.Int64(int64(newNumberCacheClusters - 1)),
NewReplicaCount: aws.Int64(int64(newReplicaCount - 1)),
ReplicationGroupId: aws.String(replicationGroupID),
}

_, err := conn.DecreaseReplicaCountWithContext(ctx, input)

if err != nil {
return fmt.Errorf("removing ElastiCache Replication Group (%s) replicas: %w", replicationGroupID, err)
return fmt.Errorf("decreasing ElastiCache Replication Group (%s) replica count (%d): %w", replicationGroupID, newReplicaCount-1, err)
}

_, err = waitReplicationGroupMemberClustersAvailable(ctx, conn, replicationGroupID, timeout)
if err != nil {
return fmt.Errorf("waiting for ElastiCache Replication Group (%s) replica removal: %w", replicationGroupID, err)
if _, err := waitReplicationGroupMemberClustersAvailable(ctx, conn, replicationGroupID, timeout); err != nil {
return fmt.Errorf("waiting for ElastiCache Replication Group (%s) member cluster update: %w", replicationGroupID, err)
}

return nil
Expand Down

0 comments on commit 59ec384

Please sign in to comment.