Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Make garbage-collection labelling resource-specific #1798

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cluster/kubernetes/resource/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (o *baseObject) debyte() {
o.bytes = nil
}

func PolicyFromAnnotations(annotations map[string]string) policy.Set {
func PoliciesFromAnnotations(annotations map[string]string) policy.Set {
set := policy.Set{}
for k, v := range annotations {
if strings.HasPrefix(k, PolicyPrefix) {
Expand All @@ -95,7 +95,7 @@ func PolicyFromAnnotations(annotations map[string]string) policy.Set {
}

func (o baseObject) Policies() policy.Set {
return PolicyFromAnnotations(o.Meta.Annotations)
return PoliciesFromAnnotations(o.Meta.Annotations)
}

func (o baseObject) Source() string {
Expand Down
64 changes: 45 additions & 19 deletions cluster/kubernetes/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package kubernetes
import (
"bytes"
"crypto/sha1"
"crypto/sha256"
"encoding/base64"
"encoding/hex"
"fmt"
"io"
Expand All @@ -28,7 +30,12 @@ import (
)

const (
syncSetLabel = kresource.PolicyPrefix + "sync-set"
// We use mark-and-sweep garbage collection to delete cluster objects.
// Marking is done by adding a label when creating and updating the objects.
// Sweeping is done by comparing Marked cluster objects with the manifests in Git.
gcMarkLabel = kresource.PolicyPrefix + "sync-gc-mark"
// We want to prevent garbage-collecting cluster objects which haven't been updated.
// We annotate objects with the checksum of their Git manifest to verify this.
checksumAnnotation = kresource.PolicyPrefix + "sync-checksum"
)

Expand All @@ -37,7 +44,7 @@ const (
// necessarily indicate complete failure; some resources may succeed
// in being synced, and some may fail (for example, they may be
// malformed).
func (c *Cluster) Sync(spec cluster.SyncSet) error {
func (c *Cluster) Sync(syncSet cluster.SyncSet) error {
logger := log.With(c.logger, "method", "Sync")

// Keep track of the checksum of each resource, so we can compare
Expand All @@ -53,7 +60,7 @@ func (c *Cluster) Sync(spec cluster.SyncSet) error {

cs := makeChangeSet()
var errs cluster.SyncError
for _, res := range spec.Resources {
for _, res := range syncSet.Resources {
id := res.ResourceID().String()
// make a record of the checksum, whether we stage it to
// be applied or not, so that we don't delete it later.
Expand All @@ -67,11 +74,11 @@ func (c *Cluster) Sync(spec cluster.SyncSet) error {
// It's possible to give a cluster resource the "ignore"
// annotation directly -- e.g., with `kubectl annotate` -- so
// we need to examine the cluster resource here too.
if cres, ok := clusterResources[id]; ok && cres.Policy().Has(policy.Ignore) {
if cres, ok := clusterResources[id]; ok && cres.Policies().Has(policy.Ignore) {
logger.Log("info", "not applying resource; ignore annotation in cluster resource", "resource", cres.ResourceID())
continue
}
resBytes, err := applyMetadata(res, spec.Name, checkHex)
resBytes, err := applyMetadata(res, syncSet.Name, checkHex)
if err == nil {
cs.stage("apply", res.ResourceID(), res.Source(), resBytes)
} else {
Expand All @@ -89,7 +96,7 @@ func (c *Cluster) Sync(spec cluster.SyncSet) error {
c.muSyncErrors.RUnlock()

if c.GC {
deleteErrs, gcFailure := c.collectGarbage(spec, checksums, logger)
deleteErrs, gcFailure := c.collectGarbage(syncSet, checksums, logger)
if gcFailure != nil {
return gcFailure
}
Expand All @@ -108,13 +115,13 @@ func (c *Cluster) Sync(spec cluster.SyncSet) error {
}

func (c *Cluster) collectGarbage(
spec cluster.SyncSet,
syncSet cluster.SyncSet,
checksums map[string]string,
logger log.Logger) (cluster.SyncError, error) {

orphanedResources := makeChangeSet()

clusterResources, err := c.getResourcesInSyncSet(spec.Name)
clusterResources, err := c.getGCMarkedResourcesInSyncSet(syncSet.Name)
if err != nil {
return nil, errors.Wrap(err, "collating resources in cluster for calculating garbage collection")
}
Expand Down Expand Up @@ -168,16 +175,18 @@ metadata:
`, r.obj.GetAPIVersion(), r.obj.GetKind(), r.obj.GetNamespace(), r.obj.GetName()))
}

func (r *kuberesource) Policy() policy.Set {
return kresource.PolicyFromAnnotations(r.obj.GetAnnotations())
func (r *kuberesource) Policies() policy.Set {
return kresource.PoliciesFromAnnotations(r.obj.GetAnnotations())
}

// GetChecksum returns the checksum recorded on the resource from
// Kubernetes, or an empty string if it's not present.
func (r *kuberesource) GetChecksum() string {
return r.obj.GetAnnotations()[checksumAnnotation]
}

func (r *kuberesource) GetGCMark() string {
return r.obj.GetLabels()[gcMarkLabel]
}

func (c *Cluster) getResourcesBySelector(selector string) (map[string]*kuberesource, error) {
listOptions := meta_v1.ListOptions{}
if selector != "" {
Expand Down Expand Up @@ -238,23 +247,31 @@ func (c *Cluster) getResourcesBySelector(selector string) (map[string]*kuberesou
return result, nil
}

// exportResourcesInStack collates all the resources that belong to a
// stack, i.e., were applied by flux.
func (c *Cluster) getResourcesInSyncSet(name string) (map[string]*kuberesource, error) {
return c.getResourcesBySelector(fmt.Sprintf("%s=%s", syncSetLabel, name)) // means "has label <<stackLabel>>"
func (c *Cluster) getGCMarkedResourcesInSyncSet(syncSetName string) (map[string]*kuberesource, error) {
allGCMarkedResources, err := c.getResourcesBySelector(gcMarkLabel) // means "gcMarkLabel exists"
if err != nil {
return nil, err
}
syncSetGCMarkedResources := map[string]*kuberesource{}
for resID, kres := range allGCMarkedResources {
if kres.GetGCMark() == makeGCMark(syncSetName, resID) {
syncSetGCMarkedResources[resID] = kres
}
}
return syncSetGCMarkedResources, nil
}

func applyMetadata(res resource.Resource, set, checksum string) ([]byte, error) {
func applyMetadata(res resource.Resource, syncSetName, checksum string) ([]byte, error) {
definition := map[interface{}]interface{}{}
if err := yaml.Unmarshal(res.Bytes(), &definition); err != nil {
return nil, errors.Wrap(err, fmt.Sprintf("failed to parse yaml from %s", res.Source()))
}

mixin := map[string]interface{}{}

if set != "" {
if syncSetName != "" {
mixinLabels := map[string]string{}
mixinLabels[syncSetLabel] = set
mixinLabels[gcMarkLabel] = makeGCMark(syncSetName, res.ResourceID().String())
mixin["labels"] = mixinLabels
}

Expand All @@ -275,6 +292,15 @@ func applyMetadata(res resource.Resource, set, checksum string) ([]byte, error)
return bytes, nil
}

func makeGCMark(syncSetName, resourceID string) string {
hasher := sha256.New()
hasher.Write([]byte(syncSetName))
// To prevent deleting objects with copied labels
// an object-specific mark is created (by including its identifier).
hasher.Write([]byte(resourceID))
return "sha256:" + base64.RawURLEncoding.EncodeToString(hasher.Sum(nil))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the colon : is not allowed in the k8s version we have running:

ts=2019-03-07T20:07:59.874086773Z caller=sync.go:482 component=cluster method=Sync cmd="kubectl apply -f -" took=1.260555818s err="running kubectl: The Secret \"my-secret\" is invalid: metadata.labels: Invalid value: \"sha256:XmUQTWgJCpTt_vgSxwB113djqIlbulw_bNcdVL9zNmk\": a valid label must be an empty string or consist of alphanumeric characters, '-', '_' or '.', and must start and end with an alphanumeric character (e.g. 'MyValue',  or 'my_value',  or '12345', regex used for validation is '(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])?')" output=

will create an issue with more information

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rndstr so were you debuggin this for the last hour too? #1805

}

// --- internal types for keeping track of syncing

type applyObject struct {
Expand Down
51 changes: 49 additions & 2 deletions cluster/kubernetes/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,8 @@ metadata:
panic(err)
}

// Now check that the resources were created
actual, err := kube.getResourcesInSyncSet("testset")
// Now check what resources remain in the sync set
actual, err := kube.getGCMarkedResourcesInSyncSet("testset")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -370,6 +370,53 @@ metadata:
test(t, kube, withoutNS, withNS, false)
})

t.Run("sync won't delete resources whose garbage collection mark was copied to", func(t *testing.T) {
kube, _ := setup(t)
kube.GC = true

depName := "dep"
depNS := "foobar"
dep := fmt.Sprintf(`---
apiVersion: apps/v1
kind: Deployment
metadata:
name: %s
namespace: %s
`, depName, depNS)

// Add dep to the cluster through syncing
test(t, kube, dep, dep, false)

// Add a copy of dep (including the GCmark label) with different name directly to the cluster
gvr := schema.GroupVersionResource{
Group: "apps",
Version: "v1",
Resource: "deployments",
}
client := kube.client.dynamicClient.Resource(gvr).Namespace(depNS)
depActual, err := client.Get(depName, metav1.GetOptions{})
assert.NoError(t, err)
depCopy := depActual.DeepCopy()
depCopyName := depName + "copy"
depCopy.SetName(depCopyName)
depCopyActual, err := client.Create(depCopy)
assert.NoError(t, err)

// Check that both dep and its copy have the same GCmark label
assert.Equal(t, depActual.GetName()+"copy", depCopyActual.GetName())
assert.NotEmpty(t, depActual.GetLabels()[gcMarkLabel])
assert.Equal(t, depActual.GetLabels()[gcMarkLabel], depCopyActual.GetLabels()[gcMarkLabel])

// Remove defs1 from the cluster through syncing
test(t, kube, "", "", false)

// Check that defs1 is removed from the cluster but its copy isn't, due to having a different name
_, err = client.Get(depName, metav1.GetOptions{})
assert.Error(t, err)
_, err = client.Get(depCopyName, metav1.GetOptions{})
assert.NoError(t, err)
})

t.Run("sync won't delete if apply failed", func(t *testing.T) {
kube, _ := setup(t)
kube.GC = true
Expand Down
8 changes: 3 additions & 5 deletions daemon/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (d *Daemon) doSync(logger log.Logger, lastKnownSyncTagRev *string, warnedAb
).Observe(time.Since(started).Seconds())
}()

syncSetName := makeSyncLabel(d.Repo.Origin(), d.GitConfig)
syncSetName := makeGitConfigHash(d.Repo.Origin(), d.GitConfig)

// We don't care how long this takes overall, only about not
// getting bogged down in certain operations, so use an
Expand Down Expand Up @@ -455,15 +455,13 @@ func isUnknownRevision(err error) bool {
strings.Contains(err.Error(), "bad revision"))
}

func makeSyncLabel(remote git.Remote, conf git.Config) string {
func makeGitConfigHash(remote git.Remote, conf git.Config) string {
urlbit := remote.SafeURL()
pathshash := sha256.New()
pathshash.Write([]byte(urlbit))
pathshash.Write([]byte(conf.Branch))
for _, path := range conf.Paths {
pathshash.Write([]byte(path))
}
// the prefix is in part to make sure it's a valid (Kubernetes)
// label value -- a modest abstraction leak
return "git-" + base64.RawURLEncoding.EncodeToString(pathshash.Sum(nil))
return base64.RawURLEncoding.EncodeToString(pathshash.Sum(nil))
}