Skip to content

Commit

Permalink
Give Sources control of invalidation for Snapshots they produce.
Browse files Browse the repository at this point in the history
This moves the hard-coded 5-minute TTL out of the cache and into the
registry-based implementation of cache.Source. The hack that makes
snapshots produced by "virtual" sources is no longer necessary.

Signed-off-by: Ben Luddy <bluddy@redhat.com>
  • Loading branch information
benluddy committed Feb 17, 2022
1 parent 857c9da commit 103ed08
Show file tree
Hide file tree
Showing 9 changed files with 133 additions and 132 deletions.
6 changes: 4 additions & 2 deletions pkg/controller/operators/catalog/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ type Operator struct {
bundleUnpackTimeout time.Duration
clientFactory clients.Factory
muInstallPlan sync.Mutex
resolverSourceProvider *resolver.RegistrySourceProvider
}

type CatalogSourceSyncFunc func(logger *logrus.Entry, in *v1alpha1.CatalogSource) (out *v1alpha1.CatalogSource, continueSync bool, syncError error)
Expand Down Expand Up @@ -188,8 +189,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
clientFactory: clients.NewFactory(config),
}
op.sources = grpc.NewSourceStore(logger, 10*time.Second, 10*time.Minute, op.syncSourceState)
op.resolverSourceProvider = resolver.SourceProviderFromRegistryClientProvider(op.sources, logger)
op.reconciler = reconciler.NewRegistryReconcilerFactory(lister, opClient, configmapRegistryImage, op.now, ssaClient)
res := resolver.NewOperatorStepResolver(lister, crClient, operatorNamespace, op.sources, logger)
res := resolver.NewOperatorStepResolver(lister, crClient, operatorNamespace, op.resolverSourceProvider, logger)
op.resolver = resolver.NewInstrumentedResolver(res, metrics.RegisterDependencyResolutionSuccess, metrics.RegisterDependencyResolutionFailure)

