GH-41190: [C++] support for single threaded joins #41125
Thanks for opening a pull request! If this is not a minor PR, could you open an issue for this pull request on GitHub? https://github.com/apache/arrow/issues/new/choose Opening GitHub issues ahead of time contributes to the Openness of the Apache Arrow project. Then could you also rename the pull request title in the following format?
or
In the case of PARQUET issues on JIRA the title also supports:
See also:
This is not a MINOR change. Could you open a new issue for this?
Fair enough, #41190 is the corresponding issue.
Could you use our PR template instead of removing it entirely next time? Can we avoid using
diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc
index 48cc83dd3d..649ea32bc7 100644
--- a/cpp/src/arrow/acero/asof_join_node.cc
+++ b/cpp/src/arrow/acero/asof_join_node.cc
@@ -948,7 +948,7 @@ class AsofJoinNode : public ExecNode {
return true;
}
- Result<std::shared_ptr<RecordBatch>> ProcessInner() {
+ Status ProcessInner() {
DCHECK(!state_.empty());
auto& lhs = *state_.at(0);
@@ -992,10 +992,18 @@ class AsofJoinNode : public ExecNode {
// Emit the batch
if (dst.empty()) {
- return NULLPTR;
+ return Status::OK();
} else {
ARROW_ASSIGN_OR_RAISE(auto out, dst.Materialize());
- return out.has_value() ? out.value() : NULLPTR;
+ if (!out.has_value()) {
+ return Status::OK();
+ }
+ auto out_rb = out.value();
+ ExecBatch out_b(*out_rb);
+ out_b.index = batches_produced_++;
+ DEBUG_SYNC(this, "produce batch ", out_b.index, ":", DEBUG_MANIP(std::endl),
+ out_rb->ToString(), DEBUG_MANIP(std::endl));
+ return output_->InputReceived(this, std::move(out_b));
}
}
@@ -1006,6 +1014,7 @@ class AsofJoinNode : public ExecNode {
~Defer() noexcept { callable(); }
};
+#ifdef ARROW_ENABLE_THREADING
void EndFromProcessThread(Status st = Status::OK()) {
// We must spawn a new task to transfer off the process thread when
// marking this finished. Otherwise there is a chance that doing so could
@@ -1039,21 +1048,10 @@ class AsofJoinNode : public ExecNode {
// Process batches while we have data
for (;;) {
- Result<std::shared_ptr<RecordBatch>> result = ProcessInner();
-
- if (result.ok()) {
- auto out_rb = *result;
- if (!out_rb) break;
- ExecBatch out_b(*out_rb);
- out_b.index = batches_produced_++;
- DEBUG_SYNC(this, "produce batch ", out_b.index, ":", DEBUG_MANIP(std::endl),
- out_rb->ToString(), DEBUG_MANIP(std::endl));
- Status st = output_->InputReceived(this, std::move(out_b));
- if (!st.ok()) {
- EndFromProcessThread(std::move(st));
- }
- } else {
- EndFromProcessThread(result.status());
+ Status st = ProcessInner();
+ const auto ok = st.ok();
+ EndFromProcessThread(std::move(st));
+ if (!ok) {
return false;
}
}
@@ -1085,6 +1083,7 @@ class AsofJoinNode : public ExecNode {
}
static void ProcessThreadWrapper(AsofJoinNode* node) { node->ProcessThread(); }
+#endif
public:
AsofJoinNode(ExecPlan* plan, NodeVector inputs, std::vector<std::string> input_labels,
@@ -1116,8 +1115,10 @@ class AsofJoinNode : public ExecNode {
}
virtual ~AsofJoinNode() {
+#ifdef ARROW_ENABLE_THREADING
process_.Push(false); // poison pill
process_thread_.join();
+#endif
}
const std::vector<col_index_t>& indices_of_on_key() { return indices_of_on_key_; }
@@ -1375,7 +1376,18 @@ class AsofJoinNode : public ExecNode {
const char* kind_name() const override { return "AsofJoinNode"; }
const Ordering& ordering() const override { return ordering_; }
+ Status PushProcess() {
+#ifdef ARROW_ENABLE_THREADING
+ process_.Push(true);
+#else
+ ARROW_RETURN_NOT_OK(ProcessInner());
+ ARROW_RETURN_NOT_OK(output_->InputFinished(this, batches_produced_));
+#endif
+ return Status::OK();
+ }
+
Status InputReceived(ExecNode* input, ExecBatch batch) override {
+#ifdef ARROW_ENABLE_THREADING
// InputReceived may be called after execution was finished. Pushing it to the
// InputState is unnecessary since we're done (and anyway may cause the
// BackPressureController to pause the input, causing a deadlock), so drop it.
@@ -1384,6 +1396,7 @@ class AsofJoinNode : public ExecNode {
DEBUG_MANIP(std::endl));
return Status::OK();
}
+#endif
// Get the input
ARROW_DCHECK(std_has(inputs_, input));
@@ -1395,7 +1408,7 @@ class AsofJoinNode : public ExecNode {
rb->ToString(), DEBUG_MANIP(std::endl));
ARROW_RETURN_NOT_OK(state_.at(k)->Push(rb));
- process_.Push(true);
+ ARROW_RETURN_NOT_OK(PushProcess());
return Status::OK();
}
@@ -1410,15 +1423,12 @@ class AsofJoinNode : public ExecNode {
// The reason for this is that there are cases at the end of a table where we don't
// know whether the RHS of the join is up-to-date until we know that the table is
// finished.
- process_.Push(true);
+ ARROW_RETURN_NOT_OK(PushProcess());
return Status::OK();
}
Status StartProducing() override {
-#ifndef ARROW_ENABLE_THREADING
- return Status::NotImplemented("ASOF join requires threading enabled");
-#endif
-
+#ifdef ARROW_ENABLE_THREADING
ARROW_ASSIGN_OR_RAISE(process_task_, plan_->query_context()->BeginExternalTask(
"AsofJoinNode::ProcessThread"));
if (!process_task_.is_valid()) {
@@ -1426,6 +1436,7 @@ class AsofJoinNode : public ExecNode {
return Status::OK();
}
process_thread_ = std::thread(&AsofJoinNode::ProcessThreadWrapper, this);
+#endif
return Status::OK();
}
@@ -1433,8 +1444,10 @@ class AsofJoinNode : public ExecNode {
void ResumeProducing(ExecNode* output, int32_t counter) override {}
Status StopProducingImpl() override {
+#ifdef ARROW_ENABLE_THREADING
process_.Clear();
process_.Push(false);
+#endif
return Status::OK();
}
@@ -1464,12 +1477,14 @@ class AsofJoinNode : public ExecNode {
// Backpressure counter common to all inputs
std::atomic<int32_t> backpressure_counter_;
+#ifdef ARROW_ENABLE_THREADING
// Queue for triggering processing of a given input
// (a false value is a poison pill)
ConcurrentQueue<bool> process_;
// Worker thread
std::thread process_thread_;
Future<> process_task_;
+#endif
// In-progress batches produced
int batches_produced_ = 0;
@@ -1496,9 +1511,13 @@ AsofJoinNode::AsofJoinNode(ExecPlan* plan, NodeVector inputs,
debug_os_(join_options.debug_opts ? join_options.debug_opts->os : nullptr),
debug_mutex_(join_options.debug_opts ? join_options.debug_opts->mutex : nullptr),
#endif
- backpressure_counter_(1),
+ backpressure_counter_(1)
+#ifdef ARROW_ENABLE_THREADING
+ ,
process_(),
- process_thread_() {
+ process_thread_()
+#endif
+ {
for (auto& key_hasher : key_hashers_) {
key_hasher->node_ = this;
}
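The constructor hunk at the end of the diff moves the comma inside the `#ifdef` so that the member initializer list stays well-formed when the threading members are compiled out. Below is a minimal standalone sketch of that pattern. `SKETCH_ENABLE_THREADING` and `SketchState` are hypothetical names used only for illustration; they stand in for `ARROW_ENABLE_THREADING` and the real node class and are not part of Arrow.

```cpp
#include <atomic>
#include <cstdint>
#ifdef SKETCH_ENABLE_THREADING  // hypothetical stand-in for ARROW_ENABLE_THREADING
#include <thread>
#endif

// Hypothetical class: only the members needed to show the pattern.
class SketchState {
 public:
  SketchState();
  int32_t counter() const { return backpressure_counter_.load(); }
  int batches_produced() const { return batches_produced_; }

 private:
  std::atomic<int32_t> backpressure_counter_;
#ifdef SKETCH_ENABLE_THREADING
  std::thread process_thread_;  // exists only in threaded builds
#endif
  int batches_produced_ = 0;
};

// The last unconditional initializer has no trailing comma. The comma lives
// inside the #ifdef together with the conditional initializer, so both build
// configurations produce a valid initializer list.
SketchState::SketchState()
    : backpressure_counter_(1)
#ifdef SKETCH_ENABLE_THREADING
      ,
      process_thread_()
#endif
{
}
```

Leaving the comma after `backpressure_counter_(1)` outside the guard would make the single-threaded build fail to compile, since the initializer list would end with a dangling comma.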
@joemarshall, will you be able to pick these changes up any time soon? Do you need more funding to pull this back up the priority queue?
@kou - I couldn't quite get rid of
And did the same in sorted merge. There are some weird errors in the Python asof join on some platforms (e.g. macOS), which also seem to happen in the main build. I don't think I broke them.
Author of sorted merge here. Just to say it, thank you for taking this on!
nit: we don't use Hungarian names for parameters.
Hey @kou / @westonpace, I think this should be ready for another pass when you have time. It looks like the errors happening before were resolved as @jorisvandenbossche mentioned, but weren't a result of any changes @joemarshall made.
Co-authored-by: Rossi Sun <zanmato1984@gmail.com>
Co-authored-by: Rossi Sun <zanmato1984@gmail.com>
This seems straightforward enough. I haven't debugged either of these nodes enough to really know if this will avoid all deadlock scenarios but I think it's probably better to just tackle those as they arrive.
Just one question.
@westonpace done that fix above now
+1
After merging your PR, Conbench analyzed the 7 benchmarking runs that have been run so far on merge-commit a2453bd. There was 1 benchmark result indicating a performance regression:
The full Conbench report has more details. It also includes information about 37 possible false positives for unstable benchmarks that are known to sometimes produce them.
When I initially added single-threading support, I left out asof joins and sorted merge joins, because the code for these operations uses threads internally. This is a small check-in to add support for them. Tests run okay in single-threaded mode; I'm pushing it here to run the full tests and check that I didn't break the threaded case.
I'm pushing this now because making this work saves adding a load of threading checks in Python (this currently breaks single-threaded Python, i.e. Emscripten).
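The approach described above can be sketched in isolation: in a threaded build, incoming data wakes a worker thread through a queue of signals (with `false` as a poison pill), while in a single-threaded build the same processing step runs inline on the caller. This is only an illustrative sketch under assumptions. `SketchNode`, `RunSketch`, and the `SKETCH_ENABLE_THREADING` macro are hypothetical and do not exist in Arrow; the real node works on record batches and uses Arrow's own `ConcurrentQueue` and `Status` types.

```cpp
#include <cassert>
#include <vector>
#ifdef SKETCH_ENABLE_THREADING  // hypothetical stand-in for ARROW_ENABLE_THREADING
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#endif

// Hypothetical node: doubles each input value and appends it to a sink.
class SketchNode {
 public:
  explicit SketchNode(std::vector<int>* sink) : sink_(sink) {
#ifdef SKETCH_ENABLE_THREADING
    worker_ = std::thread([this] { WorkerLoop(); });
#endif
  }

  ~SketchNode() {
#ifdef SKETCH_ENABLE_THREADING
    Signal(false);  // poison pill: tells the worker to exit
    worker_.join();
#endif
  }

  void InputReceived(int value) {
    {
#ifdef SKETCH_ENABLE_THREADING
      std::lock_guard<std::mutex> lock(mutex_);
#endif
      pending_.push_back(value);
    }
    PushProcess();
  }

 private:
  // Threaded build: wake the worker. Single-threaded build: process inline.
  void PushProcess() {
#ifdef SKETCH_ENABLE_THREADING
    Signal(true);
#else
    ProcessInner();
#endif
  }

  // Drain everything buffered so far into the sink.
  void ProcessInner() {
    std::vector<int> batch;
    {
#ifdef SKETCH_ENABLE_THREADING
      std::lock_guard<std::mutex> lock(mutex_);
#endif
      batch.swap(pending_);
    }
    for (int v : batch) sink_->push_back(2 * v);
  }

#ifdef SKETCH_ENABLE_THREADING
  void Signal(bool keep_going) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      signals_.push(keep_going);
    }
    cv_.notify_one();
  }

  void WorkerLoop() {
    for (;;) {
      bool keep_going;
      {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return !signals_.empty(); });
        keep_going = signals_.front();
        signals_.pop();
      }
      if (!keep_going) return;
      ProcessInner();
    }
  }

  std::mutex mutex_;
  std::condition_variable cv_;
  std::queue<bool> signals_;
  std::thread worker_;
#endif

  std::vector<int> pending_;
  std::vector<int>* sink_;
};

// Feeds three values through the node and returns what reached the sink.
std::vector<int> RunSketch() {
  std::vector<int> out;
  {
    SketchNode node(&out);
    node.InputReceived(1);
    node.InputReceived(2);
    node.InputReceived(3);
  }  // the destructor joins the worker (if any), so `out` is complete here
  assert(out.size() == 3);
  return out;
}
```

Both build configurations produce the same output from `RunSketch()`; the only difference is which thread runs `ProcessInner`. In the threaded variant the poison pill is queued after all earlier signals, so the worker drains every pending input before it exits.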