Skip to content

Commit

Permalink
Tracking memory usage of PagePacket and RemotePage in RN fetching…
Browse files Browse the repository at this point in the history
… pages (#7667)

ref #7628, ref #7670
  • Loading branch information
JinheLin authored Jun 29, 2023
1 parent 56d851d commit 7bd02a8
Show file tree
Hide file tree
Showing 6 changed files with 192 additions and 8 deletions.
4 changes: 3 additions & 1 deletion dbms/src/Common/CurrentMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@
M(DTFileCacheUsed) \
M(PageCacheCapacity) \
M(PageCacheUsed) \
M(ConnectionPoolSize)
M(ConnectionPoolSize) \
M(MemoryTrackingQueryStorageTask) \
M(MemoryTrackingFetchPages)

namespace CurrentMetrics
{
Expand Down
64 changes: 57 additions & 7 deletions dbms/src/Common/MemoryTracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@

#include <iomanip>

// Metrics used by the storage memory trackers in this file. Each one is passed to
// `MemoryTracker::setAmountMetric` so that the tracker publishes its current amount to it.
namespace CurrentMetrics
{
// Amount tracked by `sub_root_of_query_storage_task_mem_trackers`.
extern const Metric MemoryTrackingQueryStorageTask;
// Amount tracked by `fetch_pages_mem_tracker`.
extern const Metric MemoryTrackingFetchPages;
} // namespace CurrentMetrics

std::atomic<Int64> real_rss{0}, proc_num_threads{1}, baseline_of_query_mem_tracker{0};
std::atomic<UInt64> proc_virt_size{0};
MemoryTracker::~MemoryTracker()
Expand Down Expand Up @@ -65,6 +71,19 @@ static Poco::Logger * getLogger()
return logger;
}

/// Build a one-line summary of the peak and current amount of the storage related
/// memory trackers: the non-query root, the query-storage-task sub root and the
/// fetch-pages tracker. A tracker that has not been created is rendered as "0".
static String storageMemoryUsageDetail()
{
    // Human readable peak of a tracker, or "0" when the tracker does not exist.
    const auto peak_str = [](const std::shared_ptr<MemoryTracker> & tracker) -> String {
        if (tracker)
            return formatReadableSizeWithBinarySuffix(tracker->getPeak());
        return "0";
    };
    // Human readable current amount of a tracker, or "0" when the tracker does not exist.
    const auto amount_str = [](const std::shared_ptr<MemoryTracker> & tracker) -> String {
        if (tracker)
            return formatReadableSizeWithBinarySuffix(tracker->get());
        return "0";
    };

    return fmt::format(
        "non-query: peak={}, amount={}; "
        "query-storage-task: peak={}, amount={}; "
        "fetch-pages: peak={}, amount={}.",
        peak_str(root_of_non_query_mem_trackers),
        amount_str(root_of_non_query_mem_trackers),
        peak_str(sub_root_of_query_storage_task_mem_trackers),
        amount_str(sub_root_of_query_storage_task_mem_trackers),
        peak_str(fetch_pages_mem_tracker),
        amount_str(fetch_pages_mem_tracker));
}

void MemoryTracker::logPeakMemoryUsage() const
{
const char * tmp_decr = description.load();
Expand All @@ -78,6 +97,7 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit)
* So, we allow over-allocations.
*/
Int64 will_be = size + amount.fetch_add(size, std::memory_order_relaxed);
reportAmount();

if (!next.load(std::memory_order_relaxed))
CurrentMetrics::add(metric, size);
Expand All @@ -94,15 +114,14 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit)
if (tmp_decr)
fmt_buf.fmtAppend(" {}", tmp_decr);

