Skip to content

Commit

Permalink
feat: implement columnar encoding for integer
Browse files Browse the repository at this point in the history
  • Loading branch information
ShiKaiWi committed Aug 23, 2023
1 parent 0f938df commit 591c55b
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 61 deletions.
80 changes: 50 additions & 30 deletions components/codec/src/columnar/int.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,54 +13,68 @@
// limitations under the License.

use bytes_ext::{Buf, BufMut};
use snafu::ResultExt;
use snafu::{ensure, ResultExt};

use crate::{
columnar::{
DecodeContext, Result, ValuesDecoder, ValuesDecoderImpl, ValuesEncoder, ValuesEncoderImpl,
Varint,
DecodeContext, InvalidVersion, Result, ValuesDecoder, ValuesDecoderImpl, ValuesEncoder,
ValuesEncoderImpl, Varint,
},
varint,
};

/// The max number of the bytes used to store a varint encoding u64/i64.
const MAX_NUM_BYTES_OF_64VARINT: usize = 10;

impl ValuesEncoder<i32> for ValuesEncoderImpl {
fn encode<B, I>(&self, buf: &mut B, values: I) -> Result<()>
where
B: BufMut,
I: Iterator<Item = i32>,
{
for v in values {
buf.put_i32(v);
const VERSION: u8 = 0;
const VERSION_SIZE: usize = 1;

macro_rules! impl_int_encoding {
($int_type: ty, $write_method: ident, $read_method: ident) => {
impl ValuesEncoder<$int_type> for ValuesEncoderImpl {
fn encode<B, I>(&self, buf: &mut B, values: I) -> Result<()>
where
B: BufMut,
I: Iterator<Item = $int_type>,
{
for v in values {
buf.$write_method(v);
}

Ok(())
}
}

Ok(())
}
}

impl ValuesDecoder<i32> for ValuesDecoderImpl {
fn decode<B, F>(&self, _ctx: DecodeContext<'_>, buf: &mut B, mut f: F) -> Result<()>
where
B: Buf,
F: FnMut(i32) -> Result<()>,
{
while buf.remaining() > 0 {
let v = buf.get_i32();
f(v)?;
impl ValuesDecoder<$int_type> for ValuesDecoderImpl {
fn decode<B, F>(&self, _ctx: DecodeContext<'_>, buf: &mut B, mut f: F) -> Result<()>
where
B: Buf,
F: FnMut($int_type) -> Result<()>,
{
while buf.remaining() > 0 {
let v = buf.$read_method();
f(v)?;
}

Ok(())
}
}

Ok(())
}
};
}

impl_int_encoding!(i8, put_i8, get_i8);
impl_int_encoding!(u8, put_u8, get_u8);
impl_int_encoding!(u16, put_u16, get_u16);
impl_int_encoding!(i16, put_i16, get_i16);
impl_int_encoding!(u32, put_u32, get_u32);
impl_int_encoding!(i32, put_i32, get_i32);

impl ValuesEncoder<i64> for ValuesEncoderImpl {
fn encode<B, I>(&self, buf: &mut B, values: I) -> Result<()>
where
B: BufMut,
I: Iterator<Item = i64>,
{
buf.put_u8(VERSION);
for v in values {
varint::encode_varint(buf, v).context(Varint)?;
}
Expand All @@ -74,7 +88,7 @@ impl ValuesEncoder<i64> for ValuesEncoderImpl {
{
let (lower, higher) = values.size_hint();
let num = lower.max(higher.unwrap_or_default());
num * MAX_NUM_BYTES_OF_64VARINT
num * MAX_NUM_BYTES_OF_64VARINT + VERSION_SIZE
}
}

Expand All @@ -84,6 +98,9 @@ impl ValuesDecoder<i64> for ValuesDecoderImpl {
B: Buf,
F: FnMut(i64) -> Result<()>,
{
let version = buf.get_u8();
ensure!(version == VERSION, InvalidVersion { version });

while buf.remaining() > 0 {
let v = varint::decode_varint(buf).context(Varint)?;
f(v)?;
Expand Down Expand Up @@ -112,7 +129,7 @@ impl ValuesEncoder<u64> for ValuesEncoderImpl {
{
let (lower, higher) = values.size_hint();
let num = lower.max(higher.unwrap_or_default());
num * MAX_NUM_BYTES_OF_64VARINT
num * MAX_NUM_BYTES_OF_64VARINT + VERSION_SIZE
}
}

Expand All @@ -122,6 +139,9 @@ impl ValuesDecoder<u64> for ValuesDecoderImpl {
B: Buf,
F: FnMut(u64) -> Result<()>,
{
let version = buf.get_u8();
ensure!(version == VERSION, InvalidVersion { version });

while buf.remaining() > 0 {
let v = varint::decode_uvarint(buf).context(Varint)?;
f(v)?;
Expand Down
124 changes: 93 additions & 31 deletions components/codec/src/columnar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,17 +275,25 @@ impl ColumnarEncoder {
DatumKind::UInt64 => {
enc.estimated_encoded_size(datums.clone().filter_map(|v| v.as_u64()))
}
DatumKind::UInt32 => todo!(),
DatumKind::UInt16 => todo!(),
DatumKind::UInt8 => todo!(),
DatumKind::UInt32 => {
enc.estimated_encoded_size(datums.clone().filter_map(|v| v.as_u32()))
}
DatumKind::UInt16 => {
enc.estimated_encoded_size(datums.clone().filter_map(|v| v.as_u16()))
}
DatumKind::UInt8 => {
enc.estimated_encoded_size(datums.clone().filter_map(|v| v.as_u8()))
}
DatumKind::Int64 => {
enc.estimated_encoded_size(datums.clone().filter_map(|v| v.as_i64()))
}
DatumKind::Int32 => {
enc.estimated_encoded_size(datums.clone().filter_map(|v| v.as_i32()))
}
DatumKind::Int16 => todo!(),
DatumKind::Int8 => todo!(),
DatumKind::Int16 => {
enc.estimated_encoded_size(datums.clone().filter_map(|v| v.as_i16()))
}
DatumKind::Int8 => enc.estimated_encoded_size(datums.clone().filter_map(|v| v.as_i8())),
DatumKind::Boolean => todo!(),
DatumKind::Date => todo!(),
DatumKind::Time => todo!(),
Expand Down Expand Up @@ -316,13 +324,13 @@ impl ColumnarEncoder {
datums.filter_map(|v| v.into_str().map(|v| v.as_bytes())),
),
DatumKind::UInt64 => enc.encode(buf, datums.filter_map(|v| v.as_u64())),
DatumKind::UInt32 => todo!(),
DatumKind::UInt16 => todo!(),
DatumKind::UInt8 => todo!(),
DatumKind::UInt32 => enc.encode(buf, datums.filter_map(|v| v.as_u32())),
DatumKind::UInt16 => enc.encode(buf, datums.filter_map(|v| v.as_u16())),
DatumKind::UInt8 => enc.encode(buf, datums.filter_map(|v| v.as_u8())),
DatumKind::Int64 => enc.encode(buf, datums.filter_map(|v| v.as_i64())),
DatumKind::Int32 => enc.encode(buf, datums.filter_map(|v| v.as_i32())),
DatumKind::Int16 => todo!(),
DatumKind::Int8 => todo!(),
DatumKind::Int16 => enc.encode(buf, datums.filter_map(|v| v.as_i16())),
DatumKind::Int8 => enc.encode(buf, datums.filter_map(|v| v.as_i8())),
DatumKind::Boolean => todo!(),
DatumKind::Date => todo!(),
DatumKind::Time => todo!(),
Expand Down Expand Up @@ -454,9 +462,27 @@ impl ColumnarDecoder {
};
ValuesDecoderImpl.decode(ctx, buf, with_u64)
}
DatumKind::UInt32 => todo!(),
DatumKind::UInt16 => todo!(),
DatumKind::UInt8 => todo!(),
DatumKind::UInt32 => {
let with_u32 = |value: u32| {
let datum = Datum::from(value);
f(datum)
};
ValuesDecoderImpl.decode(ctx, buf, with_u32)
}
DatumKind::UInt16 => {
let with_u16 = |value: u16| {
let datum = Datum::from(value);
f(datum)
};
ValuesDecoderImpl.decode(ctx, buf, with_u16)
}
DatumKind::UInt8 => {
let with_u8 = |value: u8| {
let datum = Datum::from(value);
f(datum)
};
ValuesDecoderImpl.decode(ctx, buf, with_u8)
}
DatumKind::Int64 => {
let with_i64 = |value: i64| {
let datum = Datum::from(value);
Expand All @@ -468,8 +494,20 @@ impl ColumnarDecoder {
let with_i32 = |v: i32| f(Datum::from(v));
ValuesDecoderImpl.decode(ctx, buf, with_i32)
}
DatumKind::Int16 => todo!(),
DatumKind::Int8 => todo!(),
DatumKind::Int16 => {
let with_i16 = |value: i16| {
let datum = Datum::from(value);
f(datum)
};
ValuesDecoderImpl.decode(ctx, buf, with_i16)
}
DatumKind::Int8 => {
let with_i8 = |value: i8| {
let datum = Datum::from(value);
f(datum)
};
ValuesDecoderImpl.decode(ctx, buf, with_i8)
}
DatumKind::Boolean => todo!(),
DatumKind::Date => todo!(),
DatumKind::Time => todo!(),
Expand Down Expand Up @@ -510,20 +548,44 @@ mod tests {
}

#[test]
fn test_i32() {
let datums = vec![
Datum::from(10i32),
Datum::from(1i32),
Datum::from(2i32),
Datum::from(18i32),
Datum::from(38i32),
Datum::from(48i32),
Datum::from(80i32),
Datum::from(81i32),
Datum::from(82i32),
];
fn test_small_int() {
let datums = vec![10u32, 1u32, 2u32, 81u32, 82u32];

check_encode_end_decode(10, datums, DatumKind::Int32);
check_encode_end_decode(
10,
datums.iter().map(|v| Datum::from(*v)).collect(),
DatumKind::UInt32,
);

check_encode_end_decode(
10,
datums.iter().map(|v| Datum::from(*v as i32)).collect(),
DatumKind::Int32,
);

check_encode_end_decode(
10,
datums.iter().map(|v| Datum::from(*v as u16)).collect(),
DatumKind::UInt16,
);

check_encode_end_decode(
10,
datums.iter().map(|v| Datum::from(*v as i16)).collect(),
DatumKind::Int16,
);

check_encode_end_decode(
10,
datums.iter().map(|v| Datum::from(*v as i8)).collect(),
DatumKind::Int8,
);

check_encode_end_decode(
10,
datums.iter().map(|v| Datum::from(*v as u8)).collect(),
DatumKind::UInt8,
);
}

#[test]
Expand Down Expand Up @@ -580,9 +642,9 @@ mod tests {
Datum::from(18i64),
Datum::from(38i64),
Datum::from(48i64),
Datum::from(80i64),
Datum::from(81i64),
Datum::from(82i64),
Datum::from(-80i64),
Datum::from(-81i64),
Datum::from(-82i64),
];

check_encode_end_decode(10, datums, DatumKind::Int64);
Expand Down

0 comments on commit 591c55b

Please sign in to comment.