diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp index 40029414b822e3..523474a50316ef 100644 --- a/be/src/exec/olap_scanner.cpp +++ b/be/src/exec/olap_scanner.cpp @@ -49,10 +49,14 @@ OlapScanner::OlapScanner(RuntimeState* runtime_state, OlapScanNode* parent, bool _is_open(false), _aggregation(aggregation), _need_agg_finalize(need_agg_finalize), - _version(-1), - _mem_tracker(MemTracker::create_tracker( - tracker->limit(), tracker->label() + ":OlapScanner:" + tls_ctx()->thread_id_str(), - tracker)) {} + _version(-1) { +#ifndef NDEBUG + _mem_tracker = MemTracker::create_tracker(tracker->limit(), + "OlapScanner:" + tls_ctx()->thread_id_str(), tracker); +#else + _mem_tracker = tracker; +#endif +} Status OlapScanner::prepare( const TPaloScanRange& scan_range, const std::vector& key_ranges, diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index 0e89069cb5cc28..4f3cfaaef6489e 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -52,8 +52,9 @@ NodeChannel::NodeChannel(OlapTableSink* parent, IndexChannel* index_channel, int if (_parent->_transfer_data_by_brpc_attachment) { _tuple_data_buffer_ptr = &_tuple_data_buffer; } - _node_channel_tracker = - MemTracker::create_tracker(-1, "NodeChannel" + tls_ctx()->thread_id_str()); + _node_channel_tracker = MemTracker::create_tracker( + -1, fmt::format("NodeChannel:indexID={}:threadId={}", + std::to_string(_index_channel->_index_id), tls_ctx()->thread_id_str())); } NodeChannel::~NodeChannel() noexcept { diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index 53a7ff3b427523..47b519c41a5e22 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -325,7 +325,8 @@ class IndexChannel { public: IndexChannel(OlapTableSink* parent, int64_t index_id, bool is_vec) : _parent(parent), _index_id(index_id), _is_vectorized(is_vec) { - _index_channel_tracker = MemTracker::create_tracker(-1, "IndexChannel"); + _index_channel_tracker = + MemTracker::create_tracker(-1, "IndexChannel:indexID=" + std::to_string(_index_id)); } ~IndexChannel() = default; diff --git a/be/src/http/action/compaction_action.h b/be/src/http/action/compaction_action.h index 08dfb0ce70d90b..80b11bb4362916 100644 --- a/be/src/http/action/compaction_action.h +++ b/be/src/http/action/compaction_action.h @@ -42,7 +42,7 @@ class CompactionAction : public HttpHandler { CompactionAction(CompactionActionType type) : _type(type) { _compaction_mem_tracker = type == RUN_COMPACTION ? MemTracker::create_tracker(-1, "ManualCompaction", nullptr, - MemTrackerLevel::TASK) + MemTrackerLevel::VERBOSE) : nullptr; } diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp index 308e59200ab6fb..4496b9f978e402 100644 --- a/be/src/olap/reader.cpp +++ b/be/src/olap/reader.cpp @@ -106,7 +106,11 @@ TabletReader::~TabletReader() { } Status TabletReader::init(const ReaderParams& read_params) { +#ifndef NDEBUG _predicate_mem_pool.reset(new MemPool("TabletReader:" + read_params.tablet->full_name())); +#else + _predicate_mem_pool.reset(new MemPool()); +#endif Status res = _init_params(read_params); if (!res.ok()) { diff --git a/be/src/olap/row_block.cpp b/be/src/olap/row_block.cpp index 1568e64650e250..6a04119f2404d4 100644 --- a/be/src/olap/row_block.cpp +++ b/be/src/olap/row_block.cpp @@ -38,7 +38,7 @@ using std::vector; namespace doris { RowBlock::RowBlock(const TabletSchema* schema) : _capacity(0), _schema(schema) { - _mem_pool.reset(new MemPool("RowBlock")); + _mem_pool.reset(new MemPool()); } RowBlock::~RowBlock() { diff --git a/be/src/olap/row_block2.cpp b/be/src/olap/row_block2.cpp index 12e72998dd3ec3..fd4b0ef23ec337 100644 --- a/be/src/olap/row_block2.cpp +++ b/be/src/olap/row_block2.cpp @@ -38,7 +38,7 @@ RowBlockV2::RowBlockV2(const Schema& schema, uint16_t capacity) : _schema(schema), _capacity(capacity), _column_vector_batches(_schema.num_columns()), - _pool(new MemPool("RowBlockV2")), + _pool(new MemPool()), _selection_vector(nullptr) { for (auto cid : _schema.column_ids()) { Status status = ColumnVectorBatch::create( diff --git a/be/src/olap/rowset/segment_v2/binary_dict_page.cpp b/be/src/olap/rowset/segment_v2/binary_dict_page.cpp index f8c67e5e210e32..28859732266068 100644 --- a/be/src/olap/rowset/segment_v2/binary_dict_page.cpp +++ b/be/src/olap/rowset/segment_v2/binary_dict_page.cpp @@ -39,7 +39,7 @@ BinaryDictPageBuilder::BinaryDictPageBuilder(const PageBuilderOptions& options) _data_page_builder(nullptr), _dict_builder(nullptr), _encoding_type(DICT_ENCODING), - _pool("BinaryDictPageBuilder") { + _pool() { // initially use DICT_ENCODING // TODO: the data page builder type can be created by Factory according to user config _data_page_builder.reset(new BitshufflePageBuilder(options)); diff --git a/be/src/olap/rowset/segment_v2/zone_map_index.cpp b/be/src/olap/rowset/segment_v2/zone_map_index.cpp index e2f4f9ff6a1d44..0fbe5a1f033590 100644 --- a/be/src/olap/rowset/segment_v2/zone_map_index.cpp +++ b/be/src/olap/rowset/segment_v2/zone_map_index.cpp @@ -30,7 +30,7 @@ namespace doris { namespace segment_v2 { -ZoneMapIndexWriter::ZoneMapIndexWriter(Field* field) : _field(field), _pool("ZoneMapIndexWriter") { +ZoneMapIndexWriter::ZoneMapIndexWriter(Field* field) : _field(field), _pool() { _page_zone_map.min_value = _field->allocate_zone_map_value(&_pool); _page_zone_map.max_value = _field->allocate_zone_map_value(&_pool); _reset_zone_map(&_page_zone_map); @@ -127,7 +127,7 @@ Status ZoneMapIndexReader::load(bool use_page_cache, bool kept_in_memory) { RETURN_IF_ERROR(reader.load(use_page_cache, kept_in_memory)); IndexedColumnIterator iter(&reader); - MemPool pool("ZoneMapIndexReader ColumnBlock"); + MemPool pool; _page_zone_maps.resize(reader.num_values()); // read and cache all page zone maps diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index 83da1df3dba27f..5c4da665ea1f1d 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -900,6 +900,7 @@ Status TabletManager::build_all_report_tablets_info(std::map } Status TabletManager::start_trash_sweep() { + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); { std::vector all_tablets; // we use this vector to save all tablet ptr for saving lock time. @@ -1021,6 +1022,7 @@ void TabletManager::unregister_clone_tablet(int64_t tablet_id) { void TabletManager::try_delete_unused_tablet_path(DataDir* data_dir, TTabletId tablet_id, SchemaHash schema_hash, const string& schema_hash_path) { + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); // acquire the read lock, so that there is no creating tablet or load tablet from meta tasks // create tablet and load tablet task should check whether the dir exists tablets_shard& shard = _get_tablets_shard(tablet_id); @@ -1132,6 +1134,7 @@ void TabletManager::get_partition_related_tablets(int64_t partition_id, } void TabletManager::do_tablet_meta_checkpoint(DataDir* data_dir) { + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); std::vector related_tablets; { for (auto& tablets_shard : _tablets_shards) { diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp index c22c055f472319..702fef36a5d9c8 100644 --- a/be/src/olap/tablet_schema.cpp +++ b/be/src/olap/tablet_schema.cpp @@ -386,9 +386,13 @@ void TabletColumn::to_schema_pb(ColumnPB* column) { uint32_t TabletColumn::mem_size() const { auto size = sizeof(TabletColumn); + size += _col_name.size(); if (_has_default_value) { size += _default_value.size(); } + if (_has_referenced_column) { + size += _referenced_column.size(); + } for (auto& sub_column : _sub_columns) { size += sub_column.mem_size(); } diff --git a/be/src/olap/task/engine_alter_tablet_task.cpp b/be/src/olap/task/engine_alter_tablet_task.cpp index f8ffec16a4385d..7686a632ae46ad 100644 --- a/be/src/olap/task/engine_alter_tablet_task.cpp +++ b/be/src/olap/task/engine_alter_tablet_task.cpp @@ -29,7 +29,7 @@ EngineAlterTabletTask::EngineAlterTabletTask(const TAlterTabletReqV2& request) : _alter_tablet_req(request) { _mem_tracker = MemTracker::create_tracker( config::memory_limitation_per_thread_for_schema_change_bytes, - fmt::format("EngineAlterTabletTask: {}-{}", + fmt::format("EngineAlterTabletTask:baseTabletId={}:newTabletId={}", std::to_string(_alter_tablet_req.base_tablet_id), std::to_string(_alter_tablet_req.new_tablet_id)), StorageEngine::instance()->schema_change_mem_tracker(), MemTrackerLevel::TASK); diff --git a/be/src/olap/task/engine_batch_load_task.cpp b/be/src/olap/task/engine_batch_load_task.cpp index 19270e72fb0cc4..13052c57844899 100644 --- a/be/src/olap/task/engine_batch_load_task.cpp +++ b/be/src/olap/task/engine_batch_load_task.cpp @@ -54,7 +54,9 @@ EngineBatchLoadTask::EngineBatchLoadTask(TPushReq& push_req, std::vectorbatch_load_mem_tracker(), MemTrackerLevel::TASK); } diff --git a/be/src/olap/task/engine_checksum_task.cpp b/be/src/olap/task/engine_checksum_task.cpp index 8464bedc5d4652..92d6848b55baaf 100644 --- a/be/src/olap/task/engine_checksum_task.cpp +++ b/be/src/olap/task/engine_checksum_task.cpp @@ -26,9 +26,9 @@ namespace doris { EngineChecksumTask::EngineChecksumTask(TTabletId tablet_id, TSchemaHash schema_hash, TVersion version, uint32_t* checksum) : _tablet_id(tablet_id), _schema_hash(schema_hash), _version(version), _checksum(checksum) { - _mem_tracker = MemTracker::create_tracker(-1, "compute checksum: " + std::to_string(tablet_id), - StorageEngine::instance()->consistency_mem_tracker(), - MemTrackerLevel::TASK); + _mem_tracker = MemTracker::create_tracker( + -1, "EngineChecksumTask:tabletId=" + std::to_string(tablet_id), + StorageEngine::instance()->consistency_mem_tracker(), MemTrackerLevel::TASK); } Status EngineChecksumTask::execute() { diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp index 41e1fa1f9cf5b7..e49472f5f68bbb 100644 --- a/be/src/olap/task/engine_clone_task.cpp +++ b/be/src/olap/task/engine_clone_task.cpp @@ -58,7 +58,7 @@ EngineCloneTask::EngineCloneTask(const TCloneReq& clone_req, const TMasterInfo& _signature(signature), _master_info(master_info) { _mem_tracker = MemTracker::create_tracker( - -1, "clone tablet: " + std::to_string(_clone_req.tablet_id), + -1, "EngineCloneTask:tabletId=" + std::to_string(_clone_req.tablet_id), StorageEngine::instance()->clone_mem_tracker(), MemTrackerLevel::TASK); } diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp index ce5448a910ccb2..85f2c5da3f3526 100644 --- a/be/src/runtime/load_channel.cpp +++ b/be/src/runtime/load_channel.cpp @@ -18,6 +18,7 @@ #include "runtime/load_channel.h" #include "olap/lru_cache.h" +#include "runtime/exec_env.h" #include "runtime/mem_tracker.h" #include "runtime/tablets_channel.h" #include "runtime/thread_context.h" @@ -31,8 +32,11 @@ LoadChannel::LoadChannel(const UniqueId& load_id, int64_t mem_limit, int64_t tim _is_high_priority(is_high_priority), _sender_ip(sender_ip), _is_vec(is_vec) { - _mem_tracker = MemTracker::create_tracker(mem_limit, "LoadChannel:" + _load_id.to_string(), - nullptr, MemTrackerLevel::TASK); + _mem_tracker = MemTracker::create_tracker( + mem_limit, "LoadChannel:tabletId=" + _load_id.to_string(), + ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->get_task_mem_tracker( + _load_id.to_string()), + MemTrackerLevel::TASK); // _last_updated_time should be set before being inserted to // _load_channels in load_channel_mgr, or it may be erased // immediately by gc thread. @@ -47,7 +51,7 @@ LoadChannel::~LoadChannel() { } Status LoadChannel::open(const PTabletWriterOpenRequest& params) { - SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); + SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); int64_t index_id = params.index_id(); std::shared_ptr channel; { @@ -133,7 +137,7 @@ bool LoadChannel::is_finished() { } Status LoadChannel::cancel() { - SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); + SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); std::lock_guard l(_lock); for (auto& it : _tablets_channels) { it.second->cancel(); diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h index 1d0c3f04e1becc..644e546524d78c 100644 --- a/be/src/runtime/load_channel.h +++ b/be/src/runtime/load_channel.h @@ -128,7 +128,7 @@ class LoadChannel { template Status LoadChannel::add_batch(const TabletWriterAddRequest& request, TabletWriterAddResult* response) { - SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); + SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); int64_t index_id = request.index_id(); // 1. get tablets channel std::shared_ptr channel; diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp index 7b2ee5bb263fbc..2259eb24034ef5 100644 --- a/be/src/runtime/load_channel_mgr.cpp +++ b/be/src/runtime/load_channel_mgr.cpp @@ -102,7 +102,7 @@ LoadChannel* LoadChannelMgr::_create_load_channel(const UniqueId& load_id, int64 } Status LoadChannelMgr::open(const PTabletWriterOpenRequest& params) { - SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); + SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); UniqueId load_id(params.id()); std::shared_ptr channel; { @@ -179,7 +179,7 @@ void LoadChannelMgr::_handle_mem_exceed_limit() { } Status LoadChannelMgr::cancel(const PTabletWriterCancelRequest& params) { - SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); + SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); UniqueId load_id(params.id()); std::shared_ptr cancelled_channel; { diff --git a/be/src/runtime/load_channel_mgr.h b/be/src/runtime/load_channel_mgr.h index c9e84019b13430..121a9ae2740883 100644 --- a/be/src/runtime/load_channel_mgr.h +++ b/be/src/runtime/load_channel_mgr.h @@ -58,6 +58,8 @@ class LoadChannelMgr { // cancel all tablet stream for 'load_id' load Status cancel(const PTabletWriterCancelRequest& request); + std::shared_ptr mem_tracker() { return _mem_tracker; } + private: static LoadChannel* _create_load_channel(const UniqueId& load_id, int64_t mem_limit, int64_t timeout_s, bool is_high_priority, @@ -116,7 +118,6 @@ Status LoadChannelMgr::_get_load_channel(std::shared_ptr& channel, template Status LoadChannelMgr::add_batch(const TabletWriterAddRequest& request, TabletWriterAddResult* response) { - SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); UniqueId load_id(request.id()); // 1. get load channel std::shared_ptr channel; diff --git a/be/src/runtime/mem_pool.cpp b/be/src/runtime/mem_pool.cpp index adc86f0e67849b..ae70dc4df68c82 100644 --- a/be/src/runtime/mem_pool.cpp +++ b/be/src/runtime/mem_pool.cpp @@ -221,7 +221,9 @@ void MemPool::acquire_data(MemPool* src, bool keep_current) { void MemPool::exchange_data(MemPool* other) { int64_t delta_size = other->total_reserved_bytes_ - total_reserved_bytes_; - other->_mem_tracker->transfer_to(_mem_tracker, delta_size); + if (other->_mem_tracker != _mem_tracker) { + other->_mem_tracker->transfer_to(_mem_tracker, delta_size); + } std::swap(current_chunk_idx_, other->current_chunk_idx_); std::swap(next_chunk_size_, other->next_chunk_size_); diff --git a/be/src/runtime/mem_tracker.cpp b/be/src/runtime/mem_tracker.cpp index f42de2ef59c088..bc2ae3efc1ab13 100644 --- a/be/src/runtime/mem_tracker.cpp +++ b/be/src/runtime/mem_tracker.cpp @@ -60,18 +60,16 @@ MemTracker* MemTracker::get_raw_process_tracker() { return raw_process_tracker; } -// Track memory for all brpc server responses. -static std::shared_ptr brpc_server_tracker; -static GoogleOnceType brpc_server_tracker_once = GOOGLE_ONCE_INIT; - -void MemTracker::create_brpc_server_tracker() { - brpc_server_tracker = MemTracker::create_tracker(-1, "Brpc", get_process_tracker(), - MemTrackerLevel::OVERVIEW); -} - -std::shared_ptr MemTracker::get_brpc_server_tracker() { - GoogleOnceInit(&brpc_server_tracker_once, &MemTracker::create_brpc_server_tracker); - return brpc_server_tracker; +static TrackersMap _temporary_mem_trackers; + +std::shared_ptr MemTracker::get_temporary_mem_tracker(const std::string& label) { + // First time this label registered, make a new object, otherwise do nothing. + // Avoid using locks to resolve erase conflicts. + _temporary_mem_trackers.try_emplace_l( + label, [](std::shared_ptr) {}, + MemTracker::create_tracker(-1, fmt::format("[Temporary]-{}", label), nullptr, + MemTrackerLevel::OVERVIEW)); + return _temporary_mem_trackers[label]; } void MemTracker::list_process_trackers(std::vector>* trackers) { @@ -102,14 +100,8 @@ std::shared_ptr MemTracker::create_tracker(int64_t byte_limit, const const std::shared_ptr& parent, MemTrackerLevel level, RuntimeProfile* profile) { - std::shared_ptr reset_parent = - parent ? parent : tls_ctx()->_thread_mem_tracker_mgr->mem_tracker(); - DCHECK(reset_parent); - - std::shared_ptr tracker( - new MemTracker(byte_limit, label, reset_parent, - level > reset_parent->_level ? level : reset_parent->_level, profile)); - reset_parent->add_child_tracker(tracker); + std::shared_ptr tracker = + MemTracker::create_tracker_impl(byte_limit, label, parent, level, profile); tracker->init(); return tracker; } @@ -117,14 +109,30 @@ std::shared_ptr MemTracker::create_tracker(int64_t byte_limit, const std::shared_ptr MemTracker::create_virtual_tracker( int64_t byte_limit, const std::string& label, const std::shared_ptr& parent, MemTrackerLevel level) { + std::shared_ptr tracker = MemTracker::create_tracker_impl( + byte_limit, "[Virtual]-" + label, parent, level, nullptr); + tracker->init_virtual(); + return tracker; +} + +std::shared_ptr MemTracker::create_tracker_impl( + int64_t byte_limit, const std::string& label, const std::shared_ptr& parent, + MemTrackerLevel level, RuntimeProfile* profile) { std::shared_ptr reset_parent = parent ? parent : tls_ctx()->_thread_mem_tracker_mgr->mem_tracker(); DCHECK(reset_parent); + std::string reset_label; + MemTracker* task_parent_tracker = reset_parent->parent_task_mem_tracker(); + if (task_parent_tracker) { + reset_label = fmt::format("{}:{}", label, split(task_parent_tracker->label(), ":")[1]); + } else { + reset_label = label; + } std::shared_ptr tracker( - new MemTracker(byte_limit, "[Virtual]-" + label, reset_parent, level, nullptr)); + new MemTracker(byte_limit, reset_label, reset_parent, + level > reset_parent->_level ? level : reset_parent->_level, profile)); reset_parent->add_child_tracker(tracker); - tracker->init_virtual(); return tracker; } @@ -318,7 +326,8 @@ bool MemTracker::gc_memory(int64_t max_consumption) { if (pre_gc_consumption < max_consumption) return false; int64_t curr_consumption = pre_gc_consumption; - const int64_t EXTRA_BYTES_TO_FREE = 4L * 1024L * 1024L * 1024L; // TODO(zxy) Consider as config + // Free some extra memory to avoid frequent GC, 4M is an empirical value, maybe it will be tested later. + const int64_t EXTRA_BYTES_TO_FREE = 4L * 1024L * 1024L * 1024L; // Try to free up some memory for (int i = 0; i < _gc_functions.size(); ++i) { // Try to free up the amount we are over plus some extra so that we don't have to diff --git a/be/src/runtime/mem_tracker.h b/be/src/runtime/mem_tracker.h index 74a7b4bef6bb12..3d4eb740d23ab8 100644 --- a/be/src/runtime/mem_tracker.h +++ b/be/src/runtime/mem_tracker.h @@ -20,6 +20,8 @@ #pragma once +#include + #include #include #include @@ -40,6 +42,11 @@ enum class MemTrackerLevel { OVERVIEW = 0, TASK, INSTANCE, VERBOSE }; class MemTracker; class RuntimeState; +using TrackersMap = phmap::parallel_flat_hash_map< + std::string, std::shared_ptr, phmap::priv::hash_default_hash, + phmap::priv::hash_default_eq, + std::allocator>>, 12, std::mutex>; + /// A MemTracker tracks memory consumption; it contains an optional limit /// and can be arranged into a tree structure such that the consumption tracked /// by a MemTracker is also tracked by its ancestors. @@ -80,6 +87,9 @@ class MemTracker { // Cosume/release will not sync to parent.Usually used to manually record the specified memory, // It is independent of the recording of TCMalloc Hook in the thread local tracker, so the same // block of memory is recorded independently in these two trackers. + // TODO(zxy) At present, the purpose of most virtual trackers is only to preserve the logic of + // manually recording memory before, which may be used later. After each virtual tracker is + // required case by case, discuss its necessity. static std::shared_ptr create_virtual_tracker( int64_t byte_limit = -1, const std::string& label = std::string(), const std::shared_ptr& parent = std::shared_ptr(), @@ -97,8 +107,9 @@ class MemTracker { // Gets a shared_ptr to the "process" tracker, creating it if necessary. static std::shared_ptr get_process_tracker(); static MemTracker* get_raw_process_tracker(); - // Gets a shared_ptr to the "brpc server" tracker, creating it if necessary. - static std::shared_ptr get_brpc_server_tracker(); + // Get a temporary tracker with a specified label, and the tracker will be created when the label is first get. + // Temporary trackers are not automatically destructed, which is usually used for debugging. + static std::shared_ptr get_temporary_mem_tracker(const std::string& label); Status check_sys_mem_info(int64_t bytes) { if (MemInfo::initialized() && MemInfo::current_mem() + bytes >= MemInfo::mem_limit()) { @@ -423,6 +434,10 @@ class MemTracker { static const std::string COUNTER_NAME; private: + static std::shared_ptr create_tracker_impl( + int64_t byte_limit, const std::string& label, const std::shared_ptr& parent, + MemTrackerLevel level, RuntimeProfile* profile); + /// 'byte_limit' < 0 means no limit /// 'label' is the label used in the usage string (log_usage()) MemTracker(int64_t byte_limit, const std::string& label, @@ -466,8 +481,6 @@ class MemTracker { // Creates the process tracker. static void create_process_tracker(); - // Creates the brpc server tracker. - static void create_brpc_server_tracker(); // Limit on memory consumption, in bytes. If limit_ == -1, there is no consumption limit. int64_t _limit; @@ -486,8 +499,6 @@ class MemTracker { // Consume size smaller than mem_tracker_consume_min_size_bytes will continue to accumulate // to avoid frequent calls to consume/release of MemTracker. - // TODO(zxy) It may be more performant to use thread_local static, which is inherently thread-safe. - // Test after introducing TCMalloc hook std::atomic _untracked_mem = 0; std::vector _all_trackers; // this tracker plus all of its ancestors diff --git a/be/src/runtime/mem_tracker_task_pool.cpp b/be/src/runtime/mem_tracker_task_pool.cpp index 2d43b927e80329..132b47f57356ab 100644 --- a/be/src/runtime/mem_tracker_task_pool.cpp +++ b/be/src/runtime/mem_tracker_task_pool.cpp @@ -33,15 +33,15 @@ std::shared_ptr MemTrackerTaskPool::register_task_mem_tracker_impl( _task_mem_trackers.try_emplace_l( task_id, [](std::shared_ptr) {}, MemTracker::create_tracker(mem_limit, label, parent, MemTrackerLevel::TASK)); - std::shared_ptr tracker = get_task_mem_tracker(task_id); - return tracker; + return get_task_mem_tracker(task_id); } std::shared_ptr MemTrackerTaskPool::register_query_mem_tracker( const std::string& query_id, int64_t mem_limit) { VLOG_FILE << "Register Query memory tracker, query id: " << query_id << " limit: " << PrettyPrinter::print(mem_limit, TUnit::BYTES); - return register_task_mem_tracker_impl(query_id, mem_limit, fmt::format("queryId={}", query_id), + return register_task_mem_tracker_impl(query_id, mem_limit, + fmt::format("Query:queryId={}", query_id), ExecEnv::GetInstance()->query_pool_mem_tracker()); } @@ -49,7 +49,8 @@ std::shared_ptr MemTrackerTaskPool::register_load_mem_tracker( const std::string& load_id, int64_t mem_limit) { VLOG_FILE << "Register Load memory tracker, load id: " << load_id << " limit: " << PrettyPrinter::print(mem_limit, TUnit::BYTES); - return register_task_mem_tracker_impl(load_id, mem_limit, fmt::format("loadId={}", load_id), + return register_task_mem_tracker_impl(load_id, mem_limit, + fmt::format("Load:loadId={}", load_id), ExecEnv::GetInstance()->load_pool_mem_tracker()); } diff --git a/be/src/runtime/mem_tracker_task_pool.h b/be/src/runtime/mem_tracker_task_pool.h index 20b0eaf7be1fda..e3baec8d518c51 100644 --- a/be/src/runtime/mem_tracker_task_pool.h +++ b/be/src/runtime/mem_tracker_task_pool.h @@ -17,8 +17,6 @@ #pragma once -#include - #include "runtime/mem_tracker.h" namespace doris { @@ -50,13 +48,7 @@ class MemTrackerTaskPool { // All per-task MemTracker objects. // The life cycle of task memtracker in the process is the same as task runtime state, // MemTrackers will be removed from this map after query finish or cancel. - using TaskTrackersMap = phmap::parallel_flat_hash_map< - std::string, std::shared_ptr, phmap::priv::hash_default_hash, - phmap::priv::hash_default_eq, - std::allocator>>, 12, - std::mutex>; - - TaskTrackersMap _task_mem_trackers; + TrackersMap _task_mem_trackers; }; } // namespace doris \ No newline at end of file diff --git a/be/src/runtime/memory/chunk_allocator.cpp b/be/src/runtime/memory/chunk_allocator.cpp index 7f8259c03496d8..269dc12fcfd870 100644 --- a/be/src/runtime/memory/chunk_allocator.cpp +++ b/be/src/runtime/memory/chunk_allocator.cpp @@ -145,6 +145,7 @@ Status ChunkAllocator::allocate(size_t size, Chunk* chunk, MemTracker* tracker, } else { _mem_tracker->transfer_to(reset_tracker, size); } + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); // fast path: allocate from current core arena int core_id = CpuInfo::get_current_core(); @@ -177,9 +178,6 @@ Status ChunkAllocator::allocate(size_t size, Chunk* chunk, MemTracker* tracker, SCOPED_RAW_TIMER(&cost_ns); // allocate from system allocator chunk->data = SystemAllocator::allocate(size); - // The allocated chunk is consumed in the tls mem tracker, we want to consume in the ChunkAllocator tracker, - // transfer memory ownership. TODO(zxy) replace with switch tls tracker - tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(_mem_tracker.get(), size); } chunk_pool_system_alloc_count->increment(1); chunk_pool_system_alloc_cost_ns->increment(cost_ns); @@ -193,6 +191,14 @@ Status ChunkAllocator::allocate(size_t size, Chunk* chunk, MemTracker* tracker, } void ChunkAllocator::free(const Chunk& chunk, MemTracker* tracker) { + // The chunk's memory ownership is transferred from tls tracker to ChunkAllocator. + if (tracker) { + tracker->transfer_to(_mem_tracker.get(), chunk.size); + } else { + tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(_mem_tracker.get(), + chunk.size); + } + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); if (chunk.core_id == -1) { return; } @@ -205,12 +211,6 @@ void ChunkAllocator::free(const Chunk& chunk, MemTracker* tracker) { { SCOPED_RAW_TIMER(&cost_ns); SystemAllocator::free(chunk.data, chunk.size); - // The freed chunk is released in the tls mem tracker. When the chunk was allocated, - // it was consumed in the parameter tracker, so if the tls mem tracker and the parameter - // tracker are different, transfer memory ownership. - if (tracker) - tracker->transfer_to(tls_ctx()->_thread_mem_tracker_mgr->mem_tracker().get(), - chunk.size); } chunk_pool_system_free_count->increment(1); chunk_pool_system_free_cost_ns->increment(cost_ns); @@ -219,13 +219,6 @@ void ChunkAllocator::free(const Chunk& chunk, MemTracker* tracker) { } } while (!_reserved_bytes.compare_exchange_weak(old_reserved_bytes, new_reserved_bytes)); - // The chunk's memory ownership is transferred from MemPool to ChunkAllocator. - if (tracker) { - tracker->transfer_to(_mem_tracker.get(), chunk.size); - } else { - tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(_mem_tracker.get(), - chunk.size); - } _arenas[chunk.core_id]->push_free_chunk(chunk.data, chunk.size); } diff --git a/be/src/runtime/result_file_sink.cpp b/be/src/runtime/result_file_sink.cpp index ee7fa490db884b..1f29c3057422b1 100644 --- a/be/src/runtime/result_file_sink.cpp +++ b/be/src/runtime/result_file_sink.cpp @@ -114,10 +114,6 @@ Status ResultFileSink::prepare(RuntimeState* state) { _local_bytes_send_counter = ADD_COUNTER(profile(), "LocalBytesSent", TUnit::BYTES); _uncompressed_bytes_counter = ADD_COUNTER(profile(), "UncompressedRowBatchSize", TUnit::BYTES); - // TODO(zxy) used after - _mem_tracker = MemTracker::create_tracker( - -1, "ResultFileSink:" + print_id(state->fragment_instance_id()), - state->instance_mem_tracker(), MemTrackerLevel::VERBOSE, _profile); // create writer _output_batch = new RowBatch(_output_row_descriptor, 1024); _writer.reset(new (std::nothrow) FileResultWriter( diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index 07dbca425e046f..2ede9538358170 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -53,7 +53,7 @@ TabletsChannel::~TabletsChannel() { } Status TabletsChannel::open(const PTabletWriterOpenRequest& request) { - SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); + SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); std::lock_guard l(_lock); if (_state == kOpened) { // Normal case, already open by other sender @@ -138,7 +138,6 @@ Status TabletsChannel::close(int sender_id, int64_t backend_id, bool* finished, } Status TabletsChannel::reduce_mem_usage(int64_t mem_limit) { - SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); std::lock_guard l(_lock); if (_state == kFinished) { // TabletsChannel is closed without LoadChannel's lock, @@ -239,7 +238,7 @@ Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request } Status TabletsChannel::cancel() { - SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); + SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); std::lock_guard l(_lock); if (_state == kFinished) { return _close_status; diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h index eb39956cba68a1..5934c401083252 100644 --- a/be/src/runtime/tablets_channel.h +++ b/be/src/runtime/tablets_channel.h @@ -168,7 +168,7 @@ Status TabletsChannel::_get_current_seq(int64_t& cur_seq, const Request& request template Status TabletsChannel::add_batch(const TabletWriterAddRequest& request, TabletWriterAddResult* response) { - SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); + SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); int64_t cur_seq = 0; auto status = _get_current_seq(cur_seq, request); diff --git a/be/src/runtime/tcmalloc_hook.h b/be/src/runtime/tcmalloc_hook.h index 548b8862990b8f..600668aea90671 100644 --- a/be/src/runtime/tcmalloc_hook.h +++ b/be/src/runtime/tcmalloc_hook.h @@ -48,7 +48,8 @@ void init_hook() { MallocHook::AddDeleteHook(&delete_hook); } -void destroy_hook() { - MallocHook::RemoveNewHook(&new_hook); - MallocHook::RemoveDeleteHook(&delete_hook); -} +// For later debug. +// static void destroy_hook() { +// MallocHook::RemoveNewHook(&new_hook); +// MallocHook::RemoveDeleteHook(&delete_hook); +// } diff --git a/be/src/runtime/thread_context.cpp b/be/src/runtime/thread_context.cpp index d1f206dedc8740..fade4fd51e793e 100644 --- a/be/src/runtime/thread_context.cpp +++ b/be/src/runtime/thread_context.cpp @@ -92,7 +92,8 @@ SwitchThreadMemTracker::SwitchThreadMemTracker( DCHECK(mem_tracker); // The thread tracker must be switched after the attach task, otherwise switching // in the main thread will cause the cached tracker not be cleaned up in time. - DCHECK(in_task == false || tls_ctx()->_thread_mem_tracker_mgr->is_attach_task()); + DCHECK(in_task == false || tls_ctx()->type() != ThreadContext::TaskType::UNKNOWN) + << ",tls ctx type=" << tls_ctx()->type(); if (Existed) { _old_tracker_id = tls_ctx()->_thread_mem_tracker_mgr->update_tracker(mem_tracker); } else { @@ -128,7 +129,9 @@ SwitchThreadMemTrackerErrCallBack::SwitchThreadMemTrackerErrCallBack( SwitchThreadMemTrackerErrCallBack::~SwitchThreadMemTrackerErrCallBack() { tls_ctx()->_thread_mem_tracker_mgr->update_consume_err_cb(_old_tracker_cb); +#ifndef NDEBUG DorisMetrics::instance()->switch_thread_mem_tracker_err_cb_count->increment(1); +#endif } SwitchBthread::SwitchBthread() { @@ -137,17 +140,21 @@ SwitchBthread::SwitchBthread() { if (tls == nullptr) { // Create thread-local data on demand. tls = new ThreadContext; - tls->_thread_mem_tracker_mgr->init_bthread(); // set the data so that next time bthread_getspecific in the thread returns the data. CHECK_EQ(0, bthread_setspecific(btls_key, tls)); } else { - tls->_thread_mem_tracker_mgr->init_bthread(); + DCHECK(tls->type() == ThreadContext::TaskType::UNKNOWN); + tls->_thread_mem_tracker_mgr->clear_untracked_mems(); } + tls->_thread_mem_tracker_mgr->init(); + tls->set_type(ThreadContext::TaskType::BRPC); } SwitchBthread::~SwitchBthread() { DCHECK(tls != nullptr); tls->_thread_mem_tracker_mgr->clear_untracked_mems(); + tls->_thread_mem_tracker_mgr->init(); + tls->set_type(ThreadContext::TaskType::UNKNOWN); #ifndef NDEBUG DorisMetrics::instance()->switch_bthread_count->increment(1); #endif diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index 8ab72be634df0d..8dc8f5267e757b 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -45,12 +45,22 @@ // The query thread will automatically clear_untracked_mems when detach_task. #define SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(mem_tracker) \ auto VARNAME_LINENUM(switch_tracker) = doris::SwitchThreadMemTracker(mem_tracker, false) +// `detach task/~switch bthread` will clear cached trackers and unconsumed tracks. +// Used after `attach task/switch bthread` to avoid cached trackers not being destroyed in time. #define SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker) \ auto VARNAME_LINENUM(switch_tracker) = doris::SwitchThreadMemTracker(mem_tracker, true); #define SCOPED_SWITCH_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker) \ auto VARNAME_LINENUM(switch_tracker) = doris::SwitchThreadMemTracker(mem_tracker, false) #define SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker) \ auto VARNAME_LINENUM(switch_tracker) = doris::SwitchThreadMemTracker(mem_tracker, true) +// Count the memory in the scope to a temporary tracker with the specified label name. +// This is very useful when debugging. You can find the position where the tracker statistics are +// inaccurate through the temporary tracker layer by layer. As well as finding memory hotspots. +// TODO(zxy) track specifies the memory for each line in the code segment, instead of manually adding +// a switch temporary tracker to each line. Maybe there are open source tools to do this? +#define SCOPED_SWITCH_TEMPORARY_THREAD_LOCAL_MEM_TRACKER(label) \ + auto VARNAME_LINENUM(switch_tracker) = doris::SwitchThreadMemTracker( \ + MemTracker::get_temporary_mem_tracker(label), false) // After the non-query thread switches the mem tracker, if the thread will not switch the mem // tracker again in the short term, can consider manually clear_untracked_mems. // The query thread will automatically clear_untracked_mems when detach_task. @@ -95,7 +105,8 @@ class ThreadContext { QUERY = 1, LOAD = 2, COMPACTION = 3, - STORAGE = 4 + STORAGE = 4, + BRPC = 5 // to be added ... }; inline static const std::string TaskTypeStr[] = {"UNKNOWN", "QUERY", "LOAD", "COMPACTION", @@ -112,9 +123,10 @@ class ThreadContext { void attach(const TaskType& type, const std::string& task_id, const TUniqueId& fragment_instance_id, const std::shared_ptr& mem_tracker) { - DCHECK(_type == TaskType::UNKNOWN && _task_id == "") - << ",old tracker label: " << mem_tracker->label() - << ",new tracker label: " << _thread_mem_tracker_mgr->mem_tracker()->label(); + DCHECK((_type == TaskType::UNKNOWN || _type == TaskType::BRPC) && _task_id == "") + << ",new tracker label: " << mem_tracker->label() + << ",old tracker label: " << _thread_mem_tracker_mgr->mem_tracker()->label(); + DCHECK(type != TaskType::UNKNOWN); _type = type; _task_id = task_id; _fragment_instance_id = fragment_instance_id; @@ -129,6 +141,8 @@ class ThreadContext { _thread_mem_tracker_mgr->detach_task(); } + const TaskType& type() const { return _type; } + const void set_type(const TaskType& type) { _type = type; } const std::string& task_id() const { return _task_id; } const std::string& thread_id_str() const { return _thread_id; } const TUniqueId& fragment_instance_id() const { return _fragment_instance_id; } diff --git a/be/src/runtime/thread_mem_tracker_mgr.cpp b/be/src/runtime/thread_mem_tracker_mgr.cpp index e55a4620f00e31..e9768bf86bd697 100644 --- a/be/src/runtime/thread_mem_tracker_mgr.cpp +++ b/be/src/runtime/thread_mem_tracker_mgr.cpp @@ -28,6 +28,8 @@ void ThreadMemTrackerMgr::attach_task(const std::string& cancel_msg, const std:: const TUniqueId& fragment_instance_id, const std::shared_ptr& mem_tracker) { DCHECK(switch_count == 0) << print_debug_string(); + clear_untracked_mems(); + init(); _task_id = task_id; _fragment_instance_id = fragment_instance_id; _consume_err_cb.cancel_msg = cancel_msg; @@ -37,10 +39,10 @@ void ThreadMemTrackerMgr::attach_task(const std::string& cancel_msg, const std:: return; } #endif - _temp_task_mem_tracker = + std::shared_ptr tracker = ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->get_task_mem_tracker( task_id); - update_tracker(_temp_task_mem_tracker); + update_tracker(tracker); } else { update_tracker(mem_tracker); } @@ -48,9 +50,7 @@ void ThreadMemTrackerMgr::attach_task(const std::string& cancel_msg, const std:: void ThreadMemTrackerMgr::detach_task() { DCHECK(switch_count == 0) << print_debug_string(); - _task_id = ""; _fragment_instance_id = TUniqueId(); - _consume_err_cb.init(); clear_untracked_mems(); init(); } diff --git a/be/src/runtime/thread_mem_tracker_mgr.h b/be/src/runtime/thread_mem_tracker_mgr.h index 404837a73a0aa8..84033ff4e7ab94 100644 --- a/be/src/runtime/thread_mem_tracker_mgr.h +++ b/be/src/runtime/thread_mem_tracker_mgr.h @@ -70,8 +70,6 @@ class ThreadMemTrackerMgr { // to avoid memory tracking loss. void init(); - void init_bthread(); - void clear_untracked_mems(); // After attach, the current thread TCMalloc Hook starts to consume/release task mem_tracker @@ -160,14 +158,16 @@ class ThreadMemTrackerMgr { // NOTE: flat_hash_map, int replaces string as key, all to improve the speed of map find, // the expected speed is increased by more than 10 times. phmap::flat_hash_map> _mem_trackers; - int64_t _tracker_id; phmap::flat_hash_map _untracked_mems; + // After the tracker is added to _mem_trackers, if tracker = null is found when using it, + // we can confirm the tracker label that was added through _mem_tracker_labels. + // Because for performance, all map keys are tracker id. phmap::flat_hash_map _mem_tracker_labels; - // Avoid memory allocation in functions and fall into an infinite loop + int64_t _tracker_id; + // Avoid memory allocation in functions. int64_t _temp_tracker_id; ConsumeErrCallBackInfo _temp_consume_err_cb; - std::shared_ptr _temp_task_mem_tracker; std::string _task_id; TUniqueId _fragment_instance_id; @@ -175,6 +175,8 @@ class ThreadMemTrackerMgr { }; inline void ThreadMemTrackerMgr::init() { + _task_id = ""; + _consume_err_cb.init(); _tracker_id = 0; _mem_trackers.clear(); _mem_trackers[0] = MemTracker::get_process_tracker(); @@ -184,14 +186,6 @@ inline void ThreadMemTrackerMgr::init() { _mem_tracker_labels[0] = MemTracker::get_process_tracker()->label(); } -inline void ThreadMemTrackerMgr::init_bthread() { - init(); - _mem_trackers[1] = MemTracker::get_brpc_server_tracker(); - _untracked_mems[1] = 0; - _mem_tracker_labels[1] = MemTracker::get_brpc_server_tracker()->label(); - _tracker_id = 1; -} - inline void ThreadMemTrackerMgr::clear_untracked_mems() { for (const auto& untracked_mem : _untracked_mems) { if (untracked_mem.second != 0) { @@ -279,7 +273,7 @@ inline void ThreadMemTrackerMgr::add_tracker(const std::shared_ptr& _mem_trackers[mem_tracker->id()] = mem_tracker; DCHECK(_mem_trackers[mem_tracker->id()]) << print_debug_string(); _untracked_mems[mem_tracker->id()] = 0; - _mem_tracker_labels[_temp_tracker_id] = mem_tracker->label(); + _mem_tracker_labels[mem_tracker->id()] = mem_tracker->label(); } inline std::shared_ptr ThreadMemTrackerMgr::mem_tracker() { diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp index 1bcdfdb9d34983..259276809cfbbd 100644 --- a/be/src/service/doris_main.cpp +++ b/be/src/service/doris_main.cpp @@ -484,6 +484,7 @@ int main(int argc, char** argv) { doris::MemInfo::refresh_current_mem(); #endif // TODO(zxy) 10s is too long to clear the expired task mem tracker. + // A query mem tracker is about 57 bytes, assuming 10000 qps, which wastes about 55M of memory. // It should be actively triggered at the end of query/load. doris::ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->logout_task_mem_tracker(); sleep(10); diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index d9e575621034d2..bfdd1f31ba3eb3 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -132,6 +132,8 @@ void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcControll int64_t execution_time_ns = 0; { SCOPED_RAW_TIMER(&execution_time_ns); + SCOPED_ATTACH_TASK_THREAD(ThreadContext::TaskType::LOAD, + _exec_env->load_channel_mgr()->mem_tracker()); brpc::Controller* cntl = static_cast(cntl_base); attachment_transfer_request_block(request, cntl); auto st = _exec_env->load_channel_mgr()->add_batch(*request, response); @@ -152,7 +154,6 @@ void PInternalServiceImpl::tablet_writer_add_batch(google::protobuf::RpcControll const PTabletWriterAddBatchRequest* request, PTabletWriterAddBatchResult* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD(); VLOG_RPC << "tablet writer add batch, id=" << request->id() << ", index_id=" << request->index_id() << ", sender_id=" << request->sender_id() << ", current_queued_size=" << _tablet_worker_pool.get_queue_size(); @@ -166,6 +167,8 @@ void PInternalServiceImpl::tablet_writer_add_batch(google::protobuf::RpcControll int64_t execution_time_ns = 0; { SCOPED_RAW_TIMER(&execution_time_ns); + SCOPED_ATTACH_TASK_THREAD(ThreadContext::TaskType::LOAD, + _exec_env->load_channel_mgr()->mem_tracker()); brpc::Controller* cntl = static_cast(cntl_base); attachment_transfer_request_row_batch(request, cntl); auto st = _exec_env->load_channel_mgr()->add_batch(*request, response);