Skip to content

Commit

Permalink
[enhancement][betarowset]optimize lz4 compress and decompress speed b…
Browse files Browse the repository at this point in the history
…y reusing context (#9566)
  • Loading branch information
xiaokang authored May 15, 2022
1 parent cfe22f8 commit e0c7900
Show file tree
Hide file tree
Showing 11 changed files with 77 additions and 58 deletions.
4 changes: 2 additions & 2 deletions be/src/olap/rowset/segment_v2/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ Status ColumnReader::init() {
strings::Substitute("unsupported typeinfo, type=$0", _meta.type()));
}
RETURN_IF_ERROR(EncodingInfo::get(_type_info.get(), _meta.encoding(), &_encoding_info));
RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), &_compress_codec));
RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), _compress_codec));

for (int i = 0; i < _meta.indexes_size(); i++) {
auto& index_meta = _meta.indexes(i);
Expand Down Expand Up @@ -149,7 +149,7 @@ Status ColumnReader::read_page(const ColumnIteratorOptions& iter_opts, const Pag
PageReadOptions opts;
opts.rblock = iter_opts.rblock;
opts.page_pointer = pp;
opts.codec = _compress_codec;
opts.codec = _compress_codec.get();
opts.stats = iter_opts.stats;
opts.verify_checksum = _opts.verify_checksum;
opts.use_page_cache = iter_opts.use_page_cache;
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ class ColumnReader {
TypeInfoPtr(nullptr, nullptr); // initialized in init(), may changed by subclasses.
const EncodingInfo* _encoding_info =
nullptr; // initialized in init(), used for create PageDecoder
const BlockCompressionCodec* _compress_codec = nullptr; // initialized in init()
std::unique_ptr<BlockCompressionCodec> _compress_codec; // initialized in init()

// meta for various column indexes (null if the index is absent)
const ZoneMapIndexPB* _zone_map_index_meta = nullptr;
Expand Down
8 changes: 4 additions & 4 deletions be/src/olap/rowset/segment_v2/column_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ ScalarColumnWriter::~ScalarColumnWriter() {
}

Status ScalarColumnWriter::init() {
RETURN_IF_ERROR(get_block_compression_codec(_opts.meta->compression(), &_compress_codec));
RETURN_IF_ERROR(get_block_compression_codec(_opts.meta->compression(), _compress_codec));

PageBuilder* page_builder = nullptr;

Expand Down Expand Up @@ -420,7 +420,7 @@ Status ScalarColumnWriter::write_data() {
footer.mutable_dict_page_footer()->set_encoding(PLAIN_ENCODING);

PagePointer dict_pp;
RETURN_IF_ERROR(PageIO::compress_and_write_page(_compress_codec,
RETURN_IF_ERROR(PageIO::compress_and_write_page(_compress_codec.get(),
_opts.compression_min_space_saving, _wblock,
{dict_body.slice()}, footer, &dict_pp));
dict_pp.to_proto(_opts.meta->mutable_dict_page());
Expand Down Expand Up @@ -508,8 +508,8 @@ Status ScalarColumnWriter::finish_current_page() {
}
// trying to compress page body
OwnedSlice compressed_body;
RETURN_IF_ERROR(PageIO::compress_page_body(_compress_codec, _opts.compression_min_space_saving,
body, &compressed_body));
RETURN_IF_ERROR(PageIO::compress_page_body(
_compress_codec.get(), _opts.compression_min_space_saving, body, &compressed_body));
if (compressed_body.slice().empty()) {
// page body is uncompressed
page->data.emplace_back(std::move(encoded_values));
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/column_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ class ScalarColumnWriter final : public ColumnWriter {
PageHead _pages;
ordinal_t _first_rowid = 0;

const BlockCompressionCodec* _compress_codec = nullptr;
std::unique_ptr<BlockCompressionCodec> _compress_codec;

std::unique_ptr<OrdinalIndexWriter> _ordinal_index_builder;
std::unique_ptr<ZoneMapIndexWriter> _zone_map_index_builder;
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/rowset/segment_v2/indexed_column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ Status IndexedColumnReader::load(bool use_page_cache, bool kept_in_memory) {
strings::Substitute("unsupported typeinfo, type=$0", _meta.data_type()));
}
RETURN_IF_ERROR(EncodingInfo::get(_type_info, _meta.encoding(), &_encoding_info));
RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), &_compress_codec));
RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), _compress_codec));
_value_key_coder = get_key_coder(_type_info->type());

std::unique_ptr<fs::ReadableBlock> rblock;
Expand Down Expand Up @@ -83,7 +83,7 @@ Status IndexedColumnReader::read_page(fs::ReadableBlock* rblock, const PagePoint
PageReadOptions opts;
opts.rblock = rblock;
opts.page_pointer = pp;
opts.codec = _compress_codec;
opts.codec = _compress_codec.get();
OlapReaderStatistics tmp_stats;
opts.stats = &tmp_stats;
opts.use_page_cache = _use_page_cache;
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/indexed_column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class IndexedColumnReader {

const TypeInfo* _type_info = nullptr;
const EncodingInfo* _encoding_info = nullptr;
const BlockCompressionCodec* _compress_codec = nullptr;
std::unique_ptr<BlockCompressionCodec> _compress_codec;
const KeyCoder* _value_key_coder = nullptr;
};

Expand Down
6 changes: 3 additions & 3 deletions be/src/olap/rowset/segment_v2/indexed_column_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ Status IndexedColumnWriter::init() {
}

if (_options.compression != NO_COMPRESSION) {
RETURN_IF_ERROR(get_block_compression_codec(_options.compression, &_compress_codec));
RETURN_IF_ERROR(get_block_compression_codec(_options.compression, _compress_codec));
}
return Status::OK();
}
Expand Down Expand Up @@ -110,7 +110,7 @@ Status IndexedColumnWriter::_finish_current_data_page() {
footer.mutable_data_page_footer()->set_num_values(num_values_in_page);
footer.mutable_data_page_footer()->set_nullmap_size(0);

RETURN_IF_ERROR(PageIO::compress_and_write_page(_compress_codec,
RETURN_IF_ERROR(PageIO::compress_and_write_page(_compress_codec.get(),
_options.compression_min_space_saving, _wblock,
{page_body.slice()}, footer, &_last_data_page));
_num_data_pages++;
Expand Down Expand Up @@ -159,7 +159,7 @@ Status IndexedColumnWriter::_flush_index(IndexPageBuilder* index_builder, BTreeM

PagePointer pp;
RETURN_IF_ERROR(PageIO::compress_and_write_page(
_compress_codec, _options.compression_min_space_saving, _wblock,
_compress_codec.get(), _options.compression_min_space_saving, _wblock,
{page_body.slice()}, page_footer, &pp));

meta->set_is_root_data_page(false);
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/indexed_column_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class IndexedColumnWriter {
std::unique_ptr<IndexPageBuilder> _value_index_builder;
// encoder for value index's key
const KeyCoder* _value_key_coder;
const BlockCompressionCodec* _compress_codec;
std::unique_ptr<BlockCompressionCodec> _compress_codec;

DISALLOW_COPY_AND_ASSIGN(IndexedColumnWriter);
};
Expand Down
93 changes: 55 additions & 38 deletions be/src/util/block_compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,46 +88,46 @@ class Lz4BlockCompression : public BlockCompressionCodec {
// Used for LZ4 frame format, decompress speed is two times faster than LZ4.
class Lz4fBlockCompression : public BlockCompressionCodec {
public:
static const Lz4fBlockCompression* instance() {
static Lz4fBlockCompression s_instance;
return &s_instance;
}

~Lz4fBlockCompression() override {}
Status init() override {
auto ret1 = LZ4F_createCompressionContext(&ctx_c, LZ4F_VERSION);
if (LZ4F_isError(ret1)) {
return Status::InvalidArgument(strings::Substitute(
"Fail to LZ4F_createCompressionContext, msg=$0", LZ4F_getErrorName(ret1)));
}
ctx_c_inited = true;

Status compress(const Slice& input, Slice* output) const override {
auto compressed_len = LZ4F_compressFrame(output->data, output->size, input.data, input.size,
&_s_preferences);
if (LZ4F_isError(compressed_len)) {
auto ret2 = LZ4F_createDecompressionContext(&ctx_d, LZ4F_VERSION);
if (LZ4F_isError(ret2)) {
return Status::InvalidArgument(strings::Substitute(
"Fail to do LZ4F compress frame, msg=$0", LZ4F_getErrorName(compressed_len)));
"Fail to LZ4F_createDecompressionContext, msg=$0", LZ4F_getErrorName(ret2)));
}
output->size = compressed_len;
ctx_d_inited = true;

return Status::OK();
}

~Lz4fBlockCompression() override {
if (ctx_c_inited) LZ4F_freeCompressionContext(ctx_c);
if (ctx_d_inited) LZ4F_freeDecompressionContext(ctx_d);
}

Status compress(const Slice& input, Slice* output) const override {
std::vector<Slice> inputs {input};
return compress(inputs, output);
}

Status compress(const std::vector<Slice>& inputs, Slice* output) const override {
LZ4F_compressionContext_t ctx = nullptr;
auto lres = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION);
if (lres != 0) {
return Status::InvalidArgument(strings::Substitute("Fail to do LZ4F compress, res=$0",
LZ4F_getErrorName(lres)));
}
auto st = _compress(ctx, inputs, output);
LZ4F_freeCompressionContext(ctx);
return st;
if (!ctx_c_inited)
return Status::InvalidArgument("LZ4F_createCompressionContext not sucess");

return _compress(ctx_c, inputs, output);
}

Status decompress(const Slice& input, Slice* output) const override {
LZ4F_decompressionContext_t ctx;
auto lres = LZ4F_createDecompressionContext(&ctx, LZ4F_VERSION);
if (LZ4F_isError(lres)) {
return Status::InvalidArgument(strings::Substitute("Fail to do LZ4F decompress, res=$0",
LZ4F_getErrorName(lres)));
}
auto st = _decompress(ctx, input, output);
LZ4F_freeDecompressionContext(ctx);
return st;
if (!ctx_d_inited)
return Status::InvalidArgument("LZ4F_createDecompressionContext not sucess");

return _decompress(ctx_d, input, output);
}

size_t max_compressed_len(size_t len) const override {
Expand Down Expand Up @@ -167,6 +167,8 @@ class Lz4fBlockCompression : public BlockCompressionCodec {
}

Status _decompress(LZ4F_decompressionContext_t ctx, const Slice& input, Slice* output) const {
// reset decompression context to avoid ERROR_maxBlockSize_invalid
LZ4F_resetDecompressionContext(ctx);
size_t input_size = input.size;
auto lres =
LZ4F_decompress(ctx, output->data, &output->size, input.data, &input_size, nullptr);
Expand All @@ -187,6 +189,10 @@ class Lz4fBlockCompression : public BlockCompressionCodec {

private:
static LZ4F_preferences_t _s_preferences;
LZ4F_compressionContext_t ctx_c;
bool ctx_c_inited = false;
LZ4F_decompressionContext_t ctx_d;
bool ctx_d_inited = false;
};

LZ4F_preferences_t Lz4fBlockCompression::_s_preferences = {
Expand Down Expand Up @@ -370,27 +376,38 @@ class ZlibBlockCompression : public BlockCompressionCodec {
};

Status get_block_compression_codec(segment_v2::CompressionTypePB type,
const BlockCompressionCodec** codec) {
std::unique_ptr<BlockCompressionCodec>& codec) {
BlockCompressionCodec* ptr = nullptr;
switch (type) {
case segment_v2::CompressionTypePB::NO_COMPRESSION:
*codec = nullptr;
break;
codec.reset(nullptr);
return Status::OK();
case segment_v2::CompressionTypePB::SNAPPY:
*codec = SnappyBlockCompression::instance();
ptr = new SnappyBlockCompression();
break;
case segment_v2::CompressionTypePB::LZ4:
*codec = Lz4BlockCompression::instance();
ptr = new Lz4BlockCompression();
break;
case segment_v2::CompressionTypePB::LZ4F:
*codec = Lz4fBlockCompression::instance();
ptr = new Lz4fBlockCompression();
break;
case segment_v2::CompressionTypePB::ZLIB:
*codec = ZlibBlockCompression::instance();
ptr = new ZlibBlockCompression();
break;
default:
return Status::NotFound(strings::Substitute("unknown compression type($0)", type));
}
return Status::OK();

if (!ptr) return Status::NotFound("Failed to create compression codec");

Status st = ptr->init();
if (st.ok()) {
codec.reset(ptr);
} else {
delete ptr;
}

return st;
}

} // namespace doris
4 changes: 3 additions & 1 deletion be/src/util/block_compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ class BlockCompressionCodec {
public:
virtual ~BlockCompressionCodec() {}

virtual Status init() { return Status::OK(); }

// This function will compress input data into output.
// output should be preallocated, and its capacity must be large enough
// for compressed input, which can be get through max_compressed_len function.
Expand Down Expand Up @@ -61,6 +63,6 @@ class BlockCompressionCodec {
//
// Return not OK, if error happens.
Status get_block_compression_codec(segment_v2::CompressionTypePB type,
const BlockCompressionCodec** codec);
std::unique_ptr<BlockCompressionCodec>& codec);

} // namespace doris
8 changes: 4 additions & 4 deletions be/test/util/block_compression_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ static std::string generate_str(size_t len) {
}

void test_single_slice(segment_v2::CompressionTypePB type) {
const BlockCompressionCodec* codec = nullptr;
auto st = get_block_compression_codec(type, &codec);
std::unique_ptr<BlockCompressionCodec> codec;
auto st = get_block_compression_codec(type, codec);
EXPECT_TRUE(st.ok());

size_t test_sizes[] = {0, 1, 10, 1000, 1000000};
Expand Down Expand Up @@ -104,8 +104,8 @@ TEST_F(BlockCompressionTest, single) {
}

void test_multi_slices(segment_v2::CompressionTypePB type) {
const BlockCompressionCodec* codec = nullptr;
auto st = get_block_compression_codec(type, &codec);
std::unique_ptr<BlockCompressionCodec> codec;
auto st = get_block_compression_codec(type, codec);
EXPECT_TRUE(st.ok());

size_t test_sizes[] = {0, 1, 10, 1000, 1000000};
Expand Down

0 comments on commit e0c7900

Please sign in to comment.