Skip to content

Commit

Permalink
[feature-wip] (memory tracker) (step6, End) Fix some details (apache#…
Browse files Browse the repository at this point in the history
…9301)

1. Fix the accuracy of memory tracking for LoadTask, ChunkAllocator, TabletMeta, and Brpc.
2. Modified some MemTracker names, deleted some unnecessary trackers, and improved readability.
3. More powerful MemTracker debugging capabilities.
4. Avoid creating TabletColumn temporary objects and improve BE startup time by 8%.
5. Fix some other details.
  • Loading branch information
xinyiZzz authored and jianghaochen committed May 13, 2022
1 parent 486d069 commit 3b3e65c
Show file tree
Hide file tree
Showing 35 changed files with 171 additions and 124 deletions.
12 changes: 8 additions & 4 deletions be/src/exec/olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,14 @@ OlapScanner::OlapScanner(RuntimeState* runtime_state, OlapScanNode* parent, bool
_is_open(false),
_aggregation(aggregation),
_need_agg_finalize(need_agg_finalize),
_version(-1),
_mem_tracker(MemTracker::create_tracker(
tracker->limit(), tracker->label() + ":OlapScanner:" + tls_ctx()->thread_id_str(),
tracker)) {}
_version(-1) {
#ifndef NDEBUG
_mem_tracker = MemTracker::create_tracker(tracker->limit(),
"OlapScanner:" + tls_ctx()->thread_id_str(), tracker);
#else
_mem_tracker = tracker;
#endif
}

Status OlapScanner::prepare(
const TPaloScanRange& scan_range, const std::vector<OlapScanRange*>& key_ranges,
Expand Down
5 changes: 3 additions & 2 deletions be/src/exec/tablet_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ NodeChannel::NodeChannel(OlapTableSink* parent, IndexChannel* index_channel, int
if (_parent->_transfer_data_by_brpc_attachment) {
_tuple_data_buffer_ptr = &_tuple_data_buffer;
}
_node_channel_tracker =
MemTracker::create_tracker(-1, "NodeChannel" + tls_ctx()->thread_id_str());
_node_channel_tracker = MemTracker::create_tracker(
-1, fmt::format("NodeChannel:indexID={}:threadId={}",
std::to_string(_index_channel->_index_id), tls_ctx()->thread_id_str()));
}

NodeChannel::~NodeChannel() noexcept {
Expand Down
3 changes: 2 additions & 1 deletion be/src/exec/tablet_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,8 @@ class IndexChannel {
public:
IndexChannel(OlapTableSink* parent, int64_t index_id, bool is_vec)
: _parent(parent), _index_id(index_id), _is_vectorized(is_vec) {
_index_channel_tracker = MemTracker::create_tracker(-1, "IndexChannel");
_index_channel_tracker =
MemTracker::create_tracker(-1, "IndexChannel:indexID=" + std::to_string(_index_id));
}
~IndexChannel() = default;

Expand Down
2 changes: 1 addition & 1 deletion be/src/http/action/compaction_action.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class CompactionAction : public HttpHandler {
CompactionAction(CompactionActionType type) : _type(type) {
_compaction_mem_tracker =
type == RUN_COMPACTION ? MemTracker::create_tracker(-1, "ManualCompaction", nullptr,
MemTrackerLevel::TASK)
MemTrackerLevel::VERBOSE)
: nullptr;
}

Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,11 @@ TabletReader::~TabletReader() {
}

Status TabletReader::init(const ReaderParams& read_params) {
#ifndef NDEBUG
_predicate_mem_pool.reset(new MemPool("TabletReader:" + read_params.tablet->full_name()));
#else
_predicate_mem_pool.reset(new MemPool());
#endif

Status res = _init_params(read_params);
if (!res.ok()) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/row_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ using std::vector;
namespace doris {

RowBlock::RowBlock(const TabletSchema* schema) : _capacity(0), _schema(schema) {
_mem_pool.reset(new MemPool("RowBlock"));
_mem_pool.reset(new MemPool());
}

RowBlock::~RowBlock() {
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/row_block2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ RowBlockV2::RowBlockV2(const Schema& schema, uint16_t capacity)
: _schema(schema),
_capacity(capacity),
_column_vector_batches(_schema.num_columns()),
_pool(new MemPool("RowBlockV2")),
_pool(new MemPool()),
_selection_vector(nullptr) {
for (auto cid : _schema.column_ids()) {
Status status = ColumnVectorBatch::create(
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/binary_dict_page.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ BinaryDictPageBuilder::BinaryDictPageBuilder(const PageBuilderOptions& options)
_data_page_builder(nullptr),
_dict_builder(nullptr),
_encoding_type(DICT_ENCODING),
_pool("BinaryDictPageBuilder") {
_pool() {
// initially use DICT_ENCODING
// TODO: the data page builder type can be created by Factory according to user config
_data_page_builder.reset(new BitshufflePageBuilder<OLAP_FIELD_TYPE_INT>(options));
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/rowset/segment_v2/zone_map_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ namespace doris {

namespace segment_v2 {

ZoneMapIndexWriter::ZoneMapIndexWriter(Field* field) : _field(field), _pool("ZoneMapIndexWriter") {
ZoneMapIndexWriter::ZoneMapIndexWriter(Field* field) : _field(field), _pool() {
_page_zone_map.min_value = _field->allocate_zone_map_value(&_pool);
_page_zone_map.max_value = _field->allocate_zone_map_value(&_pool);
_reset_zone_map(&_page_zone_map);
Expand Down Expand Up @@ -127,7 +127,7 @@ Status ZoneMapIndexReader::load(bool use_page_cache, bool kept_in_memory) {
RETURN_IF_ERROR(reader.load(use_page_cache, kept_in_memory));
IndexedColumnIterator iter(&reader);

MemPool pool("ZoneMapIndexReader ColumnBlock");
MemPool pool;
_page_zone_maps.resize(reader.num_values());

// read and cache all page zone maps
Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/tablet_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -900,6 +900,7 @@ Status TabletManager::build_all_report_tablets_info(std::map<TTabletId, TTablet>
}

Status TabletManager::start_trash_sweep() {
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
{
std::vector<TabletSharedPtr>
all_tablets; // we use this vector to save all tablet ptr for saving lock time.
Expand Down Expand Up @@ -1021,6 +1022,7 @@ void TabletManager::unregister_clone_tablet(int64_t tablet_id) {
void TabletManager::try_delete_unused_tablet_path(DataDir* data_dir, TTabletId tablet_id,
SchemaHash schema_hash,
const string& schema_hash_path) {
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
// acquire the read lock, so that there is no creating tablet or load tablet from meta tasks
// create tablet and load tablet task should check whether the dir exists
tablets_shard& shard = _get_tablets_shard(tablet_id);
Expand Down Expand Up @@ -1132,6 +1134,7 @@ void TabletManager::get_partition_related_tablets(int64_t partition_id,
}

void TabletManager::do_tablet_meta_checkpoint(DataDir* data_dir) {
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
std::vector<TabletSharedPtr> related_tablets;
{
for (auto& tablets_shard : _tablets_shards) {
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/tablet_schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -386,9 +386,13 @@ void TabletColumn::to_schema_pb(ColumnPB* column) {

uint32_t TabletColumn::mem_size() const {
auto size = sizeof(TabletColumn);
size += _col_name.size();
if (_has_default_value) {
size += _default_value.size();
}
if (_has_referenced_column) {
size += _referenced_column.size();
}
for (auto& sub_column : _sub_columns) {
size += sub_column.mem_size();
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/task/engine_alter_tablet_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ EngineAlterTabletTask::EngineAlterTabletTask(const TAlterTabletReqV2& request)
: _alter_tablet_req(request) {
_mem_tracker = MemTracker::create_tracker(
config::memory_limitation_per_thread_for_schema_change_bytes,
fmt::format("EngineAlterTabletTask: {}-{}",
fmt::format("EngineAlterTabletTask:baseTabletId={}:newTabletId={}",
std::to_string(_alter_tablet_req.base_tablet_id),
std::to_string(_alter_tablet_req.new_tablet_id)),
StorageEngine::instance()->schema_change_mem_tracker(), MemTrackerLevel::TASK);
Expand Down
4 changes: 3 additions & 1 deletion be/src/olap/task/engine_batch_load_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ EngineBatchLoadTask::EngineBatchLoadTask(TPushReq& push_req, std::vector<TTablet
_res_status(res_status) {
_download_status = Status::OK();
_mem_tracker = MemTracker::create_tracker(
-1, fmt::format("{}: {}", _push_req.push_type, std::to_string(_push_req.tablet_id)),
-1,
fmt::format("EngineBatchLoadTask:pushType={}:tabletId={}", _push_req.push_type,
std::to_string(_push_req.tablet_id)),
StorageEngine::instance()->batch_load_mem_tracker(), MemTrackerLevel::TASK);
}

Expand Down
6 changes: 3 additions & 3 deletions be/src/olap/task/engine_checksum_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ namespace doris {
EngineChecksumTask::EngineChecksumTask(TTabletId tablet_id, TSchemaHash schema_hash,
TVersion version, uint32_t* checksum)
: _tablet_id(tablet_id), _schema_hash(schema_hash), _version(version), _checksum(checksum) {
_mem_tracker = MemTracker::create_tracker(-1, "compute checksum: " + std::to_string(tablet_id),
StorageEngine::instance()->consistency_mem_tracker(),
MemTrackerLevel::TASK);
_mem_tracker = MemTracker::create_tracker(
-1, "EngineChecksumTask:tabletId=" + std::to_string(tablet_id),
StorageEngine::instance()->consistency_mem_tracker(), MemTrackerLevel::TASK);
}

Status EngineChecksumTask::execute() {
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/task/engine_clone_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ EngineCloneTask::EngineCloneTask(const TCloneReq& clone_req, const TMasterInfo&
_signature(signature),
_master_info(master_info) {
_mem_tracker = MemTracker::create_tracker(
-1, "clone tablet: " + std::to_string(_clone_req.tablet_id),
-1, "EngineCloneTask:tabletId=" + std::to_string(_clone_req.tablet_id),
StorageEngine::instance()->clone_mem_tracker(), MemTrackerLevel::TASK);
}

Expand Down
12 changes: 8 additions & 4 deletions be/src/runtime/load_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "runtime/load_channel.h"

#include "olap/lru_cache.h"
#include "runtime/exec_env.h"
#include "runtime/mem_tracker.h"
#include "runtime/tablets_channel.h"
#include "runtime/thread_context.h"
Expand All @@ -31,8 +32,11 @@ LoadChannel::LoadChannel(const UniqueId& load_id, int64_t mem_limit, int64_t tim
_is_high_priority(is_high_priority),
_sender_ip(sender_ip),
_is_vec(is_vec) {
_mem_tracker = MemTracker::create_tracker(mem_limit, "LoadChannel:" + _load_id.to_string(),
nullptr, MemTrackerLevel::TASK);
_mem_tracker = MemTracker::create_tracker(
mem_limit, "LoadChannel:tabletId=" + _load_id.to_string(),
ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->get_task_mem_tracker(
_load_id.to_string()),
MemTrackerLevel::TASK);
// _last_updated_time should be set before being inserted to
// _load_channels in load_channel_mgr, or it may be erased
// immediately by gc thread.
Expand All @@ -47,7 +51,7 @@ LoadChannel::~LoadChannel() {
}

Status LoadChannel::open(const PTabletWriterOpenRequest& params) {
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
int64_t index_id = params.index_id();
std::shared_ptr<TabletsChannel> channel;
{
Expand Down Expand Up @@ -133,7 +137,7 @@ bool LoadChannel::is_finished() {
}

Status LoadChannel::cancel() {
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
std::lock_guard<std::mutex> l(_lock);
for (auto& it : _tablets_channels) {
it.second->cancel();
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/load_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ class LoadChannel {
template <typename TabletWriterAddRequest, typename TabletWriterAddResult>
Status LoadChannel::add_batch(const TabletWriterAddRequest& request,
TabletWriterAddResult* response) {
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
int64_t index_id = request.index_id();
// 1. get tablets channel
std::shared_ptr<TabletsChannel> channel;
Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/load_channel_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ LoadChannel* LoadChannelMgr::_create_load_channel(const UniqueId& load_id, int64
}

Status LoadChannelMgr::open(const PTabletWriterOpenRequest& params) {
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
UniqueId load_id(params.id());
std::shared_ptr<LoadChannel> channel;
{
Expand Down Expand Up @@ -179,7 +179,7 @@ void LoadChannelMgr::_handle_mem_exceed_limit() {
}

Status LoadChannelMgr::cancel(const PTabletWriterCancelRequest& params) {
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
UniqueId load_id(params.id());
std::shared_ptr<LoadChannel> cancelled_channel;
{
Expand Down
3 changes: 2 additions & 1 deletion be/src/runtime/load_channel_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ class LoadChannelMgr {
// cancel all tablet stream for 'load_id' load
Status cancel(const PTabletWriterCancelRequest& request);

std::shared_ptr<MemTracker> mem_tracker() { return _mem_tracker; }

private:
static LoadChannel* _create_load_channel(const UniqueId& load_id, int64_t mem_limit,
int64_t timeout_s, bool is_high_priority,
Expand Down Expand Up @@ -116,7 +118,6 @@ Status LoadChannelMgr::_get_load_channel(std::shared_ptr<LoadChannel>& channel,
template <typename TabletWriterAddRequest, typename TabletWriterAddResult>
Status LoadChannelMgr::add_batch(const TabletWriterAddRequest& request,
TabletWriterAddResult* response) {
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
UniqueId load_id(request.id());
// 1. get load channel
std::shared_ptr<LoadChannel> channel;
Expand Down
4 changes: 3 additions & 1 deletion be/src/runtime/mem_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,9 @@ void MemPool::acquire_data(MemPool* src, bool keep_current) {

void MemPool::exchange_data(MemPool* other) {
int64_t delta_size = other->total_reserved_bytes_ - total_reserved_bytes_;
other->_mem_tracker->transfer_to(_mem_tracker, delta_size);
if (other->_mem_tracker != _mem_tracker) {
other->_mem_tracker->transfer_to(_mem_tracker, delta_size);
}

std::swap(current_chunk_idx_, other->current_chunk_idx_);
std::swap(next_chunk_size_, other->next_chunk_size_);
Expand Down
55 changes: 32 additions & 23 deletions be/src/runtime/mem_tracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,16 @@ MemTracker* MemTracker::get_raw_process_tracker() {
return raw_process_tracker;
}

// Track memory for all brpc server responses.
static std::shared_ptr<MemTracker> brpc_server_tracker;
static GoogleOnceType brpc_server_tracker_once = GOOGLE_ONCE_INIT;

void MemTracker::create_brpc_server_tracker() {
brpc_server_tracker = MemTracker::create_tracker(-1, "Brpc", get_process_tracker(),
MemTrackerLevel::OVERVIEW);
}

std::shared_ptr<MemTracker> MemTracker::get_brpc_server_tracker() {
GoogleOnceInit(&brpc_server_tracker_once, &MemTracker::create_brpc_server_tracker);
return brpc_server_tracker;
static TrackersMap _temporary_mem_trackers;

std::shared_ptr<MemTracker> MemTracker::get_temporary_mem_tracker(const std::string& label) {
// The first time a label is registered, create a new tracker for it; if the label
// already exists, do nothing. This avoids taking a lock to resolve conflicts with erase.
_temporary_mem_trackers.try_emplace_l(
label, [](std::shared_ptr<MemTracker>) {},
MemTracker::create_tracker(-1, fmt::format("[Temporary]-{}", label), nullptr,
MemTrackerLevel::OVERVIEW));
return _temporary_mem_trackers[label];
}

void MemTracker::list_process_trackers(std::vector<std::shared_ptr<MemTracker>>* trackers) {
Expand Down Expand Up @@ -102,29 +100,39 @@ std::shared_ptr<MemTracker> MemTracker::create_tracker(int64_t byte_limit, const
const std::shared_ptr<MemTracker>& parent,
MemTrackerLevel level,
RuntimeProfile* profile) {
std::shared_ptr<MemTracker> reset_parent =
parent ? parent : tls_ctx()->_thread_mem_tracker_mgr->mem_tracker();
DCHECK(reset_parent);

std::shared_ptr<MemTracker> tracker(
new MemTracker(byte_limit, label, reset_parent,
level > reset_parent->_level ? level : reset_parent->_level, profile));
reset_parent->add_child_tracker(tracker);
std::shared_ptr<MemTracker> tracker =
MemTracker::create_tracker_impl(byte_limit, label, parent, level, profile);
tracker->init();
return tracker;
}

std::shared_ptr<MemTracker> MemTracker::create_virtual_tracker(
int64_t byte_limit, const std::string& label, const std::shared_ptr<MemTracker>& parent,
MemTrackerLevel level) {
std::shared_ptr<MemTracker> tracker = MemTracker::create_tracker_impl(
byte_limit, "[Virtual]-" + label, parent, level, nullptr);
tracker->init_virtual();
return tracker;
}

std::shared_ptr<MemTracker> MemTracker::create_tracker_impl(
int64_t byte_limit, const std::string& label, const std::shared_ptr<MemTracker>& parent,
MemTrackerLevel level, RuntimeProfile* profile) {
std::shared_ptr<MemTracker> reset_parent =
parent ? parent : tls_ctx()->_thread_mem_tracker_mgr->mem_tracker();
DCHECK(reset_parent);
std::string reset_label;
MemTracker* task_parent_tracker = reset_parent->parent_task_mem_tracker();
if (task_parent_tracker) {
reset_label = fmt::format("{}:{}", label, split(task_parent_tracker->label(), ":")[1]);
} else {
reset_label = label;
}

std::shared_ptr<MemTracker> tracker(
new MemTracker(byte_limit, "[Virtual]-" + label, reset_parent, level, nullptr));
new MemTracker(byte_limit, reset_label, reset_parent,
level > reset_parent->_level ? level : reset_parent->_level, profile));
reset_parent->add_child_tracker(tracker);
tracker->init_virtual();
return tracker;
}

Expand Down Expand Up @@ -318,7 +326,8 @@ bool MemTracker::gc_memory(int64_t max_consumption) {
if (pre_gc_consumption < max_consumption) return false;

int64_t curr_consumption = pre_gc_consumption;
const int64_t EXTRA_BYTES_TO_FREE = 4L * 1024L * 1024L * 1024L; // TODO(zxy) Consider as config
// Free some extra memory to avoid frequent GC. Note that this constant is 4 GB (4 * 1024^3 bytes), not 4 MB; it is an empirical value that may be tuned after further testing.
const int64_t EXTRA_BYTES_TO_FREE = 4L * 1024L * 1024L * 1024L;
// Try to free up some memory
for (int i = 0; i < _gc_functions.size(); ++i) {
// Try to free up the amount we are over plus some extra so that we don't have to
Expand Down
Loading

0 comments on commit 3b3e65c

Please sign in to comment.