Skip to content

Commit

Permalink
Merge pull request #974 from fluxcd/progressive-status
Browse files Browse the repository at this point in the history
Introduce Progressive status
  • Loading branch information
darkowlzz authored Jan 10, 2023
2 parents c8feb3a + 197a03b commit 802193c
Show file tree
Hide file tree
Showing 23 changed files with 1,298 additions and 347 deletions.
71 changes: 52 additions & 19 deletions controllers/bucket_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
helper "github.com/fluxcd/pkg/runtime/controller"
"github.com/fluxcd/pkg/runtime/patch"
"github.com/fluxcd/pkg/runtime/predicates"
rreconcile "github.com/fluxcd/pkg/runtime/reconcile"

eventv1 "github.com/fluxcd/pkg/apis/event/v1beta1"
"github.com/fluxcd/pkg/sourceignore"
Expand Down Expand Up @@ -119,6 +120,8 @@ type BucketReconciler struct {

Storage *Storage
ControllerName string

patchOptions []patch.Option
}

type BucketReconcilerOptions struct {
Expand Down Expand Up @@ -151,7 +154,7 @@ type BucketProvider interface {
// bucketReconcileFunc is the function type for all the v1beta2.Bucket
// (sub)reconcile functions. The type implementations are grouped and
// executed serially to perform the complete reconcile of the object.
type bucketReconcileFunc func(ctx context.Context, obj *sourcev1.Bucket, index *etagIndex, dir string) (sreconcile.Result, error)
type bucketReconcileFunc func(ctx context.Context, sp *patch.SerialPatcher, obj *sourcev1.Bucket, index *etagIndex, dir string) (sreconcile.Result, error)

// etagIndex is an index of storage object keys and their Etag values.
type etagIndex struct {
Expand Down Expand Up @@ -234,6 +237,8 @@ func (r *BucketReconciler) SetupWithManager(mgr ctrl.Manager) error {
}

func (r *BucketReconciler) SetupWithManagerAndOptions(mgr ctrl.Manager, opts BucketReconcilerOptions) error {
r.patchOptions = getPatchOptions(bucketReadyCondition.Owned, r.ControllerName)

return ctrl.NewControllerManagedBy(mgr).
For(&sourcev1.Bucket{}).
WithEventFilter(predicate.Or(predicate.GenerationChangedPredicate{}, predicates.ReconcileRequestedPredicate{})).
Expand All @@ -259,18 +264,15 @@ func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
r.RecordSuspend(ctx, obj, obj.Spec.Suspend)

// Initialize the patch helper with the current version of the object.
patchHelper, err := patch.NewHelper(obj, r.Client)
if err != nil {
return ctrl.Result{}, err
}
serialPatcher := patch.NewSerialPatcher(obj, r.Client)

// recResult stores the abstracted reconcile result.
var recResult sreconcile.Result

// Always attempt to patch the object and status after each reconciliation
// NOTE: The final runtime result and error are set in this block.
defer func() {
summarizeHelper := summarize.NewHelper(r.EventRecorder, patchHelper)
summarizeHelper := summarize.NewHelper(r.EventRecorder, serialPatcher)
summarizeOpts := []summarize.Option{
summarize.WithConditions(bucketReadyCondition),
summarize.WithReconcileResult(recResult),
Expand Down Expand Up @@ -316,19 +318,35 @@ func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
r.reconcileSource,
r.reconcileArtifact,
}
recResult, retErr = r.reconcile(ctx, obj, reconcilers)
recResult, retErr = r.reconcile(ctx, serialPatcher, obj, reconcilers)
return
}

// reconcile iterates through the bucketReconcileFunc tasks for the
// object. It returns early on the first call that returns
// reconcile.ResultRequeue, or produces an error.
func (r *BucketReconciler) reconcile(ctx context.Context, obj *sourcev1.Bucket, reconcilers []bucketReconcileFunc) (sreconcile.Result, error) {
func (r *BucketReconciler) reconcile(ctx context.Context, sp *patch.SerialPatcher, obj *sourcev1.Bucket, reconcilers []bucketReconcileFunc) (sreconcile.Result, error) {
oldObj := obj.DeepCopy()

// Mark as reconciling if generation differs.
if obj.Generation != obj.Status.ObservedGeneration {
conditions.MarkReconciling(obj, "NewGeneration", "reconciling new object generation (%d)", obj.Generation)
rreconcile.ProgressiveStatus(false, obj, meta.ProgressingReason, "reconciliation in progress")

var recAtVal string
if v, ok := meta.ReconcileAnnotationValue(obj.GetAnnotations()); ok {
recAtVal = v
}

// Persist reconciling if generation differs or reconciliation is requested.
switch {
case obj.Generation != obj.Status.ObservedGeneration:
rreconcile.ProgressiveStatus(false, obj, meta.ProgressingReason,
"processing object: new generation %d -> %d", obj.Status.ObservedGeneration, obj.Generation)
if err := sp.Patch(ctx, obj, r.patchOptions...); err != nil {
return sreconcile.ResultEmpty, err
}
case recAtVal != obj.Status.GetLastHandledReconcileRequest():
if err := sp.Patch(ctx, obj, r.patchOptions...); err != nil {
return sreconcile.ResultEmpty, err
}
}

// Create temp working dir
Expand Down Expand Up @@ -356,7 +374,7 @@ func (r *BucketReconciler) reconcile(ctx context.Context, obj *sourcev1.Bucket,
)

for _, rec := range reconcilers {
recResult, err := rec(ctx, obj, index, tmpDir)
recResult, err := rec(ctx, sp, obj, index, tmpDir)
// Exit immediately on ResultRequeue.
if recResult == sreconcile.ResultRequeue {
return sreconcile.ResultRequeue, nil
Expand Down Expand Up @@ -421,22 +439,31 @@ func (r *BucketReconciler) notify(ctx context.Context, oldObj, newObj *sourcev1.
// condition is added.
// The hostname of any URL in the Status of the object are updated, to ensure
// they match the Storage server hostname of current runtime.
func (r *BucketReconciler) reconcileStorage(ctx context.Context, obj *sourcev1.Bucket, _ *etagIndex, _ string) (sreconcile.Result, error) {
func (r *BucketReconciler) reconcileStorage(ctx context.Context, sp *patch.SerialPatcher, obj *sourcev1.Bucket, _ *etagIndex, _ string) (sreconcile.Result, error) {
// Garbage collect previous advertised artifact(s) from storage
_ = r.garbageCollect(ctx, obj)

// Determine if the advertised artifact is still in storage
var artifactMissing bool
if artifact := obj.GetArtifact(); artifact != nil && !r.Storage.ArtifactExist(*artifact) {
obj.Status.Artifact = nil
obj.Status.URL = ""
artifactMissing = true
// Remove the condition as the artifact doesn't exist.
conditions.Delete(obj, sourcev1.ArtifactInStorageCondition)
}

// Record that we do not have an artifact
if obj.GetArtifact() == nil {
conditions.MarkReconciling(obj, "NoArtifact", "no artifact for resource in storage")
msg := "building artifact"
if artifactMissing {
msg += ": disappeared from storage"
}
rreconcile.ProgressiveStatus(true, obj, meta.ProgressingReason, msg)
conditions.Delete(obj, sourcev1.ArtifactInStorageCondition)
if err := sp.Patch(ctx, obj, r.patchOptions...); err != nil {
return sreconcile.ResultEmpty, err
}
return sreconcile.ResultSuccess, nil
}

Expand All @@ -453,7 +480,7 @@ func (r *BucketReconciler) reconcileStorage(ctx context.Context, obj *sourcev1.B
// When a SecretRef is defined, it attempts to fetch the Secret before calling
// the provider. If this fails, it records v1beta2.FetchFailedCondition=True on
// the object and returns early.
func (r *BucketReconciler) reconcileSource(ctx context.Context, obj *sourcev1.Bucket, index *etagIndex, dir string) (sreconcile.Result, error) {
func (r *BucketReconciler) reconcileSource(ctx context.Context, sp *patch.SerialPatcher, obj *sourcev1.Bucket, index *etagIndex, dir string) (sreconcile.Result, error) {
secret, err := r.getBucketSecret(ctx, obj)
if err != nil {
e := &serror.Event{Err: err, Reason: sourcev1.AuthenticationFailedReason}
Expand Down Expand Up @@ -528,8 +555,14 @@ func (r *BucketReconciler) reconcileSource(ctx context.Context, obj *sourcev1.Bu

if !obj.GetArtifact().HasRevision(revision) {
message := fmt.Sprintf("new upstream revision '%s'", revision)
conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "NewRevision", message)
conditions.MarkReconciling(obj, "NewRevision", message)
if obj.GetArtifact() != nil {
conditions.MarkTrue(obj, sourcev1.ArtifactOutdatedCondition, "NewRevision", message)
}
rreconcile.ProgressiveStatus(true, obj, meta.ProgressingReason, "building artifact: %s", message)
if err := sp.Patch(ctx, obj, r.patchOptions...); err != nil {
ctrl.LoggerFrom(ctx).Error(err, "failed to patch")
return
}
}
}()

Expand All @@ -554,7 +587,7 @@ func (r *BucketReconciler) reconcileSource(ctx context.Context, obj *sourcev1.Bu
// early.
// On a successful archive, the Artifact in the Status of the object is set,
// and the symlink in the Storage is updated to its path.
func (r *BucketReconciler) reconcileArtifact(ctx context.Context, obj *sourcev1.Bucket, index *etagIndex, dir string) (sreconcile.Result, error) {
func (r *BucketReconciler) reconcileArtifact(ctx context.Context, sp *patch.SerialPatcher, obj *sourcev1.Bucket, index *etagIndex, dir string) (sreconcile.Result, error) {
// Calculate revision
revision, err := index.Revision()
if err != nil {
Expand All @@ -572,7 +605,7 @@ func (r *BucketReconciler) reconcileArtifact(ctx context.Context, obj *sourcev1.
if obj.GetArtifact().HasRevision(artifact.Revision) {
conditions.Delete(obj, sourcev1.ArtifactOutdatedCondition)
conditions.MarkTrue(obj, sourcev1.ArtifactInStorageCondition, meta.SucceededReason,
"stored artifact for revision '%s'", artifact.Revision)
"stored artifact: revision '%s'", artifact.Revision)
}
}()

Expand Down
Loading

0 comments on commit 802193c

Please sign in to comment.