Skip to content

Commit

Permalink
Merge pull request #180 from timuthy/enhancement.etcd-status-updates
Browse files Browse the repository at this point in the history
Custodian controller improvements
  • Loading branch information
timuthy authored Jun 2, 2021
2 parents 6030a7e + 839dd39 commit a9dbf5c
Show file tree
Hide file tree
Showing 87 changed files with 9,767 additions and 319 deletions.
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ check-generate:
# Generate code
.PHONY: generate
generate:
@go generate "$(REPO_ROOT)/pkg/..."
@"$(REPO_ROOT)/hack/update-codegen.sh"

# Build the docker image
Expand All @@ -109,7 +110,8 @@ docker-push:
.PHONY: install-requirements
install-requirements:
@go install -mod=vendor sigs.k8s.io/controller-tools/cmd/controller-gen
@"$(REPO_ROOT)/vendor/github.com/gardener/gardener/hack/install-requirements.sh" > /dev/null
@go install -mod=vendor github.com/golang/mock/mockgen
@"$(REPO_ROOT)/vendor/github.com/gardener/gardener/hack/install-requirements.sh"

.PHONY: update-dependencies
update-dependencies:
Expand Down
259 changes: 181 additions & 78 deletions config/crd/bases/druid.gardener.cloud_etcds.yaml

Large diffs are not rendered by default.

65 changes: 32 additions & 33 deletions controllers/controller_ref_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"strings"
"sync"

"k8s.io/client-go/tools/cache"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"

