-
Notifications
You must be signed in to change notification settings - Fork 298
feat: allow list items to be processed in parallel #738
base: master
Are you sure you want to change the base?
feat: allow list items to be processed in parallel #738
Conversation
849c153 to
0f1cd01
Compare
Signed-off-by: Shady Rafehi <shady@canva.com>
0f1cd01 to
4e4d254
Compare
|
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #738 +/- ##
==========================================
- Coverage 54.26% 47.67% -6.59%
==========================================
Files 64 64
Lines 6164 6611 +447
==========================================
- Hits 3345 3152 -193
- Misses 2549 3199 +650
+ Partials 270 260 -10 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
| listPageSize: defaultListPageSize, | ||
| listPageBufferSize: defaultListPageBufferSize, | ||
| listSemaphore: semaphore.NewWeighted(defaultListSemaphoreWeight), | ||
| listItemSemaphoreWeight: defaultListItemSemaphoreWeight, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is great that it maintains existing behavior by default
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
very elegant solution!
| resourceVersion, err := c.listResources(ctx, resClient, func(listPager *pager.ListPager) error { | ||
| return listPager.EachListItem(ctx, metav1.ListOptions{}, func(obj runtime.Object) error { | ||
| if un, ok := obj.(*unstructured.Unstructured); !ok { | ||
| return fmt.Errorf("object %s/%s has an unexpected type", un.GroupVersionKind().String(), un.GetName()) | ||
| } else { | ||
| items = append(items, c.newResource(un)) | ||
| if err := limiter.Run(ctx, func() { | ||
| newRes := c.newResource(un) | ||
| listLock.Lock() | ||
| items = append(items, newRes) | ||
| listLock.Unlock() | ||
| }); err != nil { | ||
| return fmt.Errorf("failed to process list item: %w", err) | ||
| } | ||
| } | ||
| return nil | ||
| }) | ||
| }) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to call this out as a trade-off, I think this will cause a change in ordering when using concurrency. But this is only when its enabled. Not a bad trade-off, just something to be aware of.



This PR introduces a new setting which allows items returned by
ListPagerto be processed in parallel.Currently, items are processed sequentially per resource. This is a problem if
populateResourceInfoHandleris expensive. We internally have a custom resource which takes 1.5 milliseconds in thepopulateResourceInfoHandlerfunction due to a custom lua healthcheck script and a few ignored labels which are removed during diff normalisation. With over 50,000 of these custom resources, that's a 75 second overhead.With a parallelism of 8, we've seen a page of 500 items go from ~750ms to ~100ms. We've also seen the processing of a full list complete at roughly the same time as
ListPagercompletes. Previously, processing of pages constantly lagged behind the retrieval.gobenchresults:Before and after: