Skip to content

Commit

Permalink
[fix](memory) When Load ends, check memory tracker value returns is e…
Browse files Browse the repository at this point in the history
…qual to 0 (apache#40016)

Check all memory is freed when Load is finished.
  • Loading branch information
xinyiZzz committed Sep 14, 2024
1 parent fbb7c85 commit 2dabc8b
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 29 deletions.
4 changes: 4 additions & 0 deletions be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -846,6 +846,10 @@ Status PipelineFragmentContext::_create_sink(int sender_id, const TDataSink& thr
break;
}
case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: {
#ifndef NDEBUG
DCHECK(state->get_query_ctx() != nullptr);
state->get_query_ctx()->query_mem_tracker->is_group_commit_load = true;
#endif
sink_ = std::make_shared<GroupCommitBlockSinkOperatorBuilder>(next_operator_builder_id(),
_sink.get());
break;
Expand Down
71 changes: 42 additions & 29 deletions be/src/runtime/memory/mem_tracker_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ MemTrackerLimiter::~MemTrackerLimiter() {
"mem tracker not equal to 0 when mem tracker destruct, this usually means that "
"memory tracking is inaccurate and SCOPED_ATTACH_TASK and "
"SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER are not used correctly. "
"If the log is truncated, search for `Address Sanitizer` in the be.INFO log to see "
"more information."
"1. For query and load, memory leaks may have occurred, it is expected that the query "
"mem tracker will be bound to the thread context using SCOPED_ATTACH_TASK and "
"SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER before all memory alloc and free. "
Expand All @@ -127,7 +129,7 @@ MemTrackerLimiter::~MemTrackerLimiter() {
if (_consumption->current_value() != 0) {
// TODO, expect mem tracker equal to 0 at the load/compaction/etc. task end.
#ifndef NDEBUG
if (_type == Type::QUERY) {
if (_type == Type::QUERY || (_type == Type::LOAD && !is_group_commit_load)) {
std::string err_msg =
fmt::format("mem tracker label: {}, consumption: {}, peak consumption: {}, {}.",
label(), _consumption->current_value(), _consumption->peak_value(),
Expand All @@ -140,29 +142,29 @@ MemTrackerLimiter::~MemTrackerLimiter() {
}
_consumption->set(0);
#ifndef NDEBUG
} else if (!_address_sanitizers.empty()) {
LOG(INFO) << "[Address Sanitizer] consumption is 0, but address sanitizers not empty. "
<< ", mem tracker label: " << _label
<< ", peak consumption: " << _consumption->peak_value()
<< print_address_sanitizers();
} else if (!_address_sanitizers.empty() && !is_group_commit_load) {
LOG(FATAL) << "[Address Sanitizer] consumption is 0, but address sanitizers not empty. "
<< ", mem tracker label: " << _label
<< ", peak consumption: " << _consumption->peak_value()
<< print_address_sanitizers();
#endif
}
memory_memtrackerlimiter_cnt << -1;
}

#ifndef NDEBUG
void MemTrackerLimiter::add_address_sanitizers(void* buf, size_t size) {
if (_type == Type::QUERY) {
if (_type == Type::QUERY || (_type == Type::LOAD && !is_group_commit_load)) {
std::lock_guard<std::mutex> l(_address_sanitizers_mtx);
auto it = _address_sanitizers.find(buf);
if (it != _address_sanitizers.end()) {
LOG(INFO) << "[Address Sanitizer] memory buf repeat add, mem tracker label: " << _label
<< ", consumption: " << _consumption->current_value()
<< ", peak consumption: " << _consumption->peak_value() << ", buf: " << buf
<< ", size: " << size << ", old buf: " << it->first
<< ", old size: " << it->second.size
<< ", new stack_trace: " << get_stack_trace(1, "DISABLED")
<< ", old stack_trace: " << it->second.stack_trace;
_error_address_sanitizers.emplace_back(
fmt::format("[Address Sanitizer] memory buf repeat add, mem tracker label: {}, "
"consumption: {}, peak consumption: {}, buf: {}, size: {}, old "
"buf: {}, old size: {}, new stack_trace: {}, old stack_trace: {}.",
_label, _consumption->current_value(), _consumption->peak_value(),
buf, size, it->first, it->second.size,
get_stack_trace(1, "FULL_WITH_INLINE"), it->second.stack_trace));
}

// if alignment not equal to 0, maybe usable_size > size.
Expand All @@ -174,36 +176,47 @@ void MemTrackerLimiter::add_address_sanitizers(void* buf, size_t size) {
}

void MemTrackerLimiter::remove_address_sanitizers(void* buf, size_t size) {
if (_type == Type::QUERY) {
if (_type == Type::QUERY || (_type == Type::LOAD && !is_group_commit_load)) {
std::lock_guard<std::mutex> l(_address_sanitizers_mtx);
auto it = _address_sanitizers.find(buf);
if (it != _address_sanitizers.end()) {
if (it->second.size != size) {
LOG(INFO) << "[Address Sanitizer] free memory buf size inaccurate, mem tracker "
"label: "
<< _label << ", consumption: " << _consumption->current_value()
<< ", peak consumption: " << _consumption->peak_value()
<< ", buf: " << buf << ", size: " << size << ", old buf: " << it->first
<< ", old size: " << it->second.size
<< ", new stack_trace: " << get_stack_trace(1, "DISABLED")
<< ", old stack_trace: " << it->second.stack_trace;
_error_address_sanitizers.emplace_back(fmt::format(
"[Address Sanitizer] free memory buf size inaccurate, mem tracker label: "
"{}, consumption: {}, peak consumption: {}, buf: {}, size: {}, old buf: "
"{}, old size: {}, new stack_trace: {}, old stack_trace: {}.",
_label, _consumption->current_value(), _consumption->peak_value(), buf,
size, it->first, it->second.size, get_stack_trace(1, "FULL_WITH_INLINE"),
it->second.stack_trace));
}
_address_sanitizers.erase(buf);
} else {
LOG(INFO) << "[Address Sanitizer] memory buf not exist, mem tracker label: " << _label
<< ", consumption: " << _consumption->current_value()
<< ", peak consumption: " << _consumption->peak_value() << ", buf: " << buf
<< ", size: " << size << ", stack_trace: " << get_stack_trace(1, "DISABLED");
_error_address_sanitizers.emplace_back(fmt::format(
"[Address Sanitizer] memory buf not exist, mem tracker label: {}, consumption: "
"{}, peak consumption: {}, buf: {}, size: {}, stack_trace: {}.",
_label, _consumption->current_value(), _consumption->peak_value(), buf, size,
get_stack_trace(1, "FULL_WITH_INLINE")));
}
}
}

std::string MemTrackerLimiter::print_address_sanitizers() {
std::lock_guard<std::mutex> l(_address_sanitizers_mtx);
std::string detail = "[Address Sanitizer]:";
detail += "\n memory not be freed:";
for (const auto& it : _address_sanitizers) {
detail += fmt::format("\n {}, size {}, strack trace: {}", it.first, it.second.size,
it.second.stack_trace);
auto msg = fmt::format(
"\n [Address Sanitizer] buf not be freed, mem tracker label: {}, consumption: "
"{}, peak consumption: {}, buf: {}, size {}, strack trace: {}",
_label, _consumption->current_value(), _consumption->peak_value(), it.first,
it.second.size, it.second.stack_trace);
LOG(INFO) << msg;
detail += msg;
}
detail += "\n incorrect memory alloc and free:";
for (const auto& err_msg : _error_address_sanitizers) {
LOG(INFO) << err_msg;
detail += fmt::format("\n {}", err_msg);
}
return detail;
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/memory/mem_tracker_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ class MemTrackerLimiter final : public MemTracker {
void add_address_sanitizers(void* buf, size_t size);
void remove_address_sanitizers(void* buf, size_t size);
std::string print_address_sanitizers();
bool is_group_commit_load {false};
#endif

std::string debug_string() override {
Expand Down Expand Up @@ -260,6 +261,7 @@ class MemTrackerLimiter final : public MemTracker {

std::mutex _address_sanitizers_mtx;
std::unordered_map<void*, AddressSanitizer> _address_sanitizers;
std::vector<std::string> _error_address_sanitizers;
#endif
};

Expand Down
4 changes: 4 additions & 0 deletions be/src/vec/exec/scan/group_commit_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ Status GroupCommitScanNode::get_next(RuntimeState* state, vectorized::Block* blo

Status GroupCommitScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(VScanNode::init(tnode, state));
#ifndef NDEBUG
DCHECK(state->get_query_ctx() != nullptr);
state->get_query_ctx()->query_mem_tracker->is_group_commit_load = true;
#endif
return state->exec_env()->group_commit_mgr()->get_load_block_queue(
_table_id, state->fragment_instance_id(), load_block_queue);
}
Expand Down

0 comments on commit 2dabc8b

Please sign in to comment.