Skip to content

Commit

Permalink
refactor: move versioned meta readers to storages-common-table-meta
Browse files Browse the repository at this point in the history
  • Loading branch information
dantengsky committed May 31, 2023
1 parent 7cd6246 commit 06a439b
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 81 deletions.
16 changes: 10 additions & 6 deletions src/query/storages/common/table-meta/benches/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use common_storages_fuse::io::TableMetaLocationGenerator;
use common_storages_fuse::statistics::STATS_STRING_PREFIX_LEN;
use criterion::black_box;
use criterion::Criterion;
use storages_common_table_meta::meta::testing::Encoding;
use storages_common_table_meta::meta::testing::MetaEncoding;
use storages_common_table_meta::meta::BlockMeta;
use storages_common_table_meta::meta::ColumnMeta;
use storages_common_table_meta::meta::ColumnStatistics;
Expand Down Expand Up @@ -80,15 +80,17 @@ fn bench_encode(c: &mut Criterion) {
grp.bench_function("bincode-segment-serialization", |b| {
b.iter(|| {
let _ = segment_info
.to_bytes_with_encoding(Encoding::Bincode)
.to_bytes_with_encoding(MetaEncoding::Bincode)
.unwrap();
})
});

#[cfg(with_pot)]
grp.bench_function("pot-segment-serialization", |b| {
b.iter(|| {
let _ = segment_info.to_bytes_with_encoding(Encoding::Pot).unwrap();
let _ = segment_info
.to_bytes_with_encoding(MetaEncoding::Pot)
.unwrap();
})
});
}
Expand Down Expand Up @@ -152,12 +154,14 @@ fn bench_decode(c: &mut Criterion) {
});

#[cfg(with_pot)]
let segment_pot_bytes = segment_info.to_bytes_with_encoding(Encoding::Pot).unwrap();
let segment_pot_bytes = segment_info
.to_bytes_with_encoding(MetaEncoding::Pot)
.unwrap();
let segment_bincode_bytes = segment_info
.to_bytes_with_encoding(Encoding::Bincode)
.to_bytes_with_encoding(MetaEncoding::Bincode)
.unwrap();
let segment_msgpack_bytes = segment_info
.to_bytes_with_encoding(Encoding::MessagePack)
.to_bytes_with_encoding(MetaEncoding::MessagePack)
.unwrap();

