Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions crates/fluss/src/metadata/datatype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -682,11 +682,11 @@ impl Default for BytesType {
}

impl BytesType {
pub fn new() -> Self {
pub const fn new() -> Self {
Self::with_nullable(true)
}

pub fn with_nullable(nullable: bool) -> Self {
pub const fn with_nullable(nullable: bool) -> Self {
Self { nullable }
}

Expand Down Expand Up @@ -859,6 +859,10 @@ impl RowType {
self.fields.iter().position(|f| f.name == field_name)
}

/// Returns an iterator over references to the `data_type` of every entry in
/// `self.fields`, in the order the fields are stored.
pub fn field_types(&self) -> impl Iterator<Item = &DataType> + '_ {
    let fields = self.fields.iter();
    fields.map(|field| &field.data_type)
}

/// Collects the name of every entry in `self.fields` into a vector of string
/// slices that borrow from `self`, in the order the fields are stored.
pub fn get_field_names(&self) -> Vec<&str> {
    let names = self.fields.iter().map(|field| field.name.as_str());
    names.collect()
}
Expand Down Expand Up @@ -931,7 +935,7 @@ impl DataTypes {
DataType::Binary(BinaryType::new(length))
}

pub fn bytes() -> DataType {
pub const fn bytes() -> DataType {
DataType::Bytes(BytesType::new())
}

Expand Down
13 changes: 8 additions & 5 deletions crates/fluss/src/record/kv/kv_record_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,10 @@ impl Iterator for KvRecordIterator {
#[cfg(test)]
mod tests {
use super::*;
use crate::metadata::KvFormat;
use crate::metadata::{DataTypes, KvFormat};
use crate::record::kv::{CURRENT_KV_MAGIC_VALUE, KvRecordBatchBuilder};
use crate::row::binary::BinaryWriter;
use crate::row::compacted::CompactedRow;
use bytes::{BufMut, BytesMut};

#[test]
Expand Down Expand Up @@ -363,12 +365,13 @@ mod tests {
let key1 = b"key1";
let mut value1_writer = CompactedRowWriter::new(1);
value1_writer.write_bytes(&[1, 2, 3, 4, 5]);
builder.append_row(key1, Some(&value1_writer)).unwrap();

let data_types = &[DataTypes::bytes()];
let row = &CompactedRow::from_bytes(data_types, value1_writer.buffer());
builder.append_row(key1, Some(row)).unwrap();

let key2 = b"key2";
builder
.append_row::<CompactedRowWriter>(key2, None)
.unwrap();
builder.append_row::<CompactedRow>(key2, None).unwrap();

let bytes = builder.build().unwrap();

Expand Down
51 changes: 24 additions & 27 deletions crates/fluss/src/record/kv/kv_record_batch_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,13 +317,14 @@ impl Drop for KvRecordBatchBuilder {
#[cfg(test)]
mod tests {
use super::*;
use crate::row::compacted::CompactedRowWriter;
use crate::metadata::{DataType, DataTypes};
use crate::row::binary::BinaryWriter;
use crate::row::compacted::{CompactedRow, CompactedRowWriter};

// Helper function to wrap raw bytes in a CompactedRow with a single bytes-typed field for testing
fn create_test_row(data: &[u8]) -> CompactedRowWriter {
let mut writer = CompactedRowWriter::new(1);
writer.write_bytes(data);
writer
fn create_test_row(data: &[u8]) -> CompactedRow<'_> {
const DATA_TYPE: &[DataType] = &[DataTypes::bytes()];
CompactedRow::from_bytes(DATA_TYPE, data)
}

#[test]
Expand All @@ -349,10 +350,8 @@ mod tests {
builder.append_row(key1, Some(&value1)).unwrap();

let key2 = b"key2";
assert!(builder.has_room_for_row::<CompactedRowWriter>(key2, None));
builder
.append_row::<CompactedRowWriter>(key2, None)
.unwrap();
assert!(builder.has_room_for_row::<CompactedRow>(key2, None));
builder.append_row::<CompactedRow>(key2, None).unwrap();

// Test close and build
builder.close().unwrap();
Expand All @@ -373,11 +372,7 @@ mod tests {
let value = create_test_row(b"value");
builder.append_row(b"key", Some(&value)).unwrap();
builder.abort();
assert!(
builder
.append_row::<CompactedRowWriter>(b"key2", None)
.is_err()
);
assert!(builder.append_row::<CompactedRow>(b"key2", None).is_err());
assert!(builder.build().is_err());
assert!(builder.close().is_err());

Expand All @@ -386,11 +381,7 @@ mod tests {
let value = create_test_row(b"value");
builder.append_row(b"key", Some(&value)).unwrap();
builder.close().unwrap();
assert!(
builder
.append_row::<CompactedRowWriter>(b"key2", None)
.is_err()
); // Can't append after close
assert!(builder.append_row::<CompactedRow>(b"key2", None).is_err()); // Can't append after close
assert!(builder.build().is_ok()); // But can still build
}

Expand Down Expand Up @@ -510,23 +501,26 @@ mod tests {
row_writer1.write_int(42);
row_writer1.write_string("hello");

let data_types = &[DataTypes::int(), DataTypes::string()];
let row1 = &CompactedRow::from_bytes(data_types, row_writer1.buffer());

let key1 = b"key1";
assert!(builder.has_room_for_row(key1, Some(&row_writer1)));
builder.append_row(key1, Some(&row_writer1)).unwrap();
assert!(builder.has_room_for_row(key1, Some(row1)));
builder.append_row(key1, Some(row1)).unwrap();

// Create and append second record
let mut row_writer2 = CompactedRowWriter::new(2);
row_writer2.write_int(100);
row_writer2.write_string("world");

let row2 = &CompactedRow::from_bytes(data_types, row_writer2.buffer());

let key2 = b"key2";
builder.append_row(key2, Some(&row_writer2)).unwrap();
builder.append_row(key2, Some(row2)).unwrap();

// Append a deletion record
let key3 = b"key3";
builder
.append_row::<CompactedRowWriter>(key3, None)
.unwrap();
builder.append_row::<CompactedRow>(key3, None).unwrap();

// Build and verify
builder.close().unwrap();
Expand Down Expand Up @@ -567,15 +561,18 @@ mod tests {
let mut row_writer = CompactedRowWriter::new(1);
row_writer.write_int(42);

let data_types = &[DataTypes::int()];
let row = &CompactedRow::from_bytes(data_types, row_writer.buffer());

// INDEXED format should reject append_row
let mut indexed_builder = KvRecordBatchBuilder::new(1, 4096, KvFormat::INDEXED);
let result = indexed_builder.append_row(b"key", Some(&row_writer));
let result = indexed_builder.append_row(b"key", Some(row));
assert!(result.is_err());
assert_eq!(result.unwrap_err().kind(), io::ErrorKind::InvalidInput);

// COMPACTED format should accept append_row
let mut compacted_builder = KvRecordBatchBuilder::new(1, 4096, KvFormat::COMPACTED);
let result = compacted_builder.append_row(b"key", Some(&row_writer));
let result = compacted_builder.append_row(b"key", Some(row));
assert!(result.is_ok());
}
}
41 changes: 29 additions & 12 deletions crates/fluss/src/row/compacted/compacted_row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@
// specific language governing permissions and limitations
// under the License.

use std::sync::OnceLock;

use crate::metadata::DataType;
use crate::row::compacted::compacted_row_reader::{CompactedRowDeserializer, CompactedRowReader};
use crate::row::{GenericRow, InternalRow};
use crate::row::{BinaryRow, GenericRow, InternalRow};
use std::sync::{Arc, OnceLock};

// Reference implementation:
// https://github.com/apache/fluss/blob/main/fluss-common/src/main/java/org/apache/fluss/row/compacted/CompactedRow.java
Expand All @@ -28,9 +27,9 @@ pub struct CompactedRow<'a> {
arity: usize,
size_in_bytes: usize,
decoded_row: OnceLock<GenericRow<'a>>,
deserializer: CompactedRowDeserializer<'a>,
deserializer: Arc<CompactedRowDeserializer<'a>>,
reader: CompactedRowReader<'a>,
data_types: &'a [DataType],
data: &'a [u8],
}

pub fn calculate_bit_set_width_in_bytes(arity: usize) -> usize {
Expand All @@ -40,15 +39,25 @@ pub fn calculate_bit_set_width_in_bytes(arity: usize) -> usize {
#[allow(dead_code)]
impl<'a> CompactedRow<'a> {
pub fn from_bytes(data_types: &'a [DataType], data: &'a [u8]) -> Self {
let arity = data_types.len();
let size = data.len();
Self::deserialize(
Arc::new(CompactedRowDeserializer::new(data_types)),
data_types.len(),
data,
)
}

pub fn deserialize(
deserializer: Arc<CompactedRowDeserializer<'a>>,
arity: usize,
data: &'a [u8],
) -> Self {
Self {
arity,
size_in_bytes: size,
size_in_bytes: data.len(),
decoded_row: OnceLock::new(),
deserializer: CompactedRowDeserializer::new(data_types),
reader: CompactedRowReader::new(arity, data, 0, size),
data_types,
deserializer: Arc::clone(&deserializer),
reader: CompactedRowReader::new(arity, data, 0, data.len()),
data,
}
}

Expand All @@ -62,14 +71,20 @@ impl<'a> CompactedRow<'a> {
}
}

/// Lets a [`CompactedRow`] be used wherever a [`BinaryRow`] is expected by
/// exposing the byte slice the row holds.
impl BinaryRow for CompactedRow<'_> {
/// Returns the `data` slice stored in this row, unmodified and without copying.
fn as_bytes(&self) -> &[u8] {
self.data
}
}

#[allow(dead_code)]
impl<'a> InternalRow for CompactedRow<'a> {
/// Returns the number of fields in this row, i.e. the stored `arity`.
fn get_field_count(&self) -> usize {
self.arity
}

fn is_null_at(&self, pos: usize) -> bool {
self.data_types[pos].is_nullable() && self.reader.is_null_at(pos)
self.deserializer.get_data_types()[pos].is_nullable() && self.reader.is_null_at(pos)
}

fn get_boolean(&self, pos: usize) -> bool {
Expand Down Expand Up @@ -120,6 +135,8 @@ impl<'a> InternalRow for CompactedRow<'a> {
#[cfg(test)]
mod tests {
use super::*;
use crate::row::binary::BinaryWriter;

use crate::metadata::{
BigIntType, BooleanType, BytesType, DoubleType, FloatType, IntType, SmallIntType,
StringType, TinyIntType,
Expand Down
18 changes: 16 additions & 2 deletions crates/fluss/src/row/compacted/compacted_row_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,31 @@ use crate::{
row::{Datum, GenericRow, compacted::compacted_row_writer::CompactedRowWriter},
util::varint::{read_unsigned_varint_at, read_unsigned_varint_u64_at},
};
use std::borrow::Cow;
use std::str::from_utf8;

#[allow(dead_code)]
#[derive(Clone)]
pub struct CompactedRowDeserializer<'a> {
schema: &'a [DataType],
schema: Cow<'a, [DataType]>,
}

#[allow(dead_code)]
impl<'a> CompactedRowDeserializer<'a> {
pub fn new(schema: &'a [DataType]) -> Self {
Self { schema }
Self {
schema: Cow::Borrowed(schema),
}
}

/// Builds a deserializer that takes ownership of `schema`, storing it as
/// `Cow::Owned` rather than borrowing it from the caller.
pub fn new_from_owned(schema: Vec<DataType>) -> Self {
    let schema = Cow::Owned(schema);
    Self { schema }
}

/// Returns the schema as a slice of data types, regardless of whether the
/// deserializer holds it borrowed or owned.
pub fn get_data_types(&self) -> &[DataType] {
    // Deref coercion turns `&Cow<[DataType]>` into `&[DataType]`.
    &self.schema
}

pub fn deserialize(&self, reader: &CompactedRowReader<'a>) -> GenericRow<'a> {
Expand Down
Loading
Loading