Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions be/src/pipeline/exec/group_commit_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,9 @@ Status GroupCommitOperatorX::get_block(RuntimeState* state, vectorized::Block* b
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
bool find_node = false;
while (!find_node && !*eos) {
RETURN_IF_ERROR(local_state.load_block_queue->get_block(state, block, &find_node, eos,
local_state._get_block_dependency));
}
RETURN_IF_ERROR(local_state.load_block_queue->get_block(state, block, &find_node, eos,
local_state._get_block_dependency,
local_state._timer_dependency));
return Status::OK();
}

Expand All @@ -46,8 +45,21 @@ Status GroupCommitLocalState::init(RuntimeState* state, LocalStateInfo& info) {
auto& p = _parent->cast<GroupCommitOperatorX>();
_get_block_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
"GroupCommitGetBlockDependency", true);
return state->exec_env()->group_commit_mgr()->get_load_block_queue(
_timer_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
"GroupCommitTimerDependency", true);
auto st = state->exec_env()->group_commit_mgr()->get_load_block_queue(
p._table_id, state->fragment_instance_id(), load_block_queue, _get_block_dependency);
if (st.ok()) {
DCHECK(load_block_queue != nullptr);
_timer_dependency->block();
_runtime_filter_timer = std::make_shared<pipeline::RuntimeFilterTimer>(
MonotonicMillis(), load_block_queue->get_group_commit_interval_ms(),
_timer_dependency);
std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>> timers;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

为啥会用到runtime filter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

group commit需要按照提交时间被唤醒,现在只有runtime filter支持这种

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这种地方都得用注释写清楚,否则,别人还以为我们的group commit 跟runtime filter 有啥关联

timers.push_back(_runtime_filter_timer);
ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(std::move(timers));
}
return st;
}

Status GroupCommitLocalState::_process_conjuncts(RuntimeState* state) {
Expand Down
6 changes: 4 additions & 2 deletions be/src/pipeline/exec/group_commit_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,18 @@ class GroupCommitLocalState final : public ScanLocalState<GroupCommitLocalState>
GroupCommitLocalState(RuntimeState* state, OperatorXBase* parent)
: ScanLocalState(state, parent) {}
Status init(RuntimeState* state, LocalStateInfo& info) override;
std::shared_ptr<LoadBlockQueue> load_block_queue;
std::shared_ptr<LoadBlockQueue> load_block_queue = nullptr;
std::vector<Dependency*> dependencies() const override {
return {_scan_dependency.get(), _get_block_dependency.get()};
return {_scan_dependency.get(), _get_block_dependency.get(), _timer_dependency.get()};
}

private:
friend class GroupCommitOperatorX;
Status _process_conjuncts(RuntimeState* state) override;

std::shared_ptr<Dependency> _get_block_dependency = nullptr;
std::shared_ptr<Dependency> _timer_dependency = nullptr;
std::shared_ptr<pipeline::RuntimeFilterTimer> _runtime_filter_timer = nullptr;
};

class GroupCommitOperatorX final : public ScanOperatorX<GroupCommitLocalState> {
Expand Down
6 changes: 4 additions & 2 deletions be/src/runtime/group_commit_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state,

Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block* block,
bool* find_block, bool* eos,
std::shared_ptr<pipeline::Dependency> get_block_dep) {
std::shared_ptr<pipeline::Dependency> get_block_dep,
std::shared_ptr<pipeline::Dependency> timer_dependency) {
*find_block = false;
*eos = false;
std::unique_lock l(mutex);
Expand Down Expand Up @@ -145,8 +146,9 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block*
<< ", duration=" << duration << ", load_ids=" << get_load_ids();
}
}
if (!_load_ids_to_write_dep.empty()) {
if (!_need_commit && !timer_dependency->ready()) {
get_block_dep->block();
VLOG_DEBUG << "block get_block for query_id=" << load_instance_id;
}
} else {
const BlockData block_data = _block_queue.front();
Expand Down
4 changes: 3 additions & 1 deletion be/src/runtime/group_commit_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ class LoadBlockQueue {
Status add_block(RuntimeState* runtime_state, std::shared_ptr<vectorized::Block> block,
bool write_wal, UniqueId& load_id);
Status get_block(RuntimeState* runtime_state, vectorized::Block* block, bool* find_block,
bool* eos, std::shared_ptr<pipeline::Dependency> get_block_dep);
bool* eos, std::shared_ptr<pipeline::Dependency> get_block_dep,
std::shared_ptr<pipeline::Dependency> timer_dependency);
bool contain_load_id(const UniqueId& load_id);
Status add_load_id(const UniqueId& load_id,
const std::shared_ptr<pipeline::Dependency> put_block_dep);
Expand All @@ -88,6 +89,7 @@ class LoadBlockQueue {
bool has_enough_wal_disk_space(size_t estimated_wal_bytes);
void append_dependency(std::shared_ptr<pipeline::Dependency> finish_dep);
void append_read_dependency(std::shared_ptr<pipeline::Dependency> read_dep);
int64_t get_group_commit_interval_ms() { return _group_commit_interval_ms; };

std::string debug_string() const {
fmt::memory_buffer debug_string_buffer;
Expand Down
Loading