diff --git a/be/src/pipeline/exec/group_commit_scan_operator.cpp b/be/src/pipeline/exec/group_commit_scan_operator.cpp index 141a5e7bf770c5..841f9e80832e61 100644 --- a/be/src/pipeline/exec/group_commit_scan_operator.cpp +++ b/be/src/pipeline/exec/group_commit_scan_operator.cpp @@ -33,10 +33,9 @@ Status GroupCommitOperatorX::get_block(RuntimeState* state, vectorized::Block* b auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); bool find_node = false; - while (!find_node && !*eos) { - RETURN_IF_ERROR(local_state.load_block_queue->get_block(state, block, &find_node, eos, - local_state._get_block_dependency)); - } + RETURN_IF_ERROR(local_state.load_block_queue->get_block(state, block, &find_node, eos, + local_state._get_block_dependency, + local_state._timer_dependency)); return Status::OK(); } @@ -46,8 +45,21 @@ Status GroupCommitLocalState::init(RuntimeState* state, LocalStateInfo& info) { auto& p = _parent->cast(); _get_block_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), "GroupCommitGetBlockDependency", true); - return state->exec_env()->group_commit_mgr()->get_load_block_queue( + _timer_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), + "GroupCommitTimerDependency", true); + auto st = state->exec_env()->group_commit_mgr()->get_load_block_queue( p._table_id, state->fragment_instance_id(), load_block_queue, _get_block_dependency); + if (st.ok()) { + DCHECK(load_block_queue != nullptr); + _timer_dependency->block(); + _runtime_filter_timer = std::make_shared( + MonotonicMillis(), load_block_queue->get_group_commit_interval_ms(), + _timer_dependency); + std::vector> timers; + timers.push_back(_runtime_filter_timer); + ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(std::move(timers)); + } + return st; } Status GroupCommitLocalState::_process_conjuncts(RuntimeState* state) { diff --git a/be/src/pipeline/exec/group_commit_scan_operator.h b/be/src/pipeline/exec/group_commit_scan_operator.h index d1428899ede6b9..e7424df6e2ba35 100644 --- a/be/src/pipeline/exec/group_commit_scan_operator.h +++ b/be/src/pipeline/exec/group_commit_scan_operator.h @@ -37,9 +37,9 @@ class GroupCommitLocalState final : public ScanLocalState GroupCommitLocalState(RuntimeState* state, OperatorXBase* parent) : ScanLocalState(state, parent) {} Status init(RuntimeState* state, LocalStateInfo& info) override; - std::shared_ptr load_block_queue; + std::shared_ptr load_block_queue = nullptr; std::vector dependencies() const override { - return {_scan_dependency.get(), _get_block_dependency.get()}; + return {_scan_dependency.get(), _get_block_dependency.get(), _timer_dependency.get()}; } private: @@ -47,6 +47,8 @@ class GroupCommitLocalState final : public ScanLocalState Status _process_conjuncts(RuntimeState* state) override; std::shared_ptr _get_block_dependency = nullptr; + std::shared_ptr _timer_dependency = nullptr; + std::shared_ptr _runtime_filter_timer = nullptr; }; class GroupCommitOperatorX final : public ScanOperatorX { diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index 70d250deba19f2..b1fb11792167ba 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -109,7 +109,8 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state, Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block* block, bool* find_block, bool* eos, - std::shared_ptr get_block_dep) { + std::shared_ptr get_block_dep, + std::shared_ptr timer_dependency) { *find_block = false; *eos = false; std::unique_lock l(mutex); @@ -145,8 +146,9 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block* << ", duration=" << duration << ", load_ids=" << get_load_ids(); } } - if (!_load_ids_to_write_dep.empty()) { + if (!_need_commit && !timer_dependency->ready()) { get_block_dep->block(); + VLOG_DEBUG << "block get_block for query_id=" << load_instance_id; } } else { const BlockData block_data = _block_queue.front(); diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h index c6cb34a022a516..32579547893d83 100644 --- a/be/src/runtime/group_commit_mgr.h +++ b/be/src/runtime/group_commit_mgr.h @@ -73,7 +73,8 @@ class LoadBlockQueue { Status add_block(RuntimeState* runtime_state, std::shared_ptr block, bool write_wal, UniqueId& load_id); Status get_block(RuntimeState* runtime_state, vectorized::Block* block, bool* find_block, - bool* eos, std::shared_ptr get_block_dep); + bool* eos, std::shared_ptr get_block_dep, + std::shared_ptr timer_dependency); bool contain_load_id(const UniqueId& load_id); Status add_load_id(const UniqueId& load_id, const std::shared_ptr put_block_dep); @@ -88,6 +89,7 @@ class LoadBlockQueue { bool has_enough_wal_disk_space(size_t estimated_wal_bytes); void append_dependency(std::shared_ptr finish_dep); void append_read_dependency(std::shared_ptr read_dep); + int64_t get_group_commit_interval_ms() { return _group_commit_interval_ms; }; std::string debug_string() const { fmt::memory_buffer debug_string_buffer;