Skip to content

Commit

Permalink
Create sorted merge node
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeremy Aguilon authored and JerAguilon committed Oct 25, 2023
1 parent 27697de commit 64211f3
Show file tree
Hide file tree
Showing 6 changed files with 507 additions and 0 deletions.
1 change: 1 addition & 0 deletions cpp/src/arrow/acero/asof_join_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,7 @@ class InputState {
return queue_.UnsyncFront();
}

// TODO(jeraguilon): consolidate
#define LATEST_VAL_CASE(id, val) \
case Type::id: { \
using T = typename TypeIdTraits<Type::id>::Type; \
Expand Down
36 changes: 36 additions & 0 deletions cpp/src/arrow/acero/backpressure_handler.h
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
<<<<<<< HEAD
=======

>>>>>>> b34c999b6 (Create sorted merge node)
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
Expand All @@ -16,6 +20,10 @@
// under the License.

#pragma once
<<<<<<< HEAD
=======
#include "arrow/acero/exec_plan.h"
>>>>>>> b34c999b6 (Create sorted merge node)
#include "arrow/acero/options.h"

#include <memory>
Expand All @@ -24,15 +32,26 @@ namespace arrow::acero {

class BackpressureHandler {
private:
<<<<<<< HEAD
BackpressureHandler(size_t low_threshold, size_t high_threshold,
std::unique_ptr<BackpressureControl> backpressure_control)
: low_threshold_(low_threshold),
=======
BackpressureHandler(ExecNode* input, size_t low_threshold, size_t high_threshold,
std::unique_ptr<BackpressureControl> backpressure_control)
: input_(input),
low_threshold_(low_threshold),
>>>>>>> b34c999b6 (Create sorted merge node)
high_threshold_(high_threshold),
backpressure_control_(std::move(backpressure_control)) {}

public:
static Result<BackpressureHandler> Make(
<<<<<<< HEAD
size_t low_threshold, size_t high_threshold,
=======
ExecNode* input, size_t low_threshold, size_t high_threshold,
>>>>>>> b34c999b6 (Create sorted merge node)
std::unique_ptr<BackpressureControl> backpressure_control) {
if (low_threshold >= high_threshold) {
return Status::Invalid("low threshold (", low_threshold,
Expand All @@ -41,7 +60,11 @@ class BackpressureHandler {
if (backpressure_control == NULLPTR) {
return Status::Invalid("null backpressure control parameter");
}
<<<<<<< HEAD
BackpressureHandler backpressure_handler(low_threshold, high_threshold,
=======
BackpressureHandler backpressure_handler(input, low_threshold, high_threshold,
>>>>>>> b34c999b6 (Create sorted merge node)
std::move(backpressure_control));
return std::move(backpressure_handler);
}
Expand All @@ -54,7 +77,20 @@ class BackpressureHandler {
}
}

<<<<<<< HEAD
private:
=======
Status ForceShutdown() {
// It may be unintuitive to call Resume() here, but this is to avoid a deadlock.
// Since acero's executor won't terminate if any one node is paused, we need to
// force resume the node before stopping production.
backpressure_control_->Resume();
return input_->StopProducing();
}

private:
ExecNode* input_;
>>>>>>> b34c999b6 (Create sorted merge node)
size_t low_threshold_;
size_t high_threshold_;
std::unique_ptr<BackpressureControl> backpressure_control_;
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/arrow/acero/concurrent_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,11 @@ class BackpressureConcurrentQueue : public ConcurrentQueue<T> {
return ConcurrentQueue<T>::TryPopUnlocked();
}

<<<<<<< HEAD
=======
Status ForceShutdown() { return handler_.ForceShutdown(); }

>>>>>>> b34c999b6 (Create sorted merge node)
private:
BackpressureHandler handler_;
};
Expand Down
Loading

0 comments on commit 64211f3

Please sign in to comment.