From 108d93eb08933a3bbd46e827544266282598d4e3 Mon Sep 17 00:00:00 2001
From: David Festal
Date: Wed, 5 Apr 2023 15:31:03 +0200
Subject: [PATCH] Fix review comment about async controller start

Signed-off-by: David Festal
---
 pkg/syncer/syncer.go                   | 186 ++++++++-----------------
 pkg/syncer/synctarget/shard_manager.go |  37 +++--
 2 files changed, 85 insertions(+), 138 deletions(-)

diff --git a/pkg/syncer/syncer.go b/pkg/syncer/syncer.go
index 98ac4aa81cb..dc1461d5254 100644
--- a/pkg/syncer/syncer.go
+++ b/pkg/syncer/syncer.go
@@ -19,7 +19,6 @@ package syncer
 import (
 	"context"
 	"fmt"
-	"sync"
 	"time"
 
 	kcpdynamic "github.com/kcp-dev/client-go/dynamic"
@@ -212,13 +211,13 @@ func StartSyncer(ctx context.Context, cfg *SyncerConfig, numSyncerThreads int, i
 
 	namespaceCleaner := &delegatingCleaner{}
 	shardManager := synctarget.NewShardManager(
-		func(ctx context.Context, shardURLs workloadv1alpha1.VirtualWorkspace) (*synctarget.ShardAccess, error) {
+		func(ctx context.Context, shardURLs workloadv1alpha1.VirtualWorkspace) (*synctarget.ShardAccess, func() error, error) {
 			upstreamConfig := rest.CopyConfig(cfg.UpstreamConfig)
 			upstreamConfig.Host = shardURLs.SyncerURL
 			rest.AddUserAgent(upstreamConfig, "kcp#syncing/"+kcpVersion)
 			upstreamSyncerClusterClient, err := kcpdynamic.NewForConfig(upstreamConfig)
 			if err != nil {
-				return nil, err
+				return nil, nil, err
 			}
 
 			upstreamUpsyncConfig := rest.CopyConfig(cfg.UpstreamConfig)
@@ -226,7 +225,7 @@ func StartSyncer(ctx context.Context, cfg *SyncerConfig, numSyncerThreads int, i
 			rest.AddUserAgent(upstreamUpsyncConfig, "kcp#upsyncing/"+kcpVersion)
 			upstreamUpsyncerClusterClient, err := kcpdynamic.NewForConfig(upstreamUpsyncConfig)
 			if err != nil {
-				return nil, err
+				return nil, nil, err
 			}
 
 			ddsifForUpstreamSyncer, err := ddsif.NewDiscoveringDynamicSharedInformerFactory(upstreamSyncerClusterClient, nil, nil,
@@ -241,7 +240,7 @@ func StartSyncer(ctx context.Context, cfg *SyncerConfig, numSyncerThreads int, i
 				},
 			}, cache.Indexers{})
 			if err != nil {
-				return nil, err
+				return nil, nil, err
 			}
 
 			ddsifForUpstreamUpsyncer, err := ddsif.NewDiscoveringDynamicSharedInformerFactory(upstreamUpsyncerClusterClient, nil, nil,
@@ -255,7 +254,7 @@ func StartSyncer(ctx context.Context, cfg *SyncerConfig, numSyncerThreads int, i
 			},
 			cache.Indexers{})
 			if err != nil {
-				return nil, err
+				return nil, nil, err
 			}
 
 			logicalClusterIndex := synctarget.NewLogicalClusterIndex(ddsifForUpstreamSyncer, ddsifForUpstreamUpsyncer)
@@ -272,90 +271,78 @@ func StartSyncer(ctx context.Context, cfg *SyncerConfig, numSyncerThreads int, i
 				namespaceCleaner, syncTarget.GetUID(),
 				syncerNamespace, dnsProcessor, cfg.DNSImage, secretMutator, podspecableMutator)
 			if err != nil {
-				return nil, err
+				return nil, nil, err
 			}
 
 			upsyncerCleaner, err := upsync.NewUpSyncerCleanupController(logger, logicalcluster.From(syncTarget), cfg.SyncTargetName,
 				types.UID(cfg.SyncTargetUID), syncTargetKey, upstreamUpsyncerClusterClient, ddsifForUpstreamUpsyncer, ddsifForDownstream)
 			if err != nil {
-				return nil, err
+				return nil, nil, err
 			}
 
-			// Start and sync informer factories
-
-			waitForCacheSyncContext, cancelWaitForCacheSync := context.WithCancel(ctx)
-			defer cancelWaitForCacheSync()
-			// track the watch/list errors during the upcoming waitForCacheSync.
-			errorTracker := &watchCacheErrorTracker{
-				cancelWait: cancelWaitForCacheSync,
-
-				// 10 is quite arbitrary here.
-				// We don't want to block the embedding reconciliation loop too long,
-				// but still want to ensure the watch/list errors are repeated and signal
-				// a real problem.
-				maxErrorCount: 10,
-			}
 			var cacheSyncsForAlwaysRequiredGVRs []cache.InformerSynced
 			for _, alwaysRequiredGVR := range alwaysRequiredGVRs {
 				if informer, err := ddsifForUpstreamSyncer.ForResource(alwaysRequiredGVR); err != nil {
-					return nil, err
+					return nil, nil, err
 				} else {
-					errorTracker.TrackErrors(alwaysRequiredGVR, informer.Informer())
 					cacheSyncsForAlwaysRequiredGVRs = append(cacheSyncsForAlwaysRequiredGVRs, informer.Informer().HasSynced)
 				}
 			}
 
-			ddsifForUpstreamSyncer.Start(ctx.Done())
-			ddsifForUpstreamUpsyncer.Start(ctx.Done())
+			start := func() error {
+				// Start and sync informer factories
 
-			// Invalid shard URL, which cannot be watched by informers, will be
-			// detected, skipped, correctly logged and reported as an error
-			// to the embedding reconciler that manages shard URLs.
-			if ok := cache.WaitForCacheSync(waitForCacheSyncContext.Done(), cacheSyncsForAlwaysRequiredGVRs...); !ok {
-				return nil, fmt.Errorf("unable to sync watch caches for virtual workspace %q: %s", shardURLs.SyncerURL, errorTracker.ErrorSummary())
-			}
+				ddsifForUpstreamSyncer.Start(ctx.Done())
+				ddsifForUpstreamUpsyncer.Start(ctx.Done())
 
-			go ddsifForUpstreamSyncer.StartWorker(ctx)
-			go ddsifForUpstreamUpsyncer.StartWorker(ctx)
-
-			go specSyncer.Start(ctx, numSyncerThreads)
-			go upsyncerCleaner.Start(ctx, numSyncerThreads)
-
-			// Create and start GVR-specific controllers through controller managers
-			upstreamSyncerControllerManager := controllermanager.NewControllerManager(ctx,
-				"upstream-syncer",
-				controllermanager.InformerSource{
-					Subscribe: ddsifForUpstreamSyncer.Subscribe,
-					Informers: func() (informers map[schema.GroupVersionResource]cache.SharedIndexInformer, notSynced []schema.GroupVersionResource) {
-						genericInformers, notSynced := ddsifForUpstreamSyncer.Informers()
-						informers = make(map[schema.GroupVersionResource]cache.SharedIndexInformer, len(genericInformers))
-						for gvr, inf := range genericInformers {
-							informers[gvr] = inf.Informer()
-						}
-						return informers, notSynced
+				if ok := cache.WaitForCacheSync(ctx.Done(), cacheSyncsForAlwaysRequiredGVRs...); !ok {
+					return fmt.Errorf("unable to sync watch caches for virtual workspace %q", shardURLs.SyncerURL)
+				}
+
+				go ddsifForUpstreamSyncer.StartWorker(ctx)
+				go ddsifForUpstreamUpsyncer.StartWorker(ctx)
+
+				go specSyncer.Start(ctx, numSyncerThreads)
+				go upsyncerCleaner.Start(ctx, numSyncerThreads)
+
+				// Create and start GVR-specific controllers through controller managers
+				upstreamSyncerControllerManager := controllermanager.NewControllerManager(ctx,
+					"upstream-syncer",
+					controllermanager.InformerSource{
+						Subscribe: ddsifForUpstreamSyncer.Subscribe,
+						Informers: func() (informers map[schema.GroupVersionResource]cache.SharedIndexInformer, notSynced []schema.GroupVersionResource) {
+							genericInformers, notSynced := ddsifForUpstreamSyncer.Informers()
+							informers = make(map[schema.GroupVersionResource]cache.SharedIndexInformer, len(genericInformers))
+							for gvr, inf := range genericInformers {
+								informers[gvr] = inf.Informer()
+							}
+							return informers, notSynced
+						},
 					},
-				},
-				map[string]controllermanager.ManagedController{},
-			)
-			go upstreamSyncerControllerManager.Start(ctx)
-
-			upstreamUpsyncerControllerManager := controllermanager.NewControllerManager(ctx,
-				"upstream-upsyncer",
-				controllermanager.InformerSource{
-					Subscribe: ddsifForUpstreamUpsyncer.Subscribe,
-					Informers: func() (informers map[schema.GroupVersionResource]cache.SharedIndexInformer, notSynced []schema.GroupVersionResource) {
-						genericInformers, notSynced := ddsifForUpstreamUpsyncer.Informers()
-						informers = make(map[schema.GroupVersionResource]cache.SharedIndexInformer, len(genericInformers))
-						for gvr, inf := range genericInformers {
-							informers[gvr] = inf.Informer()
-						}
-						return informers, notSynced
-					},
-				},
-				map[string]controllermanager.ManagedController{},
-			)
-			go upstreamUpsyncerControllerManager.Start(ctx)
+					map[string]controllermanager.ManagedController{},
+				)
+				go upstreamSyncerControllerManager.Start(ctx)
+
+				upstreamUpsyncerControllerManager := controllermanager.NewControllerManager(ctx,
+					"upstream-upsyncer",
+					controllermanager.InformerSource{
+						Subscribe: ddsifForUpstreamUpsyncer.Subscribe,
+						Informers: func() (informers map[schema.GroupVersionResource]cache.SharedIndexInformer, notSynced []schema.GroupVersionResource) {
+							genericInformers, notSynced := ddsifForUpstreamUpsyncer.Informers()
+							informers = make(map[schema.GroupVersionResource]cache.SharedIndexInformer, len(genericInformers))
+							for gvr, inf := range genericInformers {
+								informers[gvr] = inf.Informer()
+							}
+							return informers, notSynced
+						},
+					},
+					map[string]controllermanager.ManagedController{},
+				)
+				go upstreamUpsyncerControllerManager.Start(ctx)
+
+				return nil
+			}
 
 			return &synctarget.ShardAccess{
 				SyncerClient:   upstreamSyncerClusterClient,
@@ -364,7 +351,7 @@ func StartSyncer(ctx context.Context, cfg *SyncerConfig, numSyncerThreads int, i
 				UpsyncerDDSIF: ddsifForUpstreamUpsyncer,
 
 				LogicalClusterIndex: logicalClusterIndex,
-			}, nil
+			}, start, nil
 		},
 	)
 
@@ -563,60 +550,3 @@ func (s *delegatingCleaner) CancelCleaning(key string) {
 	}
 	s.delegate.CancelCleaning(key)
 }
-
-// watchCacheErrorTracker tracks the watch/list errors that occur during
-// informer watch cache syncs, and stores them per GVR and per error message.
-// It allows cancelling the wait for cache sync if the total number of tracked
-// errors is higher than a maximum number of errors.
-type watchCacheErrorTracker struct {
-	// maxErrorCount is the maximum total number of list/watch errors after which the
-	// waitForCacheSync call should be canceled.
-	maxErrorCount int
-
-	// cancelWait is the method that will cancel the waitForCacheSync call.
-	cancelWait func()
-
-	errors          map[schema.GroupVersionResource]map[string]int
-	totalErrorCount int
-
-	mutex sync.Mutex
-}
-
-// TrackErrors will track watch/list errors for this informer with a WatchErrorHandler.
-func (h *watchCacheErrorTracker) TrackErrors(gvr schema.GroupVersionResource, informer cache.SharedIndexInformer) {
-	_ = informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
-		h.mutex.Lock()
-		defer h.mutex.Unlock()
-		errStr := err.Error()
-
-		if h.errors == nil {
-			h.errors = make(map[schema.GroupVersionResource]map[string]int)
-		}
-
-		gvrErrors := h.errors[gvr]
-		if gvrErrors == nil {
-			gvrErrors = make(map[string]int)
-			h.errors[gvr] = gvrErrors
-		}
-
-		h.errors[gvr][errStr]++
-		h.totalErrorCount++
-		if h.totalErrorCount > h.maxErrorCount {
-			h.cancelWait()
-		}
-	})
-}
-
-// ErrorSummary returns a summary of the various watch/list errors
-// tracked during a wait for cache sync, per GVR and per error message.
-func (h *watchCacheErrorTracker) ErrorSummary() string {
-	h.mutex.Lock()
-	defer h.mutex.Unlock()
-	var summary string
-	for gvr, gvrErrors := range h.errors {
-		for errorStr, count := range gvrErrors {
-			summary += fmt.Sprintf("\n%s: %d x %s", gvr.String(), count, errorStr)
-		}
-	}
-	return summary
-}
diff --git a/pkg/syncer/synctarget/shard_manager.go b/pkg/syncer/synctarget/shard_manager.go
index fcf0876d458..6a4a65d0b83 100644
--- a/pkg/syncer/synctarget/shard_manager.go
+++ b/pkg/syncer/synctarget/shard_manager.go
@@ -150,7 +150,8 @@ type GetShardAccessFunc func(clusterName logicalcluster.Name) (ShardAccess, bool
 // upstream virtual workspace URLs, based on a SyncTarget resource passed to the updateShards() method.
 //
 // When a shard is found (identified by the couple of virtual workspace URLs - for both syncer and upsyncer),
-// then the startShardControllers() method is called, and the resulting [ShardAccess] is stored.
+// then the newShardControllers() method is called, the resulting [ShardAccess] is stored,
+// and the resulting start() function is called in a goroutine.
 //
 // When a shard is removed, the context initially passed to the
 // startShardControllers() method is cancelled.
@@ -158,17 +159,17 @@ type GetShardAccessFunc func(clusterName logicalcluster.Name) (ShardAccess, bool
 // The ShardAccessForCluster() method will be used by some downstream controllers in order to
 // be able to get / list upstream resources in the right shard.
 func NewShardManager(
-	startShardControllers func(ctx context.Context, shardURLs workloadv1alpha1.VirtualWorkspace) (*ShardAccess, error)) *shardManager {
+	newShardControllers func(ctx context.Context, shardURLs workloadv1alpha1.VirtualWorkspace) (access *ShardAccess, start func() error, err error)) *shardManager {
 	return &shardManager{
-		controllers:           map[workloadv1alpha1.VirtualWorkspace]shardControllers{},
-		startShardControllers: startShardControllers,
+		controllers:         map[workloadv1alpha1.VirtualWorkspace]shardControllers{},
+		newShardControllers: newShardControllers,
 	}
 }
 
 type shardManager struct {
-	controllersLock       sync.RWMutex
-	controllers           map[workloadv1alpha1.VirtualWorkspace]shardControllers
-	startShardControllers func(ctx context.Context, shardURLs workloadv1alpha1.VirtualWorkspace) (*ShardAccess, error)
+	controllersLock     sync.RWMutex
+	controllers         map[workloadv1alpha1.VirtualWorkspace]shardControllers
+	newShardControllers func(ctx context.Context, shardURLs workloadv1alpha1.VirtualWorkspace) (access *ShardAccess, start func() error, err error)
 }
 
 func (c *shardManager) ShardAccessForCluster(clusterName logicalcluster.Name) (ShardAccess, bool, error) {
@@ -211,6 +212,7 @@ func (c *shardManager) reconcile(ctx context.Context, syncTarget *workloadv1alph
 	var errs []error
 	// Create and start missing controllers that have Virtual Workspace URLs for a shard
 	for shardURLs := range requiredShards {
+		shardURLs := shardURLs
 		if _, ok := c.controllers[shardURLs]; ok {
 			// The controllers are already started
 			continue
@@ -219,7 +221,7 @@ func (c *shardManager) reconcile(ctx context.Context, syncTarget *workloadv1alph
 		// Start the controllers
 		shardControllersContext, cancelFunc := context.WithCancel(ctx)
 		// Create the controllers
-		shardAccess, err := c.startShardControllers(shardControllersContext, shardURLs)
+		shardAccess, start, err := c.newShardControllers(shardControllersContext, shardURLs)
 		if err != nil {
 			logger.Error(err, "failed creating controllers for shard", "shard", shardURLs)
 			errs = append(errs, err)
@@ -227,13 +229,28 @@ func (c *shardManager) reconcile(ctx context.Context, syncTarget *workloadv1alph
 			continue
 		}
 		c.controllers[shardURLs] = shardControllers{
-			*shardAccess, cancelFunc,
+			*shardAccess, cancelFunc, false,
 		}
+		go func() {
+			err := start()
+			c.controllersLock.Lock()
+			defer c.controllersLock.Unlock()
+
+			if err != nil {
+				delete(c.controllers, shardURLs)
+				cancelFunc()
+			} else {
+				controllers := c.controllers[shardURLs]
+				controllers.ready = true
+				c.controllers[shardURLs] = controllers
+			}
+		}()
 	}
 	return reconcileStatusContinue, utilserrors.NewAggregate(errs)
 }
 
 type shardControllers struct {
 	ShardAccess
-	stop  func()
+	stop  func()
+	ready bool
 }
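
Note (not part of the patch): the change above splits shard handling into two phases — newShardControllers() only builds the controllers and returns a start() function, while the shard manager stores the ShardAccess immediately, runs start() in a goroutine, and then either marks the entry ready or drops it and cancels its context if start() fails. The sketch below illustrates that contract with hypothetical stand-in names (demoManager, demoAccess, addShard); it is an assumption-level illustration, not kcp code.

// Sketch only: mirrors the create-then-start-asynchronously flow described above,
// using hypothetical stand-in types instead of the real kcp ones.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// demoAccess stands in for the per-shard access object (synctarget.ShardAccess in the patch).
type demoAccess struct{ url string }

// demoControllers stands in for the per-shard bookkeeping entry.
type demoControllers struct {
	access demoAccess
	stop   func()
	ready  bool
}

type demoManager struct {
	lock        sync.RWMutex
	controllers map[string]demoControllers
	// newControllers plays the role of the newShardControllers callback:
	// it only creates the controllers and returns a start function to run later.
	newControllers func(ctx context.Context, url string) (*demoAccess, func() error, error)
}

// addShard creates the controllers synchronously, stores the access immediately,
// and runs start() in a goroutine, flipping ready or dropping the entry on failure.
func (m *demoManager) addShard(ctx context.Context, url string) error {
	m.lock.Lock()
	defer m.lock.Unlock()
	if _, ok := m.controllers[url]; ok {
		return nil // already managed
	}
	shardCtx, cancel := context.WithCancel(ctx)
	access, start, err := m.newControllers(shardCtx, url)
	if err != nil {
		cancel()
		return err
	}
	m.controllers[url] = demoControllers{access: *access, stop: cancel}
	go func() {
		err := start() // e.g. start informers and wait for their caches to sync
		m.lock.Lock()
		defer m.lock.Unlock()
		if err != nil {
			delete(m.controllers, url)
			cancel()
			return
		}
		entry := m.controllers[url]
		entry.ready = true
		m.controllers[url] = entry
	}()
	return nil
}

func main() {
	m := &demoManager{
		controllers: map[string]demoControllers{},
		newControllers: func(ctx context.Context, url string) (*demoAccess, func() error, error) {
			start := func() error {
				// Simulate waiting for watch caches to sync.
				select {
				case <-time.After(50 * time.Millisecond):
					return nil
				case <-ctx.Done():
					return ctx.Err()
				}
			}
			return &demoAccess{url: url}, start, nil
		},
	}
	const shard = "https://shard-1.example/services/syncer"
	if err := m.addShard(context.Background(), shard); err != nil {
		fmt.Println("error:", err)
		return
	}
	time.Sleep(100 * time.Millisecond)
	m.lock.RLock()
	fmt.Println("ready:", m.controllers[shard].ready)
	m.lock.RUnlock()
}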