From e0c790094c87325bd64649d8705e127730e9a728 Mon Sep 17 00:00:00 2001 From: Kang Date: Sun, 15 May 2022 21:18:32 +0800 Subject: [PATCH] [enhancement][betarowset]optimize lz4 compress and decompress speed by reusing context (#9566) --- .../olap/rowset/segment_v2/column_reader.cpp | 4 +- be/src/olap/rowset/segment_v2/column_reader.h | 2 +- .../olap/rowset/segment_v2/column_writer.cpp | 8 +- be/src/olap/rowset/segment_v2/column_writer.h | 2 +- .../segment_v2/indexed_column_reader.cpp | 4 +- .../rowset/segment_v2/indexed_column_reader.h | 2 +- .../segment_v2/indexed_column_writer.cpp | 6 +- .../rowset/segment_v2/indexed_column_writer.h | 2 +- be/src/util/block_compression.cpp | 93 +++++++++++-------- be/src/util/block_compression.h | 4 +- be/test/util/block_compression_test.cpp | 8 +- 11 files changed, 77 insertions(+), 58 deletions(-) diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp b/be/src/olap/rowset/segment_v2/column_reader.cpp index cc88552f9c96f6..1b57d35269b74d 100644 --- a/be/src/olap/rowset/segment_v2/column_reader.cpp +++ b/be/src/olap/rowset/segment_v2/column_reader.cpp @@ -104,7 +104,7 @@ Status ColumnReader::init() { strings::Substitute("unsupported typeinfo, type=$0", _meta.type())); } RETURN_IF_ERROR(EncodingInfo::get(_type_info.get(), _meta.encoding(), &_encoding_info)); - RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), &_compress_codec)); + RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), _compress_codec)); for (int i = 0; i < _meta.indexes_size(); i++) { auto& index_meta = _meta.indexes(i); @@ -149,7 +149,7 @@ Status ColumnReader::read_page(const ColumnIteratorOptions& iter_opts, const Pag PageReadOptions opts; opts.rblock = iter_opts.rblock; opts.page_pointer = pp; - opts.codec = _compress_codec; + opts.codec = _compress_codec.get(); opts.stats = iter_opts.stats; opts.verify_checksum = _opts.verify_checksum; opts.use_page_cache = iter_opts.use_page_cache; diff --git a/be/src/olap/rowset/segment_v2/column_reader.h b/be/src/olap/rowset/segment_v2/column_reader.h index 305ba5f8dc50d4..0e3f11c859d6f7 100644 --- a/be/src/olap/rowset/segment_v2/column_reader.h +++ b/be/src/olap/rowset/segment_v2/column_reader.h @@ -175,7 +175,7 @@ class ColumnReader { TypeInfoPtr(nullptr, nullptr); // initialized in init(), may changed by subclasses. const EncodingInfo* _encoding_info = nullptr; // initialized in init(), used for create PageDecoder - const BlockCompressionCodec* _compress_codec = nullptr; // initialized in init() + std::unique_ptr _compress_codec; // initialized in init() // meta for various column indexes (null if the index is absent) const ZoneMapIndexPB* _zone_map_index_meta = nullptr; diff --git a/be/src/olap/rowset/segment_v2/column_writer.cpp b/be/src/olap/rowset/segment_v2/column_writer.cpp index 5cb5140102af42..9a54b210c86fce 100644 --- a/be/src/olap/rowset/segment_v2/column_writer.cpp +++ b/be/src/olap/rowset/segment_v2/column_writer.cpp @@ -264,7 +264,7 @@ ScalarColumnWriter::~ScalarColumnWriter() { } Status ScalarColumnWriter::init() { - RETURN_IF_ERROR(get_block_compression_codec(_opts.meta->compression(), &_compress_codec)); + RETURN_IF_ERROR(get_block_compression_codec(_opts.meta->compression(), _compress_codec)); PageBuilder* page_builder = nullptr; @@ -420,7 +420,7 @@ Status ScalarColumnWriter::write_data() { footer.mutable_dict_page_footer()->set_encoding(PLAIN_ENCODING); PagePointer dict_pp; - RETURN_IF_ERROR(PageIO::compress_and_write_page(_compress_codec, + RETURN_IF_ERROR(PageIO::compress_and_write_page(_compress_codec.get(), _opts.compression_min_space_saving, _wblock, {dict_body.slice()}, footer, &dict_pp)); dict_pp.to_proto(_opts.meta->mutable_dict_page()); @@ -508,8 +508,8 @@ Status ScalarColumnWriter::finish_current_page() { } // trying to compress page body OwnedSlice compressed_body; - RETURN_IF_ERROR(PageIO::compress_page_body(_compress_codec, _opts.compression_min_space_saving, - body, &compressed_body)); + RETURN_IF_ERROR(PageIO::compress_page_body( + _compress_codec.get(), _opts.compression_min_space_saving, body, &compressed_body)); if (compressed_body.slice().empty()) { // page body is uncompressed page->data.emplace_back(std::move(encoded_values)); diff --git a/be/src/olap/rowset/segment_v2/column_writer.h b/be/src/olap/rowset/segment_v2/column_writer.h index 0cacbf8547d2f5..2f50ebf075744f 100644 --- a/be/src/olap/rowset/segment_v2/column_writer.h +++ b/be/src/olap/rowset/segment_v2/column_writer.h @@ -249,7 +249,7 @@ class ScalarColumnWriter final : public ColumnWriter { PageHead _pages; ordinal_t _first_rowid = 0; - const BlockCompressionCodec* _compress_codec = nullptr; + std::unique_ptr _compress_codec; std::unique_ptr _ordinal_index_builder; std::unique_ptr _zone_map_index_builder; diff --git a/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp b/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp index 4ebe0d3af34896..955c3b96e3720b 100644 --- a/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp +++ b/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp @@ -37,7 +37,7 @@ Status IndexedColumnReader::load(bool use_page_cache, bool kept_in_memory) { strings::Substitute("unsupported typeinfo, type=$0", _meta.data_type())); } RETURN_IF_ERROR(EncodingInfo::get(_type_info, _meta.encoding(), &_encoding_info)); - RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), &_compress_codec)); + RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), _compress_codec)); _value_key_coder = get_key_coder(_type_info->type()); std::unique_ptr rblock; @@ -83,7 +83,7 @@ Status IndexedColumnReader::read_page(fs::ReadableBlock* rblock, const PagePoint PageReadOptions opts; opts.rblock = rblock; opts.page_pointer = pp; - opts.codec = _compress_codec; + opts.codec = _compress_codec.get(); OlapReaderStatistics tmp_stats; opts.stats = &tmp_stats; opts.use_page_cache = _use_page_cache; diff --git a/be/src/olap/rowset/segment_v2/indexed_column_reader.h b/be/src/olap/rowset/segment_v2/indexed_column_reader.h index 439c790e6021d7..6663ac10776911 100644 --- a/be/src/olap/rowset/segment_v2/indexed_column_reader.h +++ b/be/src/olap/rowset/segment_v2/indexed_column_reader.h @@ -84,7 +84,7 @@ class IndexedColumnReader { const TypeInfo* _type_info = nullptr; const EncodingInfo* _encoding_info = nullptr; - const BlockCompressionCodec* _compress_codec = nullptr; + std::unique_ptr _compress_codec; const KeyCoder* _value_key_coder = nullptr; }; diff --git a/be/src/olap/rowset/segment_v2/indexed_column_writer.cpp b/be/src/olap/rowset/segment_v2/indexed_column_writer.cpp index d195b9f2c4376a..4c7259c90c6d44 100644 --- a/be/src/olap/rowset/segment_v2/indexed_column_writer.cpp +++ b/be/src/olap/rowset/segment_v2/indexed_column_writer.cpp @@ -72,7 +72,7 @@ Status IndexedColumnWriter::init() { } if (_options.compression != NO_COMPRESSION) { - RETURN_IF_ERROR(get_block_compression_codec(_options.compression, &_compress_codec)); + RETURN_IF_ERROR(get_block_compression_codec(_options.compression, _compress_codec)); } return Status::OK(); } @@ -110,7 +110,7 @@ Status IndexedColumnWriter::_finish_current_data_page() { footer.mutable_data_page_footer()->set_num_values(num_values_in_page); footer.mutable_data_page_footer()->set_nullmap_size(0); - RETURN_IF_ERROR(PageIO::compress_and_write_page(_compress_codec, + RETURN_IF_ERROR(PageIO::compress_and_write_page(_compress_codec.get(), _options.compression_min_space_saving, _wblock, {page_body.slice()}, footer, &_last_data_page)); _num_data_pages++; @@ -159,7 +159,7 @@ Status IndexedColumnWriter::_flush_index(IndexPageBuilder* index_builder, BTreeM PagePointer pp; RETURN_IF_ERROR(PageIO::compress_and_write_page( - _compress_codec, _options.compression_min_space_saving, _wblock, + _compress_codec.get(), _options.compression_min_space_saving, _wblock, {page_body.slice()}, page_footer, &pp)); meta->set_is_root_data_page(false); diff --git a/be/src/olap/rowset/segment_v2/indexed_column_writer.h b/be/src/olap/rowset/segment_v2/indexed_column_writer.h index fc222388c6af73..285ba890b23d2e 100644 --- a/be/src/olap/rowset/segment_v2/indexed_column_writer.h +++ b/be/src/olap/rowset/segment_v2/indexed_column_writer.h @@ -108,7 +108,7 @@ class IndexedColumnWriter { std::unique_ptr _value_index_builder; // encoder for value index's key const KeyCoder* _value_key_coder; - const BlockCompressionCodec* _compress_codec; + std::unique_ptr _compress_codec; DISALLOW_COPY_AND_ASSIGN(IndexedColumnWriter); }; diff --git a/be/src/util/block_compression.cpp b/be/src/util/block_compression.cpp index 1b0f8143e2dc52..a1ee74f047d123 100644 --- a/be/src/util/block_compression.cpp +++ b/be/src/util/block_compression.cpp @@ -88,46 +88,46 @@ class Lz4BlockCompression : public BlockCompressionCodec { // Used for LZ4 frame format, decompress speed is two times faster than LZ4. class Lz4fBlockCompression : public BlockCompressionCodec { public: - static const Lz4fBlockCompression* instance() { - static Lz4fBlockCompression s_instance; - return &s_instance; - } - - ~Lz4fBlockCompression() override {} + Status init() override { + auto ret1 = LZ4F_createCompressionContext(&ctx_c, LZ4F_VERSION); + if (LZ4F_isError(ret1)) { + return Status::InvalidArgument(strings::Substitute( + "Fail to LZ4F_createCompressionContext, msg=$0", LZ4F_getErrorName(ret1))); + } + ctx_c_inited = true; - Status compress(const Slice& input, Slice* output) const override { - auto compressed_len = LZ4F_compressFrame(output->data, output->size, input.data, input.size, - &_s_preferences); - if (LZ4F_isError(compressed_len)) { + auto ret2 = LZ4F_createDecompressionContext(&ctx_d, LZ4F_VERSION); + if (LZ4F_isError(ret2)) { return Status::InvalidArgument(strings::Substitute( - "Fail to do LZ4F compress frame, msg=$0", LZ4F_getErrorName(compressed_len))); + "Fail to LZ4F_createDecompressionContext, msg=$0", LZ4F_getErrorName(ret2))); } - output->size = compressed_len; + ctx_d_inited = true; + return Status::OK(); } + ~Lz4fBlockCompression() override { + if (ctx_c_inited) LZ4F_freeCompressionContext(ctx_c); + if (ctx_d_inited) LZ4F_freeDecompressionContext(ctx_d); + } + + Status compress(const Slice& input, Slice* output) const override { + std::vector inputs {input}; + return compress(inputs, output); + } + Status compress(const std::vector& inputs, Slice* output) const override { - LZ4F_compressionContext_t ctx = nullptr; - auto lres = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION); - if (lres != 0) { - return Status::InvalidArgument(strings::Substitute("Fail to do LZ4F compress, res=$0", - LZ4F_getErrorName(lres))); - } - auto st = _compress(ctx, inputs, output); - LZ4F_freeCompressionContext(ctx); - return st; + if (!ctx_c_inited) + return Status::InvalidArgument("LZ4F_createCompressionContext not sucess"); + + return _compress(ctx_c, inputs, output); } Status decompress(const Slice& input, Slice* output) const override { - LZ4F_decompressionContext_t ctx; - auto lres = LZ4F_createDecompressionContext(&ctx, LZ4F_VERSION); - if (LZ4F_isError(lres)) { - return Status::InvalidArgument(strings::Substitute("Fail to do LZ4F decompress, res=$0", - LZ4F_getErrorName(lres))); - } - auto st = _decompress(ctx, input, output); - LZ4F_freeDecompressionContext(ctx); - return st; + if (!ctx_d_inited) + return Status::InvalidArgument("LZ4F_createDecompressionContext not sucess"); + + return _decompress(ctx_d, input, output); } size_t max_compressed_len(size_t len) const override { @@ -167,6 +167,8 @@ class Lz4fBlockCompression : public BlockCompressionCodec { } Status _decompress(LZ4F_decompressionContext_t ctx, const Slice& input, Slice* output) const { + // reset decompression context to avoid ERROR_maxBlockSize_invalid + LZ4F_resetDecompressionContext(ctx); size_t input_size = input.size; auto lres = LZ4F_decompress(ctx, output->data, &output->size, input.data, &input_size, nullptr); @@ -187,6 +189,10 @@ class Lz4fBlockCompression : public BlockCompressionCodec { private: static LZ4F_preferences_t _s_preferences; + LZ4F_compressionContext_t ctx_c; + bool ctx_c_inited = false; + LZ4F_decompressionContext_t ctx_d; + bool ctx_d_inited = false; }; LZ4F_preferences_t Lz4fBlockCompression::_s_preferences = { @@ -370,27 +376,38 @@ class ZlibBlockCompression : public BlockCompressionCodec { }; Status get_block_compression_codec(segment_v2::CompressionTypePB type, - const BlockCompressionCodec** codec) { + std::unique_ptr& codec) { + BlockCompressionCodec* ptr = nullptr; switch (type) { case segment_v2::CompressionTypePB::NO_COMPRESSION: - *codec = nullptr; - break; + codec.reset(nullptr); + return Status::OK(); case segment_v2::CompressionTypePB::SNAPPY: - *codec = SnappyBlockCompression::instance(); + ptr = new SnappyBlockCompression(); break; case segment_v2::CompressionTypePB::LZ4: - *codec = Lz4BlockCompression::instance(); + ptr = new Lz4BlockCompression(); break; case segment_v2::CompressionTypePB::LZ4F: - *codec = Lz4fBlockCompression::instance(); + ptr = new Lz4fBlockCompression(); break; case segment_v2::CompressionTypePB::ZLIB: - *codec = ZlibBlockCompression::instance(); + ptr = new ZlibBlockCompression(); break; default: return Status::NotFound(strings::Substitute("unknown compression type($0)", type)); } - return Status::OK(); + + if (!ptr) return Status::NotFound("Failed to create compression codec"); + + Status st = ptr->init(); + if (st.ok()) { + codec.reset(ptr); + } else { + delete ptr; + } + + return st; } } // namespace doris diff --git a/be/src/util/block_compression.h b/be/src/util/block_compression.h index ff251137937e01..7ad3f9ecb79ed4 100644 --- a/be/src/util/block_compression.h +++ b/be/src/util/block_compression.h @@ -34,6 +34,8 @@ class BlockCompressionCodec { public: virtual ~BlockCompressionCodec() {} + virtual Status init() { return Status::OK(); } + // This function will compress input data into output. // output should be preallocated, and its capacity must be large enough // for compressed input, which can be get through max_compressed_len function. @@ -61,6 +63,6 @@ class BlockCompressionCodec { // // Return not OK, if error happens. Status get_block_compression_codec(segment_v2::CompressionTypePB type, - const BlockCompressionCodec** codec); + std::unique_ptr& codec); } // namespace doris diff --git a/be/test/util/block_compression_test.cpp b/be/test/util/block_compression_test.cpp index cec2ac48dac775..0bf62e6d9d9a0b 100644 --- a/be/test/util/block_compression_test.cpp +++ b/be/test/util/block_compression_test.cpp @@ -42,8 +42,8 @@ static std::string generate_str(size_t len) { } void test_single_slice(segment_v2::CompressionTypePB type) { - const BlockCompressionCodec* codec = nullptr; - auto st = get_block_compression_codec(type, &codec); + std::unique_ptr codec; + auto st = get_block_compression_codec(type, codec); EXPECT_TRUE(st.ok()); size_t test_sizes[] = {0, 1, 10, 1000, 1000000}; @@ -104,8 +104,8 @@ TEST_F(BlockCompressionTest, single) { } void test_multi_slices(segment_v2::CompressionTypePB type) { - const BlockCompressionCodec* codec = nullptr; - auto st = get_block_compression_codec(type, &codec); + std::unique_ptr codec; + auto st = get_block_compression_codec(type, codec); EXPECT_TRUE(st.ok()); size_t test_sizes[] = {0, 1, 10, 1000, 1000000};