Skip to content

Commit

Permalink
resolver: Add support for excluding global catalogs from resolution (#…
Browse files Browse the repository at this point in the history
…2788)

The resolver is currently configured to always consider resolving from global
catalogs. In some cases, this may not be desirable, and the user would like to
explicitly ignore global catalogs for the purposes of resolution. This change
enables per-namespace exclusion from global catalog sources via an annotation on
existing registry source provider.

A queueinformer for OperatorGroups is added since they are now an input to
resolution via the global catalog exclusion annotation. Namespace resolution
will be triggered on changes to an OperatorGroup, in case the value provided on
that annotation by the user changes.

Updated the error message returned by the cache in case a source has an error
to be more clear.

Signed-off-by: Daniel Sover <dsover@redhat.com>
Co-authored-by: perdasilva <perdasilva@redhat.com>

Co-authored-by: perdasilva <perdasilva@redhat.com>
  • Loading branch information
exdx and perdasilva authored Jun 16, 2022
1 parent 97b64e1 commit dba08eb
Show file tree
Hide file tree
Showing 6 changed files with 251 additions and 8 deletions.
82 changes: 82 additions & 0 deletions pkg/controller/operators/catalog/og_source_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package catalog

import (
"context"
"fmt"

v1listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1"

"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/labels"
)

type OperatorGroupToggleSourceProvider struct {
sp cache.SourceProvider
logger *logrus.Logger
ogLister v1listers.OperatorGroupLister
}

func NewOperatorGroupToggleSourceProvider(sp cache.SourceProvider, logger *logrus.Logger,
ogLister v1listers.OperatorGroupLister) *OperatorGroupToggleSourceProvider {
return &OperatorGroupToggleSourceProvider{
sp: sp,
logger: logger,
ogLister: ogLister,
}
}

const exclusionAnnotation string = "olm.operatorframework.io/exclude-global-namespace-resolution"

func (e *OperatorGroupToggleSourceProvider) Sources(namespaces ...string) map[cache.SourceKey]cache.Source {
// Check if annotation is set first
resolutionNamespaces, err := e.CheckForExclusion(namespaces...)
if err != nil {
e.logger.Errorf("error checking namespaces %#v for global resolution exlusion: %s", namespaces, err)
// Fail early with a dummy Source that returns an error
// TODO: Update the Sources interface to return an error
m := make(map[cache.SourceKey]cache.Source)
key := cache.SourceKey{Name: "operatorgroup-unavailable", Namespace: namespaces[0]}
source := errorSource{err}
m[key] = source
return m
}
return e.sp.Sources(resolutionNamespaces...)
}

type errorSource struct {
error
}

func (e errorSource) Snapshot(ctx context.Context) (*cache.Snapshot, error) {
return nil, e.error
}

func (e *OperatorGroupToggleSourceProvider) CheckForExclusion(namespaces ...string) ([]string, error) {
var defaultResult = namespaces
// The first namespace provided is always the current namespace being synced
var ownNamespace = namespaces[0]
var toggledResult = []string{ownNamespace}

// Check the OG on the NS provided for the exclusion annotation
ogs, err := e.ogLister.OperatorGroups(ownNamespace).List(labels.Everything())
if err != nil {
return nil, fmt.Errorf("listing operatorgroups in namespace %s: %s", ownNamespace, err)
}

if len(ogs) != 1 {
// Problem with the operatorgroup configuration in the namespace, or the operatorgroup has not yet been persisted
// Note: a resync will be triggered if/when the operatorgroup becomes available
return nil, fmt.Errorf("found %d operatorgroups in namespace %s: expected 1", len(ogs), ownNamespace)
}

var og = ogs[0]
if val, ok := og.Annotations[exclusionAnnotation]; ok && val == "true" {
// Exclusion specified
// Ignore the globalNamespace for the purposes of resolution in this namespace
e.logger.Printf("excluding global catalogs from resolution in namespace %s", ownNamespace)
return toggledResult, nil
}

return defaultResult, nil
}
38 changes: 33 additions & 5 deletions pkg/controller/operators/catalog/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"k8s.io/client-go/util/workqueue"

"github.com/operator-framework/api/pkg/operators/reference"
operatorsv1 "github.com/operator-framework/api/pkg/operators/v1"
"github.com/operator-framework/api/pkg/operators/v1alpha1"
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/clientset/versioned"
"github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/informers/externalversions"
Expand Down Expand Up @@ -118,7 +119,7 @@ type Operator struct {
bundleUnpackTimeout time.Duration
clientFactory clients.Factory
muInstallPlan sync.Mutex
resolverSourceProvider *resolver.RegistrySourceProvider
sourceInvalidator *resolver.RegistrySourceProvider
}

