5 changes: 2 additions & 3 deletions test/extended/etcd/OWNERS
@@ -1,12 +1,11 @@
reviewers:
- dusk125
- hasbro17
- Elbehery
- jubittajohn
- tjungblu
approvers:
- deads2k
- soltysh
- hasbro17
- dusk125
- Elbehery
- jubittajohn
- tjungblu
161 changes: 161 additions & 0 deletions test/extended/etcd/helpers/helpers.go
@@ -385,6 +385,167 @@ func EnsureCPMSReplicasConverged(ctx context.Context, cpmsClient machinev1client
return nil
}

// UpdateCPMSStrategy updates the CPMS strategy to the specified type (OnDelete, RollingUpdate, or Recreate)
func UpdateCPMSStrategy(ctx context.Context, t TestingT, cpmsClient machinev1client.ControlPlaneMachineSetInterface, strategy machinev1.ControlPlaneMachineSetStrategyType) error {
cpms, err := cpmsClient.Get(ctx, "cluster", metav1.GetOptions{})
if err != nil {
return err
}

cpms.Spec.Strategy.Type = strategy
_, err = cpmsClient.Update(ctx, cpms, metav1.UpdateOptions{})
if err != nil {
return err
}

framework.Logf("Successfully updated CPMS strategy to %v", strategy)
return nil
}

// DeleteAllMasterMachines deletes all master machines and returns the list of deleted machine names
func DeleteAllMasterMachines(ctx context.Context, t TestingT, machineClient machinev1beta1client.MachineInterface) ([]string, error) {
machineList, err := machineClient.List(ctx, metav1.ListOptions{LabelSelector: masterMachineLabelSelector})
if err != nil {
return nil, fmt.Errorf("error listing master machines: '%w'", err)
}

var deletedMachineNames []string
for _, machine := range machineList.Items {
if err := DeleteMachine(ctx, t, machineClient, machine.Name); err != nil {
return deletedMachineNames, err
}
deletedMachineNames = append(deletedMachineNames, machine.Name)
}

return deletedMachineNames, nil
}

// EnsureUpdatedReplicasOnCPMS checks if status.updatedReplicas on the cluster CPMS equals the expected count
// updatedReplicas represents machines with the desired spec that are ready
func EnsureUpdatedReplicasOnCPMS(ctx context.Context, t TestingT, expectedCount int, cpmsClient machinev1client.ControlPlaneMachineSetInterface) error {
waitPollInterval := 15 * time.Second
waitPollTimeout := 90 * time.Minute
framework.Logf("Waiting up to %s for the CPMS to have status.updatedReplicas = %v", waitPollTimeout.String(), expectedCount)

return wait.PollUntilContextTimeout(ctx, waitPollInterval, waitPollTimeout, true, func(ctx context.Context) (done bool, err error) {
cpms, err := cpmsClient.Get(ctx, "cluster", metav1.GetOptions{})
if err != nil {
return isTransientAPIError(t, err)
}

if cpms.Status.UpdatedReplicas != int32(expectedCount) {
framework.Logf("expected %d updated replicas on CPMS, got: %v", expectedCount, cpms.Status.UpdatedReplicas)
return false, nil
}
framework.Logf("CPMS has reached the desired number of updated replicas: %v", cpms.Status.UpdatedReplicas)
return true, nil
})
}

// GetVotingMemberNames returns the list of current voting etcd member names
func GetVotingMemberNames(ctx context.Context, t TestingT, etcdClientFactory EtcdClientCreator) ([]string, error) {
etcdClient, closeFn, err := etcdClientFactory.NewEtcdClient()
if err != nil {
return nil, fmt.Errorf("failed to get etcd client: %w", err)
}
defer closeFn()

memberCtx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
memberList, err := etcdClient.MemberList(memberCtx)
if err != nil {
return nil, fmt.Errorf("failed to get the member list: %w", err)
}

var votingMemberNames []string
for _, member := range memberList.Members {
if !member.IsLearner {
votingMemberNames = append(votingMemberNames, member.Name)
}
}

framework.Logf("Current voting etcd members: %v", votingMemberNames)
return votingMemberNames, nil
}

// EnsureVotingMembersExcluding waits for the cluster to have exactly expectedCount voting members,
// with none of the members in the excludedMemberNames list
func EnsureVotingMembersExcluding(ctx context.Context, t TestingT, etcdClientFactory EtcdClientCreator, kubeClient kubernetes.Interface, excludedMemberNames []string, expectedCount int) error {
waitPollInterval := 15 * time.Second
waitPollTimeout := 90 * time.Minute
excludedSet := sets.NewString(excludedMemberNames...)
framework.Logf("Waiting up to %s for the cluster to have %v voting members with none from the excluded list: %v", waitPollTimeout.String(), expectedCount, excludedMemberNames)

return wait.PollUntilContextTimeout(ctx, waitPollInterval, waitPollTimeout, true, func(ctx context.Context) (done bool, err error) {
etcdClient, closeFn, err := etcdClientFactory.NewEtcdClient()
if err != nil {
framework.Logf("failed to get etcd client, will retry, err: %v", err)
return false, nil
}
defer closeFn()

memberCtx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
memberList, err := etcdClient.MemberList(memberCtx)
if err != nil {
framework.Logf("failed to get the member list, will retry, err: %v", err)
return false, nil
}

var votingMemberNames []string
excludedMemberIDs := sets.NewString()
for _, member := range memberList.Members {
if !member.IsLearner {
votingMemberNames = append(votingMemberNames, member.Name)
// Collect IDs of excluded members
if excludedSet.Has(member.Name) {
// Convert member ID to hexadecimal format to match etcd-endpoints ConfigMap format
memberID := fmt.Sprintf("%x", member.ID)
excludedMemberIDs.Insert(memberID)
}
}
}

// Check if we have the expected count
if len(votingMemberNames) != expectedCount {
framework.Logf("unexpected number of voting etcd members, expected exactly %d, got: %v, current members are: %v", expectedCount, len(votingMemberNames), votingMemberNames)
return false, nil
}

// Check if any of the current members are in the excluded list
for _, memberName := range votingMemberNames {
if excludedSet.Has(memberName) {
framework.Logf("found excluded member %q still in the cluster, current members are: %v", memberName, votingMemberNames)
return false, nil
}
}

framework.Logf("cluster has reached the expected number of %v voting members with none from excluded list, current members are: %v", expectedCount, votingMemberNames)

// Also validate etcd-endpoints ConfigMap
framework.Logf("ensuring that the openshift-etcd/etcd-endpoints cm has the expected number of %v voting members and excludes old members", expectedCount)
etcdEndpointsConfigMap, err := kubeClient.CoreV1().ConfigMaps("openshift-etcd").Get(ctx, "etcd-endpoints", metav1.GetOptions{})
if err != nil {
return false, err
}
currentVotingMemberIPListSet := sets.NewString()
for memberID, votingMemberIP := range etcdEndpointsConfigMap.Data {
// Check if this member ID is in the excluded member IDs list
if excludedMemberIDs.Has(memberID) {
framework.Logf("found excluded member ID %q in etcd-endpoints ConfigMap, will retry", memberID)
return false, nil
}
currentVotingMemberIPListSet.Insert(votingMemberIP)
}
if currentVotingMemberIPListSet.Len() != expectedCount {
framework.Logf("unexpected number of voting members in the openshift-etcd/etcd-endpoints cm, expected exactly %d, got: %v, current members are: %v", expectedCount, currentVotingMemberIPListSet.Len(), currentVotingMemberIPListSet.List())
return false, nil
}

return true, nil
})
}

// EnsureVotingMembersCount counts the number of voting etcd members, it doesn't evaluate health conditions or any other attributes (i.e. name) of individual members
// this method won't fail immediately on errors, this is useful during scaling down operation until the feature can ensure this operation to be graceful
func EnsureVotingMembersCount(ctx context.Context, t TestingT, etcdClientFactory EtcdClientCreator, kubeClient kubernetes.Interface, expectedMembersCount int) error {
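
A note on the hex conversion in EnsureVotingMembersExcluding: the helper matches etcd member IDs against the keys of the openshift-etcd/etcd-endpoints ConfigMap, whose data maps each member ID (rendered as lowercase hex) to that member's IP. The standalone sketch below only illustrates that matching; the ConfigMap contents and the member ID shown are hypothetical, not taken from this change.

package main

import "fmt"

func main() {
	// Hypothetical snapshot of the openshift-etcd/etcd-endpoints ConfigMap data;
	// in the real helper this comes from kubeClient.CoreV1().ConfigMaps("openshift-etcd").Get(...).Data.
	etcdEndpoints := map[string]string{
		"6f1d2a3b4c5d6e7f": "10.0.0.4", // hypothetical member ID (hex) -> member IP
	}

	// Member IDs returned by etcdClient.MemberList are uint64; %x renders them in the
	// same lowercase-hex form used as the ConfigMap keys.
	var memberID uint64 = 0x6f1d2a3b4c5d6e7f
	key := fmt.Sprintf("%x", memberID)

	if ip, ok := etcdEndpoints[key]; ok {
		fmt.Printf("member %s is still listed in etcd-endpoints with IP %s\n", key, ip)
	}
}
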
87 changes: 85 additions & 2 deletions test/extended/etcd/vertical_scaling.go
@@ -7,8 +7,9 @@ import (
o "github.com/onsi/gomega"
"github.com/pkg/errors"

machinev1 "github.com/openshift/api/machine/v1"
machineclient "github.com/openshift/client-go/machine/clientset/versioned"
machinev1 "github.com/openshift/client-go/machine/clientset/versioned/typed/machine/v1"
machinev1client "github.com/openshift/client-go/machine/clientset/versioned/typed/machine/v1"
machinev1beta1client "github.com/openshift/client-go/machine/clientset/versioned/typed/machine/v1beta1"
testlibraryapi "github.com/openshift/library-go/test/library/apiserver"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
@@ -19,6 +20,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/test/e2e/framework"
e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
)

var _ = g.Describe("[sig-etcd][Feature:EtcdVerticalScaling][Suite:openshift/etcd/scaling][Serial] etcd", func() {
@@ -30,7 +32,7 @@ var _ = g.Describe("[sig-etcd][Feature:EtcdVerticalScaling][Suite:openshift/etcd
machineClientSet *machineclient.Clientset
machineClient machinev1beta1client.MachineInterface
nodeClient v1.NodeInterface
cpmsClient machinev1.ControlPlaneMachineSetInterface
cpmsClient machinev1client.ControlPlaneMachineSetInterface
kubeClient kubernetes.Interface
cpmsActive bool
ctx context.Context
@@ -293,4 +295,85 @@ var _ = g.Describe("[sig-etcd][Feature:EtcdVerticalScaling][Suite:openshift/etcd
err = scalingtestinglibrary.EnsureCPMSReplicasConverged(ctx, cpmsClient)
o.Expect(err).ToNot(o.HaveOccurred())
})

// The following test validates CPMS OnDelete strategy behavior during full master replacement.
// OnDelete strategy differs from RollingUpdate in that CPMS does not automatically update
// machines when their spec changes. However, when machines are deleted, CPMS still creates
// replacements to maintain the desired replica count.
//
// This test verifies that CPMS correctly handles the deletion of all three master machines
// simultaneously while in OnDelete mode:
// 1) Switches CPMS to OnDelete strategy
// 2) Deletes all master machines at once
// 3) Validates CPMS creates three new replacement machines
// 4) Verifies all old etcd members are removed from both the cluster and etcd-endpoints ConfigMap
// 5) Waits for API server rollout to stabilize and verifies the cluster returns to 3 running machines
g.It("is able to delete all masters with OnDelete strategy and wait for CPMSO to replace them [Timeout:120m][apigroup:machine.openshift.io]", func() {
if !cpmsActive {
e2eskipper.Skipf("CPMS is not active on this platform, this test requires an active CPMS to validate OnDelete strategy")
}

// step 1: Update CPMS to OnDelete strategy
framework.Logf("Updating CPMS strategy to OnDelete")
err = scalingtestinglibrary.UpdateCPMSStrategy(ctx, g.GinkgoT(), cpmsClient, machinev1.OnDelete)
err = errors.Wrap(err, "failed to update CPMS strategy to OnDelete")
o.Expect(err).ToNot(o.HaveOccurred())

// step 2: Restore RollingUpdate strategy in cleanup
defer func() {
framework.Logf("Restoring CPMS strategy to RollingUpdate")
err := scalingtestinglibrary.UpdateCPMSStrategy(ctx, g.GinkgoT(), cpmsClient, machinev1.RollingUpdate)
err = errors.Wrap(err, "cleanup: failed to restore CPMS strategy to RollingUpdate")
o.Expect(err).ToNot(o.HaveOccurred())
}()

// step 3: Capture current etcd member names before deletion
framework.Logf("Capturing current voting etcd member names")
oldMemberNames, err := scalingtestinglibrary.GetVotingMemberNames(ctx, g.GinkgoT(), etcdClientFactory)
err = errors.Wrap(err, "failed to get current voting member names")
o.Expect(err).ToNot(o.HaveOccurred())

// step 4: Delete all master machines
framework.Logf("Deleting all master machines")
deletedMachineNames, err := scalingtestinglibrary.DeleteAllMasterMachines(ctx, g.GinkgoT(), machineClient)
err = errors.Wrap(err, "failed to delete all master machines")
o.Expect(err).ToNot(o.HaveOccurred())
framework.Logf("Deleted machines: %v", deletedMachineNames)

// step 5: Wait for CPMS to show 3 updated replicas
framework.Logf("Waiting for CPMS to show 3 updated replicas")
err = scalingtestinglibrary.EnsureUpdatedReplicasOnCPMS(ctx, g.GinkgoT(), 3, cpmsClient)
err = errors.Wrap(err, "timed out waiting for CPMS to show 3 updated replicas")
o.Expect(err).ToNot(o.HaveOccurred())

// step 6: Wait for etcd membership to have 3 members with none from old member list
framework.Logf("Waiting for etcd membership to stabilize with new members")
err = scalingtestinglibrary.EnsureVotingMembersExcluding(ctx, g.GinkgoT(), etcdClientFactory, kubeClient, oldMemberNames, 3)
err = errors.Wrap(err, "timed out waiting for etcd to have 3 voting members excluding old members")
o.Expect(err).ToNot(o.HaveOccurred())

// step 7: Verify CPMS shows 3 ready replicas
framework.Logf("Waiting for 3 ready replicas on CPMS")
err = scalingtestinglibrary.EnsureReadyReplicasOnCPMS(ctx, g.GinkgoT(), 3, cpmsClient, nodeClient)
err = errors.Wrap(err, "timed out waiting for CPMS to show 3 ready replicas")
o.Expect(err).ToNot(o.HaveOccurred())

// step 8: Verify only 3 running master machines
framework.Logf("Waiting for 3 Running master machines")
err = scalingtestinglibrary.EnsureMasterMachinesAndCount(ctx, g.GinkgoT(), machineClient)
err = errors.Wrap(err, "timed out waiting for only 3 Running master machines")
o.Expect(err).ToNot(o.HaveOccurred())

// step 9: Verify CPMS replicas converged
framework.Logf("Waiting for CPMS replicas to converge")
err = scalingtestinglibrary.EnsureCPMSReplicasConverged(ctx, cpmsClient)
err = errors.Wrap(err, "CPMS replicas failed to converge")
o.Expect(err).ToNot(o.HaveOccurred())

// step 10: Wait for API server to stabilize
framework.Logf("Waiting for API servers to stabilize on the same revision")
err = testlibraryapi.WaitForAPIServerToStabilizeOnTheSameRevision(g.GinkgoT(), oc.KubeClient().CoreV1().Pods("openshift-kube-apiserver"))
err = errors.Wrap(err, "timed out waiting for APIServer pods to stabilize on the same revision")
o.Expect(err).ToNot(o.HaveOccurred())
})
})
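
A possible refinement of the cleanup in steps 1 and 2, sketched here as a suggestion rather than as part of the change: capture whatever strategy was configured before the test and restore that exact value, rather than assuming it was RollingUpdate. The GetCPMSStrategy helper below is hypothetical; it follows the same pattern (and assumes the same package and imports) as UpdateCPMSStrategy in helpers.go.

// Hypothetical helper for test/extended/etcd/helpers: read the current CPMS strategy
// so a test can restore it during cleanup.
func GetCPMSStrategy(ctx context.Context, cpmsClient machinev1client.ControlPlaneMachineSetInterface) (machinev1.ControlPlaneMachineSetStrategyType, error) {
	cpms, err := cpmsClient.Get(ctx, "cluster", metav1.GetOptions{})
	if err != nil {
		return "", err
	}
	return cpms.Spec.Strategy.Type, nil
}

// Sketch of how the test body could use it:
//   originalStrategy, err := scalingtestinglibrary.GetCPMSStrategy(ctx, cpmsClient)
//   o.Expect(err).ToNot(o.HaveOccurred())
//   defer func() {
//       err := scalingtestinglibrary.UpdateCPMSStrategy(ctx, g.GinkgoT(), cpmsClient, originalStrategy)
//       o.Expect(err).ToNot(o.HaveOccurred())
//   }()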