Skip to content

Commit

Permalink
Merge pull request #1706 from Bowenislandsong/catsrc_weighting
Browse files Browse the repository at this point in the history
Add Catalog Source priority for dependency resolution
  • Loading branch information
openshift-merge-robot authored Aug 6, 2020
2 parents 163608d + a7565b4 commit 8a25369
Show file tree
Hide file tree
Showing 9 changed files with 648 additions and 68 deletions.
28 changes: 0 additions & 28 deletions go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/controller/operators/catalog/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -861,7 +861,7 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
// resolve a set of steps to apply to a cluster, a set of subscriptions to create/update, and any errors
steps, bundleLookups, updatedSubs, err := o.resolver.ResolveSteps(namespace, querier)
if err != nil {
go o.recorder.Event(ns, corev1.EventTypeWarning,"ResolutionFailed", err.Error())
go o.recorder.Event(ns, corev1.EventTypeWarning, "ResolutionFailed", err.Error())
return err
}

Expand Down
65 changes: 43 additions & 22 deletions pkg/controller/registry/resolver/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ import (
"time"

"github.com/blang/semver"
"github.com/operator-framework/operator-registry/pkg/api"
"github.com/operator-framework/operator-registry/pkg/client"
opregistry "github.com/operator-framework/operator-registry/pkg/registry"
"github.com/sirupsen/logrus"

"github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1alpha1"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry"
"github.com/operator-framework/operator-registry/pkg/api"
"github.com/operator-framework/operator-registry/pkg/client"
opregistry "github.com/operator-framework/operator-registry/pkg/registry"
)

