Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Commit

Permalink
Make garbage-collection labelling resource-specific
Browse files Browse the repository at this point in the history
This prevents unintended deletions from happening if/when cluster object labels
are copied around.

I also changed some names and tweaked comments a bit.
  • Loading branch information
Alfonso Acosta committed Mar 6, 2019
1 parent c2ff9e7 commit a2ca569
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 27 deletions.
4 changes: 2 additions & 2 deletions cluster/kubernetes/resource/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (o *baseObject) debyte() {
o.bytes = nil
}

func PolicyFromAnnotations(annotations map[string]string) policy.Set {
func PoliciesFromAnnotations(annotations map[string]string) policy.Set {
set := policy.Set{}
for k, v := range annotations {
if strings.HasPrefix(k, PolicyPrefix) {
Expand All @@ -95,7 +95,7 @@ func PolicyFromAnnotations(annotations map[string]string) policy.Set {
}

func (o baseObject) Policies() policy.Set {
return PolicyFromAnnotations(o.Meta.Annotations)
return PoliciesFromAnnotations(o.Meta.Annotations)
}

func (o baseObject) Source() string {
Expand Down
64 changes: 45 additions & 19 deletions cluster/kubernetes/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package kubernetes
import (
"bytes"
"crypto/sha1"
"crypto/sha256"
"encoding/base64"
"encoding/hex"
"fmt"
"io"
Expand All @@ -28,7 +30,12 @@ import (
)

const (
syncSetLabel = kresource.PolicyPrefix + "sync-set"
// We use mark-and-sweep garbage collection to delete cluster objects.
// Marking is done by adding a label when creating and updating the objects.
// Sweeping is done by comparing marked cluster objects with the manifests in Git.
gcMarkLabel = kresource.PolicyPrefix + "sync-gc-mark"
// We want to prevent garbage-collecting cluster objects which haven't been updated.
// We annotate objects with the checksum of their Git manifest to verify this.
checksumAnnotation = kresource.PolicyPrefix + "sync-checksum"
)

Expand All @@ -37,7 +44,7 @@ const (
// necessarily indicate complete failure; some resources may succeed
// in being synced, and some may fail (for example, they may be
// malformed).
func (c *Cluster) Sync(spec cluster.SyncSet) error {
func (c *Cluster) Sync(syncSet cluster.SyncSet) error {
logger := log.With(c.logger, "method", "Sync")

// Keep track of the checksum of each resource, so we can compare
Expand All @@ -53,7 +60,7 @@ func (c *Cluster) Sync(spec cluster.SyncSet) error {

cs := makeChangeSet()
var errs cluster.SyncError
for _, res := range spec.Resources {
for _, res := range syncSet.Resources {
id := res.ResourceID().String()
// make a record of the checksum, whether we stage it to
// be applied or not, so that we don't delete it later.
Expand All @@ -67,11 +74,11 @@ func (c *Cluster) Sync(spec cluster.SyncSet) error {
// It's possible to give a cluster resource the "ignore"
// annotation directly -- e.g., with `kubectl annotate` -- so
// we need to examine the cluster resource here too.
if cres, ok := clusterResources[id]; ok && cres.Policy().Has(policy.Ignore) {
if cres, ok := clusterResources[id]; ok && cres.Policies().Has(policy.Ignore) {
logger.Log("info", "not applying resource; ignore annotation in cluster resource", "resource", cres.ResourceID())
continue
}
resBytes, err := applyMetadata(res, spec.Name, checkHex)
resBytes, err := applyMetadata(res, syncSet.Name, checkHex)
if err == nil {
cs.stage("apply", res.ResourceID(), res.Source(), resBytes)
} else {
Expand All @@ -89,7 +96,7 @@ func (c *Cluster) Sync(spec cluster.SyncSet) error {
c.muSyncErrors.RUnlock()

if c.GC {
deleteErrs, gcFailure := c.collectGarbage(spec, checksums, logger)
deleteErrs, gcFailure := c.collectGarbage(syncSet, checksums, logger)
if gcFailure != nil {
return gcFailure
}
Expand All @@ -108,13 +115,13 @@ func (c *Cluster) Sync(spec cluster.SyncSet) error {
}

func (c *Cluster) collectGarbage(
spec cluster.SyncSet,
syncSet cluster.SyncSet,
checksums map[string]string,
logger log.Logger) (cluster.SyncError, error) {

orphanedResources := makeChangeSet()

clusterResources, err := c.getResourcesInSyncSet(spec.Name)
clusterResources, err := c.getSyncSetGCMarkedResources(syncSet.Name)
if err != nil {
return nil, errors.Wrap(err, "collating resources in cluster for calculating garbage collection")
}
Expand Down Expand Up @@ -168,16 +175,18 @@ metadata:
`, r.obj.GetAPIVersion(), r.obj.GetKind(), r.obj.GetNamespace(), r.obj.GetName()))
}

func (r *kuberesource) Policy() policy.Set {
return kresource.PolicyFromAnnotations(r.obj.GetAnnotations())
func (r *kuberesource) Policies() policy.Set {
return kresource.PoliciesFromAnnotations(r.obj.GetAnnotations())
}

// GetChecksum looks up the checksum annotation on the resource as
// read from Kubernetes. The result is an empty string when the
// annotation is absent.
func (r *kuberesource) GetChecksum() string {
	annotations := r.obj.GetAnnotations()
	return annotations[checksumAnnotation]
}

// GetGCMark looks up the garbage-collection mark label on the
// resource as read from Kubernetes. The result is an empty string
// when the label is absent.
func (r *kuberesource) GetGCMark() string {
	labels := r.obj.GetLabels()
	return labels[gcMarkLabel]
}

func (c *Cluster) getResourcesBySelector(selector string) (map[string]*kuberesource, error) {
listOptions := meta_v1.ListOptions{}
if selector != "" {
Expand Down Expand Up @@ -238,23 +247,31 @@ func (c *Cluster) getResourcesBySelector(selector string) (map[string]*kuberesou
return result, nil
}

// exportResourcesInStack collates all the resources that belong to a
// stack, i.e., were applied by flux.
func (c *Cluster) getResourcesInSyncSet(name string) (map[string]*kuberesource, error) {
return c.getResourcesBySelector(fmt.Sprintf("%s=%s", syncSetLabel, name)) // means "has label <<stackLabel>>"
// getSyncSetGCMarkedResources collates the cluster resources that carry
// the garbage-collection mark of the named sync set. It first lists
// every resource that has the gcMarkLabel label at all, then keeps only
// those whose label value equals the mark computed from syncSetName and
// the resource's own ID.
func (c *Cluster) getSyncSetGCMarkedResources(syncSetName string) (map[string]*kuberesource, error) {
	// A bare label key used as a selector means "gcMarkLabel exists".
	marked, err := c.getResourcesBySelector(gcMarkLabel)
	if err != nil {
		return nil, err
	}

	result := map[string]*kuberesource{}
	for id, res := range marked {
		// The mark is resource-specific, so a label copied from another
		// object (or written for another sync set) will not match here.
		if res.GetGCMark() != makeGCMark(syncSetName, id) {
			continue
		}
		result[id] = res
	}
	return result, nil
}

func applyMetadata(res resource.Resource, set, checksum string) ([]byte, error) {
func applyMetadata(res resource.Resource, syncSetName, checksum string) ([]byte, error) {
definition := map[interface{}]interface{}{}
if err := yaml.Unmarshal(res.Bytes(), &definition); err != nil {
return nil, errors.Wrap(err, fmt.Sprintf("failed to parse yaml from %s", res.Source()))
}

mixin := map[string]interface{}{}

if set != "" {
if syncSetName != "" {
mixinLabels := map[string]string{}
mixinLabels[syncSetLabel] = set
mixinLabels[gcMarkLabel] = makeGCMark(syncSetName, res.ResourceID().String())
mixin["labels"] = mixinLabels
}

Expand All @@ -275,6 +292,15 @@ func applyMetadata(res resource.Resource, set, checksum string) ([]byte, error)
return bytes, nil
}

// makeGCMark computes the garbage-collection mark for a resource: a
// SHA-256 digest over the sync set name followed by the resource
// identifier, rendered as unpadded URL-safe base64 behind a fixed
// "sha256." prefix.
//
// The result is used as a Kubernetes label value, so it has to obey the
// label-value syntax: at most 63 characters, made up of alphanumerics,
// '-', '_' and '.', and starting and ending with an alphanumeric. The
// prefix therefore uses '.' rather than ':' (a colon is not permitted in
// a label value). The encoded 32-byte digest is 43 characters long, so
// the whole mark is 50 characters.
func makeGCMark(syncSetName, resourceID string) string {
	hasher := sha256.New()
	hasher.Write([]byte(syncSetName))
	// To prevent deleting objects with copied labels
	// an object-specific mark is created (by including its identifier).
	hasher.Write([]byte(resourceID))
	// The prefix is chosen to keep the result a valid (Kubernetes) label value.
	return "sha256." + base64.RawURLEncoding.EncodeToString(hasher.Sum(nil))
}

// --- internal types for keeping track of syncing

type applyObject struct {
Expand Down
2 changes: 1 addition & 1 deletion cluster/kubernetes/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ metadata:
}

// Now check that the resources were created
actual, err := kube.getResourcesInSyncSet("testset")
actual, err := kube.getSyncSetGCMarkedResources("testset")
if err != nil {
t.Fatal(err)
}
Expand Down
8 changes: 3 additions & 5 deletions daemon/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (d *Daemon) doSync(logger log.Logger, lastKnownSyncTagRev *string, warnedAb
).Observe(time.Since(started).Seconds())
}()

syncSetName := makeSyncLabel(d.Repo.Origin(), d.GitConfig)
syncSetName := makeGitConfigHash(d.Repo.Origin(), d.GitConfig)

// We don't care how long this takes overall, only about not
// getting bogged down in certain operations, so use an
Expand Down Expand Up @@ -455,15 +455,13 @@ func isUnknownRevision(err error) bool {
strings.Contains(err.Error(), "bad revision"))
}

func makeSyncLabel(remote git.Remote, conf git.Config) string {
func makeGitConfigHash(remote git.Remote, conf git.Config) string {
urlbit := remote.SafeURL()
pathshash := sha256.New()
pathshash.Write([]byte(urlbit))
pathshash.Write([]byte(conf.Branch))
for _, path := range conf.Paths {
pathshash.Write([]byte(path))
}
// the prefix is in part to make sure it's a valid (Kubernetes)
// label value -- a modest abstraction leak
return "git-" + base64.RawURLEncoding.EncodeToString(pathshash.Sum(nil))
return base64.RawURLEncoding.EncodeToString(pathshash.Sum(nil))
}

0 comments on commit a2ca569

Please sign in to comment.