fmt_buf.fmtAppend(": fault injected. real_rss ({}) is much larger than limit ({}). Debug info, threads of process: {}, memory usage tracked by ProcessList: peak {}, current {}, memory usage not tracked by ProcessList: peak {}, current {} . Virtual memory size: {}",
fmt_buf.fmtAppend(": fault injected. real_rss ({}) is much larger than limit ({}). Debug info, threads of process: {}, memory usage tracked by ProcessList: peak {}, current {}. Virtual memory size: {}.",
formatReadableSizeWithBinarySuffix(real_rss),
formatReadableSizeWithBinarySuffix(current_limit),
proc_num_threads.load(),
(root_of_query_mem_trackers ? formatReadableSizeWithBinarySuffix(root_of_query_mem_trackers->peak) : "0"),
(root_of_query_mem_trackers ? formatReadableSizeWithBinarySuffix(root_of_query_mem_trackers->amount) : "0"),
(root_of_non_query_mem_trackers ? formatReadableSizeWithBinarySuffix(root_of_non_query_mem_trackers->peak) : "0"),
(root_of_non_query_mem_trackers ? formatReadableSizeWithBinarySuffix(root_of_non_query_mem_trackers->amount) : "0"),
proc_virt_size.load());
fmt_buf.fmtAppend(" Memory usage of storage: {}", storageMemoryUsageDetail());
throw DB::TiFlashException(fmt_buf.toString(), DB::Errors::Coprocessor::MemoryLimitExceeded);
}

Expand All @@ -111,17 +130,18 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit)
if (unlikely(fault_probability && drand48() < fault_probability))
{
amount.fetch_sub(size, std::memory_order_relaxed);
reportAmount();

DB::FmtBuffer fmt_buf;
fmt_buf.append("Memory tracker");
const char * tmp_decr = description.load();
if (tmp_decr)
fmt_buf.fmtAppend(" {}", tmp_decr);
fmt_buf.fmtAppend(": fault injected. Would use {} (attempt to allocate chunk of {} bytes), maximum: {}",
fmt_buf.fmtAppend(": fault injected. Would use {} (attempt to allocate chunk of {} bytes), maximum: {}.",
formatReadableSizeWithBinarySuffix(will_be),
size,
formatReadableSizeWithBinarySuffix(current_limit));

fmt_buf.fmtAppend(" Memory Usage of Storage: {}", storageMemoryUsageDetail());
throw DB::TiFlashException(fmt_buf.toString(), DB::Errors::Coprocessor::MemoryLimitExceeded);
}
Int64 current_bytes_rss_larger_than_limit = bytes_rss_larger_than_limit.load(std::memory_order_relaxed);
Expand All @@ -133,6 +153,7 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit)
{
DB::GET_METRIC(tiflash_memory_exceed_quota_count).Increment();
amount.fetch_sub(size, std::memory_order_relaxed);
reportAmount();

DB::FmtBuffer fmt_buf;
fmt_buf.append("Memory limit");
Expand All @@ -142,19 +163,21 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit)

