Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Custodian controller improvements #180

Merged
merged 5 commits into from
Jun 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ check-generate:
# Generate code
.PHONY: generate
generate:
@go generate "$(REPO_ROOT)/pkg/..."
@"$(REPO_ROOT)/hack/update-codegen.sh"

# Build the docker image
Expand All @@ -109,7 +110,8 @@ docker-push:
.PHONY: install-requirements
install-requirements:
@go install -mod=vendor sigs.k8s.io/controller-tools/cmd/controller-gen
@"$(REPO_ROOT)/vendor/github.com/gardener/gardener/hack/install-requirements.sh" > /dev/null
@go install -mod=vendor github.com/golang/mock/mockgen
@"$(REPO_ROOT)/vendor/github.com/gardener/gardener/hack/install-requirements.sh"

.PHONY: update-dependencies
update-dependencies:
Expand Down
259 changes: 181 additions & 78 deletions config/crd/bases/druid.gardener.cloud_etcds.yaml

Large diffs are not rendered by default.

65 changes: 32 additions & 33 deletions controllers/controller_ref_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"strings"
"sync"

"k8s.io/client-go/tools/cache"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"

Expand Down Expand Up @@ -79,7 +81,7 @@ func (m *BaseControllerRefManager) CanAdopt() error {
// own the object.
//
// No reconciliation will be attempted if the controller is being deleted.
func (m *BaseControllerRefManager) claimObject(obj client.Object, match func(metav1.Object) bool, adopt, release func(client.Object) error) (bool, error) {
func (m *BaseControllerRefManager) claimObject(ctx context.Context, obj client.Object, match func(metav1.Object) bool, adopt, release func(context.Context, client.Object) error) (bool, error) {
controllerRef := metav1.GetControllerOf(obj)
if controllerRef != nil {
if controllerRef.UID != m.Controller.GetUID() {
Expand All @@ -91,7 +93,7 @@ func (m *BaseControllerRefManager) claimObject(obj client.Object, match func(met
// However, if the ownerReference is set for adopting the statefulset, druid
// needs to inject the annotations and remove the ownerReference.
// Return true (successfully claimed) before checking deletion timestamp.
if err := adopt(obj); err != nil {
if err := adopt(ctx, obj); err != nil {
// If the object no longer exists, ignore the error.
if errors.IsNotFound(err) {
return false, nil
Expand All @@ -108,7 +110,7 @@ func (m *BaseControllerRefManager) claimObject(obj client.Object, match func(met
if m.Controller.GetDeletionTimestamp() != nil {
return false, nil
}
if err := release(obj); err != nil {
if err := release(ctx, obj); err != nil {
// If the object no longer exists, ignore the error.
if errors.IsNotFound(err) {
return false, nil
Expand Down Expand Up @@ -137,7 +139,7 @@ func (m *BaseControllerRefManager) claimObject(obj client.Object, match func(met
// than sts will have ownerReference set as well.
if !checkEtcdAnnotations(obj.GetAnnotations(), m.Controller) {
// Selector matches. Try to adopt.
if err := adopt(obj); err != nil {
if err := adopt(ctx, obj); err != nil {
// If the object no longer exists, ignore the error.
if errors.IsNotFound(err) {
return false, nil
Expand Down Expand Up @@ -188,31 +190,21 @@ func NewEtcdDruidRefManager(
}

// FetchStatefulSet fetches statefulset based on ETCD resource
func (m *EtcdDruidRefManager) FetchStatefulSet(etcd *druidv1alpha1.Etcd) ([]*appsv1.StatefulSet, error) {
func (m *EtcdDruidRefManager) FetchStatefulSet(ctx context.Context, etcd *druidv1alpha1.Etcd) (*appsv1.StatefulSetList, error) {
selector, err := metav1.LabelSelectorAsSelector(etcd.Spec.Selector)
if err != nil {
logger.Error(err, "Error converting etcd selector to selector")
return nil, err
}

// list all statefulsets to include the statefulsets that don't match the etcd`s selector
// anymore but has the stale controller ref.
statefulSets := &appsv1.StatefulSetList{}
err = m.cl.List(context.TODO(), statefulSets, client.InNamespace(etcd.Namespace), client.MatchingLabelsSelector{Selector: selector})
if err != nil {
logger.Error(err, "Error listing statefulsets")
return nil, err
}

// NOTE: filteredStatefulSets are pointing to deep copies of the cache, but this could change in the future.
// Ref: https://github.com/kubernetes-sigs/controller-runtime/blob/release-0.2/pkg/cache/internal/cache_reader.go#L74
// if you need to modify them, you need to copy it first.
filteredStatefulSets, err := m.ClaimStatefulsets(statefulSets)
err = m.cl.List(ctx, statefulSets, client.InNamespace(etcd.Namespace), client.MatchingLabelsSelector{Selector: selector})
Comment on lines -210 to +202
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this correction ❤️

I have created the #186 to separately address simplification of the claim logic.

if err != nil {
return nil, err
}

return filteredStatefulSets, err
return statefulSets, err
}

// ClaimStatefulsets tries to take ownership of a list of Statefulsets.
Expand All @@ -231,9 +223,11 @@ func (m *EtcdDruidRefManager) FetchStatefulSet(etcd *druidv1alpha1.Etcd) ([]*app
//
// If the error is nil, either the reconciliation succeeded, or no
// reconciliation was necessary. The list of statefulsets that you now own is returned.
func (m *EtcdDruidRefManager) ClaimStatefulsets(sts *appsv1.StatefulSetList, filters ...func(*appsv1.StatefulSet) bool) ([]*appsv1.StatefulSet, error) {
var claimed []*appsv1.StatefulSet
var errlist []error
func (m *EtcdDruidRefManager) ClaimStatefulsets(ctx context.Context, statefulSetList *appsv1.StatefulSetList, filters ...func(*appsv1.StatefulSet) bool) ([]*appsv1.StatefulSet, error) {
var (
claimed []*appsv1.StatefulSet
errlist []error
)

match := func(obj metav1.Object) bool {
ss := obj.(*appsv1.StatefulSet)
Expand All @@ -249,15 +243,15 @@ func (m *EtcdDruidRefManager) ClaimStatefulsets(sts *appsv1.StatefulSetList, fil
return true
}

for k := range sts.Items {
ss := &sts.Items[k]
ok, err := m.claimObject(ss, match, m.AdoptResource, m.ReleaseResource)
for k := range statefulSetList.Items {
sts := &statefulSetList.Items[k]
ok, err := m.claimObject(ctx, sts, match, m.AdoptResource, m.ReleaseResource)
if err != nil {
errlist = append(errlist, err)
continue
}
if ok {
claimed = append(claimed, ss)
claimed = append(claimed, sts)
}
}
return claimed, utilerrors.NewAggregate(errlist)
Expand All @@ -278,7 +272,7 @@ func (m *EtcdDruidRefManager) ClaimStatefulsets(sts *appsv1.StatefulSetList, fil
//
// If the error is nil, either the reconciliation succeeded, or no
// reconciliation was necessary. The list of Services that you now own is returned.
func (m *EtcdDruidRefManager) ClaimServices(svcs *corev1.ServiceList, filters ...func(*corev1.Service) bool) ([]*corev1.Service, error) {
func (m *EtcdDruidRefManager) ClaimServices(ctx context.Context, svcs *corev1.ServiceList, filters ...func(*corev1.Service) bool) ([]*corev1.Service, error) {
var claimed []*corev1.Service
var errlist []error

Expand All @@ -298,7 +292,7 @@ func (m *EtcdDruidRefManager) ClaimServices(svcs *corev1.ServiceList, filters ..

for k := range svcs.Items {
svc := &svcs.Items[k]
ok, err := m.claimObject(svc, match, m.AdoptResource, m.ReleaseResource)
ok, err := m.claimObject(ctx, svc, match, m.AdoptResource, m.ReleaseResource)

if err != nil {
errlist = append(errlist, err)
Expand Down Expand Up @@ -326,7 +320,7 @@ func (m *EtcdDruidRefManager) ClaimServices(svcs *corev1.ServiceList, filters ..
//
// If the error is nil, either the reconciliation succeeded, or no
// reconciliation was necessary. The list of Services that you now own is returned.
func (m *EtcdDruidRefManager) ClaimConfigMaps(cms *corev1.ConfigMapList, filters ...func(*corev1.ConfigMap) bool) ([]*corev1.ConfigMap, error) {
func (m *EtcdDruidRefManager) ClaimConfigMaps(ctx context.Context, cms *corev1.ConfigMapList, filters ...func(*corev1.ConfigMap) bool) ([]*corev1.ConfigMap, error) {
var claimed []*corev1.ConfigMap
var errlist []error

Expand All @@ -346,7 +340,7 @@ func (m *EtcdDruidRefManager) ClaimConfigMaps(cms *corev1.ConfigMapList, filters

for k := range cms.Items {
cm := &cms.Items[k]
ok, err := m.claimObject(cm, match, m.AdoptResource, m.ReleaseResource)
ok, err := m.claimObject(ctx, cm, match, m.AdoptResource, m.ReleaseResource)

if err != nil {
errlist = append(errlist, err)
Expand All @@ -361,7 +355,7 @@ func (m *EtcdDruidRefManager) ClaimConfigMaps(cms *corev1.ConfigMapList, filters

// AdoptResource sends a patch to take control of the Etcd. It returns the error if
// the patching fails.
func (m *EtcdDruidRefManager) AdoptResource(obj client.Object) error {
func (m *EtcdDruidRefManager) AdoptResource(ctx context.Context, obj client.Object) error {
if err := m.CanAdopt(); err != nil {
return fmt.Errorf("can't adopt resource %v/%v (%v): %v", obj.GetNamespace(), obj.GetName(), obj.GetUID(), err)
}
Expand All @@ -375,7 +369,12 @@ func (m *EtcdDruidRefManager) AdoptResource(obj client.Object) error {
if annotations == nil {
annotations = map[string]string{}
}
annotations[common.GardenerOwnedBy] = fmt.Sprintf("%s/%s", m.Controller.GetNamespace(), m.Controller.GetName())
objectKey, err := cache.MetaNamespaceKeyFunc(m.Controller)
if err != nil {
return err
}

annotations[common.GardenerOwnedBy] = objectKey
annotations[common.GardenerOwnerType] = strings.ToLower(etcdGVK.Kind)
clone.SetAnnotations(annotations)
case *corev1.ConfigMap:
Expand All @@ -396,12 +395,12 @@ func (m *EtcdDruidRefManager) AdoptResource(obj client.Object) error {
return fmt.Errorf("cannot adopt resource: %s", objType)
}

return m.cl.Patch(context.TODO(), clone, client.MergeFrom(obj))
return m.cl.Patch(ctx, clone, client.MergeFrom(obj))
}

// ReleaseResource sends a patch to free the resource from the control of the controller.
// It returns the error if the patching fails. 404 and 422 errors are ignored.
func (m *EtcdDruidRefManager) ReleaseResource(obj client.Object) error {
func (m *EtcdDruidRefManager) ReleaseResource(ctx context.Context, obj client.Object) error {
var clone client.Object
switch objType := obj.(type) {
case *appsv1.StatefulSet:
Expand All @@ -421,7 +420,7 @@ func (m *EtcdDruidRefManager) ReleaseResource(obj client.Object) error {

m.disown(clone)

err := client.IgnoreNotFound(m.cl.Patch(context.TODO(), clone, client.MergeFrom(obj)))
err := client.IgnoreNotFound(m.cl.Patch(ctx, clone, client.MergeFrom(obj)))
if errors.IsInvalid(err) {
// Invalid error will be returned in two cases: 1. the etcd
// has no owner reference, 2. the uid of the etcd doesn't
Expand Down
Loading