From a085d5bcc8ae7c06f628827fa7d98cdcf2998027 Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Wed, 6 Mar 2019 18:34:02 +0100 Subject: [PATCH 1/2] Make garbage-collection labelling resource-specific This prevents unintended deletions from happening if/when cluster object labels are copied around. I also changed some names and tweaked comments a bit. --- cluster/kubernetes/resource/resource.go | 4 +- cluster/kubernetes/sync.go | 64 +++++++++++++++++-------- cluster/kubernetes/sync_test.go | 2 +- daemon/loop.go | 8 ++-- 4 files changed, 51 insertions(+), 27 deletions(-) diff --git a/cluster/kubernetes/resource/resource.go b/cluster/kubernetes/resource/resource.go index dc5f94ad6..78f298f41 100644 --- a/cluster/kubernetes/resource/resource.go +++ b/cluster/kubernetes/resource/resource.go @@ -79,7 +79,7 @@ func (o *baseObject) debyte() { o.bytes = nil } -func PolicyFromAnnotations(annotations map[string]string) policy.Set { +func PoliciesFromAnnotations(annotations map[string]string) policy.Set { set := policy.Set{} for k, v := range annotations { if strings.HasPrefix(k, PolicyPrefix) { @@ -95,7 +95,7 @@ func PolicyFromAnnotations(annotations map[string]string) policy.Set { } func (o baseObject) Policies() policy.Set { - return PolicyFromAnnotations(o.Meta.Annotations) + return PoliciesFromAnnotations(o.Meta.Annotations) } func (o baseObject) Source() string { diff --git a/cluster/kubernetes/sync.go b/cluster/kubernetes/sync.go index 73f55feb5..e372c6a35 100644 --- a/cluster/kubernetes/sync.go +++ b/cluster/kubernetes/sync.go @@ -3,6 +3,8 @@ package kubernetes import ( "bytes" "crypto/sha1" + "crypto/sha256" + "encoding/base64" "encoding/hex" "fmt" "io" @@ -28,7 +30,12 @@ import ( ) const ( - syncSetLabel = kresource.PolicyPrefix + "sync-set" + // We use mark-and-sweep garbage collection to delete cluster objects. + // Marking is done by adding a label when creating and updating the objects. 
+ // Sweeping is done by comparing Marked cluster objects with the manifests in Git. + gcMarkLabel = kresource.PolicyPrefix + "sync-gc-mark" + // We want to prevent garbage-collecting cluster objects which haven't been updated. + // We annotate objects with the checksum of their Git manifest to verify this. checksumAnnotation = kresource.PolicyPrefix + "sync-checksum" ) @@ -37,7 +44,7 @@ const ( // necessarily indicate complete failure; some resources may succeed // in being synced, and some may fail (for example, they may be // malformed). -func (c *Cluster) Sync(spec cluster.SyncSet) error { +func (c *Cluster) Sync(syncSet cluster.SyncSet) error { logger := log.With(c.logger, "method", "Sync") // Keep track of the checksum of each resource, so we can compare @@ -53,7 +60,7 @@ func (c *Cluster) Sync(spec cluster.SyncSet) error { cs := makeChangeSet() var errs cluster.SyncError - for _, res := range spec.Resources { + for _, res := range syncSet.Resources { id := res.ResourceID().String() // make a record of the checksum, whether we stage it to // be applied or not, so that we don't delete it later. @@ -67,11 +74,11 @@ func (c *Cluster) Sync(spec cluster.SyncSet) error { // It's possible to give a cluster resource the "ignore" // annotation directly -- e.g., with `kubectl annotate` -- so // we need to examine the cluster resource here too. 
- if cres, ok := clusterResources[id]; ok && cres.Policy().Has(policy.Ignore) { + if cres, ok := clusterResources[id]; ok && cres.Policies().Has(policy.Ignore) { logger.Log("info", "not applying resource; ignore annotation in cluster resource", "resource", cres.ResourceID()) continue } - resBytes, err := applyMetadata(res, spec.Name, checkHex) + resBytes, err := applyMetadata(res, syncSet.Name, checkHex) if err == nil { cs.stage("apply", res.ResourceID(), res.Source(), resBytes) } else { @@ -89,7 +96,7 @@ func (c *Cluster) Sync(spec cluster.SyncSet) error { c.muSyncErrors.RUnlock() if c.GC { - deleteErrs, gcFailure := c.collectGarbage(spec, checksums, logger) + deleteErrs, gcFailure := c.collectGarbage(syncSet, checksums, logger) if gcFailure != nil { return gcFailure } @@ -108,13 +115,13 @@ func (c *Cluster) Sync(spec cluster.SyncSet) error { } func (c *Cluster) collectGarbage( - spec cluster.SyncSet, + syncSet cluster.SyncSet, checksums map[string]string, logger log.Logger) (cluster.SyncError, error) { orphanedResources := makeChangeSet() - clusterResources, err := c.getResourcesInSyncSet(spec.Name) + clusterResources, err := c.getGCMarkedResourcesInSyncSet(syncSet.Name) if err != nil { return nil, errors.Wrap(err, "collating resources in cluster for calculating garbage collection") } @@ -168,16 +175,18 @@ metadata: `, r.obj.GetAPIVersion(), r.obj.GetKind(), r.obj.GetNamespace(), r.obj.GetName())) } -func (r *kuberesource) Policy() policy.Set { - return kresource.PolicyFromAnnotations(r.obj.GetAnnotations()) +func (r *kuberesource) Policies() policy.Set { + return kresource.PoliciesFromAnnotations(r.obj.GetAnnotations()) } -// GetChecksum returns the checksum recorded on the resource from -// Kubernetes, or an empty string if it's not present. 
func (r *kuberesource) GetChecksum() string { return r.obj.GetAnnotations()[checksumAnnotation] } +func (r *kuberesource) GetGCMark() string { + return r.obj.GetLabels()[gcMarkLabel] +} + func (c *Cluster) getResourcesBySelector(selector string) (map[string]*kuberesource, error) { listOptions := meta_v1.ListOptions{} if selector != "" { @@ -238,13 +247,21 @@ func (c *Cluster) getResourcesBySelector(selector string) (map[string]*kuberesou return result, nil } -// exportResourcesInStack collates all the resources that belong to a -// stack, i.e., were applied by flux. -func (c *Cluster) getResourcesInSyncSet(name string) (map[string]*kuberesource, error) { - return c.getResourcesBySelector(fmt.Sprintf("%s=%s", syncSetLabel, name)) // means "has label <>" +func (c *Cluster) getGCMarkedResourcesInSyncSet(syncSetName string) (map[string]*kuberesource, error) { + allGCMarkedResources, err := c.getResourcesBySelector(gcMarkLabel) // means "gcMarkLabel exists" + if err != nil { + return nil, err + } + syncSetGCMarkedResources := map[string]*kuberesource{} + for resID, kres := range allGCMarkedResources { + if kres.GetGCMark() == makeGCMark(syncSetName, resID) { + syncSetGCMarkedResources[resID] = kres + } + } + return syncSetGCMarkedResources, nil } -func applyMetadata(res resource.Resource, set, checksum string) ([]byte, error) { +func applyMetadata(res resource.Resource, syncSetName, checksum string) ([]byte, error) { definition := map[interface{}]interface{}{} if err := yaml.Unmarshal(res.Bytes(), &definition); err != nil { return nil, errors.Wrap(err, fmt.Sprintf("failed to parse yaml from %s", res.Source())) @@ -252,9 +269,9 @@ func applyMetadata(res resource.Resource, set, checksum string) ([]byte, error) mixin := map[string]interface{}{} - if set != "" { + if syncSetName != "" { mixinLabels := map[string]string{} - mixinLabels[syncSetLabel] = set + mixinLabels[gcMarkLabel] = makeGCMark(syncSetName, res.ResourceID().String()) mixin["labels"] = mixinLabels } @@ -275,6 
+292,15 @@ func applyMetadata(res resource.Resource, set, checksum string) ([]byte, error) return bytes, nil } +func makeGCMark(syncSetName, resourceID string) string { + hasher := sha256.New() + hasher.Write([]byte(syncSetName)) + // To prevent deleting objects with copied labels + // an object-specific mark is created (by including its identifier). + hasher.Write([]byte(resourceID)) + return "sha256:" + base64.RawURLEncoding.EncodeToString(hasher.Sum(nil)) +} + // --- internal types for keeping track of syncing type applyObject struct { diff --git a/cluster/kubernetes/sync_test.go b/cluster/kubernetes/sync_test.go index d1de2d7a2..b595b2c2d 100644 --- a/cluster/kubernetes/sync_test.go +++ b/cluster/kubernetes/sync_test.go @@ -291,7 +291,7 @@ metadata: } // Now check that the resources were created - actual, err := kube.getResourcesInSyncSet("testset") + actual, err := kube.getGCMarkedResourcesInSyncSet("testset") if err != nil { t.Fatal(err) } diff --git a/daemon/loop.go b/daemon/loop.go index 10ea0aff4..a60e44e3e 100644 --- a/daemon/loop.go +++ b/daemon/loop.go @@ -160,7 +160,7 @@ func (d *Daemon) doSync(logger log.Logger, lastKnownSyncTagRev *string, warnedAb ).Observe(time.Since(started).Seconds()) }() - syncSetName := makeSyncLabel(d.Repo.Origin(), d.GitConfig) + syncSetName := makeGitConfigHash(d.Repo.Origin(), d.GitConfig) // We don't care how long this takes overall, only about not // getting bogged down in certain operations, so use an @@ -455,7 +455,7 @@ func isUnknownRevision(err error) bool { strings.Contains(err.Error(), "bad revision")) } -func makeSyncLabel(remote git.Remote, conf git.Config) string { +func makeGitConfigHash(remote git.Remote, conf git.Config) string { urlbit := remote.SafeURL() pathshash := sha256.New() pathshash.Write([]byte(urlbit)) @@ -463,7 +463,5 @@ func makeSyncLabel(remote git.Remote, conf git.Config) string { for _, path := range conf.Paths { pathshash.Write([]byte(path)) } - // the prefix is in part to make sure it's a 
valid (Kubernetes) - // label value -- a modest abstraction leak - return "git-" + base64.RawURLEncoding.EncodeToString(pathshash.Sum(nil)) + return base64.RawURLEncoding.EncodeToString(pathshash.Sum(nil)) } From 03201f7c9f2706925f7b236b83f4bdc0c3339d97 Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Thu, 7 Mar 2019 14:38:34 +0000 Subject: [PATCH 2/2] Test that copying a GCmark label doesn't result in a deletion --- cluster/kubernetes/sync_test.go | 49 ++++++++++++++++++++++++++++++++- 1 file changed, 48 insertions(+), 1 deletion(-) diff --git a/cluster/kubernetes/sync_test.go b/cluster/kubernetes/sync_test.go index b595b2c2d..501310260 100644 --- a/cluster/kubernetes/sync_test.go +++ b/cluster/kubernetes/sync_test.go @@ -290,7 +290,7 @@ metadata: panic(err) } - // Now check that the resources were created + // Now check what resources remain in the sync set actual, err := kube.getGCMarkedResourcesInSyncSet("testset") if err != nil { t.Fatal(err) @@ -370,6 +370,53 @@ metadata: test(t, kube, withoutNS, withNS, false) }) + t.Run("sync won't delete resources whose garbage collection mark was copied to", func(t *testing.T) { + kube, _ := setup(t) + kube.GC = true + + depName := "dep" + depNS := "foobar" + dep := fmt.Sprintf(`--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: %s + namespace: %s +`, depName, depNS) + + // Add dep to the cluster through syncing + test(t, kube, dep, dep, false) + + // Add a copy of dep (including the GCmark label) with different name directly to the cluster + gvr := schema.GroupVersionResource{ + Group: "apps", + Version: "v1", + Resource: "deployments", + } + client := kube.client.dynamicClient.Resource(gvr).Namespace(depNS) + depActual, err := client.Get(depName, metav1.GetOptions{}) + assert.NoError(t, err) + depCopy := depActual.DeepCopy() + depCopyName := depName + "copy" + depCopy.SetName(depCopyName) + depCopyActual, err := client.Create(depCopy) + assert.NoError(t, err) + + // Check that both dep and its copy have 
the same GCmark label + assert.Equal(t, depActual.GetName()+"copy", depCopyActual.GetName()) + assert.NotEmpty(t, depActual.GetLabels()[gcMarkLabel]) + assert.Equal(t, depActual.GetLabels()[gcMarkLabel], depCopyActual.GetLabels()[gcMarkLabel]) + + // Remove dep from the cluster through syncing + test(t, kube, "", "", false) + + // Check that dep is removed from the cluster but its copy isn't, due to having a different name + _, err = client.Get(depName, metav1.GetOptions{}) + assert.Error(t, err) + _, err = client.Get(depCopyName, metav1.GetOptions{}) + assert.NoError(t, err) + }) + t.Run("sync won't delete if apply failed", func(t *testing.T) { kube, _ := setup(t) kube.GC = true