if (!is_rss_too_large)
{ // out of memory quota
fmt_buf.fmtAppend(" exceeded caused by 'out of memory quota for data computing' : would use {} for data computing (attempt to allocate chunk of {} bytes), limit of memory for data computing: {}",
fmt_buf.fmtAppend(" exceeded caused by 'out of memory quota for data computing' : would use {} for data computing (attempt to allocate chunk of {} bytes), limit of memory for data computing: {}.",
formatReadableSizeWithBinarySuffix(will_be),
size,
formatReadableSizeWithBinarySuffix(current_limit));
}
else
{ // RSS too large
fmt_buf.fmtAppend(" exceeded caused by 'RSS(Resident Set Size) much larger than limit' : process memory size would be {} for (attempt to allocate chunk of {} bytes), limit of memory for data computing : {}",
fmt_buf.fmtAppend(" exceeded caused by 'RSS(Resident Set Size) much larger than limit' : process memory size would be {} for (attempt to allocate chunk of {} bytes), limit of memory for data computing : {}.",
formatReadableSizeWithBinarySuffix(real_rss),
size,
formatReadableSizeWithBinarySuffix(current_limit));
}

fmt_buf.fmtAppend(" Memory Usage of Storage: {}", storageMemoryUsageDetail());

throw DB::TiFlashException(fmt_buf.toString(), DB::Errors::Coprocessor::MemoryLimitExceeded);
}
}
Expand All @@ -171,6 +194,7 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit)
catch (...)
{
amount.fetch_sub(size, std::memory_order_relaxed);
reportAmount();
std::rethrow_exception(std::current_exception());
}
}
Expand All @@ -180,6 +204,7 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit)
void MemoryTracker::free(Int64 size)
{
Int64 new_amount = amount.fetch_sub(size, std::memory_order_relaxed) - size;
reportAmount();

/** Sometimes, query could free some data, that was allocated outside of query context.
* Example: cache eviction.
Expand All @@ -190,6 +215,7 @@ void MemoryTracker::free(Int64 size)
if (new_amount < 0 && !next.load(std::memory_order_relaxed)) // handle it only for root memory_tracker
{
amount.fetch_sub(new_amount);
reportAmount();
size += new_amount;
}

Expand All @@ -208,6 +234,7 @@ void MemoryTracker::reset()
amount.store(0, std::memory_order_relaxed);
peak.store(0, std::memory_order_relaxed);
limit.store(0, std::memory_order_relaxed);
reportAmount();
}


Expand All @@ -219,6 +246,12 @@ void MemoryTracker::setOrRaiseLimit(Int64 value)
;
}

/// Publish the current `amount` of this tracker to `amount_metric`.
/// Does nothing when no amount metric has been configured.
void MemoryTracker::reportAmount()
{
    if (!amount_metric)
        return;

    const auto current_amount = amount.load(std::memory_order_relaxed);
    CurrentMetrics::set(*amount_metric, current_amount);
}

#if __APPLE__ && __clang__
__thread MemoryTracker * current_memory_tracker = nullptr;
#else
Expand All @@ -228,6 +261,23 @@ thread_local MemoryTracker * current_memory_tracker = nullptr;
std::shared_ptr<MemoryTracker> root_of_non_query_mem_trackers = MemoryTracker::createGlobalRoot();
std::shared_ptr<MemoryTracker> root_of_query_mem_trackers = MemoryTracker::createGlobalRoot();

// Trackers for storage tasks. Both stay null until `initStorageMemoryTracker` creates them,
// so readers must check for null before dereferencing.
std::shared_ptr<MemoryTracker> sub_root_of_query_storage_task_mem_trackers;
// Child of `sub_root_of_query_storage_task_mem_trackers` (linked through `setNext`).
std::shared_ptr<MemoryTracker> fetch_pages_mem_tracker;

/// Create the memory trackers used by storage tasks:
///   sub_root_of_query_storage_task_mem_trackers  (created with `limit`)
///   |-- fetch_pages_mem_tracker                  (created without a limit)
/// `larger_than_limit` is forwarded to `setBytesThatRssLargerThanLimit` of the sub root.
/// Each tracker is checked to be null before it is created.
void initStorageMemoryTracker(Int64 limit, Int64 larger_than_limit)
{
    LOG_INFO(
        getLogger(),
        "Storage task memory limit={}, larger_than_limit={}",
        formatReadableSizeWithBinarySuffix(limit),
        formatReadableSizeWithBinarySuffix(larger_than_limit));

    // Sub root: carries the limit and reports its amount to MemoryTrackingQueryStorageTask.
    RUNTIME_CHECK(sub_root_of_query_storage_task_mem_trackers == nullptr);
    auto & storage_task_root = sub_root_of_query_storage_task_mem_trackers;
    storage_task_root = MemoryTracker::create(limit);
    storage_task_root->setBytesThatRssLargerThanLimit(larger_than_limit);
    storage_task_root->setAmountMetric(CurrentMetrics::MemoryTrackingQueryStorageTask);

    // Fetch-pages tracker: chained to the sub root and reports to MemoryTrackingFetchPages.
    RUNTIME_CHECK(fetch_pages_mem_tracker == nullptr);
    auto & fetch_pages = fetch_pages_mem_tracker;
    fetch_pages = MemoryTracker::create();
    fetch_pages->setNext(storage_task_root.get());
    fetch_pages->setAmountMetric(CurrentMetrics::MemoryTrackingFetchPages);
}

namespace CurrentMemoryTracker
{
static Int64 MEMORY_TRACER_SUBMIT_THRESHOLD = 1024 * 1024; // 1 MiB
Expand Down
17 changes: 17 additions & 0 deletions dbms/src/Common/MemoryTracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ class MemoryTracker : public std::enable_shared_from_this<MemoryTracker>
/// You could specify custom metric to track memory usage.
CurrentMetrics::Metric metric = CurrentMetrics::MemoryTracking;

/// Optional metric that receives the current amount of this MemoryTracker.
/// When empty (the default), `reportAmount` does nothing.
std::optional<CurrentMetrics::Metric> amount_metric;

/// This description will be used as prefix into log messages (if isn't nullptr)
std::atomic<const char *> description = nullptr;

Expand All @@ -72,6 +75,8 @@ class MemoryTracker : public std::enable_shared_from_this<MemoryTracker>
, is_global_root(is_global_root)
{}

/// Set `amount_metric` (if configured) to the current value of `amount`.
void reportAmount();

public:
/// `std::shared_ptr` with `new` is used instead of `std::make_shared` because `std::make_shared` cannot call private constructors.
static MemoryTrackerPtr create(Int64 limit = 0)
Expand Down Expand Up @@ -134,6 +139,8 @@ class MemoryTracker : public std::enable_shared_from_this<MemoryTracker>
/// The memory consumption could be shown in realtime via CurrentMetrics counter
void setMetric(CurrentMetrics::Metric metric_) { metric = metric_; }

/// Configure the metric that `reportAmount` sets to the current amount of this tracker.
void setAmountMetric(CurrentMetrics::Metric amount_metric_) { amount_metric = amount_metric_; }

void setDescription(const char * description_) { description = description_; }

/// Reset the accumulated data.
Expand All @@ -157,6 +164,16 @@ extern thread_local MemoryTracker * current_memory_tracker;
extern std::shared_ptr<MemoryTracker> root_of_non_query_mem_trackers;
extern std::shared_ptr<MemoryTracker> root_of_query_mem_trackers;

// Both trackers below are initialized in `initStorageMemoryTracker`.
// If a memory tracker of storage tasks is driven by a query, it should inherit from `sub_root_of_query_storage_task_mem_trackers`.
// Since it is difficult to keep it synchronized with root_of_query_mem_trackers, the sub root does not inherit from root_of_query_mem_trackers.
// Hierarchy:
//   sub_root_of_query_storage_task_mem_trackers
//   |-- fetch_pages_mem_tracker
extern std::shared_ptr<MemoryTracker> sub_root_of_query_storage_task_mem_trackers;
extern std::shared_ptr<MemoryTracker> fetch_pages_mem_tracker;

// Create the two trackers above. `limit` is the memory limit of the sub root and
// `larger_than_limit` is passed to its `setBytesThatRssLargerThanLimit`.
void initStorageMemoryTracker(Int64 limit, Int64 larger_than_limit);

/// Convenience methods, that use current_memory_tracker if it is available.
namespace CurrentMemoryTracker
{
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1244,6 +1244,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto & blockable_bg_pool = global_context->initializeBlockableBackgroundPool(settings.background_pool_size);
// adjust the thread pool size according to settings and logical cores num
adjustThreadPoolSize(settings, server_info.cpu_info.logical_cores);
initStorageMemoryTracker(settings.max_memory_usage_for_all_queries.getActualBytes(server_info.memory_info.capacity), settings.bytes_that_rss_larger_than_limit);

/// PageStorage run mode has been determined above
if (!global_context->getSharedContextDisagg()->isDisaggregatedComputeMode())
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Storages/DeltaMerge/Remote/RNWorkerFetchPages.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/MemoryTracker.h>
#include <Common/TiFlashMetrics.h>
#include <Flash/Mpp/GRPCCompletionQueuePool.h>
#include <Flash/Mpp/TrackedMppDataPacket.h>
#include <Interpreters/Context.h>
#include <Interpreters/SharedContexts/Disagg.h>
#include <Storages/DeltaMerge/Remote/ObjectId.h>
Expand Down Expand Up @@ -112,6 +114,7 @@ std::shared_ptr<disaggregated::FetchDisaggPagesRequest> buildFetchPagesRequest(

RNReadSegmentTaskPtr RNWorkerFetchPages::doWork(const RNReadSegmentTaskPtr & seg_task)
{
MemoryTrackerSetter setter(true, fetch_pages_mem_tracker.get());
Stopwatch watch_work{CLOCK_MONOTONIC_COARSE};
SCOPE_EXIT({
// This metric is per-segment.
Expand Down Expand Up @@ -201,6 +204,8 @@ void RNWorkerFetchPages::doFetchPages(
if (bool more = stream_resp->Read(packet.get()); !more)
break;

MemTrackerWrapper packet_mem_tracker_wrapper(packet->SpaceUsedLong(), fetch_pages_mem_tracker.get());

if (!rpc_is_observed)
{
// Count RPC time as sending request + receive first response packet.
Expand All @@ -225,6 +230,7 @@ void RNWorkerFetchPages::doFetchPages(
DM::RemotePb::RemotePage remote_page;
bool parsed = remote_page.ParseFromString(page);
RUNTIME_CHECK_MSG(parsed, "Failed to parse page data (from {})", seg_task->info());
MemTrackerWrapper remote_page_mem_tracker_wrapper(remote_page.SpaceUsedLong(), fetch_pages_mem_tracker.get());

RUNTIME_CHECK(
remaining_pages_to_fetch.contains(remote_page.page_id()),
Expand Down
108 changes: 108 additions & 0 deletions metrics/grafana/tiflash_summary.json
Original file line number Diff line number Diff line change
Expand Up @@ -12881,6 +12881,114 @@
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "${DS_TEST-CLUSTER}",
"description": "Memory Usage of Storage Tasks",
"fieldConfig": {
"defaults": {},
"overrides": []
},
"fill": 0,
"fillGradient": 0,
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 124
},
"hiddenSeries": false,
"id": 233,
"legend": {
"alignAsTable": true,
"avg": false,
"current": false,
"max": true,
"min": false,
"rightSide": true,
"show": true,
"total": false,
"values": true
},
"lines": true,
"linewidth": 1,
"links": [],
"nullPointMode": "null as zero",
"options": {
"alertThreshold": true
},
"percentage": false,
"pluginVersion": "7.5.11",
"pointradius": 5,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"exemplar": true,
"expr": "sum(tiflash_system_current_metric_MemoryTrackingQueryStorageTask{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}) by (instance)",
"hide": false,
"interval": "",
"legendFormat": "MemoryTrackingQueryStorageTask-{{instance}}",
"refId": "C"
},
{
"exemplar": true,
"expr": "sum(tiflash_system_current_metric_MemoryTrackingFetchPages{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}) by (instance)",
"hide": false,
"interval": "",
"legendFormat": "MemoryTrackingFetchPages-{{instance}}",
"refId": "D"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Memory Usage of Storage Tasks",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"decimals": null,
"format": "bytes",
"label": null,
"logBase": 1,
"max": null,
"min": "0",
"show": true
},
{
"format": "percentunit",
"label": null,
"logBase": 1,
"max": null,
"min": "0",
"show": false
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
}
],
"title": "Disaggregated",
Expand Down

0 comments on commit 7bd02a8

Please sign in to comment.