diff --git a/crates/fluss/src/metadata/datatype.rs b/crates/fluss/src/metadata/datatype.rs index dc1f4073..f1574665 100644 --- a/crates/fluss/src/metadata/datatype.rs +++ b/crates/fluss/src/metadata/datatype.rs @@ -682,11 +682,11 @@ impl Default for BytesType { } impl BytesType { - pub fn new() -> Self { + pub const fn new() -> Self { Self::with_nullable(true) } - pub fn with_nullable(nullable: bool) -> Self { + pub const fn with_nullable(nullable: bool) -> Self { Self { nullable } } @@ -859,6 +859,10 @@ impl RowType { self.fields.iter().position(|f| f.name == field_name) } + pub fn field_types(&self) -> impl Iterator + '_ { + self.fields.iter().map(|f| &f.data_type) + } + pub fn get_field_names(&self) -> Vec<&str> { self.fields.iter().map(|f| f.name.as_str()).collect() } @@ -931,7 +935,7 @@ impl DataTypes { DataType::Binary(BinaryType::new(length)) } - pub fn bytes() -> DataType { + pub const fn bytes() -> DataType { DataType::Bytes(BytesType::new()) } diff --git a/crates/fluss/src/record/kv/kv_record_batch.rs b/crates/fluss/src/record/kv/kv_record_batch.rs index fdd4ad73..6ead6427 100644 --- a/crates/fluss/src/record/kv/kv_record_batch.rs +++ b/crates/fluss/src/record/kv/kv_record_batch.rs @@ -321,8 +321,10 @@ impl Iterator for KvRecordIterator { #[cfg(test)] mod tests { use super::*; - use crate::metadata::KvFormat; + use crate::metadata::{DataTypes, KvFormat}; use crate::record::kv::{CURRENT_KV_MAGIC_VALUE, KvRecordBatchBuilder}; + use crate::row::binary::BinaryWriter; + use crate::row::compacted::CompactedRow; use bytes::{BufMut, BytesMut}; #[test] @@ -363,12 +365,13 @@ mod tests { let key1 = b"key1"; let mut value1_writer = CompactedRowWriter::new(1); value1_writer.write_bytes(&[1, 2, 3, 4, 5]); - builder.append_row(key1, Some(&value1_writer)).unwrap(); + + let data_types = &[DataTypes::bytes()]; + let row = &CompactedRow::from_bytes(data_types, value1_writer.buffer()); + builder.append_row(key1, Some(row)).unwrap(); let key2 = b"key2"; - builder - 
.append_row::(key2, None) - .unwrap(); + builder.append_row::(key2, None).unwrap(); let bytes = builder.build().unwrap(); diff --git a/crates/fluss/src/record/kv/kv_record_batch_builder.rs b/crates/fluss/src/record/kv/kv_record_batch_builder.rs index 773c7789..7d1a7972 100644 --- a/crates/fluss/src/record/kv/kv_record_batch_builder.rs +++ b/crates/fluss/src/record/kv/kv_record_batch_builder.rs @@ -317,13 +317,14 @@ impl Drop for KvRecordBatchBuilder { #[cfg(test)] mod tests { use super::*; - use crate::row::compacted::CompactedRowWriter; + use crate::metadata::{DataType, DataTypes}; + use crate::row::binary::BinaryWriter; + use crate::row::compacted::{CompactedRow, CompactedRowWriter}; // Helper function to create a CompactedRowWriter with a single bytes field for testing - fn create_test_row(data: &[u8]) -> CompactedRowWriter { - let mut writer = CompactedRowWriter::new(1); - writer.write_bytes(data); - writer + fn create_test_row(data: &[u8]) -> CompactedRow<'_> { + const DATA_TYPE: &[DataType] = &[DataTypes::bytes()]; + CompactedRow::from_bytes(DATA_TYPE, data) } #[test] @@ -349,10 +350,8 @@ mod tests { builder.append_row(key1, Some(&value1)).unwrap(); let key2 = b"key2"; - assert!(builder.has_room_for_row::(key2, None)); - builder - .append_row::(key2, None) - .unwrap(); + assert!(builder.has_room_for_row::(key2, None)); + builder.append_row::(key2, None).unwrap(); // Test close and build builder.close().unwrap(); @@ -373,11 +372,7 @@ mod tests { let value = create_test_row(b"value"); builder.append_row(b"key", Some(&value)).unwrap(); builder.abort(); - assert!( - builder - .append_row::(b"key2", None) - .is_err() - ); + assert!(builder.append_row::(b"key2", None).is_err()); assert!(builder.build().is_err()); assert!(builder.close().is_err()); @@ -386,11 +381,7 @@ mod tests { let value = create_test_row(b"value"); builder.append_row(b"key", Some(&value)).unwrap(); builder.close().unwrap(); - assert!( - builder - .append_row::(b"key2", None) - .is_err() - ); // 
Can't append after close + assert!(builder.append_row::(b"key2", None).is_err()); // Can't append after close assert!(builder.build().is_ok()); // But can still build } @@ -510,23 +501,26 @@ mod tests { row_writer1.write_int(42); row_writer1.write_string("hello"); + let data_types = &[DataTypes::int(), DataTypes::string()]; + let row1 = &CompactedRow::from_bytes(data_types, row_writer1.buffer()); + let key1 = b"key1"; - assert!(builder.has_room_for_row(key1, Some(&row_writer1))); - builder.append_row(key1, Some(&row_writer1)).unwrap(); + assert!(builder.has_room_for_row(key1, Some(row1))); + builder.append_row(key1, Some(row1)).unwrap(); // Create and append second record let mut row_writer2 = CompactedRowWriter::new(2); row_writer2.write_int(100); row_writer2.write_string("world"); + let row2 = &CompactedRow::from_bytes(data_types, row_writer2.buffer()); + let key2 = b"key2"; - builder.append_row(key2, Some(&row_writer2)).unwrap(); + builder.append_row(key2, Some(row2)).unwrap(); // Append a deletion record let key3 = b"key3"; - builder - .append_row::(key3, None) - .unwrap(); + builder.append_row::(key3, None).unwrap(); // Build and verify builder.close().unwrap(); @@ -567,15 +561,18 @@ mod tests { let mut row_writer = CompactedRowWriter::new(1); row_writer.write_int(42); + let data_types = &[DataTypes::int()]; + let row = &CompactedRow::from_bytes(data_types, row_writer.buffer()); + // INDEXED format should reject append_row let mut indexed_builder = KvRecordBatchBuilder::new(1, 4096, KvFormat::INDEXED); - let result = indexed_builder.append_row(b"key", Some(&row_writer)); + let result = indexed_builder.append_row(b"key", Some(row)); assert!(result.is_err()); assert_eq!(result.unwrap_err().kind(), io::ErrorKind::InvalidInput); // COMPACTED format should accept append_row let mut compacted_builder = KvRecordBatchBuilder::new(1, 4096, KvFormat::COMPACTED); - let result = compacted_builder.append_row(b"key", Some(&row_writer)); + let result = 
compacted_builder.append_row(b"key", Some(row)); assert!(result.is_ok()); } } diff --git a/crates/fluss/src/row/compacted/compacted_row.rs b/crates/fluss/src/row/compacted/compacted_row.rs index 481f9be5..9ff3b5ff 100644 --- a/crates/fluss/src/row/compacted/compacted_row.rs +++ b/crates/fluss/src/row/compacted/compacted_row.rs @@ -15,11 +15,10 @@ // specific language governing permissions and limitations // under the License. -use std::sync::OnceLock; - use crate::metadata::DataType; use crate::row::compacted::compacted_row_reader::{CompactedRowDeserializer, CompactedRowReader}; -use crate::row::{GenericRow, InternalRow}; +use crate::row::{BinaryRow, GenericRow, InternalRow}; +use std::sync::{Arc, OnceLock}; // Reference implementation: // https://github.com/apache/fluss/blob/main/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRow.java @@ -28,9 +27,9 @@ pub struct CompactedRow<'a> { arity: usize, size_in_bytes: usize, decoded_row: OnceLock>, - deserializer: CompactedRowDeserializer<'a>, + deserializer: Arc>, reader: CompactedRowReader<'a>, - data_types: &'a [DataType], + data: &'a [u8], } pub fn calculate_bit_set_width_in_bytes(arity: usize) -> usize { @@ -40,15 +39,25 @@ pub fn calculate_bit_set_width_in_bytes(arity: usize) -> usize { #[allow(dead_code)] impl<'a> CompactedRow<'a> { pub fn from_bytes(data_types: &'a [DataType], data: &'a [u8]) -> Self { - let arity = data_types.len(); - let size = data.len(); + Self::deserialize( + Arc::new(CompactedRowDeserializer::new(data_types)), + data_types.len(), + data, + ) + } + + pub fn deserialize( + deserializer: Arc>, + arity: usize, + data: &'a [u8], + ) -> Self { Self { arity, - size_in_bytes: size, + size_in_bytes: data.len(), decoded_row: OnceLock::new(), - deserializer: CompactedRowDeserializer::new(data_types), - reader: CompactedRowReader::new(arity, data, 0, size), - data_types, + deserializer: Arc::clone(&deserializer), + reader: CompactedRowReader::new(arity, data, 0, data.len()), + data, 
} } @@ -62,6 +71,12 @@ impl<'a> CompactedRow<'a> { } } +impl BinaryRow for CompactedRow<'_> { + fn as_bytes(&self) -> &[u8] { + self.data + } +} + #[allow(dead_code)] impl<'a> InternalRow for CompactedRow<'a> { fn get_field_count(&self) -> usize { @@ -69,7 +84,7 @@ impl<'a> InternalRow for CompactedRow<'a> { } fn is_null_at(&self, pos: usize) -> bool { - self.data_types[pos].is_nullable() && self.reader.is_null_at(pos) + self.deserializer.get_data_types()[pos].is_nullable() && self.reader.is_null_at(pos) } fn get_boolean(&self, pos: usize) -> bool { @@ -120,6 +135,8 @@ impl<'a> InternalRow for CompactedRow<'a> { #[cfg(test)] mod tests { use super::*; + use crate::row::binary::BinaryWriter; + use crate::metadata::{ BigIntType, BooleanType, BytesType, DoubleType, FloatType, IntType, SmallIntType, StringType, TinyIntType, diff --git a/crates/fluss/src/row/compacted/compacted_row_reader.rs b/crates/fluss/src/row/compacted/compacted_row_reader.rs index 5ec26089..9ce50952 100644 --- a/crates/fluss/src/row/compacted/compacted_row_reader.rs +++ b/crates/fluss/src/row/compacted/compacted_row_reader.rs @@ -21,17 +21,31 @@ use crate::{ row::{Datum, GenericRow, compacted::compacted_row_writer::CompactedRowWriter}, util::varint::{read_unsigned_varint_at, read_unsigned_varint_u64_at}, }; +use std::borrow::Cow; use std::str::from_utf8; #[allow(dead_code)] +#[derive(Clone)] pub struct CompactedRowDeserializer<'a> { - schema: &'a [DataType], + schema: Cow<'a, [DataType]>, } #[allow(dead_code)] impl<'a> CompactedRowDeserializer<'a> { pub fn new(schema: &'a [DataType]) -> Self { - Self { schema } + Self { + schema: Cow::Borrowed(schema), + } + } + + pub fn new_from_owned(schema: Vec) -> Self { + Self { + schema: Cow::Owned(schema), + } + } + + pub fn get_data_types(&self) -> &[DataType] { + self.schema.as_ref() } pub fn deserialize(&self, reader: &CompactedRowReader<'a>) -> GenericRow<'a> { diff --git a/crates/fluss/src/row/compacted/compacted_row_writer.rs 
b/crates/fluss/src/row/compacted/compacted_row_writer.rs index 63b32a3d..c130e94c 100644 --- a/crates/fluss/src/row/compacted/compacted_row_writer.rs +++ b/crates/fluss/src/row/compacted/compacted_row_writer.rs @@ -15,12 +15,11 @@ // specific language governing permissions and limitations // under the License. -use bytes::{Bytes, BytesMut}; -use std::cmp; - -use crate::row::BinaryRow; +use crate::row::binary::BinaryWriter; use crate::row::compacted::compacted_row::calculate_bit_set_width_in_bytes; use crate::util::varint::{write_unsigned_varint_to_slice, write_unsigned_varint_u64_to_slice}; +use bytes::{Bytes, BytesMut}; +use std::cmp; // Writer for CompactedRow // Reference implementation: @@ -51,11 +50,6 @@ impl CompactedRowWriter { } } - pub fn reset(&mut self) { - self.position = self.header_size_in_bytes; - self.buffer[..self.header_size_in_bytes].fill(0); - } - pub fn position(&self) -> usize { self.position } @@ -81,75 +75,78 @@ impl CompactedRowWriter { self.buffer[self.position..end].copy_from_slice(src); self.position = end; } +} +impl BinaryWriter for CompactedRowWriter { + fn reset(&mut self) { + self.position = self.header_size_in_bytes; + self.buffer[..self.header_size_in_bytes].fill(0); + } - pub fn set_null_at(&mut self, pos: usize) { + fn set_null_at(&mut self, pos: usize) { let byte_index = pos >> 3; let bit = pos & 7; debug_assert!(byte_index < self.header_size_in_bytes); self.buffer[byte_index] |= 1u8 << bit; } - pub fn write_boolean(&mut self, value: bool) { + fn write_boolean(&mut self, value: bool) { let b = if value { 1u8 } else { 0u8 }; self.write_raw(&[b]); } - pub fn write_byte(&mut self, value: u8) { + fn write_byte(&mut self, value: u8) { self.write_raw(&[value]); } - pub fn write_binary(&mut self, bytes: &[u8], length: usize) { - // TODO: currently, we encoding BINARY(length) as the same with BYTES, the length info can - // be omitted and the bytes length should be enforced in the future. 
- self.write_bytes(&bytes[..length.min(bytes.len())]); - } - - pub fn write_bytes(&mut self, value: &[u8]) { + fn write_bytes(&mut self, value: &[u8]) { let len_i32 = i32::try_from(value.len()).expect("byte slice too large to encode length as i32"); self.write_int(len_i32); self.write_raw(value); } - pub fn write_char(&mut self, value: &str, _length: usize) { + fn write_char(&mut self, value: &str, _length: usize) { // TODO: currently, we encoding CHAR(length) as the same with STRING, the length info can be // omitted and the bytes length should be enforced in the future. self.write_string(value); } - pub fn write_string(&mut self, value: &str) { + fn write_string(&mut self, value: &str) { self.write_bytes(value.as_ref()); } - pub fn write_short(&mut self, value: i16) { + fn write_short(&mut self, value: i16) { self.write_raw(&value.to_ne_bytes()); } - pub fn write_int(&mut self, value: i32) { + fn write_int(&mut self, value: i32) { self.ensure_capacity(Self::MAX_INT_SIZE); let bytes_written = write_unsigned_varint_to_slice(value as u32, &mut self.buffer[self.position..]); self.position += bytes_written; } - pub fn write_long(&mut self, value: i64) { + fn write_long(&mut self, value: i64) { self.ensure_capacity(Self::MAX_LONG_SIZE); let bytes_written = write_unsigned_varint_u64_to_slice(value as u64, &mut self.buffer[self.position..]); self.position += bytes_written; } - - pub fn write_float(&mut self, value: f32) { + fn write_float(&mut self, value: f32) { self.write_raw(&value.to_ne_bytes()); } - pub fn write_double(&mut self, value: f64) { + fn write_double(&mut self, value: f64) { self.write_raw(&value.to_ne_bytes()); } -} -impl BinaryRow for CompactedRowWriter { - fn as_bytes(&self) -> &[u8] { - self.buffer() + fn write_binary(&mut self, bytes: &[u8], length: usize) { + // TODO: currently, we encoding BINARY(length) as the same with BYTES, the length info can + // be omitted and the bytes length should be enforced in the future. 
+ self.write_bytes(&bytes[..length.min(bytes.len())]); + } + + fn complete(&mut self) { + // do nothing } } diff --git a/crates/fluss/src/row/encode/compacted_row_encoder.rs b/crates/fluss/src/row/encode/compacted_row_encoder.rs new file mode 100644 index 00000000..fc39bb7a --- /dev/null +++ b/crates/fluss/src/row/encode/compacted_row_encoder.rs @@ -0,0 +1,83 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::error::Error::IllegalArgument; +use crate::error::Result; +use crate::metadata::DataType; +use crate::row::Datum; +use crate::row::binary::{BinaryRowFormat, BinaryWriter, ValueWriter}; +use crate::row::compacted::{CompactedRow, CompactedRowDeserializer, CompactedRowWriter}; +use crate::row::encode::{BinaryRow, RowEncoder}; +use std::sync::Arc; + +#[allow(dead_code)] +pub struct CompactedRowEncoder<'a> { + arity: usize, + writer: CompactedRowWriter, + field_writers: Vec, + compacted_row_deserializer: Arc>, +} + +impl<'a> CompactedRowEncoder<'a> { + pub fn new(field_data_types: Vec) -> Result { + let field_writers = field_data_types + .iter() + .map(|d| ValueWriter::create_value_writer(d, Some(&BinaryRowFormat::Compacted))) + .collect::>>()?; + + Ok(Self { + arity: field_data_types.len(), + writer: CompactedRowWriter::new(field_data_types.len()), + field_writers, + compacted_row_deserializer: Arc::new(CompactedRowDeserializer::new_from_owned( + field_data_types, + )), + }) + } +} + +impl RowEncoder for CompactedRowEncoder<'_> { + fn start_new_row(&mut self) -> Result<()> { + self.writer.reset(); + Ok(()) + } + + fn encode_field(&mut self, pos: usize, value: Datum) -> Result<()> { + self.field_writers + .get(pos) + .ok_or_else(|| IllegalArgument { + message: format!( + "invalid position {} when attempting to encode value {}", + pos, value + ), + })? + .write_value(&mut self.writer, pos, &value) + } + + fn finish_row(&mut self) -> Result { + Ok(CompactedRow::deserialize( + Arc::clone(&self.compacted_row_deserializer), + self.arity, + self.writer.buffer(), + )) + } + + fn close(&mut self) -> Result<()> { + // do nothing + Ok(()) + } +} diff --git a/crates/fluss/src/row/encode/mod.rs b/crates/fluss/src/row/encode/mod.rs index 6c6eed99..34863aba 100644 --- a/crates/fluss/src/row/encode/mod.rs +++ b/crates/fluss/src/row/encode/mod.rs @@ -16,11 +16,13 @@ // under the License. 
mod compacted_key_encoder; +mod compacted_row_encoder; use crate::error::Result; -use crate::metadata::{DataLakeFormat, RowType}; -use crate::row::InternalRow; +use crate::metadata::{DataLakeFormat, DataType, KvFormat, RowType}; use crate::row::encode::compacted_key_encoder::CompactedKeyEncoder; +use crate::row::encode::compacted_row_encoder::CompactedRowEncoder; +use crate::row::{BinaryRow, Datum, InternalRow}; use bytes::Bytes; /// An interface for encoding key of row into bytes. @@ -62,3 +64,65 @@ impl dyn KeyEncoder { } } } + +/// An encoder to write [`BinaryRow`]. It's used to write rows +/// one by one, multiple times. When writing a new row: +/// +/// 1. call method [`RowEncoder::start_new_row()`] to start the writing. +/// 2. call method [`RowEncoder::encode_field()`] to write the row's field. +/// 3. call method [`RowEncoder::finish_row()`] to finish the writing and get the written row. +#[allow(dead_code)] +pub trait RowEncoder { + /// Start to write a new row. + /// + /// # Returns + /// * Ok(()) if successful + fn start_new_row(&mut self) -> Result<()>; + + /// Write the row's field in given pos with given value. + /// + /// # Arguments + /// * pos - the position of the field to write. + /// * value - the value of the field to write. + /// + /// # Returns + /// * Ok(()) if successful + fn encode_field(&mut self, pos: usize, value: Datum) -> Result<()>; + + /// Finish writing the row and return the written row. + /// + /// Note that the returned row borrows from [`RowEncoder`]'s internal buffer, which is reused for subsequent rows; + /// [`RowEncoder::start_new_row()`] should only be called after the returned row goes out of scope. 
+ /// + /// # Returns + /// * the written row + fn finish_row(&mut self) -> Result; + + /// Closes the row encoder + /// + /// # Returns + /// * Ok(()) if successful + fn close(&mut self) -> Result<()>; +} + +#[allow(dead_code)] +pub struct RowEncoderFactory {} + +#[allow(dead_code)] +impl RowEncoderFactory { + pub fn create(kv_format: KvFormat, row_type: &RowType) -> Result { + Self::create_for_field_types(kv_format, row_type.field_types().cloned().collect()) + } + + pub fn create_for_field_types( + kv_format: KvFormat, + field_data_types: Vec, + ) -> Result { + match kv_format { + KvFormat::INDEXED => { + todo!() + } + KvFormat::COMPACTED => CompactedRowEncoder::new(field_data_types), + } + } +} diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs index 144d64fd..49960635 100644 --- a/crates/fluss/src/row/mod.rs +++ b/crates/fluss/src/row/mod.rs @@ -19,7 +19,7 @@ mod column; mod datum; -mod binary; +pub mod binary; pub mod compacted; mod encode; mod field_getter; @@ -27,7 +27,7 @@ mod field_getter; pub use column::*; pub use datum::*; -pub trait BinaryRow { +pub trait BinaryRow: InternalRow { /// Returns the binary representation of this row as a byte slice. fn as_bytes(&self) -> &[u8]; }