Skip to content

Commit

Permalink
Added logics for quorum loss scenario.
Browse files Browse the repository at this point in the history
  • Loading branch information
abdasgupta committed Aug 3, 2022
1 parent 51b3cfe commit 215fbc2
Show file tree
Hide file tree
Showing 11 changed files with 643 additions and 109 deletions.
116 changes: 17 additions & 99 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,151 +7,69 @@ metadata:
name: manager-role
rules:
- apiGroups:
- ""
- batch
resources:
- pods
- jobs
verbs:
- list
- watch
- create
- delete
- apiGroups:
- ""
resources:
- secrets
- endpoints
verbs:
- get
- list
- patch
- update
- watch
- apiGroups:
- ""
- coordination.k8s.io
resources:
- events
- leases
verbs:
- create
- delete
- deletecollection
- get
- list
- watch
- patch
- update
- apiGroups:
- ""
resources:
- serviceaccounts
verbs:
- get
- list
- watch
- create
- update
- patch
- delete
- apiGroups:
- rbac.authorization.k8s.io
- druid.gardener.cloud
resources:
- roles
- rolebindings
- etcds
verbs:
- get
- list
- watch
- create
- update
- patch
- delete
- apiGroups:
- ""
- apps
resources:
- services
- configmaps
- statefulsets
verbs:
- get
- list
- patch
- update
- watch
- create
- delete
- apiGroups:
- batch
resources:
- jobs
verbs:
- get
- list
- watch
- create
- update
- patch
- delete
- apiGroups:
- batch
resources:
- cronjobs
verbs:
- get
- list
- watch
- delete
- apiGroups:
- druid.gardener.cloud
resources:
- etcds
- etcdcopybackupstasks
verbs:
- get
- list
- watch
- create
- update
- patch
- delete
- apiGroups:
- druid.gardener.cloud
resources:
- etcds/status
- etcds/finalizers
- etcdcopybackupstasks/status
- etcdcopybackupstasks/finalizers
verbs:
- get
- update
- patch
- create
- apiGroups:
- coordination.k8s.io
resources:
- leases
verbs:
- get
- list
- watch
- create
- update
- patch
- delete
- deletecollection
- apiGroups:
- ""
- druid.gardener.cloud
resources:
- persistentvolumeclaims
- secrets
verbs:
- get
- list
- patch
- update
- watch
- apiGroups:
- policy
resources:
- poddisruptionbudgets
verbs:
- create
- delete
- get
- list
- watch
- create
- update
- patch
- delete
- update
- watch
215 changes: 215 additions & 0 deletions controllers/cluster_mgmt_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
// Copyright (c) 2022 SAP SE or an SAP affiliate company. All rights reserved. This file is licensed under the Apache Software License, v. 2 except as noted otherwise in the LICENSE file
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package controllers

import (
"context"
"fmt"
"time"

"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/source"

"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
coordinationv1 "k8s.io/api/coordination/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/pointer"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"

druidv1alpha1 "github.com/gardener/etcd-druid/api/v1alpha1"
controllersconfig "github.com/gardener/etcd-druid/controllers/config"
"github.com/gardener/etcd-druid/pkg/health/etcdmember"
druidpredicates "github.com/gardener/etcd-druid/pkg/predicate"
"github.com/gardener/gardener/pkg/controllerutils"
kutil "github.com/gardener/gardener/pkg/utils/kubernetes"
)

const clusterMgmtControllerName = "cluster-mgmt-controller"

// ClusterMgmtController reconciles ETCD multinode cluster
type ClusterMgmtController struct {
client.Client
logger logr.Logger
config controllersconfig.ClusterMgmtConfig
}

// NewClusterMgmtController creates a new ClusterMgmtController object
func NewClusterMgmtController(mgr manager.Manager, config controllersconfig.ClusterMgmtConfig) *ClusterMgmtController {
return &ClusterMgmtController{
Client: mgr.GetClient(),
logger: log.Log.WithName("cluster-mgmt-controller"),
config: config,
}
}

// SetupWithManager sets up manager with a new controller and cmc as the reconcile.Reconciler
func (cmc *ClusterMgmtController) SetupWithManager(mgr ctrl.Manager, workers int) error {

ctrl, err := controller.New(clusterMgmtControllerName, mgr, controller.Options{
Reconciler: cmc,
MaxConcurrentReconciles: workers,
})
if err != nil {
return err
}

return ctrl.Watch(
&source.Kind{Type: &coordinationv1.Lease{}},
&handler.EnqueueRequestForOwner{OwnerType: &druidv1alpha1.Etcd{}, IsController: true},
// druidpredicates.LeaseHolderIdentityChange(),
druidpredicates.IsMemberLease(),
)
}

// +kubebuilder:rbac:groups=druid.gardener.cloud,resources=etcds,verbs=get;list;watch
// +kubebuilder:rbac:groups=coordination.k8s.io,resources=leases,verbs=get;list;watch;create;update;patch;delete

