clustercatalog_controller: hacky, but more correct status updating in reconciler

Signed-off-by: Joe Lanford <joe.lanford@gmail.com>
joelanford committed Oct 3, 2024
1 parent 29d21c7 commit 797ff05
Showing 5 changed files with 262 additions and 188 deletions.
13 changes: 3 additions & 10 deletions cmd/manager/main.go
@@ -226,17 +226,10 @@ func main() {
         os.Exit(1)
     }
 
-    clusterCatalogFinalizers, err := corecontrollers.NewFinalizers(localStorage, unpacker)
-    if err != nil {
-        setupLog.Error(err, "unable to configure finalizers")
-        os.Exit(1)
-    }
-
     if err = (&corecontrollers.ClusterCatalogReconciler{
-        Client:     mgr.GetClient(),
-        Unpacker:   unpacker,
-        Storage:    localStorage,
-        Finalizers: clusterCatalogFinalizers,
+        Client:   mgr.GetClient(),
+        Unpacker: unpacker,
+        Storage:  localStorage,
     }).SetupWithManager(mgr); err != nil {
         setupLog.Error(err, "unable to create controller", "controller", "ClusterCatalog")
         os.Exit(1)
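Why main.go changes here: the deletion finalizer now shares in-memory state with the reconciler (the storedCatalogs cache added below), so it can no longer be constructed up front in main() and injected. A minimal sketch of the resulting wiring pattern, using stand-in types rather than the real catalogd or controller-runtime APIs:

package main

import "fmt"

// Sketch: exported fields are injected by the caller; private state
// (finalizers, the stored-catalog cache) is built inside
// SetupWithManager so it can close over the reconciler's internals.

type finalizeFunc func(name string) error

type Reconciler struct {
    StorageDir string // injected dependency, set by main()

    finalizers []finalizeFunc   // built internally during setup
    cache      map[string]int64 // catalog name -> observed generation
}

func (r *Reconciler) SetupWithManager() error {
    r.cache = make(map[string]int64)
    // The finalizer mutates r.cache directly; the old approach of
    // constructing finalizers in main() had no access to this state.
    r.finalizers = append(r.finalizers, func(name string) error {
        delete(r.cache, name)
        return nil
    })
    return nil
}

func main() {
    r := &Reconciler{StorageDir: "/var/cache/catalogs"}
    if err := r.SetupWithManager(); err != nil {
        fmt.Println("unable to create controller:", err)
    }
}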
198 changes: 140 additions & 58 deletions internal/controllers/core/clustercatalog_controller.go
@@ -20,11 +20,14 @@ import (
"context" // #nosec
"errors"
"fmt"
"slices"
"sync"
"time"

"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -47,9 +50,22 @@ const (
 // ClusterCatalogReconciler reconciles a Catalog object
 type ClusterCatalogReconciler struct {
     client.Client
-    Unpacker   source.Unpacker
-    Storage    storage.Instance
-    Finalizers crfinalizer.Finalizers
+    Unpacker source.Unpacker
+    Storage  storage.Instance
+
+    finalizers crfinalizer.Finalizers
+
+    // TODO: The below storedCatalogs fields are used for a quick hack that helps
+    // us correctly populate a ClusterCatalog's status. The fact that we need
+    // these is indicative of a larger problem with the design of one or both
+    // of the Unpacker and Storage interfaces. We should fix this.
+    storedCatalogsMu sync.RWMutex
+    storedCatalogs   map[string]storedCatalogData
+}
+
+type storedCatalogData struct {
+    observedGeneration int64
+    unpackResult       source.Result
+}

 //+kubebuilder:rbac:groups=olm.operatorframework.io,resources=clustercatalogs,verbs=get;list;watch;create;update;patch;delete
@@ -109,6 +125,14 @@ func (r *ClusterCatalogReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {

 // SetupWithManager sets up the controller with the Manager.
 func (r *ClusterCatalogReconciler) SetupWithManager(mgr ctrl.Manager) error {
+    r.storedCatalogsMu.Lock()
+    defer r.storedCatalogsMu.Unlock()
+    r.storedCatalogs = make(map[string]storedCatalogData)
+
+    if err := r.setupFinalizers(); err != nil {
+        return fmt.Errorf("failed to setup finalizers: %v", err)
+    }
+
     return ctrl.NewControllerManagedBy(mgr).
         For(&v1alpha1.ClusterCatalog{}).
         Complete(r)
@@ -123,7 +147,9 @@ func (r *ClusterCatalogReconciler) SetupWithManager(mgr ctrl.Manager) error {
 // linting from the linter that was fussing about this.
 // nolint:unparam
 func (r *ClusterCatalogReconciler) reconcile(ctx context.Context, catalog *v1alpha1.ClusterCatalog) (ctrl.Result, error) {
-    finalizeResult, err := r.Finalizers.Finalize(ctx, catalog)
+    l := log.FromContext(ctx)
+
+    finalizeResult, err := r.finalizers.Finalize(ctx, catalog)
     if err != nil {
         return ctrl.Result{}, err
     }
@@ -133,55 +159,125 @@ func (r *ClusterCatalogReconciler) reconcile(ctx context.Context, catalog *v1alpha1.ClusterCatalog) (ctrl.Result, error) {
         return ctrl.Result{}, nil
     }
 
-    if !r.needsUnpacking(catalog) {
-        return ctrl.Result{}, nil
-    }
+    // TODO: The below algorithm to get the current state based on an in-memory
+    // storedCatalogs map is a hack that helps us keep the ClusterCatalog's
+    // status up-to-date. The fact that we need this setup is indicative of
+    // a larger problem with the design of one or both of the Unpacker and
+    // Storage interfaces and/or their interactions. We should fix this.
+    expectedStatus, storedCatalog, hasStoredCatalog := r.getCurrentState(catalog)
+
+    // If any of the following are true, we need to unpack the catalog:
+    //   - we don't have a stored catalog in the map
+    //   - we have a stored catalog, but the content doesn't exist on disk
+    //   - we have a stored catalog, the content exists, but the expected status differs from the actual status
+    //   - we have a stored catalog, the content exists, the status looks correct, but the catalog generation is different from the observed generation in the stored catalog
+    //   - we have a stored catalog, the content exists, the status looks correct and reflects the catalog generation, but it is time to poll again
+    needsUnpack := false
+    switch {
+    case !hasStoredCatalog:
+        l.Info("unpack required: no cached catalog metadata found for this catalog")
+        needsUnpack = true
+    case !r.Storage.ContentExists(catalog.Name):
+        l.Info("unpack required: no stored content found for this catalog")
+        needsUnpack = true
+    case !equality.Semantic.DeepEqual(catalog.Status, *expectedStatus):
+        l.Info("unpack required: current ClusterCatalog status differs from expected status")
+        needsUnpack = true
+    case catalog.Generation != storedCatalog.observedGeneration:
+        l.Info("unpack required: catalog generation differs from observed generation")
+        needsUnpack = true
+    case r.needsPoll(storedCatalog.unpackResult.ResolvedSource.Image.LastSuccessfulPollAttempt.Time, catalog):
+        l.Info("unpack required: poll duration has elapsed")
+        needsUnpack = true
+    }
+
+    if !needsUnpack {
+        // No need to update the status because we've already checked
+        // that it is set correctly. Otherwise, we'd be unpacking again.
+        return nextPollResult(storedCatalog.unpackResult.ResolvedSource.Image.LastSuccessfulPollAttempt.Time, catalog), nil
+    }
 
     unpackResult, err := r.Unpacker.Unpack(ctx, catalog)
     if err != nil {
         unpackErr := fmt.Errorf("source catalog content: %w", err)
-        updateStatusProgressing(catalog, unpackErr)
+        updateStatusProgressing(&catalog.Status, catalog.GetGeneration(), unpackErr)
         return ctrl.Result{}, unpackErr
     }
 
     switch unpackResult.State {
     case source.StateUnpacked:
-        contentURL := ""
         // TODO: We should check to see if the unpacked result has the same content
         // as the already unpacked content. If it does, we should skip the rest
        // of the unpacking steps.
         err := r.Storage.Store(ctx, catalog.Name, unpackResult.FS)
         if err != nil {
             storageErr := fmt.Errorf("error storing fbc: %v", err)
-            updateStatusProgressing(catalog, storageErr)
+            updateStatusProgressing(&catalog.Status, catalog.GetGeneration(), storageErr)
             return ctrl.Result{}, storageErr
         }
-        contentURL = r.Storage.ContentURL(catalog.Name)
-
-        updateStatusProgressing(catalog, nil)
-        updateStatusServing(&catalog.Status, unpackResult, contentURL, catalog.GetGeneration())
+        contentURL := r.Storage.ContentURL(catalog.Name)
 
-        var requeueAfter time.Duration
-        switch catalog.Spec.Source.Type {
-        case v1alpha1.SourceTypeImage:
-            if catalog.Spec.Source.Image != nil && catalog.Spec.Source.Image.PollInterval != nil {
-                requeueAfter = wait.Jitter(catalog.Spec.Source.Image.PollInterval.Duration, requeueJitterMaxFactor)
-            }
-        }
-
-        return ctrl.Result{RequeueAfter: requeueAfter}, nil
+        updateStatusProgressing(&catalog.Status, catalog.GetGeneration(), nil)
+        updateStatusServing(&catalog.Status, *unpackResult, contentURL, catalog.GetGeneration())
     default:
         panic(fmt.Sprintf("unknown unpack state %q", unpackResult.State))
     }
+
+    r.storedCatalogsMu.Lock()
+    r.storedCatalogs[catalog.Name] = storedCatalogData{
+        unpackResult:       *unpackResult,
+        observedGeneration: catalog.GetGeneration(),
+    }
+    r.storedCatalogsMu.Unlock()
+    return nextPollResult(unpackResult.ResolvedSource.Image.LastSuccessfulPollAttempt.Time, catalog), nil
 }

-func updateStatusProgressing(catalog *v1alpha1.ClusterCatalog, err error) {
+func (r *ClusterCatalogReconciler) getCurrentState(catalog *v1alpha1.ClusterCatalog) (*v1alpha1.ClusterCatalogStatus, storedCatalogData, bool) {
+    r.storedCatalogsMu.RLock()
+    storedCatalog, hasStoredCatalog := r.storedCatalogs[catalog.Name]
+    r.storedCatalogsMu.RUnlock()
+
+    expectedStatus := catalog.Status.DeepCopy()
+
+    // Set expected status based on what we see in the stored catalog
+    clearUnknownConditions(expectedStatus)
+    if hasStoredCatalog && r.Storage.ContentExists(catalog.Name) {
+        updateStatusServing(expectedStatus, storedCatalog.unpackResult, r.Storage.ContentURL(catalog.Name), storedCatalog.observedGeneration)
+        updateStatusProgressing(expectedStatus, storedCatalog.observedGeneration, nil)
+    }
+
+    return expectedStatus, storedCatalog, hasStoredCatalog
+}
+
+func nextPollResult(lastSuccessfulPoll time.Time, catalog *v1alpha1.ClusterCatalog) ctrl.Result {
+    var requeueAfter time.Duration
+    switch catalog.Spec.Source.Type {
+    case v1alpha1.SourceTypeImage:
+        if catalog.Spec.Source.Image != nil && catalog.Spec.Source.Image.PollInterval != nil {
+            jitteredDuration := wait.Jitter(catalog.Spec.Source.Image.PollInterval.Duration, requeueJitterMaxFactor)
+            requeueAfter = time.Until(lastSuccessfulPoll.Add(jitteredDuration))
+        }
+    }
+    return ctrl.Result{RequeueAfter: requeueAfter}
+}
+
+func clearUnknownConditions(status *v1alpha1.ClusterCatalogStatus) {
+    knownTypes := sets.New[string](
+        v1alpha1.TypeServing,
+        v1alpha1.TypeProgressing,
+    )
+    status.Conditions = slices.DeleteFunc(status.Conditions, func(cond metav1.Condition) bool {
+        return !knownTypes.Has(cond.Type)
+    })
+}
+
+func updateStatusProgressing(status *v1alpha1.ClusterCatalogStatus, generation int64, err error) {
     progressingCond := metav1.Condition{
         Type:               v1alpha1.TypeProgressing,
         Status:             metav1.ConditionFalse,
         Reason:             v1alpha1.ReasonSucceeded,
         Message:            "Successfully unpacked and stored content from resolved source",
-        ObservedGeneration: catalog.GetGeneration(),
+        ObservedGeneration: generation,
     }
 
     if err != nil {
@@ -195,10 +291,10 @@ func updateStatusProgressing(catalog *v1alpha1.ClusterCatalog, err error) {
         progressingCond.Reason = v1alpha1.ReasonBlocked
     }
 
-    meta.SetStatusCondition(&catalog.Status.Conditions, progressingCond)
+    meta.SetStatusCondition(&status.Conditions, progressingCond)
 }
 
-func updateStatusServing(status *v1alpha1.ClusterCatalogStatus, result *source.Result, contentURL string, generation int64) {
+func updateStatusServing(status *v1alpha1.ClusterCatalogStatus, result source.Result, contentURL string, generation int64) {
     status.ResolvedSource = result.ResolvedSource
     status.ContentURL = contentURL
     status.LastUnpacked = metav1.NewTime(result.UnpackTime)
@@ -223,34 +319,15 @@ func updateStatusNotServing(status *v1alpha1.ClusterCatalogStatus, generation int64) {
     })
 }
 
-func (r *ClusterCatalogReconciler) needsUnpacking(catalog *v1alpha1.ClusterCatalog) bool {
-    // if ResolvedSource is nil, it indicates that this is the first time we're
-    // unpacking this catalog.
-    if catalog.Status.ResolvedSource == nil {
-        return true
-    }
-    if !r.Storage.ContentExists(catalog.Name) {
-        return true
-    }
-    // if there is no spec.Source.Image, don't unpack again
-    if catalog.Spec.Source.Image == nil {
-        return false
-    }
-    if len(catalog.Status.Conditions) == 0 {
-        return true
-    }
-    for _, c := range catalog.Status.Conditions {
-        if c.ObservedGeneration != catalog.Generation {
-            return true
-        }
-    }
-    // if pollInterval is nil, don't unpack again
+func (r *ClusterCatalogReconciler) needsPoll(lastSuccessfulPoll time.Time, catalog *v1alpha1.ClusterCatalog) bool {
+    // If polling is disabled, we don't need to poll.
     if catalog.Spec.Source.Image.PollInterval == nil {
         return false
     }
-    // if it's not time to poll yet, and the CR wasn't changed don't unpack again
-    nextPoll := catalog.Status.ResolvedSource.Image.LastSuccessfulPollAttempt.Add(catalog.Spec.Source.Image.PollInterval.Duration)
-    return !nextPoll.After(time.Now())
+
+    // Only poll if the next poll time is in the past.
+    nextPoll := lastSuccessfulPoll.Add(catalog.Spec.Source.Image.PollInterval.Duration)
+    return nextPoll.Before(time.Now())
 }
 
 // Compare resources - ignoring status & metadata.finalizers
@@ -266,26 +343,31 @@ func (f finalizerFunc) Finalize(ctx context.Context, obj client.Object) (crfinalizer.Result, error) {
     return f(ctx, obj)
 }
 
-func NewFinalizers(localStorage storage.Instance, unpacker source.Unpacker) (crfinalizer.Finalizers, error) {
+func (r *ClusterCatalogReconciler) setupFinalizers() error {
     f := crfinalizer.NewFinalizers()
     err := f.Register(fbcDeletionFinalizer, finalizerFunc(func(ctx context.Context, obj client.Object) (crfinalizer.Result, error) {
+        r.storedCatalogsMu.Lock()
+        defer r.storedCatalogsMu.Unlock()
         catalog, ok := obj.(*v1alpha1.ClusterCatalog)
         if !ok {
             panic("could not convert object to clusterCatalog")
         }
-        if err := localStorage.Delete(catalog.Name); err != nil {
-            updateStatusProgressing(catalog, err)
+        if err := r.Storage.Delete(catalog.Name); err != nil {
+            updateStatusProgressing(&catalog.Status, catalog.GetGeneration(), err)
             return crfinalizer.Result{StatusUpdated: true}, err
         }
         updateStatusNotServing(&catalog.Status, catalog.GetGeneration())
-        if err := unpacker.Cleanup(ctx, catalog); err != nil {
-            updateStatusProgressing(catalog, err)
+        if err := r.Unpacker.Cleanup(ctx, catalog); err != nil {
+            updateStatusProgressing(&catalog.Status, catalog.GetGeneration(), err)
             return crfinalizer.Result{StatusUpdated: true}, err
         }
+
+        delete(r.storedCatalogs, catalog.Name)
         return crfinalizer.Result{StatusUpdated: true}, nil
     }))
     if err != nil {
-        return f, err
+        return err
    }
-    return f, nil
+    r.finalizers = f
+    return nil
 }
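A note on the poll-timing math above: nextPollResult computes the requeue delay relative to the last successful poll rather than relative to now, so a reconcile that fires mid-interval (for example, because the status drifted) does not push the next poll further out. A small, self-contained illustration of that arithmetic; the 0.01 jitter factor is an assumed stand-in for the requeueJitterMaxFactor constant, which is defined elsewhere in this file:

package main

import (
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

func main() {
    pollInterval := 5 * time.Minute
    lastSuccessfulPoll := time.Now().Add(-3 * time.Minute) // polled 3m ago

    // wait.Jitter returns pollInterval plus a random extra of up to
    // maxFactor*pollInterval, spreading many catalogs' polls apart.
    jittered := wait.Jitter(pollInterval, 0.01) // 0.01 is an assumed factor

    // Anchor the delay to the last successful poll, not to time.Now(),
    // so mid-interval reconciles don't postpone the next poll. With the
    // values above this prints roughly 2m.
    requeueAfter := time.Until(lastSuccessfulPoll.Add(jittered))
    fmt.Println("requeue in:", requeueAfter.Round(time.Second))
}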