Skip to content

Commit

Permalink
work with non-integer type
Browse files Browse the repository at this point in the history
Signed-off-by: Lloyd-Pottiger <yan1579196623@gmail.com>
  • Loading branch information
Lloyd-Pottiger committed Jun 21, 2024
1 parent b2b7319 commit 63038b0
Show file tree
Hide file tree
Showing 16 changed files with 585 additions and 412 deletions.
128 changes: 71 additions & 57 deletions dbms/src/IO/Compression/CompressionCodecDeltaFOR.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@
#include <IO/Compression/CompressionCodecDeltaFOR.h>
#include <IO/Compression/CompressionCodecFOR.h>
#include <IO/Compression/CompressionInfo.h>
#include <IO/Compression/CompressionSettings.h>
#include <IO/Compression/EncodingUtil.h>
#include <common/likely.h>
#include <common/unaligned.h>
#include <lz4.h>

#include <magic_enum.hpp>

namespace DB
{
Expand All @@ -31,8 +34,8 @@ extern const int CANNOT_COMPRESS;
extern const int CANNOT_DECOMPRESS;
} // namespace ErrorCodes

// Constructor: stores the element description the codec will compress with.
// Two heads appear in this span: one takes a UInt8 byte width and initializes
// `bytes_size`, the other takes a CompressionDataType and initializes `data_type`.
// NOTE(review): this text is a rendered diff without +/- markers; presumably the
// UInt8 form is the pre-change line pair and the CompressionDataType form is its
// replacement -- confirm against the repository.
CompressionCodecDeltaFOR::CompressionCodecDeltaFOR(UInt8 bytes_size_)
: bytes_size(bytes_size_)
CompressionCodecDeltaFOR::CompressionCodecDeltaFOR(CompressionDataType data_type_)
: data_type(data_type_)
{}

UInt8 CompressionCodecDeltaFOR::getMethodByte() const
Expand All @@ -42,12 +45,22 @@ UInt8 CompressionCodecDeltaFOR::getMethodByte() const

// Reports the destination-buffer size needed for `uncompressed_size` input bytes.
// For Int8/Int16/Int32/Int64 the size is: 1 header byte + a frame-of-reference value
// (bytes_size bytes) + 1 width byte + the bit-packed payload. Any other data type
// is sized as 1 header byte + LZ4_COMPRESSBOUND(uncompressed_size).
// NOTE(review): this span comes from a rendered diff without +/- markers, so the
// flat computation on `bytes_size` and the `switch (data_type)` form both appear;
// presumably the switch is the post-change version -- confirm against the repository.
UInt32 CompressionCodecDeltaFOR::getMaxCompressedDataSize(UInt32 uncompressed_size) const
{
/**
*|bytes_of_original_type|frame_of_reference|width(bits) |bitpacked data|
*|1 byte |bytes_size |sizeof(UInt8)|required size |
*/
const size_t count = uncompressed_size / bytes_size;
return 1 + bytes_size + sizeof(UInt8) + BitpackingPrimitives::getRequiredSize(count, bytes_size * 8);
switch (data_type)
{
case CompressionDataType::Int8:
case CompressionDataType::Int16:
case CompressionDataType::Int32:
case CompressionDataType::Int64:
{
// Layout of the encoded block for integer types:
// |bytes_of_original_type|frame_of_reference|width(bits) |bitpacked data|
// |1 byte |bytes_size |sizeof(UInt8)|required size |
// The enumerator's integer value is used directly as the element width in bytes.
// NOTE(review): presumably every integer enumerator equals its byte width (1/2/4/8) -- confirm in CompressionDataType.
auto bytes_size = magic_enum::enum_integer(data_type);
// Number of whole elements in the input; any trailing partial element is dropped by the division.
const size_t count = uncompressed_size / bytes_size;
// Worst case for the payload: every value packed at the full bit width of the type.
return 1 + bytes_size + sizeof(UInt8) + BitpackingPrimitives::getRequiredSize(count, bytes_size * 8);
}
default:
// Non-integer types: 1 header byte plus the LZ4 bound for the raw input.
return 1 + LZ4_COMPRESSBOUND(uncompressed_size);
}
}

