Skip to content

Commit

Permalink
Support enable lightweight compression
Browse files Browse the repository at this point in the history
Signed-off-by: Lloyd-Pottiger <yan1579196623@gmail.com>
  • Loading branch information
Lloyd-Pottiger committed Jul 9, 2024
1 parent 1ad8a21 commit 5698e49
Show file tree
Hide file tree
Showing 34 changed files with 1,108 additions and 629 deletions.
1 change: 0 additions & 1 deletion dbms/src/DataStreams/MergeSortingBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#include <DataStreams/SortHelper.h>
#include <DataStreams/copyData.h>
#include <IO/Buffer/WriteBufferFromFile.h>
#include <IO/Compression/CompressedWriteBuffer.h>
#include <common/logger_useful.h>

namespace DB
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Flash/Coprocessor/CHBlockChunkCodecV1.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#include <DataTypes/DataTypeNullable.h>
#include <Flash/Coprocessor/CHBlockChunkCodecV1.h>
#include <IO/Buffer/ReadBufferFromString.h>
#include <IO/Compression/CompressionFactory.h>
#include <IO/Compression/CompressionCodecFactory.h>
#include <IO/Compression/CompressionInfo.h>

namespace DB
Expand Down Expand Up @@ -556,7 +556,7 @@ CHBlockChunkCodecV1::EncodeRes CHBlockChunkCodecV1::encode(std::string_view str,
assert(compression_method != CompressionMethod::NONE);

String compressed_buffer;
auto codec = CompressionFactory::create(CompressionSetting(compression_method));
auto codec = CompressionCodecFactory::create(CompressionSetting(compression_method));
compressed_buffer.resize(codec->getCompressedReserveSize(str.size()));
size_t compressed_size = codec->compress(str.data(), str.size(), compressed_buffer.data());
compressed_buffer.resize(compressed_size);
Expand All @@ -579,4 +579,4 @@ Block CHBlockChunkCodecV1::decode(const Block & header, std::string_view str)
return decodeCompression(header, istr);
}

} // namespace DB
} // namespace DB
4 changes: 2 additions & 2 deletions dbms/src/IO/Compression/CompressedReadBufferBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
#include <IO/Buffer/BufferWithOwnMemory.h>
#include <IO/Buffer/ReadBuffer.h>
#include <IO/Compression/CompressedReadBufferBase.h>
#include <IO/Compression/CompressionFactory.h>
#include <IO/Compression/CompressionCodecFactory.h>
#include <IO/Compression/CompressionMethod.h>
#include <IO/WriteHelpers.h>
#include <city.h>
Expand All @@ -39,7 +39,7 @@ static void readHeaderAndGetCodec(const char * compressed_buffer, CompressionCod

if (!codec)
{
codec = CompressionFactory::createForDecompress(method_byte);
codec = CompressionCodecFactory::createForDecompress(method_byte);
}
else if (codec->getMethodByte() != method_byte)
{
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/IO/Compression/CompressedWriteBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

#include <Core/Types.h>
#include <IO/Compression/CompressedWriteBuffer.h>
#include <IO/Compression/CompressionFactory.h>
#include <IO/Compression/CompressionCodecFactory.h>
#include <IO/Compression/CompressionInfo.h>
#include <city.h>

Expand Down Expand Up @@ -50,7 +50,7 @@ CompressedWriteBuffer<add_legacy_checksum>::CompressedWriteBuffer(
: BufferWithOwnMemory<WriteBuffer>(buf_size)
, out(out_)
, compression_settings(compression_settings_)
, codec(CompressionFactory::create(compression_settings))
, codec(CompressionCodecFactory::create(compression_settings))
{}

template <bool add_legacy_checksum>
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/IO/Compression/CompressionCodecDeflateQpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#include <Common/config.h>
#if USE_QPL
#include <IO/Compression/CompressionCodecDeflateQpl.h>
#include <IO/Compression/CompressionFactory.h>
#include <IO/Compression/CompressionCodecFactory.h>
#include <IO/Compression/CompressionInfo.h>
#include <x86intrin.h>

Expand Down Expand Up @@ -366,4 +366,4 @@ void CompressionCodecDeflateQpl::doDecompressData(
}

} // namespace DB
#endif
#endif
47 changes: 14 additions & 33 deletions dbms/src/IO/Compression/CompressionCodecDeltaFOR.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@
#include <IO/Compression/CompressionCodecDeltaFOR.h>
#include <IO/Compression/CompressionCodecFOR.h>
#include <IO/Compression/CompressionInfo.h>
#include <IO/Compression/CompressionSettings.h>
#include <IO/Compression/EncodingUtil.h>
#include <common/likely.h>
#include <common/unaligned.h>
#include <lz4.h>

#include <magic_enum.hpp>

Expand All @@ -45,22 +43,11 @@ UInt8 CompressionCodecDeltaFOR::getMethodByte() const

UInt32 CompressionCodecDeltaFOR::getMaxCompressedDataSize(UInt32 uncompressed_size) const
{
switch (data_type)
{
case CompressionDataType::Int8:
case CompressionDataType::Int16:
case CompressionDataType::Int32:
case CompressionDataType::Int64:
{
// |bytes_of_original_type|first_value|frame_of_reference|width(bits) |bitpacked data|
// |1 bytes |bytes_size |bytes_size |sizeof(UInt8)|required size |
auto bytes_size = magic_enum::enum_integer(data_type);
const size_t deltas_count = uncompressed_size / bytes_size - 1;
return 1 + bytes_size * 2 + sizeof(UInt8) + BitpackingPrimitives::getRequiredSize(deltas_count, bytes_size * 8);
}
default:
return 1 + LZ4_COMPRESSBOUND(uncompressed_size);
}
// |bytes_of_original_type|first_value|frame_of_reference|width(bits) |bitpacked data|
// |1 bytes |bytes_size |bytes_size |sizeof(UInt8)|required size |
auto bytes_size = magic_enum::enum_integer(data_type);
const size_t deltas_count = uncompressed_size / bytes_size - 1;
return 1 + bytes_size * 2 + sizeof(UInt8) + BitpackingPrimitives::getRequiredSize(deltas_count, bytes_size * 8);
}

namespace
Expand Down Expand Up @@ -104,15 +91,7 @@ UInt32 CompressionCodecDeltaFOR::doCompressData(const char * source, UInt32 sour
case CompressionDataType::Int64:
return 1 + compressData<UInt64>(source, source_size, dest);
default:
auto success = LZ4_compress_fast(
source,
dest,
source_size,
LZ4_COMPRESSBOUND(source_size),
CompressionSetting::getDefaultLevel(CompressionMethod::LZ4));
if (unlikely(!success))
throw Exception("Cannot LZ4_compress_fast", ErrorCodes::CANNOT_COMPRESS);
return 1 + success;
throw Exception(ErrorCodes::CANNOT_COMPRESS, "Unsupported data type: {}", magic_enum::enum_name(data_type));
}
}

Expand Down Expand Up @@ -150,9 +129,10 @@ void CompressionCodecDeltaFOR::doDecompressData(
DB::Compression::deltaFORDecoding<UInt64>(&source[1], source_size_no_header, dest, uncompressed_size);
break;
default:
if (unlikely(LZ4_decompress_safe(&source[1], dest, source_size_no_header, uncompressed_size) < 0))
throw Exception("Cannot LZ4_decompress_safe", ErrorCodes::CANNOT_DECOMPRESS);
break;
throw Exception(
ErrorCodes::CANNOT_DECOMPRESS,
"Unsupported data type: {}",
magic_enum::enum_name(data_type.value()));
}
}

