Skip to content

Commit a9ad104

Browse files
author
Rafał Hibner
committed
Remove Pop from queues
1 parent 16c2031 commit a9ad104

File tree

2 files changed

+4
-17
lines changed

2 files changed

+4
-17
lines changed

cpp/src/arrow/acero/concurrent_queue_internal.h

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,6 @@ namespace arrow::acero {
3131
template <class T>
3232
class ConcurrentQueue {
3333
public:
34-
// Pops the last item from the queue. Must be called on a non-empty queue
35-
T Pop() {
36-
std::unique_lock<std::mutex> lock(mutex_);
37-
return PopUnlocked();
38-
}
39-
4034
// Pops the last item from the queue but waits if the queue is empty until new items are
4135
// pushed.
4236
T WaitAndPop() {
@@ -140,13 +134,6 @@ class BackpressureConcurrentQueue : public ConcurrentQueue<T> {
140134
explicit BackpressureConcurrentQueue(BackpressureHandler handler)
141135
: handler_(std::move(handler)) {}
142136

143-
// Pops the last item from the queue. Must be called on a non-empty queue
144-
T Pop() {
145-
std::unique_lock<std::mutex> lock(ConcurrentQueue<T>::GetMutex());
146-
DoHandle do_handle(*this);
147-
return ConcurrentQueue<T>::PopUnlocked();
148-
}
149-
150137
// Pops the last item from the queue but waits if the queue is empty until new items are
151138
// pushed.
152139
T WaitAndPop() {

cpp/src/arrow/acero/util_test.cc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ void ConcurrentQueueBasicTest(Queue& queue) {
193193
ASSERT_TRUE(queue.Empty());
194194
queue.Push(1);
195195
ASSERT_FALSE(queue.Empty());
196-
ASSERT_EQ(queue.Pop(), 1);
196+
ASSERT_EQ(queue.TryPop(), std::make_optional(1));
197197
ASSERT_TRUE(queue.Empty());
198198

199199
auto fut_pop = std::async(std::launch::async, [&]() { return queue.WaitAndPop(); });
@@ -225,7 +225,7 @@ TEST(ConcurrentQueue, BasicTest) {
225225
class BackpressureTestExecNode : public ExecNode {
226226
public:
227227
BackpressureTestExecNode() : ExecNode(nullptr, {}, {}, nullptr) {}
228-
const char* kind_name() const { return "BackpressureTestNode"; }
228+
const char* kind_name() const override { return "BackpressureTestNode"; }
229229
Status InputReceived(ExecNode* input, ExecBatch batch) override {
230230
return Status::NotImplemented("Test only node");
231231
}
@@ -283,10 +283,10 @@ TEST(BackpressureConcurrentQueue, BackpressureTest) {
283283
queue.Push(9);
284284
ASSERT_TRUE(dummy_node.paused);
285285
ASSERT_FALSE(dummy_node.stopped);
286-
ASSERT_EQ(queue.Pop(), 6);
286+
ASSERT_EQ(queue.TryPop(), std::make_optional(6));
287287
ASSERT_TRUE(dummy_node.paused);
288288
ASSERT_FALSE(dummy_node.stopped);
289-
ASSERT_EQ(queue.Pop(), 7);
289+
ASSERT_EQ(queue.TryPop(), std::make_optional(7));
290290
ASSERT_FALSE(dummy_node.paused);
291291
ASSERT_FALSE(dummy_node.stopped);
292292
queue.Push(10);

0 commit comments

Comments
 (0)