Description
Describe the enhancement requested
I have been experimenting with the concept of a pipe in Acero. It shows great potential for expanding Acero's flexibility and feature set. An initial draft and proof-of-concept implementation is available here: PR-45435
Consider a `named_pipe` that:
- has one sink (`pipe_sink` or `pipe_tee`)
- has multiple sources (`pipe_source`)
- each `ExecBatch` that enters the sink (`pipe_sink`/`pipe_tee`) is duplicated to all of its outputs (`pipe_source`)
- sinks are matched with sources by `pipe_name` in the `Init` phase of the exec plan
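The pipe semantics listed above can be modelled outside Acero as a small name-keyed registry. This is a minimal sketch under stated assumptions, not the code in the PR: `Batch`, `Consumer`, `PipeRegistry`, `AddSink`, `AddSource` and `Push` are hypothetical names, and a `std::vector<int>` stands in for an `ExecBatch`. Each source receives its own copy of every pushed batch, and a second sink for the same name is rejected.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <set>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Stand-in for an ExecBatch (assumption: a real pipe would carry arrow's ExecBatch).
using Batch = std::vector<int>;
// A consumer takes the batch by value, so every pipe_source gets its own copy.
using Consumer = std::function<void(Batch)>;

// Hypothetical registry that pairs producers and consumers by pipe name.
class PipeRegistry {
 public:
  // A pipe_sink / pipe_tee claims the producer side of a named pipe.
  void AddSink(const std::string& pipe_name) {
    if (!sinks_.insert(pipe_name).second) {
      throw std::runtime_error("pipe '" + pipe_name + "' already has a sink");
    }
  }

  // A pipe_source subscribes to a named pipe.
  void AddSource(const std::string& pipe_name, Consumer consumer) {
    sources_[pipe_name].push_back(std::move(consumer));
  }

  // Duplicates `batch` to every source registered under `pipe_name`;
  // returns how many sources received it.
  std::size_t Push(const std::string& pipe_name, const Batch& batch) const {
    auto it = sources_.find(pipe_name);
    if (it == sources_.end()) return 0;
    for (const Consumer& consumer : it->second) consumer(batch);  // copies batch
    return it->second.size();
  }

 private:
  std::set<std::string> sinks_;
  std::map<std::string, std::vector<Consumer>> sources_;
};
```

In this sketch `Push` only models name matching and copying; it does not check that a sink was registered before data flows.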
- `pipe_sink`: a sink node that consumes RecordBatches and duplicates them onto all `pipe_source` nodes that match by name.
- `pipe_tee`: works exactly like `pipe_sink` and additionally forwards each batch to its output.
- `pipe_source`: a pipe consumer node that behaves like a source node from the point of view of a declaration.
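To make the difference between the two producer nodes concrete, here is a hypothetical model (not Acero's `ExecNode` interface and not the PR's code): both copy each incoming batch to every matched `pipe_source`, and only the tee also forwards it to the next node in its own declaration. `PipeProducer`, `PipeMode` and `InputReceived` are illustrative names, and `std::vector<int>` again stands in for a batch.

```cpp
#include <cassert>
#include <functional>
#include <vector>

// Stand-in for an ExecBatch (assumption, not Arrow's type).
using Batch = std::vector<int>;
// Receives a batch by value, i.e. its own copy.
using Consumer = std::function<void(Batch)>;

// Hypothetical producer modes: pipe_sink ends the branch, pipe_tee passes batches on.
enum class PipeMode { kSink, kTee };

struct PipeProducer {
  PipeMode mode;
  std::vector<Consumer> pipe_sources;  // consumers matched by pipe name at init
  Consumer downstream;                 // next node in the same declaration

  // Called for every incoming batch.
  void InputReceived(const Batch& batch) const {
    for (const Consumer& source : pipe_sources) source(batch);    // one copy each
    if (mode == PipeMode::kTee && downstream) downstream(batch);  // tee only
  }
};
```

Keeping one node type with a mode flag mirrors the description that `pipe_tee` is `pipe_sink` plus forwarding; the PR may well structure this differently.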
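Example of the new flexibility these nodes provide:
```cpp
Declaration main_query = Declaration::Sequence({
    {"source", ...},
    // Forwards batches to the filter below and copies them into "named_pipe_1".
    {"pipe_tee", PipeSinkNodeOptions{"named_pipe_1"}},
    {"filter", FilterNodeOptions{expr1}},
    {"sink", ...}
});
Declaration extra_query = Declaration::Sequence({
    // Receives a copy of every batch published to "named_pipe_1".
    {"pipe_source", PipeSourceNodeOptions{"named_pipe_1", schema}},
    {"filter", FilterNodeOptions{expr2}},
    {"sink", ...}
});
...
// Both branches are added to the same plan; the pipe is matched by name during Init.
main_query.AddToPlan(plan.get());
extra_query.AddToPlan(plan.get());
```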
Things left to do:
- refactor the code and add `pipe_sink`
- handle instances of `pipe_source` consumers that do not have corresponding producers (produce an error)
- probably more tests
- update the docs
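For the unmatched-producer item, one possible shape of the check is a single pass over all pipe names once every node has been created, for example during `Init`. The function `ValidatePipes` and its error strings are hypothetical; a real implementation would presumably return an `arrow::Status` instead of a list of strings. The sketch also flags a pipe with more than one sink, on the assumption that the one-sink rule stated earlier in this issue should be enforced.

```cpp
#include <cassert>
#include <set>
#include <string>
#include <vector>

// Hypothetical init-time check: every pipe_source must name a pipe that has
// exactly one producer (pipe_sink or pipe_tee). Returns one message per problem.
std::vector<std::string> ValidatePipes(const std::vector<std::string>& sink_names,
                                       const std::vector<std::string>& source_names) {
  std::vector<std::string> errors;
  std::set<std::string> sinks;
  for (const std::string& name : sink_names) {
    // insert() reports false when the name was already claimed by another sink.
    if (!sinks.insert(name).second) {
      errors.push_back("multiple sinks for pipe '" + name + "'");
    }
  }
  for (const std::string& name : source_names) {
    if (sinks.count(name) == 0) {
      errors.push_back("pipe_source '" + name + "' has no producer");
    }
  }
  return errors;
}
```

Collecting every problem instead of stopping at the first one would let a user fix a misconfigured plan in a single pass.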
Let me know what you think about this feature and whether there are any pitfalls that I missed. What additional test cases should I implement?
Component(s)
C++