namespace
Expand All @@ -56,35 +69,44 @@ namespace
// Delta-encodes the `source_size` bytes at `source` as elements of T into `dest`,
// then hands `dest` to CompressionCodecFOR::compressData as both input and output,
// reinterpreted as the signed counterpart of T. Returns whatever that call returns.
// NOTE(review): presumably the return value is the number of bytes written to `dest`
// (the caller adds 1 for its header byte) -- confirm in CompressionCodecFOR.
// Throws Exception when `source_size` is not a multiple of sizeof(T).
// NOTE(review): this span comes from a rendered diff without +/- markers, so two
// `return` statements appear (second argument `count` vs `source_size`); presumably
// the `source_size` form is the post-change line -- confirm against the repository.
template <std::integral T>
UInt32 compressData(const char * source, UInt32 source_size, char * dest)
{
constexpr auto bytes_size = sizeof(T);
// NOTE(review): this is the compression path but the error code is CANNOT_DECOMPRESS;
// CANNOT_COMPRESS is declared in this file and looks like the intended code -- confirm.
if unlikely (source_size % bytes_size != 0)
throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "source size {} is not aligned to {}", source_size, bytes_size);
const auto count = source_size / sizeof(T);
DB::Compression::deltaEncoding<T>(reinterpret_cast<const T *>(source), count, reinterpret_cast<T *>(dest));
// Cast deltas to signed type to better compress negative values.
// For example, if we have a sequence of UInt8 values [3, 2, 1, 0], the deltas will be [3, -1, -1, -1].
// If we compress them as UInt8, we will get [3, 255, 255, 255], which is not optimal.
using TS = typename std::make_signed<T>::type;
return DB::CompressionCodecFOR::compressData<TS>(reinterpret_cast<TS *>(dest), count, dest);
return DB::CompressionCodecFOR::compressData<TS>(reinterpret_cast<TS *>(dest), source_size, dest);
}

} // namespace

// Compresses `source_size` bytes from `source` into `dest` and returns the total
// number of bytes produced, counting the one-byte header stored in dest[0].
// The header byte is the type tag (integer value of `data_type`). Integer types go
// through the delta + FOR path via compressData<UIntN>; every other type falls back
// to LZ4_compress_fast at the default LZ4 level.
// Throws Exception(CANNOT_COMPRESS) when LZ4 reports failure.
// NOTE(review): this span comes from a rendered diff without +/- markers, so the
// `bytes_size`-based header/switch with numeric case labels and the `data_type`-based
// header/switch with enum case labels are interleaved; presumably the enum form is
// the post-change version -- confirm against the repository.
UInt32 CompressionCodecDeltaFOR::doCompressData(const char * source, UInt32 source_size, char * dest) const
{
if unlikely (source_size % bytes_size != 0)
throw Exception(ErrorCodes::CANNOT_DECOMPRESS, "source size {} is not aligned to {}", source_size, bytes_size);
dest[0] = bytes_size;
size_t start_pos = 1;
switch (bytes_size)
// Header byte: the type tag that the decompression routines read back from source[0].
dest[0] = magic_enum::enum_integer(data_type);
// Advance past the header so the payload writers below start at the second byte.
dest += 1;
switch (data_type)
{
case 1:
return 1 + compressData<UInt8>(source, source_size, &dest[start_pos]);
case 2:
return 1 + compressData<UInt16>(source, source_size, &dest[start_pos]);
case 4:
return 1 + compressData<UInt32>(source, source_size, &dest[start_pos]);
case 8:
return 1 + compressData<UInt64>(source, source_size, &dest[start_pos]);
// Signed enumerators are compressed through the unsigned instantiation of the same width.
case CompressionDataType::Int8:
return 1 + compressData<UInt8>(source, source_size, dest);
case CompressionDataType::Int16:
return 1 + compressData<UInt16>(source, source_size, dest);
case CompressionDataType::Int32:
return 1 + compressData<UInt32>(source, source_size, dest);
case CompressionDataType::Int64:
return 1 + compressData<UInt64>(source, source_size, dest);
default:
throw Exception(ErrorCodes::CANNOT_COMPRESS, "Cannot compress DeltaFor-encoded data. Unsupported bytes size");
// Fallback for non-integer types: plain LZ4 with the destination capacity given as
// LZ4_COMPRESSBOUND(source_size), matching what getMaxCompressedDataSize reserves.
auto success = LZ4_compress_fast(
source,
dest,
source_size,
LZ4_COMPRESSBOUND(source_size),
CompressionSetting::getDefaultLevel(CompressionMethod::LZ4));
// A zero result is treated as failure; otherwise `success` is used as the payload size.
if (!success)
throw Exception("Cannot LZ4_compress_fast", ErrorCodes::CANNOT_COMPRESS);
return 1 + success;
}
}