// Wire OLM CR sharedIndexInformers
Expand Down Expand Up @@ -465,7 +467,7 @@ func (o *Operator) syncSourceState(state grpc.SourceState) {

switch state.State {
case connectivity.Ready:
o.resolver.Expire(resolvercache.SourceKey(state.Key))
o.resolverSourceProvider.Invalidate(resolvercache.SourceKey(state.Key))
if o.namespace == state.Key.Namespace {
namespaces, err := index.CatalogSubscriberNamespaces(o.catalogSubscriberIndexer,
state.Key.Name, state.Key.Namespace)
Expand Down
55 changes: 29 additions & 26 deletions pkg/controller/registry/resolver/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ func (p StaticSourceProvider) Sources(namespaces ...string) map[SourceKey]Source

type OperatorCacheProvider interface {
Namespaced(namespaces ...string) MultiCatalogOperatorFinder
Expire(catalog SourceKey)
}

type SourcePriorityProvider interface {
Expand Down Expand Up @@ -150,22 +149,11 @@ func (c *NamespacedOperatorCache) Error() error {
return errors.NewAggregate(errs)
}

func (c *Cache) Expire(catalog SourceKey) {
c.m.Lock()
defer c.m.Unlock()
s, ok := c.snapshots[catalog]
if !ok {
return
}
s.expiry = time.Unix(0, 0)
}

func (c *Cache) Namespaced(namespaces ...string) MultiCatalogOperatorFinder {
const (
CachePopulateTimeout = time.Minute
)

now := time.Now()
sources := c.sp.Sources(namespaces...)

result := NamespacedOperatorCache{
Expand All @@ -182,7 +170,7 @@ func (c *Cache) Namespaced(namespaces ...string) MultiCatalogOperatorFinder {
func() {
snapshot.m.RLock()
defer snapshot.m.RUnlock()
if snapshot.Valid(now) {
if snapshot.Valid() {
result.snapshots[key] = snapshot
} else {
misses = append(misses, key)
Expand All @@ -205,7 +193,7 @@ func (c *Cache) Namespaced(namespaces ...string) MultiCatalogOperatorFinder {
// Take the opportunity to clear expired snapshots while holding the lock.
var expired []SourceKey
for key, snapshot := range c.snapshots {
if !snapshot.Valid(now) {
if !snapshot.Valid() {
snapshot.Cancel()
expired = append(expired, key)
}
Expand All @@ -217,7 +205,7 @@ func (c *Cache) Namespaced(namespaces ...string) MultiCatalogOperatorFinder {
// Check for any snapshots that were populated while waiting to acquire the lock.
var found int
for i := range misses {
if hdr, ok := c.snapshots[misses[i]]; ok && hdr.Valid(now) {
if hdr, ok := c.snapshots[misses[i]]; ok && hdr.Valid() {
result.snapshots[misses[i]] = hdr
misses[found], misses[i] = misses[i], misses[found]
found++
Expand All @@ -230,17 +218,10 @@ func (c *Cache) Namespaced(namespaces ...string) MultiCatalogOperatorFinder {

hdr := snapshotHeader{
key: miss,
expiry: now.Add(c.ttl),
pop: cancel,
priority: c.sourcePriorityProvider.Priority(miss),
}

if miss.Virtual() {
// hack! always refresh virtual catalogs.
// todo: Sources should be responsible for determining when the Snapshots they produce become invalid
hdr.expiry = time.Time{}
}

hdr.m.Lock()
c.snapshots[miss] = &hdr
result.snapshots[miss] = &hdr
Expand All @@ -249,7 +230,13 @@ func (c *Cache) Namespaced(namespaces ...string) MultiCatalogOperatorFinder {
defer hdr.m.Unlock()
c.sem <- struct{}{}
defer func() { <-c.sem }()
hdr.snapshot, hdr.err = source.Snapshot(ctx)
if snapshot, err := source.Snapshot(ctx); err != nil {
hdr.err = err
} else if snapshot != nil {
hdr.snapshot = snapshot
} else {
hdr.err = fmt.Errorf("source %q produced no snapshot and no error", hdr.key)
}
}(ctx, &hdr, sources[miss])
}

Expand Down Expand Up @@ -286,6 +273,15 @@ func (c *NamespacedOperatorCache) Find(p ...Predicate) []*Entry {

type Snapshot struct {
Entries []*Entry

// Unless closed, the Snapshot is valid.
Valid <-chan struct{}
}

func ValidOnce() <-chan struct{} {
c := make(chan struct{})
close(c)
return c
}

var _ Source = &Snapshot{}
Expand All @@ -298,7 +294,6 @@ type snapshotHeader struct {
snapshot *Snapshot

key SourceKey
expiry time.Time
m sync.RWMutex
pop context.CancelFunc
err error
Expand All @@ -309,10 +304,18 @@ func (hdr *snapshotHeader) Cancel() {
hdr.pop()
}

func (hdr *snapshotHeader) Valid(at time.Time) bool {
func (hdr *snapshotHeader) Valid() bool {
hdr.m.RLock()
defer hdr.m.RUnlock()
return hdr.snapshot != nil && hdr.err == nil && at.Before(hdr.expiry)
if hdr.snapshot == nil || hdr.err != nil {
return false
}
select {
case <-hdr.snapshot.Valid:
return false
default:
}
return true
}

type sortableSnapshots struct {
Expand Down
36 changes: 12 additions & 24 deletions pkg/controller/registry/resolver/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"math/rand"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -68,12 +67,12 @@ func TestOperatorCacheExpiration(t *testing.T) {
key := SourceKey{Namespace: "dummynamespace", Name: "dummyname"}
ssp := make(StaticSourceProvider)
c := New(ssp)
c.ttl = 0 // instantly stale

ssp[key] = &Snapshot{
Entries: []*Entry{
{Name: "v1"},
},
Valid: ValidOnce(),
}
require.Len(t, c.Namespaced("dummynamespace").Catalog(key).Find(CSVNamePredicate("v1")), 1)

Expand Down Expand Up @@ -108,62 +107,51 @@ func TestOperatorCacheReuse(t *testing.T) {
func TestCatalogSnapshotValid(t *testing.T) {
type tc struct {
Name string
Expiry time.Time
Snapshot *Snapshot
Error error
At time.Time
Expected bool
}

for _, tt := range []tc{
{
Name: "after expiry",
Expiry: time.Unix(0, 1),
Snapshot: &Snapshot{},
Name: "invalidated",
Snapshot: &Snapshot{
Valid: ValidOnce(),
},
Error: nil,
At: time.Unix(0, 2),
Expected: false,
},
{
Name: "before expiry",
Expiry: time.Unix(0, 2),
Snapshot: &Snapshot{},
Name: "valid",
Snapshot: &Snapshot{}, // valid forever
Error: nil,
At: time.Unix(0, 1),
Expected: true,
},
{
Name: "nil snapshot",
Expiry: time.Unix(0, 2),
Name: "nil snapshot and non-nil error",
Snapshot: nil,
Error: errors.New(""),
At: time.Unix(0, 1),
Expected: false,
},
{
Name: "non-nil error",
Expiry: time.Unix(0, 2),
Name: "non-nil snapshot and non-nil error",
Snapshot: &Snapshot{},
Error: errors.New(""),
At: time.Unix(0, 1),
Expected: false,
},
{
Name: "at expiry",
Expiry: time.Unix(0, 1),
Snapshot: &Snapshot{},
Name: "nil snapshot and nil error",
Snapshot: nil,
Error: nil,
At: time.Unix(0, 1),
Expected: false,
},
} {
t.Run(tt.Name, func(t *testing.T) {
s := snapshotHeader{
expiry: tt.Expiry,
snapshot: tt.Snapshot,
err: tt.Error,
}
assert.Equal(t, tt.Expected, s.Valid(tt.At))
assert.Equal(t, tt.Expected, s.Valid())
})
}
}
Expand Down
5 changes: 0 additions & 5 deletions pkg/controller/registry/resolver/instrumented_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"time"

"github.com/operator-framework/api/pkg/operators/v1alpha1"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache"
)

type InstrumentedResolver struct {
Expand Down Expand Up @@ -33,7 +32,3 @@ func (ir *InstrumentedResolver) ResolveSteps(namespace string) ([]*v1alpha1.Step
}
return steps, lookups, subs, err
}

func (ir *InstrumentedResolver) Expire(key cache.SourceKey) {
ir.resolver.Expire(key)
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"time"

"github.com/operator-framework/api/pkg/operators/v1alpha1"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache"
"github.com/stretchr/testify/require"
)

Expand All @@ -22,16 +21,10 @@ func (r *fakeResolverWithError) ResolveSteps(namespace string) ([]*v1alpha1.Step
return nil, nil, nil, errors.New("Fake error")
}

func (r *fakeResolverWithError) Expire(key cache.SourceKey) {
}

func (r *fakeResolverWithoutError) ResolveSteps(namespace string) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) {
return nil, nil, nil, nil
}

func (r *fakeResolverWithoutError) Expire(key cache.SourceKey) {
}

func newFakeResolverWithError() *fakeResolverWithError {
return &fakeResolverWithError{}
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/controller/registry/resolver/source_csvs.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,10 @@ func (s *csvSource) Snapshot(ctx context.Context) (*cache.Snapshot, error) {
s.logger.Printf("considered csvs without properties annotation during resolution: %v", names)
}

return &cache.Snapshot{Entries: entries}, nil
return &cache.Snapshot{
Entries: entries,
Valid: cache.ValidOnce(),
}, nil
}

func (s *csvSource) inferProperties(csv *v1alpha1.ClusterServiceVersion, subs []*v1alpha1.Subscription) ([]*api.Property, error) {
Expand Down
Loading

0 comments on commit 103ed08

Please sign in to comment.