From 2765b4737153e8a5a91a60ab9cc717f7d45ec813 Mon Sep 17 00:00:00 2001 From: meiyi Date: Thu, 4 Jul 2024 10:44:08 +0800 Subject: [PATCH] fix group commit --- .../exec/group_commit_block_sink_operator.cpp | 11 +- be/src/runtime/group_commit_mgr.cpp | 158 +++++++++++------- be/src/runtime/group_commit_mgr.h | 14 +- 3 files changed, 117 insertions(+), 66 deletions(-) diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp index 17088b37c3eb49..424ede07be5cf1 100644 --- a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp +++ b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp @@ -27,7 +27,12 @@ namespace doris::pipeline { GroupCommitBlockSinkLocalState::~GroupCommitBlockSinkLocalState() { if (_load_block_queue) { _remove_estimated_wal_bytes(); - _load_block_queue->remove_load_id(_parent->cast()._load_id); + [[maybe_unused]] auto st = _load_block_queue->remove_load_id( + _parent->cast()._load_id); + } else { + _state->exec_env()->group_commit_mgr()->remove_load_id( + _parent->cast()._table_id, + _parent->cast()._load_id); } } @@ -221,7 +226,7 @@ Status GroupCommitBlockSinkLocalState::_add_blocks(RuntimeState* state, if (dp->param("table_id", -1) == _table_id) { if (_load_block_queue) { _remove_estimated_wal_bytes(); - _load_block_queue->remove_load_id(p._load_id); + [[maybe_unused]] auto st = _load_block_queue->remove_load_id(p._load_id); } if (ExecEnv::GetInstance()->group_commit_mgr()->debug_future.wait_for( std ::chrono ::seconds(60)) == std ::future_status ::ready) { @@ -304,7 +309,7 @@ Status GroupCommitBlockSinkOperatorX::sink(RuntimeState* state, vectorized::Bloc RETURN_IF_ERROR(local_state._add_blocks(state, true)); } local_state._remove_estimated_wal_bytes(); - local_state._load_block_queue->remove_load_id(_load_id); + [[maybe_unused]] auto st = local_state._load_block_queue->remove_load_id(_load_id); } return Status::OK(); }; diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index 464f9f5122106f..54f25a708a42c6 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -112,58 +112,48 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block* *find_block = false; *eos = false; std::unique_lock l(mutex); - if (!_need_commit) { - if (std::chrono::duration_cast(std::chrono::steady_clock::now() - - _start_time) - .count() >= _group_commit_interval_ms) { - _need_commit = true; - } + if (runtime_state->is_cancelled() || !status.ok()) { + auto st = runtime_state->cancel_reason(); + _cancel_without_lock(st); + return status; } auto duration = std::chrono::duration_cast( std::chrono::steady_clock::now() - _start_time) .count(); - if (!runtime_state->is_cancelled() && status.ok() && _block_queue.empty() && !_need_commit) { - if (_group_commit_interval_ms - duration <= 0) { - _need_commit = true; - } else { - get_block_dep->block(); - return Status::OK(); + if (!_need_commit && duration >= _group_commit_interval_ms) { + _need_commit = true; + } + auto get_load_ids = [&]() { + std::stringstream ss; + ss << "["; + for (auto& id : _load_ids_to_write_dep) { + ss << id.first.to_string() << ", "; } - } else if (!runtime_state->is_cancelled() && status.ok() && _block_queue.empty() && - _need_commit && !_load_ids_to_write_dep.empty()) { - if (duration >= 10 * _group_commit_interval_ms) { - std::stringstream ss; - ss << "["; - for (auto& id : _load_ids_to_write_dep) { - ss << id.first.to_string() << ", "; + ss << "]"; + return ss.str(); + }; + if (_block_queue.empty()) { + if (_need_commit && duration >= 10 * _group_commit_interval_ms) { + auto last_print_duration = std::chrono::duration_cast( + std::chrono::steady_clock::now() - _last_print_time) + .count(); + if (last_print_duration >= 5000) { + _last_print_time = std::chrono::steady_clock::now(); + LOG(INFO) << "find one group_commit need to commit, txn_id=" << txn_id + << ", label=" << label << ", instance_id=" << load_instance_id + << ", duration=" << duration << ", load_ids=" << get_load_ids(); } - ss << "]"; - LOG(INFO) << "find one group_commit need to commit, txn_id=" << txn_id - << ", label=" << label << ", instance_id=" << load_instance_id - << ", duration=" << duration << ", load_ids=" << ss.str() - << ", runtime_state=" << runtime_state; } - get_block_dep->block(); - return Status::OK(); - } - if (runtime_state->is_cancelled()) { - auto st = runtime_state->cancel_reason(); - _cancel_without_lock(st); - return status; - } - if (!_block_queue.empty()) { + if (!_load_ids_to_write_dep.empty()) { + get_block_dep->block(); + } + } else { const BlockData block_data = _block_queue.front(); block->swap(*block_data.block); *find_block = true; _block_queue.pop_front(); int before_block_queues_bytes = _all_block_queues_bytes->load(); _all_block_queues_bytes->fetch_sub(block_data.block_bytes, std::memory_order_relaxed); - std::stringstream ss; - ss << "["; - for (const auto& id : _load_ids_to_write_dep) { - ss << id.first.to_string() << ", "; - } - ss << "]"; VLOG_DEBUG << "[Group Commit Debug] (LoadBlockQueue::get_block). " << "block queue size is " << _block_queue.size() << ", block rows is " << block->rows() << ", block bytes is " << block->bytes() @@ -172,9 +162,8 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block* << ", after remove block, all block queues bytes is " << _all_block_queues_bytes->load() << ", txn_id=" << txn_id << ", label=" << label << ", instance_id=" << load_instance_id - << ", load_ids=" << ss.str() << ", runtime_state=" << runtime_state - << ", the block is " << block->dump_data() << ", the block column size is " - << block->columns_bytes(); + << ", load_ids=" << get_load_ids() << ", the block is " << block->dump_data() + << ", the block column size is " << block->columns_bytes(); } if (_block_queue.empty() && _need_commit && _load_ids_to_write_dep.empty()) { *eos = true; @@ -190,7 +179,7 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block* return Status::OK(); } -void LoadBlockQueue::remove_load_id(const UniqueId& load_id) { +Status LoadBlockQueue::remove_load_id(const UniqueId& load_id) { std::unique_lock l(mutex); if (_load_ids_to_write_dep.find(load_id) != _load_ids_to_write_dep.end()) { _load_ids_to_write_dep[load_id]->set_always_ready(); @@ -198,7 +187,15 @@ void LoadBlockQueue::remove_load_id(const UniqueId& load_id) { for (auto read_dep : _read_deps) { read_dep->set_ready(); } + return Status::OK(); } + return Status::NotFound("load_id=" + load_id.to_string() + + " not in block queue, label=" + label); +} + +bool LoadBlockQueue::contain_load_id(const UniqueId& load_id) { + std::unique_lock l(mutex); + return _load_ids_to_write_dep.find(load_id) != _load_ids_to_write_dep.end(); } Status LoadBlockQueue::add_load_id(const UniqueId& load_id, @@ -250,6 +247,9 @@ void LoadBlockQueue::_cancel_without_lock(const Status& st) { for (auto& id : _load_ids_to_write_dep) { id.second->set_always_ready(); } + for (auto read_dep : _read_deps) { + read_dep->set_ready(); + } } Status GroupCommitTable::get_first_block_load_queue( @@ -261,6 +261,14 @@ Status GroupCommitTable::get_first_block_load_queue( DCHECK(table_id == _table_id); std::unique_lock l(_lock); auto try_to_get_matched_queue = [&]() -> Status { + for (const auto& [_, inner_block_queue] : _load_block_queues) { + if (inner_block_queue->contain_load_id(load_id)) { + load_block_queue = inner_block_queue; + label = inner_block_queue->label; + txn_id = inner_block_queue->txn_id; + return Status::OK(); + } + } for (const auto& [_, inner_block_queue] : _load_block_queues) { if (!inner_block_queue->need_commit()) { if (base_schema_version == inner_block_queue->schema_version) { @@ -285,28 +293,38 @@ Status GroupCommitTable::get_first_block_load_queue( return Status::OK(); } create_plan_dep->block(); - _create_plan_deps.push_back(create_plan_dep); + _create_plan_deps.emplace(load_id, + std::make_tuple(create_plan_dep, put_block_dep, base_schema_version)); if (!_is_creating_plan_fragment) { _is_creating_plan_fragment = true; - RETURN_IF_ERROR(_thread_pool->submit_func([&, be_exe_version, mem_tracker, - dep = create_plan_dep] { - Defer defer {[&, dep = dep]() { - std::unique_lock l(_lock); - for (auto it : _create_plan_deps) { - it->set_ready(); - } - std::vector> {}.swap(_create_plan_deps); - _is_creating_plan_fragment = false; - }}; - auto st = _create_group_commit_load(be_exe_version, mem_tracker); - if (!st.ok()) { - LOG(WARNING) << "create group commit load error, st=" << st.to_string(); - } - })); + RETURN_IF_ERROR( + _thread_pool->submit_func([&, be_exe_version, mem_tracker, dep = create_plan_dep] { + Defer defer {[&, dep = dep]() { + std::unique_lock l(_lock); + for (auto it : _create_plan_deps) { + std::get<0>(it.second)->set_ready(); + } + _create_plan_deps.clear(); + _is_creating_plan_fragment = false; + }}; + auto st = _create_group_commit_load(be_exe_version, mem_tracker); + if (!st.ok()) { + LOG(WARNING) << "create group commit load error, st=" << st.to_string(); + } + })); } return try_to_get_matched_queue(); } +void GroupCommitTable::remove_load_id(const UniqueId& load_id) { + std::unique_lock l(_lock); + for (const auto& [_, inner_block_queue] : _load_block_queues) { + if (inner_block_queue->remove_load_id(load_id).ok()) { + return; + } + } +} + Status GroupCommitTable::_create_group_commit_load(int be_exe_version, std::shared_ptr mem_tracker) { Status st = Status::OK(); @@ -378,6 +396,21 @@ Status GroupCommitTable::_create_group_commit_load(int be_exe_version, pipeline_params.fragment.output_sink.olap_table_sink.schema.slot_descs, be_exe_version)); _load_block_queues.emplace(instance_id, load_block_queue); + + std::vector success_load_ids; + for (const auto& [id, load_info] : _create_plan_deps) { + auto create_dep = std::get<0>(load_info); + auto put_dep = std::get<1>(load_info); + if (load_block_queue->schema_version == std::get<2>(load_info)) { + if (load_block_queue->add_load_id(id, put_dep).ok()) { + create_dep->set_ready(); + success_load_ids.emplace_back(id); + } + } + } + for (const auto& id2 : success_load_ids) { + _create_plan_deps.erase(id2); + } } } st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, result.pipeline_params); @@ -596,6 +629,13 @@ Status GroupCommitMgr::get_load_block_queue(int64_t table_id, const TUniqueId& i return group_commit_table->get_load_block_queue(instance_id, load_block_queue, get_block_dep); } +void GroupCommitMgr::remove_load_id(int64_t table_id, const UniqueId& load_id) { + std::lock_guard wlock(_lock); + if (_table_map.find(table_id) != _table_map.end()) { + _table_map.find(table_id)->second->remove_load_id(load_id); + } +} + Status LoadBlockQueue::create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id, const std::string& import_label, WalManager* wal_manager, std::vector& slot_desc, int be_exe_version) { diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h index e9ea152ea5c6d6..16c7e0c24d37aa 100644 --- a/be/src/runtime/group_commit_mgr.h +++ b/be/src/runtime/group_commit_mgr.h @@ -66,6 +66,7 @@ class LoadBlockQueue { wait_internal_group_commit_finish(wait_internal_group_commit_finish), _group_commit_interval_ms(group_commit_interval_ms), _start_time(std::chrono::steady_clock::now()), + _last_print_time(_start_time), _group_commit_data_bytes(group_commit_data_bytes), _all_block_queues_bytes(all_block_queues_bytes) {}; @@ -73,9 +74,10 @@ class LoadBlockQueue { bool write_wal, UniqueId& load_id); Status get_block(RuntimeState* runtime_state, vectorized::Block* block, bool* find_block, bool* eos, std::shared_ptr get_block_dep); + bool contain_load_id(const UniqueId& load_id); Status add_load_id(const UniqueId& load_id, const std::shared_ptr put_block_dep); - void remove_load_id(const UniqueId& load_id); + Status remove_load_id(const UniqueId& load_id); void cancel(const Status& st); bool need_commit() { return _need_commit; } @@ -133,6 +135,7 @@ class LoadBlockQueue { // commit by time interval, can be changed by 'ALTER TABLE my_table SET ("group_commit_interval_ms"="1000");' int64_t _group_commit_interval_ms; std::chrono::steady_clock::time_point _start_time; + std::chrono::steady_clock::time_point _last_print_time; // commit by data size int64_t _group_commit_data_bytes; int64_t _data_bytes = 0; @@ -140,8 +143,6 @@ class LoadBlockQueue { // memory back pressure, memory consumption of all tables' load block queues std::shared_ptr _all_block_queues_bytes; std::condition_variable _get_cond; - static constexpr size_t MEM_BACK_PRESSURE_WAIT_TIME = 1000; // 1s - static constexpr size_t MEM_BACK_PRESSURE_WAIT_TIMEOUT = 120000; // 120s }; class GroupCommitTable { @@ -164,6 +165,7 @@ class GroupCommitTable { Status get_load_block_queue(const TUniqueId& instance_id, std::shared_ptr& load_block_queue, std::shared_ptr get_block_dep); + void remove_load_id(const UniqueId& load_id); private: Status _create_group_commit_load(int be_exe_version, @@ -186,7 +188,10 @@ class GroupCommitTable { // fragment_instance_id to load_block_queue std::unordered_map> _load_block_queues; bool _is_creating_plan_fragment = false; - std::vector> _create_plan_deps; + // user_load_id -> + std::unordered_map, + std::shared_ptr, int64_t>> + _create_plan_deps; }; class GroupCommitMgr { @@ -208,6 +213,7 @@ class GroupCommitMgr { std::shared_ptr create_plan_dep, std::shared_ptr put_block_dep, std::string& label, int64_t& txn_id); + void remove_load_id(int64_t table_id, const UniqueId& load_id); std::promise debug_promise; std::future debug_future = debug_promise.get_future();