From 0502b76f7210eb61c08867ad999249a136cdbcb1 Mon Sep 17 00:00:00 2001 From: meiyi Date: Thu, 29 May 2025 17:45:49 +0800 Subject: [PATCH] [fix](group commit) fix group_commit get_block too slow --- be/src/pipeline/dependency.cpp | 2 +- be/src/pipeline/dependency.h | 9 +- .../exec/group_commit_scan_operator.cpp | 8 +- .../exec/group_commit_scan_operator.h | 3 +- be/src/runtime/group_commit_mgr.cpp | 103 ++++++++---------- be/src/runtime/group_commit_mgr.h | 5 +- .../commands/insert/OlapInsertExecutor.java | 5 + .../test_group_commit_error.groovy | 12 ++ 8 files changed, 76 insertions(+), 71 deletions(-) diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp index 8c3d09c56e3ac6..a6578804e3948a 100644 --- a/be/src/pipeline/dependency.cpp +++ b/be/src/pipeline/dependency.cpp @@ -160,7 +160,7 @@ void RuntimeFilterTimerQueue::start() { if (it.use_count() == 1) { // `use_count == 1` means this runtime filter has been released } else if (it->should_be_check_timeout()) { - if (it->_parent->is_blocked_by()) { + if (it->force_wait_timeout() || it->_parent->is_blocked_by()) { // This means runtime filter is not ready, so we call timeout or continue to poll this timer. int64_t ms_since_registration = MonotonicMillis() - it->registration_time(); if (ms_since_registration > it->wait_time_ms()) { diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h index 1935e03e24eb2d..09bc22c9f5ccbb 100644 --- a/be/src/pipeline/dependency.h +++ b/be/src/pipeline/dependency.h @@ -203,10 +203,11 @@ struct RuntimeFilterTimerQueue; class RuntimeFilterTimer { public: RuntimeFilterTimer(int64_t registration_time, int32_t wait_time_ms, - std::shared_ptr parent) + std::shared_ptr parent, bool force_wait_timeout = false) : _parent(std::move(parent)), _registration_time(registration_time), - _wait_time_ms(wait_time_ms) {} + _wait_time_ms(wait_time_ms), + _force_wait_timeout(force_wait_timeout) {} // Called by runtime filter producer. void call_ready(); @@ -224,6 +225,8 @@ class RuntimeFilterTimer { bool should_be_check_timeout(); + bool force_wait_timeout() { return _force_wait_timeout; } + private: friend struct RuntimeFilterTimerQueue; std::shared_ptr _parent = nullptr; @@ -231,6 +234,8 @@ class RuntimeFilterTimer { std::mutex _lock; int64_t _registration_time; const int32_t _wait_time_ms; + // true only for group_commit_scan_operator + bool _force_wait_timeout; }; struct RuntimeFilterTimerQueue { diff --git a/be/src/pipeline/exec/group_commit_scan_operator.cpp b/be/src/pipeline/exec/group_commit_scan_operator.cpp index 841f9e80832e61..f258557be18bca 100644 --- a/be/src/pipeline/exec/group_commit_scan_operator.cpp +++ b/be/src/pipeline/exec/group_commit_scan_operator.cpp @@ -34,8 +34,7 @@ Status GroupCommitOperatorX::get_block(RuntimeState* state, vectorized::Block* b SCOPED_TIMER(local_state.exec_time_counter()); bool find_node = false; RETURN_IF_ERROR(local_state.load_block_queue->get_block(state, block, &find_node, eos, - local_state._get_block_dependency, - local_state._timer_dependency)); + local_state._get_block_dependency)); return Status::OK(); } @@ -45,16 +44,13 @@ Status GroupCommitLocalState::init(RuntimeState* state, LocalStateInfo& info) { auto& p = _parent->cast(); _get_block_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), "GroupCommitGetBlockDependency", true); - _timer_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), - "GroupCommitTimerDependency", true); auto st = state->exec_env()->group_commit_mgr()->get_load_block_queue( p._table_id, state->fragment_instance_id(), load_block_queue, _get_block_dependency); if (st.ok()) { DCHECK(load_block_queue != nullptr); - _timer_dependency->block(); _runtime_filter_timer = std::make_shared( MonotonicMillis(), load_block_queue->get_group_commit_interval_ms(), - _timer_dependency); + _get_block_dependency, true); std::vector> timers; timers.push_back(_runtime_filter_timer); ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(std::move(timers)); diff --git a/be/src/pipeline/exec/group_commit_scan_operator.h b/be/src/pipeline/exec/group_commit_scan_operator.h index e7424df6e2ba35..592e2e2815042f 100644 --- a/be/src/pipeline/exec/group_commit_scan_operator.h +++ b/be/src/pipeline/exec/group_commit_scan_operator.h @@ -39,7 +39,7 @@ class GroupCommitLocalState final : public ScanLocalState Status init(RuntimeState* state, LocalStateInfo& info) override; std::shared_ptr load_block_queue = nullptr; std::vector dependencies() const override { - return {_scan_dependency.get(), _get_block_dependency.get(), _timer_dependency.get()}; + return {_scan_dependency.get(), _get_block_dependency.get()}; } private: @@ -47,7 +47,6 @@ class GroupCommitLocalState final : public ScanLocalState Status _process_conjuncts(RuntimeState* state) override; std::shared_ptr _get_block_dependency = nullptr; - std::shared_ptr _timer_dependency = nullptr; std::shared_ptr _runtime_filter_timer = nullptr; }; diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index 40f940c020732e..9e368d81438ee7 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -35,6 +35,16 @@ namespace doris { +std::string LoadBlockQueue::_get_load_ids() { + std::stringstream ss; + ss << "["; + for (auto& id : _load_ids_to_write_dep) { + ss << id.first.to_string() << ", "; + } + ss << "]"; + return ss.str(); +} + Status LoadBlockQueue::add_block(RuntimeState* runtime_state, std::shared_ptr block, bool write_wal, UniqueId& load_id) { @@ -55,23 +65,12 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state, _data_bytes += block->bytes(); int before_block_queues_bytes = _all_block_queues_bytes->load(); _all_block_queues_bytes->fetch_add(block->bytes(), std::memory_order_relaxed); - std::stringstream ss; - ss << "["; - for (const auto& id : _load_ids_to_write_dep) { - ss << id.first.to_string() << ", "; - } - ss << "]"; VLOG_DEBUG << "[Group Commit Debug] (LoadBlockQueue::add_block). " - << "block queue size is " << _block_queue.size() << ", block rows is " - << block->rows() << ", block bytes is " << block->bytes() - << ", before add block, all block queues bytes is " - << before_block_queues_bytes - << ", after add block, all block queues bytes is " - << _all_block_queues_bytes->load() << ", txn_id=" << txn_id - << ", label=" << label << ", instance_id=" << load_instance_id - << ", load_ids=" << ss.str() << ", runtime_state=" << runtime_state - << ", the block is " << block->dump_data() << ", the block column size is " - << block->columns_bytes(); + << "Cur block rows=" << block->rows() << ", bytes=" << block->bytes() + << ". all block queues bytes from " << before_block_queues_bytes << " to " + << _all_block_queues_bytes->load() << ", queue size=" << _block_queue.size() + << ". txn_id=" << txn_id << ", label=" << label + << ", instance_id=" << load_instance_id << ", load_ids=" << _get_load_ids(); } if (write_wal || config::group_commit_wait_replay_wal_finish) { auto st = _v_wal_writer->write_wal(block.get()); @@ -85,6 +84,9 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state, config::group_commit_queue_mem_limit) { DCHECK(_load_ids_to_write_dep.find(load_id) != _load_ids_to_write_dep.end()); _load_ids_to_write_dep[load_id]->block(); + VLOG_DEBUG << "block add_block for load_id=" << load_id + << ", memory=" << _all_block_queues_bytes->load(std::memory_order_relaxed) + << ". inner load_id=" << load_instance_id << ", label=" << label; } } if (!_need_commit) { @@ -104,14 +106,14 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state, } for (auto read_dep : _read_deps) { read_dep->set_ready(); + VLOG_DEBUG << "set ready for inner load_id=" << load_instance_id; } return Status::OK(); } Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block* block, bool* find_block, bool* eos, - std::shared_ptr get_block_dep, - std::shared_ptr timer_dependency) { + std::shared_ptr get_block_dep) { *find_block = false; *eos = false; std::unique_lock l(mutex); @@ -126,15 +128,6 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block* if (!_need_commit && duration >= _group_commit_interval_ms) { _need_commit = true; } - auto get_load_ids = [&]() { - std::stringstream ss; - ss << "["; - for (auto& id : _load_ids_to_write_dep) { - ss << id.first.to_string() << ", "; - } - ss << "]"; - return ss.str(); - }; if (_block_queue.empty()) { if (_need_commit && duration >= 10 * _group_commit_interval_ms) { auto last_print_duration = std::chrono::duration_cast( @@ -144,12 +137,13 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block* _last_print_time = std::chrono::steady_clock::now(); LOG(INFO) << "find one group_commit need to commit, txn_id=" << txn_id << ", label=" << label << ", instance_id=" << load_instance_id - << ", duration=" << duration << ", load_ids=" << get_load_ids(); + << ", duration=" << duration << ", load_ids=" << _get_load_ids(); } } + VLOG_DEBUG << "get_block for inner load_id=" << load_instance_id << ", but queue is empty"; if (!_need_commit) { get_block_dep->block(); - VLOG_DEBUG << "block get_block for query_id=" << load_instance_id; + VLOG_DEBUG << "block get_block for inner load_id=" << load_instance_id; } } else { const BlockData block_data = _block_queue.front(); @@ -159,15 +153,11 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block* int before_block_queues_bytes = _all_block_queues_bytes->load(); _all_block_queues_bytes->fetch_sub(block_data.block_bytes, std::memory_order_relaxed); VLOG_DEBUG << "[Group Commit Debug] (LoadBlockQueue::get_block). " - << "block queue size is " << _block_queue.size() << ", block rows is " - << block->rows() << ", block bytes is " << block->bytes() - << ", before remove block, all block queues bytes is " - << before_block_queues_bytes - << ", after remove block, all block queues bytes is " - << _all_block_queues_bytes->load() << ", txn_id=" << txn_id - << ", label=" << label << ", instance_id=" << load_instance_id - << ", load_ids=" << get_load_ids() << ", the block is " << block->dump_data() - << ", the block column size is " << block->columns_bytes(); + << "Cur block rows=" << block->rows() << ", bytes=" << block->bytes() + << ". all block queues bytes from " << before_block_queues_bytes << " to " + << _all_block_queues_bytes->load() << ", queue size=" << _block_queue.size() + << ". txn_id=" << txn_id << ", label=" << label + << ", instance_id=" << load_instance_id << ", load_ids=" << _get_load_ids(); } if (_block_queue.empty() && _need_commit && _load_ids_to_write_dep.empty()) { *eos = true; @@ -179,6 +169,8 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block* for (auto& id : _load_ids_to_write_dep) { id.second->set_ready(); } + VLOG_DEBUG << "set ready for load_ids=" << _get_load_ids() + << ". inner load_id=" << load_instance_id << ", label=" << label; } return Status::OK(); } @@ -191,6 +183,7 @@ Status LoadBlockQueue::remove_load_id(const UniqueId& load_id) { for (auto read_dep : _read_deps) { read_dep->set_ready(); } + VLOG_DEBUG << "set ready for load_id=" << load_id << ", inner load_id=" << load_instance_id; return Status::OK(); } return Status::NotFound("load_id=" + load_id.to_string() + @@ -225,35 +218,25 @@ void LoadBlockQueue::_cancel_without_lock(const Status& st) { << ", status=" << st.to_string(); status = Status::Cancelled("cancel group_commit, label=" + label + ", status=" + st.to_string()); + int before_block_queues_bytes = _all_block_queues_bytes->load(); while (!_block_queue.empty()) { const BlockData& block_data = _block_queue.front().block; - int before_block_queues_bytes = _all_block_queues_bytes->load(); _all_block_queues_bytes->fetch_sub(block_data.block_bytes, std::memory_order_relaxed); - std::stringstream ss; - ss << "["; - for (const auto& id : _load_ids_to_write_dep) { - ss << id.first.to_string() << ", "; - } - ss << "]"; - VLOG_DEBUG << "[Group Commit Debug] (LoadBlockQueue::_cancel_without_block). " - << "block queue size is " << _block_queue.size() << ", block rows is " - << block_data.block->rows() << ", block bytes is " << block_data.block->bytes() - << ", before remove block, all block queues bytes is " - << before_block_queues_bytes - << ", after remove block, all block queues bytes is " - << _all_block_queues_bytes->load() << ", txn_id=" << txn_id - << ", label=" << label << ", instance_id=" << load_instance_id - << ", load_ids=" << ss.str() << ", the block is " - << block_data.block->dump_data() << ", the block column size is " - << block_data.block->columns_bytes(); _block_queue.pop_front(); } + VLOG_DEBUG << "[Group Commit Debug] (LoadBlockQueue::_cancel_without_block). " + << "all block queues bytes from " << before_block_queues_bytes << " to " + << _all_block_queues_bytes->load() << ", queue size=" << _block_queue.size() + << ", txn_id=" << txn_id << ", label=" << label + << ", instance_id=" << load_instance_id << ", load_ids=" << _get_load_ids(); for (auto& id : _load_ids_to_write_dep) { id.second->set_always_ready(); } for (auto read_dep : _read_deps) { read_dep->set_ready(); } + VLOG_DEBUG << "set ready for load_ids=" << _get_load_ids() + << ", inner load_id=" << load_instance_id; } Status GroupCommitTable::get_first_block_load_queue( @@ -287,7 +270,7 @@ Status GroupCommitTable::get_first_block_load_queue( } } return Status::InternalError("can not get a block queue for table_id: " + - std::to_string(_table_id)); + std::to_string(_table_id) + _create_plan_failed_reason); }; if (try_to_get_matched_queue().ok()) { @@ -310,7 +293,11 @@ Status GroupCommitTable::get_first_block_load_queue( }}; auto st = _create_group_commit_load(be_exe_version, mem_tracker); if (!st.ok()) { - LOG(WARNING) << "create group commit load error, st=" << st.to_string(); + LOG(WARNING) << "create group commit load error: " << st.to_string(); + _create_plan_failed_reason = ". create group commit load error: " + + st.to_string().substr(0, 300); + } else { + _create_plan_failed_reason = ""; } })); } diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h index 2be17400026f94..dbaade783f4eac 100644 --- a/be/src/runtime/group_commit_mgr.h +++ b/be/src/runtime/group_commit_mgr.h @@ -74,8 +74,7 @@ class LoadBlockQueue { Status add_block(RuntimeState* runtime_state, std::shared_ptr block, bool write_wal, UniqueId& load_id); Status get_block(RuntimeState* runtime_state, vectorized::Block* block, bool* find_block, - bool* eos, std::shared_ptr get_block_dep, - std::shared_ptr timer_dependency); + bool* eos, std::shared_ptr get_block_dep); bool contain_load_id(const UniqueId& load_id); Status add_load_id(const UniqueId& load_id, const std::shared_ptr put_block_dep); @@ -124,6 +123,7 @@ class LoadBlockQueue { private: void _cancel_without_lock(const Status& st); + std::string _get_load_ids(); // the set of load ids of all blocks in this queue std::map> _load_ids_to_write_dep; @@ -196,6 +196,7 @@ class GroupCommitTable { std::tuple, std::shared_ptr, int64_t, int64_t>> _create_plan_deps; + std::string _create_plan_failed_reason; }; class GroupCommitMgr { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java index fed0576b34d595..47ef9cc3c2ebca 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java @@ -29,6 +29,7 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.load.EtlJobType; import org.apache.doris.nereids.NereidsPlanner; @@ -50,6 +51,7 @@ import org.apache.doris.system.Backend; import org.apache.doris.thrift.TOlapTableLocationParam; import org.apache.doris.thrift.TPartitionType; +import org.apache.doris.transaction.BeginTransactionException; import org.apache.doris.transaction.TabletCommitInfo; import org.apache.doris.transaction.TransactionState; import org.apache.doris.transaction.TransactionState.LoadJobSourceType; @@ -92,6 +94,9 @@ public void beginTransaction() { return; } try { + if (DebugPointUtil.isEnable("OlapInsertExecutor.beginTransaction.failed")) { + throw new BeginTransactionException("current running txns on db is larger than limit"); + } this.txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction( database.getId(), ImmutableList.of(table.getId()), labelName, new TxnCoordinator(TxnSourceType.FE, 0, diff --git a/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy b/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy index 6e9a89aa0f7eaa..cef9bbdbf27df4 100644 --- a/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy +++ b/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy @@ -31,6 +31,18 @@ suite("test_group_commit_error", "nonConcurrent") { GetDebugPoint().clearDebugPointsForAllBEs() GetDebugPoint().clearDebugPointsForAllFEs() + try { + GetDebugPoint().enableDebugPointForAllFEs("OlapInsertExecutor.beginTransaction.failed") + sql """ set group_commit = async_mode """ + sql """ insert into ${tableName} values (1, 1) """ + assertTrue(false) + } catch (Exception e) { + logger.info("failed: " + e.getMessage()) + assertTrue(e.getMessage().contains("begin transaction failed")) + } finally { + GetDebugPoint().clearDebugPointsForAllFEs() + } + try { GetDebugPoint().enableDebugPointForAllBEs("FragmentMgr.exec_plan_fragment.failed") sql """ set group_commit = async_mode """