From f3cf2341b386c22a05b3cf2c58cadcd6dabbc16a Mon Sep 17 00:00:00 2001 From: yangzhg Date: Thu, 3 Dec 2020 14:24:02 +0800 Subject: [PATCH 1/5] fix core when schema change --- be/src/olap/reader.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp index 43598e5689de4d..87fe5beed0a37c 100644 --- a/be/src/olap/reader.cpp +++ b/be/src/olap/reader.cpp @@ -119,8 +119,7 @@ OLAPStatus Reader::init(const ReaderParams& read_params) { << ", version:" << read_params.version; return res; } - - if (_rs_readers.size() == 1 && + if (read_params.reader_type == READER_QUERY && _rs_readers.size() == 1 && !_rs_readers[0]->rowset()->rowset_meta()->is_segments_overlapping()) { _next_row_func = &Reader::_dup_key_next_row; } else { From 0c298b5d29ff03e81b704e9ff014b6f13eafe7c6 Mon Sep 17 00:00:00 2001 From: yangzhg Date: Thu, 3 Dec 2020 16:30:13 +0800 Subject: [PATCH 2/5] add version check --- be/src/olap/reader.cpp | 42 ++++++++++++++++++++++++++++++++++++------ be/src/olap/reader.h | 6 ++++-- 2 files changed, 40 insertions(+), 8 deletions(-) diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp index 87fe5beed0a37c..9ac066e054311f 100644 --- a/be/src/olap/reader.cpp +++ b/be/src/olap/reader.cpp @@ -119,13 +119,34 @@ OLAPStatus Reader::init(const ReaderParams& read_params) { << ", version:" << read_params.version; return res; } - if (read_params.reader_type == READER_QUERY && _rs_readers.size() == 1 && - !_rs_readers[0]->rowset()->rowset_meta()->is_segments_overlapping()) { - _next_row_func = &Reader::_dup_key_next_row; + bool single_version = false; + bool has_delete_rowset = false; + int nonoverlapping_count = 0; + for (auto rs_reader : _rs_readers) { + if (rs_reader->rowset()->rowset_meta()->delete_flag()) { + has_delete_rowset = true; + break; + } + if (rs_reader->rowset()->rowset_meta()->num_rows() > 0 && + !rs_reader->rowset()->rowset_meta()->is_segments_overlapping()) { + ++nonoverlapping_count; + } + } + if (nonoverlapping_count == 1 && !has_delete_rowset) { + single_version = true; + } + if (single_version) { + LOG(INFO) << "======================================"; + if (_tablet->keys_type() == AGG_KEYS) { + _next_row_func = &Reader::_direct_agg_key_next_row; + } else { + _next_row_func = &Reader::_direct_next_row; + } } else { + LOG(INFO) << "----------------------------------"; switch (_tablet->keys_type()) { case KeysType::DUP_KEYS: - _next_row_func = &Reader::_dup_key_next_row; + _next_row_func = &Reader::_direct_next_row; break; case KeysType::UNIQUE_KEYS: _next_row_func = &Reader::_unique_key_next_row; @@ -142,8 +163,8 @@ OLAPStatus Reader::init(const ReaderParams& read_params) { return OLAP_SUCCESS; } -OLAPStatus Reader::_dup_key_next_row(RowCursor* row_cursor, MemPool* mem_pool, ObjectPool* agg_pool, - bool* eof) { +OLAPStatus Reader::_direct_next_row(RowCursor* row_cursor, MemPool* mem_pool, ObjectPool* agg_pool, + bool* eof) { if (UNLIKELY(_next_key == nullptr)) { *eof = true; return OLAP_SUCCESS; @@ -157,6 +178,15 @@ OLAPStatus Reader::_dup_key_next_row(RowCursor* row_cursor, MemPool* mem_pool, O } return OLAP_SUCCESS; } +OLAPStatus Reader::_direct_agg_key_next_row(RowCursor* row_cursor, MemPool* mem_pool, + ObjectPool* agg_pool, bool* eof) { + if (UNLIKELY(_next_key == nullptr)) { + *eof = true; + return OLAP_SUCCESS; + } + init_row_with_others(row_cursor, *_next_key, mem_pool, agg_pool); + return _direct_next_row(row_cursor, mem_pool, agg_pool, eof); +} OLAPStatus Reader::_agg_key_next_row(RowCursor* row_cursor, MemPool* mem_pool, ObjectPool* agg_pool, bool* eof) { diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h index 0cd7142d130984..7aab6d9ddd62e0 100644 --- a/be/src/olap/reader.h +++ b/be/src/olap/reader.h @@ -145,8 +145,10 @@ class Reader { void _init_load_bf_columns(const ReaderParams& read_params); - OLAPStatus _dup_key_next_row(RowCursor* row_cursor, MemPool* mem_pool, ObjectPool* agg_pool, - bool* eof); + OLAPStatus _direct_next_row(RowCursor* row_cursor, MemPool* mem_pool, ObjectPool* agg_pool, + bool* eof); + OLAPStatus _direct_agg_key_next_row(RowCursor* row_cursor, MemPool* mem_pool, + ObjectPool* agg_pool, bool* eof); OLAPStatus _agg_key_next_row(RowCursor* row_cursor, MemPool* mem_pool, ObjectPool* agg_pool, bool* eof); OLAPStatus _unique_key_next_row(RowCursor* row_cursor, MemPool* mem_pool, ObjectPool* agg_pool, From 6073fa575332a0b84be1be041f0bc2aeed9fde74 Mon Sep 17 00:00:00 2001 From: yangzhg Date: Thu, 3 Dec 2020 16:52:03 +0800 Subject: [PATCH 3/5] add version check --- be/src/olap/reader.cpp | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp index 9ac066e054311f..61f84b0f7fe4fc 100644 --- a/be/src/olap/reader.cpp +++ b/be/src/olap/reader.cpp @@ -119,7 +119,7 @@ OLAPStatus Reader::init(const ReaderParams& read_params) { << ", version:" << read_params.version; return res; } - bool single_version = false; + // When only one rowset has data, and this rowset is nonoverlapping, we can read directly without aggregation bool has_delete_rowset = false; int nonoverlapping_count = 0; for (auto rs_reader : _rs_readers) { @@ -129,21 +129,15 @@ OLAPStatus Reader::init(const ReaderParams& read_params) { } if (rs_reader->rowset()->rowset_meta()->num_rows() > 0 && !rs_reader->rowset()->rowset_meta()->is_segments_overlapping()) { - ++nonoverlapping_count; + if (++nonoverlapping_count > 1) { + break; + } } } if (nonoverlapping_count == 1 && !has_delete_rowset) { - single_version = true; - } - if (single_version) { - LOG(INFO) << "======================================"; - if (_tablet->keys_type() == AGG_KEYS) { - _next_row_func = &Reader::_direct_agg_key_next_row; - } else { - _next_row_func = &Reader::_direct_next_row; - } + _next_row_func = _tablet->keys_type() == AGG_KEYS ? &Reader::_direct_agg_key_next_row + : &Reader::_direct_next_row; } else { - LOG(INFO) << "----------------------------------"; switch (_tablet->keys_type()) { case KeysType::DUP_KEYS: _next_row_func = &Reader::_direct_next_row; From a1243b136452717609115056c79b08138978d716 Mon Sep 17 00:00:00 2001 From: yangzhg Date: Thu, 3 Dec 2020 19:51:38 +0800 Subject: [PATCH 4/5] fix core --- be/src/olap/collect_iterator.cpp | 17 ++++++++--------- be/src/olap/collect_iterator.h | 2 -- be/src/olap/reader.cpp | 15 +++++++-------- be/src/olap/reader.h | 2 +- 4 files changed, 16 insertions(+), 20 deletions(-) diff --git a/be/src/olap/collect_iterator.cpp b/be/src/olap/collect_iterator.cpp index fba50f3865c4e1..c3e0ebdd20e948 100644 --- a/be/src/olap/collect_iterator.cpp +++ b/be/src/olap/collect_iterator.cpp @@ -40,12 +40,11 @@ OLAPStatus CollectIterator::add_child(RowsetReaderSharedPtr rs_reader) { std::unique_ptr child(new Level0Iterator(rs_reader, _reader)); RETURN_NOT_OK(child->init()); if (child->current_row() == nullptr) { - return OLAP_SUCCESS; + return OLAP_ERR_DATA_EOF; } LevelIterator* child_ptr = child.release(); _children.push_back(child_ptr); - _rs_readers.push_back(rs_reader); return OLAP_SUCCESS; } @@ -53,28 +52,28 @@ OLAPStatus CollectIterator::add_child(RowsetReaderSharedPtr rs_reader) { // status will be used as the base rowset, and the other rowsets will be merged first and // then merged with the base rowset. void CollectIterator::build_heap() { - DCHECK(_rs_readers.size() == _children.size()); + DCHECK(_reader->_rs_readers.size() == _children.size()); _reverse = _reader->_tablet->tablet_schema().keys_type() == KeysType::UNIQUE_KEYS; if (_children.empty()) { _inner_iter.reset(nullptr); return; } else if (_merge) { - DCHECK(!_rs_readers.empty()); + DCHECK(!_reader->_rs_readers.empty()); // build merge heap with two children, a base rowset as level0iterator and // other cumulative rowsets as a level1iterator if (_children.size() > 1) { // find base rowset(max rownum), - RowsetReaderSharedPtr base_reader = _rs_readers[0]; + RowsetReaderSharedPtr base_reader = _reader->_rs_readers[0]; int base_reader_idx = 0; - for (size_t i = 1; i < _rs_readers.size(); ++i) { - if (_rs_readers[i]->rowset()->rowset_meta()->num_rows() > + for (size_t i = 1; i < _reader->_rs_readers.size(); ++i) { + if (_reader->_rs_readers[i]->rowset()->rowset_meta()->num_rows() > base_reader->rowset()->rowset_meta()->num_rows()) { - base_reader = _rs_readers[i]; + base_reader = _reader->_rs_readers[i]; base_reader_idx = i; } } std::vector cumu_children; - for (size_t i = 0; i < _rs_readers.size(); ++i) { + for (size_t i = 0; i < _reader->_rs_readers.size(); ++i) { if (i != base_reader_idx) { cumu_children.push_back(_children[i]); } diff --git a/be/src/olap/collect_iterator.h b/be/src/olap/collect_iterator.h index 97a4834705686d..173dba4947d384 100644 --- a/be/src/olap/collect_iterator.h +++ b/be/src/olap/collect_iterator.h @@ -164,8 +164,6 @@ class CollectIterator { // Hold reader point to access read params, such as fetch conditions. Reader* _reader = nullptr; - std::vector _rs_readers; - }; } // namespace doris diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp index 61f84b0f7fe4fc..0efa096362da08 100644 --- a/be/src/olap/reader.cpp +++ b/be/src/olap/reader.cpp @@ -90,7 +90,7 @@ std::string Reader::KeysParam::to_string() const { return ss.str(); } -Reader::Reader() { +Reader::Reader() : _collect_iter(new CollectIterator()) { _tracker.reset(new MemTracker(-1)); _predicate_mem_pool.reset(new MemPool(_tracker.get())); } @@ -129,9 +129,9 @@ OLAPStatus Reader::init(const ReaderParams& read_params) { } if (rs_reader->rowset()->rowset_meta()->num_rows() > 0 && !rs_reader->rowset()->rowset_meta()->is_segments_overlapping()) { - if (++nonoverlapping_count > 1) { - break; - } + if (++nonoverlapping_count > 1) { + break; + } } } if (nonoverlapping_count == 1 && !has_delete_rowset) { @@ -277,8 +277,6 @@ void Reader::close() { for (auto pred : _col_predicates) { delete pred; } - - delete _collect_iter; } OLAPStatus Reader::_capture_rs_readers(const ReaderParams& read_params) { @@ -375,7 +373,9 @@ OLAPStatus Reader::_capture_rs_readers(const ReaderParams& read_params) { LOG(WARNING) << "failed to add child to iterator"; return res; } - _rs_readers.push_back(rs_reader); + if (res == OLAP_SUCCESS) { + _rs_readers.push_back(rs_reader); + } } _collect_iter->build_heap(); _next_key = _collect_iter->current_row(&_next_delete_flag); @@ -413,7 +413,6 @@ OLAPStatus Reader::_init_params(const ReaderParams& read_params) { _init_seek_columns(); - _collect_iter = new CollectIterator(); _collect_iter->init(this); if (_tablet->tablet_schema().has_sequence_col()) { diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h index 7aab6d9ddd62e0..9b2c9115d4df52 100644 --- a/be/src/olap/reader.h +++ b/be/src/olap/reader.h @@ -185,7 +185,7 @@ class Reader { bool _has_sequence_col = false; int32_t _sequence_col_idx = -1; const RowCursor* _next_key = nullptr; - CollectIterator* _collect_iter = nullptr; + std::unique_ptr _collect_iter; std::vector _key_cids; std::vector _value_cids; From 254cdd47e34fd30a9ade6933a94fb72cb53df832 Mon Sep 17 00:00:00 2001 From: yangzhg Date: Fri, 4 Dec 2020 06:49:04 +0800 Subject: [PATCH 5/5] fix core --- be/src/olap/reader.cpp | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp index 0efa096362da08..75f978988d1b98 100644 --- a/be/src/olap/reader.cpp +++ b/be/src/olap/reader.cpp @@ -179,7 +179,14 @@ OLAPStatus Reader::_direct_agg_key_next_row(RowCursor* row_cursor, MemPool* mem_ return OLAP_SUCCESS; } init_row_with_others(row_cursor, *_next_key, mem_pool, agg_pool); - return _direct_next_row(row_cursor, mem_pool, agg_pool, eof); + auto res = _collect_iter->next(&_next_key, &_next_delete_flag); + if (res != OLAP_SUCCESS && res != OLAP_ERR_DATA_EOF) { + return res; + } + if (_need_agg_finalize) { + agg_finalize_row(_value_cids, row_cursor, mem_pool); + } + return OLAP_SUCCESS; } OLAPStatus Reader::_agg_key_next_row(RowCursor* row_cursor, MemPool* mem_pool, ObjectPool* agg_pool,