1 change: 1 addition & 0 deletions .gitignore
@@ -28,6 +28,7 @@ vendor/

# editor and IDE paraphernalia
.idea/
.run/
*.swp
*.swo
*~
26 changes: 15 additions & 11 deletions catalogd/cmd/catalogd/main.go
@@ -17,19 +17,18 @@ limitations under the License.
package main

import (
"context"
"crypto/tls"
"errors"
"flag"
"fmt"
"log"
"net/url"
"os"
"path/filepath"
"strings"
"time"

"github.com/containers/image/v5/types"
"github.com/go-logr/logr"
"github.com/sirupsen/logrus"
"github.com/spf13/pflag"
corev1 "k8s.io/api/core/v1"
@@ -50,6 +49,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/certwatcher"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/metrics"
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
@@ -61,10 +61,10 @@ import (
"github.com/operator-framework/operator-controller/internal/catalogd/garbagecollection"
catalogdmetrics "github.com/operator-framework/operator-controller/internal/catalogd/metrics"
"github.com/operator-framework/operator-controller/internal/catalogd/serverutil"
"github.com/operator-framework/operator-controller/internal/catalogd/source"
"github.com/operator-framework/operator-controller/internal/catalogd/storage"
"github.com/operator-framework/operator-controller/internal/catalogd/webhook"
fsutil "github.com/operator-framework/operator-controller/internal/shared/util/fs"
imageutil "github.com/operator-framework/operator-controller/internal/shared/util/image"
"github.com/operator-framework/operator-controller/internal/shared/version"
)

@@ -177,7 +177,8 @@ func main() {

cw, err := certwatcher.New(certFile, keyFile)
if err != nil {
log.Fatalf("Failed to initialize certificate watcher: %v", err)
setupLog.Error(err, "failed to initialize certificate watcher")
os.Exit(1)
}

tlsOpts := func(config *tls.Config) {
@@ -273,14 +274,16 @@ func main() {
os.Exit(1)
}

unpackCacheBasePath := filepath.Join(cacheDir, source.UnpackCacheDir)
unpackCacheBasePath := filepath.Join(cacheDir, "unpack")
Member commented:
This looks odd. We are moving from using a variable to a hard-coded string.

Member (author) replied:
This is the only place that constant was used. Nothing in the source package used it, and nothing in the new imageutil package uses it.
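For context, the reviewer's alternative would look roughly like the sketch below in catalogd's main.go; the constant name unpackCacheDir is hypothetical and is not defined anywhere in this PR:

// Hypothetical local constant in place of the inline "unpack" literal.
const unpackCacheDir = "unpack"
unpackCacheBasePath := filepath.Join(cacheDir, unpackCacheDir)

The trade-off is small either way: source.UnpackCacheDir had exactly one consumer, so after this refactor the choice is between an inline literal and a main.go-local name.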

if err := os.MkdirAll(unpackCacheBasePath, 0770); err != nil {
setupLog.Error(err, "unable to create cache directory for unpacking")
os.Exit(1)
}
unpacker := &source.ContainersImageRegistry{
BaseCachePath: unpackCacheBasePath,
SourceContextFunc: func(logger logr.Logger) (*types.SystemContext, error) {

imageCache := imageutil.CatalogCache(unpackCacheBasePath)
imagePuller := &imageutil.ContainersImagePuller{
SourceCtxFunc: func(ctx context.Context) (*types.SystemContext, error) {
logger := log.FromContext(ctx)
srcContext := &types.SystemContext{
DockerCertPath: pullCasDir,
OCICertPath: pullCasDir,
@@ -334,9 +337,10 @@ func main() {
}

if err = (&corecontrollers.ClusterCatalogReconciler{
Client: mgr.GetClient(),
Unpacker: unpacker,
Storage: localStorage,
Client: mgr.GetClient(),
ImageCache: imageCache,
ImagePuller: imagePuller,
Storage: localStorage,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ClusterCatalog")
os.Exit(1)
18 changes: 10 additions & 8 deletions cmd/operator-controller/main.go
@@ -29,7 +29,6 @@ import (
"time"

"github.com/containers/image/v5/types"
"github.com/go-logr/logr"
"github.com/sirupsen/logrus"
"github.com/spf13/pflag"
corev1 "k8s.io/api/core/v1"
@@ -49,6 +48,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
crfinalizer "sigs.k8s.io/controller-runtime/pkg/finalizer"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
"sigs.k8s.io/controller-runtime/pkg/metrics/server"

@@ -65,12 +65,12 @@ import (
"github.com/operator-framework/operator-controller/internal/operator-controller/controllers"
"github.com/operator-framework/operator-controller/internal/operator-controller/features"
"github.com/operator-framework/operator-controller/internal/operator-controller/finalizers"
"github.com/operator-framework/operator-controller/internal/operator-controller/httputil"
"github.com/operator-framework/operator-controller/internal/operator-controller/resolve"
"github.com/operator-framework/operator-controller/internal/operator-controller/rukpak/preflights/crdupgradesafety"
"github.com/operator-framework/operator-controller/internal/operator-controller/rukpak/source"
"github.com/operator-framework/operator-controller/internal/operator-controller/scheme"
fsutil "github.com/operator-framework/operator-controller/internal/shared/util/fs"
httputil "github.com/operator-framework/operator-controller/internal/shared/util/http"
imageutil "github.com/operator-framework/operator-controller/internal/shared/util/image"
"github.com/operator-framework/operator-controller/internal/shared/version"
)

@@ -315,13 +315,14 @@ func main() {
os.Exit(1)
}

unpacker := &source.ContainersImageRegistry{
BaseCachePath: filepath.Join(cachePath, "unpack"),
SourceContextFunc: func(logger logr.Logger) (*types.SystemContext, error) {
imageCache := imageutil.BundleCache(filepath.Join(cachePath, "unpack"))
Member commented:
We are also using unpackCacheBasePath := filepath.Join(cacheDir, "unpack") in catalogd's main.go. We should use a constant or variable for "unpack".

Member (author) replied:
These unpack paths are decided, defined, and used purely in the context of each main.go. It's just a coincidence/convention that they are the same; they don't have to be.
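If the shared constant the reviewer asks for were introduced, it might look like the sketch below; the file location and names are hypothetical and not part of this PR:

// internal/shared/util/image/cache_dir.go (hypothetical location)
package image

// UnpackCacheDir is the subdirectory, under each binary's cache path,
// where unpacked image content is stored.
const UnpackCacheDir = "unpack"

Both binaries would then build their paths as filepath.Join(cachePath, imageutil.UnpackCacheDir). The author's counterpoint stands, though: each main.go owns its own cache layout, so the matching "unpack" names are a per-binary convention rather than a shared contract.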

imagePuller := &imageutil.ContainersImagePuller{
SourceCtxFunc: func(ctx context.Context) (*types.SystemContext, error) {
srcContext := &types.SystemContext{
DockerCertPath: pullCasDir,
OCICertPath: pullCasDir,
}
logger := log.FromContext(ctx)
if _, err := os.Stat(authFilePath); err == nil && globalPullSecretKey != nil {
logger.Info("using available authentication information for pulling image")
srcContext.AuthFilePath = authFilePath
@@ -336,7 +337,7 @@

clusterExtensionFinalizers := crfinalizer.NewFinalizers()
if err := clusterExtensionFinalizers.Register(controllers.ClusterExtensionCleanupUnpackCacheFinalizer, finalizers.FinalizerFunc(func(ctx context.Context, obj client.Object) (crfinalizer.Result, error) {
return crfinalizer.Result{}, unpacker.Cleanup(ctx, &source.BundleSource{Name: obj.GetName()})
return crfinalizer.Result{}, imageCache.Delete(ctx, obj.GetName())
})); err != nil {
setupLog.Error(err, "unable to register finalizer", "finalizerKey", controllers.ClusterExtensionCleanupUnpackCacheFinalizer)
os.Exit(1)
@@ -399,7 +400,8 @@ func main() {
if err = (&controllers.ClusterExtensionReconciler{
Client: cl,
Resolver: resolver,
Unpacker: unpacker,
ImageCache: imageCache,
ImagePuller: imagePuller,
Applier: helmApplier,
InstalledBundleGetter: &controllers.DefaultInstalledBundleGetter{ActionClientGetter: acg},
Finalizers: clusterExtensionFinalizers,
2 changes: 1 addition & 1 deletion go.mod
@@ -18,6 +18,7 @@ require (
github.com/onsi/ginkgo/v2 v2.22.2
github.com/onsi/gomega v1.36.2
github.com/opencontainers/go-digest v1.0.0
github.com/opencontainers/image-spec v1.1.0
github.com/operator-framework/api v0.29.0
github.com/operator-framework/helm-operator-plugins v0.8.0
github.com/operator-framework/operator-registry v1.50.0
@@ -177,7 +178,6 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect
github.com/oklog/ulid v1.3.1 // indirect
github.com/opencontainers/image-spec v1.1.0 // indirect
github.com/opencontainers/runtime-spec v1.2.0 // indirect
github.com/openshift/crd-schema-checker v0.0.0-20240404194209-35a9033b1d11 // indirect
github.com/operator-framework/operator-lib v0.17.0 // indirect
86 changes: 52 additions & 34 deletions internal/catalogd/controllers/core/clustercatalog_controller.go
@@ -24,6 +24,7 @@ import (
"sync"
"time"

"github.com/containers/image/v5/docker/reference"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -38,8 +39,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"

catalogdv1 "github.com/operator-framework/operator-controller/catalogd/api/v1"
"github.com/operator-framework/operator-controller/internal/catalogd/source"
"github.com/operator-framework/operator-controller/internal/catalogd/storage"
imageutil "github.com/operator-framework/operator-controller/internal/shared/util/image"
)

const (
@@ -52,8 +53,11 @@ const (
// ClusterCatalogReconciler reconciles a Catalog object
type ClusterCatalogReconciler struct {
client.Client
Unpacker source.Unpacker
Storage storage.Instance

ImageCache imageutil.Cache
ImagePuller imageutil.Puller

Storage storage.Instance

finalizers crfinalizer.Finalizers

@@ -66,8 +70,10 @@ }
}

type storedCatalogData struct {
ref reference.Canonical
lastUnpack time.Time
lastSuccessfulPoll time.Time
observedGeneration int64
unpackResult source.Result
}

//+kubebuilder:rbac:groups=olm.operatorframework.io,resources=clustercatalogs,verbs=get;list;watch;create;update;patch;delete
@@ -216,50 +222,58 @@ func (r *ClusterCatalogReconciler) reconcile(ctx context.Context, catalog *catal
case catalog.Generation != storedCatalog.observedGeneration:
l.Info("unpack required: catalog generation differs from observed generation")
needsUnpack = true
case r.needsPoll(storedCatalog.unpackResult.LastSuccessfulPollAttempt.Time, catalog):
case r.needsPoll(storedCatalog.lastSuccessfulPoll, catalog):
l.Info("unpack required: poll duration has elapsed")
needsUnpack = true
}

if !needsUnpack {
// No need to update the status because we've already checked
// that it is set correctly. Otherwise, we'd be unpacking again.
return nextPollResult(storedCatalog.unpackResult.LastSuccessfulPollAttempt.Time, catalog), nil
return nextPollResult(storedCatalog.lastSuccessfulPoll, catalog), nil
}

if catalog.Spec.Source.Type != catalogdv1.SourceTypeImage {
err := reconcile.TerminalError(fmt.Errorf("unknown source type %q", catalog.Spec.Source.Type))
updateStatusProgressing(&catalog.Status, catalog.GetGeneration(), err)
return ctrl.Result{}, err
}
if catalog.Spec.Source.Image == nil {
err := reconcile.TerminalError(fmt.Errorf("error parsing ClusterCatalog %q, image source is nil", catalog.Name))
updateStatusProgressing(&catalog.Status, catalog.GetGeneration(), err)
return ctrl.Result{}, err
}

unpackResult, err := r.Unpacker.Unpack(ctx, catalog)
fsys, canonicalRef, unpackTime, err := r.ImagePuller.Pull(ctx, catalog.Name, catalog.Spec.Source.Image.Ref, r.ImageCache)
if err != nil {
unpackErr := fmt.Errorf("source catalog content: %w", err)
updateStatusProgressing(&catalog.Status, catalog.GetGeneration(), unpackErr)
return ctrl.Result{}, unpackErr
}

switch unpackResult.State {
case source.StateUnpacked:
// TODO: We should check to see if the unpacked result has the same content
// as the already unpacked content. If it does, we should skip this rest
// of the unpacking steps.
err := r.Storage.Store(ctx, catalog.Name, unpackResult.FS)
if err != nil {
storageErr := fmt.Errorf("error storing fbc: %v", err)
updateStatusProgressing(&catalog.Status, catalog.GetGeneration(), storageErr)
return ctrl.Result{}, storageErr
}
baseURL := r.Storage.BaseURL(catalog.Name)

updateStatusProgressing(&catalog.Status, catalog.GetGeneration(), nil)
updateStatusServing(&catalog.Status, *unpackResult, baseURL, catalog.GetGeneration())
default:
panic(fmt.Sprintf("unknown unpack state %q", unpackResult.State))
// TODO: We should check to see if the unpacked result has the same content
// as the already unpacked content. If it does, we should skip this rest
// of the unpacking steps.
if err := r.Storage.Store(ctx, catalog.Name, fsys); err != nil {
storageErr := fmt.Errorf("error storing fbc: %v", err)
updateStatusProgressing(&catalog.Status, catalog.GetGeneration(), storageErr)
return ctrl.Result{}, storageErr
}
baseURL := r.Storage.BaseURL(catalog.Name)

updateStatusProgressing(&catalog.Status, catalog.GetGeneration(), nil)
updateStatusServing(&catalog.Status, canonicalRef, unpackTime, baseURL, catalog.GetGeneration())

lastSuccessfulPoll := time.Now()
r.storedCatalogsMu.Lock()
r.storedCatalogs[catalog.Name] = storedCatalogData{
unpackResult: *unpackResult,
ref: canonicalRef,
lastUnpack: unpackTime,
lastSuccessfulPoll: lastSuccessfulPoll,
observedGeneration: catalog.GetGeneration(),
}
r.storedCatalogsMu.Unlock()
return nextPollResult(unpackResult.LastSuccessfulPollAttempt.Time, catalog), nil
return nextPollResult(lastSuccessfulPoll, catalog), nil
}

func (r *ClusterCatalogReconciler) getCurrentState(catalog *catalogdv1.ClusterCatalog) (*catalogdv1.ClusterCatalogStatus, storedCatalogData, bool) {
@@ -272,7 +286,7 @@ func (r *ClusterCatalogReconciler) getCurrentState(catalog *catalogdv1.ClusterCa
// Set expected status based on what we see in the stored catalog
clearUnknownConditions(expectedStatus)
if hasStoredCatalog && r.Storage.ContentExists(catalog.Name) {
updateStatusServing(expectedStatus, storedCatalog.unpackResult, r.Storage.BaseURL(catalog.Name), storedCatalog.observedGeneration)
updateStatusServing(expectedStatus, storedCatalog.ref, storedCatalog.lastUnpack, r.Storage.BaseURL(catalog.Name), storedCatalog.observedGeneration)
updateStatusProgressing(expectedStatus, storedCatalog.observedGeneration, nil)
}

@@ -325,13 +339,17 @@ func updateStatusProgressing(status *catalogdv1.ClusterCatalogStatus, generation
meta.SetStatusCondition(&status.Conditions, progressingCond)
}

func updateStatusServing(status *catalogdv1.ClusterCatalogStatus, result source.Result, baseURL string, generation int64) {
status.ResolvedSource = result.ResolvedSource
if status.URLs == nil {
status.URLs = &catalogdv1.ClusterCatalogURLs{}
func updateStatusServing(status *catalogdv1.ClusterCatalogStatus, ref reference.Canonical, modTime time.Time, baseURL string, generation int64) {
status.ResolvedSource = &catalogdv1.ResolvedCatalogSource{
Type: catalogdv1.SourceTypeImage,
Image: &catalogdv1.ResolvedImageSource{
Ref: ref.String(),
},
}
status.URLs = &catalogdv1.ClusterCatalogURLs{
Base: baseURL,
}
status.URLs.Base = baseURL
status.LastUnpacked = ptr.To(metav1.NewTime(result.UnpackTime))
status.LastUnpacked = ptr.To(metav1.NewTime(modTime.Truncate(time.Second)))
meta.SetStatusCondition(&status.Conditions, metav1.Condition{
Type: catalogdv1.TypeServing,
Status: metav1.ConditionTrue,
@@ -434,7 +452,7 @@ func (r *ClusterCatalogReconciler) deleteCatalogCache(ctx context.Context, catal
return err
}
updateStatusNotServing(&catalog.Status, catalog.GetGeneration())
if err := r.Unpacker.Cleanup(ctx, catalog); err != nil {
if err := r.ImageCache.Delete(ctx, catalog.Name); err != nil {
updateStatusProgressing(&catalog.Status, catalog.GetGeneration(), err)
return err
}