Tracking memory usage of PagePacket and RemotePage in RN fetching pages #7667

Merged (13 commits) on Jun 29, 2023
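This PR introduces dedicated memory trackers for the read node (RN) FetchPages path, so the bytes held by gRPC PagePacket responses and the RemotePage objects parsed from them become visible through CurrentMetrics and the Grafana dashboard. As a condensed, illustrative sketch of how the pieces added in the diffs below fit together (names are taken from the diff itself; the call sites are simplified for illustration):

// Sketch only: condensed from the changes below, not a verbatim excerpt.
// Server startup: create the storage-task sub-root tracker and its FetchPages child tracker.
initStorageMemoryTracker(
    settings.max_memory_usage_for_all_queries.getActualBytes(server_info.memory_info.capacity),
    settings.bytes_that_rss_larger_than_limit);

// RNWorkerFetchPages::doWork: charge this worker thread's allocations to the FetchPages tracker.
MemoryTrackerSetter setter(true, fetch_pages_mem_tracker.get());

// While a response packet (and each RemotePage parsed from it) is in use, its size is tracked.
MemTrackerWrapper packet_mem_tracker_wrapper(packet->SpaceUsedLong(), fetch_pages_mem_tracker.get());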
4 changes: 3 additions & 1 deletion dbms/src/Common/CurrentMetrics.cpp
@@ -78,7 +78,9 @@
M(DTFileCacheUsed) \
M(PageCacheCapacity) \
M(PageCacheUsed) \
M(ConnectionPoolSize)
M(ConnectionPoolSize) \
M(MemoryTrackingQueryStorageTask) \
M(MemoryTrackingFetchPages)

namespace CurrentMetrics
{
64 changes: 57 additions & 7 deletions dbms/src/Common/MemoryTracker.cpp
@@ -23,6 +23,12 @@

#include <iomanip>

namespace CurrentMetrics
{
extern const Metric MemoryTrackingQueryStorageTask;
extern const Metric MemoryTrackingFetchPages;
} // namespace CurrentMetrics

std::atomic<Int64> real_rss{0}, proc_num_threads{1}, baseline_of_query_mem_tracker{0};
std::atomic<UInt64> proc_virt_size{0};
MemoryTracker::~MemoryTracker()
@@ -65,6 +71,19 @@ static Poco::Logger * getLogger()
return logger;
}

static String storageMemoryUsageDetail()
{
return fmt::format("non-query: peak={}, amount={}; "
"query-storage-task: peak={}, amount={}; "
"fetch-pages: peak={}, amount={}.",
root_of_non_query_mem_trackers ? formatReadableSizeWithBinarySuffix(root_of_non_query_mem_trackers->getPeak()) : "0",
root_of_non_query_mem_trackers ? formatReadableSizeWithBinarySuffix(root_of_non_query_mem_trackers->get()) : "0",
sub_root_of_query_storage_task_mem_trackers ? formatReadableSizeWithBinarySuffix(sub_root_of_query_storage_task_mem_trackers->getPeak()) : "0",
sub_root_of_query_storage_task_mem_trackers ? formatReadableSizeWithBinarySuffix(sub_root_of_query_storage_task_mem_trackers->get()) : "0",
fetch_pages_mem_tracker ? formatReadableSizeWithBinarySuffix(fetch_pages_mem_tracker->getPeak()) : "0",
fetch_pages_mem_tracker ? formatReadableSizeWithBinarySuffix(fetch_pages_mem_tracker->get()) : "0");
}

void MemoryTracker::logPeakMemoryUsage() const
{
const char * tmp_decr = description.load();
@@ -78,6 +97,7 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit)
* So, we allow over-allocations.
*/
Int64 will_be = size + amount.fetch_add(size, std::memory_order_relaxed);
reportAmount();

if (!next.load(std::memory_order_relaxed))
CurrentMetrics::add(metric, size);
@@ -94,15 +114,14 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit)
if (tmp_decr)
fmt_buf.fmtAppend(" {}", tmp_decr);

fmt_buf.fmtAppend(": fault injected. real_rss ({}) is much larger than limit ({}). Debug info, threads of process: {}, memory usage tracked by ProcessList: peak {}, current {}, memory usage not tracked by ProcessList: peak {}, current {} . Virtual memory size: {}",
fmt_buf.fmtAppend(": fault injected. real_rss ({}) is much larger than limit ({}). Debug info, threads of process: {}, memory usage tracked by ProcessList: peak {}, current {}. Virtual memory size: {}.",
formatReadableSizeWithBinarySuffix(real_rss),
formatReadableSizeWithBinarySuffix(current_limit),
proc_num_threads.load(),
(root_of_query_mem_trackers ? formatReadableSizeWithBinarySuffix(root_of_query_mem_trackers->peak) : "0"),
(root_of_query_mem_trackers ? formatReadableSizeWithBinarySuffix(root_of_query_mem_trackers->amount) : "0"),
(root_of_non_query_mem_trackers ? formatReadableSizeWithBinarySuffix(root_of_non_query_mem_trackers->peak) : "0"),
(root_of_non_query_mem_trackers ? formatReadableSizeWithBinarySuffix(root_of_non_query_mem_trackers->amount) : "0"),
proc_virt_size.load());
fmt_buf.fmtAppend(" Memory usage of storage: {}", storageMemoryUsageDetail());
throw DB::TiFlashException(fmt_buf.toString(), DB::Errors::Coprocessor::MemoryLimitExceeded);
}

@@ -111,17 +130,18 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit)
if (unlikely(fault_probability && drand48() < fault_probability))
{
amount.fetch_sub(size, std::memory_order_relaxed);
reportAmount();

DB::FmtBuffer fmt_buf;
fmt_buf.append("Memory tracker");
const char * tmp_decr = description.load();
if (tmp_decr)
fmt_buf.fmtAppend(" {}", tmp_decr);
fmt_buf.fmtAppend(": fault injected. Would use {} (attempt to allocate chunk of {} bytes), maximum: {}",
fmt_buf.fmtAppend(": fault injected. Would use {} (attempt to allocate chunk of {} bytes), maximum: {}.",
formatReadableSizeWithBinarySuffix(will_be),
size,
formatReadableSizeWithBinarySuffix(current_limit));

fmt_buf.fmtAppend(" Memory Usage of Storage: {}", storageMemoryUsageDetail());
throw DB::TiFlashException(fmt_buf.toString(), DB::Errors::Coprocessor::MemoryLimitExceeded);
}
Int64 current_bytes_rss_larger_than_limit = bytes_rss_larger_than_limit.load(std::memory_order_relaxed);
@@ -133,6 +153,7 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit)
{
DB::GET_METRIC(tiflash_memory_exceed_quota_count).Increment();
amount.fetch_sub(size, std::memory_order_relaxed);
reportAmount();

DB::FmtBuffer fmt_buf;
fmt_buf.append("Memory limit");
@@ -142,19 +163,21 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit)

if (!is_rss_too_large)
{ // out of memory quota
fmt_buf.fmtAppend(" exceeded caused by 'out of memory quota for data computing' : would use {} for data computing (attempt to allocate chunk of {} bytes), limit of memory for data computing: {}",
fmt_buf.fmtAppend(" exceeded caused by 'out of memory quota for data computing' : would use {} for data computing (attempt to allocate chunk of {} bytes), limit of memory for data computing: {}.",
formatReadableSizeWithBinarySuffix(will_be),
size,
formatReadableSizeWithBinarySuffix(current_limit));
}
else
{ // RSS too large
fmt_buf.fmtAppend(" exceeded caused by 'RSS(Resident Set Size) much larger than limit' : process memory size would be {} for (attempt to allocate chunk of {} bytes), limit of memory for data computing : {}",
fmt_buf.fmtAppend(" exceeded caused by 'RSS(Resident Set Size) much larger than limit' : process memory size would be {} for (attempt to allocate chunk of {} bytes), limit of memory for data computing : {}.",
formatReadableSizeWithBinarySuffix(real_rss),
size,
formatReadableSizeWithBinarySuffix(current_limit));
}

fmt_buf.fmtAppend(" Memory Usage of Storage: {}", storageMemoryUsageDetail());

throw DB::TiFlashException(fmt_buf.toString(), DB::Errors::Coprocessor::MemoryLimitExceeded);
}
}
@@ -171,6 +194,7 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit)
catch (...)
{
amount.fetch_sub(size, std::memory_order_relaxed);
reportAmount();
std::rethrow_exception(std::current_exception());
}
}
@@ -180,6 +204,7 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit)
void MemoryTracker::free(Int64 size)
{
Int64 new_amount = amount.fetch_sub(size, std::memory_order_relaxed) - size;
reportAmount();

/** Sometimes, query could free some data, that was allocated outside of query context.
* Example: cache eviction.
@@ -190,6 +215,7 @@ void MemoryTracker::free(Int64 size)
if (new_amount < 0 && !next.load(std::memory_order_relaxed)) // handle it only for root memory_tracker
{
amount.fetch_sub(new_amount);
reportAmount();
size += new_amount;
}

@@ -208,6 +234,7 @@ void MemoryTracker::reset()
amount.store(0, std::memory_order_relaxed);
peak.store(0, std::memory_order_relaxed);
limit.store(0, std::memory_order_relaxed);
reportAmount();
}


@@ -219,6 +246,12 @@ void MemoryTracker::setOrRaiseLimit(Int64 value)
;
}

void MemoryTracker::reportAmount()
{
if (amount_metric.has_value())
CurrentMetrics::set(*amount_metric, amount.load(std::memory_order_relaxed));
}

#if __APPLE__ && __clang__
__thread MemoryTracker * current_memory_tracker = nullptr;
#else
@@ -228,6 +261,23 @@ thread_local MemoryTracker * current_memory_tracker = nullptr;
std::shared_ptr<MemoryTracker> root_of_non_query_mem_trackers = MemoryTracker::createGlobalRoot();
std::shared_ptr<MemoryTracker> root_of_query_mem_trackers = MemoryTracker::createGlobalRoot();

std::shared_ptr<MemoryTracker> sub_root_of_query_storage_task_mem_trackers;
std::shared_ptr<MemoryTracker> fetch_pages_mem_tracker;

void initStorageMemoryTracker(Int64 limit, Int64 larger_than_limit)
{
LOG_INFO(getLogger(), "Storage task memory limit={}, larger_than_limit={}", formatReadableSizeWithBinarySuffix(limit), formatReadableSizeWithBinarySuffix(larger_than_limit));
RUNTIME_CHECK(sub_root_of_query_storage_task_mem_trackers == nullptr);
sub_root_of_query_storage_task_mem_trackers = MemoryTracker::create(limit);
sub_root_of_query_storage_task_mem_trackers->setBytesThatRssLargerThanLimit(larger_than_limit);
sub_root_of_query_storage_task_mem_trackers->setAmountMetric(CurrentMetrics::MemoryTrackingQueryStorageTask);

RUNTIME_CHECK(fetch_pages_mem_tracker == nullptr);
fetch_pages_mem_tracker = MemoryTracker::create();
fetch_pages_mem_tracker->setNext(sub_root_of_query_storage_task_mem_trackers.get());
fetch_pages_mem_tracker->setAmountMetric(CurrentMetrics::MemoryTrackingFetchPages);
}

namespace CurrentMemoryTracker
{
static Int64 MEMORY_TRACER_SUBMIT_THRESHOLD = 1024 * 1024; // 1 MiB
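As a usage note (an illustrative sketch, assuming only the interfaces visible in this diff): once a tracker has been given an amount metric, every alloc/free/reset pushes its current amount into the corresponding CurrentMetrics gauge via reportAmount(), which is what the new Grafana panel below graphs. The byte counts here are made up for illustration.

// Illustrative sketch using only calls shown in this diff.
auto tracker = MemoryTracker::create(/*limit=*/0);
tracker->setAmountMetric(CurrentMetrics::MemoryTrackingFetchPages);
tracker->alloc(4096, /*check_memory_limit=*/false); // reportAmount() sets the gauge to 4096
tracker->free(4096);                                // reportAmount() brings the gauge back to 0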
17 changes: 17 additions & 0 deletions dbms/src/Common/MemoryTracker.h
@@ -58,6 +58,9 @@ class MemoryTracker : public std::enable_shared_from_this<MemoryTracker>
/// You could specify custom metric to track memory usage.
CurrentMetrics::Metric metric = CurrentMetrics::MemoryTracking;

/// Optional metric used to report the current `amount` of this MemoryTracker via CurrentMetrics.
std::optional<CurrentMetrics::Metric> amount_metric;

/// This description will be used as prefix into log messages (if isn't nullptr)
std::atomic<const char *> description = nullptr;

@@ -72,6 +75,8 @@ class MemoryTracker : public std::enable_shared_from_this<MemoryTracker>
, is_global_root(is_global_root)
{}

void reportAmount();

public:
/// Use `std::shared_ptr` and `new` instead of `std::make_shared` because `std::make_shared` cannot call private constructors.
static MemoryTrackerPtr create(Int64 limit = 0)
@@ -134,6 +139,8 @@ class MemoryTracker : public std::enable_shared_from_this<MemoryTracker>
/// The memory consumption could be shown in realtime via CurrentMetrics counter
void setMetric(CurrentMetrics::Metric metric_) { metric = metric_; }

void setAmountMetric(CurrentMetrics::Metric amount_metric_) { amount_metric = amount_metric_; }

void setDescription(const char * description_) { description = description_; }

/// Reset the accumulated data.
@@ -157,6 +164,16 @@ extern thread_local MemoryTracker * current_memory_tracker;
extern std::shared_ptr<MemoryTracker> root_of_non_query_mem_trackers;
extern std::shared_ptr<MemoryTracker> root_of_query_mem_trackers;

// Initialized in `initStorageMemoryTracker`.
// Memory trackers for query-driven storage tasks should inherit from `sub_root_of_query_storage_task_mem_trackers`.
// It is not made a child of `root_of_query_mem_trackers`, because keeping it synchronized with that root would be difficult.
// sub_root_of_query_storage_task_mem_trackers
// |-- fetch_pages_mem_tracker
extern std::shared_ptr<MemoryTracker> sub_root_of_query_storage_task_mem_trackers;
extern std::shared_ptr<MemoryTracker> fetch_pages_mem_tracker;

void initStorageMemoryTracker(Int64 limit, Int64 larger_than_limit);

/// Convenience methods, that use current_memory_tracker if it is available.
namespace CurrentMemoryTracker
{
1 change: 1 addition & 0 deletions dbms/src/Server/Server.cpp
@@ -1244,6 +1244,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto & blockable_bg_pool = global_context->initializeBlockableBackgroundPool(settings.background_pool_size);
// adjust the thread pool size according to settings and logical cores num
adjustThreadPoolSize(settings, server_info.cpu_info.logical_cores);
initStorageMemoryTracker(settings.max_memory_usage_for_all_queries.getActualBytes(server_info.memory_info.capacity), settings.bytes_that_rss_larger_than_limit);

/// PageStorage run mode has been determined above
if (!global_context->getSharedContextDisagg()->isDisaggregatedComputeMode())
6 changes: 6 additions & 0 deletions dbms/src/Storages/DeltaMerge/Remote/RNWorkerFetchPages.cpp
@@ -12,8 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/MemoryTracker.h>
#include <Common/TiFlashMetrics.h>
#include <Flash/Mpp/GRPCCompletionQueuePool.h>
#include <Flash/Mpp/TrackedMppDataPacket.h>
#include <Interpreters/Context.h>
#include <Interpreters/SharedContexts/Disagg.h>
#include <Storages/DeltaMerge/Remote/ObjectId.h>
@@ -112,6 +114,7 @@ std::shared_ptr<disaggregated::FetchDisaggPagesRequest> buildFetchPagesRequest(

RNReadSegmentTaskPtr RNWorkerFetchPages::doWork(const RNReadSegmentTaskPtr & seg_task)
{
MemoryTrackerSetter setter(true, fetch_pages_mem_tracker.get());
Stopwatch watch_work{CLOCK_MONOTONIC_COARSE};
SCOPE_EXIT({
// This metric is per-segment.
@@ -201,6 +204,8 @@ void RNWorkerFetchPages::doFetchPages(
if (bool more = stream_resp->Read(packet.get()); !more)
break;

MemTrackerWrapper packet_mem_tracker_wrapper(packet->SpaceUsedLong(), fetch_pages_mem_tracker.get());

if (!rpc_is_observed)
{
// Count RPC time as sending request + receive first response packet.
@@ -225,6 +230,7 @@ void RNWorkerFetchPages::doFetchPages(
DM::RemotePb::RemotePage remote_page;
bool parsed = remote_page.ParseFromString(page);
RUNTIME_CHECK_MSG(parsed, "Failed to parse page data (from {})", seg_task->info());
MemTrackerWrapper remote_page_mem_tracker_wrapper(remote_page.SpaceUsedLong(), fetch_pages_mem_tracker.get());

RUNTIME_CHECK(
remaining_pages_to_fetch.contains(remote_page.page_id()),
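MemTrackerWrapper (from Flash/Mpp/TrackedMppDataPacket.h) is used above as a scope guard: it presumably charges the given byte count to the tracker on construction and releases it on destruction, so each PagePacket and RemotePage is accounted for only while it is alive. A hypothetical sketch of that RAII pattern (not the actual class definition) might look like:

struct ScopedMemCharge // hypothetical stand-in for MemTrackerWrapper, for illustration only
{
    ScopedMemCharge(Int64 size_, MemoryTracker * tracker_)
        : size(size_)
        , tracker(tracker_)
    {
        if (tracker)
            tracker->alloc(size, /*check_memory_limit=*/false); // charge the packet/page bytes
    }
    ~ScopedMemCharge()
    {
        if (tracker)
            tracker->free(size); // release when the packet/page goes out of scope
    }
    Int64 size;
    MemoryTracker * tracker;
};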
108 changes: 108 additions & 0 deletions metrics/grafana/tiflash_summary.json
@@ -12881,6 +12881,114 @@
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "${DS_TEST-CLUSTER}",
"description": "Memory Usage of Storage Tasks",
"fieldConfig": {
"defaults": {},
"overrides": []
},
"fill": 0,
"fillGradient": 0,
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 124
},
"hiddenSeries": false,
"id": 233,
"legend": {
"alignAsTable": true,
"avg": false,
"current": false,
"max": true,
"min": false,
"rightSide": true,
"show": true,
"total": false,
"values": true
},
"lines": true,
"linewidth": 1,
"links": [],
"nullPointMode": "null as zero",
"options": {
"alertThreshold": true
},
"percentage": false,
"pluginVersion": "7.5.11",
"pointradius": 5,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"exemplar": true,
"expr": "sum(tiflash_system_current_metric_MemoryTrackingQueryStorageTask{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}) by (instance)",
"hide": false,
"interval": "",
"legendFormat": "MemoryTrackingQueryStorageTask-{{instance}}",
"refId": "C"
},
{
"exemplar": true,
"expr": "sum(tiflash_system_current_metric_MemoryTrackingFetchPages{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}) by (instance)",
"hide": false,
"interval": "",
"legendFormat": "MemoryTrackingFetchPages-{{instance}}",
"refId": "D"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Memory Usage of Storage Tasks",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"decimals": null,
"format": "bytes",
"label": null,
"logBase": 1,
"max": null,
"min": "0",
"show": true
},
{
"format": "percentunit",
"label": null,
"logBase": 1,
"max": null,
"min": "0",
"show": false
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
}
],
"title": "Disaggregated",