diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp b/be/src/pipeline/local_exchange/local_exchanger.cpp index d1d6b7387a0be8..8b9c917862b664 100644 --- a/be/src/pipeline/local_exchange/local_exchanger.cpp +++ b/be/src/pipeline/local_exchange/local_exchanger.cpp @@ -66,6 +66,13 @@ bool Exchanger::_dequeue_data(LocalExchangeSourceLocalState* local_st local_state->_shared_state->sub_mem_usage(channel_id, block->_allocated_bytes); data_block->swap(block->_data_block); } + + std::unique_lock l(*_m[channel_id]); + // data_queue locked so that the size_approx is consistent with the actual queue size + if (_data_queue[channel_id].data_queue.size_approx() == 0) { + local_state->_dependency->block(); + } + return true; } else if (all_finished) { *eos = true; diff --git a/be/test/pipeline/local_exchanger_test.cpp b/be/test/pipeline/local_exchanger_test.cpp index d3a9b0e2d5d350..bd684a4ee06716 100644 --- a/be/test/pipeline/local_exchanger_test.cpp +++ b/be/test/pipeline/local_exchanger_test.cpp @@ -416,7 +416,7 @@ TEST_F(LocalExchangerTest, PassthroughExchanger) { Status::OK()); EXPECT_EQ(block.rows(), j == num_blocks ? 0 : 10); EXPECT_EQ(eos, false); - EXPECT_EQ(_local_states[i]->_dependency->ready(), j != num_blocks); + EXPECT_EQ(_local_states[i]->_dependency->ready(), j < num_blocks - 1); } } EXPECT_EQ(shared_state->mem_usage, 0); @@ -453,7 +453,7 @@ TEST_F(LocalExchangerTest, PassthroughExchanger) { Status::OK()); EXPECT_EQ(block.rows(), j == 1 ? 0 : 10); EXPECT_FALSE(eos); - EXPECT_EQ(_local_states[i]->_dependency->ready(), j != 1); + EXPECT_EQ(_local_states[i]->_dependency->ready(), false); } } } @@ -617,7 +617,8 @@ TEST_F(LocalExchangerTest, PassToOneExchanger) { EXPECT_EQ(block.rows(), i == 0 && j < num_blocks * num_sink ? 10 : 0); EXPECT_EQ(eos, i != 0); if (i == 0) { - EXPECT_EQ(_local_states[i]->_dependency->ready(), j != num_blocks * num_sink); + EXPECT_EQ(_local_states[i]->_dependency->ready(), + j < num_blocks * num_sink - 1); } } } @@ -655,7 +656,7 @@ TEST_F(LocalExchangerTest, PassToOneExchanger) { Status::OK()); EXPECT_EQ(block.rows(), j == 1 ? 0 : 10); EXPECT_FALSE(eos); - EXPECT_EQ(_local_states[i]->_dependency->ready(), j != 1); + EXPECT_EQ(_local_states[i]->_dependency->ready(), false); } } } @@ -801,7 +802,7 @@ TEST_F(LocalExchangerTest, BroadcastExchanger) { // Dequeue from data queue and accumulate rows if rows is smaller than batch_size. for (size_t i = 0; i < num_sources; i++) { - for (size_t j = 0; j <= num_blocks * num_sources; j++) { + for (size_t j = 0; j < num_blocks * num_sources; j++) { bool eos = false; vectorized::Block block; EXPECT_EQ( @@ -812,7 +813,7 @@ TEST_F(LocalExchangerTest, BroadcastExchanger) { Status::OK()); EXPECT_EQ(block.rows(), j == num_blocks * num_sources ? 0 : 10); EXPECT_FALSE(eos); - EXPECT_EQ(_local_states[i]->_dependency->ready(), j != num_blocks * num_sources); + EXPECT_EQ(_local_states[i]->_dependency->ready(), j < num_blocks * num_sources - 1); } } EXPECT_EQ(shared_state->mem_usage, 0); @@ -849,7 +850,7 @@ TEST_F(LocalExchangerTest, BroadcastExchanger) { Status::OK()); EXPECT_EQ(block.rows(), j == num_sources ? 0 : 10); EXPECT_FALSE(eos); - EXPECT_EQ(_local_states[i]->_dependency->ready(), j != num_sources); + EXPECT_EQ(_local_states[i]->_dependency->ready(), j < num_sources - 1); } } } @@ -1017,7 +1018,7 @@ TEST_F(LocalExchangerTest, AdaptivePassthroughExchanger) { : (j == 2 * num_blocks - 1 ? 0 : num_rows_per_block)) << j; EXPECT_EQ(eos, false); - EXPECT_EQ(_local_states[i]->_dependency->ready(), j != 2 * num_blocks - 1) << j; + EXPECT_EQ(_local_states[i]->_dependency->ready(), j < 2 * num_blocks - 2) << j; } } EXPECT_EQ(shared_state->mem_usage, 0); @@ -1054,7 +1055,7 @@ TEST_F(LocalExchangerTest, AdaptivePassthroughExchanger) { Status::OK()); EXPECT_EQ(block.rows(), j == 1 ? 0 : num_rows_per_block); EXPECT_FALSE(eos); - EXPECT_EQ(_local_states[i]->_dependency->ready(), j != 1); + EXPECT_EQ(_local_states[i]->_dependency->ready(), false); } } }