Skip to content

Commit

Permalink
Improve memtrack (#5610)
Browse files Browse the repository at this point in the history
close #5609
  • Loading branch information
bestwoody committed Aug 31, 2022
1 parent dcdb581 commit 72f4951
Show file tree
Hide file tree
Showing 30 changed files with 744 additions and 135 deletions.
94 changes: 94 additions & 0 deletions dbms/src/Common/BackgroundTask.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/BackgroundTask.h>

#include <fstream>
namespace DB
{
bool process_mem_usage(double & resident_set)
{
resident_set = 0.0;

// 'file' stat seems to give the most reliable results
std::ifstream stat_stream("/proc/self/stat", std::ios_base::in);
// if "/proc/self/stat" is not supported
if (!stat_stream.is_open())
return false;

// dummy vars for leading entries in stat that we don't care about
std::string pid, comm, state, ppid, pgrp, session, tty_nr;
std::string tpgid, flags, minflt, cminflt, majflt, cmajflt;
std::string utime, stime, cutime, cstime, priority, nice;
std::string proc_num_threads, itrealvalue, starttime;
UInt64 vsize;

// the field we want
Int64 rss;

stat_stream >> pid >> comm >> state >> ppid >> pgrp >> session >> tty_nr
>> tpgid >> flags >> minflt >> cminflt >> majflt >> cmajflt
>> utime >> stime >> cutime >> cstime >> priority >> nice
>> proc_num_threads >> itrealvalue >> starttime >> vsize >> rss; // don't care about the rest

stat_stream.close();

Int64 page_size_kb = sysconf(_SC_PAGE_SIZE) / 1024; // in case x86-64 is configured to use 2MB pages
resident_set = rss * page_size_kb;
return true;
}

bool isProcStatSupported()
{
std::ifstream stat_stream("/proc/self/stat", std::ios_base::in);
return stat_stream.is_open();
}

void CollectProcInfoBackgroundTask::begin()
{
std::unique_lock lk(mu);
if (!is_already_begin)
{
if (!isProcStatSupported())
{
end_fin = true;
return;
}
std::thread t = ThreadFactory::newThread(false, "MemTrackThread", &CollectProcInfoBackgroundTask::memCheckJob, this);
t.detach();
is_already_begin = true;
}
}

void CollectProcInfoBackgroundTask::memCheckJob()
{
double resident_set;
while (!end_syn)
{
process_mem_usage(resident_set);
resident_set *= 1024; // unit: byte
real_rss = static_cast<Int64>(resident_set);

usleep(100000); // sleep 100ms
}
end_fin = true;
}

void CollectProcInfoBackgroundTask::end()
{
end_syn = true;
while (!end_fin)
usleep(1000); // Just ok since it is called only when TiFlash shutdown.
}
} // namespace DB
39 changes: 39 additions & 0 deletions dbms/src/Common/BackgroundTask.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Common/ThreadFactory.h>
namespace DB
{
class CollectProcInfoBackgroundTask
{
public:
CollectProcInfoBackgroundTask() = default;
~CollectProcInfoBackgroundTask()
{
end();
}
void begin();

void end();

private:
void memCheckJob();

std::mutex mu;
bool is_already_begin = false;
std::atomic<bool> end_syn{false}, end_fin{false};
};
} // namespace DB
17 changes: 14 additions & 3 deletions dbms/src/Common/MPMCQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,13 @@ class MPMCQueue

~MPMCQueue()
{
std::unique_lock lock(mu);
for (; read_pos < write_pos; ++read_pos)
destruct(getObj(read_pos));
drain();
}

void finishAndDrain()
{
finish();
drain();
}

// Cannot to use copy/move constructor,
Expand Down Expand Up @@ -418,6 +422,13 @@ class MPMCQueue
obj.~T();
}

void drain()
{
std::unique_lock lock(mu);
for (; read_pos < write_pos; ++read_pos)
destruct(getObj(read_pos));
}

template <typename F>
ALWAYS_INLINE bool changeStatus(F && action)
{
Expand Down
47 changes: 36 additions & 11 deletions dbms/src/Common/MemoryTracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include <iomanip>

std::atomic<Int64> real_rss{0};
MemoryTracker::~MemoryTracker()
{
if (peak)
Expand All @@ -46,6 +47,7 @@ MemoryTracker::~MemoryTracker()
* then memory usage of 'next' memory trackers will be underestimated,
* because amount will be decreased twice (first - here, second - when real 'free' happens).
*/
// TODO In future, maybe we can find a better way to handle the "amount > 0" case.
if (auto value = amount.load(std::memory_order_relaxed))
free(value);
}
Expand Down Expand Up @@ -80,7 +82,7 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit)
/// In this case, it doesn't matter.
if (unlikely(fault_probability && drand48() < fault_probability))
{
free(size);
amount.fetch_sub(size, std::memory_order_relaxed);

DB::FmtBuffer fmt_buf;
fmt_buf.append("Memory tracker");
Expand All @@ -93,20 +95,33 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit)