// Reconcile reconciles the multinode ETCD cluster.
func (cmc *ClusterMgmtController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
cmc.logger.Info("Cluster management controller reconciliation started")
etcd := &druidv1alpha1.Etcd{}
if err := cmc.Get(ctx, req.NamespacedName, etcd); err != nil {
if errors.IsNotFound(err) {
// Object not found, return. Created objects are automatically garbage collected.
// For additional cleanup logic use finalizers.
return ctrl.Result{}, nil
}
// Error reading the object - requeue the request.
return ctrl.Result{
RequeueAfter: 10 * time.Second,
}, err
}

logger := cmc.logger.WithValues("etcd", kutil.Key(etcd.Namespace, etcd.Name).String())

if etcd == nil {
return ctrl.Result{Requeue: false}, fmt.Errorf("ETCD object is not found")
}

if !etcd.DeletionTimestamp.IsZero() {
return ctrl.Result{Requeue: false}, nil
}

if etcd.Spec.Replicas <= 1 {
return ctrl.Result{Requeue: false}, nil
}

// Allow some time before the quorum loss check actually happens
startTime := time.Now()
if !startTime.After(etcd.CreationTimestamp.Add(cmc.config.WaitDuration)) {
return ctrl.Result{RequeueAfter: 2 * time.Minute}, nil
}

time.Sleep(2 * time.Minute)
unknownThreshold := 300 * time.Second
notReadyThreshold := 60 * time.Second

checker := etcdmember.ReadyCheck(cmc.Client, logger, controllersconfig.EtcdCustodianController{
EtcdMember: controllersconfig.EtcdMemberConfig{
EtcdMemberNotReadyThreshold: notReadyThreshold,
EtcdMemberUnknownThreshold: unknownThreshold,
},
})

results := checker.Check(context.Background(), *etcd)
totalReadyMembers := 0

for _, result := range results {
if result.Status() == "LeaseSucceeded" {
totalReadyMembers = totalReadyMembers + 1
}
}

quorum := int(etcd.Spec.Replicas)/2 + 1

if totalReadyMembers < quorum {
// scale down the statefulset to 0
sts := &appsv1.StatefulSet{}
err := cmc.Get(ctx, types.NamespacedName{Name: etcd.Name, Namespace: etcd.Namespace}, sts)
if err != nil {
return ctrl.Result{
RequeueAfter: 10 * time.Second,
}, fmt.Errorf("cound not fetch statefulset: %v", err)
}

logger.Info("Scaling down the statefulset to 0")
if _, err := controllerutils.GetAndCreateOrStrategicMergePatch(ctx, cmc.Client, sts, func() error {
sts.Spec.Replicas = pointer.Int32(0)
return nil
}); err != nil {
return ctrl.Result{
RequeueAfter: 10 * time.Second,
}, fmt.Errorf("cound not scale down statefulset to 0 : %v", err)
}

sts = &appsv1.StatefulSet{}
err = cmc.Get(ctx, types.NamespacedName{Name: etcd.Name, Namespace: etcd.Namespace}, sts)
if err != nil {
return ctrl.Result{
RequeueAfter: 10 * time.Second,
}, fmt.Errorf("cound not fetch statefulset: %v", err)
}
fmt.Printf("The statefulset %v has replicas %v", sts.Name, *sts.Spec.Replicas)

logger.Info("Deleting PVCs")
// delete the pvcs
if err := cmc.DeleteAllOf(ctx, &corev1.PersistentVolumeClaim{},
client.InNamespace(sts.GetNamespace()),
client.MatchingLabels(getMatchingLabels(sts))); client.IgnoreNotFound(err) != nil {
return ctrl.Result{
RequeueAfter: 10 * time.Second,
}, fmt.Errorf("cound not delete pvcs : %v", err)
}

logger.Info("Scaling up the statefulset to 1")
// scale up the statefulset to 1
if _, err := controllerutils.GetAndCreateOrStrategicMergePatch(ctx, cmc.Client, sts, func() error {
sts.Spec.Replicas = pointer.Int32(1)
return nil
}); err != nil {
return ctrl.Result{
RequeueAfter: 10 * time.Second,
}, fmt.Errorf("cound not scale up statefulset to 1 : %v", err)
}

// scale up the statefulset to ETCD replicas
logger.Info("Scaling up the statefulset to the number of replicas mentioned in ETCD spec")
if _, err := controllerutils.GetAndCreateOrStrategicMergePatch(ctx, cmc.Client, sts, func() error {
sts.Spec.Replicas = &etcd.Spec.Replicas
return nil
}); err != nil {
return ctrl.Result{
RequeueAfter: 10 * time.Second,
}, fmt.Errorf("cound not scale up statefulset to replica number : %v", err)
}
}
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
}

func getMatchingLabels(sts *appsv1.StatefulSet) map[string]string {
labels := make(map[string]string)

labels["name"] = sts.Labels["name"]
labels["instance"] = sts.Labels["instance"]

return labels
}
Loading

0 comments on commit 215fbc2

Please sign in to comment.