GH-47385: Create BackpressureSource and BackpressureCombiner #47392
I will simplify the pause logic. Simple but composable pause propagation logic will be easier to reason about.
Some minor formatting suggestions. Now looking into more details.
Co-authored-by: Rossi Sun <zanmato1984@gmail.com>
// 1. Default pause_on_any=true - pause on any source is propagated - OR logic
// 2. pause_on_any=false - pause is propagated only when all sources are paused - AND
//    logic
class ARROW_ACERO_EXPORT BackpressureCombiner {
I think I get the idea of bridging multiple backpressure sources and propagating the backpressure between, in an implicit and automatic manner. But before we introduce this as a public API, I'd like to see a more concrete use case, e.g. a plan tree (possibly a hypothetical one with some new node types you are about to add in the future), to showcase some unclear things such as:
- Who owns the life span of the combiner and the combiner sources?
- When and where to establish the bridge between a combiner source and a particular combiner?
- When and where to trigger a combiner source's Pause() or Resume()?
The current BackpressureCombiner exposes too much implementation detail in its public interface, e.g. we can even tell that it uses std::mutex to manage synchronization. But that is another topic that we can talk about later.
That is reasonable, but I am not sure how to approach this. I was trying to keep the PRs relatively small. To showcase the use of this component I would need Pipes, which enable multi-output nodes. I can either add Pipe to this PR or rebase the Pipe draft so that it uses BackpressureCombiner internally. Also, I think it would be relatively easy to showcase a filter node with an optional output that provides the filtered-out data. LMK what you think is the best approach.
Separation is good, you don't have to combine any PRs. I think you can just sketch the idea in text, if there isn't much trouble. (And I briefly viewed the Pipe PR and I get the idea.)
Thanks.
Rationale for this change
Provide infrastructure for unified backpressure control.
What changes are included in this PR?
BackpressureController is moved from asof_join and sorted_merge to common backpressure.
BackpressureCombiner with BackpressureCombiner::Source is added to handle multiple pause sources.
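To make the OR/AND propagation rule concrete, here is a minimal standalone sketch. It is not the code in this PR and does not use any Acero types: the class name `PauseCombinerSketch`, the index-based `SetPaused()` method, and the callback signature are all hypothetical, chosen only to illustrate how `pause_on_any` could select between the two modes.

```cpp
#include <cstddef>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical illustration only; NOT the BackpressureCombiner from this PR.
// Combines the pause state of several sources into one pause/resume signal.
class PauseCombinerSketch {
 public:
  using Callback = std::function<void(bool paused)>;

  // pause_on_any=true  -> OR logic: paused while at least one source is paused.
  // pause_on_any=false -> AND logic: paused only while all sources are paused.
  PauseCombinerSketch(std::size_t num_sources, bool pause_on_any, Callback on_change)
      : paused_(num_sources, false),
        pause_on_any_(pause_on_any),
        on_change_(std::move(on_change)) {}

  // Records the new state of one source and invokes the callback only when
  // the combined state flips. Repeating the same state is a no-op.
  void SetPaused(std::size_t source, bool paused) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (paused_.at(source) == paused) return;
    paused_[source] = paused;
    if (paused) {
      ++num_paused_;
    } else {
      --num_paused_;
    }
    const bool combined =
        pause_on_any_ ? num_paused_ > 0
                      : (!paused_.empty() && num_paused_ == paused_.size());
    if (combined != combined_) {
      combined_ = combined;
      // Called under the lock for simplicity; the callback must not re-enter
      // this object.
      on_change_(combined);
    }
  }

 private:
  std::mutex mutex_;
  std::vector<bool> paused_;    // per-source pause state
  std::size_t num_paused_ = 0;  // number of sources currently paused
  const bool pause_on_any_;
  bool combined_ = false;       // last combined state reported to the callback
  Callback on_change_;
};
```

In this sketch the callback stands in for whatever would actually pause or resume the upstream input. With `pause_on_any=true`, the first source to pause triggers the callback and only the last one to resume clears it; with `pause_on_any=false`, nothing is propagated until every source is paused.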
Are these changes tested?
Test is added.
Are there any user-facing changes?
A new API is available.