Skip to content

Commit

Permalink
Added logics for quorum loss scenario.
Browse files Browse the repository at this point in the history
  • Loading branch information
abdasgupta committed Aug 3, 2022
1 parent de66db1 commit fdb2a03
Show file tree
Hide file tree
Showing 9 changed files with 389 additions and 119 deletions.
116 changes: 17 additions & 99 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,151 +7,69 @@ metadata:
name: manager-role
rules:
- apiGroups:
- ""
- batch
resources:
- pods
- jobs
verbs:
- list
- watch
- create
- delete
- apiGroups:
- ""
resources:
- secrets
- endpoints
verbs:
- get
- list
- patch
- update
- watch
- apiGroups:
- ""
- coordination.k8s.io
resources:
- events
- leases
verbs:
- create
- delete
- deletecollection
- get
- list
- watch
- patch
- update
- apiGroups:
- ""
resources:
- serviceaccounts
verbs:
- get
- list
- watch
- create
- update
- patch
- delete
- apiGroups:
- rbac.authorization.k8s.io
- druid.gardener.cloud
resources:
- roles
- rolebindings
- etcds
verbs:
- get
- list
- watch
- create
- update
- patch
- delete
- apiGroups:
- ""
- apps
resources:
- services
- configmaps
- statefulsets
verbs:
- get
- list
- patch
- update
- watch
- create
- delete
- apiGroups:
- batch
resources:
- jobs
verbs:
- get
- list
- watch
- create
- update
- patch
- delete
- apiGroups:
- batch
resources:
- cronjobs
verbs:
- get
- list
- watch
- delete
- apiGroups:
- druid.gardener.cloud
resources:
- etcds
- etcdcopybackupstasks
verbs:
- get
- list
- watch
- create
- update
- patch
- delete
- apiGroups:
- druid.gardener.cloud
resources:
- etcds/status
- etcds/finalizers
- etcdcopybackupstasks/status
- etcdcopybackupstasks/finalizers
verbs:
- get
- update
- patch
- create
- apiGroups:
- coordination.k8s.io
resources:
- leases
verbs:
- get
- list
- watch
- create
- update
- patch
- delete
- deletecollection
- apiGroups:
- ""
- druid.gardener.cloud
resources:
- persistentvolumeclaims
- secrets
verbs:
- get
- list
- patch
- update
- watch
- apiGroups:
- policy
resources:
- poddisruptionbudgets
verbs:
- create
- delete
- get
- list
- watch
- create
- update
- patch
- delete
- update
- watch
10 changes: 7 additions & 3 deletions controllers/controllers_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ var (
mgrStopped *sync.WaitGroup

activeDeadlineDuration time.Duration
waitDuration time.Duration
backupCompactionSchedule = "15 */24 * * *"

revertFns []func()
Expand Down Expand Up @@ -102,7 +103,9 @@ var _ = BeforeSuite(func(done Done) {
})
Expect(err).NotTo(HaveOccurred())

er, err := NewEtcdReconcilerWithImageVector(mgr, false)
waitDuration, err = time.ParseDuration("10s")
Expect(err).NotTo(HaveOccurred())
er, err := NewEtcdReconcilerWithImageVector(mgr, false, waitDuration)
Expect(err).NotTo(HaveOccurred())

err = er.SetupWithManager(mgr, 5, true)
Expand All @@ -114,8 +117,9 @@ var _ = BeforeSuite(func(done Done) {
Expect(err).NotTo(HaveOccurred())

custodian := NewEtcdCustodian(mgr, controllersconfig.EtcdCustodianController{
SyncPeriod: 10 * time.Second,
EtcdMember: controllersconfig.EtcdMemberConfig{
EtcdMemberNotReadyThreshold: 1 * time.Minute,
EtcdMemberNotReadyThreshold: 20 * time.Second,
},
})

Expand All @@ -138,7 +142,7 @@ var _ = BeforeSuite(func(done Done) {
})
Expect(err).NotTo(HaveOccurred())

err = lc.SetupWithManager(mgr, 1)
err = lc.SetupWithManager(mgr, 5)
Expect(err).NotTo(HaveOccurred())

mgrStopped = startTestManager(mgrCtx, mgr)
Expand Down
89 changes: 77 additions & 12 deletions controllers/etcd_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,31 +106,25 @@ type EtcdReconciler struct {
ImageVector imagevector.ImageVector
logger logr.Logger
disableEtcdServiceAccountAutomount bool
}

// NewReconcilerWithImageVector creates a new EtcdReconciler object with an image vector
func NewReconcilerWithImageVector(mgr manager.Manager, disableEtcdServiceAccountAutomount bool) (*EtcdReconciler, error) {
etcdReconciler, err := NewEtcdReconciler(mgr, disableEtcdServiceAccountAutomount)
if err != nil {
return nil, err
}
return etcdReconciler.InitializeControllerWithImageVector()
// waitTimeForTests allows some waiting time between certain operations. This variable helps some unit test cases.
waitTimeForTests time.Duration
}

// NewEtcdReconciler creates a new EtcdReconciler object
func NewEtcdReconciler(mgr manager.Manager, disableEtcdServiceAccountAutomount bool) (*EtcdReconciler, error) {
func NewEtcdReconciler(mgr manager.Manager, disableEtcdServiceAccountAutomount bool, waitTimeForTests time.Duration) (*EtcdReconciler, error) {
return (&EtcdReconciler{
Client: mgr.GetClient(),
Config: mgr.GetConfig(),
Scheme: mgr.GetScheme(),
logger: log.Log.WithName("etcd-controller"),
disableEtcdServiceAccountAutomount: disableEtcdServiceAccountAutomount,
waitTimeForTests: waitTimeForTests,
}).InitializeControllerWithChartApplier()
}

// NewEtcdReconcilerWithImageVector creates a new EtcdReconciler object
func NewEtcdReconcilerWithImageVector(mgr manager.Manager, disableEtcdServiceAccountAutomount bool) (*EtcdReconciler, error) {
ec, err := NewEtcdReconciler(mgr, disableEtcdServiceAccountAutomount)
func NewEtcdReconcilerWithImageVector(mgr manager.Manager, disableEtcdServiceAccountAutomount bool, waitTimeForTests time.Duration) (*EtcdReconciler, error) {
ec, err := NewEtcdReconciler(mgr, disableEtcdServiceAccountAutomount, waitTimeForTests)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -212,6 +206,7 @@ func buildPredicate(ignoreOperationAnnotation bool) predicate.Predicate {

return predicate.Or(
druidpredicates.HasOperationAnnotation(),
druidpredicates.HasQuorumLossAnnotation(),
druidpredicates.LastOperationNotSuccessful(),
extensionspredicate.IsDeleting(),
)
Expand Down Expand Up @@ -239,6 +234,67 @@ func (r *EtcdReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
if !etcd.DeletionTimestamp.IsZero() {
return r.delete(ctx, etcd)
}

// Check if the annotation for quorum loss is present in the ETCD annotations;
// if yes, take the necessary actions.
if val, ok := etcd.Spec.Annotations[druidpredicates.QuorumLossAnnotation]; ok {
if val == "true" {
// scale down the statefulset to 0
sts := &appsv1.StatefulSet{}
err := r.Get(ctx, req.NamespacedName, sts)
if err != nil {
return ctrl.Result{
RequeueAfter: 10 * time.Second,
}, fmt.Errorf("cound not fetch statefulset though the annotaion action/quorum-loss is set in ETCD CR: %v", err)
}

r.logger.Info("Scaling down the statefulset to 0 while tackling quorum loss scenario in ETCD multi node cluster")
if _, err := controllerutils.GetAndCreateOrStrategicMergePatch(ctx, r.Client, sts, func() error {
sts.Spec.Replicas = pointer.Int32(0)
return nil
}); err != nil {
return ctrl.Result{
RequeueAfter: 10 * time.Second,
}, fmt.Errorf("cound not scale down statefulset to 0 while tackling quorum loss scenario in ETCD multi node cluster: %v", err)
}
time.Sleep(r.waitTimeForTests)

r.logger.Info("Deleting PVCs while tackling quorum loss scenario in ETCD multi node cluster")
// delete the pvcs
if err := r.DeleteAllOf(ctx, &corev1.PersistentVolumeClaim{},
client.InNamespace(sts.GetNamespace()),
client.MatchingLabels(getMatchingLabels(sts))); client.IgnoreNotFound(err) != nil {
return ctrl.Result{
RequeueAfter: 10 * time.Second,
}, fmt.Errorf("cound not delete pvcs while tackling quorum loss scenario in ETCD multi node cluster : %v", err)
}

r.logger.Info("Scaling up the statefulset to 1 while tackling quorum loss scenario in ETCD multi node cluster")
// scale up the statefulset to 1
if _, err := controllerutils.GetAndCreateOrStrategicMergePatch(ctx, r.Client, sts, func() error {
sts.Spec.Replicas = pointer.Int32(1)
return nil
}); err != nil {
return ctrl.Result{
RequeueAfter: 10 * time.Second,
}, fmt.Errorf("cound not scale up statefulset to 1 while tackling quorum loss scenario in ETCD multi node cluster : %v", err)
}
time.Sleep(r.waitTimeForTests)

// scale up the statefulset to ETCD replicas
r.logger.Info("Scaling up the statefulset to the number of replicas mentioned in ETCD spec")
if _, err := controllerutils.GetAndCreateOrStrategicMergePatch(ctx, r.Client, sts, func() error {
sts.Spec.Replicas = &etcd.Spec.Replicas
return nil
}); err != nil {
return ctrl.Result{
RequeueAfter: 10 * time.Second,
}, fmt.Errorf("cound not scale up statefulset to replica number while tackling quorum loss scenario in ETCD multi node cluster : %v", err)
}
etcd.Spec.Annotations["action/quorum-loss"] = "false"
}
}

return r.reconcile(ctx, etcd)
}

Expand Down Expand Up @@ -1022,6 +1078,15 @@ func clusterInBootstrap(etcd *druidv1alpha1.Etcd) bool {
(etcd.Spec.Replicas > 1 && etcd.Status.Replicas == 1)
}

// getMatchingLabels returns a fresh label set holding the "name" and
// "instance" labels read from the given StatefulSet.
//
// Both keys are always present in the result; a label that is absent on the
// StatefulSet is reported with an empty string value. The StatefulSet's own
// label map is not modified.
func getMatchingLabels(sts *appsv1.StatefulSet) map[string]string {
	return map[string]string{
		"name":     sts.Labels["name"],
		"instance": sts.Labels["instance"],
	}
}

func (r *EtcdReconciler) updateEtcdErrorStatus(ctx context.Context, etcd *druidv1alpha1.Etcd, sts *appsv1.StatefulSet, lastError error) error {
return controllerutils.TryUpdateStatus(ctx, retry.DefaultBackoff, r.Client, etcd, func() error {
lastErrStr := fmt.Sprintf("%v", lastError)
Expand Down
Loading

0 comments on commit fdb2a03

Please sign in to comment.