Skip to content

Commit

Permalink
Wait adative duration per unready replicas between retries
Browse files Browse the repository at this point in the history
  • Loading branch information
wongni committed Jun 13, 2022
1 parent 1484135 commit 013035c
Show file tree
Hide file tree
Showing 8 changed files with 1,390 additions and 21 deletions.
25 changes: 17 additions & 8 deletions pkg/clustermanager/cluster_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ type ClusterClient interface {
GetNamespace(ctx context.Context, kubeconfig string, namespace string) error
ValidateControlPlaneNodes(ctx context.Context, cluster *types.Cluster, clusterName string) error
ValidateWorkerNodes(ctx context.Context, clusterName string, kubeconfigFile string) error
CountMachineDeploymentReplicasReady(ctx context.Context, clusterName string, kubeconfigFile string) (int, int, error)
GetBundles(ctx context.Context, kubeconfigFile, name, namespace string) (*releasev1alpha1.Bundles, error)
GetApiServerUrl(ctx context.Context, cluster *types.Cluster) (string, error)
GetClusterCATlsCert(ctx context.Context, clusterName string, cluster *types.Cluster, namespace string) ([]byte, error)
Expand Down Expand Up @@ -707,17 +708,25 @@ func (c *ClusterManager) waitForControlPlaneReplicasReady(ctx context.Context, m
}

func (c *ClusterManager) waitForMachineDeploymentReplicasReady(ctx context.Context, managementCluster *types.Cluster, clusterSpec *cluster.Spec) error {
ready, total := 0, 0
policy := func(_ int, _ error) (bool, time.Duration) {
return true, c.machineBackoff * time.Duration(total-ready)
}

var machineDeploymentReplicasCount int
for _, workerNodeGroupConfiguration := range clusterSpec.Cluster.Spec.WorkerNodeGroupConfigurations {
machineDeploymentReplicasCount += workerNodeGroupConfiguration.Count
}

isMdReady := func() error {
return c.clusterClient.ValidateWorkerNodes(ctx, clusterSpec.Cluster.Name, managementCluster.KubeconfigFile)
}

err := isMdReady()
if err == nil {
areMdReplicasReady := func() error {
var err error
ready, total, err = c.clusterClient.CountMachineDeploymentReplicasReady(ctx, clusterSpec.Cluster.Name, managementCluster.KubeconfigFile)
if err != nil {
return err
}
if ready != total {
return fmt.Errorf("%d machine deployment replicas are not ready", total-ready)
}
return nil
}

Expand All @@ -726,8 +735,8 @@ func (c *ClusterManager) waitForMachineDeploymentReplicasReady(ctx context.Conte
timeout = c.machinesMinWait
}

r := retrier.New(timeout)
if err := r.Retry(isMdReady); err != nil {
r := retrier.New(timeout, retrier.WithRetryPolicy(policy))
if err := r.Retry(areMdReplicasReady); err != nil {
return fmt.Errorf("retries exhausted waiting for machinedeployment replicas to be ready: %v", err)
}
return nil
Expand Down
90 changes: 84 additions & 6 deletions pkg/clustermanager/cluster_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ func TestClusterManagerUpgradeSelfManagedClusterSuccess(t *testing.T) {
tt.mocks.provider.EXPECT().MachineDeploymentsToDelete(wCluster, tt.clusterSpec, tt.clusterSpec.DeepCopy()).Return([]string{})
tt.mocks.client.EXPECT().WaitForDeployment(tt.ctx, wCluster, "30m", "Available", gomock.Any(), gomock.Any()).MaxTimes(10)
tt.mocks.client.EXPECT().ValidateControlPlaneNodes(tt.ctx, mCluster, wCluster.Name).Return(nil)
tt.mocks.client.EXPECT().ValidateWorkerNodes(tt.ctx, wCluster.Name, mCluster.KubeconfigFile).Return(nil)
tt.mocks.client.EXPECT().CountMachineDeploymentReplicasReady(tt.ctx, wCluster.Name, mCluster.KubeconfigFile).Return(0, 0, nil)
tt.mocks.provider.EXPECT().GetDeployments()
tt.mocks.writer.EXPECT().Write(clusterName+"-eks-a-cluster.yaml", gomock.Any(), gomock.Not(gomock.Nil()))
tt.mocks.client.EXPECT().GetEksaOIDCConfig(tt.ctx, tt.clusterSpec.Cluster.Spec.IdentityProviderRefs[0].Name, tt.cluster.KubeconfigFile, tt.clusterSpec.Cluster.Namespace).Return(nil, nil)
Expand Down Expand Up @@ -515,7 +515,7 @@ func TestClusterManagerUpgradeWorkloadClusterSuccess(t *testing.T) {
tt.mocks.provider.EXPECT().MachineDeploymentsToDelete(mCluster, tt.clusterSpec, tt.clusterSpec.DeepCopy()).Return([]string{})
tt.mocks.client.EXPECT().WaitForDeployment(tt.ctx, mCluster, "30m", "Available", gomock.Any(), gomock.Any()).MaxTimes(10)
tt.mocks.client.EXPECT().ValidateControlPlaneNodes(tt.ctx, mCluster, mCluster.Name).Return(nil)
tt.mocks.client.EXPECT().ValidateWorkerNodes(tt.ctx, mCluster.Name, mCluster.KubeconfigFile).Return(nil)
tt.mocks.client.EXPECT().CountMachineDeploymentReplicasReady(tt.ctx, mCluster.Name, mCluster.KubeconfigFile).Return(0, 0, nil)
tt.mocks.provider.EXPECT().GetDeployments()
tt.mocks.writer.EXPECT().Write(mgmtClusterName+"-eks-a-cluster.yaml", gomock.Any(), gomock.Not(gomock.Nil()))
tt.mocks.client.EXPECT().GetEksaOIDCConfig(tt.ctx, tt.clusterSpec.Cluster.Spec.IdentityProviderRefs[0].Name, mCluster.KubeconfigFile, tt.clusterSpec.Cluster.Namespace).Return(nil, nil)
Expand Down Expand Up @@ -551,7 +551,85 @@ func TestClusterManagerUpgradeCloudStackWorkloadClusterSuccess(t *testing.T) {
tt.mocks.provider.EXPECT().MachineDeploymentsToDelete(mCluster, tt.clusterSpec, tt.clusterSpec.DeepCopy()).Return([]string{})
tt.mocks.client.EXPECT().WaitForDeployment(tt.ctx, mCluster, "30m", "Available", gomock.Any(), gomock.Any()).MaxTimes(10)
tt.mocks.client.EXPECT().ValidateControlPlaneNodes(tt.ctx, mCluster, mCluster.Name).Return(nil)
tt.mocks.client.EXPECT().ValidateWorkerNodes(tt.ctx, mCluster.Name, mCluster.KubeconfigFile).Return(nil)
tt.mocks.client.EXPECT().CountMachineDeploymentReplicasReady(tt.ctx, mCluster.Name, mCluster.KubeconfigFile).Return(0, 0, nil)
tt.mocks.provider.EXPECT().GetDeployments()
tt.mocks.writer.EXPECT().Write(mgmtClusterName+"-eks-a-cluster.yaml", gomock.Any(), gomock.Not(gomock.Nil()))
tt.mocks.client.EXPECT().GetEksaOIDCConfig(tt.ctx, tt.clusterSpec.Cluster.Spec.IdentityProviderRefs[0].Name, mCluster.KubeconfigFile, tt.clusterSpec.Cluster.Namespace).Return(nil, nil)
tt.mocks.networking.EXPECT().RunPostControlPlaneUpgradeSetup(tt.ctx, wCluster).Return(nil)

if err := tt.clusterManager.UpgradeCluster(tt.ctx, mCluster, wCluster, tt.clusterSpec, tt.mocks.provider); err != nil {
t.Errorf("ClusterManager.UpgradeCluster() error = %v, wantErr nil", err)
}
}

func TestClusterManagerUpgradeWorkloadClusterWaitForMDReadyErrorOnce(t *testing.T) {
mgmtClusterName := "cluster-name"
workClusterName := "cluster-name-w"

mCluster := &types.Cluster{
Name: mgmtClusterName,
ExistingManagement: true,
}
wCluster := &types.Cluster{
Name: workClusterName,
}

tt := newSpecChangedTest(t)
tt.mocks.client.EXPECT().GetEksaCluster(tt.ctx, mCluster, mgmtClusterName).Return(tt.oldClusterConfig, nil)
tt.mocks.client.EXPECT().GetBundles(tt.ctx, mCluster.KubeconfigFile, mCluster.Name, "").Return(test.Bundles(t), nil)
tt.mocks.client.EXPECT().GetEksdRelease(tt.ctx, gomock.Any(), constants.EksaSystemNamespace, gomock.Any())
tt.mocks.provider.EXPECT().GenerateCAPISpecForUpgrade(tt.ctx, mCluster, mCluster, tt.clusterSpec, tt.clusterSpec.DeepCopy())
tt.mocks.client.EXPECT().ApplyKubeSpecFromBytesWithNamespace(tt.ctx, mCluster, test.OfType("[]uint8"), constants.EksaSystemNamespace).Times(2)
tt.mocks.provider.EXPECT().RunPostControlPlaneUpgrade(tt.ctx, tt.clusterSpec, tt.clusterSpec, wCluster, mCluster)
tt.mocks.client.EXPECT().WaitForControlPlaneReady(tt.ctx, mCluster, "60m", mgmtClusterName).MaxTimes(2)
tt.mocks.client.EXPECT().WaitForControlPlaneNotReady(tt.ctx, mCluster, "1m", mgmtClusterName)
tt.mocks.client.EXPECT().GetMachines(tt.ctx, mCluster, mCluster.Name).Return([]types.Machine{}, nil).Times(2)
tt.mocks.provider.EXPECT().MachineDeploymentsToDelete(mCluster, tt.clusterSpec, tt.clusterSpec.DeepCopy()).Return([]string{})
tt.mocks.client.EXPECT().WaitForDeployment(tt.ctx, mCluster, "30m", "Available", gomock.Any(), gomock.Any()).MaxTimes(10)
tt.mocks.client.EXPECT().ValidateControlPlaneNodes(tt.ctx, mCluster, mCluster.Name).Return(nil)
// Fail once
tt.mocks.client.EXPECT().CountMachineDeploymentReplicasReady(tt.ctx, mCluster.Name, mCluster.KubeconfigFile).Times(1).Return(0, 0, errors.New("error counting MD replicas"))
// Return 1 and 1 for ready and total replicas
tt.mocks.client.EXPECT().CountMachineDeploymentReplicasReady(tt.ctx, mCluster.Name, mCluster.KubeconfigFile).Times(1).Return(1, 1, nil)
tt.mocks.provider.EXPECT().GetDeployments()
tt.mocks.writer.EXPECT().Write(mgmtClusterName+"-eks-a-cluster.yaml", gomock.Any(), gomock.Not(gomock.Nil()))
tt.mocks.client.EXPECT().GetEksaOIDCConfig(tt.ctx, tt.clusterSpec.Cluster.Spec.IdentityProviderRefs[0].Name, mCluster.KubeconfigFile, tt.clusterSpec.Cluster.Namespace).Return(nil, nil)
tt.mocks.networking.EXPECT().RunPostControlPlaneUpgradeSetup(tt.ctx, wCluster).Return(nil)

if err := tt.clusterManager.UpgradeCluster(tt.ctx, mCluster, wCluster, tt.clusterSpec, tt.mocks.provider); err != nil {
t.Errorf("ClusterManager.UpgradeCluster() error = %v, wantErr nil", err)
}
}

func TestClusterManagerUpgradeWorkloadClusterWaitForMDReadyUnreadyOnce(t *testing.T) {
mgmtClusterName := "cluster-name"
workClusterName := "cluster-name-w"

mCluster := &types.Cluster{
Name: mgmtClusterName,
ExistingManagement: true,
}
wCluster := &types.Cluster{
Name: workClusterName,
}

tt := newSpecChangedTest(t)
tt.mocks.client.EXPECT().GetEksaCluster(tt.ctx, mCluster, mgmtClusterName).Return(tt.oldClusterConfig, nil)
tt.mocks.client.EXPECT().GetBundles(tt.ctx, mCluster.KubeconfigFile, mCluster.Name, "").Return(test.Bundles(t), nil)
tt.mocks.client.EXPECT().GetEksdRelease(tt.ctx, gomock.Any(), constants.EksaSystemNamespace, gomock.Any())
tt.mocks.provider.EXPECT().GenerateCAPISpecForUpgrade(tt.ctx, mCluster, mCluster, tt.clusterSpec, tt.clusterSpec.DeepCopy())
tt.mocks.client.EXPECT().ApplyKubeSpecFromBytesWithNamespace(tt.ctx, mCluster, test.OfType("[]uint8"), constants.EksaSystemNamespace).Times(2)
tt.mocks.provider.EXPECT().RunPostControlPlaneUpgrade(tt.ctx, tt.clusterSpec, tt.clusterSpec, wCluster, mCluster)
tt.mocks.client.EXPECT().WaitForControlPlaneReady(tt.ctx, mCluster, "60m", mgmtClusterName).MaxTimes(2)
tt.mocks.client.EXPECT().WaitForControlPlaneNotReady(tt.ctx, mCluster, "1m", mgmtClusterName)
tt.mocks.client.EXPECT().GetMachines(tt.ctx, mCluster, mCluster.Name).Return([]types.Machine{}, nil).Times(2)
tt.mocks.provider.EXPECT().MachineDeploymentsToDelete(mCluster, tt.clusterSpec, tt.clusterSpec.DeepCopy()).Return([]string{})
tt.mocks.client.EXPECT().WaitForDeployment(tt.ctx, mCluster, "30m", "Available", gomock.Any(), gomock.Any()).MaxTimes(10)
tt.mocks.client.EXPECT().ValidateControlPlaneNodes(tt.ctx, mCluster, mCluster.Name).Return(nil)
// Return 0 and 1 for ready and total replicas once
tt.mocks.client.EXPECT().CountMachineDeploymentReplicasReady(tt.ctx, mCluster.Name, mCluster.KubeconfigFile).Times(1).Return(0, 1, nil)
// Return 1 and 1 for ready and total replicas
tt.mocks.client.EXPECT().CountMachineDeploymentReplicasReady(tt.ctx, mCluster.Name, mCluster.KubeconfigFile).Times(1).Return(1, 1, nil)
tt.mocks.provider.EXPECT().GetDeployments()
tt.mocks.writer.EXPECT().Write(mgmtClusterName+"-eks-a-cluster.yaml", gomock.Any(), gomock.Not(gomock.Nil()))
tt.mocks.client.EXPECT().GetEksaOIDCConfig(tt.ctx, tt.clusterSpec.Cluster.Spec.IdentityProviderRefs[0].Name, mCluster.KubeconfigFile, tt.clusterSpec.Cluster.Namespace).Return(nil, nil)
Expand Down Expand Up @@ -660,7 +738,7 @@ func TestClusterManagerUpgradeWorkloadClusterWaitForCAPITimeout(t *testing.T) {
tt.mocks.provider.EXPECT().MachineDeploymentsToDelete(wCluster, tt.clusterSpec, tt.clusterSpec.DeepCopy()).Return([]string{})
tt.mocks.client.EXPECT().WaitForDeployment(tt.ctx, wCluster, "30m", "Available", gomock.Any(), gomock.Any()).Return(errors.New("time out"))
tt.mocks.client.EXPECT().ValidateControlPlaneNodes(tt.ctx, mCluster, wCluster.Name).Return(nil)
tt.mocks.client.EXPECT().ValidateWorkerNodes(tt.ctx, wCluster.Name, mCluster.KubeconfigFile).Return(nil)
tt.mocks.client.EXPECT().CountMachineDeploymentReplicasReady(tt.ctx, wCluster.Name, mCluster.KubeconfigFile).Return(0, 0, nil)
tt.mocks.writer.EXPECT().Write(clusterName+"-eks-a-cluster.yaml", gomock.Any(), gomock.Not(gomock.Nil()))
tt.mocks.client.EXPECT().GetEksaOIDCConfig(tt.ctx, tt.clusterSpec.Cluster.Spec.IdentityProviderRefs[0].Name, tt.cluster.KubeconfigFile, tt.clusterSpec.Cluster.Namespace).Return(nil, nil)
tt.mocks.networking.EXPECT().RunPostControlPlaneUpgradeSetup(tt.ctx, wCluster).Return(nil)
Expand Down Expand Up @@ -692,7 +770,7 @@ func TestClusterManagerMoveCAPISuccess(t *testing.T) {
m.client.EXPECT().GetClusters(ctx, to).Return(clusters, nil)
m.client.EXPECT().WaitForControlPlaneReady(ctx, to, "15m0s", capiClusterName)
m.client.EXPECT().ValidateControlPlaneNodes(ctx, to, to.Name)
m.client.EXPECT().ValidateWorkerNodes(ctx, to.Name, to.KubeconfigFile)
m.client.EXPECT().CountMachineDeploymentReplicasReady(ctx, to.Name, to.KubeconfigFile)
m.client.EXPECT().GetMachines(ctx, to, to.Name)

if err := c.MoveCAPI(ctx, from, to, to.Name, clusterSpec); err != nil {
Expand Down Expand Up @@ -793,7 +871,7 @@ func TestClusterManagerMoveCAPIErrorGetMachines(t *testing.T) {
m.client.EXPECT().MoveManagement(ctx, from, to)
m.client.EXPECT().GetClusters(ctx, to)
m.client.EXPECT().ValidateControlPlaneNodes(ctx, to, to.Name)
m.client.EXPECT().ValidateWorkerNodes(ctx, to.Name, to.KubeconfigFile)
m.client.EXPECT().CountMachineDeploymentReplicasReady(ctx, to.Name, to.KubeconfigFile)
m.client.EXPECT().GetMachines(ctx, to, from.Name).Return(nil, errors.New("error getting machines")).AnyTimes()

if err := c.MoveCAPI(ctx, from, to, from.Name, clusterSpec); err == nil {
Expand Down
16 changes: 16 additions & 0 deletions pkg/clustermanager/mocks/client_and_networking.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 18 additions & 7 deletions pkg/executables/kubectl.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,24 +480,35 @@ func (k *Kubectl) ValidateControlPlaneNodes(ctx context.Context, cluster *types.

func (k *Kubectl) ValidateWorkerNodes(ctx context.Context, clusterName string, kubeconfig string) error {
logger.V(6).Info("waiting for nodes", "cluster", clusterName)
deployments, err := k.GetMachineDeployments(ctx, WithKubeconfig(kubeconfig), WithNamespace(constants.EksaSystemNamespace))
ready, total, err := k.CountMachineDeploymentReplicasReady(ctx, clusterName, kubeconfig)
if err != nil {
return err
}
if ready != total {
return fmt.Errorf("%d machine deployment replicas are not ready", total-ready)
}
return nil
}

func (k *Kubectl) CountMachineDeploymentReplicasReady(ctx context.Context, clusterName string, kubeconfig string) (ready, total int, err error) {
logger.V(6).Info("counting ready machine deployment replicas", "cluster", clusterName)
deployments, err := k.GetMachineDeployments(ctx, WithKubeconfig(kubeconfig), WithNamespace(constants.EksaSystemNamespace))
if err != nil {
return 0, 0, err
}
for _, machineDeployment := range deployments {
if machineDeployment.Status.Phase != "Running" {
return fmt.Errorf("machine deployment is in %s phase", machineDeployment.Status.Phase)
return 0, 0, fmt.Errorf("machine deployment is in %s phase", machineDeployment.Status.Phase)
}

if machineDeployment.Status.UnavailableReplicas != 0 {
return fmt.Errorf("%v machine deployment replicas are unavailable", machineDeployment.Status.UnavailableReplicas)
return 0, 0, fmt.Errorf("%d machine deployment replicas are unavailable", machineDeployment.Status.UnavailableReplicas)
}

if machineDeployment.Status.ReadyReplicas != machineDeployment.Status.Replicas {
return fmt.Errorf("%v machine deployment replicas are not ready", machineDeployment.Status.Replicas-machineDeployment.Status.ReadyReplicas)
}
ready += int(machineDeployment.Status.ReadyReplicas)
total += int(machineDeployment.Status.Replicas)
}
return nil
return ready, total, nil
}

func (k *Kubectl) VsphereWorkerNodesMachineTemplate(ctx context.Context, clusterName string, kubeconfig string, namespace string) (*vspherev1.VSphereMachineTemplate, error) {
Expand Down
Loading

0 comments on commit 013035c

Please sign in to comment.