Skip to content

Commit

Permalink
Fix intermittent UT failure re: recreating namespace
Browse files Browse the repository at this point in the history
...due to a timing issue with updates with the fake client when the
cache isn't fully synced initially.

Signed-off-by: Tom Pantelis <tompantelis@gmail.com>
  • Loading branch information
tpantelis committed Sep 3, 2024
1 parent 4ea82d2 commit 2b9b790
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 12 deletions.
16 changes: 8 additions & 8 deletions pkg/syncer/resource_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ type ResourceSyncerConfig struct {

type resourceSyncer struct {
workQueue workqueue.Interface
hasSynced func() bool
cachesSynced []cache.InformerSynced
unregHandler func()
informer cache.Controller
store cache.Store
Expand Down Expand Up @@ -243,7 +243,7 @@ func NewResourceSyncer(config *ResourceSyncerConfig) (Interface, error) {
Transform: resourceUtil.TrimManagedFields,
})

syncer.hasSynced = syncer.informer.HasSynced
syncer.cachesSynced = append(syncer.cachesSynced, syncer.informer.HasSynced)

return syncer, nil
}
Expand All @@ -265,7 +265,7 @@ func NewResourceSyncerWithSharedInformer(config *ResourceSyncerConfig, informer
return nil, errors.Wrapf(err, "error registering event handler")
}

syncer.hasSynced = reg.HasSynced
syncer.cachesSynced = append(syncer.cachesSynced, reg.HasSynced)

syncer.unregHandler = func() {
_ = informer.RemoveEventHandler(reg)
Expand Down Expand Up @@ -315,7 +315,7 @@ func newResourceSyncer(config *ResourceSyncerConfig) (*resourceSyncer, error) {
syncer.workQueue = workqueue.New(config.Name)

if config.NamespaceInformer != nil {
_, err := config.NamespaceInformer.AddEventHandler(cache.ResourceEventHandlerDetailedFuncs{
reg, err := config.NamespaceInformer.AddEventHandler(cache.ResourceEventHandlerDetailedFuncs{
AddFunc: func(obj interface{}, _ bool) {
syncer.workQueue.Enqueue(cache.ExplicitKey(cache.NewObjectName(namespaceAddedKey, resourceUtil.MustToMeta(obj).GetName()).String()))
},
Expand All @@ -329,6 +329,8 @@ func newResourceSyncer(config *ResourceSyncerConfig) (*resourceSyncer, error) {
if err != nil {
return nil, errors.Wrapf(err, "error registering namespace handler")
}

syncer.cachesSynced = append(syncer.cachesSynced, reg.HasSynced)
}

return syncer, nil
Expand Down Expand Up @@ -387,7 +389,7 @@ func (r *resourceSyncer) Start(stopCh <-chan struct{}) error {
if *r.config.WaitForCacheSync {
r.log.V(log.LIBDEBUG).Infof("Syncer %q waiting for informer cache to sync", r.config.Name)

_ = cache.WaitForCacheSync(stopCh, r.hasSynced)
_ = cache.WaitForCacheSync(stopCh, r.cachesSynced...)
}

r.workQueue.Run(stopCh, r.processNextWorkItem)
Expand Down Expand Up @@ -516,7 +518,7 @@ func (r *resourceSyncer) doReconcile(resourceLister func() []runtime.Object) {
}

func (r *resourceSyncer) runIfCacheSynced(defaultReturn any, run func() any) any {
if ok := cache.WaitForCacheSync(r.stopCh, r.hasSynced); !ok {
if ok := cache.WaitForCacheSync(r.stopCh, r.cachesSynced...); !ok {
// This means the cache was stopped.
r.log.Warningf("Syncer %q failed to wait for informer cache to sync", r.config.Name)

Expand Down Expand Up @@ -837,8 +839,6 @@ func (r *resourceSyncer) handleNamespaceAdded(namespace string) {
ns, name, _ := cache.SplitMetaNamespaceKey(k)
r.RequeueResource(name, ns)
}

delete(r.missingNamespaces, namespace)
}
}

Expand Down
10 changes: 6 additions & 4 deletions pkg/syncer/resource_syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1283,10 +1283,6 @@ func testWithMissingNamespace() {
createNamespace(test.LocalNamespace)

fakereactor.AddVerifyNamespaceReactor(&d.config.SourceClient.(*fakeClient.FakeDynamicClient).Fake, "pods")

if d.config.NamespaceInformer != nil {
go d.config.NamespaceInformer.Run(d.stopCh)
}
})

Specify("distribute should eventually succeed when the namespace is created", func() {
Expand Down Expand Up @@ -1500,6 +1496,12 @@ func newTestDriver(sourceNamespace, localClusterID string, syncDirection syncer.

Expect(err).To(Succeed())

if d.config.NamespaceInformer != nil {
go func() {
d.config.NamespaceInformer.Run(d.stopCh)
}()
}

utilruntime.ErrorHandlers = append(utilruntime.ErrorHandlers,
func(_ context.Context, err error, _ string, _ ...interface{}) {
d.handledError <- err
Expand Down

0 comments on commit 2b9b790

Please sign in to comment.