Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](group commit) fix group commit can not get block queue and may stuck #37260

Merged
merged 1 commit into from
Jul 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions be/src/pipeline/exec/group_commit_block_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,12 @@ namespace doris::pipeline {
// Destructor: unregisters this sink's load id from group commit.
// When `_load_block_queue` is set, it calls `_remove_estimated_wal_bytes()` and removes
// the load id from that queue. Otherwise it asks the group commit manager to remove the
// load id, identified by the parent operator's table id and load id.
GroupCommitBlockSinkLocalState::~GroupCommitBlockSinkLocalState() {
if (_load_block_queue) {
_remove_estimated_wal_bytes();
// NOTE(review): this call and the [[maybe_unused]] call right below remove the same
// load id; this looks like the before/after pair of a diff hunk — confirm that only
// one of them is meant to remain.
_load_block_queue->remove_load_id(_parent->cast<GroupCommitBlockSinkOperatorX>()._load_id);
// The returned Status is bound to an unused local; nothing here acts on a failure.
[[maybe_unused]] auto st = _load_block_queue->remove_load_id(
_parent->cast<GroupCommitBlockSinkOperatorX>()._load_id);
} else {
// No queue is attached to this sink: go through the manager, keyed by table id.
_state->exec_env()->group_commit_mgr()->remove_load_id(
_parent->cast<GroupCommitBlockSinkOperatorX>()._table_id,
_parent->cast<GroupCommitBlockSinkOperatorX>()._load_id);
}
}

Expand Down Expand Up @@ -221,7 +226,7 @@ Status GroupCommitBlockSinkLocalState::_add_blocks(RuntimeState* state,
if (dp->param<int64_t>("table_id", -1) == _table_id) {
if (_load_block_queue) {
_remove_estimated_wal_bytes();
_load_block_queue->remove_load_id(p._load_id);
[[maybe_unused]] auto st = _load_block_queue->remove_load_id(p._load_id);
}
if (ExecEnv::GetInstance()->group_commit_mgr()->debug_future.wait_for(
std ::chrono ::seconds(60)) == std ::future_status ::ready) {
Expand Down Expand Up @@ -304,7 +309,7 @@ Status GroupCommitBlockSinkOperatorX::sink(RuntimeState* state, vectorized::Bloc
RETURN_IF_ERROR(local_state._add_blocks(state, true));
}
local_state._remove_estimated_wal_bytes();
local_state._load_block_queue->remove_load_id(_load_id);
[[maybe_unused]] auto st = local_state._load_block_queue->remove_load_id(_load_id);
}
return Status::OK();
};
Expand Down
158 changes: 99 additions & 59 deletions be/src/runtime/group_commit_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,58 +112,48 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block*
*find_block = false;
*eos = false;
std::unique_lock l(mutex);
if (!_need_commit) {
if (std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() -
_start_time)
.count() >= _group_commit_interval_ms) {
_need_commit = true;
}
if (runtime_state->is_cancelled() || !status.ok()) {
auto st = runtime_state->cancel_reason();
_cancel_without_lock(st);
return status;
}
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - _start_time)
.count();
if (!runtime_state->is_cancelled() && status.ok() && _block_queue.empty() && !_need_commit) {
if (_group_commit_interval_ms - duration <= 0) {
_need_commit = true;
} else {
get_block_dep->block();
return Status::OK();
if (!_need_commit && duration >= _group_commit_interval_ms) {
_need_commit = true;
}
auto get_load_ids = [&]() {
std::stringstream ss;
ss << "[";
for (auto& id : _load_ids_to_write_dep) {
ss << id.first.to_string() << ", ";
}
} else if (!runtime_state->is_cancelled() && status.ok() && _block_queue.empty() &&
_need_commit && !_load_ids_to_write_dep.empty()) {
if (duration >= 10 * _group_commit_interval_ms) {
std::stringstream ss;
ss << "[";
for (auto& id : _load_ids_to_write_dep) {
ss << id.first.to_string() << ", ";
ss << "]";
return ss.str();
};
if (_block_queue.empty()) {
if (_need_commit && duration >= 10 * _group_commit_interval_ms) {
auto last_print_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - _last_print_time)
.count();
if (last_print_duration >= 5000) {
_last_print_time = std::chrono::steady_clock::now();
LOG(INFO) << "find one group_commit need to commit, txn_id=" << txn_id
<< ", label=" << label << ", instance_id=" << load_instance_id
<< ", duration=" << duration << ", load_ids=" << get_load_ids();
}
ss << "]";
LOG(INFO) << "find one group_commit need to commit, txn_id=" << txn_id
<< ", label=" << label << ", instance_id=" << load_instance_id
<< ", duration=" << duration << ", load_ids=" << ss.str()
<< ", runtime_state=" << runtime_state;
}
get_block_dep->block();
return Status::OK();
}
if (runtime_state->is_cancelled()) {
auto st = runtime_state->cancel_reason();
_cancel_without_lock(st);
return status;
}
if (!_block_queue.empty()) {
if (!_load_ids_to_write_dep.empty()) {
get_block_dep->block();
}
} else {
const BlockData block_data = _block_queue.front();
block->swap(*block_data.block);
*find_block = true;
_block_queue.pop_front();
int before_block_queues_bytes = _all_block_queues_bytes->load();
_all_block_queues_bytes->fetch_sub(block_data.block_bytes, std::memory_order_relaxed);
std::stringstream ss;
ss << "[";
for (const auto& id : _load_ids_to_write_dep) {
ss << id.first.to_string() << ", ";
}
ss << "]";
VLOG_DEBUG << "[Group Commit Debug] (LoadBlockQueue::get_block). "
<< "block queue size is " << _block_queue.size() << ", block rows is "
<< block->rows() << ", block bytes is " << block->bytes()
Expand All @@ -172,9 +162,8 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block*
<< ", after remove block, all block queues bytes is "
<< _all_block_queues_bytes->load() << ", txn_id=" << txn_id
<< ", label=" << label << ", instance_id=" << load_instance_id
<< ", load_ids=" << ss.str() << ", runtime_state=" << runtime_state
<< ", the block is " << block->dump_data() << ", the block column size is "
<< block->columns_bytes();
<< ", load_ids=" << get_load_ids() << ", the block is " << block->dump_data()
<< ", the block column size is " << block->columns_bytes();
}
if (_block_queue.empty() && _need_commit && _load_ids_to_write_dep.empty()) {
*eos = true;
Expand All @@ -190,15 +179,23 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block*
return Status::OK();
}

void LoadBlockQueue::remove_load_id(const UniqueId& load_id) {
// Unregisters `load_id` from this queue while holding `mutex`.
//
// If the id has an entry in `_load_ids_to_write_dep`, that entry's dependency is marked
// always-ready, the entry is erased, and every dependency in `_read_deps` is set ready.
//
// Returns Status::OK() when the id was registered; otherwise Status::NotFound carrying
// the load id and this queue's label.
Status LoadBlockQueue::remove_load_id(const UniqueId& load_id) {
    std::unique_lock l(mutex);
    // One lookup only: the iterator is reused for both the wake-up and the erase.
    auto it = _load_ids_to_write_dep.find(load_id);
    if (it == _load_ids_to_write_dep.end()) {
        return Status::NotFound<false>("load_id=" + load_id.to_string() +
                                       " not in block queue, label=" + label);
    }
    it->second->set_always_ready();
    _load_ids_to_write_dep.erase(it);
    // Iterate by const reference so the elements are not copied on each pass.
    for (const auto& read_dep : _read_deps) {
        read_dep->set_ready();
    }
    return Status::OK();
}

// Reports whether `load_id` currently has an entry in `_load_ids_to_write_dep`.
// The lookup is performed while holding `mutex`.
bool LoadBlockQueue::contain_load_id(const UniqueId& load_id) {
    std::unique_lock l(mutex);
    const auto pos = _load_ids_to_write_dep.find(load_id);
    const bool registered = (pos != _load_ids_to_write_dep.end());
    return registered;
}

Status LoadBlockQueue::add_load_id(const UniqueId& load_id,
Expand Down Expand Up @@ -250,6 +247,9 @@ void LoadBlockQueue::_cancel_without_lock(const Status& st) {
for (auto& id : _load_ids_to_write_dep) {
id.second->set_always_ready();
}
for (auto read_dep : _read_deps) {
read_dep->set_ready();
}
}

Status GroupCommitTable::get_first_block_load_queue(
Expand All @@ -261,6 +261,14 @@ Status GroupCommitTable::get_first_block_load_queue(
DCHECK(table_id == _table_id);
std::unique_lock l(_lock);
auto try_to_get_matched_queue = [&]() -> Status {
for (const auto& [_, inner_block_queue] : _load_block_queues) {
if (inner_block_queue->contain_load_id(load_id)) {
load_block_queue = inner_block_queue;
label = inner_block_queue->label;
txn_id = inner_block_queue->txn_id;
return Status::OK();
}
}
for (const auto& [_, inner_block_queue] : _load_block_queues) {
if (!inner_block_queue->need_commit()) {
if (base_schema_version == inner_block_queue->schema_version) {
Expand All @@ -285,28 +293,38 @@ Status GroupCommitTable::get_first_block_load_queue(
return Status::OK();
}
create_plan_dep->block();
_create_plan_deps.push_back(create_plan_dep);
_create_plan_deps.emplace(load_id,
std::make_tuple(create_plan_dep, put_block_dep, base_schema_version));
if (!_is_creating_plan_fragment) {
_is_creating_plan_fragment = true;
RETURN_IF_ERROR(_thread_pool->submit_func([&, be_exe_version, mem_tracker,
dep = create_plan_dep] {
Defer defer {[&, dep = dep]() {
std::unique_lock l(_lock);
for (auto it : _create_plan_deps) {
it->set_ready();
}
std::vector<std::shared_ptr<pipeline::Dependency>> {}.swap(_create_plan_deps);
_is_creating_plan_fragment = false;
}};
auto st = _create_group_commit_load(be_exe_version, mem_tracker);
if (!st.ok()) {
LOG(WARNING) << "create group commit load error, st=" << st.to_string();
}
}));
RETURN_IF_ERROR(
_thread_pool->submit_func([&, be_exe_version, mem_tracker, dep = create_plan_dep] {
Defer defer {[&, dep = dep]() {
std::unique_lock l(_lock);
for (auto it : _create_plan_deps) {
std::get<0>(it.second)->set_ready();
}
_create_plan_deps.clear();
_is_creating_plan_fragment = false;
}};
auto st = _create_group_commit_load(be_exe_version, mem_tracker);
if (!st.ok()) {
LOG(WARNING) << "create group commit load error, st=" << st.to_string();
}
}));
}
return try_to_get_matched_queue();
}

void GroupCommitTable::remove_load_id(const UniqueId& load_id) {
std::unique_lock l(_lock);
for (const auto& [_, inner_block_queue] : _load_block_queues) {
if (inner_block_queue->remove_load_id(load_id).ok()) {
return;
}
}
}

Status GroupCommitTable::_create_group_commit_load(int be_exe_version,
std::shared_ptr<MemTrackerLimiter> mem_tracker) {
Status st = Status::OK();
Expand Down Expand Up @@ -378,6 +396,21 @@ Status GroupCommitTable::_create_group_commit_load(int be_exe_version,
pipeline_params.fragment.output_sink.olap_table_sink.schema.slot_descs,
be_exe_version));
_load_block_queues.emplace(instance_id, load_block_queue);

std::vector<UniqueId> success_load_ids;
for (const auto& [id, load_info] : _create_plan_deps) {
auto create_dep = std::get<0>(load_info);
auto put_dep = std::get<1>(load_info);
if (load_block_queue->schema_version == std::get<2>(load_info)) {
if (load_block_queue->add_load_id(id, put_dep).ok()) {
create_dep->set_ready();
success_load_ids.emplace_back(id);
}
}
}
for (const auto& id2 : success_load_ids) {
_create_plan_deps.erase(id2);
}
}
}
st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, result.pipeline_params);
Expand Down Expand Up @@ -596,6 +629,13 @@ Status GroupCommitMgr::get_load_block_queue(int64_t table_id, const TUniqueId& i
return group_commit_table->get_load_block_queue(instance_id, load_block_queue, get_block_dep);
}

// Forwards removal of `load_id` to the group commit table registered for `table_id`.
// Holds `_lock` for the duration; does nothing when `_table_map` has no entry for
// `table_id`.
void GroupCommitMgr::remove_load_id(int64_t table_id, const UniqueId& load_id) {
    std::lock_guard wlock(_lock);
    // Look the table up once and reuse the iterator instead of calling find() twice.
    if (auto it = _table_map.find(table_id); it != _table_map.end()) {
        it->second->remove_load_id(load_id);
    }
}

Status LoadBlockQueue::create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id,
const std::string& import_label, WalManager* wal_manager,
std::vector<TSlotDescriptor>& slot_desc, int be_exe_version) {
Expand Down
14 changes: 10 additions & 4 deletions be/src/runtime/group_commit_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,18 @@ class LoadBlockQueue {
wait_internal_group_commit_finish(wait_internal_group_commit_finish),
_group_commit_interval_ms(group_commit_interval_ms),
_start_time(std::chrono::steady_clock::now()),
_last_print_time(_start_time),
_group_commit_data_bytes(group_commit_data_bytes),
_all_block_queues_bytes(all_block_queues_bytes) {};

Status add_block(RuntimeState* runtime_state, std::shared_ptr<vectorized::Block> block,
bool write_wal, UniqueId& load_id);
Status get_block(RuntimeState* runtime_state, vectorized::Block* block, bool* find_block,
bool* eos, std::shared_ptr<pipeline::Dependency> get_block_dep);
bool contain_load_id(const UniqueId& load_id);
Status add_load_id(const UniqueId& load_id,
const std::shared_ptr<pipeline::Dependency> put_block_dep);
void remove_load_id(const UniqueId& load_id);
Status remove_load_id(const UniqueId& load_id);
void cancel(const Status& st);
bool need_commit() { return _need_commit; }

Expand Down Expand Up @@ -133,15 +135,14 @@ class LoadBlockQueue {
// commit by time interval, can be changed by 'ALTER TABLE my_table SET ("group_commit_interval_ms"="1000");'
int64_t _group_commit_interval_ms;
std::chrono::steady_clock::time_point _start_time;
std::chrono::steady_clock::time_point _last_print_time;
// commit by data size
int64_t _group_commit_data_bytes;
int64_t _data_bytes = 0;

// memory back pressure, memory consumption of all tables' load block queues
std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes;
std::condition_variable _get_cond;
static constexpr size_t MEM_BACK_PRESSURE_WAIT_TIME = 1000; // 1s
static constexpr size_t MEM_BACK_PRESSURE_WAIT_TIMEOUT = 120000; // 120s
};

class GroupCommitTable {
Expand All @@ -164,6 +165,7 @@ class GroupCommitTable {
Status get_load_block_queue(const TUniqueId& instance_id,
std::shared_ptr<LoadBlockQueue>& load_block_queue,
std::shared_ptr<pipeline::Dependency> get_block_dep);
void remove_load_id(const UniqueId& load_id);

private:
Status _create_group_commit_load(int be_exe_version,
Expand All @@ -186,7 +188,10 @@ class GroupCommitTable {
// fragment_instance_id to load_block_queue
std::unordered_map<UniqueId, std::shared_ptr<LoadBlockQueue>> _load_block_queues;
bool _is_creating_plan_fragment = false;
std::vector<std::shared_ptr<pipeline::Dependency>> _create_plan_deps;
// user_load_id -> <create_plan_dep, put_block_dep, base_schema_version>
std::unordered_map<UniqueId, std::tuple<std::shared_ptr<pipeline::Dependency>,
std::shared_ptr<pipeline::Dependency>, int64_t>>
_create_plan_deps;
};

class GroupCommitMgr {
Expand All @@ -208,6 +213,7 @@ class GroupCommitMgr {
std::shared_ptr<pipeline::Dependency> create_plan_dep,
std::shared_ptr<pipeline::Dependency> put_block_dep,
std::string& label, int64_t& txn_id);
void remove_load_id(int64_t table_id, const UniqueId& load_id);
std::promise<Status> debug_promise;
std::future<Status> debug_future = debug_promise.get_future();

Expand Down
Loading