diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 4fb453b04..425eb3e38 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -7,151 +7,69 @@ metadata: name: manager-role rules: - apiGroups: - - "" + - batch resources: - - pods + - jobs verbs: - - list - - watch + - create - delete -- apiGroups: - - "" - resources: - - secrets - - endpoints - verbs: - get - list - patch - update - watch - apiGroups: - - "" + - coordination.k8s.io resources: - - events + - leases verbs: - create + - delete + - deletecollection - get - list - - watch - patch - update -- apiGroups: - - "" - resources: - - serviceaccounts - verbs: - - get - - list - watch - - create - - update - - patch - - delete - apiGroups: - - rbac.authorization.k8s.io + - druid.gardener.cloud resources: - - roles - - rolebindings + - etcds verbs: - - get - - list - - watch - create - - update - - patch - delete -- apiGroups: - - "" - - apps - resources: - - services - - configmaps - - statefulsets - verbs: - get - list - patch - update - watch - - create - - delete -- apiGroups: - - batch - resources: - - jobs - verbs: - - get - - list - - watch - - create - - update - - patch - - delete -- apiGroups: - - batch - resources: - - cronjobs - verbs: - - get - - list - - watch - - delete -- apiGroups: - - druid.gardener.cloud - resources: - - etcds - - etcdcopybackupstasks - verbs: - - get - - list - - watch - - create - - update - - patch - - delete - apiGroups: - druid.gardener.cloud resources: - etcds/status - - etcds/finalizers - - etcdcopybackupstasks/status - - etcdcopybackupstasks/finalizers verbs: - get - - update - patch - - create -- apiGroups: - - coordination.k8s.io - resources: - - leases - verbs: - - get - - list - - watch - - create - update - - patch - - delete - - deletecollection - apiGroups: - - "" + - druid.gardener.cloud resources: - - persistentvolumeclaims + - secrets verbs: - get - list + - patch + - update - watch - apiGroups: - policy resources: - poddisruptionbudgets 
verbs: + - create + - delete - get - list - - watch - - create - - update - patch - - delete + - update + - watch diff --git a/controllers/controllers_suite_test.go b/controllers/controllers_suite_test.go index 147c871a1..5ad5f8bec 100644 --- a/controllers/controllers_suite_test.go +++ b/controllers/controllers_suite_test.go @@ -51,6 +51,7 @@ var ( mgrStopped *sync.WaitGroup activeDeadlineDuration time.Duration + waitDuration time.Duration backupCompactionSchedule = "15 */24 * * *" revertFns []func() @@ -102,7 +103,9 @@ var _ = BeforeSuite(func(done Done) { }) Expect(err).NotTo(HaveOccurred()) - er, err := NewEtcdReconcilerWithImageVector(mgr, false) + waitDuration, err = time.ParseDuration("10s") + Expect(err).NotTo(HaveOccurred()) + er, err := NewEtcdReconcilerWithImageVector(mgr, false, waitDuration) Expect(err).NotTo(HaveOccurred()) err = er.SetupWithManager(mgr, 5, true) @@ -114,8 +117,9 @@ var _ = BeforeSuite(func(done Done) { Expect(err).NotTo(HaveOccurred()) custodian := NewEtcdCustodian(mgr, controllersconfig.EtcdCustodianController{ + SyncPeriod: 10 * time.Second, EtcdMember: controllersconfig.EtcdMemberConfig{ - EtcdMemberNotReadyThreshold: 1 * time.Minute, + EtcdMemberNotReadyThreshold: 20 * time.Second, }, }) @@ -138,7 +142,7 @@ var _ = BeforeSuite(func(done Done) { }) Expect(err).NotTo(HaveOccurred()) - err = lc.SetupWithManager(mgr, 1) + err = lc.SetupWithManager(mgr, 5) Expect(err).NotTo(HaveOccurred()) mgrStopped = startTestManager(mgrCtx, mgr) diff --git a/controllers/etcd_controller.go b/controllers/etcd_controller.go index 61380df34..351234125 100644 --- a/controllers/etcd_controller.go +++ b/controllers/etcd_controller.go @@ -106,31 +106,25 @@ type EtcdReconciler struct { ImageVector imagevector.ImageVector logger logr.Logger disableEtcdServiceAccountAutomount bool -} - -// NewReconcilerWithImageVector creates a new EtcdReconciler object with an image vector -func NewReconcilerWithImageVector(mgr manager.Manager, 
disableEtcdServiceAccountAutomount bool) (*EtcdReconciler, error) { - etcdReconciler, err := NewEtcdReconciler(mgr, disableEtcdServiceAccountAutomount) - if err != nil { - return nil, err - } - return etcdReconciler.InitializeControllerWithImageVector() + // waitTimeForTests allows some waiting time between certain operations. This variable helps some unit test cases + waitTimeForTests time.Duration } // NewEtcdReconciler creates a new EtcdReconciler object -func NewEtcdReconciler(mgr manager.Manager, disableEtcdServiceAccountAutomount bool) (*EtcdReconciler, error) { +func NewEtcdReconciler(mgr manager.Manager, disableEtcdServiceAccountAutomount bool, waitTimeForTests time.Duration) (*EtcdReconciler, error) { return (&EtcdReconciler{ Client: mgr.GetClient(), Config: mgr.GetConfig(), Scheme: mgr.GetScheme(), logger: log.Log.WithName("etcd-controller"), disableEtcdServiceAccountAutomount: disableEtcdServiceAccountAutomount, + waitTimeForTests: waitTimeForTests, }).InitializeControllerWithChartApplier() } // NewEtcdReconcilerWithImageVector creates a new EtcdReconciler object -func NewEtcdReconcilerWithImageVector(mgr manager.Manager, disableEtcdServiceAccountAutomount bool) (*EtcdReconciler, error) { - ec, err := NewEtcdReconciler(mgr, disableEtcdServiceAccountAutomount) +func NewEtcdReconcilerWithImageVector(mgr manager.Manager, disableEtcdServiceAccountAutomount bool, waitTimeForTests time.Duration) (*EtcdReconciler, error) { + ec, err := NewEtcdReconciler(mgr, disableEtcdServiceAccountAutomount, waitTimeForTests) if err != nil { return nil, err } @@ -212,6 +206,7 @@ func buildPredicate(ignoreOperationAnnotation bool) predicate.Predicate { return predicate.Or( druidpredicates.HasOperationAnnotation(), + druidpredicates.HasQuorumLossAnnotation(), druidpredicates.LastOperationNotSuccessful(), extensionspredicate.IsDeleting(), ) @@ -239,6 +234,67 @@ func (r *EtcdReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
if !etcd.DeletionTimestamp.IsZero() { return r.delete(ctx, etcd) } + + // Check if annotation for quorum loss is present in the ETCD annotations + // if yes, take necessary actions + if val, ok := etcd.Spec.Annotations[druidpredicates.QuorumLossAnnotation]; ok { + if val == "true" { + // scale down the statefulset to 0 + sts := &appsv1.StatefulSet{} + err := r.Get(ctx, req.NamespacedName, sts) + if err != nil { + return ctrl.Result{ + RequeueAfter: 10 * time.Second, + }, fmt.Errorf("could not fetch statefulset even though the annotation gardener.cloud/quorum-loss is set in ETCD CR: %v", err) + } + + r.logger.Info("Scaling down the statefulset to 0 while tackling quorum loss scenario in ETCD multi node cluster") + if _, err := controllerutils.GetAndCreateOrStrategicMergePatch(ctx, r.Client, sts, func() error { + sts.Spec.Replicas = pointer.Int32(0) + return nil + }); err != nil { + return ctrl.Result{ + RequeueAfter: 10 * time.Second, + }, fmt.Errorf("could not scale down statefulset to 0 while tackling quorum loss scenario in ETCD multi node cluster: %v", err) + } + time.Sleep(r.waitTimeForTests) + + r.logger.Info("Deleting PVCs while tackling quorum loss scenario in ETCD multi node cluster") + // delete the pvcs + if err := r.DeleteAllOf(ctx, &corev1.PersistentVolumeClaim{}, + client.InNamespace(sts.GetNamespace()), + client.MatchingLabels(getMatchingLabels(sts))); client.IgnoreNotFound(err) != nil { + return ctrl.Result{ + RequeueAfter: 10 * time.Second, + }, fmt.Errorf("could not delete pvcs while tackling quorum loss scenario in ETCD multi node cluster : %v", err) + } + + r.logger.Info("Scaling up the statefulset to 1 while tackling quorum loss scenario in ETCD multi node cluster") + // scale up the statefulset to 1 + if _, err := controllerutils.GetAndCreateOrStrategicMergePatch(ctx, r.Client, sts, func() error { + sts.Spec.Replicas = pointer.Int32(1) + return nil + }); err != nil { + return ctrl.Result{ + RequeueAfter: 10 * time.Second, + }, fmt.Errorf("could not scale up 
statefulset to 1 while tackling quorum loss scenario in ETCD multi node cluster : %v", err) + } + time.Sleep(r.waitTimeForTests) + + // scale up the statefulset to ETCD replicas + r.logger.Info("Scaling up the statefulset to the number of replicas mentioned in ETCD spec") + if _, err := controllerutils.GetAndCreateOrStrategicMergePatch(ctx, r.Client, sts, func() error { + sts.Spec.Replicas = &etcd.Spec.Replicas + return nil + }); err != nil { + return ctrl.Result{ + RequeueAfter: 10 * time.Second, + }, fmt.Errorf("could not scale up statefulset to replica number while tackling quorum loss scenario in ETCD multi node cluster : %v", err) + } + etcd.Spec.Annotations[druidpredicates.QuorumLossAnnotation] = "false" + } + } + return r.reconcile(ctx, etcd) } @@ -1022,6 +1078,15 @@ func clusterInBootstrap(etcd *druidv1alpha1.Etcd) bool { (etcd.Spec.Replicas > 1 && etcd.Status.Replicas == 1) } +func getMatchingLabels(sts *appsv1.StatefulSet) map[string]string { + labels := make(map[string]string) + + labels["name"] = sts.Labels["name"] + labels["instance"] = sts.Labels["instance"] + + return labels +} + func (r *EtcdReconciler) updateEtcdErrorStatus(ctx context.Context, etcd *druidv1alpha1.Etcd, sts *appsv1.StatefulSet, lastError error) error { return controllerutils.TryUpdateStatus(ctx, retry.DefaultBackoff, r.Client, etcd, func() error { lastErrStr := fmt.Sprintf("%v", lastError) diff --git a/controllers/etcd_controller_test.go b/controllers/etcd_controller_test.go index df793705e..78790643e 100644 --- a/controllers/etcd_controller_test.go +++ b/controllers/etcd_controller_test.go @@ -460,9 +460,18 @@ var _ = Describe("Druid", func() { rb = &rbac.RoleBinding{} Eventually(func() error { return roleBindingIsCorrectlyReconciled(c, instance, rb) }, timeout, pollingInterval).Should(BeNil()) + Eventually(func() error { return setReniewTimeForMemberLeases(c, instance) }, timeout, pollingInterval).Should(BeNil()) + validate(instance, s, cm, clSvc, prSvc) validateRole(instance, role) + req := 
types.NamespacedName{ + Name: instance.Name, + Namespace: instance.Namespace, + } + + err = c.Get(context.TODO(), req, s) + Expect(err).NotTo(HaveOccurred()) setStatefulSetReady(s) err = c.Status().Update(context.TODO(), s) Expect(err).NotTo(HaveOccurred()) @@ -708,7 +717,10 @@ var _ = Describe("Multinode ETCD", func() { svc = &corev1.Service{} Eventually(func() error { return clientServiceIsCorrectlyReconciled(c, instance, svc) }, timeout, pollingInterval).Should(BeNil()) + Eventually(func() error { return setReniewTimeForMemberLeases(c, instance) }, timeout, pollingInterval).Should(BeNil()) + // Validate statefulset + Expect(sts.Spec.Replicas).ShouldNot(BeNil()) Expect(*sts.Spec.Replicas).To(Equal(int32(instance.Spec.Replicas))) if instance.Spec.Replicas == 1 { @@ -726,6 +738,163 @@ var _ = Describe("Multinode ETCD", func() { ) }) +var ( + unknownThreshold = 300 * time.Second + notReadyThreshold = 60 * time.Second + expire = time.Minute * 3 +) + +var _ = Describe("Quorum Loss Scenario", func() { + Context("when quorum is lost for multinode ETCD cluster", func() { + var ( + err error + instance *druidv1alpha1.Etcd + c client.Client + s *appsv1.StatefulSet + cm *corev1.ConfigMap + svc *corev1.Service + now time.Time + longExpirationTime = metav1.NewMicroTime(now.Add(-1 * unknownThreshold).Add(-1 * time.Second).Add(-1 * notReadyThreshold)) + ) + BeforeEach(func() { + instance = getMultinodeEtcdDefault("foo85", "default") + c = mgr.GetClient() + ns := corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: instance.Namespace, + }, + } + + _, err = controllerutil.CreateOrUpdate(context.TODO(), c, &ns, func() error { return nil }) + Expect(err).To(Not(HaveOccurred())) + + err = c.Create(context.TODO(), instance) + Expect(err).NotTo(HaveOccurred()) + s = &appsv1.StatefulSet{} + Eventually(func() error { return statefulsetIsCorrectlyReconciled(c, instance, s) }, timeout, pollingInterval).Should(BeNil()) + setStatefulSetReady(s) + err = 
c.Status().Update(context.TODO(), s) + Expect(err).NotTo(HaveOccurred()) + + cm = &corev1.ConfigMap{} + Eventually(func() error { return configMapIsCorrectlyReconciled(c, instance, cm) }, timeout, pollingInterval).Should(BeNil()) + svc = &corev1.Service{} + Eventually(func() error { return clientServiceIsCorrectlyReconciled(c, instance, svc) }, timeout, pollingInterval).Should(BeNil()) + }) + It("when renew time of some of the member leases expired", func() { + // Deliberately update the first member lease + memberLease := &coordinationv1.Lease{} + Eventually(func() error { return fetchMemberLease(c, instance, memberLease, 1) }, timeout, pollingInterval).Should(BeNil()) + err = controllerutils.TryUpdate(context.TODO(), retry.DefaultBackoff, c, memberLease, func() error { + memberLease.Spec.RenewTime = &longExpirationTime + return nil + }) + Expect(err).To(Not(HaveOccurred())) + + // Deliberately update the second member lease + memberLease = &coordinationv1.Lease{} + Eventually(func() error { return fetchMemberLease(c, instance, memberLease, 2) }, timeout, pollingInterval).Should(BeNil()) + err = controllerutils.TryUpdate(context.TODO(), retry.DefaultBackoff, c, memberLease, func() error { + memberLease.Spec.RenewTime = &longExpirationTime + return nil + }) + Expect(err).To(Not(HaveOccurred())) + + // Check if statefulset replicas is scaled down to 0 + s = &appsv1.StatefulSet{} + Eventually(func() error { return statefulsetIsScaled(c, instance, s, 0) }, timeout, pollingInterval).Should(BeNil()) + + // Check if statefulset replicas is scaled up to 1 + s = &appsv1.StatefulSet{} + Eventually(func() error { return statefulsetIsScaled(c, instance, s, 1) }, timeout, pollingInterval).Should(BeNil()) + + // Check if statefulset replicas is scaled up to etcd replicas + s = &appsv1.StatefulSet{} + Eventually(func() error { return statefulsetIsScaled(c, instance, s, instance.Spec.Replicas) }, timeout, pollingInterval).Should(BeNil()) + + }) + + AfterEach(func() { + 
Expect(c.Delete(context.TODO(), instance)).To(Succeed()) + Eventually(func() error { return statefulSetRemoved(c, s) }, timeout, pollingInterval).Should(BeNil()) + Eventually(func() error { return etcdRemoved(c, instance) }, timeout, pollingInterval).Should(BeNil()) + }) + }) +}) + +func getMultinodeEtcdDefault(name, namespace string) *druidv1alpha1.Etcd { + instance := &druidv1alpha1.Etcd{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Spec: druidv1alpha1.EtcdSpec{ + Annotations: map[string]string{ + "app": "etcd-statefulset", + "role": "test", + "instance": name, + }, + Labels: map[string]string{ + "name": "etcd", + "instance": name, + }, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "name": "etcd", + "instance": name, + }, + }, + Replicas: 3, + Backup: druidv1alpha1.BackupSpec{}, + Etcd: druidv1alpha1.EtcdConfig{}, + Common: druidv1alpha1.SharedConfig{}, + }, + } + return instance +} + +func fetchMemberLease(c client.Client, instance *druidv1alpha1.Etcd, lease *coordinationv1.Lease, replica int) error { + ctx, cancel := context.WithTimeout(context.TODO(), timeout) + defer cancel() + req := types.NamespacedName{ + Name: memberLeaseName(instance.Name, replica), + Namespace: instance.Namespace, + } + + if err := c.Get(ctx, req, lease); err != nil { + return err + } + + if !checkEtcdOwnerReference(lease.GetOwnerReferences(), instance) { + return fmt.Errorf("ownerReference does not exists for lease") + } + return nil +} + +func memberLeaseName(etcdName string, replica int) string { + return fmt.Sprintf("%s-%d-member", etcdName, replica) +} + +func statefulsetIsScaled(c client.Client, instance *druidv1alpha1.Etcd, ss *appsv1.StatefulSet, replicas int32) error { + ctx, cancel := context.WithTimeout(context.TODO(), expire) + defer cancel() + req := types.NamespacedName{ + Name: instance.Name, + Namespace: instance.Namespace, + } + + if err := c.Get(ctx, req, ss); err != nil { + return err + } + + stsReplicas := 
*ss.Spec.Replicas + if stsReplicas != replicas { + return fmt.Errorf("statefulset replicas are yet %d instead of %d", stsReplicas, replicas) + } + + return nil +} + func validateRole(instance *druidv1alpha1.Etcd, role *rbac.Role) { Expect(*role).To(MatchFields(IgnoreExtras, Fields{ "ObjectMeta": MatchFields(IgnoreExtras, Fields{ @@ -1462,7 +1631,6 @@ func validateEtcd(instance *druidv1alpha1.Etcd, s *appsv1.StatefulSet, cm *corev fmt.Sprintf("--delta-snapshot-memory-limit=%d", instance.Spec.Backup.DeltaSnapshotMemoryLimit.Value()): Equal(fmt.Sprintf("--delta-snapshot-memory-limit=%d", instance.Spec.Backup.DeltaSnapshotMemoryLimit.Value())), fmt.Sprintf("--garbage-collection-policy=%s", *instance.Spec.Backup.GarbageCollectionPolicy): Equal(fmt.Sprintf("--garbage-collection-policy=%s", *instance.Spec.Backup.GarbageCollectionPolicy)), fmt.Sprintf("--endpoints=https://%s-local:%d", instance.Name, clientPort): Equal(fmt.Sprintf("--endpoints=https://%s-local:%d", instance.Name, clientPort)), - fmt.Sprintf("--service-endpoints=https://%s:%d", utils.GetClientServiceName(instance), clientPort): Equal(fmt.Sprintf("--service-endpoints=https://%s:%d", utils.GetClientServiceName(instance), clientPort)), fmt.Sprintf("--embedded-etcd-quota-bytes=%d", int64(instance.Spec.Etcd.Quota.Value())): Equal(fmt.Sprintf("--embedded-etcd-quota-bytes=%d", int64(instance.Spec.Etcd.Quota.Value()))), fmt.Sprintf("%s=%s", "--delta-snapshot-period", instance.Spec.Backup.DeltaSnapshotPeriod.Duration.String()): Equal(fmt.Sprintf("%s=%s", "--delta-snapshot-period", instance.Spec.Backup.DeltaSnapshotPeriod.Duration.String())), fmt.Sprintf("%s=%s", "--garbage-collection-period", instance.Spec.Backup.GarbageCollectionPeriod.Duration.String()): Equal(fmt.Sprintf("%s=%s", "--garbage-collection-period", instance.Spec.Backup.GarbageCollectionPeriod.Duration.String())), @@ -1973,6 +2141,36 @@ func statefulsetIsCorrectlyReconciled(c client.Client, instance *druidv1alpha1.E return nil } +func 
setReniewTimeForMemberLeases(c client.Client, instance *druidv1alpha1.Etcd) error { + ctx, cancel := context.WithTimeout(context.TODO(), timeout) + defer cancel() + + for i := 0; i < int(instance.Spec.Replicas); i++ { + leaseName := memberLeaseName(instance.Name, i) + + req := types.NamespacedName{ + Name: leaseName, + Namespace: instance.Namespace, + } + + ls := &coordinationv1.Lease{} + if err := c.Get(ctx, req, ls); err != nil { + return err + } + + setTime := metav1.NewMicroTime(time.Now()) + if err := controllerutils.TryUpdate(context.TODO(), retry.DefaultBackoff, c, ls, func() error { + ls.Spec.RenewTime = &setTime + return nil + }); err != nil { + return err + } + + } + + return nil +} + func configMapIsCorrectlyReconciled(c client.Client, instance *druidv1alpha1.Etcd, cm *corev1.ConfigMap) error { ctx, cancel := context.WithTimeout(context.TODO(), timeout) defer cancel() diff --git a/controllers/etcd_custodian_controller.go b/controllers/etcd_custodian_controller.go index 035d7b9ee..459fbe62a 100644 --- a/controllers/etcd_custodian_controller.go +++ b/controllers/etcd_custodian_controller.go @@ -48,6 +48,7 @@ import ( controllersconfig "github.com/gardener/etcd-druid/controllers/config" "github.com/gardener/etcd-druid/pkg/health/status" druidmapper "github.com/gardener/etcd-druid/pkg/mapper" + "github.com/gardener/etcd-druid/pkg/predicate" druidpredicates "github.com/gardener/etcd-druid/pkg/predicate" "github.com/gardener/etcd-druid/pkg/utils" ) @@ -89,6 +90,15 @@ func (ec *EtcdCustodian) Reconcile(ctx context.Context, req ctrl.Request) (ctrl. 
logger := ec.logger.WithValues("etcd", kutil.Key(etcd.Namespace, etcd.Name).String()) + if val, ok := etcd.Spec.Annotations[predicate.QuorumLossAnnotation]; ok { + if val == "true" { + logger.Info("Requeue item after 2 minutes because the annotation gardener.cloud/quorum-loss is set in ETCD CR which means a corrective measure for quorum loss in multi node scenario is being taken") + return ctrl.Result{ + RequeueAfter: 2 * time.Minute, + }, nil + } + } + if etcd.Status.LastError != nil && *etcd.Status.LastError != "" { logger.Info(fmt.Sprintf("Requeue item because of last error: %v", *etcd.Status.LastError)) return ctrl.Result{ @@ -108,6 +118,21 @@ func (ec *EtcdCustodian) Reconcile(ctx context.Context, req ctrl.Request) (ctrl. return ctrl.Result{}, err } + conLength := len(etcd.Status.Conditions) + if conLength > 0 && etcd.Status.Conditions[conLength-1].Reason == "QuorumLost" { + logger.Info("Quorum loss detected. Taking measures to fix it.") + withQlAnnotation := etcd.DeepCopy() + // Set annotation in ETCD to take corrective measure + etcd.Spec.Annotations[predicate.QuorumLossAnnotation] = "true" + if err := ec.Patch(ctx, etcd, client.MergeFrom(withQlAnnotation)); err != nil { + return ctrl.Result{}, err + } + // Allow some time to fix the quorum loss by ETCD controller + return ctrl.Result{ + RequeueAfter: 2 * time.Minute, + }, nil + } + refMgr := NewEtcdDruidRefManager(ec.Client, ec.Scheme, etcd, selector, etcdGVK, nil) stsList, err := refMgr.FetchStatefulSet(ctx, etcd) diff --git a/main.go b/main.go index 4936bfc90..7b2e50e15 100644 --- a/main.go +++ b/main.go @@ -46,6 +46,7 @@ func main() { custodianWorkers int secretWorkers int etcdCopyBackupsTaskWorkers int + clusterMgmtWorkers int custodianSyncPeriod time.Duration disableLeaseCache bool compactionWorkers int @@ -83,6 +84,7 @@ func main() { flag.DurationVar(&etcdMemberNotReadyThreshold, "etcd-member-notready-threshold", 5*time.Minute, "Threshold after which an etcd member is considered not ready if the status was 
unknown before.") flag.BoolVar(&disableEtcdServiceAccountAutomount, "disable-etcd-serviceaccount-automount", false, "If true then .automountServiceAccountToken will be set to false for the ServiceAccount created for etcd statefulsets.") flag.DurationVar(&etcdMemberUnknownThreshold, "etcd-member-unknown-threshold", 1*time.Minute, "Threshold after which an etcd member is considered unknown.") + flag.IntVar(&clusterMgmtWorkers, "cluster-mgmt-workers", 3, "Number of worker threads of the Cluster Management controller.") flag.Parse() @@ -109,7 +111,8 @@ func main() { os.Exit(1) } - etcd, err := controllers.NewEtcdReconcilerWithImageVector(mgr, disableEtcdServiceAccountAutomount) + waitDuration := 0 * time.Second + etcd, err := controllers.NewEtcdReconcilerWithImageVector(mgr, disableEtcdServiceAccountAutomount, waitDuration) if err != nil { setupLog.Error(err, "Unable to initialize etcd controller with image vector") os.Exit(1) @@ -166,6 +169,10 @@ func main() { os.Exit(1) } + if err := etcdCopyBackupsTask.SetupWithManager(mgr, etcdCopyBackupsTaskWorkers); err != nil { + setupLog.Error(err, "Unable to create controller", "Controller", "EtcdCopyBackupsTask") + } + // +kubebuilder:scaffold:builder setupLog.Info("Starting manager") diff --git a/pkg/component/etcd/lease/lease_member.go b/pkg/component/etcd/lease/lease_member.go index ce7689846..b334ee840 100644 --- a/pkg/component/etcd/lease/lease_member.go +++ b/pkg/component/etcd/lease/lease_member.go @@ -98,5 +98,5 @@ func getMemberLeaseLabels(val Values) map[string]string { } func memberLeaseName(etcdName string, replica int) string { - return fmt.Sprintf("%s-%d", etcdName, replica) + return fmt.Sprintf("%s-%d-member", etcdName, replica) } diff --git a/pkg/component/etcd/lease/lease_test.go b/pkg/component/etcd/lease/lease_test.go index c81238891..caf233872 100644 --- a/pkg/component/etcd/lease/lease_test.go +++ b/pkg/component/etcd/lease/lease_test.go @@ -248,7 +248,7 @@ func checkMemberLeases(ctx context.Context, c 
client.Client, etcd *druidv1alpha1 func memberLeases(name string, etcdUID types.UID, replicas int32) []interface{} { var elements []interface{} for i := 0; i < int(replicas); i++ { - elements = append(elements, matchLeaseElement(fmt.Sprintf("%s-%d", name, i), name, etcdUID)) + elements = append(elements, matchLeaseElement(fmt.Sprintf("%s-%d-member", name, i), name, etcdUID)) } return elements @@ -273,7 +273,7 @@ func matchLeaseElement(leaseName, etcdName string, etcdUID types.UID) gomegatype func memberLease(etcd *druidv1alpha1.Etcd, replica int, withOwnerRef bool) coordinationv1.Lease { lease := coordinationv1.Lease{ ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("%s-%d", etcd.Name, replica), + Name: fmt.Sprintf("%s-%d-member", etcd.Name, replica), Namespace: etcd.Namespace, Labels: map[string]string{ common.GardenerOwnedBy: etcd.Name, diff --git a/pkg/predicate/predicate.go b/pkg/predicate/predicate.go index 145d22723..e79c940ea 100644 --- a/pkg/predicate/predicate.go +++ b/pkg/predicate/predicate.go @@ -31,6 +31,10 @@ import ( druidv1alpha1 "github.com/gardener/etcd-druid/api/v1alpha1" ) +const ( + QuorumLossAnnotation = "gardener.cloud/quorum-loss" +) + func hasOperationAnnotation(obj client.Object) bool { return obj.GetAnnotations()[v1beta1constants.GardenerOperation] == v1beta1constants.GardenerOperationReconcile } @@ -53,6 +57,28 @@ func HasOperationAnnotation() predicate.Predicate { } } +func hasQuorumLossAnnotation(obj client.Object) bool { + return obj.GetAnnotations()[QuorumLossAnnotation] == "true" +} + +// HasQuorumLossAnnotation is a predicate for the operation annotation. 
+func HasQuorumLossAnnotation() predicate.Predicate { + return predicate.Funcs{ + CreateFunc: func(event event.CreateEvent) bool { + return hasQuorumLossAnnotation(event.Object) + }, + UpdateFunc: func(event event.UpdateEvent) bool { + return hasQuorumLossAnnotation(event.ObjectNew) + }, + GenericFunc: func(event event.GenericEvent) bool { + return hasQuorumLossAnnotation(event.Object) + }, + DeleteFunc: func(event event.DeleteEvent) bool { + return true + }, + } +} + // LastOperationNotSuccessful is a predicate for unsuccessful last operations for creation events. func LastOperationNotSuccessful() predicate.Predicate { operationNotSucceeded := func(obj runtime.Object) bool { @@ -216,3 +242,30 @@ func IsSnapshotLease() predicate.Predicate { }, } } + +// IsMemberLease is a predicate that is `true` if the passed lease object is a member lease. +func IsMemberLease() predicate.Predicate { + isMemberLease := func(obj client.Object) bool { + lease, ok := obj.(*coordinationv1.Lease) + if !ok { + return false + } + + return strings.HasSuffix(lease.Name, "member") + } + + return predicate.Funcs{ + CreateFunc: func(event event.CreateEvent) bool { + return isMemberLease(event.Object) + }, + UpdateFunc: func(event event.UpdateEvent) bool { + return isMemberLease(event.ObjectNew) + }, + GenericFunc: func(event event.GenericEvent) bool { + return isMemberLease(event.Object) + }, + DeleteFunc: func(event event.DeleteEvent) bool { + return isMemberLease(event.Object) + }, + } +}