diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp b/be/src/olap/rowset/segment_v2/column_reader.cpp index d7e81cec15dfae..7aa92ea95d14e9 100644 --- a/be/src/olap/rowset/segment_v2/column_reader.cpp +++ b/be/src/olap/rowset/segment_v2/column_reader.cpp @@ -104,7 +104,6 @@ Status ColumnReader::init() { strings::Substitute("unsupported typeinfo, type=$0", _meta.type())); } RETURN_IF_ERROR(EncodingInfo::get(_type_info.get(), _meta.encoding(), &_encoding_info)); - RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), _compress_codec)); for (int i = 0; i < _meta.indexes_size(); i++) { auto& index_meta = _meta.indexes(i); @@ -144,12 +143,13 @@ Status ColumnReader::new_bitmap_index_iterator(BitmapIndexIterator** iterator) { } Status ColumnReader::read_page(const ColumnIteratorOptions& iter_opts, const PagePointer& pp, - PageHandle* handle, Slice* page_body, PageFooterPB* footer) { + PageHandle* handle, Slice* page_body, PageFooterPB* footer, + BlockCompressionCodec* codec) { iter_opts.sanity_check(); PageReadOptions opts; opts.rblock = iter_opts.rblock; opts.page_pointer = pp; - opts.codec = _compress_codec.get(); + opts.codec = codec; opts.stats = iter_opts.stats; opts.verify_checksum = _opts.verify_checksum; opts.use_page_cache = iter_opts.use_page_cache; @@ -465,6 +465,12 @@ Status ArrayFileColumnIterator::next_batch(size_t* n, ColumnBlockView* dst, bool FileColumnIterator::FileColumnIterator(ColumnReader* reader) : _reader(reader) {} +Status FileColumnIterator::init(const ColumnIteratorOptions& opts) { + _opts = opts; + RETURN_IF_ERROR(get_block_compression_codec(_reader->get_compression(), _compress_codec)); + return Status::OK(); +} + FileColumnIterator::~FileColumnIterator() = default; Status FileColumnIterator::seek_to_first() { @@ -653,7 +659,8 @@ Status FileColumnIterator::_read_data_page(const OrdinalPageIndexIterator& iter) Slice page_body; PageFooterPB footer; _opts.type = DATA_PAGE; - RETURN_IF_ERROR(_reader->read_page(_opts, iter.page(), &handle, &page_body, &footer)); + RETURN_IF_ERROR(_reader->read_page(_opts, iter.page(), &handle, &page_body, &footer, + _compress_codec.get())); // parse data page RETURN_IF_ERROR(ParsedPage::create(std::move(handle), page_body, footer.data_page_footer(), _reader->encoding_info(), iter.page(), iter.page_index(), @@ -673,7 +680,8 @@ Status FileColumnIterator::_read_data_page(const OrdinalPageIndexIterator& iter) PageFooterPB dict_footer; _opts.type = INDEX_PAGE; RETURN_IF_ERROR(_reader->read_page(_opts, _reader->get_dict_page_pointer(), - &_dict_page_handle, &dict_data, &dict_footer)); + &_dict_page_handle, &dict_data, &dict_footer, + _compress_codec.get())); // ignore dict_footer.dict_page_footer().encoding() due to only // PLAIN_ENCODING is supported for dict page right now _dict_decoder = std::make_unique(dict_data); diff --git a/be/src/olap/rowset/segment_v2/column_reader.h b/be/src/olap/rowset/segment_v2/column_reader.h index e5e07a0871331d..de51b01824dbd5 100644 --- a/be/src/olap/rowset/segment_v2/column_reader.h +++ b/be/src/olap/rowset/segment_v2/column_reader.h @@ -103,7 +103,8 @@ class ColumnReader { // read a page from file into a page handle Status read_page(const ColumnIteratorOptions& iter_opts, const PagePointer& pp, - PageHandle* handle, Slice* page_body, PageFooterPB* footer); + PageHandle* handle, Slice* page_body, PageFooterPB* footer, + BlockCompressionCodec* codec); bool is_nullable() const { return _meta.is_nullable(); } @@ -131,6 +132,8 @@ class ColumnReader { bool is_empty() const { return _num_rows == 0; } + CompressionTypePB get_compression() const { return _meta.compression(); } + private: ColumnReader(const ColumnReaderOptions& opts, const ColumnMetaPB& meta, uint64_t num_rows, FilePathDesc path_desc); @@ -175,7 +178,6 @@ class ColumnReader { TypeInfoPtr(nullptr, nullptr); // initialized in init(), may changed by subclasses. const EncodingInfo* _encoding_info = nullptr; // initialized in init(), used for create PageDecoder - std::unique_ptr _compress_codec; // initialized in init() // meta for various column indexes (null if the index is absent) const ZoneMapIndexPB* _zone_map_index_meta = nullptr; @@ -253,6 +255,8 @@ class FileColumnIterator final : public ColumnIterator { explicit FileColumnIterator(ColumnReader* reader); ~FileColumnIterator() override; + Status init(const ColumnIteratorOptions& opts) override; + Status seek_to_first() override; Status seek_to_ordinal(ordinal_t ord) override; @@ -285,6 +289,9 @@ class FileColumnIterator final : public ColumnIterator { private: ColumnReader* _reader; + // iterator owned compress codec, should NOT be shared by threads, initialized in init() + std::unique_ptr _compress_codec; + // 1. The _page represents current page. // 2. We define an operation is one seek and following read, // If new seek is issued, the _page will be reset. diff --git a/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp b/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp index 955c3b96e3720b..69646c4d494739 100644 --- a/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp +++ b/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp @@ -37,7 +37,6 @@ Status IndexedColumnReader::load(bool use_page_cache, bool kept_in_memory) { strings::Substitute("unsupported typeinfo, type=$0", _meta.data_type())); } RETURN_IF_ERROR(EncodingInfo::get(_type_info, _meta.encoding(), &_encoding_info)); - RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), _compress_codec)); _value_key_coder = get_key_coder(_type_info->type()); std::unique_ptr rblock; @@ -72,18 +71,21 @@ Status IndexedColumnReader::load_index_page(fs::ReadableBlock* rblock, const Pag PageHandle* handle, IndexPageReader* reader) { Slice body; PageFooterPB footer; - RETURN_IF_ERROR(read_page(rblock, PagePointer(pp), handle, &body, &footer, INDEX_PAGE)); + std::unique_ptr local_compress_codec; + RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), local_compress_codec)); + RETURN_IF_ERROR(read_page(rblock, PagePointer(pp), handle, &body, &footer, INDEX_PAGE, + local_compress_codec.get())); RETURN_IF_ERROR(reader->parse(body, footer.index_page_footer())); return Status::OK(); } Status IndexedColumnReader::read_page(fs::ReadableBlock* rblock, const PagePointer& pp, PageHandle* handle, Slice* body, PageFooterPB* footer, - PageTypePB type) const { + PageTypePB type, BlockCompressionCodec* codec) const { PageReadOptions opts; opts.rblock = rblock; opts.page_pointer = pp; - opts.codec = _compress_codec.get(); + opts.codec = codec; OlapReaderStatistics tmp_stats; opts.stats = &tmp_stats; opts.use_page_cache = _use_page_cache; @@ -96,10 +98,15 @@ Status IndexedColumnReader::read_page(fs::ReadableBlock* rblock, const PagePoint /////////////////////////////////////////////////////////////////////////////// Status IndexedColumnIterator::_read_data_page(const PagePointer& pp) { + // there is not init() for IndexedColumnIterator, so do it here + if (!_compress_codec.get()) + RETURN_IF_ERROR(get_block_compression_codec(_reader->get_compression(), _compress_codec)); + PageHandle handle; Slice body; PageFooterPB footer; - RETURN_IF_ERROR(_reader->read_page(_rblock.get(), pp, &handle, &body, &footer, DATA_PAGE)); + RETURN_IF_ERROR(_reader->read_page(_rblock.get(), pp, &handle, &body, &footer, DATA_PAGE, + _compress_codec.get())); // parse data page // note that page_index is not used in IndexedColumnIterator, so we pass 0 return ParsedPage::create(std::move(handle), body, footer.data_page_footer(), diff --git a/be/src/olap/rowset/segment_v2/indexed_column_reader.h b/be/src/olap/rowset/segment_v2/indexed_column_reader.h index 6663ac10776911..ae86b064a6769a 100644 --- a/be/src/olap/rowset/segment_v2/indexed_column_reader.h +++ b/be/src/olap/rowset/segment_v2/indexed_column_reader.h @@ -52,7 +52,8 @@ class IndexedColumnReader { // read a page specified by `pp' from `file' into `handle' Status read_page(fs::ReadableBlock* rblock, const PagePointer& pp, PageHandle* handle, - Slice* body, PageFooterPB* footer, PageTypePB type) const; + Slice* body, PageFooterPB* footer, PageTypePB type, + BlockCompressionCodec* codec) const; int64_t num_values() const { return _num_values; } const EncodingInfo* encoding_info() const { return _encoding_info; } @@ -60,6 +61,8 @@ class IndexedColumnReader { bool support_ordinal_seek() const { return _meta.has_ordinal_index_meta(); } bool support_value_seek() const { return _meta.has_value_index_meta(); } + CompressionTypePB get_compression() const { return _meta.compression(); } + private: Status load_index_page(fs::ReadableBlock* rblock, const PagePointerPB& pp, PageHandle* handle, IndexPageReader* reader); @@ -84,7 +87,6 @@ class IndexedColumnReader { const TypeInfo* _type_info = nullptr; const EncodingInfo* _encoding_info = nullptr; - std::unique_ptr _compress_codec; const KeyCoder* _value_key_coder = nullptr; }; @@ -145,6 +147,8 @@ class IndexedColumnIterator { ordinal_t _current_ordinal = 0; // open file handle std::unique_ptr _rblock; + // iterator owned compress codec, should NOT be shared by threads, initialized before used + std::unique_ptr _compress_codec; }; } // namespace segment_v2 diff --git a/be/src/util/block_compression.h b/be/src/util/block_compression.h index 7ad3f9ecb79ed4..ddda23a3ba9609 100644 --- a/be/src/util/block_compression.h +++ b/be/src/util/block_compression.h @@ -30,6 +30,9 @@ namespace doris { // This class only used to compress a block data, which means all data // should given when call compress or decompress. This class don't handle // stream compression. +// +// NOTICE!! BlockCompressionCodec is NOT thread safe, it should NOT be shared by threads +// class BlockCompressionCodec { public: virtual ~BlockCompressionCodec() {} @@ -59,7 +62,9 @@ class BlockCompressionCodec { // Get a BlockCompressionCodec through type. // Return Status::OK if a valid codec is found. If codec is null, it means it is // NO_COMPRESSION. If codec is not null, user can use it to compress/decompress -// data. And client doesn't have to release the codec. +// data. +// +// NOTICE!! BlockCompressionCodec is NOT thread safe, it should NOT be shared by threads // // Return not OK, if error happens. Status get_block_compression_codec(segment_v2::CompressionTypePB type,