Expand Down Expand Up @@ -190,9 +170,10 @@ void CompressionCodecDeltaFOR::ordinaryDecompress(
DB::Compression::ordinaryDeltaFORDecoding<UInt64>(&source[1], source_size_no_header, dest, uncompressed_size);
break;
default:
if (unlikely(LZ4_decompress_safe(&source[1], dest, source_size_no_header, uncompressed_size) < 0))
throw Exception("Cannot LZ4_decompress_safe", ErrorCodes::CANNOT_DECOMPRESS);
break;
throw Exception(
ErrorCodes::CANNOT_DECOMPRESS,
"Unsupported data type: {}",
magic_enum::enum_name(data_type.value()));
}
}

Expand Down
40 changes: 10 additions & 30 deletions dbms/src/IO/Compression/CompressionCodecFOR.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@
#include <Common/Exception.h>
#include <IO/Compression/CompressionCodecFOR.h>
#include <IO/Compression/CompressionInfo.h>
#include <IO/Compression/CompressionSettings.h>
#include <IO/Compression/EncodingUtil.h>
#include <common/likely.h>
#include <common/unaligned.h>
#include <lz4.h>

#include <magic_enum.hpp>

Expand All @@ -45,22 +43,11 @@ UInt8 CompressionCodecFOR::getMethodByte() const

