Skip to content

Commit

Permalink
informer/reflector: delete syncWithStream
Browse files Browse the repository at this point in the history
Signed-off-by: Iceber Gu <caiwei95@hotmail.com>
  • Loading branch information
Iceber committed Nov 15, 2023
1 parent 85856b6 commit 07f4d15
Showing 1 changed file with 10 additions and 17 deletions.
27 changes: 10 additions & 17 deletions pkg/synchromanager/clustersynchro/informer/reflector.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,8 +479,16 @@ func (r *Reflector) listWithResultStream(ctx context.Context, pager *clspager.Li
go func() {
list, paginatedResult, err = pager.List(clspager.WithResultStream(ctx, ch), options)
}()
if itemKeys, err = r.syncWithStream(ch); err != nil {
return

var key string
for obj := range ch {
if key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err != nil {
return
}
if err = r.store.Add(obj); err != nil {
return
}
itemKeys = append(itemKeys, cache.ExplicitKey(key))
}
return
}
Expand All @@ -494,21 +502,6 @@ func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) err
return r.store.Replace(found, resourceVersion)
}

func (r *Reflector) syncWithStream(ch chan runtime.Object) ([]interface{}, error) {
var keys []interface{}
for obj := range ch {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
return keys, err
}
keys = append(keys, cache.ExplicitKey(key))
if err := r.store.Add(obj); err != nil {
return keys, err
}
}
return keys, nil
}

func (r *Reflector) syncWithKeys(keys []interface{}, resourceVersion string) error {
return r.store.Replace(keys, resourceVersion)
}
Expand Down

0 comments on commit 07f4d15

Please sign in to comment.