type CatalogSourceSyncFunc func(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *v1alpha1.CatalogSource, continueSync bool, syncError error)
Expand Down Expand Up @@ -191,9 +192,10 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
clientFactory: clients.NewFactory(config),
}
op.sources = grpc.NewSourceStore(logger, 10*time.Second, 10*time.Minute, op.syncSourceState)
op.resolverSourceProvider = resolver.SourceProviderFromRegistryClientProvider(op.sources, logger)
op.sourceInvalidator = resolver.SourceProviderFromRegistryClientProvider(op.sources, logger)
resolverSourceProvider := NewOperatorGroupToggleSourceProvider(op.sourceInvalidator, logger, op.lister.OperatorsV1().OperatorGroupLister())
op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, opClient, configmapRegistryImage, op.now, ssaClient)
res := resolver.NewOperatorStepResolver(lister, crClient, operatorNamespace, op.resolverSourceProvider, logger)
res := resolver.NewOperatorStepResolver(lister, crClient, operatorNamespace, resolverSourceProvider, logger)
op.resolver = resolver.NewInstrumentedResolver(res, metrics.RegisterDependencyResolutionSuccess, metrics.RegisterDependencyResolutionFailure)

// Wire OLM CR sharedIndexInformers
Expand Down Expand Up @@ -259,7 +261,19 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo

operatorGroupInformer := crInformerFactory.Operators().V1().OperatorGroups()
op.lister.OperatorsV1().RegisterOperatorGroupLister(metav1.NamespaceAll, operatorGroupInformer.Lister())
if err := op.RegisterInformer(operatorGroupInformer.Informer()); err != nil {
ogQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ogs")
op.ogQueueSet.Set(metav1.NamespaceAll, ogQueue)
operatorGroupQueueInformer, err := queueinformer.NewQueueInformer(
ctx,
queueinformer.WithLogger(op.logger),
queueinformer.WithQueue(ogQueue),
queueinformer.WithInformer(operatorGroupInformer.Informer()),
queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncOperatorGroups).ToSyncer()),
)
if err != nil {
return nil, err
}
if err := op.RegisterQueueInformer(operatorGroupQueueInformer); err != nil {
return nil, err
}

