Skip to content

Commit

Permalink
2
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Oct 22, 2024
1 parent 2819538 commit 71fe098
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 4 deletions.
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/memory_scratch_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ Status MemoryScratchSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
RETURN_IF_ERROR(convert_to_arrow_batch(block, block_arrow_schema, arrow::default_memory_pool(),
&result, _timezone_obj));
local_state._queue->blocking_put(result);
if (local_state._queue->size() > 10) {
if (local_state._queue->size() > config::max_memory_sink_batch_count) {
local_state._queue_dependency->block();
}
return Status::OK();
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/record_batch_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
namespace doris {

bool RecordBatchQueue::blocking_get(std::shared_ptr<arrow::RecordBatch>* result) {
if (_dep && size() <= 10) {
if (_dep && size() <= config::max_memory_sink_batch_count) {
_dep->set_ready();
}
// Before each get queue, will set sink task dependency ready.
Expand Down
6 changes: 4 additions & 2 deletions be/src/runtime/result_queue_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,10 @@ void ResultQueueMgr::create_queue(const TUniqueId& fragment_instance_id,
if (iter != _fragment_queue_map.end()) {
*queue = iter->second;
} else {
// the blocking queue size = 20 (default), in this way, one queue have 20 * 1024 rows at most
BlockQueueSharedPtr tmp(new RecordBatchQueue(config::max_memory_sink_batch_count));
// max_elements will not take effect, because when queue size reaches max_memory_sink_batch_count,
// MemoryScratchSink will block queue dependency, in this way, one queue have 20 * 1024 rows at most.
// use MemoryScratchSink queue dependency instead of BlockingQueue to achieve blocking.
BlockQueueSharedPtr tmp(new RecordBatchQueue(config::max_memory_sink_batch_count * 2));
_fragment_queue_map.insert(std::make_pair(fragment_instance_id, tmp));
*queue = tmp;
}
Expand Down

0 comments on commit 71fe098

Please sign in to comment.