UInt32 CompressionCodecFOR::getMaxCompressedDataSize(UInt32 uncompressed_size) const
{
switch (data_type)
{
case CompressionDataType::Int8:
case CompressionDataType::Int16:
case CompressionDataType::Int32:
case CompressionDataType::Int64:
{
// |bytes_of_original_type|frame_of_reference|width(bits) |bitpacked data|
// |1 bytes |bytes_size |sizeof(UInt8)|required size |
auto bytes_size = magic_enum::enum_integer(data_type);
const size_t count = uncompressed_size / bytes_size;
return 1 + bytes_size + sizeof(UInt8) + BitpackingPrimitives::getRequiredSize(count, bytes_size * 8);
}
default:
return 1 + LZ4_COMPRESSBOUND(uncompressed_size);
}
// |bytes_of_original_type|frame_of_reference|width(bits) |bitpacked data|
// |1 bytes |bytes_size |sizeof(UInt8)|required size |
auto bytes_size = magic_enum::enum_integer(data_type);
const size_t count = uncompressed_size / bytes_size;
return 1 + bytes_size + sizeof(UInt8) + BitpackingPrimitives::getRequiredSize(count, bytes_size * 8);
}

template <std::integral T>
Expand Down Expand Up @@ -93,15 +80,7 @@ UInt32 CompressionCodecFOR::doCompressData(const char * source, UInt32 source_si
case CompressionDataType::Int64:
return 1 + compressData<UInt64>(reinterpret_cast<const UInt64 *>(source), source_size, dest);
default:
auto success = LZ4_compress_fast(
source,
dest,
source_size,
LZ4_COMPRESSBOUND(source_size),
CompressionSetting::getDefaultLevel(CompressionMethod::LZ4));
if (unlikely(!success))
throw Exception("Cannot LZ4_compress_fast", ErrorCodes::CANNOT_COMPRESS);
return 1 + success;
throw Exception(ErrorCodes::CANNOT_COMPRESS, "Unsupported data type: {}", magic_enum::enum_name(data_type));
}
}

Expand Down Expand Up @@ -137,9 +116,10 @@ void CompressionCodecFOR::doDecompressData(
DB::Compression::FORDecoding<UInt64>(&source[1], source_size_no_header, dest, uncompressed_size);
break;
default:
if (unlikely(LZ4_decompress_safe(&source[1], dest, source_size_no_header, uncompressed_size) < 0))
throw Exception("Cannot LZ4_decompress_safe", ErrorCodes::CANNOT_DECOMPRESS);
break;
throw Exception(
ErrorCodes::CANNOT_DECOMPRESS,
"Unsupported data type: {}",
magic_enum::enum_name(data_type.value()));
}
}

Expand Down
Loading

0 comments on commit 5698e49

Please sign in to comment.