🐛 clustercatalog_controller: hacky, but more correct status updating in reconciler #424
@@ -20,11 +20,14 @@ import (
    "context" // #nosec
    "errors"
    "fmt"
    "slices"
    "sync"
    "time"

    "k8s.io/apimachinery/pkg/api/equality"
    "k8s.io/apimachinery/pkg/api/meta"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/util/sets"
    "k8s.io/apimachinery/pkg/util/wait"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/client"
@@ -47,9 +50,22 @@ const (
// ClusterCatalogReconciler reconciles a Catalog object
type ClusterCatalogReconciler struct {
    client.Client
    Unpacker   source.Unpacker
    Storage    storage.Instance
    Finalizers crfinalizer.Finalizers
    Unpacker source.Unpacker
    Storage  storage.Instance

    finalizers crfinalizer.Finalizers

    // TODO: The below storedCatalogs fields are used for a quick hack that helps
    // us correctly populate a ClusterCatalog's status. The fact that we need
    // these is indicative of a larger problem with the design of one or both
    // of the Unpacker and Storage interfaces. We should fix this.
    storedCatalogsMu sync.RWMutex
    storedCatalogs   map[string]storedCatalogData
}

type storedCatalogData struct {
    observedGeneration int64
    unpackResult       source.Result
}

//+kubebuilder:rbac:groups=olm.operatorframework.io,resources=clustercatalogs,verbs=get;list;watch;create;update;patch;delete
@@ -75,6 +91,14 @@ func (r *ClusterCatalogReconciler) Reconcile(ctx context.Context, req ctrl.Reque
    reconciledCatsrc := existingCatsrc.DeepCopy()
    res, reconcileErr := r.reconcile(ctx, reconciledCatsrc)

    // If we encounter an error, we should delete the stored catalog metadata
    // which represents the state of a successfully unpacked catalog. Deleting
    // this state ensures that we will continue retrying the unpacking process
    // until it succeeds.
    if reconcileErr != nil {
        r.deleteStoredCatalog(reconciledCatsrc.Name)
    }

    // Do checks before any Update()s, as Update() may modify the resource structure!
    updateStatus := !equality.Semantic.DeepEqual(existingCatsrc.Status, reconciledCatsrc.Status)
    updateFinalizers := !equality.Semantic.DeepEqual(existingCatsrc.Finalizers, reconciledCatsrc.Finalizers)
@@ -109,6 +133,14 @@ func (r *ClusterCatalogReconciler) Reconcile(ctx context.Context, req ctrl.Reque

// SetupWithManager sets up the controller with the Manager.
func (r *ClusterCatalogReconciler) SetupWithManager(mgr ctrl.Manager) error {
    r.storedCatalogsMu.Lock()
    defer r.storedCatalogsMu.Unlock()
    r.storedCatalogs = make(map[string]storedCatalogData)

    if err := r.setupFinalizers(); err != nil {
        return fmt.Errorf("failed to setup finalizers: %v", err)
    }

    return ctrl.NewControllerManagedBy(mgr).
        For(&v1alpha1.ClusterCatalog{}).
        Complete(r)
@@ -123,7 +155,9 @@ func (r *ClusterCatalogReconciler) SetupWithManager(mgr ctrl.Manager) error {
// linting from the linter that was fussing about this.
// nolint:unparam
func (r *ClusterCatalogReconciler) reconcile(ctx context.Context, catalog *v1alpha1.ClusterCatalog) (ctrl.Result, error) {
    finalizeResult, err := r.Finalizers.Finalize(ctx, catalog)
    l := log.FromContext(ctx)

    finalizeResult, err := r.finalizers.Finalize(ctx, catalog)
    if err != nil {
        return ctrl.Result{}, err
    }
@@ -133,55 +167,125 @@ func (r *ClusterCatalogReconciler) reconcile(ctx context.Context, catalog *v1alp
        return ctrl.Result{}, nil
    }

    if !r.needsUnpacking(catalog) {
        return ctrl.Result{}, nil
    // TODO: The below algorithm to get the current state based on an in-memory
    // storedCatalogs map is a hack that helps us keep the ClusterCatalog's
    // status up-to-date. The fact that we need this setup is indicative of
    // a larger problem with the design of one or both of the Unpacker and
    // Storage interfaces and/or their interactions. We should fix this.
    expectedStatus, storedCatalog, hasStoredCatalog := r.getCurrentState(catalog)

    // If any of the following are true, we need to unpack the catalog:
    // - we don't have a stored catalog in the map
    // - we have a stored catalog, but the content doesn't exist on disk
    // - we have a stored catalog, the content exists, but the expected status differs from the actual status
    // - we have a stored catalog, the content exists, the status looks correct, but the catalog generation is different from the observed generation in the stored catalog
    // - we have a stored catalog, the content exists, the status looks correct and reflects the catalog generation, but it is time to poll again
    needsUnpack := false
    switch {
    case !hasStoredCatalog:
        l.Info("unpack required: no cached catalog metadata found for this catalog")
        needsUnpack = true
    case !r.Storage.ContentExists(catalog.Name):
        l.Info("unpack required: no stored content found for this catalog")
        needsUnpack = true
    case !equality.Semantic.DeepEqual(catalog.Status, *expectedStatus):
        l.Info("unpack required: current ClusterCatalog status differs from expected status")
        needsUnpack = true
    case catalog.Generation != storedCatalog.observedGeneration:
        l.Info("unpack required: catalog generation differs from observed generation")
        needsUnpack = true
    case r.needsPoll(storedCatalog.unpackResult.ResolvedSource.Image.LastSuccessfulPollAttempt.Time, catalog):
        l.Info("unpack required: poll duration has elapsed")
        needsUnpack = true
    }
Review comment: I intentionally added lots of logging here to help us deduce why we might be unpacking more frequently than expected.
    if !needsUnpack {
        // No need to update the status because we've already checked
        // that it is set correctly. Otherwise, we'd be unpacking again.
        return nextPollResult(storedCatalog.unpackResult.ResolvedSource.Image.LastSuccessfulPollAttempt.Time, catalog), nil
    }

    unpackResult, err := r.Unpacker.Unpack(ctx, catalog)
    if err != nil {
        unpackErr := fmt.Errorf("source catalog content: %w", err)
        updateStatusProgressing(catalog, unpackErr)
        updateStatusProgressing(&catalog.Status, catalog.GetGeneration(), unpackErr)
        return ctrl.Result{}, unpackErr
    }

    switch unpackResult.State {
    case source.StateUnpacked:
        contentURL := ""
        // TODO: We should check to see if the unpacked result has the same content
        // as the already unpacked content. If it does, we should skip the rest
        // of the unpacking steps.
        err := r.Storage.Store(ctx, catalog.Name, unpackResult.FS)
        if err != nil {
            storageErr := fmt.Errorf("error storing fbc: %v", err)
            updateStatusProgressing(catalog, storageErr)
            updateStatusProgressing(&catalog.Status, catalog.GetGeneration(), storageErr)
            return ctrl.Result{}, storageErr
Review comment on lines +223 to 224: considering this isn't a terminal error, should we requeue here?

Reply: By returning the …
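On the requeue question: with controller-runtime, returning a non-nil error from Reconcile already requeues the request through the controller's rate limiter, which applies per-item exponential backoff by default, so no explicit Requeue is needed for retryable failures. A minimal sketch of that behavior, assuming a standard controller-runtime setup (the store helper below is a hypothetical stand-in for r.Storage.Store):

```go
package example

import (
	"context"
	"fmt"

	ctrl "sigs.k8s.io/controller-runtime"
)

type demoReconciler struct{}

// Reconcile illustrates the retry behavior: a non-nil error return makes the
// workqueue requeue this request with exponential backoff, so no explicit
// Requeue or RequeueAfter is needed for transient storage failures.
func (r *demoReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	if err := store(ctx, req.Name); err != nil {
		return ctrl.Result{}, fmt.Errorf("error storing fbc: %w", err)
	}
	// A successful reconcile resets the backoff for this request.
	return ctrl.Result{}, nil
}

// store is a hypothetical stand-in for the real Storage.Store call.
func store(ctx context.Context, name string) error { return nil }
```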
        }
        contentURL = r.Storage.ContentURL(catalog.Name)

        updateStatusProgressing(catalog, nil)
        updateStatusServing(&catalog.Status, unpackResult, contentURL, catalog.GetGeneration())

        var requeueAfter time.Duration
        switch catalog.Spec.Source.Type {
        case v1alpha1.SourceTypeImage:
            if catalog.Spec.Source.Image != nil && catalog.Spec.Source.Image.PollInterval != nil {
                requeueAfter = wait.Jitter(catalog.Spec.Source.Image.PollInterval.Duration, requeueJitterMaxFactor)
            }
        }
        contentURL := r.Storage.ContentURL(catalog.Name)

        return ctrl.Result{RequeueAfter: requeueAfter}, nil
        updateStatusProgressing(&catalog.Status, catalog.GetGeneration(), nil)
        updateStatusServing(&catalog.Status, *unpackResult, contentURL, catalog.GetGeneration())
    default:
        panic(fmt.Sprintf("unknown unpack state %q", unpackResult.State))
    }

    r.storedCatalogsMu.Lock()
    r.storedCatalogs[catalog.Name] = storedCatalogData{
        unpackResult:       *unpackResult,
        observedGeneration: catalog.GetGeneration(),
    }
    r.storedCatalogsMu.Unlock()
    return nextPollResult(unpackResult.ResolvedSource.Image.LastSuccessfulPollAttempt.Time, catalog), nil
}

func updateStatusProgressing(catalog *v1alpha1.ClusterCatalog, err error) {
func (r *ClusterCatalogReconciler) getCurrentState(catalog *v1alpha1.ClusterCatalog) (*v1alpha1.ClusterCatalogStatus, storedCatalogData, bool) {
    r.storedCatalogsMu.RLock()
    storedCatalog, hasStoredCatalog := r.storedCatalogs[catalog.Name]
    r.storedCatalogsMu.RUnlock()

    expectedStatus := catalog.Status.DeepCopy()

    // Set expected status based on what we see in the stored catalog
    clearUnknownConditions(expectedStatus)
    if hasStoredCatalog && r.Storage.ContentExists(catalog.Name) {
        updateStatusServing(expectedStatus, storedCatalog.unpackResult, r.Storage.ContentURL(catalog.Name), storedCatalog.observedGeneration)
        updateStatusProgressing(expectedStatus, storedCatalog.observedGeneration, nil)
Review comment: I needed to change the …

Review comment: Is this to avoid waiting till the next poll before reconciling in case of unpack errors? Looks like the equivalence check for the Progressing condition will always fail if we had an error on the last unpack; do we backoff between reconciling errored unpacks?

Reply: Oh, this is a good question. Here's a scenario: … Step 3 is a little odd though. Maybe if we return an error from …
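To make the equivalence check concrete: getCurrentState rebuilds an expected status from the cached unpack result, and the reconciler compares it to the live status with equality.Semantic.DeepEqual, so any drift in the conditions, such as a Progressing condition left over from a failed unpack, makes the comparison fail and marks the catalog for another unpack. A small illustration with made-up condition values:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/equality"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// Conditions as they might look after a failed unpack of generation 2...
	actual := []metav1.Condition{
		{Type: "Progressing", Status: metav1.ConditionTrue, Reason: "Retrying", ObservedGeneration: 2},
	}
	// ...versus the expected conditions rebuilt from the generation-1 stored catalog.
	expected := []metav1.Condition{
		{Type: "Progressing", Status: metav1.ConditionFalse, Reason: "Succeeded", ObservedGeneration: 1},
	}
	// Prints false: the statuses differ, so another unpack would be attempted.
	fmt.Println(equality.Semantic.DeepEqual(actual, expected))
}
```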
    }

    return expectedStatus, storedCatalog, hasStoredCatalog
}

func nextPollResult(lastSuccessfulPoll time.Time, catalog *v1alpha1.ClusterCatalog) ctrl.Result {
    var requeueAfter time.Duration
    switch catalog.Spec.Source.Type {
    case v1alpha1.SourceTypeImage:
        if catalog.Spec.Source.Image != nil && catalog.Spec.Source.Image.PollInterval != nil {
            jitteredDuration := wait.Jitter(catalog.Spec.Source.Image.PollInterval.Duration, requeueJitterMaxFactor)
            requeueAfter = time.Until(lastSuccessfulPoll.Add(jitteredDuration))
        }
    }
    return ctrl.Result{RequeueAfter: requeueAfter}
}

func clearUnknownConditions(status *v1alpha1.ClusterCatalogStatus) {
    knownTypes := sets.New[string](
        v1alpha1.TypeServing,
        v1alpha1.TypeProgressing,
    )
    status.Conditions = slices.DeleteFunc(status.Conditions, func(cond metav1.Condition) bool {
        return !knownTypes.Has(cond.Type)
    })
}

func updateStatusProgressing(status *v1alpha1.ClusterCatalogStatus, generation int64, err error) {
    progressingCond := metav1.Condition{
        Type:               v1alpha1.TypeProgressing,
        Status:             metav1.ConditionFalse,
        Reason:             v1alpha1.ReasonSucceeded,
        Message:            "Successfully unpacked and stored content from resolved source",
        ObservedGeneration: catalog.GetGeneration(),
        ObservedGeneration: generation,
    }

    if err != nil {
@@ -195,10 +299,10 @@ func updateStatusProgressing(catalog *v1alpha1.ClusterCatalog, err error) {
        progressingCond.Reason = v1alpha1.ReasonBlocked
    }

    meta.SetStatusCondition(&catalog.Status.Conditions, progressingCond)
    meta.SetStatusCondition(&status.Conditions, progressingCond)
}

func updateStatusServing(status *v1alpha1.ClusterCatalogStatus, result *source.Result, contentURL string, generation int64) {
func updateStatusServing(status *v1alpha1.ClusterCatalogStatus, result source.Result, contentURL string, generation int64) {
    status.ResolvedSource = result.ResolvedSource
    status.ContentURL = contentURL
    status.LastUnpacked = metav1.NewTime(result.UnpackTime)
@@ -223,34 +327,15 @@ func updateStatusNotServing(status *v1alpha1.ClusterCatalogStatus, generation in
    })
}

func (r *ClusterCatalogReconciler) needsUnpacking(catalog *v1alpha1.ClusterCatalog) bool {
    // if ResolvedSource is nil, it indicates that this is the first time we're
    // unpacking this catalog.
    if catalog.Status.ResolvedSource == nil {
        return true
    }
    if !r.Storage.ContentExists(catalog.Name) {
        return true
    }
    // if there is no spec.Source.Image, don't unpack again
    if catalog.Spec.Source.Image == nil {
        return false
    }
    if len(catalog.Status.Conditions) == 0 {
        return true
    }
    for _, c := range catalog.Status.Conditions {
        if c.ObservedGeneration != catalog.Generation {
            return true
        }
    }
    // if pollInterval is nil, don't unpack again
func (r *ClusterCatalogReconciler) needsPoll(lastSuccessfulPoll time.Time, catalog *v1alpha1.ClusterCatalog) bool {
    // If polling is disabled, we don't need to poll.
    if catalog.Spec.Source.Image.PollInterval == nil {
        return false
    }
    // if it's not time to poll yet, and the CR wasn't changed don't unpack again
    nextPoll := catalog.Status.ResolvedSource.Image.LastSuccessfulPollAttempt.Add(catalog.Spec.Source.Image.PollInterval.Duration)
    return !nextPoll.After(time.Now())

    // Only poll if the next poll time is in the past.
    nextPoll := lastSuccessfulPoll.Add(catalog.Spec.Source.Image.PollInterval.Duration)
    return nextPoll.Before(time.Now())
}

// Compare resources - ignoring status & metadata.finalizers
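The polling logic splits into two pieces: needsPoll reports whether the poll interval has elapsed since the last successful poll, and nextPollResult schedules the next wake-up a jittered interval after that poll (wait.Jitter(d, f) returns a duration in [d, d+f*d) for positive f). A runnable sketch with assumed values, since the real jitter factor is the controller's requeueJitterMaxFactor constant:

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	pollInterval := 10 * time.Minute
	jitterMaxFactor := 0.01 // assumed value for illustration

	// Scenario 1: the last successful poll was 3 minutes ago, so the interval
	// has not elapsed yet (same check as needsPoll)...
	lastPoll := time.Now().Add(-3 * time.Minute)
	fmt.Println("needs poll:", lastPoll.Add(pollInterval).Before(time.Now())) // false

	// ...and, as in nextPollResult, the reconciler asks to be requeued a
	// jittered interval after that poll, i.e. roughly 7 minutes from now.
	jittered := wait.Jitter(pollInterval, jitterMaxFactor)
	fmt.Println("requeue after:", time.Until(lastPoll.Add(jittered)))

	// Scenario 2: the last successful poll was 11 minutes ago, so needsPoll
	// reports true and a fresh unpack is attempted.
	stalePoll := time.Now().Add(-11 * time.Minute)
	fmt.Println("needs poll:", stalePoll.Add(pollInterval).Before(time.Now())) // true
}
```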
@@ -266,26 +351,35 @@ func (f finalizerFunc) Finalize(ctx context.Context, obj client.Object) (crfinal
    return f(ctx, obj)
}

func NewFinalizers(localStorage storage.Instance, unpacker source.Unpacker) (crfinalizer.Finalizers, error) {
func (r *ClusterCatalogReconciler) setupFinalizers() error {
    f := crfinalizer.NewFinalizers()
    err := f.Register(fbcDeletionFinalizer, finalizerFunc(func(ctx context.Context, obj client.Object) (crfinalizer.Result, error) {
        catalog, ok := obj.(*v1alpha1.ClusterCatalog)
        if !ok {
            panic("could not convert object to clusterCatalog")
        }
        if err := localStorage.Delete(catalog.Name); err != nil {
            updateStatusProgressing(catalog, err)
        if err := r.Storage.Delete(catalog.Name); err != nil {
            updateStatusProgressing(&catalog.Status, catalog.GetGeneration(), err)
            return crfinalizer.Result{StatusUpdated: true}, err
        }
        updateStatusNotServing(&catalog.Status, catalog.GetGeneration())
        if err := unpacker.Cleanup(ctx, catalog); err != nil {
            updateStatusProgressing(catalog, err)
        if err := r.Unpacker.Cleanup(ctx, catalog); err != nil {
            updateStatusProgressing(&catalog.Status, catalog.GetGeneration(), err)
            return crfinalizer.Result{StatusUpdated: true}, err
        }

        r.deleteStoredCatalog(catalog.Name)
        return crfinalizer.Result{StatusUpdated: true}, nil
    }))
    if err != nil {
        return f, err
        return err
    }
    return f, nil
    r.finalizers = f
    return nil
}

func (r *ClusterCatalogReconciler) deleteStoredCatalog(catalogName string) {
    r.storedCatalogsMu.Lock()
    defer r.storedCatalogsMu.Unlock()
    delete(r.storedCatalogs, catalogName)
}
Review comment: I needed to refactor finalizers so that it had access to the reconciler struct. That way it would be possible to call delete(r.storedCatalogs, catalog.Name) in the finalizer logic.
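A condensed sketch of the pattern described in that comment, with illustrative names rather than the PR's actual types: making finalizer registration a method on the reconciler lets the registered closure capture r, so it can clean up reconciler-owned state such as the stored-catalog cache when the object is finalized.

```go
package example

import (
	"context"
	"sync"

	"sigs.k8s.io/controller-runtime/pkg/client"
	crfinalizer "sigs.k8s.io/controller-runtime/pkg/finalizer"
)

type reconciler struct {
	storedMu   sync.RWMutex
	stored     map[string]struct{}
	finalizers crfinalizer.Finalizers
}

// finalizerFunc adapts a plain function to the crfinalizer.Finalizer interface,
// mirroring the adapter used in the controller.
type finalizerFunc func(context.Context, client.Object) (crfinalizer.Result, error)

func (f finalizerFunc) Finalize(ctx context.Context, obj client.Object) (crfinalizer.Result, error) {
	return f(ctx, obj)
}

func (r *reconciler) setupFinalizers() error {
	f := crfinalizer.NewFinalizers()
	err := f.Register("example.io/delete", finalizerFunc(func(ctx context.Context, obj client.Object) (crfinalizer.Result, error) {
		// Because this closure captures r, it can drop the in-memory entry for
		// the object being finalized.
		r.storedMu.Lock()
		delete(r.stored, obj.GetName())
		r.storedMu.Unlock()
		return crfinalizer.Result{}, nil
	}))
	if err != nil {
		return err
	}
	r.finalizers = f
	return nil
}
```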