grp.bench_function("bincode-segment-deserialization", |b| {
Expand Down
72 changes: 35 additions & 37 deletions src/query/storages/common/table-meta/src/meta/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,23 @@ use snap::raw::Encoder as SnapEncoder;
use zstd::Decoder as ZstdDecoder;
use zstd::Encoder as ZstdEncoder;

use crate::meta::MetaCompression;

#[repr(u8)]
#[derive(Default, Debug, Clone)]
pub enum Compression {
pub enum MetaCompression {
None = 0,
#[default]
Zstd = 1,
Snappy = 2,
}

impl TryFrom<u8> for Compression {
impl TryFrom<u8> for MetaCompression {
type Error = ErrorCode;

fn try_from(value: u8) -> Result<Self, Self::Error> {
match value {
0 => Ok(Compression::None),
1 => Ok(Compression::Zstd),
2 => Ok(Compression::Snappy),
0 => Ok(MetaCompression::None),
1 => Ok(MetaCompression::Zstd),
2 => Ok(MetaCompression::Snappy),
other => Err(ErrorCode::UnknownFormat(format!(
"unsupported compression: {}",
other
Expand All @@ -61,16 +59,16 @@ impl TryFrom<u8> for Compression {
}
}

pub fn compress(compression: &Compression, data: Vec<u8>) -> Result<Vec<u8>> {
pub fn compress(compression: &MetaCompression, data: Vec<u8>) -> Result<Vec<u8>> {
match compression {
Compression::None => Ok(data),
Compression::Zstd => {
MetaCompression::None => Ok(data),
MetaCompression::Zstd => {
let mut encoder = ZstdEncoder::new(Vec::new(), 0)?;
encoder.write_all(&data)?;
Ok(encoder.finish()?)
}
#[cfg(feature = "dev")]
Compression::Snappy => Ok(SnapEncoder::new()
MetaCompression::Snappy => Ok(SnapEncoder::new()
.compress_vec(&data)
.map_err(|e| Error::new(ErrorKind::InvalidData, e))?),
#[cfg(not(feature = "dev"))]
Expand All @@ -81,10 +79,10 @@ pub fn compress(compression: &Compression, data: Vec<u8>) -> Result<Vec<u8>> {
}
}

pub fn decompress(compression: &Compression, data: Vec<u8>) -> Result<Vec<u8>> {
pub fn decompress(compression: &MetaCompression, data: Vec<u8>) -> Result<Vec<u8>> {
match compression {
Compression::None => Ok(data),
Compression::Zstd => {
MetaCompression::None => Ok(data),
MetaCompression::Zstd => {
let mut decoder = ZstdDecoder::new(&data[..])?;
let mut decompressed_data = Vec::new();
decoder
Expand All @@ -93,7 +91,7 @@ pub fn decompress(compression: &Compression, data: Vec<u8>) -> Result<Vec<u8>> {
Ok(decompressed_data)
}
#[cfg(feature = "dev")]
Compression::Snappy => Ok(SnapDecoder::new()
MetaCompression::Snappy => Ok(SnapDecoder::new()
.decompress_vec(&data)
.map_err(|e| Error::new(ErrorKind::InvalidData, e))?),
#[cfg(not(feature = "dev"))]
Expand All @@ -106,20 +104,20 @@ pub fn decompress(compression: &Compression, data: Vec<u8>) -> Result<Vec<u8>> {

#[repr(u8)]
#[derive(Debug, Clone)]
pub enum Encoding {
pub enum MetaEncoding {
Bincode = 1,
MessagePack = 2,
Json = 3,
}

impl TryFrom<u8> for Encoding {
impl TryFrom<u8> for MetaEncoding {
type Error = ErrorCode;

fn try_from(value: u8) -> Result<Self, Self::Error> {
match value {
1 => Ok(Encoding::Bincode),
2 => Ok(Encoding::MessagePack),
3 => Ok(Encoding::Json),
1 => Ok(MetaEncoding::Bincode),
2 => Ok(MetaEncoding::MessagePack),
3 => Ok(MetaEncoding::Json),
other => Err(ErrorCode::UnknownFormat(format!(
"unsupported encoding: {}",
other
Expand All @@ -128,48 +126,48 @@ impl TryFrom<u8> for Encoding {
}
}

impl Encoding {
impl MetaEncoding {
pub fn as_str(&self) -> &str {
match self {
Encoding::Bincode => "bincode",
Encoding::MessagePack => "messagepack",
Encoding::Json => "json",
MetaEncoding::Bincode => "bincode",
MetaEncoding::MessagePack => "messagepack",
MetaEncoding::Json => "json",
}
}
}

pub fn encode<T: Serialize>(encoding: &Encoding, data: &T) -> Result<Vec<u8>> {
pub fn encode<T: Serialize>(encoding: &MetaEncoding, data: &T) -> Result<Vec<u8>> {
match encoding {
Encoding::Bincode => {
MetaEncoding::Bincode => {
Ok(bincode::serialize(data).map_err(|e| Error::new(ErrorKind::InvalidData, e))?)
}
Encoding::MessagePack => {
MetaEncoding::MessagePack => {
// using to_vec_named to keep the format backward compatible
let bytes = rmp_serde::to_vec_named(&data)
.map_err(|e| Error::new(ErrorKind::InvalidData, e))?;
Ok(bytes)
}
Encoding::Json => Ok(serde_json::to_vec(&data)?),
MetaEncoding::Json => Ok(serde_json::to_vec(&data)?),
}
}

pub fn decode<'a, T: Deserialize<'a>>(encoding: &Encoding, data: &'a [u8]) -> Result<T> {
pub fn decode<'a, T: Deserialize<'a>>(encoding: &MetaEncoding, data: &'a [u8]) -> Result<T> {
match encoding {
Encoding::Bincode => {
MetaEncoding::Bincode => {
Ok(bincode::deserialize(data).map_err(|e| Error::new(ErrorKind::InvalidData, e))?)
}
Encoding::MessagePack => {
MetaEncoding::MessagePack => {
Ok(rmp_serde::from_slice(data).map_err(|e| Error::new(ErrorKind::InvalidData, e))?)
}
Encoding::Json => Ok(from_slice::<T>(data)?),
MetaEncoding::Json => Ok(from_slice::<T>(data)?),
}
}

pub fn read_and_deserialize<R, T>(
reader: &mut R,
size: u64,
encoding: &Encoding,
compression: &Compression,
encoding: &MetaEncoding,
compression: &MetaCompression,
) -> Result<T>
where
R: Read + Unpin + Send,
Expand All @@ -185,16 +183,16 @@ where

pub struct SegmentHeader {
pub version: u64,
pub encoding: Encoding,
pub compression: Compression,
pub encoding: MetaEncoding,
pub compression: MetaCompression,
pub blocks_size: u64,
pub summary_size: u64,
}

pub fn decode_segment_header<R>(reader: &mut R) -> Result<SegmentHeader>
where R: Read + Unpin + Send {
let version = reader.read_scalar::<u64>()?;
let encoding = Encoding::try_from(reader.read_scalar::<u8>()?)?;
let encoding = MetaEncoding::try_from(reader.read_scalar::<u8>()?)?;
let compression = MetaCompression::try_from(reader.read_scalar::<u8>()?)?;
let blocks_size: u64 = reader.read_scalar::<u64>()?;
let summary_size: u64 = reader.read_scalar::<u64>()?;
Expand Down
22 changes: 10 additions & 12 deletions src/query/storages/common/table-meta/src/meta/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ pub use compression::Compression;
// table meta types of current version
pub use current::*;
pub(crate) use format::load_json;
pub(crate) use format::Compression as MetaCompression;
pub(crate) use format::Encoding;
pub(crate) use format::MetaCompression;
pub(crate) use format::MetaEncoding;
pub use statistics::ClusterKey;
pub use statistics::ClusterStatistics;
pub use statistics::ColumnStatistics;
Expand All @@ -50,15 +50,13 @@ pub use versions::SnapshotVersion;
pub use versions::TableSnapshotStatisticsVersion;
pub use versions::Versioned;

// export legacy versioned table meta types publicly for testing purposes
// currently, only used by crate `test_kits`
// - export legacy versioned table meta types for testing purposes
// currently, only used by crate `test_kits`
// - export meta encoding for benchmarking tests
pub mod testing {
pub use v2::SegmentInfo as SegmentInfoV2;
pub use v2::TableSnapshot as TableSnapshotV2;
pub use v3::SegmentInfo as SegmentInfoV3;
pub use v3::TableSnapshot as TableSnapshotV3;

pub use super::format::Encoding;
use super::v2;
use super::v3;
pub use super::format::MetaEncoding;
pub use super::v2::SegmentInfo as SegmentInfoV2;
pub use super::v2::TableSnapshot as TableSnapshotV2;
pub use super::v3::SegmentInfo as SegmentInfoV3;
pub use super::v3::TableSnapshot as TableSnapshotV3;
}
6 changes: 3 additions & 3 deletions src/query/storages/common/table-meta/src/meta/v3/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::meta::format::read_and_deserialize;
use crate::meta::format::SegmentHeader;
use crate::meta::statistics::FormatVersion;
use crate::meta::v2::BlockMeta;
use crate::meta::Encoding;
use crate::meta::MetaEncoding;
use crate::meta::Statistics;
use crate::meta::Versioned;

Expand Down Expand Up @@ -69,8 +69,8 @@ impl SegmentInfo {
}

#[inline]
pub fn encoding() -> Encoding {
Encoding::Bincode
pub fn encoding() -> MetaEncoding {
MetaEncoding::Bincode
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/query/storages/common/table-meta/src/meta/v3/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ use crate::meta::format::read_and_deserialize;
use crate::meta::statistics::FormatVersion;
use crate::meta::v2;
use crate::meta::ClusterKey;
use crate::meta::Encoding;
use crate::meta::Location;
use crate::meta::MetaCompression;
use crate::meta::MetaEncoding;
use crate::meta::SnapshotId;
use crate::meta::Statistics;
use crate::meta::Versioned;
Expand Down Expand Up @@ -83,15 +83,15 @@ impl TableSnapshot {
let mut cursor = Cursor::new(buffer);
let version = cursor.read_scalar::<u64>()?;
assert_eq!(version, TableSnapshot::VERSION);
let encoding = Encoding::try_from(cursor.read_scalar::<u8>()?)?;
let encoding = MetaEncoding::try_from(cursor.read_scalar::<u8>()?)?;
let compression = MetaCompression::try_from(cursor.read_scalar::<u8>()?)?;
let snapshot_size: u64 = cursor.read_scalar::<u64>()?;

read_and_deserialize(&mut cursor, snapshot_size, &encoding, &compression)
}
#[inline]
pub fn encoding() -> Encoding {
Encoding::Bincode
pub fn encoding() -> MetaEncoding {
MetaEncoding::Bincode
}
}

Expand Down
22 changes: 11 additions & 11 deletions src/query/storages/common/table-meta/src/meta/v4/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ use crate::meta::format::compress;
use crate::meta::format::decode_segment_header;
use crate::meta::format::encode;
use crate::meta::format::read_and_deserialize;
use crate::meta::format::Compression;
use crate::meta::format::MetaCompression;
use crate::meta::format::SegmentHeader;
use crate::meta::statistics::FormatVersion;
use crate::meta::v2::BlockMeta;
use crate::meta::Encoding;
use crate::meta::MetaEncoding;
use crate::meta::Statistics;
use crate::meta::Versioned;

Expand Down Expand Up @@ -74,16 +74,16 @@ impl SegmentInfo {
}

#[inline]
pub fn encoding() -> Encoding {
Encoding::MessagePack
pub fn encoding() -> MetaEncoding {
MetaEncoding::MessagePack
}

// Encode self.blocks as RawBlockMeta.
fn block_raw_bytes(&self) -> Result<RawBlockMeta> {
let encoding = Encoding::MessagePack;
let encoding = MetaEncoding::MessagePack;
let bytes = encode(&encoding, &self.blocks)?;

let compression = Compression::default();
let compression = MetaCompression::default();
let compressed = compress(&compression, bytes)?;

Ok(RawBlockMeta {
Expand Down Expand Up @@ -116,11 +116,11 @@ impl SegmentInfo {
/// A Result containing the serialized Segment data as a byte vector. If any errors occur during
/// encoding, compression, or writing to the byte vector, an error will be returned.
pub fn to_bytes(&self) -> Result<Vec<u8>> {
self.to_bytes_with_encoding(Encoding::MessagePack)
self.to_bytes_with_encoding(MetaEncoding::MessagePack)
}

pub fn to_bytes_with_encoding(&self, encoding: Encoding) -> Result<Vec<u8>> {
let compression = Compression::default();
pub fn to_bytes_with_encoding(&self, encoding: MetaEncoding) -> Result<Vec<u8>> {
let compression = MetaCompression::default();

let blocks = encode(&encoding, &self.blocks)?;
let blocks_compress = compress(&compression, blocks)?;
Expand Down Expand Up @@ -176,8 +176,8 @@ impl SegmentInfo {
#[derive(Clone)]
pub struct RawBlockMeta {
pub bytes: Vec<u8>,
pub encoding: Encoding,
pub compression: Compression,
pub encoding: MetaEncoding,
pub compression: MetaCompression,
}

#[derive(Clone)]
Expand Down
Loading

0 comments on commit 06a439b

Please sign in to comment.