Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 8 additions & 9 deletions be/src/olap/collect_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,41 +40,40 @@ OLAPStatus CollectIterator::add_child(RowsetReaderSharedPtr rs_reader) {
std::unique_ptr<LevelIterator> child(new Level0Iterator(rs_reader, _reader));
RETURN_NOT_OK(child->init());
if (child->current_row() == nullptr) {
return OLAP_SUCCESS;
return OLAP_ERR_DATA_EOF;
}

LevelIterator* child_ptr = child.release();
_children.push_back(child_ptr);
_rs_readers.push_back(rs_reader);
return OLAP_SUCCESS;
}

// Build a merge heap. If _merge is true, a rowset with the max rownum
// status will be used as the base rowset, and the other rowsets will be merged first and
// then merged with the base rowset.
void CollectIterator::build_heap() {
DCHECK(_rs_readers.size() == _children.size());
DCHECK(_reader->_rs_readers.size() == _children.size());
_reverse = _reader->_tablet->tablet_schema().keys_type() == KeysType::UNIQUE_KEYS;
if (_children.empty()) {
_inner_iter.reset(nullptr);
return;
} else if (_merge) {
DCHECK(!_rs_readers.empty());
DCHECK(!_reader->_rs_readers.empty());
// build merge heap with two children, a base rowset as level0iterator and
// other cumulative rowsets as a level1iterator
if (_children.size() > 1) {
// find base rowset(max rownum),
RowsetReaderSharedPtr base_reader = _rs_readers[0];
RowsetReaderSharedPtr base_reader = _reader->_rs_readers[0];
int base_reader_idx = 0;
for (size_t i = 1; i < _rs_readers.size(); ++i) {
if (_rs_readers[i]->rowset()->rowset_meta()->num_rows() >
for (size_t i = 1; i < _reader->_rs_readers.size(); ++i) {
if (_reader->_rs_readers[i]->rowset()->rowset_meta()->num_rows() >
base_reader->rowset()->rowset_meta()->num_rows()) {
base_reader = _rs_readers[i];
base_reader = _reader->_rs_readers[i];
base_reader_idx = i;
}
}
std::vector<LevelIterator*> cumu_children;
for (size_t i = 0; i < _rs_readers.size(); ++i) {
for (size_t i = 0; i < _reader->_rs_readers.size(); ++i) {
if (i != base_reader_idx) {
cumu_children.push_back(_children[i]);
}
Expand Down
2 changes: 0 additions & 2 deletions be/src/olap/collect_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,6 @@ class CollectIterator {

// Hold reader point to access read params, such as fetch conditions.
Reader* _reader = nullptr;
std::vector<RowsetReaderSharedPtr> _rs_readers;

};

} // namespace doris
53 changes: 41 additions & 12 deletions be/src/olap/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ std::string Reader::KeysParam::to_string() const {
return ss.str();
}

Reader::Reader() {
Reader::Reader() : _collect_iter(new CollectIterator()) {
_tracker.reset(new MemTracker(-1));
_predicate_mem_pool.reset(new MemPool(_tracker.get()));
}
Expand Down Expand Up @@ -119,14 +119,28 @@ OLAPStatus Reader::init(const ReaderParams& read_params) {
<< ", version:" << read_params.version;
return res;
}

if (_rs_readers.size() == 1 &&
!_rs_readers[0]->rowset()->rowset_meta()->is_segments_overlapping()) {
_next_row_func = &Reader::_dup_key_next_row;
// When only one rowset has data, and this rowset is nonoverlapping, we can read directly without aggregation
bool has_delete_rowset = false;
int nonoverlapping_count = 0;
for (auto rs_reader : _rs_readers) {
if (rs_reader->rowset()->rowset_meta()->delete_flag()) {
has_delete_rowset = true;
break;
}
if (rs_reader->rowset()->rowset_meta()->num_rows() > 0 &&
!rs_reader->rowset()->rowset_meta()->is_segments_overlapping()) {
if (++nonoverlapping_count > 1) {
break;
}
}
}
if (nonoverlapping_count == 1 && !has_delete_rowset) {
_next_row_func = _tablet->keys_type() == AGG_KEYS ? &Reader::_direct_agg_key_next_row
: &Reader::_direct_next_row;
} else {
switch (_tablet->keys_type()) {
case KeysType::DUP_KEYS:
_next_row_func = &Reader::_dup_key_next_row;
_next_row_func = &Reader::_direct_next_row;
break;
case KeysType::UNIQUE_KEYS:
_next_row_func = &Reader::_unique_key_next_row;
Expand All @@ -143,8 +157,8 @@ OLAPStatus Reader::init(const ReaderParams& read_params) {
return OLAP_SUCCESS;
}

OLAPStatus Reader::_dup_key_next_row(RowCursor* row_cursor, MemPool* mem_pool, ObjectPool* agg_pool,
bool* eof) {
OLAPStatus Reader::_direct_next_row(RowCursor* row_cursor, MemPool* mem_pool, ObjectPool* agg_pool,
bool* eof) {
if (UNLIKELY(_next_key == nullptr)) {
*eof = true;
return OLAP_SUCCESS;
Expand All @@ -158,6 +172,22 @@ OLAPStatus Reader::_dup_key_next_row(RowCursor* row_cursor, MemPool* mem_pool, O
}
return OLAP_SUCCESS;
}
OLAPStatus Reader::_direct_agg_key_next_row(RowCursor* row_cursor, MemPool* mem_pool,
ObjectPool* agg_pool, bool* eof) {
if (UNLIKELY(_next_key == nullptr)) {
*eof = true;
return OLAP_SUCCESS;
}
init_row_with_others(row_cursor, *_next_key, mem_pool, agg_pool);
auto res = _collect_iter->next(&_next_key, &_next_delete_flag);
if (res != OLAP_SUCCESS && res != OLAP_ERR_DATA_EOF) {
return res;
}
if (_need_agg_finalize) {
agg_finalize_row(_value_cids, row_cursor, mem_pool);
}
return OLAP_SUCCESS;
}

OLAPStatus Reader::_agg_key_next_row(RowCursor* row_cursor, MemPool* mem_pool, ObjectPool* agg_pool,
bool* eof) {
Expand Down Expand Up @@ -254,8 +284,6 @@ void Reader::close() {
for (auto pred : _col_predicates) {
delete pred;
}

delete _collect_iter;
}

OLAPStatus Reader::_capture_rs_readers(const ReaderParams& read_params) {
Expand Down Expand Up @@ -352,7 +380,9 @@ OLAPStatus Reader::_capture_rs_readers(const ReaderParams& read_params) {
LOG(WARNING) << "failed to add child to iterator";
return res;
}
_rs_readers.push_back(rs_reader);
if (res == OLAP_SUCCESS) {
_rs_readers.push_back(rs_reader);
}
}
_collect_iter->build_heap();
_next_key = _collect_iter->current_row(&_next_delete_flag);
Expand Down Expand Up @@ -390,7 +420,6 @@ OLAPStatus Reader::_init_params(const ReaderParams& read_params) {

_init_seek_columns();

_collect_iter = new CollectIterator();
_collect_iter->init(this);

if (_tablet->tablet_schema().has_sequence_col()) {
Expand Down
8 changes: 5 additions & 3 deletions be/src/olap/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,10 @@ class Reader {

void _init_load_bf_columns(const ReaderParams& read_params);

OLAPStatus _dup_key_next_row(RowCursor* row_cursor, MemPool* mem_pool, ObjectPool* agg_pool,
bool* eof);
OLAPStatus _direct_next_row(RowCursor* row_cursor, MemPool* mem_pool, ObjectPool* agg_pool,
bool* eof);
OLAPStatus _direct_agg_key_next_row(RowCursor* row_cursor, MemPool* mem_pool,
ObjectPool* agg_pool, bool* eof);
OLAPStatus _agg_key_next_row(RowCursor* row_cursor, MemPool* mem_pool, ObjectPool* agg_pool,
bool* eof);
OLAPStatus _unique_key_next_row(RowCursor* row_cursor, MemPool* mem_pool, ObjectPool* agg_pool,
Expand Down Expand Up @@ -183,7 +185,7 @@ class Reader {
bool _has_sequence_col = false;
int32_t _sequence_col_idx = -1;
const RowCursor* _next_key = nullptr;
CollectIterator* _collect_iter = nullptr;
std::unique_ptr<CollectIterator> _collect_iter;
std::vector<uint32_t> _key_cids;
std::vector<uint32_t> _value_cids;

Expand Down