Skip to content

Commit

Permalink
Add Catalog Source priority for dependency resolution
Browse files Browse the repository at this point in the history
This commit adds catsrc priority value to sorting consideration when
resolving operator dependency. Catsrcs are ranked by their priority
from high to low to be considered for supplying dependent operators.
The default priorities for custom catsrcs are 0 and default catsrcs
have negative priorities.
  • Loading branch information
harishsurf authored and bowenislandsong committed Aug 4, 2020
1 parent 5e5b11f commit b029c95
Show file tree
Hide file tree
Showing 10 changed files with 655 additions and 77 deletions.
28 changes: 0 additions & 28 deletions go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/controller/operators/catalog/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -861,7 +861,7 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
// resolve a set of steps to apply to a cluster, a set of subscriptions to create/update, and any errors
steps, bundleLookups, updatedSubs, err := o.resolver.ResolveSteps(namespace, querier)
if err != nil {
go o.recorder.Event(ns, corev1.EventTypeWarning,"ResolutionFailed", err.Error())
go o.recorder.Event(ns, corev1.EventTypeWarning, "ResolutionFailed", err.Error())
return err
}

Expand Down
65 changes: 43 additions & 22 deletions pkg/controller/registry/resolver/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ import (
"time"

"github.com/blang/semver"
"github.com/operator-framework/operator-registry/pkg/api"
"github.com/operator-framework/operator-registry/pkg/client"
opregistry "github.com/operator-framework/operator-registry/pkg/registry"
"github.com/sirupsen/logrus"

"github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1alpha1"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry"
"github.com/operator-framework/operator-registry/pkg/api"
"github.com/operator-framework/operator-registry/pkg/client"
opregistry "github.com/operator-framework/operator-registry/pkg/registry"
)