Expand All @@ -103,76 +125,68 @@ void CompressionCodecDeltaFOR::doDecompressData(
return;

UInt8 bytes_size = source[0];
if unlikely (uncompressed_size % bytes_size != 0)
throw Exception(
ErrorCodes::CANNOT_DECOMPRESS,
"uncompressed size {} is not aligned to {}",
uncompressed_size,
bytes_size);
auto data_type = magic_enum::enum_cast<CompressionDataType>(bytes_size);
RUNTIME_CHECK(data_type.has_value());

UInt32 source_size_no_header = source_size - 1;
switch (bytes_size)
switch (data_type.value())
{
case 1:
case CompressionDataType::Int8:
DB::Compression::deltaFORDecoding<UInt8>(&source[1], source_size_no_header, dest, uncompressed_size);
break;
case 2:
case CompressionDataType::Int16:
DB::Compression::deltaFORDecoding<UInt16>(&source[1], source_size_no_header, dest, uncompressed_size);
break;
case 4:
case CompressionDataType::Int32:
DB::Compression::deltaFORDecoding<UInt32>(&source[1], source_size_no_header, dest, uncompressed_size);
break;
case 8:
case CompressionDataType::Int64:
DB::Compression::deltaFORDecoding<UInt64>(&source[1], source_size_no_header, dest, uncompressed_size);
break;
default:
throw Exception(
ErrorCodes::CANNOT_DECOMPRESS,
"Cannot decompress DeltaFor-encoded data. Unsupported bytes size");
if (unlikely(LZ4_decompress_safe(&source[1], dest, source_size_no_header, uncompressed_size) < 0))
throw Exception("Cannot LZ4_decompress_safe", ErrorCodes::CANNOT_DECOMPRESS);
break;
}
}

