Fix review comment about async controller start
Signed-off-by: David Festal <dfestal@redhat.com>
davidfestal committed Apr 6, 2023
1 parent f9a5104 commit 108d93e
Showing 2 changed files with 85 additions and 138 deletions.
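
At a glance, the change replaces the shard manager's blocking startShardControllers callback with a newShardControllers factory that returns the ShardAccess plus a separate start function, so the heavy startup work can run asynchronously. The sketch below is a minimal, self-contained illustration of that contract; shardAccess, newShardControllers, and the URL string here are simplified stand-ins, not the actual kcp types or API.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// shardAccess stands in for synctarget.ShardAccess (clients, informer
// factories, logical-cluster index).
type shardAccess struct {
	syncerURL string
}

// newShardControllers mirrors the new callback shape: construction errors are
// returned immediately, while the blocking work (starting informers, waiting
// for caches, launching controllers) is deferred to the returned start func.
func newShardControllers(ctx context.Context, syncerURL string) (*shardAccess, func() error, error) {
	if syncerURL == "" {
		return nil, nil, errors.New("missing syncer virtual workspace URL")
	}
	access := &shardAccess{syncerURL: syncerURL}
	start := func() error {
		// In the real code this starts the informer factories, waits for
		// their caches to sync, and launches the spec/upsync controllers.
		select {
		case <-ctx.Done():
			return fmt.Errorf("unable to sync watch caches for virtual workspace %q: %w", syncerURL, ctx.Err())
		default:
			return nil
		}
	}
	return access, start, nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	access, start, err := newShardControllers(ctx, "https://shard.example.com")
	if err != nil {
		panic(err) // construction failed synchronously
	}
	fmt.Println("shard access created for", access.syncerURL)
	if err := start(); err != nil { // the manager runs this in a goroutine
		fmt.Println("asynchronous start failed:", err)
	}
}
```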
186 changes: 58 additions & 128 deletions pkg/syncer/syncer.go
@@ -19,7 +19,6 @@ package syncer
import (
"context"
"fmt"
"sync"
"time"

kcpdynamic "github.com/kcp-dev/client-go/dynamic"
@@ -212,21 +211,21 @@ func StartSyncer(ctx context.Context, cfg *SyncerConfig, numSyncerThreads int, i

namespaceCleaner := &delegatingCleaner{}
shardManager := synctarget.NewShardManager(
func(ctx context.Context, shardURLs workloadv1alpha1.VirtualWorkspace) (*synctarget.ShardAccess, error) {
func(ctx context.Context, shardURLs workloadv1alpha1.VirtualWorkspace) (*synctarget.ShardAccess, func() error, error) {
upstreamConfig := rest.CopyConfig(cfg.UpstreamConfig)
upstreamConfig.Host = shardURLs.SyncerURL
rest.AddUserAgent(upstreamConfig, "kcp#syncing/"+kcpVersion)
upstreamSyncerClusterClient, err := kcpdynamic.NewForConfig(upstreamConfig)
if err != nil {
return nil, err
return nil, nil, err
}

upstreamUpsyncConfig := rest.CopyConfig(cfg.UpstreamConfig)
upstreamUpsyncConfig.Host = shardURLs.UpsyncerURL
rest.AddUserAgent(upstreamUpsyncConfig, "kcp#upsyncing/"+kcpVersion)
upstreamUpsyncerClusterClient, err := kcpdynamic.NewForConfig(upstreamUpsyncConfig)
if err != nil {
return nil, err
return nil, nil, err
}

ddsifForUpstreamSyncer, err := ddsif.NewDiscoveringDynamicSharedInformerFactory(upstreamSyncerClusterClient, nil, nil,
@@ -241,7 +240,7 @@ func StartSyncer(ctx context.Context, cfg *SyncerConfig, numSyncerThreads int, i
},
}, cache.Indexers{})
if err != nil {
return nil, err
return nil, nil, err
}

ddsifForUpstreamUpsyncer, err := ddsif.NewDiscoveringDynamicSharedInformerFactory(upstreamUpsyncerClusterClient, nil, nil,
@@ -255,7 +254,7 @@ func StartSyncer(ctx context.Context, cfg *SyncerConfig, numSyncerThreads int, i
},
cache.Indexers{})
if err != nil {
return nil, err
return nil, nil, err
}

logicalClusterIndex := synctarget.NewLogicalClusterIndex(ddsifForUpstreamSyncer, ddsifForUpstreamUpsyncer)
@@ -272,90 +271,78 @@ func StartSyncer(ctx context.Context, cfg *SyncerConfig, numSyncerThreads int, i
namespaceCleaner, syncTarget.GetUID(),
syncerNamespace, dnsProcessor, cfg.DNSImage, secretMutator, podspecableMutator)
if err != nil {
return nil, err
return nil, nil, err
}

upsyncerCleaner, err := upsync.NewUpSyncerCleanupController(logger, logicalcluster.From(syncTarget), cfg.SyncTargetName, types.UID(cfg.SyncTargetUID), syncTargetKey,
upstreamUpsyncerClusterClient, ddsifForUpstreamUpsyncer,
ddsifForDownstream)
if err != nil {
return nil, err
return nil, nil, err
}

// Start and sync informer factories

waitForCacheSyncContext, cancelWaitForCacheSync := context.WithCancel(ctx)
defer cancelWaitForCacheSync()
// track the watch/list errors during the upcoming waitForCacheSync.
errorTracker := &watchCacheErrorTracker{
cancelWait: cancelWaitForCacheSync,

// 10 is quite arbitrary here.
// We don't want to block the embedding reconciliation loop too long,
// but still want to ensure the watch/list errors are repeated and signal
// a real problem.
maxErrorCount: 10,
}
var cacheSyncsForAlwaysRequiredGVRs []cache.InformerSynced
for _, alwaysRequiredGVR := range alwaysRequiredGVRs {
if informer, err := ddsifForUpstreamSyncer.ForResource(alwaysRequiredGVR); err != nil {
return nil, err
return nil, nil, err
} else {
errorTracker.TrackErrors(alwaysRequiredGVR, informer.Informer())
cacheSyncsForAlwaysRequiredGVRs = append(cacheSyncsForAlwaysRequiredGVRs, informer.Informer().HasSynced)
}
}

ddsifForUpstreamSyncer.Start(ctx.Done())
ddsifForUpstreamUpsyncer.Start(ctx.Done())
start := func() error {
// Start and sync informer factories

// Invalid shard URL, which cannot be watched by informers, will be
// detected, skipped, correctly logged and reported as an error
// to the embedding reconciler that manages shard URLs.
if ok := cache.WaitForCacheSync(waitForCacheSyncContext.Done(), cacheSyncsForAlwaysRequiredGVRs...); !ok {
return nil, fmt.Errorf("unable to sync watch caches for virtual workspace %q: %s", shardURLs.SyncerURL, errorTracker.ErrorSummary())
}
ddsifForUpstreamSyncer.Start(ctx.Done())
ddsifForUpstreamUpsyncer.Start(ctx.Done())

go ddsifForUpstreamSyncer.StartWorker(ctx)
go ddsifForUpstreamUpsyncer.StartWorker(ctx)

go specSyncer.Start(ctx, numSyncerThreads)
go upsyncerCleaner.Start(ctx, numSyncerThreads)

// Create and start GVR-specific controllers through controller managers
upstreamSyncerControllerManager := controllermanager.NewControllerManager(ctx,
"upstream-syncer",
controllermanager.InformerSource{
Subscribe: ddsifForUpstreamSyncer.Subscribe,
Informers: func() (informers map[schema.GroupVersionResource]cache.SharedIndexInformer, notSynced []schema.GroupVersionResource) {
genericInformers, notSynced := ddsifForUpstreamSyncer.Informers()
informers = make(map[schema.GroupVersionResource]cache.SharedIndexInformer, len(genericInformers))
for gvr, inf := range genericInformers {
informers[gvr] = inf.Informer()
}
return informers, notSynced
if ok := cache.WaitForCacheSync(ctx.Done(), cacheSyncsForAlwaysRequiredGVRs...); !ok {
return fmt.Errorf("unable to sync watch caches for virtual workspace %q", shardURLs.SyncerURL)
}

go ddsifForUpstreamSyncer.StartWorker(ctx)
go ddsifForUpstreamUpsyncer.StartWorker(ctx)

go specSyncer.Start(ctx, numSyncerThreads)
go upsyncerCleaner.Start(ctx, numSyncerThreads)

// Create and start GVR-specific controllers through controller managers
upstreamSyncerControllerManager := controllermanager.NewControllerManager(ctx,
"upstream-syncer",
controllermanager.InformerSource{
Subscribe: ddsifForUpstreamSyncer.Subscribe,
Informers: func() (informers map[schema.GroupVersionResource]cache.SharedIndexInformer, notSynced []schema.GroupVersionResource) {
genericInformers, notSynced := ddsifForUpstreamSyncer.Informers()
informers = make(map[schema.GroupVersionResource]cache.SharedIndexInformer, len(genericInformers))
for gvr, inf := range genericInformers {
informers[gvr] = inf.Informer()
}
return informers, notSynced
},
},
},
map[string]controllermanager.ManagedController{},
)
go upstreamSyncerControllerManager.Start(ctx)

upstreamUpsyncerControllerManager := controllermanager.NewControllerManager(ctx,
"upstream-upsyncer",
controllermanager.InformerSource{
Subscribe: ddsifForUpstreamUpsyncer.Subscribe,
Informers: func() (informers map[schema.GroupVersionResource]cache.SharedIndexInformer, notSynced []schema.GroupVersionResource) {
genericInformers, notSynced := ddsifForUpstreamUpsyncer.Informers()
informers = make(map[schema.GroupVersionResource]cache.SharedIndexInformer, len(genericInformers))
for gvr, inf := range genericInformers {
informers[gvr] = inf.Informer()
}
return informers, notSynced
map[string]controllermanager.ManagedController{},
)
go upstreamSyncerControllerManager.Start(ctx)

upstreamUpsyncerControllerManager := controllermanager.NewControllerManager(ctx,
"upstream-upsyncer",
controllermanager.InformerSource{
Subscribe: ddsifForUpstreamUpsyncer.Subscribe,
Informers: func() (informers map[schema.GroupVersionResource]cache.SharedIndexInformer, notSynced []schema.GroupVersionResource) {
genericInformers, notSynced := ddsifForUpstreamUpsyncer.Informers()
informers = make(map[schema.GroupVersionResource]cache.SharedIndexInformer, len(genericInformers))
for gvr, inf := range genericInformers {
informers[gvr] = inf.Informer()
}
return informers, notSynced
},
},
},
map[string]controllermanager.ManagedController{},
)
go upstreamUpsyncerControllerManager.Start(ctx)
map[string]controllermanager.ManagedController{},
)
go upstreamUpsyncerControllerManager.Start(ctx)

return nil
}

return &synctarget.ShardAccess{
SyncerClient: upstreamSyncerClusterClient,
Expand All @@ -364,7 +351,7 @@ func StartSyncer(ctx context.Context, cfg *SyncerConfig, numSyncerThreads int, i
UpsyncerDDSIF: ddsifForUpstreamUpsyncer,

LogicalClusterIndex: logicalClusterIndex,
}, nil
}, start, nil
},
)

@@ -563,60 +550,3 @@ func (s *delegatingCleaner) CancelCleaning(key string) {
}
s.delegate.CancelCleaning(key)
}

// watchCacheErrorTracker tracks the watch/list errors that occur during
// informer watch cache syncs, and stores them per GVR and per error message.
// It allows cancelling the wait for cache sync if the total number of tracked
// errors is higher than a maximum number of errors.
type watchCacheErrorTracker struct {
// maxErrorCount is the maximum total number of list/watch errors after which the
// waitForCacheSync call should be canceled.
maxErrorCount int

// cancelWait is the method that will cancel the waitForCacheSync call.
cancelWait func()

errors map[schema.GroupVersionResource]map[string]int
totalErrorCount int

mutex sync.Mutex
}

// TrackErrors will track watch/list errors for this informer with a WatchErrorHandler.
func (h *watchCacheErrorTracker) TrackErrors(gvr schema.GroupVersionResource, informer cache.SharedIndexInformer) {
_ = informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
h.mutex.Lock()
defer h.mutex.Unlock()
errStr := err.Error()

if h.errors == nil {
h.errors = make(map[schema.GroupVersionResource]map[string]int)
}

gvrErrors := h.errors[gvr]
if gvrErrors == nil {
gvrErrors = make(map[string]int)
h.errors[gvr] = gvrErrors
}

h.errors[gvr][errStr]++
h.totalErrorCount++
if h.totalErrorCount > h.maxErrorCount {
h.cancelWait()
}
})
}

// ErrorSummary returns a summary of the various watch/list errors
// tracked during a wait for cache sync, per GVR and per error message.
func (h *watchCacheErrorTracker) ErrorSummary() string {
h.mutex.Lock()
defer h.mutex.Unlock()
var summary string
for gvr, gvrErrors := range h.errors {
for errorStr, count := range gvrErrors {
summary += fmt.Sprintf("\n%s: %d x %s", gvr.String(), count, errorStr)
}
}
return summary
}
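
The start closure introduced above bundles the blocking half of startup: starting the informer factories, waiting for the watch caches of the always-required GVRs, and then launching the workers and GVR controller managers. Below is a minimal sketch of that shape; it uses client-go's cache.WaitForCacheSync, but fakeInformer, buildStart, and launchWorkers are hypothetical stand-ins for the kcp informer and controller plumbing.

```go
package main

import (
	"context"
	"fmt"

	"k8s.io/client-go/tools/cache"
)

// fakeInformer is a stand-in for a shared informer; only HasSynced matters here.
type fakeInformer struct{ synced bool }

func (f *fakeInformer) HasSynced() bool { return f.synced }

// buildStart returns the blocking part of startup as a closure: wait for the
// watch caches, then launch the workers. In the real syncer the closure also
// starts the informer factories and the GVR controller managers.
func buildStart(ctx context.Context, syncerURL string, informers []*fakeInformer, launchWorkers func(context.Context)) func() error {
	var cacheSyncs []cache.InformerSynced
	for _, inf := range informers {
		cacheSyncs = append(cacheSyncs, inf.HasSynced)
	}
	return func() error {
		if ok := cache.WaitForCacheSync(ctx.Done(), cacheSyncs...); !ok {
			return fmt.Errorf("unable to sync watch caches for virtual workspace %q", syncerURL)
		}
		launchWorkers(ctx)
		return nil
	}
}

func main() {
	ctx := context.Background()
	informers := []*fakeInformer{{synced: true}, {synced: true}}

	start := buildStart(ctx, "https://shard.example.com", informers, func(context.Context) {
		fmt.Println("workers started")
	})

	// The caller (the shard manager) would normally run this in a goroutine.
	if err := start(); err != nil {
		fmt.Println("start failed:", err)
	}
}
```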
37 changes: 27 additions & 10 deletions pkg/syncer/synctarget/shard_manager.go
@@ -150,25 +150,26 @@ type GetShardAccessFunc func(clusterName logicalcluster.Name) (ShardAccess, bool
// upstream virtual workspace URLs, based on a SyncTarget resource passed to the updateShards() method.
//
// When a shard is found (identified by the couple of virtual workspace URLs - for both syncer and upsyncer),
// then the startShardControllers() method is called, and the resulting [ShardAccess] is stored.
// then the newShardControllers() method is called, the resulting [ShardAccess] is stored,
// and the resulting start() function is called in a goroutine.
//
// When a shard is removed, the context initially passed to the
// startShardControllers() method is cancelled.
//
// The ShardAccessForCluster() method will be used by some downstream controllers in order to
// be able to get / list upstream resources in the right shard.
func NewShardManager(
startShardControllers func(ctx context.Context, shardURLs workloadv1alpha1.VirtualWorkspace) (*ShardAccess, error)) *shardManager {
newShardControllers func(ctx context.Context, shardURLs workloadv1alpha1.VirtualWorkspace) (acces *ShardAccess, start func() error, err error)) *shardManager {
return &shardManager{
controllers: map[workloadv1alpha1.VirtualWorkspace]shardControllers{},
startShardControllers: startShardControllers,
controllers: map[workloadv1alpha1.VirtualWorkspace]shardControllers{},
newShardControllers: newShardControllers,
}
}

type shardManager struct {
controllersLock sync.RWMutex
controllers map[workloadv1alpha1.VirtualWorkspace]shardControllers
startShardControllers func(ctx context.Context, shardURLs workloadv1alpha1.VirtualWorkspace) (*ShardAccess, error)
controllersLock sync.RWMutex
controllers map[workloadv1alpha1.VirtualWorkspace]shardControllers
newShardControllers func(ctx context.Context, shardURLs workloadv1alpha1.VirtualWorkspace) (acces *ShardAccess, start func() error, err error)
}

func (c *shardManager) ShardAccessForCluster(clusterName logicalcluster.Name) (ShardAccess, bool, error) {
@@ -211,6 +212,7 @@ func (c *shardManager) reconcile(ctx context.Context, syncTarget *workloadv1alph
var errs []error
// Create and start missing controllers that have Virtual Workspace URLs for a shard
for shardURLs := range requiredShards {
shardURLs := shardURLs
if _, ok := c.controllers[shardURLs]; ok {
// The controllers are already started
continue
@@ -219,21 +221,36 @@ func (c *shardManager) reconcile(ctx context.Context, syncTarget *workloadv1alph
// Start the controllers
shardControllersContext, cancelFunc := context.WithCancel(ctx)
// Create the controllers
shardAccess, err := c.startShardControllers(shardControllersContext, shardURLs)
shardAccess, start, err := c.newShardControllers(shardControllersContext, shardURLs)
if err != nil {
logger.Error(err, "failed creating controllers for shard", "shard", shardURLs)
errs = append(errs, err)
cancelFunc()
continue
}
c.controllers[shardURLs] = shardControllers{
*shardAccess, cancelFunc,
*shardAccess, cancelFunc, false,
}
go func() {
err := start()
c.controllersLock.Lock()
defer c.controllersLock.Unlock()

if err != nil {
delete(c.controllers, shardURLs)
cancelFunc()
} else {
controllers := c.controllers[shardURLs]
controllers.ready = true
c.controllers[shardURLs] = controllers
}
}()
}
return reconcileStatusContinue, utilserrors.NewAggregate(errs)
}

type shardControllers struct {
ShardAccess
stop func()
stop func()
ready bool
}
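
On the manager side, reconcile() now stores the controllers immediately, runs start() in a goroutine, and either marks the entry ready or deletes it and cancels its context when startup fails. The sketch below condenses that bookkeeping under the same locking discipline; manager, shardEntry, register, and isReady are illustrative names, not the real kcp identifiers.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// shardEntry is a placeholder for the real shardControllers value
// (ShardAccess + stop + ready).
type shardEntry struct {
	stop  func()
	ready bool
}

type manager struct {
	mu          sync.Mutex
	controllers map[string]shardEntry
}

// register stores the entry immediately and runs start asynchronously,
// marking the entry ready on success or dropping it (and cancelling its
// context via stop) on failure.
func (m *manager) register(url string, stop func(), start func() error) {
	m.mu.Lock()
	m.controllers[url] = shardEntry{stop: stop}
	m.mu.Unlock()

	go func() {
		err := start()
		m.mu.Lock()
		defer m.mu.Unlock()
		if err != nil {
			delete(m.controllers, url)
			stop()
			return
		}
		entry := m.controllers[url]
		entry.ready = true
		m.controllers[url] = entry
	}()
}

func (m *manager) isReady(url string) bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	entry, ok := m.controllers[url]
	return ok && entry.ready
}

func main() {
	m := &manager{controllers: map[string]shardEntry{}}
	m.register("https://shard.example.com",
		func() { fmt.Println("shard context cancelled") },
		func() error { return nil }) // a start that succeeds immediately

	// Poll until the asynchronous start has marked the shard ready.
	for !m.isReady("https://shard.example.com") {
		time.Sleep(10 * time.Millisecond)
	}
	fmt.Println("shard controllers are ready")
}
```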
