diff --git a/.gitignore b/.gitignore index 8fb5733edbb179..322ddd538127bf 100644 --- a/.gitignore +++ b/.gitignore @@ -50,8 +50,19 @@ be/src/gen_cpp/*.cc be/src/gen_cpp/*.cpp be/src/gen_cpp/*.h be/src/gen_cpp/opcode -be/ut_build_ASAN/ +be/ut_build_* be/tags +be/build* +be/test/olap/test_data/tablet_meta_test.hdr +build + +ui/dist +ui/node_modules +ui/package-lock.json +docs/package-lock.json +fe/fe-common/.classpath +rpc_data/ + #ignore vscode project file .vscode diff --git a/be/src/olap/CMakeLists.txt b/be/src/olap/CMakeLists.txt index 4fa49fac49267c..f9827f8bf0c273 100644 --- a/be/src/olap/CMakeLists.txt +++ b/be/src/olap/CMakeLists.txt @@ -33,6 +33,7 @@ add_library(Olap STATIC bloom_filter_reader.cpp bloom_filter_writer.cpp byte_buffer.cpp + collect_iterator.cpp compaction.cpp compaction_permit_limiter.cpp comparison_predicate.cpp diff --git a/be/src/olap/collect_iterator.cpp b/be/src/olap/collect_iterator.cpp new file mode 100644 index 00000000000000..fba50f3865c4e1 --- /dev/null +++ b/be/src/olap/collect_iterator.cpp @@ -0,0 +1,327 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+#include "olap/collect_iterator.h"
+
+#include "olap/reader.h"
+#include "olap/row.h"
+#include "olap/row_block.h"
+#include "olap/row_cursor.h"
+
+namespace doris {
+
+// Intentionally empty: this destructor frees nothing itself. Child iterators
+// are deleted either by clear() or by the Level1Iterator that build_heap()
+// hands them to (its destructor deletes every child it was given).
+// NOTE(review): clear() and ~Level1Iterator() both delete the same raw
+// pointers once build_heap() has run -- confirm no caller triggers both.
+CollectIterator::~CollectIterator() {}
+
+// Remembers `reader` (not owned) and decides whether rows coming from
+// different rowsets have to be merged.
+//
+// `_merge` keeps its default unless this is a READER_QUERY read and either the
+// reader has `_aggregation` set or the tablet uses DUP_KEYS; only in that case
+// is merging switched off.
+void CollectIterator::init(Reader* reader) {
+    _reader = reader;
+    // when aggregate is enabled or key_type is DUP_KEYS, we don't merge
+    // multiple data to aggregate for performance in user fetch
+    if (_reader->_reader_type == READER_QUERY &&
+        (_reader->_aggregation || _reader->_tablet->keys_type() == KeysType::DUP_KEYS)) {
+        _merge = false;
+    }
+}
+
+// Wraps `rs_reader` in a Level0Iterator and registers it as a child.
+//
+// Returns
+//      the status from Level0Iterator::init() when it is not OK (propagated
+//      through RETURN_NOT_OK); the iterator is destroyed and nothing is
+//      registered.
+//      OLAP_SUCCESS otherwise. An iterator that has no current row is dropped
+//      silently instead of being registered.
+OLAPStatus CollectIterator::add_child(RowsetReaderSharedPtr rs_reader) {
+    // The element type must be spelled out: std::unique_ptr cannot deduce it
+    // from a raw pointer. Holding the iterator in a unique_ptr frees it on
+    // both early-return paths below.
+    std::unique_ptr<LevelIterator> child(new Level0Iterator(rs_reader, _reader));
+    RETURN_NOT_OK(child->init());
+    if (child->current_row() == nullptr) {
+        return OLAP_SUCCESS;
+    }
+
+    // Ownership moves to the raw-pointer vector from here on.
+    LevelIterator* child_ptr = child.release();
+    _children.push_back(child_ptr);
+    // Kept index-aligned with _children; build_heap() relies on that.
+    _rs_readers.push_back(rs_reader);
+    return OLAP_SUCCESS;
+}
+
+// Build a merge heap. If _merge is true, a rowset with the max rownum
+// status will be used as the base rowset, and the other rowsets will be merged first and
+// then merged with the base rowset.
+void CollectIterator::build_heap() { + DCHECK(_rs_readers.size() == _children.size()); + _reverse = _reader->_tablet->tablet_schema().keys_type() == KeysType::UNIQUE_KEYS; + if (_children.empty()) { + _inner_iter.reset(nullptr); + return; + } else if (_merge) { + DCHECK(!_rs_readers.empty()); + // build merge heap with two children, a base rowset as level0iterator and + // other cumulative rowsets as a level1iterator + if (_children.size() > 1) { + // find base rowset(max rownum), + RowsetReaderSharedPtr base_reader = _rs_readers[0]; + int base_reader_idx = 0; + for (size_t i = 1; i < _rs_readers.size(); ++i) { + if (_rs_readers[i]->rowset()->rowset_meta()->num_rows() > + base_reader->rowset()->rowset_meta()->num_rows()) { + base_reader = _rs_readers[i]; + base_reader_idx = i; + } + } + std::vector cumu_children; + for (size_t i = 0; i < _rs_readers.size(); ++i) { + if (i != base_reader_idx) { + cumu_children.push_back(_children[i]); + } + } + Level1Iterator* cumu_iter = + new Level1Iterator(cumu_children, cumu_children.size() > 1, _reverse); + cumu_iter->init(); + std::vector children; + children.push_back(_children[base_reader_idx]); + children.push_back(cumu_iter); + _inner_iter.reset(new Level1Iterator(children, _merge, _reverse)); + } else { + _inner_iter.reset(new Level1Iterator(_children, _merge, _reverse)); + } + } else { + _inner_iter.reset(new Level1Iterator(_children, _merge, _reverse)); + } + _inner_iter->init(); +} + +bool CollectIterator::LevelIteratorComparator::operator()(const LevelIterator* a, + const LevelIterator* b) { + // First compare row cursor. + const RowCursor* first = a->current_row(); + const RowCursor* second = b->current_row(); + int cmp_res = compare_row(*first, *second); + if (cmp_res != 0) { + return cmp_res > 0; + } + // if row cursors equal, compare data version. + // read data from higher version to lower version. + // for UNIQUE_KEYS just read the highest version and no need agg_update. 
+ // for AGG_KEYS if a version is deleted, the lower version no need to agg_update + if (_reverse) { + return a->version() < b->version(); + } + return a->version() > b->version(); +} + +void CollectIterator::clear() { + for (auto child : _children) { + if (child != nullptr) { + delete child; + child = nullptr; + } + } + _children.clear(); +} + +const RowCursor* CollectIterator::current_row(bool* delete_flag) const { + if (LIKELY(_inner_iter)) { + return _inner_iter->current_row(delete_flag); + } + return nullptr; +} + +OLAPStatus CollectIterator::next(const RowCursor** row, bool* delete_flag) { + if (LIKELY(_inner_iter)) { + return _inner_iter->next(row, delete_flag); + } else { + return OLAP_ERR_DATA_EOF; + } +} + +CollectIterator::Level0Iterator::Level0Iterator(RowsetReaderSharedPtr rs_reader, Reader* reader) + : _rs_reader(rs_reader), _is_delete(rs_reader->delete_flag()), _reader(reader) {} + +CollectIterator::Level0Iterator::~Level0Iterator() {} + +OLAPStatus CollectIterator::Level0Iterator::init() { + auto res = _row_cursor.init(_reader->_tablet->tablet_schema(), _reader->_seek_columns); + if (res != OLAP_SUCCESS) { + LOG(WARNING) << "failed to init row cursor, res=" << res; + return res; + } + RETURN_NOT_OK(_refresh_current_row()); + return OLAP_SUCCESS; +} + +const RowCursor* CollectIterator::Level0Iterator::current_row(bool* delete_flag) const { + *delete_flag = _is_delete || _current_row->is_delete(); + return _current_row; +} + +const RowCursor* CollectIterator::Level0Iterator::current_row() const { + return _current_row; +} + +int32_t CollectIterator::Level0Iterator::version() const { + return _rs_reader->version().second; +} + +OLAPStatus CollectIterator::Level0Iterator::_refresh_current_row() { + do { + if (_row_block != nullptr && _row_block->has_remaining()) { + size_t pos = _row_block->pos(); + _row_block->get_row(pos, &_row_cursor); + if (_row_block->block_status() == DEL_PARTIAL_SATISFIED && + 
_reader->_delete_handler.is_filter_data(_rs_reader->version().second, + _row_cursor)) { + _reader->_stats.rows_del_filtered++; + _row_block->pos_inc(); + continue; + } + _current_row = &_row_cursor; + return OLAP_SUCCESS; + } else { + auto res = _rs_reader->next_block(&_row_block); + if (res != OLAP_SUCCESS) { + _current_row = nullptr; + return res; + } + } + } while (_row_block != nullptr); + _current_row = nullptr; + return OLAP_ERR_DATA_EOF; +} + +OLAPStatus CollectIterator::Level0Iterator::next(const RowCursor** row, bool* delete_flag) { + _row_block->pos_inc(); + auto res = _refresh_current_row(); + *row = _current_row; + *delete_flag = _is_delete; + if (_current_row != nullptr) { + *delete_flag = _is_delete || _current_row->is_delete(); + } + return res; +} + +CollectIterator::Level1Iterator::Level1Iterator( + const std::vector& children, bool merge, bool reverse) + : _children(children), _merge(merge), _reverse(reverse) {} + +CollectIterator::LevelIterator::~LevelIterator() {} + +CollectIterator::Level1Iterator::~Level1Iterator() { + for (auto child : _children) { + if (child != nullptr) { + delete child; + child = nullptr; + } + } +} + +// Read next row into *row. +// Returns +// OLAP_SUCCESS when read successfully. +// OLAP_ERR_DATA_EOF and set *row to nullptr when EOF is reached. +// Others when error happens +OLAPStatus CollectIterator::Level1Iterator::next(const RowCursor** row, bool* delete_flag) { + if (UNLIKELY(_children.size() == 0)) { + return OLAP_ERR_DATA_EOF; + } + if (_merge) { + return _merge_next(row, delete_flag); + } else { + return _normal_next(row, delete_flag); + } +} + +// Get top row of the heap, nullptr if reach end. +const RowCursor* CollectIterator::Level1Iterator::current_row(bool* delete_flag) const { + if (_cur_child != nullptr) { + return _cur_child->current_row(delete_flag); + } + return nullptr; +} + +// Get top row of the heap, nullptr if reach end. 
+const RowCursor* CollectIterator::Level1Iterator::current_row() const { + if (_cur_child != nullptr) { + return _cur_child->current_row(); + } + return nullptr; +} + +int32_t CollectIterator::Level1Iterator::version() const { + if (_cur_child != nullptr) { + return _cur_child->version(); + } + return -1; +} + +OLAPStatus CollectIterator::Level1Iterator::init() { + if (_children.size() == 0) { + return OLAP_SUCCESS; + } + // Only when there are multiple children that need to be merged + if (_merge && _children.size() > 1) { + _heap.reset(new MergeHeap(LevelIteratorComparator(_reverse))); + for (auto child : _children) { + if (child == nullptr || child->current_row() == nullptr) { + continue; + } + _heap->push(child); + _cur_child = _heap->top(); + } + } else { + _merge = false; + _heap.reset(nullptr); + _cur_child = _children[_child_idx]; + } + return OLAP_SUCCESS; +} + +inline OLAPStatus CollectIterator::Level1Iterator::_merge_next(const RowCursor** row, + bool* delete_flag) { + _heap->pop(); + auto res = _cur_child->next(row, delete_flag); + if (res == OLAP_SUCCESS) { + _heap->push(_cur_child); + _cur_child = _heap->top(); + } else if (res == OLAP_ERR_DATA_EOF) { + if (!_heap->empty()) { + _cur_child = _heap->top(); + } else { + _cur_child = nullptr; + return OLAP_ERR_DATA_EOF; + } + } else { + LOG(WARNING) << "failed to get next from child, res=" << res; + return res; + } + *row = _cur_child->current_row(delete_flag); + return OLAP_SUCCESS; +} + +inline OLAPStatus CollectIterator::Level1Iterator::_normal_next(const RowCursor** row, + bool* delete_flag) { + auto res = _cur_child->next(row, delete_flag); + if (LIKELY(res == OLAP_SUCCESS)) { + return OLAP_SUCCESS; + } else if (res == OLAP_ERR_DATA_EOF) { + // this child has been read, to read next + _child_idx++; + if (_child_idx < _children.size()) { + _cur_child = _children[_child_idx]; + *row = _cur_child->current_row(delete_flag); + return OLAP_SUCCESS; + } else { + _cur_child = nullptr; + return 
OLAP_ERR_DATA_EOF; + } + } else { + LOG(WARNING) << "failed to get next from child, res=" << res; + return res; + } +} + +} // namespace doris diff --git a/be/src/olap/collect_iterator.h b/be/src/olap/collect_iterator.h new file mode 100644 index 00000000000000..97a4834705686d --- /dev/null +++ b/be/src/olap/collect_iterator.h @@ -0,0 +1,171 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "olap/olap_define.h" +#include "olap/row_cursor.h" +#include "olap/rowset/rowset_reader.h" + +namespace doris { + +class Reader; +class RowCursor; + +class CollectIterator { +public: + ~CollectIterator(); + + // Hold reader point to get reader params + void init(Reader* reader); + + OLAPStatus add_child(RowsetReaderSharedPtr rs_reader); + + void build_heap(); + + // Get top row of the heap, nullptr if reach end. + const RowCursor* current_row(bool* delete_flag) const; + + // Read next row into *row. + // Returns + // OLAP_SUCCESS when read successfully. + // OLAP_ERR_DATA_EOF and set *row to nullptr when EOF is reached. + // Others when error happens + OLAPStatus next(const RowCursor** row, bool* delete_flag); + + // Clear the MergeSet element and reset state. 
+ void clear(); + +private: + // This interface is the actual implementation of the new version of iterator. + // It currently contains two implementations, one is Level0Iterator, + // which only reads data from the rowset reader, and the other is Level1Iterator, + // which can read merged data from multiple LevelIterators through MergeHeap. + // By using Level1Iterator, some rowset readers can be merged in advance and + // then merged with other rowset readers. + class LevelIterator { + public: + virtual OLAPStatus init() = 0; + + virtual const RowCursor* current_row(bool* delete_flag) const = 0; + + virtual const RowCursor* current_row() const = 0; + + virtual int32_t version() const = 0; + + virtual OLAPStatus next(const RowCursor** row, bool* delete_flag) = 0; + virtual ~LevelIterator() = 0; + }; + // Compare row cursors between multiple merge elements, + // if row cursors equal, compare data version. + class LevelIteratorComparator { + public: + LevelIteratorComparator(const bool reverse = false) : _reverse(reverse) {} + bool operator()(const LevelIterator* a, const LevelIterator* b); + + private: + bool _reverse; + OlapReaderStatistics* _stats; + }; + + typedef std::priority_queue, + LevelIteratorComparator> + MergeHeap; + // Iterate from rowset reader. 
This Iterator usually like a leaf node + class Level0Iterator : public LevelIterator { + public: + Level0Iterator(RowsetReaderSharedPtr rs_reader, Reader* reader); + + OLAPStatus init(); + + const RowCursor* current_row(bool* delete_flag) const; + + const RowCursor* current_row() const; + + int32_t version() const; + + OLAPStatus next(const RowCursor** row, bool* delete_flag); + + ~Level0Iterator(); + + private: + // refresh_current_row + OLAPStatus _refresh_current_row(); + + RowsetReaderSharedPtr _rs_reader; + const RowCursor* _current_row = nullptr; + bool _is_delete = false; + Reader* _reader = nullptr; + // point to rows inside `_row_block` + RowCursor _row_cursor; + RowBlock* _row_block = nullptr; + }; + // Iterate from LevelIterators (maybe Level0Iterators or Level1Iterator or mixed) + class Level1Iterator : public LevelIterator { + public: + Level1Iterator(const std::vector& children, bool merge, bool reverse); + + OLAPStatus init(); + + const RowCursor* current_row(bool* delete_flag) const; + + const RowCursor* current_row() const; + + int32_t version() const; + + OLAPStatus next(const RowCursor** row, bool* delete_flag); + + ~Level1Iterator(); + + private: + inline OLAPStatus _merge_next(const RowCursor** row, bool* delete_flag); + inline OLAPStatus _normal_next(const RowCursor** row, bool* delete_flag); + + // each Level0Iterator corresponds to a rowset reader + const std::vector _children; + // point to the Level0Iterator containing the next output row. + // null when CollectIterator hasn't been initialized or reaches EOF. + LevelIterator* _cur_child = nullptr; + + // when `_merge == true`, rowset reader returns ordered rows and CollectIterator uses a priority queue to merge + // sort them. The output of CollectIterator is also ordered. + // When `_merge == false`, rowset reader returns *partial* ordered rows. CollectIterator simply returns all rows + // from the first rowset, the second rowset, .., the last rowset. 
The output of CollectorIterator is also + // *partially* ordered. + bool _merge = true; + bool _reverse = false; + // used when `_merge == true` + std::unique_ptr _heap; + // used when `_merge == false` + int _child_idx = 0; + }; + + std::unique_ptr _inner_iter; + + // each LevelIterator corresponds to a rowset reader + std::vector _children; + + bool _merge = true; + bool _reverse = false; + + // Hold reader point to access read params, such as fetch conditions. + Reader* _reader = nullptr; + std::vector _rs_readers; + +}; + +} // namespace doris diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp index 8a8e96ff4edbfc..43598e5689de4d 100644 --- a/be/src/olap/reader.cpp +++ b/be/src/olap/reader.cpp @@ -19,6 +19,7 @@ #include +#include "olap/collect_iterator.h" #include "olap/comparison_predicate.h" #include "olap/in_list_predicate.h" #include "olap/null_predicate.h" @@ -39,261 +40,54 @@ using std::vector; namespace doris { -class CollectIterator { -public: - ~CollectIterator(); - - // Hold reader point to get reader params - void init(Reader* reader); - - OLAPStatus add_child(RowsetReaderSharedPtr rs_reader); - - // Get top row of the heap, nullptr if reach end. - const RowCursor* current_row(bool* delete_flag) const { - if (_cur_child != nullptr) { - return _cur_child->current_row(delete_flag); - } - return nullptr; - } - - // Read next row into *row. - // Returns - // OLAP_SUCCESS when read successfully. - // OLAP_ERR_DATA_EOF and set *row to nullptr when EOF is reached. - // Others when error happens - OLAPStatus next(const RowCursor** row, bool* delete_flag) { - DCHECK(_cur_child != nullptr); - if (_merge) { - return _merge_next(row, delete_flag); - } else { - return _normal_next(row, delete_flag); - } +void ReaderParams::check_validation() const { + if (UNLIKELY(version.first == -1)) { + LOG(FATAL) << "version is not set. tablet=" << tablet->full_name(); } +} - // Clear the MergeSet element and reset state. 
- void clear(); - -private: - class ChildCtx { - public: - ChildCtx(RowsetReaderSharedPtr rs_reader, Reader* reader) - : _rs_reader(rs_reader), _is_delete(rs_reader->delete_flag()), _reader(reader) {} - - OLAPStatus init() { - auto res = _row_cursor.init(_reader->_tablet->tablet_schema(), _reader->_seek_columns); - if (res != OLAP_SUCCESS) { - LOG(WARNING) << "failed to init row cursor, res=" << res; - return res; - } - RETURN_NOT_OK(_refresh_current_row()); - return OLAP_SUCCESS; - } - - const RowCursor* current_row(bool* delete_flag) const { - *delete_flag = _is_delete || _current_row->is_delete(); - return _current_row; - } - - const RowCursor* current_row() const { return _current_row; } - - int32_t version() const { return _rs_reader->version().second; } - - OLAPStatus next(const RowCursor** row, bool* delete_flag) { - _row_block->pos_inc(); - auto res = _refresh_current_row(); - *row = _current_row; - *delete_flag = _is_delete; - if (_current_row != nullptr) { - *delete_flag = _is_delete || _current_row->is_delete(); - }; - return res; - } - - private: - // refresh_current_row - OLAPStatus _refresh_current_row() { - do { - if (_row_block != nullptr && _row_block->has_remaining()) { - size_t pos = _row_block->pos(); - _row_block->get_row(pos, &_row_cursor); - if (_row_block->block_status() == DEL_PARTIAL_SATISFIED && - _reader->_delete_handler.is_filter_data(_rs_reader->version().second, - _row_cursor)) { - _reader->_stats.rows_del_filtered++; - _row_block->pos_inc(); - continue; - } - _current_row = &_row_cursor; - return OLAP_SUCCESS; - } else { - auto res = _rs_reader->next_block(&_row_block); - if (res != OLAP_SUCCESS) { - _current_row = nullptr; - return res; - } - } - } while (_row_block != nullptr); - _current_row = nullptr; - return OLAP_ERR_DATA_EOF; - } +std::string ReaderParams::to_string() { + std::stringstream ss; + ss << "tablet=" << tablet->full_name() << " reader_type=" << reader_type + << " aggregation=" << aggregation << " version=" << 
version << " range=" << range + << " end_range=" << end_range; - RowsetReaderSharedPtr _rs_reader; - const RowCursor* _current_row = nullptr; - bool _is_delete = false; - Reader* _reader = nullptr; - - RowCursor _row_cursor; // point to rows inside `_row_block` - RowBlock* _row_block = nullptr; - }; - - // Compare row cursors between multiple merge elements, - // if row cursors equal, compare data version. - class ChildCtxComparator { - public: - ChildCtxComparator(const bool& revparam = false) { _reverse = revparam; } - bool operator()(const ChildCtx* a, const ChildCtx* b); - - private: - bool _reverse; - }; - - inline OLAPStatus _merge_next(const RowCursor** row, bool* delete_flag); - inline OLAPStatus _normal_next(const RowCursor** row, bool* delete_flag); - - // each ChildCtx corresponds to a rowset reader - std::vector _children; - // point to the ChildCtx containing the next output row. - // null when CollectIterator hasn't been initialized or reaches EOF. - ChildCtx* _cur_child = nullptr; - - // when `_merge == true`, rowset reader returns ordered rows and CollectIterator uses a priority queue to merge - // sort them. The output of CollectIterator is also ordered. - // When `_merge == false`, rowset reader returns *partial* ordered rows. CollectIterator simply returns all rows - // from the first rowset, the second rowset, .., the last rowset. The output of CollectorIterator is also - // *partially* ordered. - bool _merge = true; - // used when `_merge == true` - typedef std::priority_queue, ChildCtxComparator> MergeHeap; - std::unique_ptr _heap; - // used when `_merge == false` - int _child_idx = 0; - - // Hold reader point to access read params, such as fetch conditions. 
- Reader* _reader = nullptr; -}; - -CollectIterator::~CollectIterator() { - for (auto child : _children) { - delete child; + for (auto& key : start_key) { + ss << " keys=" << key; } -} -void CollectIterator::init(Reader* reader) { - _reader = reader; - // when aggregate is enabled or key_type is DUP_KEYS, we don't merge - // multiple data to aggregate for performance in user fetch - if (_reader->_reader_type == READER_QUERY && - (_reader->_aggregation || _reader->_tablet->keys_type() == KeysType::DUP_KEYS)) { - _merge = false; - _heap.reset(nullptr); - } else if (_reader->_tablet->keys_type() == KeysType::UNIQUE_KEYS) { - _heap.reset(new MergeHeap(ChildCtxComparator(true))); - } else { - _heap.reset(new MergeHeap()); + for (auto& key : end_key) { + ss << " end_keys=" << key; } -} -OLAPStatus CollectIterator::add_child(RowsetReaderSharedPtr rs_reader) { - std::unique_ptr child(new ChildCtx(rs_reader, _reader)); - RETURN_NOT_OK(child->init()); - if (child->current_row() == nullptr) { - return OLAP_SUCCESS; + for (auto& condition : conditions) { + ss << " conditions=" << apache::thrift::ThriftDebugString(condition); } - ChildCtx* child_ptr = child.release(); - _children.push_back(child_ptr); - if (_merge) { - _heap->push(child_ptr); - _cur_child = _heap->top(); - } else { - if (_cur_child == nullptr) { - _cur_child = _children[_child_idx]; - } - } - return OLAP_SUCCESS; + return ss.str(); } - -inline OLAPStatus CollectIterator::_merge_next(const RowCursor** row, bool* delete_flag) { - _heap->pop(); - auto res = _cur_child->next(row, delete_flag); - if (res == OLAP_SUCCESS) { - _heap->push(_cur_child); - _cur_child = _heap->top(); - } else if (res == OLAP_ERR_DATA_EOF) { - if (!_heap->empty()) { - _cur_child = _heap->top(); - } else { - _cur_child = nullptr; - return OLAP_ERR_DATA_EOF; - } - } else { - LOG(WARNING) << "failed to get next from child, res=" << res; - return res; +Reader::KeysParam::~KeysParam() { + for (auto start_key : start_keys) { + 
SAFE_DELETE(start_key); } - *row = _cur_child->current_row(delete_flag); - return OLAP_SUCCESS; -} -inline OLAPStatus CollectIterator::_normal_next(const RowCursor** row, bool* delete_flag) { - auto res = _cur_child->next(row, delete_flag); - if (LIKELY(res == OLAP_SUCCESS)) { - return OLAP_SUCCESS; - } else if (res == OLAP_ERR_DATA_EOF) { - // this child has been read, to read next - _child_idx++; - if (_child_idx < _children.size()) { - _cur_child = _children[_child_idx]; - *row = _cur_child->current_row(delete_flag); - return OLAP_SUCCESS; - } else { - _cur_child = nullptr; - return OLAP_ERR_DATA_EOF; - } - } else { - LOG(WARNING) << "failed to get next from child, res=" << res; - return res; + for (auto end_key : end_keys) { + SAFE_DELETE(end_key); } } -bool CollectIterator::ChildCtxComparator::operator()(const ChildCtx* a, const ChildCtx* b) { - // First compare row cursor. - const RowCursor* first = a->current_row(); - const RowCursor* second = b->current_row(); - int cmp_res = compare_row(*first, *second); - if (cmp_res != 0) { - return cmp_res > 0; - } - // if row cursors equal, compare data version. - // read data from higher version to lower version. - // for UNIQUE_KEYS just read the highest version and no need agg_update. 
- // for AGG_KEYS if a version is deleted, the lower version no need to agg_update - if (_reverse) { - return a->version() < b->version(); - } - return a->version() > b->version(); -} +std::string Reader::KeysParam::to_string() const { + std::stringstream ss; + ss << "range=" << range << " end_range=" << end_range; -void CollectIterator::clear() { - while (_heap != nullptr && !_heap->empty()) { - _heap->pop(); + for (auto start_key : start_keys) { + ss << " keys=" << start_key->to_string(); } - for (auto child : _children) { - delete child; + for (auto end_key : end_keys) { + ss << " end_keys=" << end_key->to_string(); } - // _children.swap(std::vector()); - _children.clear(); - _cur_child = nullptr; - _child_idx = 0; + + return ss.str(); } Reader::Reader() { @@ -326,18 +120,23 @@ OLAPStatus Reader::init(const ReaderParams& read_params) { return res; } - switch (_tablet->keys_type()) { - case KeysType::DUP_KEYS: + if (_rs_readers.size() == 1 && + !_rs_readers[0]->rowset()->rowset_meta()->is_segments_overlapping()) { _next_row_func = &Reader::_dup_key_next_row; - break; - case KeysType::UNIQUE_KEYS: - _next_row_func = &Reader::_unique_key_next_row; - break; - case KeysType::AGG_KEYS: - _next_row_func = &Reader::_agg_key_next_row; - break; - default: - break; + } else { + switch (_tablet->keys_type()) { + case KeysType::DUP_KEYS: + _next_row_func = &Reader::_dup_key_next_row; + break; + case KeysType::UNIQUE_KEYS: + _next_row_func = &Reader::_unique_key_next_row; + break; + case KeysType::AGG_KEYS: + _next_row_func = &Reader::_agg_key_next_row; + break; + default: + break; + } } DCHECK(_next_row_func != nullptr) << "No next row function for type:" << _tablet->keys_type(); @@ -555,7 +354,7 @@ OLAPStatus Reader::_capture_rs_readers(const ReaderParams& read_params) { } _rs_readers.push_back(rs_reader); } - + _collect_iter->build_heap(); _next_key = _collect_iter->current_row(&_next_delete_flag); return OLAP_SUCCESS; } diff --git a/be/src/olap/reader.h 
b/be/src/olap/reader.h index 88edae1f049557..b4547341c96ea0 100644 --- a/be/src/olap/reader.h +++ b/be/src/olap/reader.h @@ -74,32 +74,9 @@ struct ReaderParams { RuntimeProfile* profile = nullptr; RuntimeState* runtime_state = nullptr; - void check_validation() const { - if (UNLIKELY(version.first == -1)) { - LOG(FATAL) << "version is not set. tablet=" << tablet->full_name(); - } - } - - std::string to_string() { - std::stringstream ss; - ss << "tablet=" << tablet->full_name() << " reader_type=" << reader_type - << " aggregation=" << aggregation << " version=" << version << " range=" << range - << " end_range=" << end_range; - - for (auto& key : start_key) { - ss << " keys=" << key; - } - - for (auto& key : end_key) { - ss << " end_keys=" << key; - } + void check_validation() const; - for (auto& condition : conditions) { - ss << " conditions=" << apache::thrift::ThriftDebugString(condition); - } - - return ss.str(); - } + std::string to_string(); }; class Reader { @@ -130,30 +107,9 @@ class Reader { private: struct KeysParam { - ~KeysParam() { - for (auto start_key : start_keys) { - SAFE_DELETE(start_key); - } - - for (auto end_key : end_keys) { - SAFE_DELETE(end_key); - } - } - - std::string to_string() const { - std::stringstream ss; - ss << "range=" << range << " end_range=" << end_range; - - for (auto start_key : start_keys) { - ss << " keys=" << start_key->to_string(); - } - - for (auto end_key : end_keys) { - ss << " end_keys=" << end_key->to_string(); - } - - return ss.str(); - } + ~KeysParam(); + + std::string to_string() const; std::string range; std::string end_range; diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index ea6b96910e0db8..19540f21093eae 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -725,7 +725,7 @@ TabletSharedPtr TabletManager::find_best_tablet_to_compaction( } if (now_ms - last_failure_ms <= config::min_compaction_failure_interval_sec * 1000) { - VLOG(1) << "Too often to 
check compaction, skip it." + VLOG(1) << "Too often to check compaction, skip it. " << "compaction_type=" << compaction_type_str << ", last_failure_time_ms=" << last_failure_ms << ", tablet_id=" << tablet_ptr->tablet_id();