// Static decompression entry point that decodes through ordinaryDeltaFORDecoding.
// Reads the type tag from source[0], then decodes the remaining `source_size - 1`
// bytes into `dest`, which must hold the requested number of output bytes.
// Integer tags use ordinaryDeltaFORDecoding<UIntN>; any other valid tag is decoded
// with LZ4_decompress_safe.
// Throws Exception(CANNOT_DECOMPRESS) when `source_size < 2` or LZ4 returns a negative
// value; RUNTIME_CHECK fails when the tag byte is not a CompressionDataType value.
// NOTE(review): this span comes from a rendered diff without +/- markers, so the
// `dest_size` and `uncompressed_size` spellings of the last parameter, and the
// numeric and enum case labels, are interleaved; presumably `uncompressed_size` and
// the enum labels are the post-change version -- confirm against the repository.
void CompressionCodecDeltaFOR::ordinaryDecompress(
const char * source,
UInt32 source_size,
char * dest,
UInt32 dest_size)
UInt32 uncompressed_size)
{
// Need at least the header byte plus one payload byte.
if unlikely (source_size < 2)
throw Exception(
ErrorCodes::CANNOT_DECOMPRESS,
"Cannot decompress DeltaFor-encoded data. File has wrong header");

// Nothing to produce: return before touching the payload.
if (dest_size == 0)
if (uncompressed_size == 0)
return;

// First byte is the type tag written by the compressor.
UInt8 bytes_size = source[0];
if unlikely (dest_size % bytes_size != 0)
throw Exception(
ErrorCodes::CANNOT_DECOMPRESS,
"uncompressed size {} is not aligned to {}",
dest_size,
bytes_size);
// Map the raw tag byte onto the enum; an unknown value yields an empty optional.
auto data_type = magic_enum::enum_cast<CompressionDataType>(bytes_size);
RUNTIME_CHECK(data_type.has_value());

// Payload length excludes the one-byte header.
UInt32 source_size_no_header = source_size - 1;
switch (bytes_size)
switch (data_type.value())
{
case 1:
DB::Compression::ordinaryDeltaFORDecoding<UInt8>(&source[1], source_size_no_header, dest, dest_size);
case CompressionDataType::Int8:
DB::Compression::ordinaryDeltaFORDecoding<UInt8>(&source[1], source_size_no_header, dest, uncompressed_size);
break;
case 2:
DB::Compression::ordinaryDeltaFORDecoding<UInt16>(&source[1], source_size_no_header, dest, dest_size);
case CompressionDataType::Int16:
DB::Compression::ordinaryDeltaFORDecoding<UInt16>(&source[1], source_size_no_header, dest, uncompressed_size);
break;
case 4:
DB::Compression::ordinaryDeltaFORDecoding<UInt32>(&source[1], source_size_no_header, dest, dest_size);
case CompressionDataType::Int32:
DB::Compression::ordinaryDeltaFORDecoding<UInt32>(&source[1], source_size_no_header, dest, uncompressed_size);
break;
case 8:
DB::Compression::ordinaryDeltaFORDecoding<UInt64>(&source[1], source_size_no_header, dest, dest_size);
case CompressionDataType::Int64:
DB::Compression::ordinaryDeltaFORDecoding<UInt64>(&source[1], source_size_no_header, dest, uncompressed_size);
break;
default:
throw Exception(
ErrorCodes::CANNOT_DECOMPRESS,
"Cannot decompress DeltaFor-encoded data. Unsupported bytes size");
// Valid tag that is not one of the four integer types: payload is plain LZ4.
// A negative return from LZ4_decompress_safe is treated as failure.
if (unlikely(LZ4_decompress_safe(&source[1], dest, source_size_no_header, uncompressed_size) < 0))
throw Exception("Cannot LZ4_decompress_safe", ErrorCodes::CANNOT_DECOMPRESS);
break;
}
}

Expand Down
6 changes: 3 additions & 3 deletions dbms/src/IO/Compression/CompressionCodecDeltaFOR.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ namespace DB
class CompressionCodecDeltaFOR : public ICompressionCodec
{
public:
explicit CompressionCodecDeltaFOR(UInt8 bytes_size_);
explicit CompressionCodecDeltaFOR(CompressionDataType data_type_);

UInt8 getMethodByte() const override;

static void ordinaryDecompress(const char * source, UInt32 source_size, char * dest, UInt32 dest_size);
static void ordinaryDecompress(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size);

#ifndef DBMS_PUBLIC_GTEST
protected:
Expand All @@ -42,7 +42,7 @@ class CompressionCodecDeltaFOR : public ICompressionCodec
bool isGenericCompression() const override { return false; }

private:
const UInt8 bytes_size;
const CompressionDataType data_type;
};

} // namespace DB
Loading

0 comments on commit 63038b0

Please sign in to comment.