Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,12 @@ CONF_Int32(aws_log_level, "3");
// the buffer size when read data from remote storage like s3
CONF_mInt32(remote_storage_read_buffer_mb, "256");

// Default level of MemTracker to show in web page
// now MemTracker support two level:
// RELEASE: 0
// DEBUG: 1
// the level equal or lower than mem_tracker_level will show in web page
CONF_Int16(mem_tracker_level, "0");
} // namespace config

} // namespace doris
Expand Down
14 changes: 7 additions & 7 deletions be/src/exec/olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,11 @@ OlapScanner::OlapScanner(RuntimeState* runtime_state, OlapScanNode* parent, bool
_aggregation(aggregation),
_need_agg_finalize(need_agg_finalize),
_tuple_idx(parent->_tuple_idx),
_direct_conjunct_size(parent->_direct_conjunct_size) {
_direct_conjunct_size(parent->_direct_conjunct_size),
_mem_tracker(MemTracker::CreateTracker(runtime_state->fragment_mem_tracker()->limit(),
"OlapScanner",
runtime_state->fragment_mem_tracker(),
true, true, MemTrackerLevel::DEBUG)) {
_reader.reset(new Reader());
DCHECK(_reader.get() != NULL);

Expand Down Expand Up @@ -90,7 +94,7 @@ Status OlapScanner::prepare(const TPaloScanRange& scan_range,
// the rowsets maybe compacted when the last olap scanner starts
Version rd_version(0, _version);
OLAPStatus acquire_reader_st =
_tablet->capture_rs_readers(rd_version, &_params.rs_readers);
_tablet->capture_rs_readers(rd_version, &_params.rs_readers, _mem_tracker);
if (acquire_reader_st != OLAP_SUCCESS) {
LOG(WARNING) << "fail to init reader.res=" << acquire_reader_st;
std::stringstream ss;
Expand Down Expand Up @@ -248,11 +252,7 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) {
bzero(tuple_buf, state->batch_size() * _tuple_desc->byte_size());
Tuple* tuple = reinterpret_cast<Tuple*>(tuple_buf);

auto tracker = MemTracker::CreateTracker(state->fragment_mem_tracker()->limit(),
"OlapScanner:" + print_id(state->query_id()),
state->fragment_mem_tracker());
std::unique_ptr<MemPool> mem_pool(new MemPool(tracker.get()));

std::unique_ptr<MemPool> mem_pool(new MemPool(_mem_tracker.get()));
int64_t raw_rows_threshold = raw_rows_read() + config::doris_scanner_row_num;
{
SCOPED_TIMER(_parent->_scan_timer);
Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/olap_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ class OlapScanner {
bool _is_closed = false;

MonotonicStopWatch _watcher;

std::shared_ptr<MemTracker> _mem_tracker;
};

} // namespace doris
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/tablet_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,7 @@ Status OlapTableSink::prepare(RuntimeState* state) {
// profile must add to state's object pool
_profile = state->obj_pool()->add(new RuntimeProfile("OlapTableSink"));
_mem_tracker = MemTracker::CreateTracker(-1, "OlapTableSink:" + std::to_string(state->load_job_id()),
state->instance_mem_tracker());
state->instance_mem_tracker(), true, false);

SCOPED_TIMER(_profile->total_time_counter());

Expand Down
10 changes: 6 additions & 4 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ namespace doris {

Compaction::Compaction(TabletSharedPtr tablet, const std::string& label,
const std::shared_ptr<MemTracker>& parent_tracker)
: _mem_tracker(MemTracker::CreateTracker(-1, label, parent_tracker)),
_readers_tracker(MemTracker::CreateTracker(-1, "CompactionReaderTracker:" + std::to_string(tablet->tablet_id()), _mem_tracker)),
_writer_tracker(MemTracker::CreateTracker(-1, "CompationWriterTracker:" + std::to_string(tablet->tablet_id()), _mem_tracker)),
: _mem_tracker(MemTracker::CreateTracker(-1, label, parent_tracker, true, false)),
_readers_tracker(MemTracker::CreateTracker(-1, "CompactionReaderTracker:" + std::to_string(tablet->tablet_id()), _mem_tracker,
true, false)),
_writer_tracker(MemTracker::CreateTracker(-1, "CompationWriterTracker:" + std::to_string(tablet->tablet_id()), _mem_tracker,
true, false)),
_tablet(tablet),
_input_rowsets_size(0),
_input_row_num(0),
Expand Down Expand Up @@ -171,7 +173,7 @@ OLAPStatus Compaction::construct_input_rowset_readers() {
RETURN_NOT_OK(rowset->create_reader(
MemTracker::CreateTracker(
-1, "Compaction:RowsetReader:" + rowset->rowset_id().to_string(),
_readers_tracker),
_readers_tracker, true, true, MemTrackerLevel::DEBUG),
&rs_reader));
_input_rs_readers.push_back(std::move(rs_reader));
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/lru_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ uint32_t ShardedLRUCache::_shard(uint32_t hash) {

ShardedLRUCache::ShardedLRUCache(const std::string& name, size_t total_capacity, std::shared_ptr<MemTracker> parent)
: _name(name), _last_id(1),
_mem_tracker(MemTracker::CreateTracker(-1, name, parent, true)) {
_mem_tracker(MemTracker::CreateTracker(-1, name, parent, true, false)) {
const size_t per_shard = (total_capacity + (kNumShards - 1)) / kNumShards;
for (int s = 0; s < kNumShards; s++) {
_shards[s].set_capacity(per_shard);
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet
_slot_descs(slot_descs),
_keys_type(keys_type),
_row_comparator(_schema),
_mem_tracker(MemTracker::CreateTracker(-1, "MemTable:" + std::to_string(tablet_id), parent_tracker)),
_mem_tracker(MemTracker::CreateTracker(-1, "MemTable", parent_tracker)),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why remove tablet id?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Beacause the tracker will use the parent label id

_buffer_mem_pool(new MemPool(_mem_tracker.get())),
_table_mem_pool(new MemPool(_mem_tracker.get())),
_schema_size(_schema->schema_size()),
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/row_block2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "olap/row_block2.h"

#include <sstream>
#include <utility>

#include "gutil/strings/substitute.h"
#include "olap/row_cursor.h"
Expand All @@ -33,7 +34,7 @@ RowBlockV2::RowBlockV2(const Schema& schema, uint16_t capacity, std::shared_ptr<
: _schema(schema),
_capacity(capacity),
_column_vector_batches(_schema.num_columns()),
_tracker(MemTracker::CreateTracker(-1, "RowBlockV2", parent)),
_tracker(MemTracker::CreateTracker(-1, "RowBlockV2", std::move(parent))),
_pool(new MemPool(_tracker.get())),
_selection_vector(nullptr) {
for (auto cid : _schema.column_ids()) {
Expand Down
21 changes: 12 additions & 9 deletions be/src/olap/rowset/beta_rowset_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include "beta_rowset_reader.h"

#include <utility>

#include "olap/delete_handler.h"
#include "olap/generic_iterators.h"
#include "olap/row_block.h"
Expand All @@ -28,12 +30,17 @@
namespace doris {

BetaRowsetReader::BetaRowsetReader(BetaRowsetSharedPtr rowset,
const std::shared_ptr<MemTracker>& parent_tracker)
: _rowset(std::move(rowset)), _stats(&_owned_stats), _parent_tracker(parent_tracker) {
std::shared_ptr<MemTracker> parent_tracker)
: _rowset(std::move(rowset)), _stats(&_owned_stats), _parent_tracker(std::move(parent_tracker)) {
_rowset->aquire();
}

OLAPStatus BetaRowsetReader::init(RowsetReaderContext* read_context) {
// If do not init the RowsetReader with a parent_tracker, use the runtime_state instance_mem_tracker
if (_parent_tracker == nullptr && read_context->runtime_state != nullptr) {
_parent_tracker = read_context->runtime_state->instance_mem_tracker();
}

RETURN_NOT_OK(_rowset->load(true, _parent_tracker));
_context = read_context;
if (_context->stats != nullptr) {
Expand Down Expand Up @@ -110,13 +117,9 @@ OLAPStatus BetaRowsetReader::init(RowsetReaderContext* read_context) {
// init input block
_input_block.reset(new RowBlockV2(schema, 1024, _parent_tracker));

// init output block and row
if (_parent_tracker == nullptr && read_context->runtime_state != nullptr) {
_output_block.reset(new RowBlock(read_context->tablet_schema,
read_context->runtime_state->instance_mem_tracker()));
} else {
_output_block.reset(new RowBlock(read_context->tablet_schema, _parent_tracker));
}
// init input/output block and row
_output_block.reset(new RowBlock(read_context->tablet_schema, _parent_tracker));

RowBlockInfo output_block_info;
output_block_info.row_num = 1024;
output_block_info.null_supported = true;
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/beta_rowset_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ namespace doris {
class BetaRowsetReader : public RowsetReader {
public:
BetaRowsetReader(BetaRowsetSharedPtr rowset,
const std::shared_ptr<MemTracker>& parent_tracker = nullptr);
std::shared_ptr<MemTracker> parent_tracker = nullptr);

~BetaRowsetReader() override { _rowset->release(); }

Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ Status Segment::open(std::string filename, uint32_t segment_id, const TabletSche

Segment::Segment(std::string fname, uint32_t segment_id, const TabletSchema* tablet_schema, std::shared_ptr<MemTracker> parent)
: _fname(std::move(fname)), _segment_id(segment_id),
_tablet_schema(tablet_schema), _mem_tracker(MemTracker::CreateTracker(-1, "Segment", std::move(parent), false)) {}
_tablet_schema(tablet_schema), _mem_tracker(MemTracker::CreateTracker(-1, "Segment", std::move(parent), false, true)) {}

Segment::~Segment() {
_mem_tracker->Release(_mem_tracker->consumption());
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ Status SegmentIterator::_prepare_seek(const StorageReadOptions::KeyRange& key_ra
}
}
_seek_schema.reset(new Schema(key_fields, key_fields.size()));
_seek_block.reset(new RowBlockV2(*_seek_schema, 1));
_seek_block.reset(new RowBlockV2(*_seek_schema, 1, _mem_tracker));

// create used column iterator
for (auto cid : _seek_schema->column_ids()) {
Expand Down
9 changes: 6 additions & 3 deletions be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1338,7 +1338,7 @@ bool SchemaChangeWithSorting::_external_sorting(vector<RowsetSharedPtr>& src_row
std::vector<RowsetReaderSharedPtr> rs_readers;
for (auto& rowset : src_rowsets) {
RowsetReaderSharedPtr rs_reader;
auto res = rowset->create_reader(&rs_reader);
auto res = rowset->create_reader(_mem_tracker, &rs_reader);
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "failed to create rowset reader.";
return false;
Expand Down Expand Up @@ -1473,6 +1473,9 @@ OLAPStatus SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletRe
// for schema change, seek_columns is the same to return_columns
reader_context.seek_columns = &return_columns;

auto mem_tracker = MemTracker::CreateTracker(-1, "AlterTablet:" + std::to_string(base_tablet->tablet_id()) + "-"
+ std::to_string(new_tablet->tablet_id()), _mem_tracker, true, false, MemTrackerLevel::DEBUG);

do {
// get history data to be converted and it will check if there is hold in base tablet
res = _get_versions_to_be_changed(base_tablet, &versions_to_be_changed);
Expand Down Expand Up @@ -1535,7 +1538,7 @@ OLAPStatus SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletRe
}

// acquire data sources correspond to history versions
base_tablet->capture_rs_readers(versions_to_be_changed, &rs_readers);
base_tablet->capture_rs_readers(versions_to_be_changed, &rs_readers, mem_tracker);
if (rs_readers.size() < 1) {
LOG(WARNING) << "fail to acquire all data sources. "
<< "version_num=" << versions_to_be_changed.size()
Expand Down Expand Up @@ -1688,7 +1691,7 @@ OLAPStatus SchemaChangeHandler::schema_version_convert(TabletSharedPtr base_tabl
reader_context.seek_columns = &return_columns;

RowsetReaderSharedPtr rowset_reader;
RETURN_NOT_OK((*base_rowset)->create_reader(&rowset_reader));
RETURN_NOT_OK((*base_rowset)->create_reader(_mem_tracker, &rowset_reader));
rowset_reader->init(&reader_context);

RowsetWriterContext writer_context;
Expand Down
10 changes: 6 additions & 4 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -628,15 +628,17 @@ OLAPStatus Tablet::_capture_consistent_rowsets_unlocked(
}

OLAPStatus Tablet::capture_rs_readers(const Version& spec_version,
std::vector<RowsetReaderSharedPtr>* rs_readers) const {
std::vector<RowsetReaderSharedPtr>* rs_readers,
std::shared_ptr<MemTracker> parent_tracker) const {
std::vector<Version> version_path;
RETURN_NOT_OK(capture_consistent_versions(spec_version, &version_path));
RETURN_NOT_OK(capture_rs_readers(version_path, rs_readers));
RETURN_NOT_OK(capture_rs_readers(version_path, rs_readers, parent_tracker));
return OLAP_SUCCESS;
}

OLAPStatus Tablet::capture_rs_readers(const std::vector<Version>& version_path,
std::vector<RowsetReaderSharedPtr>* rs_readers) const {
std::vector<RowsetReaderSharedPtr>* rs_readers,
std::shared_ptr<MemTracker> parent_tracker) const {
DCHECK(rs_readers != nullptr && rs_readers->empty());
for (auto version : version_path) {
auto it = _rs_version_map.find(version);
Expand All @@ -653,7 +655,7 @@ OLAPStatus Tablet::capture_rs_readers(const std::vector<Version>& version_path,
}
}
RowsetReaderSharedPtr rs_reader;
auto res = it->second->create_reader(&rs_reader);
auto res = it->second->create_reader(parent_tracker, &rs_reader);
if (res != OLAP_SUCCESS) {
LOG(WARNING) << "failed to create reader for rowset:" << it->second->rowset_id();
return OLAP_ERR_CAPTURE_ROWSET_READER_ERROR;
Expand Down
7 changes: 5 additions & 2 deletions be/src/olap/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,12 @@ class Tablet : public BaseTablet {
OLAPStatus capture_consistent_rowsets(const Version& spec_version,
vector<RowsetSharedPtr>* rowsets) const;
OLAPStatus capture_rs_readers(const Version& spec_version,
vector<RowsetReaderSharedPtr>* rs_readers) const;
vector<RowsetReaderSharedPtr>* rs_readers,
std::shared_ptr<MemTracker> parent_tracker = nullptr) const;

OLAPStatus capture_rs_readers(const vector<Version>& version_path,
vector<RowsetReaderSharedPtr>* rs_readers) const;
vector<RowsetReaderSharedPtr>* rs_readers,
std::shared_ptr<MemTracker> parent_tracker = nullptr) const;

DelPredicateArray delete_predicates() { return _tablet_meta->delete_predicates(); }
void add_delete_predicate(const DeletePredicatePB& delete_predicate, int64_t version);
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/data_stream_recvr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ DataStreamRecvr::DataStreamRecvr(
_profile(profile),
_sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr) {
_mem_tracker = MemTracker::CreateTracker(
_profile, -1, "DataStreamRecvr:" + print_id(_fragment_instance_id), parent_tracker);
_profile, -1, "DataStreamRecvr", parent_tracker);

// Create one queue per sender if is_merging is true.
int num_queues = is_merging ? num_senders : 1;
Expand Down
3 changes: 1 addition & 2 deletions be/src/runtime/data_stream_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -469,8 +469,7 @@ Status DataStreamSender::prepare(RuntimeState* state) {
_profile = _pool->add(new RuntimeProfile(title.str()));
SCOPED_TIMER(_profile->total_time_counter());
_mem_tracker = MemTracker::CreateTracker(
_profile, -1, "DataStreamSender:" + print_id(state->fragment_instance_id()),
state->instance_mem_tracker());
_profile, -1, "DataStreamSender", state->instance_mem_tracker());

if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM) {
// Randomize the order we open/transmit to channels to avoid thundering herd problems.
Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ Status ExecEnv::_init_mem_tracker() {
return Status::InternalError(ss.str());
}

_mem_tracker =
MemTracker::CreateTracker(bytes_limit, "Query", MemTracker::GetRootTracker());
_mem_tracker = MemTracker::CreateTracker(bytes_limit, "Query", MemTracker::GetRootTracker(),
false, false);

LOG(INFO) << "Using global memory limit: " << PrettyPrinter::print(bytes_limit, TUnit::BYTES);
RETURN_IF_ERROR(_disk_io_mgr->init(_mem_tracker));
Expand Down
3 changes: 1 addition & 2 deletions be/src/runtime/export_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ Status ExportSink::prepare(RuntimeState* state) {

_mem_tracker = MemTracker::CreateTracker(
-1,
"ExportSink:" + print_id(state->fragment_instance_id()),
state->instance_mem_tracker());
"ExportSink", state->instance_mem_tracker());

// Prepare the exprs to run.
RETURN_IF_ERROR(Expr::prepare(_output_expr_ctxs, state, _row_desc, _mem_tracker));
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/load_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ LoadChannel::LoadChannel(const UniqueId& load_id, int64_t mem_limit, int64_t tim
const std::shared_ptr<MemTracker>& mem_tracker)
: _load_id(load_id), _timeout_s(timeout_s) {
_mem_tracker = MemTracker::CreateTracker(
mem_limit, "LoadChannel:" + _load_id.to_string(), mem_tracker);
mem_limit, "LoadChannel:" + _load_id.to_string(), mem_tracker, true, false);
// _last_updated_time should be set before being inserted to
// _load_channels in load_channel_mgr, or it may be erased
// immediately by gc thread.
Expand Down
Loading