Skip to content

Commit

Permalink
3
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Sep 13, 2024
1 parent d30a9c3 commit 01933ec
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 48 deletions.
5 changes: 1 addition & 4 deletions be/src/runtime/memory/mem_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,12 @@
namespace doris {

/*
* A tracker that keeps track of the current and peak memory usage seen.
* Relaxed ordering, not accurate in real time.
*
* can be consumed manually by consume()/release(), or put into SCOPED_CONSUME_MEM_TRACKER,
* which will automatically track all memory usage of the code segment where it is located.
*
* This class is thread-safe.
*/
class MemTracker {
class MemTracker final {
public:
MemTracker() = default;
MemTracker(const std::string& label);
Expand Down
3 changes: 2 additions & 1 deletion be/src/runtime/memory/mem_tracker_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,13 @@ MemTrackerLimiter::~MemTrackerLimiter() {
if (ExecEnv::tracking_memory()) {
ExecEnv::GetInstance()->orphan_mem_tracker()->consume(consumption());
}
set_consumption(0);
_mem_counter.set(0);
} else if (open_memory_tracker_inaccurate_detect() && !_address_sanitizers.empty()) {
LOG(FATAL) << "[Address Sanitizer] consumption is 0, but address sanitizers not empty. "
<< ", mem tracker label: " << _label
<< ", peak consumption: " << peak_consumption() << print_address_sanitizers();
}
DCHECK(reserved_consumption() == 0);
memory_memtrackerlimiter_cnt << -1;
}

Expand Down
51 changes: 30 additions & 21 deletions be/src/runtime/memory/mem_tracker_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,14 @@ struct TrackerLimiterGroup {
std::mutex group_lock;
};

// Track and limit the memory usage of process and query.
// Contains an limit, arranged into a tree structure.
//
// Automatically track every once malloc/free of the system memory allocator (Currently, based on TCMlloc hook).
// Put Query MemTrackerLimiter into SCOPED_ATTACH_TASK when the thread starts,all memory used by this thread
// will be recorded on this Query, otherwise it will be recorded in Orphan Tracker by default.
/*
* Track and limit the memory usage of process and query.
*
* Usually, put Query MemTrackerLimiter into SCOPED_ATTACH_TASK when the thread starts,
* all memory used by this thread will be recorded on this Query.
*
* This class is thread-safe.
*/
class MemTrackerLimiter final {
public:
/*
Expand All @@ -83,10 +85,6 @@ class MemTrackerLimiter final {
OTHER = 5,
};

// Corresponding to MemTrackerLimiter::Type.
// MemCounter contains atomic variables, which are not allowed to be copied or moved.
inline static std::unordered_map<Type, MemCounter> TypeMemSum;

struct Snapshot {
std::string type;
std::string label;
Expand All @@ -97,6 +95,10 @@ class MemTrackerLimiter final {
bool operator<(const Snapshot& rhs) const { return cur_consumption < rhs.cur_consumption; }
};

// Corresponding to MemTrackerLimiter::Type.
// MemCounter contains atomic variables, which are not allowed to be copied or moved.
inline static std::unordered_map<Type, MemCounter> TypeMemSum;

/*
* Part 2, Constructors and property methods
*/
Expand Down Expand Up @@ -147,6 +149,23 @@ class MemTrackerLimiter final {

void release(int64_t bytes) { _mem_counter.sub(bytes); }

bool try_consume(int64_t bytes) {
if (UNLIKELY(bytes == 0)) {
return true;
}
bool rt = true;
if (is_overcommit_tracker() && !config::enable_query_memory_overcommit) {
rt = _mem_counter.try_add(bytes, _limit);
} else {
_mem_counter.add(bytes);
}
if (rt && _query_statistics) {
_query_statistics->set_max_peak_memory_bytes(peak_consumption());
_query_statistics->set_current_used_memory_bytes(consumption());
}
return rt;
}

void set_consumption(int64_t bytes) { _mem_counter.set(bytes); }

// Transfer 'bytes' of consumption from this tracker to 'dst'.
Expand All @@ -169,24 +188,14 @@ class MemTrackerLimiter final {
int64_t reserved_peak_consumption() const { return _reserved_counter.peak_value(); }

bool try_reserve(int64_t bytes) {
bool rt = true;
if (is_overcommit_tracker() && !config::enable_query_memory_overcommit) {
rt = _mem_counter.try_add(bytes, _limit);
} else {
_mem_counter.add(bytes);
}
bool rt = try_consume(bytes);
if (rt) {
_reserved_counter.add(bytes);
}
if (rt && _query_statistics) {
_query_statistics->set_max_peak_memory_bytes(peak_consumption());
_query_statistics->set_current_used_memory_bytes(consumption());
}
return rt;
}

void release_reserved(int64_t bytes) {
release(bytes);
_reserved_counter.sub(bytes);
DCHECK(reserved_consumption() >= 0);
}
Expand Down
7 changes: 6 additions & 1 deletion be/src/runtime/memory/thread_mem_tracker_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ inline void ThreadMemTrackerMgr::consume(int64_t size, int skip_large_memory_che
// if _untracked_mem less than -SYNC_PROC_RESERVED_INTERVAL_BYTES, increase process reserved memory.
if (std::abs(_untracked_mem) >= SYNC_PROC_RESERVED_INTERVAL_BYTES) {
doris::GlobalMemoryArbitrator::release_process_reserved_memory(_untracked_mem);
_limiter_tracker->release_reserved(_untracked_mem);
_untracked_mem = 0;
}
return;
Expand All @@ -211,6 +212,7 @@ inline void ThreadMemTrackerMgr::consume(int64_t size, int skip_large_memory_che
size -= _reserved_mem;
doris::GlobalMemoryArbitrator::release_process_reserved_memory(_reserved_mem +
_untracked_mem);
_limiter_tracker->release_reserved(_reserved_mem + _untracked_mem);
_reserved_mem = 0;
_untracked_mem = 0;
}
Expand Down Expand Up @@ -295,13 +297,15 @@ inline doris::Status ThreadMemTrackerMgr::try_reserve(int64_t size) {
if (!wg_ptr->add_wg_refresh_interval_memory_growth(size)) {
auto err_msg = fmt::format("reserve memory failed, size: {}, because {}", size,
wg_ptr->memory_debug_string());
_limiter_tracker->release(size); // rollback
_limiter_tracker->release_reserved(size); // rollback
return doris::Status::MemoryLimitExceeded(err_msg);
}
}
if (!doris::GlobalMemoryArbitrator::try_reserve_process_memory(size)) {
auto err_msg = fmt::format("reserve memory failed, size: {}, because {}", size,
GlobalMemoryArbitrator::process_mem_log_str());
_limiter_tracker->release(size); // rollback
_limiter_tracker->release_reserved(size); // rollback
if (wg_ptr) {
wg_ptr->sub_wg_refresh_interval_memory_growth(size); // rollback
Expand All @@ -316,7 +320,8 @@ inline void ThreadMemTrackerMgr::release_reserved() {
if (_reserved_mem != 0) {
doris::GlobalMemoryArbitrator::release_process_reserved_memory(_reserved_mem +
_untracked_mem);
_limiter_tracker->release_reserved(_reserved_mem);
_limiter_tracker->release_reserved(_reserved_mem + _untracked_mem);
_limiter_tracker->release(_reserved_mem);
auto wg_ptr = _wg_wptr.lock();
if (wg_ptr) {
wg_ptr->sub_wg_refresh_interval_memory_growth(_reserved_mem);
Expand Down
2 changes: 0 additions & 2 deletions be/src/vec/common/allocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,6 @@ void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::throw_b
throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err);
}

#ifndef NDEBUG
template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator>
void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::add_address_sanitizers(
void* buf, size_t size) const {
Expand All @@ -251,7 +250,6 @@ void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::remove_
#endif
doris::thread_context()->thread_mem_tracker()->remove_address_sanitizers(buf, size);
}
#endif

template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator>
void* Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::alloc(size_t size,
Expand Down
19 changes: 2 additions & 17 deletions be/src/vec/common/allocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,10 +242,8 @@ class Allocator {
void consume_memory(size_t size) const;
void release_memory(size_t size) const;
void throw_bad_alloc(const std::string& err) const;
#ifndef NDEBUG
void add_address_sanitizers(void* buf, size_t size) const;
void remove_address_sanitizers(void* buf, size_t size) const;
#endif

void* alloc(size_t size, size_t alignment = 0);
void* realloc(void* buf, size_t old_size, size_t new_size, size_t alignment = 0);
Expand Down Expand Up @@ -289,9 +287,7 @@ class Allocator {
if constexpr (MemoryAllocator::need_record_actual_size()) {
record_size = MemoryAllocator::allocated_size(buf);
}
#ifndef NDEBUG
add_address_sanitizers(buf, record_size);
#endif
} else {
buf = nullptr;
int res = MemoryAllocator::posix_memalign(&buf, alignment, size);
Expand All @@ -307,9 +303,7 @@ class Allocator {
if constexpr (MemoryAllocator::need_record_actual_size()) {
record_size = MemoryAllocator::allocated_size(buf);
}
#ifndef NDEBUG
add_address_sanitizers(buf, record_size);
#endif
}
}
if constexpr (MemoryAllocator::need_record_actual_size()) {
Expand All @@ -325,9 +319,7 @@ class Allocator {
throw_bad_alloc(fmt::format("Allocator: Cannot munmap {}.", size));
}
} else {
#ifndef NDEBUG
remove_address_sanitizers(buf, size);
#endif
MemoryAllocator::free(buf);
}
release_memory(size);
Expand All @@ -351,21 +343,16 @@ class Allocator {
if (!use_mmap ||
(old_size < doris::config::mmap_threshold && new_size < doris::config::mmap_threshold &&
alignment <= MALLOC_MIN_ALIGNMENT)) {
#ifndef NDEBUG
remove_address_sanitizers(buf, old_size);
#endif
/// Resize malloc'd memory region with no special alignment requirement.
void* new_buf = MemoryAllocator::realloc(buf, new_size);
if (nullptr == new_buf) {
release_memory(new_size - old_size);
throw_bad_alloc(fmt::format("Allocator: Cannot realloc from {} to {}.", old_size,
new_size));
}
#ifndef NDEBUG
add_address_sanitizers(
new_buf,
new_size); // usually, buf addr = new_buf addr, asan maybe not equal.
#endif
// usually, buf addr = new_buf addr, asan maybe not equal.
add_address_sanitizers(new_buf, new_size);

buf = new_buf;
if constexpr (clear_memory)
Expand Down Expand Up @@ -395,10 +382,8 @@ class Allocator {
// Big allocs that requires a copy.
void* new_buf = alloc(new_size, alignment);
memcpy(new_buf, buf, std::min(old_size, new_size));
#ifndef NDEBUG
add_address_sanitizers(new_buf, new_size);
remove_address_sanitizers(buf, old_size);
#endif
free(buf, old_size);
buf = new_buf;
}
Expand Down
4 changes: 2 additions & 2 deletions be/test/runtime/memory/thread_mem_tracker_mgr_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ TEST_F(ThreadMemTrackerMgrTest, MultiMemTracker) {
std::unique_ptr<ThreadContext> thread_context = std::make_unique<ThreadContext>();
std::shared_ptr<MemTrackerLimiter> t1 =
MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER, "UT-MultiMemTracker1");
std::shared_ptr<MemTracker> t2 = std::make_shared<MemTracker>("UT-MultiMemTracker2", t1.get());
std::shared_ptr<MemTracker> t3 = std::make_shared<MemTracker>("UT-MultiMemTracker3", t1.get());
std::shared_ptr<MemTracker> t2 = std::make_shared<MemTracker>("UT-MultiMemTracker2");
std::shared_ptr<MemTracker> t3 = std::make_shared<MemTracker>("UT-MultiMemTracker3");

int64_t size1 = 4 * 1024;
int64_t size2 = 4 * 1024 * 1024;
Expand Down

0 comments on commit 01933ec

Please sign in to comment.