From 988bb5b24bafcc7cc0fdd208a4f7200ceaf08d03 Mon Sep 17 00:00:00 2001 From: Abhishek Dasgupta Date: Mon, 25 Jul 2022 13:15:26 +0530 Subject: [PATCH] Added logics for quorum loss scenario. --- controllers/cluster_mgmt_controller.go | 192 ++++++++++++++++++++ controllers/cluster_mgmt_controller_test.go | 183 +++++++++++++++++++ pkg/component/etcd/lease/lease_member.go | 2 +- pkg/component/etcd/lease/lease_test.go | 2 +- pkg/predicate/predicate.go | 27 +++ 5 files changed, 404 insertions(+), 2 deletions(-) create mode 100644 controllers/cluster_mgmt_controller.go create mode 100644 controllers/cluster_mgmt_controller_test.go diff --git a/controllers/cluster_mgmt_controller.go b/controllers/cluster_mgmt_controller.go new file mode 100644 index 000000000..e243d8d2c --- /dev/null +++ b/controllers/cluster_mgmt_controller.go @@ -0,0 +1,192 @@ +// Copyright (c) 2022 SAP SE or an SAP affiliate company. All rights reserved. This file is licensed under the Apache Software License, v. 2 except as noted otherwise in the LICENSE file +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controllers + +import ( + "context" + "fmt" + "time" + + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/source" + + "github.com/go-logr/logr" + appsv1 "k8s.io/api/apps/v1" + coordinationv1 "k8s.io/api/coordination/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/pointer" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/manager" + + druidv1alpha1 "github.com/gardener/etcd-druid/api/v1alpha1" + controllersconfig "github.com/gardener/etcd-druid/controllers/config" + "github.com/gardener/etcd-druid/pkg/health/etcdmember" + druidpredicates "github.com/gardener/etcd-druid/pkg/predicate" + "github.com/gardener/gardener/pkg/controllerutils" + "github.com/gardener/gardener/pkg/utils/imagevector" + kutil "github.com/gardener/gardener/pkg/utils/kubernetes" +) + +const clusterMgmtControllerName = "cluster-mgmt-controller" + +// ClusterMgmtController reconciles ETCD multinode cluster +type ClusterMgmtController struct { + client.Client + logger logr.Logger + ImageVector imagevector.ImageVector +} + +// NewClusterMgmtController creates a new ClusterMgmtController object +func NewClusterMgmtController(mgr manager.Manager) *ClusterMgmtController { + return &ClusterMgmtController{ + Client: mgr.GetClient(), + logger: log.Log.WithName("cluster-mgmt-controller"), + } +} + +// SetupWithManager sets up manager with a new controller and cmc as the reconcile.Reconciler +func (cmc *ClusterMgmtController) SetupWithManager(mgr ctrl.Manager, workers int) error { + + ctrl, err := controller.New(clusterMgmtControllerName, mgr, controller.Options{ + Reconciler: cmc, + MaxConcurrentReconciles: workers, + }) + if err != nil { + return err + } + + return ctrl.Watch( + &source.Kind{Type: &coordinationv1.Lease{}}, + &handler.EnqueueRequestForOwner{OwnerType: &druidv1alpha1.Etcd{}, IsController: true}, + // druidpredicates.LeaseHolderIdentityChange(), + druidpredicates.IsMemberLease(), + ) +} + +// +kubebuilder:rbac:groups=druid.gardener.cloud,resources=etcds,verbs=get;list;watch +// +kubebuilder:rbac:groups=coordination.k8s.io,resources=leases,verbs=get;list;watch;create;update;patch;delete + +// Reconcile reconciles the multinode ETCD cluster. +func (cmc *ClusterMgmtController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + cmc.logger.Info("Cluster management controller reconciliation started") + etcd := &druidv1alpha1.Etcd{} + if err := cmc.Get(ctx, req.NamespacedName, etcd); err != nil { + if errors.IsNotFound(err) { + // Object not found, return. Created objects are automatically garbage collected. + // For additional cleanup logic use finalizers. + return ctrl.Result{}, nil + } + // Error reading the object - requeue the request. + return ctrl.Result{ + RequeueAfter: 10 * time.Second, + }, err + } + + logger := cmc.logger.WithValues("etcd", kutil.Key(etcd.Namespace, etcd.Name).String()) + + // run a loop every 5 minutes that will monitor the cluster health and take action if members in the etcd cluster are down + for { + if !etcd.DeletionTimestamp.IsZero() { + return ctrl.Result{Requeue: false}, nil + } + + unknownThreshold := 300 * time.Second + notReadyThreshold := 60 * time.Second + + checker := etcdmember.ReadyCheck(cmc.Client, logger, controllersconfig.EtcdCustodianController{ + EtcdMember: controllersconfig.EtcdMemberConfig{ + EtcdMemberNotReadyThreshold: notReadyThreshold, + EtcdMemberUnknownThreshold: unknownThreshold, + }, + }) + + results := checker.Check(context.Background(), *etcd) + + totalReadyMembers := 0 + + for _, result := range results { + if result.Status() == "LeaseSucceeded" { + totalReadyMembers = totalReadyMembers + 1 + } + } + + quorum := int(etcd.Spec.Replicas)/2 + 1 + + if totalReadyMembers < quorum { + // scale down the statefulset to 0 + sts := &appsv1.StatefulSet{} + err := cmc.Get(ctx, types.NamespacedName{Name: etcd.Name, Namespace: etcd.Namespace}, sts) + if err != nil { + return ctrl.Result{ + RequeueAfter: 10 * time.Second, + }, fmt.Errorf("cound not fetch statefulset: %v", err) + } + + if _, err := controllerutils.GetAndCreateOrStrategicMergePatch(ctx, cmc.Client, sts, func() error { + sts.Spec.Replicas = pointer.Int32(0) + return nil + }); err != nil { + return ctrl.Result{ + RequeueAfter: 10 * time.Second, + }, fmt.Errorf("cound not scale down statefulset to 0 : %v", err) + } + + // delete the pvcs + if err := cmc.DeleteAllOf(ctx, &corev1.PersistentVolumeClaim{}, + client.InNamespace(sts.GetNamespace()), + client.MatchingLabels(getMatchingLabels(sts))); err != nil { + return ctrl.Result{ + RequeueAfter: 10 * time.Second, + }, fmt.Errorf("cound not delete pvcs : %v", err) + } + + // scale up the statefulset to 1 + if _, err := controllerutils.GetAndCreateOrStrategicMergePatch(ctx, cmc.Client, sts, func() error { + sts.Spec.Replicas = pointer.Int32(1) + return nil + }); err != nil { + return ctrl.Result{ + RequeueAfter: 10 * time.Second, + }, fmt.Errorf("cound not scale up statefulset to 1 : %v", err) + } + + // scale up the statefulset to ETCD replicas + if _, err := controllerutils.GetAndCreateOrStrategicMergePatch(ctx, cmc.Client, sts, func() error { + sts.Spec.Replicas = &etcd.Spec.Replicas + return nil + }); err != nil { + return ctrl.Result{ + RequeueAfter: 10 * time.Second, + }, fmt.Errorf("cound not scale up statefulset to replica number : %v", err) + } + + continue + } + } +} + +func getMatchingLabels(sts *appsv1.StatefulSet) map[string]string { + labels := make(map[string]string) + + labels["name"] = sts.Labels["name"] + labels["instance"] = sts.Labels["instance"] + + return labels +} diff --git a/controllers/cluster_mgmt_controller_test.go b/controllers/cluster_mgmt_controller_test.go new file mode 100644 index 000000000..63a9d8e48 --- /dev/null +++ b/controllers/cluster_mgmt_controller_test.go @@ -0,0 +1,183 @@ +// Copyright (c) 2022 SAP SE or an SAP affiliate company. All rights reserved. This file is licensed under the Apache Software License, v. 2 except as noted otherwise in the LICENSE file +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package controllers + +import ( + "context" + "fmt" + "time" + + druidv1alpha1 "github.com/gardener/etcd-druid/api/v1alpha1" + "github.com/gardener/gardener/pkg/controllerutils" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + appsv1 "k8s.io/api/apps/v1" + coordinationv1 "k8s.io/api/coordination/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/retry" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" +) + +var ( + unknownThreshold = 300 * time.Second + notReadyThreshold = 60 * time.Second +) + +var _ = FDescribe("Cluster Management Controller", func() { + FContext("when quorum is lost for multinode ETCD cluster", func() { + var ( + err error + instance *druidv1alpha1.Etcd + c client.Client + s *appsv1.StatefulSet + cm *corev1.ConfigMap + svc *corev1.Service + now time.Time + longExpirationTime = metav1.NewMicroTime(now.Add(-1 * unknownThreshold).Add(-1 * time.Second).Add(-1 * notReadyThreshold)) + ) + BeforeEach(func() { + instance = getMultinodeEtcdDefault("foo444", "default") + c = mgr.GetClient() + ns := corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: instance.Namespace, + }, + } + + _, err = controllerutil.CreateOrUpdate(context.TODO(), c, &ns, func() error { return nil }) + Expect(err).To(Not(HaveOccurred())) + + err = c.Create(context.TODO(), instance) + Expect(err).NotTo(HaveOccurred()) + s = &appsv1.StatefulSet{} + Eventually(func() error { return statefulsetIsCorrectlyReconciled(c, instance, s) }, timeout, pollingInterval).Should(BeNil()) + cm = &corev1.ConfigMap{} + Eventually(func() error { return configMapIsCorrectlyReconciled(c, instance, cm) }, timeout, pollingInterval).Should(BeNil()) + svc = &corev1.Service{} + Eventually(func() error { return clientServiceIsCorrectlyReconciled(c, instance, svc) }, timeout, pollingInterval).Should(BeNil()) + }) + FIt("when renew time of member leases expired", func() { + // Deliberately update the first member lease + memberLease := &coordinationv1.Lease{} + Eventually(func() error { return fetchMemberLease(c, instance, memberLease, 1) }, timeout, pollingInterval).Should(BeNil()) + err = controllerutils.TryUpdate(context.TODO(), retry.DefaultBackoff, c, memberLease, func() error { + memberLease.Spec.RenewTime = &longExpirationTime + return nil + }) + + // Deliberately update the second member lease + memberLease = &coordinationv1.Lease{} + Eventually(func() error { return fetchMemberLease(c, instance, memberLease, 2) }, timeout, pollingInterval).Should(BeNil()) + err = controllerutils.TryUpdate(context.TODO(), retry.DefaultBackoff, c, memberLease, func() error { + memberLease.Spec.RenewTime = &longExpirationTime + return nil + }) + + // Check if statefulset replicas is scaled down to 0 + s = &appsv1.StatefulSet{} + Eventually(func() error { return statefulsetIsScaled(c, instance, s, 0) }, timeout, pollingInterval).Should(BeNil()) + + // Check if statefulset replicas is scaled up to 1 + s = &appsv1.StatefulSet{} + Eventually(func() error { return statefulsetIsScaled(c, instance, s, 1) }, timeout, pollingInterval).Should(BeNil()) + + // Check if statefulset replicas is scaled up to etcd replicas + s = &appsv1.StatefulSet{} + Eventually(func() error { return statefulsetIsScaled(c, instance, s, instance.Spec.Replicas) }, timeout, pollingInterval).Should(BeNil()) + + }) + + AfterEach(func() { + Expect(c.Delete(context.TODO(), instance)).To(Succeed()) + Eventually(func() error { return statefulSetRemoved(c, s) }, timeout, pollingInterval).Should(BeNil()) + Eventually(func() error { return etcdRemoved(c, instance) }, timeout, pollingInterval).Should(BeNil()) + }) + }) +}) + +func getMultinodeEtcdDefault(name, namespace string) *druidv1alpha1.Etcd { + instance := &druidv1alpha1.Etcd{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Spec: druidv1alpha1.EtcdSpec{ + Annotations: map[string]string{ + "app": "etcd-statefulset", + "role": "test", + "instance": name, + }, + Labels: map[string]string{ + "name": "etcd", + "instance": name, + }, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "name": "etcd", + "instance": name, + }, + }, + Replicas: 3, + Backup: druidv1alpha1.BackupSpec{}, + Etcd: druidv1alpha1.EtcdConfig{}, + Common: druidv1alpha1.SharedConfig{}, + }, + } + return instance +} + +func fetchMemberLease(c client.Client, instance *druidv1alpha1.Etcd, lease *coordinationv1.Lease, replica int) error { + ctx, cancel := context.WithTimeout(context.TODO(), timeout) + defer cancel() + req := types.NamespacedName{ + Name: memberLeaseName(instance.Name, replica), + Namespace: instance.Namespace, + } + + if err := c.Get(ctx, req, lease); err != nil { + return err + } + + if !checkEtcdOwnerReference(lease.GetOwnerReferences(), instance) { + return fmt.Errorf("ownerReference does not exists for lease") + } + return nil +} + +func memberLeaseName(etcdName string, replica int) string { + return fmt.Sprintf("%s-%d-member", etcdName, replica) +} + +func statefulsetIsScaled(c client.Client, instance *druidv1alpha1.Etcd, ss *appsv1.StatefulSet, replicas int32) error { + ctx, cancel := context.WithTimeout(context.TODO(), timeout) + defer cancel() + req := types.NamespacedName{ + Name: instance.Name, + Namespace: instance.Namespace, + } + + if err := c.Get(ctx, req, ss); err != nil { + return err + } + + stsReplicas := *ss.Spec.Replicas + if stsReplicas != replicas { + return fmt.Errorf("statefulset replicas are yet %d instead of %d", stsReplicas, replicas) + } + + return nil +} diff --git a/pkg/component/etcd/lease/lease_member.go b/pkg/component/etcd/lease/lease_member.go index ce7689846..b334ee840 100644 --- a/pkg/component/etcd/lease/lease_member.go +++ b/pkg/component/etcd/lease/lease_member.go @@ -98,5 +98,5 @@ func getMemberLeaseLabels(val Values) map[string]string { } func memberLeaseName(etcdName string, replica int) string { - return fmt.Sprintf("%s-%d", etcdName, replica) + return fmt.Sprintf("%s-%d-member", etcdName, replica) } diff --git a/pkg/component/etcd/lease/lease_test.go b/pkg/component/etcd/lease/lease_test.go index c81238891..789f2e124 100644 --- a/pkg/component/etcd/lease/lease_test.go +++ b/pkg/component/etcd/lease/lease_test.go @@ -248,7 +248,7 @@ func checkMemberLeases(ctx context.Context, c client.Client, etcd *druidv1alpha1 func memberLeases(name string, etcdUID types.UID, replicas int32) []interface{} { var elements []interface{} for i := 0; i < int(replicas); i++ { - elements = append(elements, matchLeaseElement(fmt.Sprintf("%s-%d", name, i), name, etcdUID)) + elements = append(elements, matchLeaseElement(fmt.Sprintf("%s-%d-member", name, i), name, etcdUID)) } return elements diff --git a/pkg/predicate/predicate.go b/pkg/predicate/predicate.go index 145d22723..a3d28c93d 100644 --- a/pkg/predicate/predicate.go +++ b/pkg/predicate/predicate.go @@ -216,3 +216,30 @@ func IsSnapshotLease() predicate.Predicate { }, } } + +// IsMemberLease is a predicate that is `true` if the passed lease object is a member lease. +func IsMemberLease() predicate.Predicate { + isMemberLease := func(obj client.Object) bool { + lease, ok := obj.(*coordinationv1.Lease) + if !ok { + return false + } + + return strings.HasSuffix(lease.Name, "member") + } + + return predicate.Funcs{ + CreateFunc: func(event event.CreateEvent) bool { + return isMemberLease(event.Object) + }, + UpdateFunc: func(event event.UpdateEvent) bool { + return isMemberLease(event.ObjectNew) + }, + GenericFunc: func(event event.GenericEvent) bool { + return isMemberLease(event.Object) + }, + DeleteFunc: func(event event.DeleteEvent) bool { + return isMemberLease(event.Object) + }, + } +}