throw DB::TiFlashException(fmt_buf.toString(), DB::Errors::Coprocessor::MemoryLimitExceeded);
}

if (unlikely(current_limit && will_be > current_limit))
bool is_rss_too_large = (!next.load(std::memory_order_relaxed) && current_limit
&& real_rss > current_limit + bytes_rss_larger_than_limit
&& will_be > current_limit - (real_rss - current_limit - bytes_rss_larger_than_limit));
if (is_rss_too_large
|| unlikely(current_limit && will_be > current_limit))
{
free(size);
amount.fetch_sub(size, std::memory_order_relaxed);

DB::FmtBuffer fmt_buf;
fmt_buf.append("Memory limit");
if (description)
fmt_buf.fmtAppend(" {}", description);

fmt_buf.fmtAppend(" exceeded: would use {} (attempt to allocate chunk of {} bytes), maximum: {}",
formatReadableSizeWithBinarySuffix(will_be),
size,
formatReadableSizeWithBinarySuffix(current_limit));
if (!is_rss_too_large)
{ // out of memory quota
fmt_buf.fmtAppend(" exceeded caused by 'out of memory quota for data computing' : would use {} for data computing (attempt to allocate chunk of {} bytes), limit of memory for data computing: {}",
formatReadableSizeWithBinarySuffix(will_be),
size,
formatReadableSizeWithBinarySuffix(current_limit));
}
else
{ // RSS too large
fmt_buf.fmtAppend(" exceeded caused by 'RSS(Resident Set Size) much larger than limit' : process memory size would be {} for (attempt to allocate chunk of {} bytes), limit of memory for data computing : {}",
formatReadableSizeWithBinarySuffix(real_rss),
size,
formatReadableSizeWithBinarySuffix(current_limit));
}

throw DB::TiFlashException(fmt_buf.toString(), DB::Errors::Coprocessor::MemoryLimitExceeded);
}
Expand All @@ -116,7 +131,17 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit)
peak.store(will_be, std::memory_order_relaxed);

if (auto * loaded_next = next.load(std::memory_order_relaxed))
loaded_next->alloc(size, check_memory_limit);
{
try
{
loaded_next->alloc(size, check_memory_limit);
}
catch (...)
{
amount.fetch_sub(size, std::memory_order_relaxed);
std::rethrow_exception(std::current_exception());
}
}
}


Expand All @@ -130,7 +155,7 @@ void MemoryTracker::free(Int64 size)
* Memory usage will be calculated with some error.
* NOTE The code is not atomic. Not worth to fix.
*/
if (new_amount < 0)
if (new_amount < 0 && !next.load(std::memory_order_relaxed)) // handle it only for root memory_tracker
{
amount.fetch_sub(new_amount);
size += new_amount;
Expand Down Expand Up @@ -170,7 +195,7 @@ thread_local MemoryTracker * current_memory_tracker = nullptr;

namespace CurrentMemoryTracker
{
static Int64 MEMORY_TRACER_SUBMIT_THRESHOLD = 8 * 1024 * 1024; // 8 MiB
static Int64 MEMORY_TRACER_SUBMIT_THRESHOLD = 1024 * 1024; // 1 MiB
#if __APPLE__ && __clang__
static __thread Int64 local_delta{};
#else
Expand Down
9 changes: 8 additions & 1 deletion dbms/src/Common/MemoryTracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

#include <atomic>


extern std::atomic<Int64> real_rss;
namespace CurrentMetrics
{
extern const Metric MemoryTracking;
Expand All @@ -35,6 +35,9 @@ class MemoryTracker
std::atomic<Int64> peak{0};
std::atomic<Int64> limit{0};

// How many bytes RSS(Resident Set Size) can be larger than limit(max_memory_usage_for_all_queries). Default: 5GB
Int64 bytes_rss_larger_than_limit = 5368709120;

/// To test exception safety of calling code, memory tracker throws an exception on each memory allocation with specified probability.
double fault_probability = 0;

Expand Down Expand Up @@ -70,13 +73,17 @@ class MemoryTracker

Int64 getPeak() const { return peak.load(std::memory_order_relaxed); }

Int64 getLimit() const { return limit.load(std::memory_order_relaxed); }

void setLimit(Int64 limit_) { limit.store(limit_, std::memory_order_relaxed); }

/** Set limit if it was not set.
* Otherwise, set limit to new value, if new value is greater than previous limit.
*/
void setOrRaiseLimit(Int64 value);

void setBytesThatRssLargerThanLimit(Int64 value) { bytes_rss_larger_than_limit = value; }

void setFaultProbability(double value) { fault_probability = value; }

/// next should be changed only once: from nullptr to some value.
Expand Down
Loading

0 comments on commit 72f4951

Please sign in to comment.