Expand Down Expand Up @@ -79,7 +81,7 @@ func (m *BaseControllerRefManager) CanAdopt() error {
// own the object.
//
// No reconciliation will be attempted if the controller is being deleted.
func (m *BaseControllerRefManager) claimObject(obj client.Object, match func(metav1.Object) bool, adopt, release func(client.Object) error) (bool, error) {
func (m *BaseControllerRefManager) claimObject(ctx context.Context, obj client.Object, match func(metav1.Object) bool, adopt, release func(context.Context, client.Object) error) (bool, error) {
controllerRef := metav1.GetControllerOf(obj)
if controllerRef != nil {
if controllerRef.UID != m.Controller.GetUID() {
Expand All @@ -91,7 +93,7 @@ func (m *BaseControllerRefManager) claimObject(obj client.Object, match func(met
// However, if the ownerReference is set for adopting the statefulset, druid
// needs to inject the annotations and remove the ownerReference.
// Return true (successfully claimed) before checking deletion timestamp.
if err := adopt(obj); err != nil {
if err := adopt(ctx, obj); err != nil {
// If the object no longer exists, ignore the error.
if errors.IsNotFound(err) {
return false, nil
Expand All @@ -108,7 +110,7 @@ func (m *BaseControllerRefManager) claimObject(obj client.Object, match func(met
if m.Controller.GetDeletionTimestamp() != nil {
return false, nil
}
if err := release(obj); err != nil {
if err := release(ctx, obj); err != nil {
// If the object no longer exists, ignore the error.
if errors.IsNotFound(err) {
return false, nil
Expand Down Expand Up @@ -137,7 +139,7 @@ func (m *BaseControllerRefManager) claimObject(obj client.Object, match func(met
// than sts will have ownerReference set as well.
if !checkEtcdAnnotations(obj.GetAnnotations(), m.Controller) {
// Selector matches. Try to adopt.
if err := adopt(obj); err != nil {
if err := adopt(ctx, obj); err != nil {
// If the object no longer exists, ignore the error.
if errors.IsNotFound(err) {
return false, nil
Expand Down Expand Up @@ -188,31 +190,21 @@ func NewEtcdDruidRefManager(
}

// FetchStatefulSet fetches statefulset based on ETCD resource
func (m *EtcdDruidRefManager) FetchStatefulSet(etcd *druidv1alpha1.Etcd) ([]*appsv1.StatefulSet, error) {
func (m *EtcdDruidRefManager) FetchStatefulSet(ctx context.Context, etcd *druidv1alpha1.Etcd) (*appsv1.StatefulSetList, error) {
selector, err := metav1.LabelSelectorAsSelector(etcd.Spec.Selector)
if err != nil {
logger.Error(err, "Error converting etcd selector to selector")
return nil, err
}

// list all statefulsets to include the statefulsets that don't match the etcd`s selector
// anymore but has the stale controller ref.
statefulSets := &appsv1.StatefulSetList{}
err = m.cl.List(context.TODO(), statefulSets, client.InNamespace(etcd.Namespace), client.MatchingLabelsSelector{Selector: selector})
if err != nil {
logger.Error(err, "Error listing statefulsets")
return nil, err
}

// NOTE: filteredStatefulSets are pointing to deep copies of the cache, but this could change in the future.
// Ref: https://github.com/kubernetes-sigs/controller-runtime/blob/release-0.2/pkg/cache/internal/cache_reader.go#L74
// if you need to modify them, you need to copy it first.
filteredStatefulSets, err := m.ClaimStatefulsets(statefulSets)
err = m.cl.List(ctx, statefulSets, client.InNamespace(etcd.Namespace), client.MatchingLabelsSelector{Selector: selector})
if err != nil {
return nil, err
}

return filteredStatefulSets, err
return statefulSets, err
}

// ClaimStatefulsets tries to take ownership of a list of Statefulsets.
Expand All @@ -231,9 +223,11 @@ func (m *EtcdDruidRefManager) FetchStatefulSet(etcd *druidv1alpha1.Etcd) ([]*app
//
// If the error is nil, either the reconciliation succeeded, or no
// reconciliation was necessary. The list of statefulsets that you now own is returned.
func (m *EtcdDruidRefManager) ClaimStatefulsets(sts *appsv1.StatefulSetList, filters ...func(*appsv1.StatefulSet) bool) ([]*appsv1.StatefulSet, error) {
var claimed []*appsv1.StatefulSet
var errlist []error
func (m *EtcdDruidRefManager) ClaimStatefulsets(ctx context.Context, statefulSetList *appsv1.StatefulSetList, filters ...func(*appsv1.StatefulSet) bool) ([]*appsv1.StatefulSet, error) {
var (
claimed []*appsv1.StatefulSet
errlist []error
)

match := func(obj metav1.Object) bool {
ss := obj.(*appsv1.StatefulSet)
Expand All @@ -249,15 +243,15 @@ func (m *EtcdDruidRefManager) ClaimStatefulsets(sts *appsv1.StatefulSetList, fil
return true
}

for k := range sts.Items {
ss := &sts.Items[k]
ok, err := m.claimObject(ss, match, m.AdoptResource, m.ReleaseResource)
for k := range statefulSetList.Items {
sts := &statefulSetList.Items[k]
ok, err := m.claimObject(ctx, sts, match, m.AdoptResource, m.ReleaseResource)
if err != nil {
errlist = append(errlist, err)
continue
}
if ok {
claimed = append(claimed, ss)
claimed = append(claimed, sts)
}
}
return claimed, utilerrors.NewAggregate(errlist)
Expand All @@ -278,7 +272,7 @@ func (m *EtcdDruidRefManager) ClaimStatefulsets(sts *appsv1.StatefulSetList, fil
//
// If the error is nil, either the reconciliation succeeded, or no
// reconciliation was necessary. The list of Services that you now own is returned.
func (m *EtcdDruidRefManager) ClaimServices(svcs *corev1.ServiceList, filters ...func(*corev1.Service) bool) ([]*corev1.Service, error) {
func (m *EtcdDruidRefManager) ClaimServices(ctx context.Context, svcs *corev1.ServiceList, filters ...func(*corev1.Service) bool) ([]*corev1.Service, error) {
var claimed []*corev1.Service
var errlist []error

Expand All @@ -298,7 +292,7 @@ func (m *EtcdDruidRefManager) ClaimServices(svcs *corev1.ServiceList, filters ..

for k := range svcs.Items {
svc := &svcs.Items[k]
ok, err := m.claimObject(svc, match, m.AdoptResource, m.ReleaseResource)
ok, err := m.claimObject(ctx, svc, match, m.AdoptResource, m.ReleaseResource)

if err != nil {
errlist = append(errlist, err)
Expand Down Expand Up @@ -326,7 +320,7 @@ func (m *EtcdDruidRefManager) ClaimServices(svcs *corev1.ServiceList, filters ..
//
// If the error is nil, either the reconciliation succeeded, or no
// reconciliation was necessary. The list of Services that you now own is returned.
func (m *EtcdDruidRefManager) ClaimConfigMaps(cms *corev1.ConfigMapList, filters ...func(*corev1.ConfigMap) bool) ([]*corev1.ConfigMap, error) {
func (m *EtcdDruidRefManager) ClaimConfigMaps(ctx context.Context, cms *corev1.ConfigMapList, filters ...func(*corev1.ConfigMap) bool) ([]*corev1.ConfigMap, error) {
var claimed []*corev1.ConfigMap
var errlist []error

Expand All @@ -346,7 +340,7 @@ func (m *EtcdDruidRefManager) ClaimConfigMaps(cms *corev1.ConfigMapList, filters

for k := range cms.Items {
cm := &cms.Items[k]
ok, err := m.claimObject(cm, match, m.AdoptResource, m.ReleaseResource)
ok, err := m.claimObject(ctx, cm, match, m.AdoptResource, m.ReleaseResource)

if err != nil {
errlist = append(errlist, err)
Expand All @@ -361,7 +355,7 @@ func (m *EtcdDruidRefManager) ClaimConfigMaps(cms *corev1.ConfigMapList, filters

// AdoptResource sends a patch to take control of the Etcd. It returns the error if
// the patching fails.
func (m *EtcdDruidRefManager) AdoptResource(obj client.Object) error {
func (m *EtcdDruidRefManager) AdoptResource(ctx context.Context, obj client.Object) error {
if err := m.CanAdopt(); err != nil {
return fmt.Errorf("can't adopt resource %v/%v (%v): %v", obj.GetNamespace(), obj.GetName(), obj.GetUID(), err)
}
Expand All @@ -375,7 +369,12 @@ func (m *EtcdDruidRefManager) AdoptResource(obj client.Object) error {
if annotations == nil {
annotations = map[string]string{}
}
annotations[common.GardenerOwnedBy] = fmt.Sprintf("%s/%s", m.Controller.GetNamespace(), m.Controller.GetName())
objectKey, err := cache.MetaNamespaceKeyFunc(m.Controller)
if err != nil {
return err
}

annotations[common.GardenerOwnedBy] = objectKey
annotations[common.GardenerOwnerType] = strings.ToLower(etcdGVK.Kind)
clone.SetAnnotations(annotations)
case *corev1.ConfigMap:
Expand All @@ -396,12 +395,12 @@ func (m *EtcdDruidRefManager) AdoptResource(obj client.Object) error {
return fmt.Errorf("cannot adopt resource: %s", objType)
}

return m.cl.Patch(context.TODO(), clone, client.MergeFrom(obj))
return m.cl.Patch(ctx, clone, client.MergeFrom(obj))
}

// ReleaseResource sends a patch to free the resource from the control of the controller.
// It returns the error if the patching fails. 404 and 422 errors are ignored.
func (m *EtcdDruidRefManager) ReleaseResource(obj client.Object) error {
func (m *EtcdDruidRefManager) ReleaseResource(ctx context.Context, obj client.Object) error {
var clone client.Object
switch objType := obj.(type) {
case *appsv1.StatefulSet:
Expand All @@ -421,7 +420,7 @@ func (m *EtcdDruidRefManager) ReleaseResource(obj client.Object) error {

m.disown(clone)

err := client.IgnoreNotFound(m.cl.Patch(context.TODO(), clone, client.MergeFrom(obj)))
err := client.IgnoreNotFound(m.cl.Patch(ctx, clone, client.MergeFrom(obj)))
if errors.IsInvalid(err) {
// Invalid error will be returned in two cases: 1. the etcd
// has no owner reference, 2. the uid of the etcd doesn't
Expand Down
Loading

0 comments on commit a9dbf5c

Please sign in to comment.