Add cleanup phase for kube-proxy
g-gaston committed Mar 23, 2023
1 parent 90e780f commit 1e05315
Showing 6 changed files with 212 additions and 27 deletions.
119 changes: 105 additions & 14 deletions pkg/clustermanager/kube_proxy.go
@@ -11,6 +11,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
controlplanev1 "sigs.k8s.io/cluster-api/controlplane/kubeadm/api/v1beta1"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -24,11 +25,12 @@ import (
)

const (
iptablesLegacyLabel = "anywhere.eks.amazonaws.com/iptableslegacy"
k8sAppLabel = "k8s-app"
kubeProxyLabel = "kube-proxy"
kubeProxyDSName = "kube-proxy"
kubeProxyDSNamespace = "kube-system"
iptablesLegacyLabel = "anywhere.eks.amazonaws.com/iptableslegacy"
iptablesLegacyKubeProxyDSName = "kube-proxy-iptables-legacy"
k8sAppLabel = "k8s-app"
kubeProxyLabel = "kube-proxy"
kubeProxyDSName = "kube-proxy"
kubeProxyDSNamespace = "kube-system"
)

var firstEKSDWithNewKubeProxy = map[anywherev1.KubernetesVersion]int{
@@ -60,6 +62,9 @@ func NewKubeProxyCLIUpgrader(log logr.Logger, factory ClientFactory, opts ...Kub
}

// KubeProxyCLIUpgrader prepares a cluster for a kube-proxy upgrade.
// It's mostly a wrapper around [KubeProxyUpgrader] to be used from the CLI.
// It builds clients from kubeconfig files and facilitates mocking. It also uses a retrier
// around [KubeProxyUpgrader] to deal with transient errors.
type KubeProxyCLIUpgrader struct {
clientFactory ClientFactory
log logr.Logger
@@ -78,32 +83,61 @@ func KubeProxyCLIUpgraderRetrier(retrier retrier.Retrier) KubeProxyCLIUpgraderOp
}

// PrepareUpgrade performs the necessary steps prior to a kube-proxy upgrade.
// It's mostly a wrapper around [PrepareKubeProxyForUpgrade] to be used from the CLI.
// It builds clients from kubeconfig files and facilitates mocking. It also uses a retrier
// around [PrepareKubeProxyForUpgrade] to deal with transient errors.
func (u KubeProxyCLIUpgrader) PrepareUpgrade(ctx context.Context,
spec *cluster.Spec,
managementClusterKubeconfigPath, workloadClusterKubeconfigPath string,
) error {
u.log.V(4).Info("Building client for management cluster", "kubeconfig", managementClusterKubeconfigPath)
managementClusterClient, err := u.clientFactory.BuildClientFromKubeconfig(managementClusterKubeconfigPath)
managementClusterClient, workloadClusterClient, err := u.buildClients(
managementClusterKubeconfigPath, workloadClusterKubeconfigPath,
)
if err != nil {
return err
}

u.log.V(4).Info("Building client for workload cluster", "kubeconfig", workloadClusterKubeconfigPath)
workloadClusterClient, err := u.clientFactory.BuildClientFromKubeconfig(workloadClusterKubeconfigPath)
up := NewKubeProxyUpgrader()

return u.retrier.Retry(func() error {
return up.PrepareForUpgrade(ctx, u.log, managementClusterClient, workloadClusterClient, spec)
})
}

// CleanupAfterUpgrade performs the necessary steps after an upgrade.
func (u KubeProxyCLIUpgrader) CleanupAfterUpgrade(ctx context.Context,
spec *cluster.Spec,
managementClusterKubeconfigPath, workloadClusterKubeconfigPath string,
) error {
managementClusterClient, workloadClusterClient, err := u.buildClients(
managementClusterKubeconfigPath, workloadClusterKubeconfigPath,
)
if err != nil {
return err
}

up := NewKubeProxyUpgrader()

return u.retrier.Retry(func() error {
return up.PrepareForUpgrade(ctx, u.log, managementClusterClient, workloadClusterClient, spec)
return up.CleanupAfterUpgrade(ctx, u.log, managementClusterClient, workloadClusterClient, spec)
})
}

func (u KubeProxyCLIUpgrader) buildClients(
managementClusterKubeconfigPath, workloadClusterKubeconfigPath string,
) (managementClusterClient, workloadClusterClient client.Client, err error) {
u.log.V(4).Info("Building client for management cluster", "kubeconfig", managementClusterKubeconfigPath)
managementClusterClient, err = u.clientFactory.BuildClientFromKubeconfig(managementClusterKubeconfigPath)
if err != nil {
return nil, nil, err
}

u.log.V(4).Info("Building client for workload cluster", "kubeconfig", workloadClusterKubeconfigPath)
workloadClusterClient, err = u.clientFactory.BuildClientFromKubeconfig(workloadClusterKubeconfigPath)
if err != nil {
return nil, nil, err
}

return managementClusterClient, workloadClusterClient, nil
}

// NewKubeProxyUpgrader builds a new KubeProxyUpgrader.
func NewKubeProxyUpgrader(opts ...KubeProxyUpgraderOpt) KubeProxyUpgrader {
u := &KubeProxyUpgrader{
@@ -194,6 +228,48 @@ func (u KubeProxyUpgrader) PrepareForUpgrade(ctx context.Context, log logr.Logge
return nil
}

// CleanupAfterUpgrade cleans up all the leftover changes made by PrepareForUpgrade.
// It's idempotent, so it can be called multiple times, even if PrepareForUpgrade wasn't
// called before.
func (u KubeProxyUpgrader) CleanupAfterUpgrade(ctx context.Context, log logr.Logger, managementClusterClient, workloadClusterClient client.Client, spec *cluster.Spec) error {
log.V(4).Info("Deleting iptables legacy kube-proxy", "name", iptablesLegacyKubeProxyDSName)
if err := deleteIPTablesLegacyKubeProxy(ctx, workloadClusterClient); err != nil {
return err
}

// Remove nodeAffinity from the original kube-proxy. It's not strictly necessary since there
// won't be any more nodes with that label, but it prevents future errors.
kubeProxy, err := getKubeProxy(ctx, workloadClusterClient)
if err != nil {
return err
}
if kubeProxy.Spec.Template.Spec.Affinity != nil {
kubeProxy.Spec.Template.Spec.Affinity = nil
log.V(4).Info("Removing node-affinity from kube-proxy")
if err := workloadClusterClient.Update(ctx, kubeProxy); err != nil {
return errors.Wrap(err, "updating main kube-proxy version to remove nodeAffinity")
}
}

// Remove the skip annotation from the kubeadm control plane so it starts reconciling kube-proxy again
kcp, err := controller.KubeadmControlPlane(ctx, managementClusterClient, spec.Cluster)
if err != nil {
return errors.Wrap(err, "reading the kubeadm control plane to cleanup the skip annotations")
}

if _, ok := kcp.Annotations[controlplanev1.SkipKubeProxyAnnotation]; !ok {
return nil
}

delete(kcp.Annotations, controlplanev1.SkipKubeProxyAnnotation)
log.V(4).Info("Removing skip kube-proxy annotation from KubeadmControlPlane")
if err := managementClusterClient.Update(ctx, kcp); err != nil {
return errors.Wrap(err, "preparing kcp for kube-proxy upgrade")
}

return nil
}
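
// For reference (shape assumed from the prepare phase, which is not part of
// this hunk): PrepareForUpgrade is expected to keep the original kube-proxy
// off the old nodes with a node affinity keyed on iptablesLegacyLabel,
// roughly:
//
//	kubeProxy.Spec.Template.Spec.Affinity = &corev1.Affinity{
//		NodeAffinity: &corev1.NodeAffinity{
//			RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
//				NodeSelectorTerms: []corev1.NodeSelectorTerm{{
//					MatchExpressions: []corev1.NodeSelectorRequirement{{
//						Key:      iptablesLegacyLabel,
//						Operator: corev1.NodeSelectorOpDoesNotExist,
//					}},
//				}},
//			},
//		},
//	}
//
// Once no nodes carry that label the constraint is inert, which is why
// CleanupAfterUpgrade can simply set the whole Affinity field to nil.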

func specIncludesNewKubeProxy(spec *cluster.Spec) bool {
return eksdIncludesNewKubeProxy(spec.Cluster.Spec.KubernetesVersion, spec.VersionsBundle.KubeDistro.EKSD.Number)
}
@@ -334,7 +410,7 @@ func iptablesLegacyKubeProxyFromCurrentDaemonSet(kcp *controlplanev1.KubeadmCont

// Generate a new DS with the old kube-proxy version with nodeAffinity so it only
// gets scheduled in the old (current) nodes.
iptablesLegacyKubeProxy.Name = "kube-proxy-iptables-legacy"
iptablesLegacyKubeProxy.Name = iptablesLegacyKubeProxyDSName
iptablesLegacyKubeProxy.ObjectMeta.ResourceVersion = ""
iptablesLegacyKubeProxy.ObjectMeta.UID = ""
image := kcp.Spec.KubeadmConfigSpec.ClusterConfiguration.ImageRepository + "/kube-proxy" +
@@ -369,6 +445,21 @@ func createIPTablesLegacyKubeProxy(ctx context.Context, client client.Client, kc
return nil
}

func deleteIPTablesLegacyKubeProxy(ctx context.Context, client client.Client) error {
iptablesLegacyKubeProxy := &appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Name: iptablesLegacyKubeProxyDSName,
Namespace: kubeProxyDSNamespace,
},
}

if err := client.Delete(ctx, iptablesLegacyKubeProxy); err != nil && !apierrors.IsNotFound(err) {
return errors.Wrap(err, "deleting secondary kube-proxy DS with iptables-legacy")
}

return nil
}

func updateKubeProxyVersion(ctx context.Context, client client.Client, kubeProxy *appsv1.DaemonSet, image string) error {
kubeProxy.Spec.Template.Spec.Containers[0].Image = image
if err := client.Update(ctx, kubeProxy); err != nil {
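
The new method completes a symmetric pair on the CLI upgrader: PrepareUpgrade pins the old kube-proxy and pauses KubeadmControlPlane reconciliation before the cluster upgrade, and CleanupAfterUpgrade undoes both afterwards. The sketch below shows one way a caller might wire the two phases together; it is illustrative only. The package name, the RunKubeProxyUpgrade helper, and the upgradeCluster callback are assumptions, while the upgrader constructor and its two methods are the API shown in the diff above.

package upgradeexample

import (
	"context"

	"github.com/go-logr/logr"

	"github.com/aws/eks-anywhere/pkg/cluster"
	"github.com/aws/eks-anywhere/pkg/clustermanager"
)

// RunKubeProxyUpgrade shows the intended call order: prepare, upgrade, clean up.
func RunKubeProxyUpgrade(
	ctx context.Context,
	log logr.Logger,
	factory clustermanager.ClientFactory,
	spec *cluster.Spec,
	managementKubeconfig, workloadKubeconfig string,
	upgradeCluster func(context.Context) error, // stand-in for the real cluster upgrade
) error {
	upgrader := clustermanager.NewKubeProxyCLIUpgrader(log, factory)

	// Phase 1: pin the old kube-proxy to existing nodes and annotate the
	// KubeadmControlPlane so it skips kube-proxy reconciliation.
	if err := upgrader.PrepareUpgrade(ctx, spec, managementKubeconfig, workloadKubeconfig); err != nil {
		return err
	}

	// The actual cluster upgrade runs between the two phases.
	if err := upgradeCluster(ctx); err != nil {
		return err
	}

	// Phase 2 (added in this commit): delete the iptables-legacy DaemonSet,
	// drop the node affinity, and remove the skip annotation so the KCP
	// manages kube-proxy again. Idempotent, so it is safe to retry.
	return upgrader.CleanupAfterUpgrade(ctx, spec, managementKubeconfig, workloadKubeconfig)
}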
90 changes: 77 additions & 13 deletions pkg/clustermanager/kube_proxy_test.go
@@ -25,21 +25,23 @@ import (
"github.com/aws/eks-anywhere/pkg/clustermanager"
"github.com/aws/eks-anywhere/pkg/clustermanager/mocks"
"github.com/aws/eks-anywhere/pkg/constants"
"github.com/aws/eks-anywhere/pkg/controller/clientutil"
"github.com/aws/eks-anywhere/pkg/retrier"
)

type prepareKubeProxyTest struct {
ctx context.Context
log logr.Logger
spec *cluster.Spec
kcp *controlplanev1.KubeadmControlPlane
kubeProxy *appsv1.DaemonSet
nodeCP *corev1.Node
nodeWorker *corev1.Node
kubeProxyCP *corev1.Pod
kubeProxyWorker *corev1.Pod
managementClient client.Client
workloadClient client.Client
ctx context.Context
log logr.Logger
spec *cluster.Spec
kcp *controlplanev1.KubeadmControlPlane
kubeProxy *appsv1.DaemonSet
nodeCP *corev1.Node
nodeWorker *corev1.Node
kubeProxyCP *corev1.Pod
kubeProxyWorker *corev1.Pod
managementClient client.Client
workloadClient client.Client
workloadClusterExtraObjects []client.Object
}

func newPrepareKubeProxyTest() *prepareKubeProxyTest {
@@ -117,13 +119,16 @@ func newPrepareKubeProxyTest() *prepareKubeProxyTest {
func (tt *prepareKubeProxyTest) initClients(tb testing.TB) {
tt.managementClient = fake.NewClientBuilder().WithObjects(tt.kcp).Build()

tt.workloadClient = fake.NewClientBuilder().WithObjects(
objs := []client.Object{
tt.kubeProxy,
tt.kubeProxyCP,
tt.kubeProxyWorker,
tt.nodeCP,
tt.nodeWorker,
).Build()
}
objs = append(objs, tt.workloadClusterExtraObjects...)

tt.workloadClient = fake.NewClientBuilder().WithObjects(objs...).Build()
}

// startKCPControllerEmulator starts a routine that reverts the kube-proxy
@@ -313,6 +318,47 @@ func TestKubeProxyUpgraderPrepareForUpgradeNewSpecHasOldKubeProxy(t *testing.T)
workloadAPI.ShouldEventuallyNotExist(tt.ctx, legacyKubeProxy)
}

func TestKubeProxyUpgraderCleanupAfterUpgradeSuccessWithReentry(t *testing.T) {
g := NewWithT(t)
tt := newPrepareKubeProxyTest()
tt.workloadClusterExtraObjects = append(tt.workloadClusterExtraObjects, &appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Name: "kube-proxy-iptables-legacy",
Namespace: "kube-system",
},
})
tt.kubeProxy.Spec.Template.Spec.Affinity = &corev1.Affinity{}
clientutil.AddAnnotation(tt.kcp, controlplanev1.SkipKubeProxyAnnotation, "true")
tt.initClients(t)
u := clustermanager.NewKubeProxyUpgrader()

g.Expect(
u.CleanupAfterUpgrade(tt.ctx, tt.log, tt.managementClient, tt.workloadClient, tt.spec),
).To(Succeed())

managementAPI := envtest.NewAPIExpecter(t, tt.managementClient)
managementAPI.ShouldEventuallyMatch(tt.ctx, tt.kcp, func(g Gomega) {
g.Expect(tt.kcp.Annotations).NotTo(HaveKeyWithValue(controlplanev1.SkipKubeProxyAnnotation, "true"))
})

workloadAPI := envtest.NewAPIExpecter(t, tt.workloadClient)
workloadAPI.ShouldEventuallyMatch(tt.ctx, tt.kubeProxy, func(g Gomega) {
g.Expect(tt.kubeProxy.Spec.Template.Spec.Affinity).To(BeNil())
})

legacyKubeProxy := &appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Name: "kube-proxy-iptables-legacy",
Namespace: "kube-system",
},
}
workloadAPI.ShouldEventuallyNotExist(tt.ctx, legacyKubeProxy)

g.Expect(
u.CleanupAfterUpgrade(tt.ctx, tt.log, tt.managementClient, tt.workloadClient, tt.spec),
).To(Succeed())
}

func TestEKSDVersionAndNumberFromTag(t *testing.T) {
tests := []struct {
name string
@@ -512,3 +558,21 @@ func TestKubeProxyCLIUpgraderPrepareUpgradeErrorInPrepare(t *testing.T) {
u.PrepareUpgrade(tt.ctx, tt.spec, managementKubeConfig, workloadKubeConfig),
).To(MatchError(ContainSubstring("reading the kubeadm control plane for an upgrade")))
}

func TestKubeProxyCLIUpgraderCleanupAfterUpgradeSuccess(t *testing.T) {
g := NewWithT(t)
tt := newPrepareKubeProxyTest()
tt.initClients(t)
managementKubeConfig := "mngmt.yaml"
workloadKubeConfig := "workload.yaml"
ctrl := gomock.NewController(t)
factory := mocks.NewMockClientFactory(ctrl)
factory.EXPECT().BuildClientFromKubeconfig(managementKubeConfig).Return(tt.managementClient, nil)
factory.EXPECT().BuildClientFromKubeconfig(workloadKubeConfig).Return(tt.workloadClient, nil)

u := clustermanager.NewKubeProxyCLIUpgrader(test.NewNullLogger(), factory)

g.Expect(
u.CleanupAfterUpgrade(tt.ctx, tt.spec, managementKubeConfig, workloadKubeConfig),
).To(Succeed())
}
1 change: 1 addition & 0 deletions pkg/workflows/interfaces/interfaces.go
@@ -81,4 +81,5 @@ type PackageInstaller interface {
// ClusterUpgrader prepares the cluster for an upgrade and cleans up afterwards.
type ClusterUpgrader interface {
PrepareUpgrade(ctx context.Context, spec *cluster.Spec, managementClusterKubeconfigPath, workloadClusterKubeconfigPath string) error
CleanupAfterUpgrade(ctx context.Context, spec *cluster.Spec, managementClusterKubeconfigPath, workloadClusterKubeconfigPath string) error
}
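
Adding CleanupAfterUpgrade to the ClusterUpgrader interface means every implementation and generated mock must gain the method as well, which is why the mocks file below is regenerated. A minimal sketch of a conforming implementation follows; the no-op type and package name are hypothetical, only the interface itself comes from this commit.

package workflowsexample

import (
	"context"

	"github.com/aws/eks-anywhere/pkg/cluster"
	"github.com/aws/eks-anywhere/pkg/workflows/interfaces"
)

// noopClusterUpgrader does nothing in either phase, e.g. for flows that do not
// need the kube-proxy prepare/cleanup dance.
type noopClusterUpgrader struct{}

// Compile-time check that the type satisfies the extended interface.
var _ interfaces.ClusterUpgrader = noopClusterUpgrader{}

func (noopClusterUpgrader) PrepareUpgrade(ctx context.Context, spec *cluster.Spec, managementClusterKubeconfigPath, workloadClusterKubeconfigPath string) error {
	return nil
}

func (noopClusterUpgrader) CleanupAfterUpgrade(ctx context.Context, spec *cluster.Spec, managementClusterKubeconfigPath, workloadClusterKubeconfigPath string) error {
	return nil
}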
14 changes: 14 additions & 0 deletions pkg/workflows/interfaces/mocks/clients.go

Some generated files are not rendered by default.

10 changes: 10 additions & 0 deletions pkg/workflows/upgrade.go
@@ -522,6 +522,16 @@ func (s *upgradeWorkloadClusterTask) Run(ctx context.Context, commandContext *ta
return &CollectDiagnosticsTask{}
}

if err := commandContext.ClusterUpgrader.CleanupAfterUpgrade(
ctx,
commandContext.ClusterSpec,
commandContext.ManagementCluster.KubeconfigFile,
commandContext.WorkloadCluster.KubeconfigFile,
); err != nil {
commandContext.SetError(err)
return &CollectDiagnosticsTask{}
}

if commandContext.UpgradeChangeDiff.Changed() {
if err = commandContext.ClusterManager.ApplyBundles(ctx, commandContext.ClusterSpec, eksaManagementCluster); err != nil {
commandContext.SetError(err)
5 changes: 5 additions & 0 deletions pkg/workflows/upgrade_test.go
@@ -236,6 +236,11 @@ func (c *upgradeTestSetup) expectUpgradeWorkload(managementCluster *types.Cluste
calls := []*gomock.Call{
c.expectPrepareUpgradeWorkload(managementCluster, workloadCluster),
c.expectUpgradeWorkloadToReturn(managementCluster, workloadCluster, nil),
c.clusterUpgrader.EXPECT().CleanupAfterUpgrade(c.ctx,
c.newClusterSpec,
managementCluster.KubeconfigFile, //nolint
workloadCluster.KubeconfigFile,
),
}

if managementCluster != nil && managementCluster.ExistingManagement {