Expand Down Expand Up @@ -475,7 +489,7 @@ func (o *Operator) syncSourceState(state grpc.SourceState) {

switch state.State {
case connectivity.Ready:
o.resolverSourceProvider.Invalidate(resolvercache.SourceKey(state.Key))
o.sourceInvalidator.Invalidate(resolvercache.SourceKey(state.Key))
if o.namespace == state.Key.Namespace {
namespaces, err := index.CatalogSubscriberNamespaces(o.catalogSubscriberIndexer,
state.Key.Name, state.Key.Namespace)
Expand Down Expand Up @@ -1085,6 +1099,20 @@ func (o *Operator) syncSubscriptions(obj interface{}) error {
return nil
}

// syncOperatorGroups requeues the namespace resolution queue on changes to an operatorgroup
// This is because the operatorgroup is now an input to resolution via the global catalog exclusion annotation
func (o *Operator) syncOperatorGroups(obj interface{}) error {
og, ok := obj.(*operatorsv1.OperatorGroup)
if !ok {
o.logger.Debugf("wrong type: %#v", obj)
return fmt.Errorf("casting OperatorGroup failed")
}

o.nsResolveQueue.Add(og.GetNamespace())

return nil
}

func (o *Operator) nothingToUpdate(logger *logrus.Entry, sub *v1alpha1.Subscription) bool {
if sub.Status.InstallPlanRef != nil && sub.Status.State == v1alpha1.SubscriptionStateUpgradePending {
logger.Debugf("skipping update: installplan already created")
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/registry/resolver/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (c *NamespacedOperatorCache) Error() error {
err := snapshot.err
snapshot.m.RUnlock()
if err != nil {
errs = append(errs, fmt.Errorf("error using catalog %s (in namespace %s): %w", key.Name, key.Namespace, err))
errs = append(errs, fmt.Errorf("failed to populate resolver cache from source %v: %w", key.String(), err))
}
}
return errors.NewAggregate(errs)
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/registry/resolver/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,5 +238,5 @@ func TestNamespaceOperatorCacheError(t *testing.T) {
key: ErrorSource{Error: errors.New("testing")},
})

require.EqualError(t, c.Namespaced("dummynamespace").Error(), "error using catalog dummyname (in namespace dummynamespace): testing")
require.EqualError(t, c.Namespaced("dummynamespace").Error(), "failed to populate resolver cache from source dummyname/dummynamespace: testing")
}
2 changes: 1 addition & 1 deletion pkg/controller/registry/resolver/step_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1182,7 +1182,7 @@ func TestResolver(t *testing.T) {
steps: [][]*v1alpha1.Step{},
subs: []*v1alpha1.Subscription{},
errAssert: func(t *testing.T, err error) {
assert.Contains(t, err.Error(), "error using catalog @existing (in namespace catsrc-namespace): csv")
assert.Contains(t, err.Error(), "failed to populate resolver cache from source @existing/catsrc-namespace: csv catsrc-namespace/a.v1")
assert.Contains(t, err.Error(), "in phase Failed instead of Replacing")
},
},
Expand Down
133 changes: 133 additions & 0 deletions test/e2e/catalog_exclusion_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package e2e

import (
"context"
"path/filepath"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
operatorsv1 "github.com/operator-framework/api/pkg/operators/v1"
"github.com/operator-framework/api/pkg/operators/v1alpha1"
"github.com/operator-framework/operator-lifecycle-manager/test/e2e/ctx"
"github.com/operator-framework/operator-lifecycle-manager/test/e2e/util"
. "github.com/operator-framework/operator-lifecycle-manager/test/e2e/util/gomega"
"google.golang.org/grpc/connectivity"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8scontrollerclient "sigs.k8s.io/controller-runtime/pkg/client"
)

const magicCatalogDir = "magiccatalog"

var _ = Describe("Global Catalog Exclusion", func() {
var (
testNamespace corev1.Namespace
determinedE2eClient *util.DeterminedE2EClient
operatorGroup operatorsv1.OperatorGroup
localCatalog *MagicCatalog
)

BeforeEach(func() {
determinedE2eClient = util.NewDeterminedClient(ctx.Ctx().E2EClient())

By("creating a namespace with an own namespace operator group without annotations")
e2eTestNamespace := genName("global-catalog-exclusion-e2e-")
operatorGroup = operatorsv1.OperatorGroup{
ObjectMeta: metav1.ObjectMeta{
Namespace: e2eTestNamespace,
Name: genName("og-"),
Annotations: nil,
},
Spec: operatorsv1.OperatorGroupSpec{
TargetNamespaces: []string{e2eTestNamespace},
},
}
testNamespace = SetupGeneratedTestNamespaceWithOperatorGroup(e2eTestNamespace, operatorGroup)

By("creating a broken catalog in the global namespace")
globalCatalog := &v1alpha1.CatalogSource{
ObjectMeta: metav1.ObjectMeta{
Name: genName("bad-global-catalog-"),
Namespace: operatorNamespace,
},
Spec: v1alpha1.CatalogSourceSpec{
DisplayName: "Broken Global Catalog Source",
SourceType: v1alpha1.SourceTypeGrpc,
Address: "1.1.1.1:1337", // points to non-existing service
},
}
_ = determinedE2eClient.Create(context.Background(), globalCatalog)

By("creating a healthy catalog in the test namespace")
localCatalogName := genName("good-catsrc-")
var err error = nil

fbcPath := filepath.Join(testdataDir, magicCatalogDir, "fbc_initial.yaml")
localCatalog, err = NewMagicCatalogFromFile(determinedE2eClient, testNamespace.GetName(), localCatalogName, fbcPath)
Expect(err).To(Succeed())

// deploy catalog blocks until the catalog has reached a ready state or fails
Expect(localCatalog.DeployCatalog(context.Background())).To(Succeed())

By("checking that the global catalog is broken")
// Adding this check here to speed up the test
// the global catalog can fail while we wait for the local catalog to get to a ready state
EventuallyResource(globalCatalog).Should(HaveGrpcConnectionWithLastConnectionState(connectivity.TransientFailure))
})

AfterEach(func() {
TeardownNamespace(testNamespace.GetName())
})

When("a subscription referring to the local catalog is created", func() {
var subscription *v1alpha1.Subscription

BeforeEach(func() {
subscription = &v1alpha1.Subscription{
ObjectMeta: metav1.ObjectMeta{
Namespace: testNamespace.GetName(),
Name: genName("local-subscription-"),
},
Spec: &v1alpha1.SubscriptionSpec{
CatalogSource: localCatalog.GetName(),
CatalogSourceNamespace: localCatalog.GetNamespace(),
Package: "packageA",
Channel: "stable",
InstallPlanApproval: v1alpha1.ApprovalAutomatic,
},
}

By("creating a subscription")
_ = determinedE2eClient.Create(context.Background(), subscription)
})

When("the operator group is annotated with olm.operatorframework.io/exclude-global-namespace-resolution=true", func() {

It("the broken subscription should resolve and have state AtLatest", func() {
By("checking that the subscription is not resolving and has a condition with type ResolutionFailed")
EventuallyResource(subscription).Should(ContainSubscriptionConditionOfType(v1alpha1.SubscriptionResolutionFailed))

By("annotating the operator group with olm.operatorframework.io/exclude-global-namespace-resolution=true")
Eventually(func() error {
annotatedOperatorGroup := operatorGroup.DeepCopy()
if err := determinedE2eClient.Get(context.Background(), k8scontrollerclient.ObjectKeyFromObject(annotatedOperatorGroup), annotatedOperatorGroup); err != nil {
return err
}

if annotatedOperatorGroup.Annotations == nil {
annotatedOperatorGroup.Annotations = map[string]string{}
}

annotatedOperatorGroup.Annotations["olm.operatorframework.io/exclude-global-namespace-resolution"] = "true"
if err := determinedE2eClient.Update(context.Background(), annotatedOperatorGroup); err != nil {
return err
}
return nil
}).Should(Succeed())

By("checking that the subscription resolves and has state AtLatest")
EventuallyResource(subscription).Should(HaveSubscriptionState(v1alpha1.SubscriptionStateAtLatest))
})
})
})
})

0 comments on commit dba08eb

Please sign in to comment.