type RegistryClientProvider interface {
Expand Down Expand Up @@ -43,27 +44,33 @@ type OperatorCacheProvider interface {
}

type OperatorCache struct {
logger logrus.FieldLogger
rcp RegistryClientProvider
snapshots map[registry.CatalogKey]*CatalogSnapshot
ttl time.Duration
sem chan struct{}
m sync.RWMutex
logger logrus.FieldLogger
rcp RegistryClientProvider
catsrcLister v1alpha1.CatalogSourceLister
snapshots map[registry.CatalogKey]*CatalogSnapshot
ttl time.Duration
sem chan struct{}
m sync.RWMutex
}

const defaultCatalogSourcePriority int = 0

type catalogSourcePriority int

var _ OperatorCacheProvider = &OperatorCache{}

func NewOperatorCache(rcp RegistryClientProvider, log logrus.FieldLogger) *OperatorCache {
func NewOperatorCache(rcp RegistryClientProvider, log logrus.FieldLogger, catsrcLister v1alpha1.CatalogSourceLister) *OperatorCache {
const (
MaxConcurrentSnapshotUpdates = 4
)

return &OperatorCache{
logger: log,
rcp: rcp,
snapshots: make(map[registry.CatalogKey]*CatalogSnapshot),
ttl: 5 * time.Minute,
sem: make(chan struct{}, MaxConcurrentSnapshotUpdates),
logger: log,
rcp: rcp,
catsrcLister: catsrcLister,
snapshots: make(map[registry.CatalogKey]*CatalogSnapshot),
ttl: 5 * time.Minute,
sem: make(chan struct{}, MaxConcurrentSnapshotUpdates),
}
}

Expand Down Expand Up @@ -151,11 +158,20 @@ func (c *OperatorCache) Namespaced(namespaces ...string) MultiCatalogOperatorFin

for _, miss := range misses {
ctx, cancel := context.WithTimeout(context.Background(), CachePopulateTimeout)

catsrcPriority := defaultCatalogSourcePriority
// Ignoring error and treat catsrc priority as 0 if not found.
catsrc, err := c.catsrcLister.CatalogSources(miss.Namespace).Get(miss.Name)
if err == nil {
catsrcPriority = catsrc.Spec.Priority
}

s := CatalogSnapshot{
logger: c.logger.WithField("catalog", miss),
key: miss,
expiry: now.Add(c.ttl),
pop: cancel,
logger: c.logger.WithField("catalog", miss),
key: miss,
expiry: now.Add(c.ttl),
pop: cancel,
priority: catalogSourcePriority(catsrcPriority),
}
s.m.Lock()
c.snapshots[miss] = &s
Expand Down Expand Up @@ -222,13 +238,13 @@ func ensurePackageProperty(o *Operator, name, version string) {
PackageName: name,
Version: version,
}
byte, err := json.Marshal(prop)
bytes, err := json.Marshal(prop)
if err != nil {
return
}
o.properties = append(o.properties, &api.Property{
Type: opregistry.PackageType,
Value: string(byte),
Value: string(bytes),
})
}

Expand Down Expand Up @@ -277,6 +293,7 @@ type CatalogSnapshot struct {
operators []*Operator
m sync.RWMutex
pop context.CancelFunc
priority catalogSourcePriority
}

func (s *CatalogSnapshot) Cancel() {
Expand Down Expand Up @@ -354,10 +371,14 @@ func (s SortableSnapshots) Less(i, j int) bool {
return false
}

// the rest are sorted first in namespace preference order, then by name
// the rest are sorted first on priority, namespace and then by name
if s.snapshots[i].priority != s.snapshots[j].priority {
return s.snapshots[i].priority > s.snapshots[j].priority
}
if s.snapshots[i].key.Namespace != s.snapshots[j].key.Namespace {
return s.namespaces[s.snapshots[i].key.Namespace] < s.namespaces[s.snapshots[j].key.Namespace]
}

return s.snapshots[i].key.Name < s.snapshots[j].key.Name
}

Expand Down
17 changes: 10 additions & 7 deletions pkg/controller/registry/resolver/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorlister"
"github.com/operator-framework/operator-registry/pkg/api"
"github.com/operator-framework/operator-registry/pkg/client"
opregistry "github.com/operator-framework/operator-registry/pkg/registry"

"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry"
)

type BundleStreamStub struct {
Expand Down Expand Up @@ -83,8 +83,8 @@ func TestOperatorCacheConcurrency(t *testing.T) {
const (
NWorkers = 64
)

rcp := RegistryClientProviderStub{}
catsrcLister := operatorlister.NewLister().OperatorsV1alpha1().CatalogSourceLister()
var keys []registry.CatalogKey
for i := 0; i < 128; i++ {
for j := 0; j < 8; j++ {
Expand All @@ -106,7 +106,7 @@ func TestOperatorCacheConcurrency(t *testing.T) {
}
}

c := NewOperatorCache(rcp, logrus.New())
c := NewOperatorCache(rcp, logrus.New(), catsrcLister)

errs := make(chan error)
for w := 0; w < NWorkers; w++ {
Expand Down Expand Up @@ -140,6 +140,7 @@ func TestOperatorCacheConcurrency(t *testing.T) {

func TestOperatorCacheExpiration(t *testing.T) {
rcp := RegistryClientProviderStub{}
catsrcLister := operatorlister.NewLister().OperatorsV1alpha1().CatalogSourceLister()
key := registry.CatalogKey{Namespace: "dummynamespace", Name: "dummyname"}
rcp[key] = &RegistryClientStub{
BundleIterator: client.NewBundleIterator(&BundleStreamStub{
Expand All @@ -155,14 +156,15 @@ func TestOperatorCacheExpiration(t *testing.T) {
}),
}

c := NewOperatorCache(rcp, logrus.New())
c := NewOperatorCache(rcp, logrus.New(), catsrcLister)
c.ttl = 0 // instantly stale

require.Len(t, c.Namespaced("dummynamespace").Catalog(key).Find(WithCSVName("csvname")), 1)
}

func TestOperatorCacheReuse(t *testing.T) {
rcp := RegistryClientProviderStub{}
catsrcLister := operatorlister.NewLister().OperatorsV1alpha1().CatalogSourceLister()
key := registry.CatalogKey{Namespace: "dummynamespace", Name: "dummyname"}
rcp[key] = &RegistryClientStub{
BundleIterator: client.NewBundleIterator(&BundleStreamStub{
Expand All @@ -178,7 +180,7 @@ func TestOperatorCacheReuse(t *testing.T) {
}),
}

c := NewOperatorCache(rcp, logrus.New())
c := NewOperatorCache(rcp, logrus.New(), catsrcLister)

require.Len(t, c.Namespaced("dummynamespace").Catalog(key).Find(WithCSVName("csvname")), 1)
}
Expand Down Expand Up @@ -290,6 +292,7 @@ func TestCatalogSnapshotFind(t *testing.T) {

func TestStripPluralRequiredAndProvidedAPIKeys(t *testing.T) {
rcp := RegistryClientProviderStub{}
catsrcLister := operatorlister.NewLister().OperatorsV1alpha1().CatalogSourceLister()
key := registry.CatalogKey{Namespace: "testnamespace", Name: "testname"}
rcp[key] = &RegistryClientStub{
BundleIterator: client.NewBundleIterator(&BundleStreamStub{
Expand Down Expand Up @@ -327,7 +330,7 @@ func TestStripPluralRequiredAndProvidedAPIKeys(t *testing.T) {
}),
}

c := NewOperatorCache(rcp, logrus.New())
c := NewOperatorCache(rcp, logrus.New(), catsrcLister)

nc := c.Namespaced("testnamespace")
result, err := AtLeast(1, nc.Find(ProvidingAPI(opregistry.APIKey{Group: "g", Version: "v1", Kind: "K"})))
Expand Down
4 changes: 1 addition & 3 deletions pkg/controller/registry/resolver/operators.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func NewOperatorFromBundle(bundle *api.Bundle, startingCSV string, sourceKey reg

// legacy support - if the api doesn't contain properties/dependencies, build them from required/provided apis
properties := bundle.Properties
if properties == nil || len(properties) == 0{
if properties == nil || len(properties) == 0 {
properties, err = apisToProperties(provided)
if err != nil {
return nil, err
Expand Down Expand Up @@ -297,8 +297,6 @@ func NewOperatorFromBundle(bundle *api.Bundle, startingCSV string, sourceKey reg
return op, nil
}



return &Operator{
name: bundle.CsvName,
replaces: bundle.Replaces,
Expand Down
9 changes: 5 additions & 4 deletions pkg/controller/registry/resolver/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ import (
"fmt"
"sort"

"github.com/operator-framework/api/pkg/operators/v1alpha1"
opregistry "github.com/operator-framework/operator-registry/pkg/registry"
"github.com/sirupsen/logrus"
utilerrors "k8s.io/apimachinery/pkg/util/errors"

"github.com/operator-framework/api/pkg/operators/v1alpha1"
v1alpha1listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1alpha1"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/solver"
opregistry "github.com/operator-framework/operator-registry/pkg/registry"
)

type OperatorResolver interface {
Expand All @@ -24,9 +25,9 @@ type SatResolver struct {
log logrus.FieldLogger
}

func NewDefaultSatResolver(rcp RegistryClientProvider, log logrus.FieldLogger) *SatResolver {
func NewDefaultSatResolver(rcp RegistryClientProvider, catsrcLister v1alpha1listers.CatalogSourceLister, log logrus.FieldLogger) *SatResolver {
return &SatResolver{
cache: NewOperatorCache(rcp, log),
cache: NewOperatorCache(rcp, log, catsrcLister),
log: log,
}
}
Expand Down
Loading

0 comments on commit b029c95

Please sign in to comment.