From 3b3e65cf01a5a600915fe8d3b56e2a64d7322073 Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Tue, 10 May 2022 18:17:09 +0800 Subject: [PATCH] [feature-wip] (memory tracker) (step6, End) Fix some details (#9301) 1. Fix LoadTask, ChunkAllocator, TabletMeta, Brpc, the accuracy of memory track. 2. Modified some MemTracker names, deleted some unnecessary trackers, and improved readability. 3. More powerful MemTracker debugging capabilities. 4. Avoid creating TabletColumn temporary objects and improve BE startup time by 8%. 5. Fix some other details. --- be/src/exec/olap_scanner.cpp | 12 ++-- be/src/exec/tablet_sink.cpp | 5 +- be/src/exec/tablet_sink.h | 3 +- be/src/http/action/compaction_action.h | 2 +- be/src/olap/reader.cpp | 4 ++ be/src/olap/row_block.cpp | 2 +- be/src/olap/row_block2.cpp | 2 +- .../rowset/segment_v2/binary_dict_page.cpp | 2 +- .../olap/rowset/segment_v2/zone_map_index.cpp | 4 +- be/src/olap/tablet_manager.cpp | 3 + be/src/olap/tablet_schema.cpp | 4 ++ be/src/olap/task/engine_alter_tablet_task.cpp | 2 +- be/src/olap/task/engine_batch_load_task.cpp | 4 +- be/src/olap/task/engine_checksum_task.cpp | 6 +- be/src/olap/task/engine_clone_task.cpp | 2 +- be/src/runtime/load_channel.cpp | 12 ++-- be/src/runtime/load_channel.h | 2 +- be/src/runtime/load_channel_mgr.cpp | 4 +- be/src/runtime/load_channel_mgr.h | 3 +- be/src/runtime/mem_pool.cpp | 4 +- be/src/runtime/mem_tracker.cpp | 55 +++++++++++-------- be/src/runtime/mem_tracker.h | 23 ++++++-- be/src/runtime/mem_tracker_task_pool.cpp | 9 +-- be/src/runtime/mem_tracker_task_pool.h | 10 +--- be/src/runtime/memory/chunk_allocator.cpp | 25 +++------ be/src/runtime/result_file_sink.cpp | 4 -- be/src/runtime/tablets_channel.cpp | 5 +- be/src/runtime/tablets_channel.h | 2 +- be/src/runtime/tcmalloc_hook.h | 9 +-- be/src/runtime/thread_context.cpp | 13 ++++- be/src/runtime/thread_context.h | 22 ++++++-- be/src/runtime/thread_mem_tracker_mgr.cpp | 8 +-- be/src/runtime/thread_mem_tracker_mgr.h | 22 +++----- be/src/service/doris_main.cpp | 1 + be/src/service/internal_service.cpp | 5 +- 35 files changed, 171 insertions(+), 124 deletions(-) diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp index 40029414b822e3..523474a50316ef 100644 --- a/be/src/exec/olap_scanner.cpp +++ b/be/src/exec/olap_scanner.cpp @@ -49,10 +49,14 @@ OlapScanner::OlapScanner(RuntimeState* runtime_state, OlapScanNode* parent, bool _is_open(false), _aggregation(aggregation), _need_agg_finalize(need_agg_finalize), - _version(-1), - _mem_tracker(MemTracker::create_tracker( - tracker->limit(), tracker->label() + ":OlapScanner:" + tls_ctx()->thread_id_str(), - tracker)) {} + _version(-1) { +#ifndef NDEBUG + _mem_tracker = MemTracker::create_tracker(tracker->limit(), + "OlapScanner:" + tls_ctx()->thread_id_str(), tracker); +#else + _mem_tracker = tracker; +#endif +} Status OlapScanner::prepare( const TPaloScanRange& scan_range, const std::vector& key_ranges, diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index 0e89069cb5cc28..4f3cfaaef6489e 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -52,8 +52,9 @@ NodeChannel::NodeChannel(OlapTableSink* parent, IndexChannel* index_channel, int if (_parent->_transfer_data_by_brpc_attachment) { _tuple_data_buffer_ptr = &_tuple_data_buffer; } - _node_channel_tracker = - MemTracker::create_tracker(-1, "NodeChannel" + tls_ctx()->thread_id_str()); + _node_channel_tracker = MemTracker::create_tracker( + -1, fmt::format("NodeChannel:indexID={}:threadId={}", + std::to_string(_index_channel->_index_id), tls_ctx()->thread_id_str())); } NodeChannel::~NodeChannel() noexcept { diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index 53a7ff3b427523..47b519c41a5e22 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -325,7 +325,8 @@ class IndexChannel { public: IndexChannel(OlapTableSink* parent, int64_t index_id, bool is_vec) : _parent(parent), _index_id(index_id), _is_vectorized(is_vec) { - _index_channel_tracker = MemTracker::create_tracker(-1, "IndexChannel"); + _index_channel_tracker = + MemTracker::create_tracker(-1, "IndexChannel:indexID=" + std::to_string(_index_id)); } ~IndexChannel() = default; diff --git a/be/src/http/action/compaction_action.h b/be/src/http/action/compaction_action.h index 08dfb0ce70d90b..80b11bb4362916 100644 --- a/be/src/http/action/compaction_action.h +++ b/be/src/http/action/compaction_action.h @@ -42,7 +42,7 @@ class CompactionAction : public HttpHandler { CompactionAction(CompactionActionType type) : _type(type) { _compaction_mem_tracker = type == RUN_COMPACTION ? MemTracker::create_tracker(-1, "ManualCompaction", nullptr, - MemTrackerLevel::TASK) + MemTrackerLevel::VERBOSE) : nullptr; } diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp index 308e59200ab6fb..4496b9f978e402 100644 --- a/be/src/olap/reader.cpp +++ b/be/src/olap/reader.cpp @@ -106,7 +106,11 @@ TabletReader::~TabletReader() { } Status TabletReader::init(const ReaderParams& read_params) { +#ifndef NDEBUG _predicate_mem_pool.reset(new MemPool("TabletReader:" + read_params.tablet->full_name())); +#else + _predicate_mem_pool.reset(new MemPool()); +#endif Status res = _init_params(read_params); if (!res.ok()) { diff --git a/be/src/olap/row_block.cpp b/be/src/olap/row_block.cpp index 1568e64650e250..6a04119f2404d4 100644 --- a/be/src/olap/row_block.cpp +++ b/be/src/olap/row_block.cpp @@ -38,7 +38,7 @@ using std::vector; namespace doris { RowBlock::RowBlock(const TabletSchema* schema) : _capacity(0), _schema(schema) { - _mem_pool.reset(new MemPool("RowBlock")); + _mem_pool.reset(new MemPool()); } RowBlock::~RowBlock() { diff --git a/be/src/olap/row_block2.cpp b/be/src/olap/row_block2.cpp index 12e72998dd3ec3..fd4b0ef23ec337 100644 --- a/be/src/olap/row_block2.cpp +++ b/be/src/olap/row_block2.cpp @@ -38,7 +38,7 @@ RowBlockV2::RowBlockV2(const Schema& schema, uint16_t capacity) : _schema(schema), _capacity(capacity), _column_vector_batches(_schema.num_columns()), - _pool(new MemPool("RowBlockV2")), + _pool(new MemPool()), _selection_vector(nullptr) { for (auto cid : _schema.column_ids()) { Status status = ColumnVectorBatch::create( diff --git a/be/src/olap/rowset/segment_v2/binary_dict_page.cpp b/be/src/olap/rowset/segment_v2/binary_dict_page.cpp index f8c67e5e210e32..28859732266068 100644 --- a/be/src/olap/rowset/segment_v2/binary_dict_page.cpp +++ b/be/src/olap/rowset/segment_v2/binary_dict_page.cpp @@ -39,7 +39,7 @@ BinaryDictPageBuilder::BinaryDictPageBuilder(const PageBuilderOptions& options) _data_page_builder(nullptr), _dict_builder(nullptr), _encoding_type(DICT_ENCODING), - _pool("BinaryDictPageBuilder") { + _pool() { // initially use DICT_ENCODING // TODO: the data page builder type can be created by Factory according to user config _data_page_builder.reset(new BitshufflePageBuilder(options)); diff --git a/be/src/olap/rowset/segment_v2/zone_map_index.cpp b/be/src/olap/rowset/segment_v2/zone_map_index.cpp index e2f4f9ff6a1d44..0fbe5a1f033590 100644 --- a/be/src/olap/rowset/segment_v2/zone_map_index.cpp +++ b/be/src/olap/rowset/segment_v2/zone_map_index.cpp @@ -30,7 +30,7 @@ namespace doris { namespace segment_v2 { -ZoneMapIndexWriter::ZoneMapIndexWriter(Field* field) : _field(field), _pool("ZoneMapIndexWriter") { +ZoneMapIndexWriter::ZoneMapIndexWriter(Field* field) : _field(field), _pool() { _page_zone_map.min_value = _field->allocate_zone_map_value(&_pool); _page_zone_map.max_value = _field->allocate_zone_map_value(&_pool); _reset_zone_map(&_page_zone_map); @@ -127,7 +127,7 @@ Status ZoneMapIndexReader::load(bool use_page_cache, bool kept_in_memory) { RETURN_IF_ERROR(reader.load(use_page_cache, kept_in_memory)); IndexedColumnIterator iter(&reader); - MemPool pool("ZoneMapIndexReader ColumnBlock"); + MemPool pool; _page_zone_maps.resize(reader.num_values()); // read and cache all page zone maps diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index 83da1df3dba27f..5c4da665ea1f1d 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -900,6 +900,7 @@ Status TabletManager::build_all_report_tablets_info(std::map } Status TabletManager::start_trash_sweep() { + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); { std::vector all_tablets; // we use this vector to save all tablet ptr for saving lock time. @@ -1021,6 +1022,7 @@ void TabletManager::unregister_clone_tablet(int64_t tablet_id) { void TabletManager::try_delete_unused_tablet_path(DataDir* data_dir, TTabletId tablet_id, SchemaHash schema_hash, const string& schema_hash_path) { + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); // acquire the read lock, so that there is no creating tablet or load tablet from meta tasks // create tablet and load tablet task should check whether the dir exists tablets_shard& shard = _get_tablets_shard(tablet_id); @@ -1132,6 +1134,7 @@ void TabletManager::get_partition_related_tablets(int64_t partition_id, } void TabletManager::do_tablet_meta_checkpoint(DataDir* data_dir) { + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); std::vector related_tablets; { for (auto& tablets_shard : _tablets_shards) { diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp index c22c055f472319..702fef36a5d9c8 100644 --- a/be/src/olap/tablet_schema.cpp +++ b/be/src/olap/tablet_schema.cpp @@ -386,9 +386,13 @@ void TabletColumn::to_schema_pb(ColumnPB* column) { uint32_t TabletColumn::mem_size() const { auto size = sizeof(TabletColumn); + size += _col_name.size(); if (_has_default_value) { size += _default_value.size(); } + if (_has_referenced_column) { + size += _referenced_column.size(); + } for (auto& sub_column : _sub_columns) { size += sub_column.mem_size(); } diff --git a/be/src/olap/task/engine_alter_tablet_task.cpp b/be/src/olap/task/engine_alter_tablet_task.cpp index f8ffec16a4385d..7686a632ae46ad 100644 --- a/be/src/olap/task/engine_alter_tablet_task.cpp +++ b/be/src/olap/task/engine_alter_tablet_task.cpp @@ -29,7 +29,7 @@ EngineAlterTabletTask::EngineAlterTabletTask(const TAlterTabletReqV2& request) : _alter_tablet_req(request) { _mem_tracker = MemTracker::create_tracker( config::memory_limitation_per_thread_for_schema_change_bytes, - fmt::format("EngineAlterTabletTask: {}-{}", + fmt::format("EngineAlterTabletTask:baseTabletId={}:newTabletId={}", std::to_string(_alter_tablet_req.base_tablet_id), std::to_string(_alter_tablet_req.new_tablet_id)), StorageEngine::instance()->schema_change_mem_tracker(), MemTrackerLevel::TASK); diff --git a/be/src/olap/task/engine_batch_load_task.cpp b/be/src/olap/task/engine_batch_load_task.cpp index 19270e72fb0cc4..13052c57844899 100644 --- a/be/src/olap/task/engine_batch_load_task.cpp +++ b/be/src/olap/task/engine_batch_load_task.cpp @@ -54,7 +54,9 @@ EngineBatchLoadTask::EngineBatchLoadTask(TPushReq& push_req, std::vectorbatch_load_mem_tracker(), MemTrackerLevel::TASK); } diff --git a/be/src/olap/task/engine_checksum_task.cpp b/be/src/olap/task/engine_checksum_task.cpp index 8464bedc5d4652..92d6848b55baaf 100644 --- a/be/src/olap/task/engine_checksum_task.cpp +++ b/be/src/olap/task/engine_checksum_task.cpp @@ -26,9 +26,9 @@ namespace doris { EngineChecksumTask::EngineChecksumTask(TTabletId tablet_id, TSchemaHash schema_hash, TVersion version, uint32_t* checksum) : _tablet_id(tablet_id), _schema_hash(schema_hash), _version(version), _checksum(checksum) { - _mem_tracker = MemTracker::create_tracker(-1, "compute checksum: " + std::to_string(tablet_id), - StorageEngine::instance()->consistency_mem_tracker(), - MemTrackerLevel::TASK); + _mem_tracker = MemTracker::create_tracker( + -1, "EngineChecksumTask:tabletId=" + std::to_string(tablet_id), + StorageEngine::instance()->consistency_mem_tracker(), MemTrackerLevel::TASK); } Status EngineChecksumTask::execute() { diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp index 41e1fa1f9cf5b7..e49472f5f68bbb 100644 --- a/be/src/olap/task/engine_clone_task.cpp +++ b/be/src/olap/task/engine_clone_task.cpp @@ -58,7 +58,7 @@ EngineCloneTask::EngineCloneTask(const TCloneReq& clone_req, const TMasterInfo& _signature(signature), _master_info(master_info) { _mem_tracker = MemTracker::create_tracker( - -1, "clone tablet: " + std::to_string(_clone_req.tablet_id), + -1, "EngineCloneTask:tabletId=" + std::to_string(_clone_req.tablet_id), StorageEngine::instance()->clone_mem_tracker(), MemTrackerLevel::TASK); } diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp index ce5448a910ccb2..85f2c5da3f3526 100644 --- a/be/src/runtime/load_channel.cpp +++ b/be/src/runtime/load_channel.cpp @@ -18,6 +18,7 @@ #include "runtime/load_channel.h" #include "olap/lru_cache.h" +#include "runtime/exec_env.h" #include "runtime/mem_tracker.h" #include "runtime/tablets_channel.h" #include "runtime/thread_context.h" @@ -31,8 +32,11 @@ LoadChannel::LoadChannel(const UniqueId& load_id, int64_t mem_limit, int64_t tim _is_high_priority(is_high_priority), _sender_ip(sender_ip), _is_vec(is_vec) { - _mem_tracker = MemTracker::create_tracker(mem_limit, "LoadChannel:" + _load_id.to_string(), - nullptr, MemTrackerLevel::TASK); + _mem_tracker = MemTracker::create_tracker( + mem_limit, "LoadChannel:tabletId=" + _load_id.to_string(), + ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->get_task_mem_tracker( + _load_id.to_string()), + MemTrackerLevel::TASK); // _last_updated_time should be set before being inserted to // _load_channels in load_channel_mgr, or it may be erased // immediately by gc thread. @@ -47,7 +51,7 @@ LoadChannel::~LoadChannel() { } Status LoadChannel::open(const PTabletWriterOpenRequest& params) { - SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); + SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); int64_t index_id = params.index_id(); std::shared_ptr channel; { @@ -133,7 +137,7 @@ bool LoadChannel::is_finished() { } Status LoadChannel::cancel() { - SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); + SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); std::lock_guard l(_lock); for (auto& it : _tablets_channels) { it.second->cancel(); diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h index 1d0c3f04e1becc..644e546524d78c 100644 --- a/be/src/runtime/load_channel.h +++ b/be/src/runtime/load_channel.h @@ -128,7 +128,7 @@ class LoadChannel { template Status LoadChannel::add_batch(const TabletWriterAddRequest& request, TabletWriterAddResult* response) { - SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); + SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); int64_t index_id = request.index_id(); // 1. get tablets channel std::shared_ptr channel; diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp index 7b2ee5bb263fbc..2259eb24034ef5 100644 --- a/be/src/runtime/load_channel_mgr.cpp +++ b/be/src/runtime/load_channel_mgr.cpp @@ -102,7 +102,7 @@ LoadChannel* LoadChannelMgr::_create_load_channel(const UniqueId& load_id, int64 } Status LoadChannelMgr::open(const PTabletWriterOpenRequest& params) { - SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); + SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); UniqueId load_id(params.id()); std::shared_ptr channel; { @@ -179,7 +179,7 @@ void LoadChannelMgr::_handle_mem_exceed_limit() { } Status LoadChannelMgr::cancel(const PTabletWriterCancelRequest& params) { - SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); + SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); UniqueId load_id(params.id()); std::shared_ptr cancelled_channel; { diff --git a/be/src/runtime/load_channel_mgr.h b/be/src/runtime/load_channel_mgr.h index c9e84019b13430..121a9ae2740883 100644 --- a/be/src/runtime/load_channel_mgr.h +++ b/be/src/runtime/load_channel_mgr.h @@ -58,6 +58,8 @@ class LoadChannelMgr { // cancel all tablet stream for 'load_id' load Status cancel(const PTabletWriterCancelRequest& request); + std::shared_ptr mem_tracker() { return _mem_tracker; } + private: static LoadChannel* _create_load_channel(const UniqueId& load_id, int64_t mem_limit, int64_t timeout_s, bool is_high_priority, @@ -116,7 +118,6 @@ Status LoadChannelMgr::_get_load_channel(std::shared_ptr& channel, template Status LoadChannelMgr::add_batch(const TabletWriterAddRequest& request, TabletWriterAddResult* response) { - SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); UniqueId load_id(request.id()); // 1. get load channel std::shared_ptr channel; diff --git a/be/src/runtime/mem_pool.cpp b/be/src/runtime/mem_pool.cpp index adc86f0e67849b..ae70dc4df68c82 100644 --- a/be/src/runtime/mem_pool.cpp +++ b/be/src/runtime/mem_pool.cpp @@ -221,7 +221,9 @@ void MemPool::acquire_data(MemPool* src, bool keep_current) { void MemPool::exchange_data(MemPool* other) { int64_t delta_size = other->total_reserved_bytes_ - total_reserved_bytes_; - other->_mem_tracker->transfer_to(_mem_tracker, delta_size); + if (other->_mem_tracker != _mem_tracker) { + other->_mem_tracker->transfer_to(_mem_tracker, delta_size); + } std::swap(current_chunk_idx_, other->current_chunk_idx_); std::swap(next_chunk_size_, other->next_chunk_size_); diff --git a/be/src/runtime/mem_tracker.cpp b/be/src/runtime/mem_tracker.cpp index f42de2ef59c088..bc2ae3efc1ab13 100644 --- a/be/src/runtime/mem_tracker.cpp +++ b/be/src/runtime/mem_tracker.cpp @@ -60,18 +60,16 @@ MemTracker* MemTracker::get_raw_process_tracker() { return raw_process_tracker; } -// Track memory for all brpc server responses. -static std::shared_ptr brpc_server_tracker; -static GoogleOnceType brpc_server_tracker_once = GOOGLE_ONCE_INIT; - -void MemTracker::create_brpc_server_tracker() { - brpc_server_tracker = MemTracker::create_tracker(-1, "Brpc", get_process_tracker(), - MemTrackerLevel::OVERVIEW); -} - -std::shared_ptr MemTracker::get_brpc_server_tracker() { - GoogleOnceInit(&brpc_server_tracker_once, &MemTracker::create_brpc_server_tracker); - return brpc_server_tracker; +static TrackersMap _temporary_mem_trackers; + +std::shared_ptr MemTracker::get_temporary_mem_tracker(const std::string& label) { + // First time this label registered, make a new object, otherwise do nothing. + // Avoid using locks to resolve erase conflicts. + _temporary_mem_trackers.try_emplace_l( + label, [](std::shared_ptr) {}, + MemTracker::create_tracker(-1, fmt::format("[Temporary]-{}", label), nullptr, + MemTrackerLevel::OVERVIEW)); + return _temporary_mem_trackers[label]; } void MemTracker::list_process_trackers(std::vector>* trackers) { @@ -102,14 +100,8 @@ std::shared_ptr MemTracker::create_tracker(int64_t byte_limit, const const std::shared_ptr& parent, MemTrackerLevel level, RuntimeProfile* profile) { - std::shared_ptr reset_parent = - parent ? parent : tls_ctx()->_thread_mem_tracker_mgr->mem_tracker(); - DCHECK(reset_parent); - - std::shared_ptr tracker( - new MemTracker(byte_limit, label, reset_parent, - level > reset_parent->_level ? level : reset_parent->_level, profile)); - reset_parent->add_child_tracker(tracker); + std::shared_ptr tracker = + MemTracker::create_tracker_impl(byte_limit, label, parent, level, profile); tracker->init(); return tracker; } @@ -117,14 +109,30 @@ std::shared_ptr MemTracker::create_tracker(int64_t byte_limit, const std::shared_ptr MemTracker::create_virtual_tracker( int64_t byte_limit, const std::string& label, const std::shared_ptr& parent, MemTrackerLevel level) { + std::shared_ptr tracker = MemTracker::create_tracker_impl( + byte_limit, "[Virtual]-" + label, parent, level, nullptr); + tracker->init_virtual(); + return tracker; +} + +std::shared_ptr MemTracker::create_tracker_impl( + int64_t byte_limit, const std::string& label, const std::shared_ptr& parent, + MemTrackerLevel level, RuntimeProfile* profile) { std::shared_ptr reset_parent = parent ? parent : tls_ctx()->_thread_mem_tracker_mgr->mem_tracker(); DCHECK(reset_parent); + std::string reset_label; + MemTracker* task_parent_tracker = reset_parent->parent_task_mem_tracker(); + if (task_parent_tracker) { + reset_label = fmt::format("{}:{}", label, split(task_parent_tracker->label(), ":")[1]); + } else { + reset_label = label; + } std::shared_ptr tracker( - new MemTracker(byte_limit, "[Virtual]-" + label, reset_parent, level, nullptr)); + new MemTracker(byte_limit, reset_label, reset_parent, + level > reset_parent->_level ? level : reset_parent->_level, profile)); reset_parent->add_child_tracker(tracker); - tracker->init_virtual(); return tracker; } @@ -318,7 +326,8 @@ bool MemTracker::gc_memory(int64_t max_consumption) { if (pre_gc_consumption < max_consumption) return false; int64_t curr_consumption = pre_gc_consumption; - const int64_t EXTRA_BYTES_TO_FREE = 4L * 1024L * 1024L * 1024L; // TODO(zxy) Consider as config + // Free some extra memory to avoid frequent GC, 4M is an empirical value, maybe it will be tested later. + const int64_t EXTRA_BYTES_TO_FREE = 4L * 1024L * 1024L * 1024L; // Try to free up some memory for (int i = 0; i < _gc_functions.size(); ++i) { // Try to free up the amount we are over plus some extra so that we don't have to diff --git a/be/src/runtime/mem_tracker.h b/be/src/runtime/mem_tracker.h index 74a7b4bef6bb12..3d4eb740d23ab8 100644 --- a/be/src/runtime/mem_tracker.h +++ b/be/src/runtime/mem_tracker.h @@ -20,6 +20,8 @@ #pragma once +#include + #include #include #include @@ -40,6 +42,11 @@ enum class MemTrackerLevel { OVERVIEW = 0, TASK, INSTANCE, VERBOSE }; class MemTracker; class RuntimeState; +using TrackersMap = phmap::parallel_flat_hash_map< + std::string, std::shared_ptr, phmap::priv::hash_default_hash, + phmap::priv::hash_default_eq, + std::allocator>>, 12, std::mutex>; + /// A MemTracker tracks memory consumption; it contains an optional limit /// and can be arranged into a tree structure such that the consumption tracked /// by a MemTracker is also tracked by its ancestors. @@ -80,6 +87,9 @@ class MemTracker { // Cosume/release will not sync to parent.Usually used to manually record the specified memory, // It is independent of the recording of TCMalloc Hook in the thread local tracker, so the same // block of memory is recorded independently in these two trackers. + // TODO(zxy) At present, the purpose of most virtual trackers is only to preserve the logic of + // manually recording memory before, which may be used later. After each virtual tracker is + // required case by case, discuss its necessity. static std::shared_ptr create_virtual_tracker( int64_t byte_limit = -1, const std::string& label = std::string(), const std::shared_ptr& parent = std::shared_ptr(), @@ -97,8 +107,9 @@ class MemTracker { // Gets a shared_ptr to the "process" tracker, creating it if necessary. static std::shared_ptr get_process_tracker(); static MemTracker* get_raw_process_tracker(); - // Gets a shared_ptr to the "brpc server" tracker, creating it if necessary. - static std::shared_ptr get_brpc_server_tracker(); + // Get a temporary tracker with a specified label, and the tracker will be created when the label is first get. + // Temporary trackers are not automatically destructed, which is usually used for debugging. + static std::shared_ptr get_temporary_mem_tracker(const std::string& label); Status check_sys_mem_info(int64_t bytes) { if (MemInfo::initialized() && MemInfo::current_mem() + bytes >= MemInfo::mem_limit()) { @@ -423,6 +434,10 @@ class MemTracker { static const std::string COUNTER_NAME; private: + static std::shared_ptr create_tracker_impl( + int64_t byte_limit, const std::string& label, const std::shared_ptr& parent, + MemTrackerLevel level, RuntimeProfile* profile); + /// 'byte_limit' < 0 means no limit /// 'label' is the label used in the usage string (log_usage()) MemTracker(int64_t byte_limit, const std::string& label, @@ -466,8 +481,6 @@ class MemTracker { // Creates the process tracker. static void create_process_tracker(); - // Creates the brpc server tracker. - static void create_brpc_server_tracker(); // Limit on memory consumption, in bytes. If limit_ == -1, there is no consumption limit. int64_t _limit; @@ -486,8 +499,6 @@ class MemTracker { // Consume size smaller than mem_tracker_consume_min_size_bytes will continue to accumulate // to avoid frequent calls to consume/release of MemTracker. - // TODO(zxy) It may be more performant to use thread_local static, which is inherently thread-safe. - // Test after introducing TCMalloc hook std::atomic _untracked_mem = 0; std::vector _all_trackers; // this tracker plus all of its ancestors diff --git a/be/src/runtime/mem_tracker_task_pool.cpp b/be/src/runtime/mem_tracker_task_pool.cpp index 2d43b927e80329..132b47f57356ab 100644 --- a/be/src/runtime/mem_tracker_task_pool.cpp +++ b/be/src/runtime/mem_tracker_task_pool.cpp @@ -33,15 +33,15 @@ std::shared_ptr MemTrackerTaskPool::register_task_mem_tracker_impl( _task_mem_trackers.try_emplace_l( task_id, [](std::shared_ptr) {}, MemTracker::create_tracker(mem_limit, label, parent, MemTrackerLevel::TASK)); - std::shared_ptr tracker = get_task_mem_tracker(task_id); - return tracker; + return get_task_mem_tracker(task_id); } std::shared_ptr MemTrackerTaskPool::register_query_mem_tracker( const std::string& query_id, int64_t mem_limit) { VLOG_FILE << "Register Query memory tracker, query id: " << query_id << " limit: " << PrettyPrinter::print(mem_limit, TUnit::BYTES); - return register_task_mem_tracker_impl(query_id, mem_limit, fmt::format("queryId={}", query_id), + return register_task_mem_tracker_impl(query_id, mem_limit, + fmt::format("Query:queryId={}", query_id), ExecEnv::GetInstance()->query_pool_mem_tracker()); } @@ -49,7 +49,8 @@ std::shared_ptr MemTrackerTaskPool::register_load_mem_tracker( const std::string& load_id, int64_t mem_limit) { VLOG_FILE << "Register Load memory tracker, load id: " << load_id << " limit: " << PrettyPrinter::print(mem_limit, TUnit::BYTES); - return register_task_mem_tracker_impl(load_id, mem_limit, fmt::format("loadId={}", load_id), + return register_task_mem_tracker_impl(load_id, mem_limit, + fmt::format("Load:loadId={}", load_id), ExecEnv::GetInstance()->load_pool_mem_tracker()); } diff --git a/be/src/runtime/mem_tracker_task_pool.h b/be/src/runtime/mem_tracker_task_pool.h index 20b0eaf7be1fda..e3baec8d518c51 100644 --- a/be/src/runtime/mem_tracker_task_pool.h +++ b/be/src/runtime/mem_tracker_task_pool.h @@ -17,8 +17,6 @@ #pragma once -#include - #include "runtime/mem_tracker.h" namespace doris { @@ -50,13 +48,7 @@ class MemTrackerTaskPool { // All per-task MemTracker objects. // The life cycle of task memtracker in the process is the same as task runtime state, // MemTrackers will be removed from this map after query finish or cancel. - using TaskTrackersMap = phmap::parallel_flat_hash_map< - std::string, std::shared_ptr, phmap::priv::hash_default_hash, - phmap::priv::hash_default_eq, - std::allocator>>, 12, - std::mutex>; - - TaskTrackersMap _task_mem_trackers; + TrackersMap _task_mem_trackers; }; } // namespace doris \ No newline at end of file diff --git a/be/src/runtime/memory/chunk_allocator.cpp b/be/src/runtime/memory/chunk_allocator.cpp index 7f8259c03496d8..269dc12fcfd870 100644 --- a/be/src/runtime/memory/chunk_allocator.cpp +++ b/be/src/runtime/memory/chunk_allocator.cpp @@ -145,6 +145,7 @@ Status ChunkAllocator::allocate(size_t size, Chunk* chunk, MemTracker* tracker, } else { _mem_tracker->transfer_to(reset_tracker, size); } + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); // fast path: allocate from current core arena int core_id = CpuInfo::get_current_core(); @@ -177,9 +178,6 @@ Status ChunkAllocator::allocate(size_t size, Chunk* chunk, MemTracker* tracker, SCOPED_RAW_TIMER(&cost_ns); // allocate from system allocator chunk->data = SystemAllocator::allocate(size); - // The allocated chunk is consumed in the tls mem tracker, we want to consume in the ChunkAllocator tracker, - // transfer memory ownership. TODO(zxy) replace with switch tls tracker - tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(_mem_tracker.get(), size); } chunk_pool_system_alloc_count->increment(1); chunk_pool_system_alloc_cost_ns->increment(cost_ns); @@ -193,6 +191,14 @@ Status ChunkAllocator::allocate(size_t size, Chunk* chunk, MemTracker* tracker, } void ChunkAllocator::free(const Chunk& chunk, MemTracker* tracker) { + // The chunk's memory ownership is transferred from tls tracker to ChunkAllocator. + if (tracker) { + tracker->transfer_to(_mem_tracker.get(), chunk.size); + } else { + tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(_mem_tracker.get(), + chunk.size); + } + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); if (chunk.core_id == -1) { return; } @@ -205,12 +211,6 @@ void ChunkAllocator::free(const Chunk& chunk, MemTracker* tracker) { { SCOPED_RAW_TIMER(&cost_ns); SystemAllocator::free(chunk.data, chunk.size); - // The freed chunk is released in the tls mem tracker. When the chunk was allocated, - // it was consumed in the parameter tracker, so if the tls mem tracker and the parameter - // tracker are different, transfer memory ownership. - if (tracker) - tracker->transfer_to(tls_ctx()->_thread_mem_tracker_mgr->mem_tracker().get(), - chunk.size); } chunk_pool_system_free_count->increment(1); chunk_pool_system_free_cost_ns->increment(cost_ns); @@ -219,13 +219,6 @@ void ChunkAllocator::free(const Chunk& chunk, MemTracker* tracker) { } } while (!_reserved_bytes.compare_exchange_weak(old_reserved_bytes, new_reserved_bytes)); - // The chunk's memory ownership is transferred from MemPool to ChunkAllocator. - if (tracker) { - tracker->transfer_to(_mem_tracker.get(), chunk.size); - } else { - tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(_mem_tracker.get(), - chunk.size); - } _arenas[chunk.core_id]->push_free_chunk(chunk.data, chunk.size); } diff --git a/be/src/runtime/result_file_sink.cpp b/be/src/runtime/result_file_sink.cpp index ee7fa490db884b..1f29c3057422b1 100644 --- a/be/src/runtime/result_file_sink.cpp +++ b/be/src/runtime/result_file_sink.cpp @@ -114,10 +114,6 @@ Status ResultFileSink::prepare(RuntimeState* state) { _local_bytes_send_counter = ADD_COUNTER(profile(), "LocalBytesSent", TUnit::BYTES); _uncompressed_bytes_counter = ADD_COUNTER(profile(), "UncompressedRowBatchSize", TUnit::BYTES); - // TODO(zxy) used after - _mem_tracker = MemTracker::create_tracker( - -1, "ResultFileSink:" + print_id(state->fragment_instance_id()), - state->instance_mem_tracker(), MemTrackerLevel::VERBOSE, _profile); // create writer _output_batch = new RowBatch(_output_row_descriptor, 1024); _writer.reset(new (std::nothrow) FileResultWriter( diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index 07dbca425e046f..2ede9538358170 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -53,7 +53,7 @@ TabletsChannel::~TabletsChannel() { } Status TabletsChannel::open(const PTabletWriterOpenRequest& request) { - SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); + SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); std::lock_guard l(_lock); if (_state == kOpened) { // Normal case, already open by other sender @@ -138,7 +138,6 @@ Status TabletsChannel::close(int sender_id, int64_t backend_id, bool* finished, } Status TabletsChannel::reduce_mem_usage(int64_t mem_limit) { - SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); std::lock_guard l(_lock); if (_state == kFinished) { // TabletsChannel is closed without LoadChannel's lock, @@ -239,7 +238,7 @@ Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request } Status TabletsChannel::cancel() { - SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); + SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); std::lock_guard l(_lock); if (_state == kFinished) { return _close_status; diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h index eb39956cba68a1..5934c401083252 100644 --- a/be/src/runtime/tablets_channel.h +++ b/be/src/runtime/tablets_channel.h @@ -168,7 +168,7 @@ Status TabletsChannel::_get_current_seq(int64_t& cur_seq, const Request& request template Status TabletsChannel::add_batch(const TabletWriterAddRequest& request, TabletWriterAddResult* response) { - SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); + SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); int64_t cur_seq = 0; auto status = _get_current_seq(cur_seq, request); diff --git a/be/src/runtime/tcmalloc_hook.h b/be/src/runtime/tcmalloc_hook.h index 548b8862990b8f..600668aea90671 100644 --- a/be/src/runtime/tcmalloc_hook.h +++ b/be/src/runtime/tcmalloc_hook.h @@ -48,7 +48,8 @@ void init_hook() { MallocHook::AddDeleteHook(&delete_hook); } -void destroy_hook() { - MallocHook::RemoveNewHook(&new_hook); - MallocHook::RemoveDeleteHook(&delete_hook); -} +// For later debug. +// static void destroy_hook() { +// MallocHook::RemoveNewHook(&new_hook); +// MallocHook::RemoveDeleteHook(&delete_hook); +// } diff --git a/be/src/runtime/thread_context.cpp b/be/src/runtime/thread_context.cpp index d1f206dedc8740..fade4fd51e793e 100644 --- a/be/src/runtime/thread_context.cpp +++ b/be/src/runtime/thread_context.cpp @@ -92,7 +92,8 @@ SwitchThreadMemTracker::SwitchThreadMemTracker( DCHECK(mem_tracker); // The thread tracker must be switched after the attach task, otherwise switching // in the main thread will cause the cached tracker not be cleaned up in time. - DCHECK(in_task == false || tls_ctx()->_thread_mem_tracker_mgr->is_attach_task()); + DCHECK(in_task == false || tls_ctx()->type() != ThreadContext::TaskType::UNKNOWN) + << ",tls ctx type=" << tls_ctx()->type(); if (Existed) { _old_tracker_id = tls_ctx()->_thread_mem_tracker_mgr->update_tracker(mem_tracker); } else { @@ -128,7 +129,9 @@ SwitchThreadMemTrackerErrCallBack::SwitchThreadMemTrackerErrCallBack( SwitchThreadMemTrackerErrCallBack::~SwitchThreadMemTrackerErrCallBack() { tls_ctx()->_thread_mem_tracker_mgr->update_consume_err_cb(_old_tracker_cb); +#ifndef NDEBUG DorisMetrics::instance()->switch_thread_mem_tracker_err_cb_count->increment(1); +#endif } SwitchBthread::SwitchBthread() { @@ -137,17 +140,21 @@ SwitchBthread::SwitchBthread() { if (tls == nullptr) { // Create thread-local data on demand. tls = new ThreadContext; - tls->_thread_mem_tracker_mgr->init_bthread(); // set the data so that next time bthread_getspecific in the thread returns the data. CHECK_EQ(0, bthread_setspecific(btls_key, tls)); } else { - tls->_thread_mem_tracker_mgr->init_bthread(); + DCHECK(tls->type() == ThreadContext::TaskType::UNKNOWN); + tls->_thread_mem_tracker_mgr->clear_untracked_mems(); } + tls->_thread_mem_tracker_mgr->init(); + tls->set_type(ThreadContext::TaskType::BRPC); } SwitchBthread::~SwitchBthread() { DCHECK(tls != nullptr); tls->_thread_mem_tracker_mgr->clear_untracked_mems(); + tls->_thread_mem_tracker_mgr->init(); + tls->set_type(ThreadContext::TaskType::UNKNOWN); #ifndef NDEBUG DorisMetrics::instance()->switch_bthread_count->increment(1); #endif diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index 8ab72be634df0d..8dc8f5267e757b 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -45,12 +45,22 @@ // The query thread will automatically clear_untracked_mems when detach_task. #define SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(mem_tracker) \ auto VARNAME_LINENUM(switch_tracker) = doris::SwitchThreadMemTracker(mem_tracker, false) +// `detach task/~switch bthread` will clear cached trackers and unconsumed tracks. +// Used after `attach task/switch bthread` to avoid cached trackers not being destroyed in time. #define SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker) \ auto VARNAME_LINENUM(switch_tracker) = doris::SwitchThreadMemTracker(mem_tracker, true); #define SCOPED_SWITCH_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker) \ auto VARNAME_LINENUM(switch_tracker) = doris::SwitchThreadMemTracker(mem_tracker, false) #define SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker) \ auto VARNAME_LINENUM(switch_tracker) = doris::SwitchThreadMemTracker(mem_tracker, true) +// Count the memory in the scope to a temporary tracker with the specified label name. +// This is very useful when debugging. You can find the position where the tracker statistics are +// inaccurate through the temporary tracker layer by layer. As well as finding memory hotspots. +// TODO(zxy) track specifies the memory for each line in the code segment, instead of manually adding +// a switch temporary tracker to each line. Maybe there are open source tools to do this? +#define SCOPED_SWITCH_TEMPORARY_THREAD_LOCAL_MEM_TRACKER(label) \ + auto VARNAME_LINENUM(switch_tracker) = doris::SwitchThreadMemTracker( \ + MemTracker::get_temporary_mem_tracker(label), false) // After the non-query thread switches the mem tracker, if the thread will not switch the mem // tracker again in the short term, can consider manually clear_untracked_mems. // The query thread will automatically clear_untracked_mems when detach_task. @@ -95,7 +105,8 @@ class ThreadContext { QUERY = 1, LOAD = 2, COMPACTION = 3, - STORAGE = 4 + STORAGE = 4, + BRPC = 5 // to be added ... }; inline static const std::string TaskTypeStr[] = {"UNKNOWN", "QUERY", "LOAD", "COMPACTION", @@ -112,9 +123,10 @@ class ThreadContext { void attach(const TaskType& type, const std::string& task_id, const TUniqueId& fragment_instance_id, const std::shared_ptr& mem_tracker) { - DCHECK(_type == TaskType::UNKNOWN && _task_id == "") - << ",old tracker label: " << mem_tracker->label() - << ",new tracker label: " << _thread_mem_tracker_mgr->mem_tracker()->label(); + DCHECK((_type == TaskType::UNKNOWN || _type == TaskType::BRPC) && _task_id == "") + << ",new tracker label: " << mem_tracker->label() + << ",old tracker label: " << _thread_mem_tracker_mgr->mem_tracker()->label(); + DCHECK(type != TaskType::UNKNOWN); _type = type; _task_id = task_id; _fragment_instance_id = fragment_instance_id; @@ -129,6 +141,8 @@ class ThreadContext { _thread_mem_tracker_mgr->detach_task(); } + const TaskType& type() const { return _type; } + const void set_type(const TaskType& type) { _type = type; } const std::string& task_id() const { return _task_id; } const std::string& thread_id_str() const { return _thread_id; } const TUniqueId& fragment_instance_id() const { return _fragment_instance_id; } diff --git a/be/src/runtime/thread_mem_tracker_mgr.cpp b/be/src/runtime/thread_mem_tracker_mgr.cpp index e55a4620f00e31..e9768bf86bd697 100644 --- a/be/src/runtime/thread_mem_tracker_mgr.cpp +++ b/be/src/runtime/thread_mem_tracker_mgr.cpp @@ -28,6 +28,8 @@ void ThreadMemTrackerMgr::attach_task(const std::string& cancel_msg, const std:: const TUniqueId& fragment_instance_id, const std::shared_ptr& mem_tracker) { DCHECK(switch_count == 0) << print_debug_string(); + clear_untracked_mems(); + init(); _task_id = task_id; _fragment_instance_id = fragment_instance_id; _consume_err_cb.cancel_msg = cancel_msg; @@ -37,10 +39,10 @@ void ThreadMemTrackerMgr::attach_task(const std::string& cancel_msg, const std:: return; } #endif - _temp_task_mem_tracker = + std::shared_ptr tracker = ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->get_task_mem_tracker( task_id); - update_tracker(_temp_task_mem_tracker); + update_tracker(tracker); } else { update_tracker(mem_tracker); } @@ -48,9 +50,7 @@ void ThreadMemTrackerMgr::attach_task(const std::string& cancel_msg, const std:: void ThreadMemTrackerMgr::detach_task() { DCHECK(switch_count == 0) << print_debug_string(); - _task_id = ""; _fragment_instance_id = TUniqueId(); - _consume_err_cb.init(); clear_untracked_mems(); init(); } diff --git a/be/src/runtime/thread_mem_tracker_mgr.h b/be/src/runtime/thread_mem_tracker_mgr.h index 404837a73a0aa8..84033ff4e7ab94 100644 --- a/be/src/runtime/thread_mem_tracker_mgr.h +++ b/be/src/runtime/thread_mem_tracker_mgr.h @@ -70,8 +70,6 @@ class ThreadMemTrackerMgr { // to avoid memory tracking loss. void init(); - void init_bthread(); - void clear_untracked_mems(); // After attach, the current thread TCMalloc Hook starts to consume/release task mem_tracker @@ -160,14 +158,16 @@ class ThreadMemTrackerMgr { // NOTE: flat_hash_map, int replaces string as key, all to improve the speed of map find, // the expected speed is increased by more than 10 times. phmap::flat_hash_map> _mem_trackers; - int64_t _tracker_id; phmap::flat_hash_map _untracked_mems; + // After the tracker is added to _mem_trackers, if tracker = null is found when using it, + // we can confirm the tracker label that was added through _mem_tracker_labels. + // Because for performance, all map keys are tracker id. phmap::flat_hash_map _mem_tracker_labels; - // Avoid memory allocation in functions and fall into an infinite loop + int64_t _tracker_id; + // Avoid memory allocation in functions. int64_t _temp_tracker_id; ConsumeErrCallBackInfo _temp_consume_err_cb; - std::shared_ptr _temp_task_mem_tracker; std::string _task_id; TUniqueId _fragment_instance_id; @@ -175,6 +175,8 @@ class ThreadMemTrackerMgr { }; inline void ThreadMemTrackerMgr::init() { + _task_id = ""; + _consume_err_cb.init(); _tracker_id = 0; _mem_trackers.clear(); _mem_trackers[0] = MemTracker::get_process_tracker(); @@ -184,14 +186,6 @@ inline void ThreadMemTrackerMgr::init() { _mem_tracker_labels[0] = MemTracker::get_process_tracker()->label(); } -inline void ThreadMemTrackerMgr::init_bthread() { - init(); - _mem_trackers[1] = MemTracker::get_brpc_server_tracker(); - _untracked_mems[1] = 0; - _mem_tracker_labels[1] = MemTracker::get_brpc_server_tracker()->label(); - _tracker_id = 1; -} - inline void ThreadMemTrackerMgr::clear_untracked_mems() { for (const auto& untracked_mem : _untracked_mems) { if (untracked_mem.second != 0) { @@ -279,7 +273,7 @@ inline void ThreadMemTrackerMgr::add_tracker(const std::shared_ptr& _mem_trackers[mem_tracker->id()] = mem_tracker; DCHECK(_mem_trackers[mem_tracker->id()]) << print_debug_string(); _untracked_mems[mem_tracker->id()] = 0; - _mem_tracker_labels[_temp_tracker_id] = mem_tracker->label(); + _mem_tracker_labels[mem_tracker->id()] = mem_tracker->label(); } inline std::shared_ptr ThreadMemTrackerMgr::mem_tracker() { diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp index 1bcdfdb9d34983..259276809cfbbd 100644 --- a/be/src/service/doris_main.cpp +++ b/be/src/service/doris_main.cpp @@ -484,6 +484,7 @@ int main(int argc, char** argv) { doris::MemInfo::refresh_current_mem(); #endif // TODO(zxy) 10s is too long to clear the expired task mem tracker. + // A query mem tracker is about 57 bytes, assuming 10000 qps, which wastes about 55M of memory. // It should be actively triggered at the end of query/load. doris::ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->logout_task_mem_tracker(); sleep(10); diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index d9e575621034d2..bfdd1f31ba3eb3 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -132,6 +132,8 @@ void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcControll int64_t execution_time_ns = 0; { SCOPED_RAW_TIMER(&execution_time_ns); + SCOPED_ATTACH_TASK_THREAD(ThreadContext::TaskType::LOAD, + _exec_env->load_channel_mgr()->mem_tracker()); brpc::Controller* cntl = static_cast(cntl_base); attachment_transfer_request_block(request, cntl); auto st = _exec_env->load_channel_mgr()->add_batch(*request, response); @@ -152,7 +154,6 @@ void PInternalServiceImpl::tablet_writer_add_batch(google::protobuf::RpcControll const PTabletWriterAddBatchRequest* request, PTabletWriterAddBatchResult* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD(); VLOG_RPC << "tablet writer add batch, id=" << request->id() << ", index_id=" << request->index_id() << ", sender_id=" << request->sender_id() << ", current_queued_size=" << _tablet_worker_pool.get_queue_size(); @@ -166,6 +167,8 @@ void PInternalServiceImpl::tablet_writer_add_batch(google::protobuf::RpcControll int64_t execution_time_ns = 0; { SCOPED_RAW_TIMER(&execution_time_ns); + SCOPED_ATTACH_TASK_THREAD(ThreadContext::TaskType::LOAD, + _exec_env->load_channel_mgr()->mem_tracker()); brpc::Controller* cntl = static_cast(cntl_base); attachment_transfer_request_row_batch(request, cntl); auto st = _exec_env->load_channel_mgr()->add_batch(*request, response);