type RegistryClientProvider interface {
Expand Down Expand Up @@ -43,27 +44,33 @@ type OperatorCacheProvider interface {
}

type OperatorCache struct {
logger logrus.FieldLogger
rcp RegistryClientProvider
snapshots map[registry.CatalogKey]*CatalogSnapshot
ttl time.Duration
sem chan struct{}
m sync.RWMutex
logger logrus.FieldLogger
rcp RegistryClientProvider
catsrcLister v1alpha1.CatalogSourceLister
snapshots map[registry.CatalogKey]*CatalogSnapshot
ttl time.Duration
sem chan struct{}
m sync.RWMutex
}

const defaultCatalogSourcePriority int = 0

type catalogSourcePriority int

var _ OperatorCacheProvider = &OperatorCache{}

func NewOperatorCache(rcp RegistryClientProvider, log logrus.FieldLogger) *OperatorCache {
func NewOperatorCache(rcp RegistryClientProvider, log logrus.FieldLogger, catsrcLister v1alpha1.CatalogSourceLister) *OperatorCache {
const (
MaxConcurrentSnapshotUpdates = 4
)

return &OperatorCache{
logger: log,
rcp: rcp,
snapshots: make(map[registry.CatalogKey]*CatalogSnapshot),
ttl: 5 * time.Minute,
sem: make(chan struct{}, MaxConcurrentSnapshotUpdates),
logger: log,
rcp: rcp,
catsrcLister: catsrcLister,
snapshots: make(map[registry.CatalogKey]*CatalogSnapshot),
ttl: 5 * time.Minute,
sem: make(chan struct{}, MaxConcurrentSnapshotUpdates),
}
}

Expand Down Expand Up @@ -151,11 +158,20 @@ func (c *OperatorCache) Namespaced(namespaces ...string) MultiCatalogOperatorFin

for _, miss := range misses {
ctx, cancel := context.WithTimeout(context.Background(), CachePopulateTimeout)

catsrcPriority := defaultCatalogSourcePriority
// Ignoring error and treat catsrc priority as 0 if not found.
catsrc, err := c.catsrcLister.CatalogSources(miss.Namespace).Get(miss.Name)
if err == nil {
catsrcPriority = catsrc.Spec.Priority
}

s := CatalogSnapshot{
logger: c.logger.WithField("catalog", miss),
key: miss,
expiry: now.Add(c.ttl),
pop: cancel,
logger: c.logger.WithField("catalog", miss),
key: miss,
expiry: now.Add(c.ttl),
pop: cancel,
priority: catalogSourcePriority(catsrcPriority),
}
s.m.Lock()
c.snapshots[miss] = &s
Expand Down Expand Up @@ -222,13 +238,13 @@ func ensurePackageProperty(o *Operator, name, version string) {
PackageName: name,
Version: version,
}
byte, err := json.Marshal(prop)
bytes, err := json.Marshal(prop)
if err != nil {
return
}
o.properties = append(o.properties, &api.Property{
Type: opregistry.PackageType,
Value: string(byte),
Value: string(bytes),
})
}

Expand Down Expand Up @@ -277,6 +293,7 @@ type CatalogSnapshot struct {
operators []*Operator
m sync.RWMutex
pop context.CancelFunc
priority catalogSourcePriority
}

func (s *CatalogSnapshot) Cancel() {
Expand Down Expand Up @@ -354,10 +371,14 @@ func (s SortableSnapshots) Less(i, j int) bool {
return false
}

// the rest are sorted first in namespace preference order, then by name
// the rest are sorted first on priority, namespace and then by name
if s.snapshots[i].priority != s.snapshots[j].priority {
return s.snapshots[i].priority > s.snapshots[j].priority
}
if s.snapshots[i].key.Namespace != s.snapshots[j].key.Namespace {
return s.namespaces[s.snapshots[i].key.Namespace] < s.namespaces[s.snapshots[j].key.Namespace]
}

return s.snapshots[i].key.Name < s.snapshots[j].key.Name
}

Expand Down
17 changes: 10 additions & 7 deletions pkg/controller/registry/resolver/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorlister"
"github.com/operator-framework/operator-registry/pkg/api"
"github.com/operator-framework/operator-registry/pkg/client"
opregistry "github.com/operator-framework/operator-registry/pkg/registry"

"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry"
)

type BundleStreamStub struct {
Expand Down Expand Up @@ -83,8 +83,8 @@ func TestOperatorCacheConcurrency(t *testing.T) {
const (
NWorkers = 64
)

rcp := RegistryClientProviderStub{}
catsrcLister := operatorlister.NewLister().OperatorsV1alpha1().CatalogSourceLister()
var keys []registry.CatalogKey
for i := 0; i < 128; i++ {
for j := 0; j < 8; j++ {
Expand All @@ -106,7 +106,7 @@ func TestOperatorCacheConcurrency(t *testing.T) {
}
}

c := NewOperatorCache(rcp, logrus.New())
c := NewOperatorCache(rcp, logrus.New(), catsrcLister)

errs := make(chan error)
for w := 0; w < NWorkers; w++ {
Expand Down Expand Up @@ -140,6 +140,7 @@ func TestOperatorCacheConcurrency(t *testing.T) {

func TestOperatorCacheExpiration(t *testing.T) {
rcp := RegistryClientProviderStub{}
catsrcLister := operatorlister.NewLister().OperatorsV1alpha1().CatalogSourceLister()
key := registry.CatalogKey{Namespace: "dummynamespace", Name: "dummyname"}
rcp[key] = &RegistryClientStub{
BundleIterator: client.NewBundleIterator(&BundleStreamStub{
Expand All @@ -155,14 +156,15 @@ func TestOperatorCacheExpiration(t *testing.T) {
}),
}

c := NewOperatorCache(rcp, logrus.New())
c := NewOperatorCache(rcp, logrus.New(), catsrcLister)
c.ttl = 0 // instantly stale

require.Len(t, c.Namespaced("dummynamespace").Catalog(key).Find(WithCSVName("csvname")), 1)
}

func TestOperatorCacheReuse(t *testing.T) {
rcp := RegistryClientProviderStub{}
catsrcLister := operatorlister.NewLister().OperatorsV1alpha1().CatalogSourceLister()
key := registry.CatalogKey{Namespace: "dummynamespace", Name: "dummyname"}
rcp[key] = &RegistryClientStub{
BundleIterator: client.NewBundleIterator(&BundleStreamStub{
Expand All @@ -178,7 +180,7 @@ func TestOperatorCacheReuse(t *testing.T) {
}),
}

c := NewOperatorCache(rcp, logrus.New())
c := NewOperatorCache(rcp, logrus.New(), catsrcLister)

require.Len(t, c.Namespaced("dummynamespace").Catalog(key).Find(WithCSVName("csvname")), 1)
}
Expand Down Expand Up @@ -290,6 +292,7 @@ func TestCatalogSnapshotFind(t *testing.T) {

func TestStripPluralRequiredAndProvidedAPIKeys(t *testing.T) {
rcp := RegistryClientProviderStub{}
catsrcLister := operatorlister.NewLister().OperatorsV1alpha1().CatalogSourceLister()
key := registry.CatalogKey{Namespace: "testnamespace", Name: "testname"}
rcp[key] = &RegistryClientStub{
BundleIterator: client.NewBundleIterator(&BundleStreamStub{
Expand Down Expand Up @@ -327,7 +330,7 @@ func TestStripPluralRequiredAndProvidedAPIKeys(t *testing.T) {
}),
}

c := NewOperatorCache(rcp, logrus.New())
c := NewOperatorCache(rcp, logrus.New(), catsrcLister)

nc := c.Namespaced("testnamespace")
result, err := AtLeast(1, nc.Find(ProvidingAPI(opregistry.APIKey{Group: "g", Version: "v1", Kind: "K"})))
Expand Down
9 changes: 5 additions & 4 deletions pkg/controller/registry/resolver/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ import (
"fmt"
"sort"

"github.com/operator-framework/api/pkg/operators/v1alpha1"
opregistry "github.com/operator-framework/operator-registry/pkg/registry"
"github.com/sirupsen/logrus"
utilerrors "k8s.io/apimachinery/pkg/util/errors"

"github.com/operator-framework/api/pkg/operators/v1alpha1"
v1alpha1listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1alpha1"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/solver"
opregistry "github.com/operator-framework/operator-registry/pkg/registry"
)

type OperatorResolver interface {
Expand All @@ -24,9 +25,9 @@ type SatResolver struct {
log logrus.FieldLogger
}

func NewDefaultSatResolver(rcp RegistryClientProvider, log logrus.FieldLogger) *SatResolver {
func NewDefaultSatResolver(rcp RegistryClientProvider, catsrcLister v1alpha1listers.CatalogSourceLister, log logrus.FieldLogger) *SatResolver {
return &SatResolver{
cache: NewOperatorCache(rcp, log),
cache: NewOperatorCache(rcp, log, catsrcLister),
log: log,
}
}
Expand Down
Loading

0 comments on commit 8a25369

Please sign in to comment.