Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](memory) When Load ends, check memory tracker value returns is equal to 0 #40016

Merged
merged 3 commits into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1031,6 +1031,10 @@ Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataS
}
case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
DCHECK(thrift_sink.__isset.olap_table_sink);
#ifndef NDEBUG
DCHECK(state->get_query_ctx() != nullptr);
state->get_query_ctx()->query_mem_tracker->is_group_commit_load = true;
#endif
_sink.reset(
new GroupCommitBlockSinkOperatorX(next_sink_operator_id(), row_desc, output_exprs));
break;
Expand Down Expand Up @@ -1177,6 +1181,10 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
break;
}
case TPlanNodeType::GROUP_COMMIT_SCAN_NODE: {
#ifndef NDEBUG
DCHECK(_query_ctx != nullptr);
_query_ctx->query_mem_tracker->is_group_commit_load = true;
#endif
op.reset(new GroupCommitOperatorX(pool, tnode, next_operator_id(), descs, _num_instances));
RETURN_IF_ERROR(cur_pipe->add_operator(op));
if (request.__isset.parallel_instances) {
Expand Down
71 changes: 42 additions & 29 deletions be/src/runtime/memory/mem_tracker_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ MemTrackerLimiter::~MemTrackerLimiter() {
"mem tracker not equal to 0 when mem tracker destruct, this usually means that "
"memory tracking is inaccurate and SCOPED_ATTACH_TASK and "
"SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER are not used correctly. "
"If the log is truncated, search for `Address Sanitizer` in the be.INFO log to see "
"more information."
"1. For query and load, memory leaks may have occurred, it is expected that the query "
"mem tracker will be bound to the thread context using SCOPED_ATTACH_TASK and "
"SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER before all memory alloc and free. "
Expand All @@ -127,7 +129,7 @@ MemTrackerLimiter::~MemTrackerLimiter() {
if (_consumption->current_value() != 0) {
// TODO, expect mem tracker equal to 0 at the load/compaction/etc. task end.
#ifndef NDEBUG
if (_type == Type::QUERY) {
if (_type == Type::QUERY || (_type == Type::LOAD && !is_group_commit_load)) {
std::string err_msg =
fmt::format("mem tracker label: {}, consumption: {}, peak consumption: {}, {}.",
label(), _consumption->current_value(), _consumption->peak_value(),
Expand All @@ -140,29 +142,29 @@ MemTrackerLimiter::~MemTrackerLimiter() {
}
_consumption->set(0);
#ifndef NDEBUG
} else if (!_address_sanitizers.empty()) {
LOG(INFO) << "[Address Sanitizer] consumption is 0, but address sanitizers not empty. "
<< ", mem tracker label: " << _label
<< ", peak consumption: " << _consumption->peak_value()
<< print_address_sanitizers();
} else if (!_address_sanitizers.empty() && !is_group_commit_load) {
LOG(FATAL) << "[Address Sanitizer] consumption is 0, but address sanitizers not empty. "
<< ", mem tracker label: " << _label
<< ", peak consumption: " << _consumption->peak_value()
<< print_address_sanitizers();
#endif
}
memory_memtrackerlimiter_cnt << -1;
}

#ifndef NDEBUG
void MemTrackerLimiter::add_address_sanitizers(void* buf, size_t size) {
if (_type == Type::QUERY) {
if (_type == Type::QUERY || (_type == Type::LOAD && !is_group_commit_load)) {
std::lock_guard<std::mutex> l(_address_sanitizers_mtx);
auto it = _address_sanitizers.find(buf);
if (it != _address_sanitizers.end()) {
LOG(INFO) << "[Address Sanitizer] memory buf repeat add, mem tracker label: " << _label
<< ", consumption: " << _consumption->current_value()
<< ", peak consumption: " << _consumption->peak_value() << ", buf: " << buf
<< ", size: " << size << ", old buf: " << it->first
<< ", old size: " << it->second.size
<< ", new stack_trace: " << get_stack_trace(1, "DISABLED")
<< ", old stack_trace: " << it->second.stack_trace;
_error_address_sanitizers.emplace_back(
fmt::format("[Address Sanitizer] memory buf repeat add, mem tracker label: {}, "
"consumption: {}, peak consumption: {}, buf: {}, size: {}, old "
"buf: {}, old size: {}, new stack_trace: {}, old stack_trace: {}.",
_label, _consumption->current_value(), _consumption->peak_value(),
buf, size, it->first, it->second.size,
get_stack_trace(1, "FULL_WITH_INLINE"), it->second.stack_trace));
}

// if alignment not equal to 0, maybe usable_size > size.
Expand All @@ -174,36 +176,47 @@ void MemTrackerLimiter::add_address_sanitizers(void* buf, size_t size) {
}

void MemTrackerLimiter::remove_address_sanitizers(void* buf, size_t size) {
if (_type == Type::QUERY) {
if (_type == Type::QUERY || (_type == Type::LOAD && !is_group_commit_load)) {
std::lock_guard<std::mutex> l(_address_sanitizers_mtx);
auto it = _address_sanitizers.find(buf);
if (it != _address_sanitizers.end()) {
if (it->second.size != size) {
LOG(INFO) << "[Address Sanitizer] free memory buf size inaccurate, mem tracker "
"label: "
<< _label << ", consumption: " << _consumption->current_value()
<< ", peak consumption: " << _consumption->peak_value()
<< ", buf: " << buf << ", size: " << size << ", old buf: " << it->first
<< ", old size: " << it->second.size
<< ", new stack_trace: " << get_stack_trace(1, "DISABLED")
<< ", old stack_trace: " << it->second.stack_trace;
_error_address_sanitizers.emplace_back(fmt::format(
"[Address Sanitizer] free memory buf size inaccurate, mem tracker label: "
"{}, consumption: {}, peak consumption: {}, buf: {}, size: {}, old buf: "
"{}, old size: {}, new stack_trace: {}, old stack_trace: {}.",
_label, _consumption->current_value(), _consumption->peak_value(), buf,
size, it->first, it->second.size, get_stack_trace(1, "FULL_WITH_INLINE"),
it->second.stack_trace));
}
_address_sanitizers.erase(buf);
} else {
LOG(INFO) << "[Address Sanitizer] memory buf not exist, mem tracker label: " << _label
<< ", consumption: " << _consumption->current_value()
<< ", peak consumption: " << _consumption->peak_value() << ", buf: " << buf
<< ", size: " << size << ", stack_trace: " << get_stack_trace(1, "DISABLED");
_error_address_sanitizers.emplace_back(fmt::format(
"[Address Sanitizer] memory buf not exist, mem tracker label: {}, consumption: "
"{}, peak consumption: {}, buf: {}, size: {}, stack_trace: {}.",
_label, _consumption->current_value(), _consumption->peak_value(), buf, size,
get_stack_trace(1, "FULL_WITH_INLINE")));
}
}
}

std::string MemTrackerLimiter::print_address_sanitizers() {
std::lock_guard<std::mutex> l(_address_sanitizers_mtx);
std::string detail = "[Address Sanitizer]:";
detail += "\n memory not be freed:";
for (const auto& it : _address_sanitizers) {
detail += fmt::format("\n {}, size {}, strack trace: {}", it.first, it.second.size,
it.second.stack_trace);
auto msg = fmt::format(
"\n [Address Sanitizer] buf not be freed, mem tracker label: {}, consumption: "
"{}, peak consumption: {}, buf: {}, size {}, strack trace: {}",
_label, _consumption->current_value(), _consumption->peak_value(), it.first,
it.second.size, it.second.stack_trace);
LOG(INFO) << msg;
detail += msg;
}
detail += "\n incorrect memory alloc and free:";
for (const auto& err_msg : _error_address_sanitizers) {
LOG(INFO) << err_msg;
detail += fmt::format("\n {}", err_msg);
}
return detail;
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/memory/mem_tracker_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ class MemTrackerLimiter final : public MemTracker {
void add_address_sanitizers(void* buf, size_t size);
void remove_address_sanitizers(void* buf, size_t size);
std::string print_address_sanitizers();
bool is_group_commit_load {false};
#endif

std::string debug_string() override {
Expand Down Expand Up @@ -260,6 +261,7 @@ class MemTrackerLimiter final : public MemTracker {

std::mutex _address_sanitizers_mtx;
std::unordered_map<void*, AddressSanitizer> _address_sanitizers;
std::vector<std::string> _error_address_sanitizers;
#endif
};

Expand Down
Loading