Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improvement](segment) reduce memory usage when open segments #46570

Merged
merged 2 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 7 additions & 11 deletions be/src/olap/parallel_scanner_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ Status ParallelScannerBuilder::_build_scanners_by_rowid(std::list<VScannerSPtr>&
continue;
}

int segment_start = 0;
int64_t segment_start = 0;
auto split = RowSetSplits(reader->clone());

for (size_t i = 0; i != segments_rows.size(); ++i) {
Expand Down Expand Up @@ -171,22 +171,18 @@ Status ParallelScannerBuilder::_load() {
if (!_state->skip_delete_predicate()) {
read_source.fill_delete_predicates();
}
bool enable_segment_cache = _state->query_options().__isset.enable_segment_cache
? _state->query_options().enable_segment_cache
: true;

for (auto& rs_split : read_source.rs_splits) {
auto rowset = rs_split.rs_reader->rowset();
RETURN_IF_ERROR(rowset->load());
const auto rowset_id = rowset->rowset_id();
SegmentCacheHandle segment_cache_handle;

RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(
std::dynamic_pointer_cast<BetaRowset>(rowset), &segment_cache_handle,
enable_segment_cache, false));

for (const auto& segment : segment_cache_handle.get_segments()) {
_all_segments_rows[rowset_id].emplace_back(segment->num_rows());
auto beta_rowset = std::dynamic_pointer_cast<BetaRowset>(rowset);
std::vector<uint32_t> segment_rows;
RETURN_IF_ERROR(beta_rowset->get_segment_num_rows(&segment_rows));
auto segment_count = rowset->num_segments();
for (int64_t i = 0; i != segment_count; i++) {
_all_segments_rows[rowset_id].emplace_back(segment_rows[i]);
}
_total_rows += rowset->num_rows();
}
Expand Down
21 changes: 18 additions & 3 deletions be/src/olap/rowset/beta_rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include "olap/rowset/segment_v2/inverted_index_cache.h"
#include "olap/rowset/segment_v2/inverted_index_desc.h"
#include "olap/rowset/segment_v2/inverted_index_file_reader.h"
#include "olap/segment_loader.h"
#include "olap/tablet_schema.h"
#include "olap/utils.h"
#include "util/crc32c.h"
Expand All @@ -68,9 +69,23 @@ Status BetaRowset::init() {
return Status::OK(); // no op
}

Status BetaRowset::do_load(bool /*use_cache*/) {
// do nothing.
// the segments in this rowset will be loaded by calling load_segments() explicitly.
// Copies the per-segment row counts of this rowset into `*segment_rows`
// (element i is the row count of segment i).
//
// The counts are gathered by the callable passed to `_load_segment_rows_once`,
// stored in `_segments_rows`, and then copied out on every call.
// NOTE(review): presumably DorisCallOnce runs the callable at most once and
// returns the same Status afterwards (including a failure) - confirm.
//
// @param segment_rows output vector; its previous contents are replaced.
//        It is dereferenced unconditionally, so it must not be null.
// @return the Status produced by the call-once loader, or OK on success.
Status BetaRowset::get_segment_num_rows(std::vector<uint32_t>* segment_rows) {
// Debug-only check: callers are expected to have load()-ed the rowset first.
DCHECK(_rowset_state_machine.rowset_state() == ROWSET_LOADED);

RETURN_IF_ERROR(_load_segment_rows_once.call([this] {
auto segment_count = num_segments();
_segments_rows.resize(segment_count);
for (int64_t i = 0; i != segment_count; ++i) {
// The cache handle is scoped to a single iteration, so only one segment
// handle is held by this function at any time.
SegmentCacheHandle segment_cache_handle;
// NOTE(review): the two `false` arguments are presumably `use_cache` and
// `need_load_pk_index_and_bf` - confirm against SegmentLoader::load_segment.
RETURN_IF_ERROR(SegmentLoader::instance()->load_segment(
std::static_pointer_cast<BetaRowset>(shared_from_this()), i,
&segment_cache_handle, false, false));
const auto& tmp_segments = segment_cache_handle.get_segments();
// Assumes a successful load_segment() leaves at least one segment in the
// handle; index 0 is read without a bounds check - TODO confirm.
_segments_rows[i] = tmp_segments[0]->num_rows();
}
return Status::OK();
}));
// Hand the caller a copy of the stored counts.
segment_rows->assign(_segments_rows.cbegin(), _segments_rows.cend());
return Status::OK();
}

Expand Down
7 changes: 5 additions & 2 deletions be/src/olap/rowset/beta_rowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,15 @@ class BetaRowset final : public Rowset {
Status show_nested_index_file(rapidjson::Value* rowset_value,
rapidjson::Document::AllocatorType& allocator);

Status get_segment_num_rows(std::vector<uint32_t>* segment_rows);

protected:
BetaRowset(const TabletSchemaSPtr& schema, const RowsetMetaSharedPtr& rowset_meta,
std::string tablet_path);

// init segment groups
Status init() override;

Status do_load(bool use_cache) override;

void do_close() override;

Status check_current_rowset_segment() override;
Expand All @@ -107,6 +107,9 @@ class BetaRowset final : public Rowset {
private:
friend class RowsetFactory;
friend class BetaRowsetReader;

DorisCallOnce<Status> _load_segment_rows_once;
std::vector<uint32_t> _segments_rows;
};

} // namespace doris
Expand Down
92 changes: 25 additions & 67 deletions be/src/olap/rowset/beta_rowset_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,6 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
_read_options.io_ctx.expiration_time = 0;
}

// load segments
bool enable_segment_cache = true;
auto* state = read_context->runtime_state;
if (state != nullptr) {
Expand All @@ -226,76 +225,41 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
// When reader type is for query, session variable `enable_segment_cache` should be respected.
bool should_use_cache = use_cache || (_read_context->reader_type == ReaderType::READER_QUERY &&
enable_segment_cache);
SegmentCacheHandle segment_cache_handle;
{
SCOPED_RAW_TIMER(&_stats->rowset_reader_load_segments_timer_ns);
RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(
_rowset, &segment_cache_handle, should_use_cache,
/*need_load_pk_index_and_bf*/ false));
}

// create iterator for each segment
auto& segments = segment_cache_handle.get_segments();
_segments_rows.resize(segments.size());
for (size_t i = 0; i < segments.size(); i++) {
_segments_rows[i] = segments[i]->num_rows();
}
if (_read_context->record_rowids) {
// init segment rowid map for rowid conversion
std::vector<uint32_t> segment_num_rows;
RETURN_IF_ERROR(get_segment_num_rows(&segment_num_rows));
RETURN_IF_ERROR(_read_context->rowid_conversion->init_segment_map(rowset()->rowset_id(),
segment_num_rows));
}

auto segment_count = _rowset->num_segments();
auto [seg_start, seg_end] = _segment_offsets;
// If seg_start == seg_end, it means that the segments of a rowset is not
// split scanned by multiple scanners, and the rowset reader is used to read the whole rowset.
if (seg_start == seg_end) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里也需要加一下注释,为什么== 的时候end =segment_count 了

seg_start = 0;
seg_end = segments.size();
seg_end = segment_count;
}
if (_read_context->record_rowids && _read_context->rowid_conversion) {
// init segment rowid map for rowid conversion
std::vector<uint32_t> segment_rows;
RETURN_IF_ERROR(_rowset->get_segment_num_rows(&segment_rows));
RETURN_IF_ERROR(_read_context->rowid_conversion->init_segment_map(rowset()->rowset_id(),
segment_rows));
}

const bool is_merge_iterator = _is_merge_iterator();
const bool use_lazy_init_iterators =
!is_merge_iterator && _read_context->reader_type == ReaderType::READER_QUERY;
for (int i = seg_start; i < seg_end; i++) {
for (int64_t i = seg_start; i < seg_end; i++) {
SCOPED_RAW_TIMER(&_stats->rowset_reader_create_iterators_timer_ns);
auto& seg_ptr = segments[i];
std::unique_ptr<RowwiseIterator> iter;

if (use_lazy_init_iterators) {
/// For non-merging iterators, we don't need to initialize them all at once when creating them.
/// Instead, we should initialize each iterator separately when really using them.
/// This optimization minimizes the lifecycle of resources like column readers
/// and prevents excessive memory consumption, especially for wide tables.
if (_segment_row_ranges.empty()) {
_read_options.row_ranges.clear();
iter = std::make_unique<LazyInitSegmentIterator>(seg_ptr, _input_schema,
_read_options);
} else {
DCHECK_EQ(seg_end - seg_start, _segment_row_ranges.size());
auto local_options = _read_options;
local_options.row_ranges = _segment_row_ranges[i - seg_start];
iter = std::make_unique<LazyInitSegmentIterator>(seg_ptr, _input_schema,
local_options);
}
/// For iterators, we don't need to initialize them all at once when creating them.
/// Instead, we should initialize each iterator separately when really using them.
/// This optimization minimizes the lifecycle of resources like column readers
/// and prevents excessive memory consumption, especially for wide tables.
if (_segment_row_ranges.empty()) {
_read_options.row_ranges.clear();
iter = std::make_unique<LazyInitSegmentIterator>(_rowset, i, should_use_cache,
_input_schema, _read_options);
} else {
Status status;
/// If `_segment_row_ranges` is empty, the segment is not split.
if (_segment_row_ranges.empty()) {
_read_options.row_ranges.clear();
status = seg_ptr->new_iterator(_input_schema, _read_options, &iter);
} else {
DCHECK_EQ(seg_end - seg_start, _segment_row_ranges.size());
auto local_options = _read_options;
local_options.row_ranges = _segment_row_ranges[i - seg_start];
status = seg_ptr->new_iterator(_input_schema, local_options, &iter);
}

if (!status.ok()) {
LOG(WARNING) << "failed to create iterator[" << seg_ptr->id()
<< "]: " << status.to_string();
return Status::Error<ROWSET_READER_INIT>(status.to_string());
}
DCHECK_EQ(seg_end - seg_start, _segment_row_ranges.size());
auto local_options = _read_options;
local_options.row_ranges = _segment_row_ranges[i - seg_start];
iter = std::make_unique<LazyInitSegmentIterator>(_rowset, i, should_use_cache,
_input_schema, local_options);
}

if (iter->empty()) {
Expand Down Expand Up @@ -423,10 +387,4 @@ bool BetaRowsetReader::_should_push_down_value_predicates() const {
_read_context->sequence_id_idx == -1) ||
_read_context->enable_unique_key_merge_on_write);
}

Status BetaRowsetReader::get_segment_num_rows(std::vector<uint32_t>* segment_num_rows) {
segment_num_rows->assign(_segments_rows.cbegin(), _segments_rows.cend());
return Status::OK();
}

} // namespace doris
8 changes: 2 additions & 6 deletions be/src/olap/rowset/beta_rowset_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,6 @@ class BetaRowsetReader : public RowsetReader {
return _iterator->current_block_row_locations(locations);
}

Status get_segment_num_rows(std::vector<uint32_t>* segment_num_rows) override;

bool update_profile(RuntimeProfile* profile) override;

RowsetReaderSharedPtr clone() override;
Expand All @@ -97,7 +95,7 @@ class BetaRowsetReader : public RowsetReader {
_rowset->rowset_meta()->is_segments_overlapping() && _get_segment_num() > 1;
}

int32_t _get_segment_num() const {
int64_t _get_segment_num() const {
auto [seg_start, seg_end] = _segment_offsets;
if (seg_start == seg_end) {
seg_start = 0;
Expand All @@ -108,7 +106,7 @@ class BetaRowsetReader : public RowsetReader {

DorisCallOnce<Status> _init_iter_once;

std::pair<int, int> _segment_offsets;
std::pair<int64_t, int64_t> _segment_offsets;
std::vector<RowRanges> _segment_row_ranges;

SchemaSPtr _input_schema;
Expand All @@ -120,8 +118,6 @@ class BetaRowsetReader : public RowsetReader {

std::unique_ptr<RowwiseIterator> _iterator;

std::vector<uint32_t> _segments_rows;

StorageReadOptions _read_options;

bool _empty = false;
Expand Down
2 changes: 0 additions & 2 deletions be/src/olap/rowset/rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,6 @@ Status Rowset::load(bool use_cache) {
std::lock_guard load_lock(_lock);
// after lock, if rowset state is ROWSET_UNLOADING, it is ok to return
if (_rowset_state_machine.rowset_state() == ROWSET_UNLOADED) {
// first do load, then change the state
RETURN_IF_ERROR(do_load(use_cache));
RETURN_IF_ERROR(_rowset_state_machine.on_load());
}
}
Expand Down
3 changes: 0 additions & 3 deletions be/src/olap/rowset/rowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -321,9 +321,6 @@ class Rowset : public std::enable_shared_from_this<Rowset>, public MetadataAdder
// this is non-public because all clients should use RowsetFactory to obtain pointer to initialized Rowset
virtual Status init() = 0;

// The actual implementation of load(). Guaranteed to be called exactly once.
virtual Status do_load(bool use_cache) = 0;

// release resources in this api
virtual void do_close() = 0;

Expand Down
6 changes: 1 addition & 5 deletions be/src/olap/rowset/rowset_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ struct RowSetSplits {
// If segment_offsets is not empty, we only scan the segments
// [pair.first, pair.second) in rs_reader; this is only effective for dup key
// and pipeline.
std::pair<int, int> segment_offsets;
std::pair<int64_t, int64_t> segment_offsets;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove get_segment_num_rows method from this class


// RowRanges of each segment.
std::vector<RowRanges> segment_row_ranges;
Expand Down Expand Up @@ -83,10 +83,6 @@ class RowsetReader {
return Status::NotSupported("to be implemented");
}

virtual Status get_segment_num_rows(std::vector<uint32_t>* segment_num_rows) {
return Status::NotSupported("to be implemented");
}

virtual bool update_profile(RuntimeProfile* profile) = 0;

virtual RowsetReaderSharedPtr clone() = 0;
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/rowset_reader_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ struct RowsetReaderContext {
bool enable_unique_key_merge_on_write = false;
const DeleteBitmap* delete_bitmap = nullptr;
bool record_rowids = false;
RowIdConversion* rowid_conversion;
RowIdConversion* rowid_conversion = nullptr;
bool is_key_column_group = false;
const std::set<int32_t>* output_columns = nullptr;
RowsetId rowset_id;
Expand Down
23 changes: 19 additions & 4 deletions be/src/olap/rowset/segment_v2/lazy_init_segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,18 @@

#include "olap/rowset/segment_v2/lazy_init_segment_iterator.h"

#include "olap/segment_loader.h"

namespace doris::segment_v2 {

LazyInitSegmentIterator::LazyInitSegmentIterator(std::shared_ptr<Segment> segment,
SchemaSPtr schema, const StorageReadOptions& opts)
: _schema(std::move(schema)), _segment(std::move(segment)), _read_options(opts) {}
/// Builds an iterator that only records what is needed to open a segment later.
///
/// The constructor stores the owning rowset, the segment id, the cache flag,
/// the schema and a copy of `opts`; it does not load any segment itself.
/// The stored `_rowset`, `_segment_id` and `_should_use_cache` are the values
/// passed to `SegmentLoader::load_segment` in `init()`.
///
/// @param rowset            rowset that contains the segment (moved into the iterator).
/// @param segment_id        id of the segment within `rowset`, as passed to load_segment.
/// @param should_use_cache  forwarded to `SegmentLoader::load_segment` in `init()`.
/// @param schema            schema used when the inner iterator is created (moved in).
/// @param opts              read options; copied into `_read_options`.
LazyInitSegmentIterator::LazyInitSegmentIterator(BetaRowsetSharedPtr rowset, int64_t segment_id,
bool should_use_cache, SchemaSPtr schema,
const StorageReadOptions& opts)
: _rowset(std::move(rowset)),
_segment_id(segment_id),
_should_use_cache(should_use_cache),
_schema(std::move(schema)),
_read_options(opts) {}

/// The `opts` argument is deliberately not used here,
/// see where the iterator is created in `BetaRowsetReader::get_segment_iterators`
Expand All @@ -31,7 +38,15 @@ Status LazyInitSegmentIterator::init(const StorageReadOptions& /*opts*/) {
return Status::OK();
}

RETURN_IF_ERROR(_segment->new_iterator(_schema, _read_options, &_inner_iterator));
std::shared_ptr<Segment> segment;
{
SegmentCacheHandle segment_cache_handle;
RETURN_IF_ERROR(SegmentLoader::instance()->load_segment(
_rowset, _segment_id, &segment_cache_handle, _should_use_cache, false));
const auto& tmp_segments = segment_cache_handle.get_segments();
segment = tmp_segments[0];
}
RETURN_IF_ERROR(segment->new_iterator(_schema, _read_options, &_inner_iterator));
return _inner_iterator->init(_read_options);
}

Expand Down
12 changes: 9 additions & 3 deletions be/src/olap/rowset/segment_v2/lazy_init_segment_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,18 @@
#include "olap/rowset/segment_v2/segment_iterator.h"
#include "vec/core/block.h"

namespace doris {
class BetaRowset;
using BetaRowsetSharedPtr = std::shared_ptr<BetaRowset>;
}; // namespace doris
namespace doris::segment_v2 {

using namespace vectorized;

class LazyInitSegmentIterator : public RowwiseIterator {
public:
LazyInitSegmentIterator(std::shared_ptr<Segment> segment, SchemaSPtr schema,
const StorageReadOptions& opts);
LazyInitSegmentIterator(BetaRowsetSharedPtr rowset, int64_t segment_id, bool should_use_cache,
SchemaSPtr schema, const StorageReadOptions& opts);

~LazyInitSegmentIterator() override = default;

Expand Down Expand Up @@ -59,8 +63,10 @@ class LazyInitSegmentIterator : public RowwiseIterator {

private:
bool _need_lazy_init {true};
BetaRowsetSharedPtr _rowset;
int64_t _segment_id {-1};
bool _should_use_cache {false};
SchemaSPtr _schema = nullptr;
std::shared_ptr<Segment> _segment;
StorageReadOptions _read_options;
RowwiseIteratorUPtr _inner_iterator;
};
Expand Down
Loading
Loading