diff --git a/crates/fluss/src/record/kv/kv_record.rs b/crates/fluss/src/record/kv/kv_record.rs new file mode 100644 index 00000000..8c30713d --- /dev/null +++ b/crates/fluss/src/record/kv/kv_record.rs @@ -0,0 +1,343 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Key-Value record implementation. +//! +//! This module provides the KvRecord struct which represents an immutable key-value record. +//! The record format is: +//! - Length => Int32 +//! - KeyLength => Unsigned VarInt +//! - Key => bytes +//! - Row => BinaryRow (optional, if null then this is a deletion record) + +use bytes::{BufMut, Bytes, BytesMut}; +use std::io; + +use crate::util::varint::{ + read_unsigned_varint_bytes, size_of_unsigned_varint, write_unsigned_varint_buf, +}; + +/// Length field size in bytes +pub const LENGTH_LENGTH: usize = 4; + +/// A key-value record. +/// +/// The schema is: +/// - Length => Int32 +/// - KeyLength => Unsigned VarInt +/// - Key => bytes +/// - Value => bytes (BinaryRow, written directly without length prefix) +/// +/// When the value is None (deletion), no Value bytes are present. 
+// Reference implementation: +// https://github.com/apache/fluss/blob/main/fluss-common/src/main/java/org/apache/fluss/record/KvRecord.java +#[derive(Debug, Clone)] +pub struct KvRecord { + key: Bytes, + value: Option, + size_in_bytes: usize, +} + +impl KvRecord { + /// Create a new KvRecord with the given key and optional value. + pub fn new(key: Bytes, value: Option) -> Self { + let size_in_bytes = Self::size_of(&key, value.as_deref()); + Self { + key, + value, + size_in_bytes, + } + } + + /// Get the key bytes. + pub fn key(&self) -> &Bytes { + &self.key + } + + /// Get the value bytes (None indicates a deletion). + pub fn value(&self) -> Option<&Bytes> { + self.value.as_ref() + } + + /// Calculate the total size of the record when serialized (including length prefix). + pub fn size_of(key: &[u8], value: Option<&[u8]>) -> usize { + Self::size_without_length(key, value) + LENGTH_LENGTH + } + + /// Calculate the size without the length prefix. + fn size_without_length(key: &[u8], value: Option<&[u8]>) -> usize { + let key_len = key.len(); + let key_len_size = size_of_unsigned_varint(key_len as u32); + + match value { + Some(v) => key_len_size.saturating_add(key_len).saturating_add(v.len()), + None => { + // Deletion: no value bytes + key_len_size.saturating_add(key_len) + } + } + } + + /// Write a KV record to a buffer. + /// + /// Returns the number of bytes written. 
+ pub fn write_to_buf(buf: &mut BytesMut, key: &[u8], value: Option<&[u8]>) -> io::Result { + let size_in_bytes = Self::size_without_length(key, value); + + let size_i32 = i32::try_from(size_in_bytes).map_err(|_| { + io::Error::new( + io::ErrorKind::InvalidInput, + format!("Record size {} exceeds i32::MAX", size_in_bytes), + ) + })?; + buf.put_i32_le(size_i32); + let key_len = key.len() as u32; + write_unsigned_varint_buf(key_len, buf); + + buf.put_slice(key); + + if let Some(v) = value { + buf.put_slice(v); + } + // For None (deletion), don't write any value bytes + + Ok(size_in_bytes + LENGTH_LENGTH) + } + + /// Read a KV record from bytes at the given position. + /// + /// Returns the KvRecord and the number of bytes consumed. + /// + /// TODO: Connect KvReadContext and return CompactedRow records. + pub fn read_from(bytes: &Bytes, position: usize) -> io::Result<(Self, usize)> { + if bytes.len() < position.saturating_add(LENGTH_LENGTH) { + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "Not enough bytes to read record length", + )); + } + + let size_in_bytes_i32 = i32::from_le_bytes([ + bytes[position], + bytes[position + 1], + bytes[position + 2], + bytes[position + 3], + ]); + + if size_in_bytes_i32 < 0 { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!("Invalid record length: {}", size_in_bytes_i32), + )); + } + + let size_in_bytes = size_in_bytes_i32 as usize; + + let total_size = size_in_bytes.checked_add(LENGTH_LENGTH).ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidData, + format!( + "Record size overflow: {} + {}", + size_in_bytes, LENGTH_LENGTH + ), + ) + })?; + + let available = bytes.len().saturating_sub(position); + if available < total_size { + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + format!( + "Not enough bytes to read record: expected {}, available {}", + total_size, available + ), + )); + } + + let mut current_offset = position + LENGTH_LENGTH; + let record_end = position + 
total_size; + + // Read key length as unsigned varint (bounded by record end) + let (key_len, varint_size) = + read_unsigned_varint_bytes(&bytes[current_offset..record_end])?; + current_offset += varint_size; + + // Read key bytes + let key_end = current_offset + key_len as usize; + if key_end > position + total_size { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "Key length exceeds record size", + )); + } + let key = bytes.slice(current_offset..key_end); + current_offset = key_end; + + // Read value bytes directly + let value = if current_offset < record_end { + // Value is present: all remaining bytes are the value + let value_bytes = bytes.slice(current_offset..record_end); + Some(value_bytes) + } else { + // No remaining bytes: this is a deletion record + None + }; + + Ok(( + Self { + key, + value, + size_in_bytes: total_size, + }, + total_size, + )) + } + + /// Get the total size in bytes of this record. + pub fn get_size_in_bytes(&self) -> usize { + self.size_in_bytes + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_kv_record_size_calculation() { + let key = b"test_key"; + let value = b"test_value"; + + // With value (no value length varint) + let size_with_value = KvRecord::size_of(key, Some(value)); + assert_eq!( + size_with_value, + LENGTH_LENGTH + size_of_unsigned_varint(key.len() as u32) + key.len() + value.len() + ); + + // Without value + let size_without_value = KvRecord::size_of(key, None); + assert_eq!( + size_without_value, + LENGTH_LENGTH + size_of_unsigned_varint(key.len() as u32) + key.len() + ); + } + + #[test] + fn test_kv_record_write_read_round_trip() { + let key = b"my_key"; + let value = b"my_value_data"; + + let mut buf = BytesMut::new(); + let written = KvRecord::write_to_buf(&mut buf, key, Some(value)).unwrap(); + + let bytes = buf.freeze(); + let (record, read_size) = KvRecord::read_from(&bytes, 0).unwrap(); + + assert_eq!(written, read_size); + assert_eq!(record.key().as_ref(), key); + 
assert_eq!(record.value().unwrap().as_ref(), value); + assert_eq!(record.get_size_in_bytes(), written); + } + + #[test] + fn test_kv_record_deletion() { + let key = b"delete_me"; + + // Write deletion record (no value) + let mut buf = BytesMut::new(); + let written = KvRecord::write_to_buf(&mut buf, key, None).unwrap(); + + let bytes = buf.freeze(); + let (record, read_size) = KvRecord::read_from(&bytes, 0).unwrap(); + + assert_eq!(written, read_size); + assert_eq!(record.key().as_ref(), key); + assert!(record.value().is_none()); + } + + #[test] + fn test_kv_record_with_large_key() { + let key = vec![0u8; 1024]; + let value = vec![1u8; 4096]; + + let mut buf = BytesMut::new(); + let written = KvRecord::write_to_buf(&mut buf, &key, Some(&value)).unwrap(); + + let bytes = buf.freeze(); + let (record, read_size) = KvRecord::read_from(&bytes, 0).unwrap(); + + assert_eq!(written, read_size); + assert_eq!(record.key().len(), key.len()); + assert_eq!(record.value().unwrap().len(), value.len()); + } + + #[test] + fn test_invalid_record_lengths() { + let mut buf = BytesMut::new(); + buf.put_i32_le(-1); // Negative length + buf.put_u8(1); // Some dummy data + buf.put_slice(b"key"); + let bytes = buf.freeze(); + let result = KvRecord::read_from(&bytes, 0); + assert!(result.is_err()); + assert_eq!(result.unwrap_err().kind(), io::ErrorKind::InvalidData); + + // Test overflow length + let mut buf = BytesMut::new(); + buf.put_i32_le(i32::MAX); // Very large length + buf.put_u8(1); // Some dummy data + let bytes = buf.freeze(); + let result = KvRecord::read_from(&bytes, 0); + assert!(result.is_err()); + + // Test impossibly large but non-negative length + let mut buf = BytesMut::new(); + buf.put_i32_le(1_000_000); + let bytes = buf.freeze(); + let result = KvRecord::read_from(&bytes, 0); + assert!(result.is_err()); + assert_eq!(result.unwrap_err().kind(), io::ErrorKind::UnexpectedEof); + } + + #[test] + fn test_multiple_records_in_buffer() { + let records = vec![ + 
(b"key1".as_slice(), Some(b"value1".as_slice())), + (b"key2".as_slice(), None), + (b"key3".as_slice(), Some(b"value3".as_slice())), + ]; + + let mut buf = BytesMut::new(); + for (key, value) in &records { + KvRecord::write_to_buf(&mut buf, key, *value).unwrap(); + } + + let bytes = buf.freeze(); + let mut offset = 0; + for (expected_key, expected_value) in &records { + let (record, size) = KvRecord::read_from(&bytes, offset).unwrap(); + assert_eq!(record.key().as_ref(), *expected_key); + match expected_value { + Some(v) => assert_eq!(record.value().unwrap().as_ref(), *v), + None => assert!(record.value().is_none()), + } + offset += size; + } + assert_eq!(offset, bytes.len()); + } +} diff --git a/crates/fluss/src/record/kv/kv_record_batch.rs b/crates/fluss/src/record/kv/kv_record_batch.rs new file mode 100644 index 00000000..fdd4ad73 --- /dev/null +++ b/crates/fluss/src/record/kv/kv_record_batch.rs @@ -0,0 +1,394 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! KV record batch implementation. +//! +//! The schema of a KvRecordBatch is: +//! - Length => Int32 +//! - Magic => Int8 +//! - CRC => Uint32 +//! - SchemaId => Int16 +//! - Attributes => Int8 +//! - WriterId => Int64 +//! - BatchSequence => Int32 +//! 
- RecordCount => Int32 +//! - Records => [Record] +//! +//! The CRC covers data from the SchemaId to the end of the batch. + +use bytes::Bytes; +use std::io; + +use crate::record::kv::KvRecord; + +// Field lengths in bytes +pub const LENGTH_LENGTH: usize = 4; +pub const MAGIC_LENGTH: usize = 1; +pub const CRC_LENGTH: usize = 4; +pub const SCHEMA_ID_LENGTH: usize = 2; +pub const ATTRIBUTE_LENGTH: usize = 1; +pub const WRITE_CLIENT_ID_LENGTH: usize = 8; +pub const BATCH_SEQUENCE_LENGTH: usize = 4; +pub const RECORDS_COUNT_LENGTH: usize = 4; + +// Field offsets +pub const LENGTH_OFFSET: usize = 0; +pub const MAGIC_OFFSET: usize = LENGTH_OFFSET + LENGTH_LENGTH; +pub const CRC_OFFSET: usize = MAGIC_OFFSET + MAGIC_LENGTH; +pub const SCHEMA_ID_OFFSET: usize = CRC_OFFSET + CRC_LENGTH; +pub const ATTRIBUTES_OFFSET: usize = SCHEMA_ID_OFFSET + SCHEMA_ID_LENGTH; +pub const WRITE_CLIENT_ID_OFFSET: usize = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH; +pub const BATCH_SEQUENCE_OFFSET: usize = WRITE_CLIENT_ID_OFFSET + WRITE_CLIENT_ID_LENGTH; +pub const RECORDS_COUNT_OFFSET: usize = BATCH_SEQUENCE_OFFSET + BATCH_SEQUENCE_LENGTH; +pub const RECORDS_OFFSET: usize = RECORDS_COUNT_OFFSET + RECORDS_COUNT_LENGTH; + +/// Total header size +pub const RECORD_BATCH_HEADER_SIZE: usize = RECORDS_OFFSET; + +/// Overhead of the batch (length field) +pub const KV_OVERHEAD: usize = LENGTH_OFFSET + LENGTH_LENGTH; + +/// A KV record batch. +/// +/// This struct provides read access to a serialized KV record batch. +// Reference implementation: +// https://github.com/apache/fluss/blob/main/fluss-common/src/main/java/org/apache/fluss/record/KvRecordBatch.java +pub struct KvRecordBatch { + data: Bytes, + position: usize, +} + +impl KvRecordBatch { + /// Create a new KvRecordBatch pointing to the given data at the specified position. + pub fn new(data: Bytes, position: usize) -> Self { + Self { data, position } + } + + /// Get the size in bytes of this batch. 
+ pub fn size_in_bytes(&self) -> io::Result { + if self.data.len() < self.position.saturating_add(LENGTH_LENGTH) { + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "Not enough bytes to read batch length", + )); + } + let length_i32 = i32::from_le_bytes([ + self.data[self.position], + self.data[self.position + 1], + self.data[self.position + 2], + self.data[self.position + 3], + ]); + + if length_i32 < 0 { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!("Invalid batch length: {}", length_i32), + )); + } + + let length = length_i32 as usize; + + Ok(length.saturating_add(KV_OVERHEAD)) + } + + /// Check if this batch is valid by verifying the checksum. + pub fn is_valid(&self) -> bool { + if !matches!(self.size_in_bytes(), Ok(s) if s >= RECORD_BATCH_HEADER_SIZE) { + return false; + } + + match (self.checksum(), self.compute_checksum()) { + (Ok(stored), Ok(computed)) => stored == computed, + _ => false, + } + } + + /// Get the magic byte. + pub fn magic(&self) -> io::Result { + if self.data.len() < self.position.saturating_add(MAGIC_OFFSET).saturating_add(1) { + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "Not enough bytes to read magic byte", + )); + } + Ok(self.data[self.position + MAGIC_OFFSET]) + } + + /// Get the checksum. + pub fn checksum(&self) -> io::Result { + if self.data.len() < self.position.saturating_add(CRC_OFFSET).saturating_add(4) { + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "Not enough bytes to read checksum", + )); + } + Ok(u32::from_le_bytes([ + self.data[self.position + CRC_OFFSET], + self.data[self.position + CRC_OFFSET + 1], + self.data[self.position + CRC_OFFSET + 2], + self.data[self.position + CRC_OFFSET + 3], + ])) + } + + /// Compute the checksum of this batch. 
+ pub fn compute_checksum(&self) -> io::Result { + let size = self.size_in_bytes()?; + if size < RECORD_BATCH_HEADER_SIZE { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!( + "Batch size {} is less than header size {}", + size, RECORD_BATCH_HEADER_SIZE + ), + )); + } + + let start = self.position.saturating_add(SCHEMA_ID_OFFSET); + let end = self.position.saturating_add(size); + + if end > self.data.len() || start >= end { + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "Not enough bytes to compute checksum", + )); + } + + Ok(crc32c::crc32c(&self.data[start..end])) + } + + /// Get the schema ID. + pub fn schema_id(&self) -> io::Result { + if self.data.len() + < self + .position + .saturating_add(SCHEMA_ID_OFFSET) + .saturating_add(2) + { + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "Not enough bytes to read schema ID", + )); + } + Ok(i16::from_le_bytes([ + self.data[self.position + SCHEMA_ID_OFFSET], + self.data[self.position + SCHEMA_ID_OFFSET + 1], + ])) + } + + /// Get the writer ID. + pub fn writer_id(&self) -> io::Result { + if self.data.len() + < self + .position + .saturating_add(WRITE_CLIENT_ID_OFFSET) + .saturating_add(8) + { + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "Not enough bytes to read writer ID", + )); + } + Ok(i64::from_le_bytes([ + self.data[self.position + WRITE_CLIENT_ID_OFFSET], + self.data[self.position + WRITE_CLIENT_ID_OFFSET + 1], + self.data[self.position + WRITE_CLIENT_ID_OFFSET + 2], + self.data[self.position + WRITE_CLIENT_ID_OFFSET + 3], + self.data[self.position + WRITE_CLIENT_ID_OFFSET + 4], + self.data[self.position + WRITE_CLIENT_ID_OFFSET + 5], + self.data[self.position + WRITE_CLIENT_ID_OFFSET + 6], + self.data[self.position + WRITE_CLIENT_ID_OFFSET + 7], + ])) + } + + /// Get the batch sequence. 
+ pub fn batch_sequence(&self) -> io::Result { + if self.data.len() + < self + .position + .saturating_add(BATCH_SEQUENCE_OFFSET) + .saturating_add(4) + { + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "Not enough bytes to read batch sequence", + )); + } + Ok(i32::from_le_bytes([ + self.data[self.position + BATCH_SEQUENCE_OFFSET], + self.data[self.position + BATCH_SEQUENCE_OFFSET + 1], + self.data[self.position + BATCH_SEQUENCE_OFFSET + 2], + self.data[self.position + BATCH_SEQUENCE_OFFSET + 3], + ])) + } + + /// Get the number of records in this batch. + pub fn record_count(&self) -> io::Result { + if self.data.len() + < self + .position + .saturating_add(RECORDS_COUNT_OFFSET) + .saturating_add(4) + { + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "Not enough bytes to read record count", + )); + } + Ok(i32::from_le_bytes([ + self.data[self.position + RECORDS_COUNT_OFFSET], + self.data[self.position + RECORDS_COUNT_OFFSET + 1], + self.data[self.position + RECORDS_COUNT_OFFSET + 2], + self.data[self.position + RECORDS_COUNT_OFFSET + 3], + ])) + } + + /// Create an iterator over the records in this batch. + /// This validates the batch checksum before returning the iterator. + /// For trusted data paths, use `records_unchecked()` to skip validation. 
+ pub fn records(&self) -> io::Result { + if !self.is_valid() { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "Invalid batch checksum", + )); + } + self.records_unchecked() + } + + /// Create an iterator over the records in this batch without validating the checksum + pub fn records_unchecked(&self) -> io::Result { + let size = self.size_in_bytes()?; + let count = self.record_count()?; + if count < 0 { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!("Invalid record count: {}", count), + )); + } + Ok(KvRecordIterator { + data: self.data.clone(), + position: self.position + RECORDS_OFFSET, + end: self.position + size, + remaining_count: count, + }) + } +} + +/// Iterator over records in a KV record batch. +pub struct KvRecordIterator { + data: Bytes, + position: usize, + end: usize, + remaining_count: i32, +} + +impl Iterator for KvRecordIterator { + type Item = io::Result; + + fn next(&mut self) -> Option { + if self.remaining_count <= 0 || self.position >= self.end { + return None; + } + + match KvRecord::read_from(&self.data, self.position) { + Ok((record, size)) => { + self.position += size; + self.remaining_count -= 1; + Some(Ok(record)) + } + Err(e) => { + self.remaining_count = 0; // Stop iteration on error + Some(Err(e)) + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::metadata::KvFormat; + use crate::record::kv::{CURRENT_KV_MAGIC_VALUE, KvRecordBatchBuilder}; + use bytes::{BufMut, BytesMut}; + + #[test] + fn test_invalid_batch_lengths() { + // Test negative length + let mut buf = BytesMut::new(); + buf.put_i32_le(-1); + let bytes = buf.freeze(); + let batch = KvRecordBatch::new(bytes, 0); + assert!(batch.size_in_bytes().is_err()); // Should error for invalid + assert!(!batch.is_valid()); + + // Test overflow length + let mut buf = BytesMut::new(); + buf.put_i32_le(i32::MAX); + let bytes = buf.freeze(); + let batch = KvRecordBatch::new(bytes, 0); + assert!(!batch.is_valid()); + + // Test too-short 
buffer + let mut buf = BytesMut::new(); + buf.put_i32_le(100); // Claims 100 bytes but buffer is tiny + let bytes = buf.freeze(); + let batch = KvRecordBatch::new(bytes, 0); + assert!(!batch.is_valid()); + } + + #[test] + fn test_kv_record_batch_build_and_read() { + use crate::row::compacted::CompactedRowWriter; + + let schema_id = 42; + let write_limit = 4096; + + let mut builder = KvRecordBatchBuilder::new(schema_id, write_limit, KvFormat::COMPACTED); + builder.set_writer_state(100, 5); + + let key1 = b"key1"; + let mut value1_writer = CompactedRowWriter::new(1); + value1_writer.write_bytes(&[1, 2, 3, 4, 5]); + builder.append_row(key1, Some(&value1_writer)).unwrap(); + + let key2 = b"key2"; + builder + .append_row::(key2, None) + .unwrap(); + + let bytes = builder.build().unwrap(); + + let batch = KvRecordBatch::new(bytes.clone(), 0); + assert!(batch.is_valid()); + assert_eq!(batch.magic().unwrap(), CURRENT_KV_MAGIC_VALUE); + assert_eq!(batch.schema_id().unwrap(), schema_id as i16); + assert_eq!(batch.writer_id().unwrap(), 100); + assert_eq!(batch.batch_sequence().unwrap(), 5); + assert_eq!(batch.record_count().unwrap(), 2); + + let records: Vec<_> = batch.records().unwrap().collect(); + assert_eq!(records.len(), 2); + + let record1 = records[0].as_ref().unwrap(); + assert_eq!(record1.key().as_ref(), key1); + assert_eq!(record1.value().unwrap().as_ref(), value1_writer.buffer()); + + let record2 = records[1].as_ref().unwrap(); + assert_eq!(record2.key().as_ref(), key2); + assert!(record2.value().is_none()); + } +} diff --git a/crates/fluss/src/record/kv/kv_record_batch_builder.rs b/crates/fluss/src/record/kv/kv_record_batch_builder.rs new file mode 100644 index 00000000..773c7789 --- /dev/null +++ b/crates/fluss/src/record/kv/kv_record_batch_builder.rs @@ -0,0 +1,581 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! KV record batch builder implementation. +//! +//! This module provides the KvRecordBatchBuilder for building batches of KV records. + +use bytes::{Bytes, BytesMut}; +use std::io; + +use crate::metadata::KvFormat; +use crate::record::kv::kv_record::KvRecord; +use crate::record::kv::kv_record_batch::{ + ATTRIBUTES_OFFSET, BATCH_SEQUENCE_OFFSET, CRC_OFFSET, LENGTH_LENGTH, LENGTH_OFFSET, + MAGIC_OFFSET, RECORD_BATCH_HEADER_SIZE, RECORDS_COUNT_OFFSET, SCHEMA_ID_OFFSET, + WRITE_CLIENT_ID_OFFSET, +}; +use crate::record::kv::{CURRENT_KV_MAGIC_VALUE, NO_BATCH_SEQUENCE, NO_WRITER_ID}; +use crate::row::BinaryRow; + +/// Builder for KvRecordBatch. +/// +/// This builder accumulates KV records and produces a serialized batch with proper +/// header information and checksums. +// Reference implementation: +// https://github.com/apache/fluss/blob/main/fluss-common/src/main/java/org/apache/fluss/record/KvRecordBatchBuilder.java +pub struct KvRecordBatchBuilder { + schema_id: i32, + magic: u8, + write_limit: usize, + buffer: BytesMut, + writer_id: i64, + batch_sequence: i32, + current_record_number: i32, + size_in_bytes: usize, + is_closed: bool, + kv_format: KvFormat, + aborted: bool, + built_buffer: Option, +} + +impl KvRecordBatchBuilder { + /// Create a new KvRecordBatchBuilder. 
+ /// + /// # Arguments + /// * `schema_id` - The schema ID for records in this batch (must fit in i16) + /// * `write_limit` - Maximum bytes that can be appended + /// * `kv_format` - The KV format (Compacted, Indexed, or Aligned) + pub fn new(schema_id: i32, write_limit: usize, kv_format: KvFormat) -> Self { + assert!( + schema_id <= i16::MAX as i32, + "schema_id shouldn't be greater than the max value of i16: {}", + i16::MAX + ); + + let mut buffer = BytesMut::with_capacity(write_limit.max(RECORD_BATCH_HEADER_SIZE)); + + // Reserve space for header (we'll write it at the end) + buffer.resize(RECORD_BATCH_HEADER_SIZE, 0); + + Self { + schema_id, + magic: CURRENT_KV_MAGIC_VALUE, + write_limit, + buffer, + writer_id: NO_WRITER_ID, + batch_sequence: NO_BATCH_SEQUENCE, + current_record_number: 0, + size_in_bytes: RECORD_BATCH_HEADER_SIZE, + is_closed: false, + kv_format, + aborted: false, + built_buffer: None, + } + } + + /// Check if there is room for a new record containing the given key and row. + /// If no records have been appended, this always returns true. + pub fn has_room_for_row(&self, key: &[u8], row: Option<&R>) -> bool { + let value = row.map(|r| r.as_bytes()); + self.size_in_bytes + KvRecord::size_of(key, value) <= self.write_limit + } + + /// Append a KV record with a row value to the batch. 
+ /// + /// Returns an error if: + /// - The builder has been aborted + /// - The builder is closed + /// - Adding this record would exceed the write limit + /// - The maximum number of records is exceeded + /// - The KV format is not COMPACTED + pub fn append_row(&mut self, key: &[u8], row: Option<&R>) -> io::Result<()> { + if self.kv_format != KvFormat::COMPACTED { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "append_row can only be used with KvFormat::COMPACTED", + )); + } + + if self.aborted { + return Err(io::Error::other( + "Tried to append a record, but KvRecordBatchBuilder has already been aborted", + )); + } + + if self.is_closed { + return Err(io::Error::other( + "Tried to append a record, but KvRecordBatchBuilder is closed for record appends", + )); + } + + // Check record count limit before mutation + if self.current_record_number == i32::MAX { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!( + "Maximum number of records per batch exceeded, max records: {}", + i32::MAX + ), + )); + } + + let value = row.map(|r| r.as_bytes()); + let record_size = KvRecord::size_of(key, value); + if self.size_in_bytes + record_size > self.write_limit { + return Err(io::Error::new( + io::ErrorKind::WriteZero, + format!( + "Adding record would exceed write limit: {} + {} > {}", + self.size_in_bytes, record_size, self.write_limit + ), + )); + } + + let record_byte_size = KvRecord::write_to_buf(&mut self.buffer, key, value)?; + debug_assert_eq!(record_byte_size, record_size, "Record size mismatch"); + + self.current_record_number += 1; + self.size_in_bytes += record_byte_size; + + // Invalidate cached buffer since we modified the batch + self.built_buffer = None; + + Ok(()) + } + + /// Set the writer state (writer ID and batch base sequence). + /// + /// This invalidates any cached buffer, ensuring the batch header will be rebuilt + /// on the next call to [`build`](Self::build). 
+ pub fn set_writer_state(&mut self, writer_id: i64, batch_base_sequence: i32) { + self.writer_id = writer_id; + self.batch_sequence = batch_base_sequence; + // Invalidate cached buffer since header fields changed + self.built_buffer = None; + } + + /// Build the batch and return the serialized bytes. + /// + /// This can be called multiple times as the batch is cached after the first build. + /// + /// # Caching and Mutations + /// + /// The builder caches the result after the first successful build. However, the cache + /// is invalidated (and the batch rebuilt) if any of the following occur after building: + /// - Calling [`append_row`](Self::append_row) to add records + /// - Calling [`set_writer_state`](Self::set_writer_state) to modify writer metadata + /// + /// This allows the builder to be reused with different writer states or to continue + /// appending records after an initial build, but callers should be aware that the + /// built bytes may change if mutations occur between builds. + /// + /// Note: [`close`](Self::close) prevents further appends but does not prevent writer state modifications. + pub fn build(&mut self) -> io::Result { + if self.aborted { + return Err(io::Error::other( + "Attempting to build an aborted record batch", + )); + } + + if let Some(ref cached) = self.built_buffer { + return Ok(cached.clone()); + } + + self.write_batch_header()?; + let bytes = self.buffer.clone().freeze(); + self.built_buffer = Some(bytes); + Ok(self.built_buffer.as_ref().unwrap().clone()) + } + + /// Get the writer ID. + pub fn writer_id(&self) -> i64 { + self.writer_id + } + + /// Get the batch sequence. + pub fn batch_sequence(&self) -> i32 { + self.batch_sequence + } + + /// Check if the builder is closed. + pub fn is_closed(&self) -> bool { + self.is_closed + } + + /// Abort the builder. + /// After aborting, no more records can be appended and the batch cannot be built. + pub fn abort(&mut self) { + self.aborted = true; + } + + /// Close the builder. 
+ /// After closing, no more records can be appended, but the batch can still be built. + pub fn close(&mut self) -> io::Result<()> { + if self.aborted { + return Err(io::Error::other( + "Cannot close KvRecordBatchBuilder as it has already been aborted", + )); + } + self.is_closed = true; + Ok(()) + } + + /// Get the current size in bytes of the batch. + pub fn get_size_in_bytes(&self) -> usize { + self.size_in_bytes + } + + // ----------------------- Internal methods ------------------------------- + + /// Write the batch header. + fn write_batch_header(&mut self) -> io::Result<()> { + let size_without_length = self.size_in_bytes - LENGTH_LENGTH; + let total_size = i32::try_from(size_without_length).map_err(|_| { + io::Error::new( + io::ErrorKind::InvalidInput, + format!("Batch size {} exceeds i32::MAX", size_without_length), + ) + })?; + + // Compute attributes before borrowing buffer mutably + let attributes = self.compute_attributes(); + + // Write to the beginning of the buffer + let header = &mut self.buffer[0..RECORD_BATCH_HEADER_SIZE]; + + // Write length + header[LENGTH_OFFSET..LENGTH_OFFSET + LENGTH_LENGTH] + .copy_from_slice(&total_size.to_le_bytes()); + + // Write magic + header[MAGIC_OFFSET] = self.magic; + + // Write empty CRC first (will update later) + header[CRC_OFFSET..CRC_OFFSET + 4].copy_from_slice(&0u32.to_le_bytes()); + + // Write schema ID + header[SCHEMA_ID_OFFSET..SCHEMA_ID_OFFSET + 2] + .copy_from_slice(&(self.schema_id as i16).to_le_bytes()); + + // Write attributes + header[ATTRIBUTES_OFFSET] = attributes; + + // Write writer ID + header[WRITE_CLIENT_ID_OFFSET..WRITE_CLIENT_ID_OFFSET + 8] + .copy_from_slice(&self.writer_id.to_le_bytes()); + + // Write batch sequence + header[BATCH_SEQUENCE_OFFSET..BATCH_SEQUENCE_OFFSET + 4] + .copy_from_slice(&self.batch_sequence.to_le_bytes()); + + // Write record count + header[RECORDS_COUNT_OFFSET..RECORDS_COUNT_OFFSET + 4] + .copy_from_slice(&self.current_record_number.to_le_bytes()); + + // Compute 
and update CRC + let crc = crc32c::crc32c(&self.buffer[SCHEMA_ID_OFFSET..self.size_in_bytes]); + self.buffer[CRC_OFFSET..CRC_OFFSET + 4].copy_from_slice(&crc.to_le_bytes()); + + Ok(()) + } + + /// Compute the attributes byte. + fn compute_attributes(&self) -> u8 { + // Currently no attributes are used + 0 + } +} + +impl Drop for KvRecordBatchBuilder { + fn drop(&mut self) { + // Warn if the builder has records but was never built or was aborted + if self.current_record_number > 0 && !self.aborted && self.built_buffer.is_none() { + eprintln!( + "Warning: KvRecordBatchBuilder dropped with {} record(s) that were never built. \ + Call build() to serialize the batch before dropping.", + self.current_record_number + ); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::row::compacted::CompactedRowWriter; + + // Helper function to create a CompactedRowWriter with a single bytes field for testing + fn create_test_row(data: &[u8]) -> CompactedRowWriter { + let mut writer = CompactedRowWriter::new(1); + writer.write_bytes(data); + writer + } + + #[test] + fn test_builder_basic_workflow() { + let schema_id = 42; + let write_limit = 4096; + let mut builder = KvRecordBatchBuilder::new(schema_id, write_limit, KvFormat::COMPACTED); + + // Test initial state + assert!(!builder.is_closed()); + assert_eq!(builder.writer_id(), NO_WRITER_ID); + assert_eq!(builder.batch_sequence(), NO_BATCH_SEQUENCE); + + // Test writer state + builder.set_writer_state(100, 5); + assert_eq!(builder.writer_id(), 100); + assert_eq!(builder.batch_sequence(), 5); + + // Test appending records + let key1 = b"key1"; + let value1 = create_test_row(b"value1"); + assert!(builder.has_room_for_row(key1, Some(&value1))); + builder.append_row(key1, Some(&value1)).unwrap(); + + let key2 = b"key2"; + assert!(builder.has_room_for_row::(key2, None)); + builder + .append_row::(key2, None) + .unwrap(); + + // Test close and build + builder.close().unwrap(); + assert!(builder.is_closed()); + + let bytes 
= builder.build().unwrap(); + assert!(bytes.len() > RECORD_BATCH_HEADER_SIZE); + + // Building again should return cached result + let bytes2 = builder.build().unwrap(); + assert_eq!(bytes.len(), bytes2.len()); + } + + #[test] + fn test_builder_lifecycle() { + // Test abort behavior + let mut builder = KvRecordBatchBuilder::new(1, 4096, KvFormat::COMPACTED); + let value = create_test_row(b"value"); + builder.append_row(b"key", Some(&value)).unwrap(); + builder.abort(); + assert!( + builder + .append_row::(b"key2", None) + .is_err() + ); + assert!(builder.build().is_err()); + assert!(builder.close().is_err()); + + // Test close behavior + let mut builder = KvRecordBatchBuilder::new(1, 4096, KvFormat::COMPACTED); + let value = create_test_row(b"value"); + builder.append_row(b"key", Some(&value)).unwrap(); + builder.close().unwrap(); + assert!( + builder + .append_row::(b"key2", None) + .is_err() + ); // Can't append after close + assert!(builder.build().is_ok()); // But can still build + } + + #[test] + fn test_write_limit_enforcement() { + let write_limit = 100; // Very small limit + let mut builder = KvRecordBatchBuilder::new(1, write_limit, KvFormat::COMPACTED); + + // Test has_room_for_row helper + let large_key = vec![0u8; 1000]; + let large_value = vec![1u8; 1000]; + let large_row = create_test_row(&large_value); + assert!(!builder.has_room_for_row(&large_key, Some(&large_row))); + let small_value = create_test_row(b"value"); + assert!(builder.has_room_for_row(b"key", Some(&small_value))); + + // Test append enforcement - add small record first + builder.append_row(b"key", Some(&small_value)).unwrap(); + + // Try to add large record that exceeds limit (reuse large_row from above) + let result = builder.append_row(b"key2", Some(&large_row)); + assert!(result.is_err()); + assert_eq!(result.unwrap_err().kind(), io::ErrorKind::WriteZero); + } + + #[test] + fn test_append_checks_record_count_limit() { + let mut builder = KvRecordBatchBuilder::new(1, 100000, 
KvFormat::COMPACTED); + builder.current_record_number = i32::MAX - 1; + + let value1 = create_test_row(b"value1"); + builder.append_row(b"key1", Some(&value1)).unwrap(); + + let value2 = create_test_row(b"value2"); + let result = builder.append_row(b"key2", Some(&value2)); + assert!(result.is_err()); + assert_eq!(result.unwrap_err().kind(), io::ErrorKind::InvalidInput); + } + + #[test] + #[should_panic(expected = "schema_id shouldn't be greater than")] + fn test_builder_invalid_schema_id() { + KvRecordBatchBuilder::new(i16::MAX as i32 + 1, 4096, KvFormat::COMPACTED); + } + + #[test] + fn test_cache_invalidation_on_append() { + let mut builder = KvRecordBatchBuilder::new(1, 4096, KvFormat::COMPACTED); + builder.set_writer_state(100, 5); + + let value1 = create_test_row(b"value1"); + builder.append_row(b"key1", Some(&value1)).unwrap(); + let bytes1 = builder.build().unwrap(); + let len1 = bytes1.len(); + + // Append another record - this should invalidate the cache + let value2 = create_test_row(b"value2"); + builder.append_row(b"key2", Some(&value2)).unwrap(); + let bytes2 = builder.build().unwrap(); + let len2 = bytes2.len(); + + // Verify the second build includes both records + assert!(len2 > len1, "Second build should be larger"); + + use crate::record::kv::KvRecordBatch; + let batch = KvRecordBatch::new(bytes2, 0); + assert!(batch.is_valid()); + assert_eq!(batch.record_count().unwrap(), 2, "Should have 2 records"); + } + + #[test] + fn test_cache_invalidation_on_set_writer_state() { + let mut builder = KvRecordBatchBuilder::new(1, 4096, KvFormat::COMPACTED); + + builder.set_writer_state(100, 5); + let value = create_test_row(b"value"); + builder.append_row(b"key", Some(&value)).unwrap(); + let bytes1 = builder.build().unwrap(); + + // Change writer state - this should invalidate the cache + builder.set_writer_state(200, 10); + let bytes2 = builder.build().unwrap(); + + assert_ne!( + bytes1, bytes2, + "Bytes should differ after writer state change" + ); + + use 
crate::record::kv::KvRecordBatch; + let batch1 = KvRecordBatch::new(bytes1, 0); + let batch2 = KvRecordBatch::new(bytes2, 0); + + assert_eq!(batch1.writer_id().unwrap(), 100); + assert_eq!(batch1.batch_sequence().unwrap(), 5); + + assert_eq!(batch2.writer_id().unwrap(), 200); + assert_eq!(batch2.batch_sequence().unwrap(), 10); + } + + #[test] + fn test_builder_with_compacted_row_writer() { + use crate::metadata::{DataType, IntType, StringType}; + use crate::record::kv::KvRecordBatch; + use crate::row::InternalRow; + use crate::row::compacted::CompactedRow; + + let mut builder = KvRecordBatchBuilder::new(1, 100000, KvFormat::COMPACTED); + builder.set_writer_state(100, 5); + + let types = vec![ + DataType::Int(IntType::new()), + DataType::String(StringType::new()), + ]; + + // Create and append first record with CompactedRowWriter + let mut row_writer1 = CompactedRowWriter::new(2); + row_writer1.write_int(42); + row_writer1.write_string("hello"); + + let key1 = b"key1"; + assert!(builder.has_room_for_row(key1, Some(&row_writer1))); + builder.append_row(key1, Some(&row_writer1)).unwrap(); + + // Create and append second record + let mut row_writer2 = CompactedRowWriter::new(2); + row_writer2.write_int(100); + row_writer2.write_string("world"); + + let key2 = b"key2"; + builder.append_row(key2, Some(&row_writer2)).unwrap(); + + // Append a deletion record + let key3 = b"key3"; + builder + .append_row::(key3, None) + .unwrap(); + + // Build and verify + builder.close().unwrap(); + let bytes = builder.build().unwrap(); + + let batch = KvRecordBatch::new(bytes, 0); + assert!(batch.is_valid()); + assert_eq!(batch.record_count().unwrap(), 3); + assert_eq!(batch.writer_id().unwrap(), 100); + assert_eq!(batch.batch_sequence().unwrap(), 5); + + // Read back and verify records + let records: Vec<_> = batch.records().unwrap().collect(); + assert_eq!(records.len(), 3); + + // Verify first record + let record1 = records[0].as_ref().unwrap(); + assert_eq!(record1.key().as_ref(), 
key1); + let row1 = CompactedRow::from_bytes(&types, record1.value().unwrap()); + assert_eq!(row1.get_int(0), 42); + assert_eq!(row1.get_string(1), "hello"); + + // Verify second record + let record2 = records[1].as_ref().unwrap(); + assert_eq!(record2.key().as_ref(), key2); + let row2 = CompactedRow::from_bytes(&types, record2.value().unwrap()); + assert_eq!(row2.get_int(0), 100); + assert_eq!(row2.get_string(1), "world"); + + // Verify deletion record + let record3 = records[2].as_ref().unwrap(); + assert_eq!(record3.key().as_ref(), key3); + assert!(record3.value().is_none()); + } + + #[test] + fn test_kv_format_validation() { + let mut row_writer = CompactedRowWriter::new(1); + row_writer.write_int(42); + + // INDEXED format should reject append_row + let mut indexed_builder = KvRecordBatchBuilder::new(1, 4096, KvFormat::INDEXED); + let result = indexed_builder.append_row(b"key", Some(&row_writer)); + assert!(result.is_err()); + assert_eq!(result.unwrap_err().kind(), io::ErrorKind::InvalidInput); + + // COMPACTED format should accept append_row + let mut compacted_builder = KvRecordBatchBuilder::new(1, 4096, KvFormat::COMPACTED); + let result = compacted_builder.append_row(b"key", Some(&row_writer)); + assert!(result.is_ok()); + } +} diff --git a/crates/fluss/src/record/kv/mod.rs b/crates/fluss/src/record/kv/mod.rs new file mode 100644 index 00000000..ecb762df --- /dev/null +++ b/crates/fluss/src/record/kv/mod.rs @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Key-Value record and batch implementations. + +mod kv_record; +mod kv_record_batch; +mod kv_record_batch_builder; + +pub use kv_record::{KvRecord, LENGTH_LENGTH as KV_RECORD_LENGTH_LENGTH}; +pub use kv_record_batch::*; +pub use kv_record_batch_builder::*; + +/// Current KV magic value +pub const CURRENT_KV_MAGIC_VALUE: u8 = 0; + +/// No writer ID constant +pub const NO_WRITER_ID: i64 = -1; + +/// No batch sequence constant +pub const NO_BATCH_SEQUENCE: i32 = -1; diff --git a/crates/fluss/src/record/mod.rs b/crates/fluss/src/record/mod.rs index 35928ea0..c5a3f8e4 100644 --- a/crates/fluss/src/record/mod.rs +++ b/crates/fluss/src/record/mod.rs @@ -22,6 +22,7 @@ use std::collections::HashMap; mod arrow; mod error; +pub mod kv; pub use arrow::*; diff --git a/crates/fluss/src/row/compacted/compacted_key_writer.rs b/crates/fluss/src/row/compacted/compacted_key_writer.rs index 84a6b227..1152b0c5 100644 --- a/crates/fluss/src/row/compacted/compacted_key_writer.rs +++ b/crates/fluss/src/row/compacted/compacted_key_writer.rs @@ -30,6 +30,12 @@ pub struct CompactedKeyWriter { delegate: CompactedRowWriter, } +impl Default for CompactedKeyWriter { + fn default() -> Self { + Self::new() + } +} + impl CompactedKeyWriter { pub fn new() -> CompactedKeyWriter { CompactedKeyWriter { diff --git a/crates/fluss/src/row/compacted/compacted_row_reader.rs b/crates/fluss/src/row/compacted/compacted_row_reader.rs index c053d4ec..5ec26089 100644 --- a/crates/fluss/src/row/compacted/compacted_row_reader.rs +++ b/crates/fluss/src/row/compacted/compacted_row_reader.rs @@ 
-19,6 +19,7 @@ use crate::row::compacted::compacted_row::calculate_bit_set_width_in_bytes; use crate::{ metadata::DataType, row::{Datum, GenericRow, compacted::compacted_row_writer::CompactedRowWriter}, + util::varint::{read_unsigned_varint_at, read_unsigned_varint_u64_at}, }; use std::str::from_utf8; @@ -150,36 +151,18 @@ impl<'a> CompactedRowReader<'a> { (val, next_pos) } - pub fn read_int(&self, mut pos: usize) -> (i32, usize) { - let mut result: u32 = 0; - let mut shift = 0; - - for _ in 0..CompactedRowWriter::MAX_INT_SIZE { - let (b, next_pos) = self.read_byte(pos); - pos = next_pos; - result |= ((b & 0x7F) as u32) << shift; - if (b & 0x80) == 0 { - return (result as i32, pos); - } - shift += 7; + pub fn read_int(&self, pos: usize) -> (i32, usize) { + match read_unsigned_varint_at(self.segment, pos, CompactedRowWriter::MAX_INT_SIZE) { + Ok((value, next_pos)) => (value as i32, next_pos), + Err(_) => panic!("Invalid VarInt32 input stream."), } - panic!("Invalid VarInt32 input stream."); } - pub fn read_long(&self, mut pos: usize) -> (i64, usize) { - let mut result: u64 = 0; - let mut shift = 0; - - for _ in 0..CompactedRowWriter::MAX_LONG_SIZE { - let (b, next_pos) = self.read_byte(pos); - pos = next_pos; - result |= ((b & 0x7F) as u64) << shift; - if (b & 0x80) == 0 { - return (result as i64, pos); - } - shift += 7; + pub fn read_long(&self, pos: usize) -> (i64, usize) { + match read_unsigned_varint_u64_at(self.segment, pos, CompactedRowWriter::MAX_LONG_SIZE) { + Ok((value, next_pos)) => (value as i64, next_pos), + Err(_) => panic!("Invalid VarInt64 input stream."), } - panic!("Invalid VarInt64 input stream."); } pub fn read_float(&self, pos: usize) -> (f32, usize) { diff --git a/crates/fluss/src/row/compacted/compacted_row_writer.rs b/crates/fluss/src/row/compacted/compacted_row_writer.rs index 4f535c6b..63b32a3d 100644 --- a/crates/fluss/src/row/compacted/compacted_row_writer.rs +++ b/crates/fluss/src/row/compacted/compacted_row_writer.rs @@ -18,7 +18,9 @@ 
use bytes::{Bytes, BytesMut}; use std::cmp; +use crate::row::BinaryRow; use crate::row::compacted::compacted_row::calculate_bit_set_width_in_bytes; +use crate::util::varint::{write_unsigned_varint_to_slice, write_unsigned_varint_u64_to_slice}; // Writer for CompactedRow // Reference implementation: @@ -125,25 +127,16 @@ impl CompactedRowWriter { pub fn write_int(&mut self, value: i32) { self.ensure_capacity(Self::MAX_INT_SIZE); - let mut v = value as u32; - while (v & !0x7F) != 0 { - self.buffer[self.position] = ((v as u8) & 0x7F) | 0x80; - self.position += 1; - v >>= 7; - } - self.buffer[self.position] = v as u8; - self.position += 1; + let bytes_written = + write_unsigned_varint_to_slice(value as u32, &mut self.buffer[self.position..]); + self.position += bytes_written; } + pub fn write_long(&mut self, value: i64) { self.ensure_capacity(Self::MAX_LONG_SIZE); - let mut v = value as u64; - while (v & !0x7F) != 0 { - self.buffer[self.position] = ((v as u8) & 0x7F) | 0x80; - self.position += 1; - v >>= 7; - } - self.buffer[self.position] = v as u8; - self.position += 1; + let bytes_written = + write_unsigned_varint_u64_to_slice(value as u64, &mut self.buffer[self.position..]); + self.position += bytes_written; } pub fn write_float(&mut self, value: f32) { @@ -154,3 +147,9 @@ impl CompactedRowWriter { self.write_raw(&value.to_ne_bytes()); } } + +impl BinaryRow for CompactedRowWriter { + fn as_bytes(&self) -> &[u8] { + self.buffer() + } +} diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs index c321ab9d..144d64fd 100644 --- a/crates/fluss/src/row/mod.rs +++ b/crates/fluss/src/row/mod.rs @@ -20,13 +20,18 @@ mod column; mod datum; mod binary; -mod compacted; +pub mod compacted; mod encode; mod field_getter; pub use column::*; pub use datum::*; +pub trait BinaryRow { + /// Returns the binary representation of this row as a byte slice. 
+ fn as_bytes(&self) -> &[u8]; +} + // TODO make functions return Result for better error handling pub trait InternalRow { /// Returns the number of fields in this row diff --git a/crates/fluss/src/util/mod.rs b/crates/fluss/src/util/mod.rs index 5f67290e..d191615e 100644 --- a/crates/fluss/src/util/mod.rs +++ b/crates/fluss/src/util/mod.rs @@ -16,6 +16,7 @@ // under the License. pub mod murmur_hash; +pub mod varint; use crate::metadata::TableBucket; use linked_hash_map::LinkedHashMap; diff --git a/crates/fluss/src/util/varint.rs b/crates/fluss/src/util/varint.rs new file mode 100644 index 00000000..96fd1f50 --- /dev/null +++ b/crates/fluss/src/util/varint.rs @@ -0,0 +1,502 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Variable-length integer encoding utilities. +//! +//! This module provides utilities for encoding integers in variable-length format, +//! which can save space when encoding small integers. The encoding uses 7 bits per byte +//! with the most significant bit as a continuation flag. + +use bytes::BufMut; +use std::io::{self, Read, Write}; + +/// Write an unsigned integer in variable-length format. +/// +/// The encoding uses 7 bits per byte with the MSB set to 1 if more bytes follow. 
+/// This matches the encoding used in Google Protocol Buffers. +#[allow(dead_code)] +pub fn write_unsigned_varint(value: u32, writer: &mut W) -> io::Result { + let mut v = value; + let mut bytes_written = 0; + + while (v & !0x7F) != 0 { + writer.write_all(&[((v as u8) & 0x7F) | 0x80])?; + bytes_written += 1; + v >>= 7; + } + writer.write_all(&[v as u8])?; + bytes_written += 1; + + Ok(bytes_written) +} + +/// Write an unsigned integer in variable-length format to a buffer. +pub fn write_unsigned_varint_buf(value: u32, buf: &mut impl BufMut) { + let mut v = value; + + while (v & !0x7F) != 0 { + buf.put_u8(((v as u8) & 0x7F) | 0x80); + v >>= 7; + } + buf.put_u8(v as u8); +} + +/// Read an unsigned integer stored in variable-length format. +#[allow(dead_code)] +pub fn read_unsigned_varint(reader: &mut R) -> io::Result { + let mut tmp = [0u8; 1]; + reader.read_exact(&mut tmp)?; + let mut byte = tmp[0] as i8; + + if byte >= 0 { + return Ok(byte as u32); + } + + let mut result = (byte & 127) as u32; + + reader.read_exact(&mut tmp)?; + byte = tmp[0] as i8; + if byte >= 0 { + result |= (byte as u32) << 7; + } else { + result |= ((byte & 127) as u32) << 7; + + reader.read_exact(&mut tmp)?; + byte = tmp[0] as i8; + if byte >= 0 { + result |= (byte as u32) << 14; + } else { + result |= ((byte & 127) as u32) << 14; + + reader.read_exact(&mut tmp)?; + byte = tmp[0] as i8; + if byte >= 0 { + result |= (byte as u32) << 21; + } else { + result |= ((byte & 127) as u32) << 21; + + reader.read_exact(&mut tmp)?; + byte = tmp[0] as i8; + result |= (byte as u32) << 28; + + if byte < 0 { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "Invalid u32 varint encoding: too many bytes (most significant bit in the 5th byte is set)", + )); + } + } + } + } + + Ok(result) +} + +/// Read an unsigned integer from a byte slice in variable-length format. 
/// Read an unsigned integer from a byte slice in variable-length format.
///
/// Returns the decoded value together with the number of bytes consumed.
///
/// # Errors
/// Returns `UnexpectedEof` for an empty or truncated buffer, and
/// `InvalidData` when a fifth byte still has its continuation bit set.
pub fn read_unsigned_varint_bytes(bytes: &[u8]) -> io::Result<(u32, usize)> {
    if bytes.is_empty() {
        return Err(io::Error::new(
            io::ErrorKind::UnexpectedEof,
            "Cannot read varint from empty buffer",
        ));
    }

    let mut result: u32 = 0;
    // A u32 varint occupies at most 5 bytes (shifts 0, 7, 14, 21, 28).
    for (index, shift) in [0u32, 7, 14, 21, 28].into_iter().enumerate() {
        let byte = *bytes
            .get(index)
            .ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "Incomplete varint"))?;
        result |= ((byte & 0x7F) as u32) << shift;
        if byte & 0x80 == 0 {
            return Ok((result, index + 1));
        }
    }

    Err(io::Error::new(
        io::ErrorKind::InvalidData,
        "Invalid u32 varint encoding: too many bytes (most significant bit in the 5th byte is set)",
    ))
}
+/// This function computes that efficiently using the formula: +/// +/// size = ((38 - leading_zeros) * 74899) >> 19 + (leading_zeros >> 5) +/// +/// Where: +/// - `38 = 32 + 6` (6 accounts for ceiling in division) +/// - `74899 = 2^19 / 7` (enables division by 7 via multiply + shift) +/// - `leading_zeros >> 5` adds 1 when value is 0 (minimum 1 byte) +pub fn size_of_unsigned_varint(value: u32) -> usize { + let leading_zeros = value.leading_zeros(); + let leading_zeros_below_38_divided_by_7 = ((38 - leading_zeros) * 0b10010010010010011) >> 19; + (leading_zeros_below_38_divided_by_7 + (leading_zeros >> 5)) as usize +} + +/// Calculate the number of bytes needed to encode a u64 in variable-length format. +/// +/// Varint encoding uses 7 bits per byte, so we need `ceil(bits_used / 7)` bytes. +/// This function computes that efficiently using the formula: +/// +/// size = ((70 - leading_zeros) * 74899) >> 19 + (leading_zeros >> 6) +/// +/// - `70 = 64 + 6` (6 accounts for ceiling in division) +/// - `74899 = 2^19 / 7` (enables division by 7 via multiply + shift) +/// - `leading_zeros >> 6` adds 1 when value is 0 (minimum 1 byte) +#[allow(dead_code)] +pub fn size_of_unsigned_varint_u64(value: u64) -> usize { + let leading_zeros = value.leading_zeros(); + let leading_zeros_below_70_divided_by_7 = ((70 - leading_zeros) * 0b10010010010010011) >> 19; + (leading_zeros_below_70_divided_by_7 + (leading_zeros >> 6)) as usize +} + +/// Write an unsigned 64-bit integer in variable-length format to a buffer. +#[allow(dead_code)] +pub fn write_unsigned_varint_u64_buf(value: u64, buf: &mut impl BufMut) { + let mut v = value; + while (v & !0x7F) != 0 { + buf.put_u8(((v as u8) & 0x7F) | 0x80); + v >>= 7; + } + buf.put_u8(v as u8); +} + +/// Write directly to a mutable byte slice, returning the number of bytes written. +/// Used by CompactedRowWriter which manages its own position. +/// +/// # Panics +/// Panics if the slice is too small to hold the encoded varint. 
/// Encode `value` as an unsigned varint directly into `slice`, returning
/// the number of bytes written.
///
/// Used by `CompactedRowWriter`, which manages its own write position.
///
/// # Panics
/// Panics if `slice` is shorter than the encoded varint. Reserve up to
/// 5 bytes (the u32 maximum), or use [`size_of_unsigned_varint`] for the
/// exact size beforehand.
pub fn write_unsigned_varint_to_slice(value: u32, slice: &mut [u8]) -> usize {
    let mut remaining = value;
    let mut index = 0;

    while remaining >= 0x80 {
        slice[index] = (remaining as u8 & 0x7F) | 0x80;
        index += 1;
        remaining >>= 7;
    }
    slice[index] = remaining as u8;

    index + 1
}

/// Encode an unsigned 64-bit varint directly into `slice`, returning the
/// number of bytes written.
///
/// # Panics
/// Panics if `slice` is shorter than the encoded varint. Reserve up to
/// 10 bytes (the u64 maximum).
pub fn write_unsigned_varint_u64_to_slice(value: u64, slice: &mut [u8]) -> usize {
    let mut remaining = value;
    let mut index = 0;

    while remaining >= 0x80 {
        slice[index] = (remaining as u8 & 0x7F) | 0x80;
        index += 1;
        remaining >>= 7;
    }
    slice[index] = remaining as u8;

    index + 1
}

/// Decode an unsigned varint from `slice` starting at `pos`, consuming at
/// most `max_bytes` bytes.
///
/// Returns the decoded value and the position just past it. Used by
/// `CompactedRowReader`, which manages positions itself.
///
/// # Errors
/// `UnexpectedEof` if the slice ends mid-varint; `InvalidData` if no
/// terminating byte appears within `max_bytes`.
pub fn read_unsigned_varint_at(
    slice: &[u8],
    pos: usize,
    max_bytes: usize,
) -> io::Result<(u32, usize)> {
    let mut result: u32 = 0;
    let mut shift = 0u32;

    for offset in 0..max_bytes {
        let byte = *slice.get(pos + offset).ok_or_else(|| {
            io::Error::new(io::ErrorKind::UnexpectedEof, "Unexpected end of varint")
        })?;
        result |= ((byte & 0x7F) as u32) << shift;
        if byte & 0x80 == 0 {
            return Ok((result, pos + offset + 1));
        }
        shift += 7;
    }

    Err(io::Error::new(
        io::ErrorKind::InvalidData,
        "Invalid VarInt32 input stream",
    ))
}
/// Decode an unsigned 64-bit varint from `slice` starting at `pos`,
/// consuming at most `max_bytes` bytes.
///
/// Returns the decoded value and the position just past it.
///
/// # Errors
/// `UnexpectedEof` if the slice ends mid-varint; `InvalidData` if no
/// terminating byte appears within `max_bytes`.
pub fn read_unsigned_varint_u64_at(
    slice: &[u8],
    pos: usize,
    max_bytes: usize,
) -> io::Result<(u64, usize)> {
    let mut result: u64 = 0;
    let mut shift = 0u32;

    for offset in 0..max_bytes {
        let byte = *slice.get(pos + offset).ok_or_else(|| {
            io::Error::new(io::ErrorKind::UnexpectedEof, "Unexpected end of varint")
        })?;
        result |= ((byte & 0x7F) as u64) << shift;
        if byte & 0x80 == 0 {
            return Ok((result, pos + offset + 1));
        }
        shift += 7;
    }

    Err(io::Error::new(
        io::ErrorKind::InvalidData,
        "Invalid VarInt64 input stream",
    ))
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    #[test]
    fn test_unsigned_varint_round_trip() {
        // Values straddling every encoded-length boundary for u32.
        let samples = [
            0u32, 1, 127, 128, 255, 256, 16383, 16384, 2097151, 2097152, 268435455, 268435456,
            u32::MAX,
        ];

        for value in samples {
            // Writer-based encode, then reader-based decode.
            let mut encoded = Vec::new();
            let written = write_unsigned_varint(value, &mut encoded).unwrap();
            let decoded = read_unsigned_varint(&mut Cursor::new(&encoded)).unwrap();
            assert_eq!(value, decoded, "Round trip failed for value {}", value);
            assert_eq!(
                written,
                encoded.len(),
                "Bytes written mismatch for value {}",
                value
            );

            // BufMut-based encode must produce the same number of bytes.
            let mut buf = bytes::BytesMut::new();
            write_unsigned_varint_buf(value, &mut buf);
            assert_eq!(buf.len(), written, "BufMut write length mismatch");

            // The size predictor must agree with the actual encoding.
            assert_eq!(
                size_of_unsigned_varint(value),
                encoded.len(),
                "Size calculation failed for value {}",
                value
            );

            // Slice-based decode agrees as well.
            let (slice_value, consumed) = read_unsigned_varint_bytes(&encoded).unwrap();
            assert_eq!(value, slice_value, "Bytes read failed for value {}", value);
            assert_eq!(
                consumed,
                encoded.len(),
                "Bytes read count mismatch for value {}",
                value
            );
        }
    }

    #[test]
    fn test_size_of_unsigned_varint() {
        // (value, expected encoded length) at each 7-bit boundary.
        let cases: [(u32, usize); 10] = [
            (0, 1),
            (127, 1),
            (128, 2),
            (16383, 2),
            (16384, 3),
            (2097151, 3),
            (2097152, 4),
            (268435455, 4),
            (268435456, 5),
            (u32::MAX, 5),
        ];
        for (value, expected) in cases {
            assert_eq!(size_of_unsigned_varint(value), expected);
        }
    }

    #[test]
    fn test_size_of_unsigned_varint_u64() {
        // (value, expected encoded length) at each 7-bit boundary up to 10 bytes.
        let cases: [(u64, usize); 21] = [
            (0, 1),
            (127, 1),
            (128, 2),
            (16383, 2),
            (16384, 3),
            (2097151, 3),
            (2097152, 4),
            (268435455, 4),
            (268435456, 5),
            (u32::MAX as u64, 5),
            (34359738367, 5),
            (34359738368, 6),
            (4398046511103, 6),
            (4398046511104, 7),
            (562949953421311, 7),
            (562949953421312, 8),
            (72057594037927935, 8),
            (72057594037927936, 9),
            (9223372036854775807, 9),
            (9223372036854775808, 10),
            (u64::MAX, 10),
        ];
        for (value, expected) in cases {
            assert_eq!(size_of_unsigned_varint_u64(value), expected);
        }
    }

    #[test]
    fn test_read_unsigned_varint_bytes_error_handling() {
        // An empty buffer is rejected outright.
        assert!(read_unsigned_varint_bytes(&[]).is_err());

        // A continuation bit with no following byte is an incomplete varint.
        assert!(read_unsigned_varint_bytes(&[0x80]).is_err());
        assert!(read_unsigned_varint_bytes(&[0xFF, 0x80]).is_err());
    }

    #[test]
    fn test_write_read_to_slice() {
        // u32 values round trip through a caller-managed slice.
        for value in [0u32, 127, 128, 16384, u32::MAX] {
            let mut scratch = vec![0u8; 10];
            let written = write_unsigned_varint_to_slice(value, &mut scratch);

            let (decoded, next_pos) = read_unsigned_varint_at(&scratch, 0, 5).unwrap();
            assert_eq!(value, decoded);
            assert_eq!(written, next_pos);
        }

        // u64 values round trip the same way.
        for value in [0u64, 127, 128, 16384, u32::MAX as u64, u64::MAX] {
            let mut scratch = vec![0u8; 10];
            let written = write_unsigned_varint_u64_to_slice(value, &mut scratch);

            let (decoded, next_pos) = read_unsigned_varint_u64_at(&scratch, 0, 10).unwrap();
            assert_eq!(value, decoded);
            assert_eq!(written, next_pos);
        }
    }

    #[test]
    fn test_read_at_with_offset() {
        // Pack three varints back to back, then decode each at its offset.
        let mut packed = vec![0u8; 20];
        let mut cursor = 0;

        cursor += write_unsigned_varint_to_slice(127, &mut packed[cursor..]);
        cursor += write_unsigned_varint_to_slice(16384, &mut packed[cursor..]);
        let end_pos = cursor + write_unsigned_varint_to_slice(u32::MAX, &mut packed[cursor..]);

        let (first, after_first) = read_unsigned_varint_at(&packed, 0, 5).unwrap();
        assert_eq!(first, 127);

        let (second, after_second) = read_unsigned_varint_at(&packed, after_first, 5).unwrap();
        assert_eq!(second, 16384);

        let (third, after_third) = read_unsigned_varint_at(&packed, after_second, 5).unwrap();
        assert_eq!(third, u32::MAX);
        assert_eq!(after_third, end_pos);
    }
}