diff --git a/crates/fluss/src/record/kv/kv_record.rs b/crates/fluss/src/record/kv/kv_record.rs
index ab8c2ac1..a9c45d69 100644
--- a/crates/fluss/src/record/kv/kv_record.rs
+++ b/crates/fluss/src/record/kv/kv_record.rs
@@ -27,6 +27,8 @@ use bytes::{BufMut, Bytes, BytesMut};
 use std::io;
 
+use crate::row::RowDecoder;
+use crate::row::compacted::CompactedRow;
 use crate::util::varint::{
     read_unsigned_varint_bytes, size_of_unsigned_varint, write_unsigned_varint_buf,
 };
@@ -34,7 +36,7 @@ use crate::util::varint::{
 /// Length field size in bytes
 pub const LENGTH_LENGTH: usize = 4;
 
-/// A key-value record.
+/// A key-value record containing raw key and value bytes.
 ///
 /// The schema is:
 /// - Length => Int32
@@ -43,34 +45,39 @@ pub const LENGTH_LENGTH: usize = 4;
 /// - Value => bytes (BinaryRow, written directly without length prefix)
 ///
 /// When the value is None (deletion), no Value bytes are present.
-// Reference implementation:
-// https://github.com/apache/fluss/blob/main/fluss-common/src/main/java/org/apache/fluss/record/KvRecord.java
+///
+/// This struct stores only raw bytes. To decode the value into a typed row,
+/// use the `row()` method with a RowDecoder (typically obtained from the iterator).
+///
+/// Reference implementation:
+/// https://github.com/apache/fluss/blob/main/fluss-common/src/main/java/org/apache/fluss/record/KvRecord.java
 #[derive(Debug, Clone)]
 pub struct KvRecord {
     key: Bytes,
-    value: Option<Bytes>,
+    value_bytes: Option<Bytes>,
     size_in_bytes: usize,
 }
 
 impl KvRecord {
-    /// Create a new KvRecord with the given key and optional value.
-    pub fn new(key: Bytes, value: Option<Bytes>) -> Self {
-        let size_in_bytes = Self::size_of(&key, value.as_deref());
-        Self {
-            key,
-            value,
-            size_in_bytes,
-        }
-    }
-
     /// Get the key bytes.
     pub fn key(&self) -> &Bytes {
         &self.key
     }
 
-    /// Get the value bytes (None indicates a deletion).
-    pub fn value(&self) -> Option<&Bytes> {
-        self.value.as_ref()
+    /// Get the raw value bytes (for testing).
+    #[cfg(test)]
+    pub(crate) fn value_bytes(&self) -> Option<&Bytes> {
+        self.value_bytes.as_ref()
+    }
+
+    /// Decode the value bytes into a typed row using the provided decoder.
+    /// This creates a lightweight CompactedRow view over the raw bytes.
+    /// Actual field parsing is lazy (on first access).
+    pub fn row<'a>(&'a self, decoder: &dyn RowDecoder) -> Option<CompactedRow<'a>> {
+        self.value_bytes.as_ref().map(|bytes| {
+            // Decode on-demand - CompactedRow<'a> lifetime tied to &'a self
+            decoder.decode(bytes.as_ref())
+        })
     }
 
     /// Calculate the total size of the record when serialized (including length prefix).
@@ -121,8 +128,7 @@ impl KvRecord {
     /// Read a KV record from bytes at the given position.
     ///
     /// Returns the KvRecord and the number of bytes consumed.
-    ///
-    /// TODO: Connect KvReadContext and return CompactedRow records.
+    /// The record contains only raw bytes; use `row()` with a RowDecoder to decode the value.
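+    ///
+    /// # Example
+    ///
+    /// A minimal sketch (not compiled here); `decoder` stands in for any
+    /// `RowDecoder` obtained from a `ReadContext`:
+    ///
+    /// ```ignore
+    /// let (record, consumed) = KvRecord::read_from(&bytes, 0)?;
+    /// match record.row(decoder) {
+    ///     Some(row) => { /* typed field access */ }
+    ///     None => { /* deletion record */ }
+    /// }
+    /// ```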
     pub fn read_from(bytes: &Bytes, position: usize) -> io::Result<(Self, usize)> {
         if bytes.len() < position.saturating_add(LENGTH_LENGTH) {
             return Err(io::Error::new(
@@ -183,11 +189,10 @@
         let key = bytes.slice(current_offset..key_end);
         current_offset = key_end;
 
-        // Read value bytes directly
-        let value = if current_offset < record_end {
+        // Read value bytes directly (don't decode yet - will decode on-demand)
+        let value_bytes = if current_offset < record_end {
             // Value is present: all remaining bytes are the value
-            let value_bytes = bytes.slice(current_offset..record_end);
-            Some(value_bytes)
+            Some(bytes.slice(current_offset..record_end))
         } else {
             // No remaining bytes: this is a deletion record
             None
@@ -196,7 +201,7 @@
         Ok((
             Self {
                 key,
-                value,
+                value_bytes,
                 size_in_bytes: total_size,
             },
             total_size,
@@ -207,6 +212,11 @@
     pub fn get_size_in_bytes(&self) -> usize {
         self.size_in_bytes
     }
+
+    /// Check if this is a deletion record (no value).
+    pub fn is_deletion(&self) -> bool {
+        self.value_bytes.is_none()
+    }
 }
 
 #[cfg(test)]
@@ -214,30 +224,25 @@ mod tests {
     use super::*;
 
     #[test]
-    fn test_kv_record_size_calculation() {
+    fn test_kv_record_basic_operations() {
         let key = b"test_key";
         let value = b"test_value";
 
-        // With value (no value length varint)
+        // Test size calculation with value
         let size_with_value = KvRecord::size_of(key, Some(value));
         assert_eq!(
             size_with_value,
             LENGTH_LENGTH + size_of_unsigned_varint(key.len() as u32) + key.len() + value.len()
         );
 
-        // Without value
+        // Test size calculation without value (deletion)
         let size_without_value = KvRecord::size_of(key, None);
         assert_eq!(
             size_without_value,
             LENGTH_LENGTH + size_of_unsigned_varint(key.len() as u32) + key.len()
         );
-    }
-
-    #[test]
-    fn test_kv_record_write_read_round_trip() {
-        let key = b"my_key";
-        let value = b"my_value_data";
 
+        // Test write/read round trip with value
         let mut buf = BytesMut::new();
         let written = KvRecord::write_to_buf(&mut buf, key, Some(value)).unwrap();
@@ -246,40 +251,70 @@ mod tests {
         assert_eq!(written, read_size);
         assert_eq!(record.key().as_ref(), key);
-        assert_eq!(record.value().unwrap().as_ref(), value);
+        assert_eq!(record.value_bytes().unwrap().as_ref(), value);
         assert_eq!(record.get_size_in_bytes(), written);
-    }
-
-    #[test]
-    fn test_kv_record_deletion() {
-        let key = b"delete_me";
+        assert!(!record.is_deletion());
 
-        // Write deletion record (no value)
+        // Test deletion record (no value)
+        let delete_key = b"delete_me";
         let mut buf = BytesMut::new();
-        let written = KvRecord::write_to_buf(&mut buf, key, None).unwrap();
+        let written = KvRecord::write_to_buf(&mut buf, delete_key, None).unwrap();
 
         let bytes = buf.freeze();
         let (record, read_size) = KvRecord::read_from(&bytes, 0).unwrap();
 
         assert_eq!(written, read_size);
-        assert_eq!(record.key().as_ref(), key);
-        assert!(record.value().is_none());
+        assert_eq!(record.key().as_ref(), delete_key);
+        assert!(record.is_deletion());
+        assert!(record.value_bytes().is_none());
     }
 
     #[test]
-    fn test_kv_record_with_large_key() {
-        let key = vec![0u8; 1024];
-        let value = vec![1u8; 4096];
+    fn test_kv_record_multiple_records() {
+        // Test multiple regular-sized records in buffer
+        let records = vec![
+            (b"key1".as_slice(), Some(b"value1".as_slice())),
+            (b"key2".as_slice(), None), // Deletion
+            (b"key3".as_slice(), Some(b"value3".as_slice())),
+        ];
 
         let mut buf = BytesMut::new();
-        let written = KvRecord::write_to_buf(&mut buf, &key, Some(&value)).unwrap();
+        for (key, value) in &records {
+            KvRecord::write_to_buf(&mut buf, key, *value).unwrap();
+        }
+
+        let bytes = buf.freeze();
+        let mut offset = 0;
+        for (expected_key, expected_value) in &records {
+            let (record, size) = KvRecord::read_from(&bytes, offset).unwrap();
+            assert_eq!(record.key().as_ref(), *expected_key);
+            match expected_value {
+                Some(v) => {
+                    assert_eq!(record.value_bytes().unwrap().as_ref(), *v);
+                    assert!(!record.is_deletion());
+                }
+                None => {
+                    assert!(record.is_deletion());
+                    assert!(record.value_bytes().is_none());
+                }
+            }
+            offset += size;
+        }
+        assert_eq!(offset, bytes.len());
+
+        // Test large keys and values
+        let large_key = vec![0u8; 1024];
+        let large_value = vec![1u8; 4096];
+
+        let mut buf = BytesMut::new();
+        let written = KvRecord::write_to_buf(&mut buf, &large_key, Some(&large_value)).unwrap();
 
         let bytes = buf.freeze();
         let (record, read_size) = KvRecord::read_from(&bytes, 0).unwrap();
 
         assert_eq!(written, read_size);
-        assert_eq!(record.key().len(), key.len());
-        assert_eq!(record.value().unwrap().len(), value.len());
+        assert_eq!(record.key().len(), large_key.len());
+        assert_eq!(record.value_bytes().unwrap().len(), large_value.len());
     }
 
     #[test]
@@ -291,7 +326,9 @@
         let bytes = buf.freeze();
         let result = KvRecord::read_from(&bytes, 0);
         assert!(result.is_err());
-        assert_eq!(result.unwrap_err().kind(), io::ErrorKind::InvalidData);
+        if let Err(e) = result {
+            assert_eq!(e.kind(), io::ErrorKind::InvalidData);
+        }
 
         // Test overflow length
         let mut buf = BytesMut::new();
@@ -307,33 +344,8 @@
         let bytes = buf.freeze();
         let result = KvRecord::read_from(&bytes, 0);
         assert!(result.is_err());
-        assert_eq!(result.unwrap_err().kind(), io::ErrorKind::UnexpectedEof);
-    }
-
-    #[test]
-    fn test_multiple_records_in_buffer() {
-        let records = vec![
-            (b"key1".as_slice(), Some(b"value1".as_slice())),
-            (b"key2".as_slice(), None),
-            (b"key3".as_slice(), Some(b"value3".as_slice())),
-        ];
-
-        let mut buf = BytesMut::new();
-        for (key, value) in &records {
-            KvRecord::write_to_buf(&mut buf, key, *value).unwrap();
+        if let Err(e) = result {
+            assert_eq!(e.kind(), io::ErrorKind::UnexpectedEof);
         }
-
-        let bytes = buf.freeze();
-        let mut offset = 0;
-        for (expected_key, expected_value) in &records {
-            let (record, size) = KvRecord::read_from(&bytes, offset).unwrap();
-            assert_eq!(record.key().as_ref(), *expected_key);
-            match expected_value {
-                Some(v) => assert_eq!(record.value().unwrap().as_ref(), *v),
-                None => assert!(record.value().is_none()),
-            }
-            offset += size;
-        }
-        assert_eq!(offset, bytes.len());
     }
 }
diff --git a/crates/fluss/src/record/kv/kv_record_batch.rs b/crates/fluss/src/record/kv/kv_record_batch.rs
index eb3c09ad..32f712f8 100644
--- a/crates/fluss/src/record/kv/kv_record_batch.rs
+++ b/crates/fluss/src/record/kv/kv_record_batch.rs
@@ -32,8 +32,11 @@
 use bytes::Bytes;
 use std::io;
+use std::sync::Arc;
 
-use crate::record::kv::KvRecord;
+use crate::error::Result;
+use crate::record::kv::{KvRecord, ReadContext};
+use crate::row::RowDecoder;
 
 // Field lengths in bytes
 pub const LENGTH_LENGTH: usize = 4;
@@ -253,38 +256,87 @@ impl KvRecordBatch {
         ]))
     }
 
-    /// Create an iterator over the records in this batch.
-    /// This validates the batch checksum before returning the iterator.
+    /// Create an iterable collection of records in this batch.
+    ///
+    /// This validates the batch checksum before returning the records.
     /// For trusted data paths, use `records_unchecked()` to skip validation.
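+    ///
+    /// # Example
+    ///
+    /// A sketch of the intended call pattern (`ctx` is any `ReadContext`
+    /// implementation); grab the decoder before `into_iter` consumes the
+    /// collection:
+    ///
+    /// ```ignore
+    /// let records = batch.records(&ctx)?;
+    /// let decoder = records.decoder_arc();
+    /// for record in records {
+    ///     let record = record?;
+    ///     if let Some(row) = record.row(&*decoder) { /* typed access */ }
+    /// }
+    /// ```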
-    pub fn records(&self) -> io::Result<KvRecordIterator> {
+    ///
+    /// Mirrors: KvRecordBatch.records(ReadContext)
+    pub fn records(&self, read_context: &dyn ReadContext) -> Result<KvRecords> {
         if !self.is_valid() {
-            return Err(io::Error::new(
-                io::ErrorKind::InvalidData,
-                "Invalid batch checksum",
-            ));
+            return Err(crate::error::Error::IoUnexpectedError {
+                message: "Invalid batch checksum".to_string(),
+                source: io::Error::new(io::ErrorKind::InvalidData, "Invalid batch checksum"),
+            });
         }
-        self.records_unchecked()
+        self.records_unchecked(read_context)
     }
 
-    /// Create an iterator over the records in this batch without validating the checksum
-    pub fn records_unchecked(&self) -> io::Result<KvRecordIterator> {
+    /// Create an iterable collection of records in this batch without validating the checksum.
+    pub fn records_unchecked(&self, read_context: &dyn ReadContext) -> Result<KvRecords> {
         let size = self.size_in_bytes()?;
         let count = self.record_count()?;
+        let schema_id = self.schema_id()?;
+
         if count < 0 {
-            return Err(io::Error::new(
-                io::ErrorKind::InvalidData,
-                format!("Invalid record count: {count}"),
-            ));
+            return Err(crate::error::Error::IoUnexpectedError {
+                message: format!("Invalid record count: {count}"),
+                source: io::Error::new(io::ErrorKind::InvalidData, "Invalid record count"),
+            });
         }
-        Ok(KvRecordIterator {
-            data: self.data.clone(),
-            position: self.position + RECORDS_OFFSET,
-            end: self.position + size,
-            remaining_count: count,
+
+        // Get row decoder for this schema from context (cached)
+        let row_decoder = read_context.get_row_decoder(schema_id)?;
+
+        Ok(KvRecords {
+            iter: KvRecordIterator {
+                data: self.data.clone(),
+                position: self.position + RECORDS_OFFSET,
+                end: self.position + size,
+                remaining_count: count,
+            },
+            row_decoder,
         })
     }
 }
 
+/// Iterable collection of KV records with associated decoder.
+///
+/// This wrapper provides both iteration capability and access to the row decoder
+/// needed to decode record values into typed rows.
+pub struct KvRecords {
+    iter: KvRecordIterator,
+    row_decoder: Arc<dyn RowDecoder>,
+}
+
+impl KvRecords {
+    /// Get a reference to the row decoder for decoding record values.
+    ///
+    /// Returns a reference tied to the lifetime of `&self`.
+    /// Use this when iterating by reference.
+    pub fn decoder(&self) -> &dyn RowDecoder {
+        &*self.row_decoder
+    }
+
+    /// Get an owned Arc to the row decoder.
+    ///
+    /// Returns a cloned Arc that can outlive the KvRecords, allowing you to
+    /// grab it before consuming the iterator. Useful when the decoder must
+    /// outlive the iterable (collect-then-decode style).
+    pub fn decoder_arc(&self) -> Arc<dyn RowDecoder> {
+        Arc::clone(&self.row_decoder)
+    }
+}
+
+impl IntoIterator for KvRecords {
+    type Item = io::Result<KvRecord>;
+    type IntoIter = KvRecordIterator;
+
+    fn into_iter(self) -> Self::IntoIter {
+        self.iter
+    }
+}
+
 /// Iterator over records in a KV record batch.
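+///
+/// Yields `io::Result<KvRecord>`; each step reads one length-prefixed record
+/// from the shared `Bytes` buffer. Typically obtained via
+/// `KvRecords::into_iter()`.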
 pub struct KvRecordIterator {
     data: Bytes,
@@ -319,7 +371,9 @@ impl Iterator for KvRecordIterator {
 mod tests {
     use super::*;
     use crate::metadata::{DataTypes, KvFormat, RowType};
+    use crate::record::kv::test_util::TestReadContext;
     use crate::record::kv::{CURRENT_KV_MAGIC_VALUE, KvRecordBatchBuilder};
+    use crate::row::InternalRow;
     use crate::row::binary::BinaryWriter;
     use crate::row::compacted::CompactedRow;
     use bytes::{BufMut, BytesMut};
@@ -380,15 +434,24 @@ mod tests {
         assert_eq!(batch.batch_sequence().unwrap(), 5);
         assert_eq!(batch.record_count().unwrap(), 2);
 
-        let records: Vec<_> = batch.records().unwrap().collect();
-        assert_eq!(records.len(), 2);
+        // Create ReadContext for reading
+        let read_context = TestReadContext::compacted(vec![DataTypes::bytes()]);
 
-        let record1 = records[0].as_ref().unwrap();
+        // Iterate and verify records using typed API
+        let records = batch.records(&read_context).unwrap();
+        let decoder = records.decoder_arc(); // Get Arc before consuming
+
+        let mut iter = records.into_iter();
+        let record1 = iter.next().unwrap().unwrap();
         assert_eq!(record1.key().as_ref(), key1);
-        assert_eq!(record1.value().unwrap().as_ref(), value1_writer.buffer());
+        assert!(!record1.is_deletion());
+        let row1 = record1.row(&*decoder).unwrap();
+        assert_eq!(row1.get_bytes(0), &[1, 2, 3, 4, 5]);
 
-        let record2 = records[1].as_ref().unwrap();
+        let record2 = iter.next().unwrap().unwrap();
         assert_eq!(record2.key().as_ref(), key2);
-        assert!(record2.value().is_none());
+        assert!(record2.is_deletion());
+
+        assert!(iter.next().is_none());
     }
 }
diff --git a/crates/fluss/src/record/kv/kv_record_batch_builder.rs b/crates/fluss/src/record/kv/kv_record_batch_builder.rs
index c36a8612..636104d1 100644
--- a/crates/fluss/src/record/kv/kv_record_batch_builder.rs
+++ b/crates/fluss/src/record/kv/kv_record_batch_builder.rs
@@ -330,22 +330,20 @@ mod tests {
     }
 
     #[test]
-    fn test_builder_basic_workflow() {
+    fn test_builder_basic_operations() {
+        // Test basic workflow: initial state, writer state, append, close, build
         let schema_id = 42;
         let write_limit = 4096;
         let mut builder = KvRecordBatchBuilder::new(schema_id, write_limit, KvFormat::COMPACTED);
 
-        // Test initial state
         assert!(!builder.is_closed());
         assert_eq!(builder.writer_id(), NO_WRITER_ID);
        assert_eq!(builder.batch_sequence(), NO_BATCH_SEQUENCE);
 
-        // Test writer state
         builder.set_writer_state(100, 5);
         assert_eq!(builder.writer_id(), 100);
         assert_eq!(builder.batch_sequence(), 5);
 
-        // Test appending records
         let key1 = b"key1";
         let value1 = create_test_row(b"value1");
         assert!(builder.has_room_for_row(key1, Some(&value1)));
@@ -355,7 +353,6 @@ mod tests {
         assert!(builder.has_room_for_row::<CompactedRow>(key2, None));
         builder.append_row::<CompactedRow>(key2, None).unwrap();
 
-        // Test close and build
         builder.close().unwrap();
         assert!(builder.is_closed());
 
@@ -365,11 +362,8 @@ mod tests {
         // Building again should return cached result
         let bytes2 = builder.build().unwrap();
         assert_eq!(bytes.len(), bytes2.len());
-    }
 
-    #[test]
-    fn test_builder_lifecycle() {
-        // Test abort behavior
+        // Test lifecycle: abort behavior
         let mut builder = KvRecordBatchBuilder::new(1, 4096, KvFormat::COMPACTED);
         let value = create_test_row(b"value");
         builder.append_row(b"key", Some(&value)).unwrap();
         builder.abort();
 
         assert!(builder.build().is_err());
         assert!(builder.close().is_err());
 
-        // Test close behavior
+        // Test lifecycle: close behavior
         let mut builder = KvRecordBatchBuilder::new(1, 4096, KvFormat::COMPACTED);
         let value = create_test_row(b"value");
         builder.append_row(b"key", Some(&value)).unwrap();
         builder.close().unwrap();
 
-        assert!(builder.append_row::<CompactedRow>(b"key2", None).is_err()); // Can't append after close
-        assert!(builder.build().is_ok()); // But can still build
+        assert!(builder.append_row::<CompactedRow>(b"key2", None).is_err());
+        assert!(builder.build().is_ok());
+
+        // Test KvFormat validation
+        let mut row_writer = CompactedRowWriter::new(1);
+        row_writer.write_int(42);
+        let row_type = RowType::with_data_types(vec![DataTypes::int()]);
+        let row = &CompactedRow::from_bytes(&row_type, row_writer.buffer());
+
+        // INDEXED format should reject append_row
+        let mut indexed_builder = KvRecordBatchBuilder::new(1, 4096, KvFormat::INDEXED);
+        let result = indexed_builder.append_row(b"key", Some(row));
+        assert!(result.is_err());
+        assert_eq!(result.unwrap_err().kind(), io::ErrorKind::InvalidInput);
+
+        // COMPACTED format should accept append_row
+        let mut compacted_builder = KvRecordBatchBuilder::new(1, 4096, KvFormat::COMPACTED);
+        let result = compacted_builder.append_row(b"key", Some(row));
+        assert!(result.is_ok());
     }
 
     #[test]
@@ -430,7 +441,10 @@ mod tests {
     }
 
     #[test]
-    fn test_cache_invalidation_on_append() {
+    fn test_builder_cache_invalidation() {
+        use crate::record::kv::KvRecordBatch;
+
+        // Test cache invalidation on append
         let mut builder = KvRecordBatchBuilder::new(1, 4096, KvFormat::COMPACTED);
         builder.set_writer_state(100, 5);
 
@@ -446,18 +460,13 @@ mod tests {
         let len2 = bytes2.len();
 
         // Verify the second build includes both records
-        assert!(len2 > len1, "Second build should be larger");
-
-        use crate::record::kv::KvRecordBatch;
+        assert!(len2 > len1);
         let batch = KvRecordBatch::new(bytes2, 0);
         assert!(batch.is_valid());
-        assert_eq!(batch.record_count().unwrap(), 2, "Should have 2 records");
-    }
+        assert_eq!(batch.record_count().unwrap(), 2);
 
-    #[test]
-    fn test_cache_invalidation_on_set_writer_state() {
+        // Test cache invalidation on writer state change
         let mut builder = KvRecordBatchBuilder::new(1, 4096, KvFormat::COMPACTED);
-
         builder.set_writer_state(100, 5);
 
         let value = create_test_row(b"value");
         builder.append_row(b"key", Some(&value)).unwrap();
 
         let bytes1 = builder.build().unwrap();
 
         builder.set_writer_state(200, 10);
         let bytes2 = builder.build().unwrap();
 
-        assert_ne!(
-            bytes1, bytes2,
-            "Bytes should differ after writer state change"
-        );
+        assert_ne!(bytes1, bytes2);
 
-        use crate::record::kv::KvRecordBatch;
         let batch1 = KvRecordBatch::new(bytes1, 0);
         let batch2 = KvRecordBatch::new(bytes2, 0);
         assert_eq!(batch1.writer_id().unwrap(), 100);
         assert_eq!(batch1.batch_sequence().unwrap(), 5);
-
         assert_eq!(batch2.writer_id().unwrap(), 200);
         assert_eq!(batch2.batch_sequence().unwrap(), 10);
     }
 
     #[test]
-    fn test_builder_with_compacted_row_writer() {
+    fn test_builder_with_compacted_row_writer() -> crate::error::Result<()> {
         use crate::record::kv::KvRecordBatch;
         use crate::row::InternalRow;
         use crate::row::compacted::CompactedRow;
@@ -502,7 +506,7 @@ mod tests {
         let key1 = b"key1";
         assert!(builder.has_room_for_row(key1, Some(row1)));
-        builder.append_row(key1, Some(row1)).unwrap();
+        builder.append_row(key1, Some(row1))?;
 
         // Create and append second record
         let mut row_writer2 = CompactedRowWriter::new(2);
@@ -512,63 +516,57 @@ mod tests {
         let row2 = &CompactedRow::from_bytes(&row_type, row_writer2.buffer());
 
         let key2 = b"key2";
-        builder.append_row(key2, Some(row2)).unwrap();
+        builder.append_row(key2, Some(row2))?;
 
         // Append a deletion record
         let key3 = b"key3";
-        builder.append_row::<CompactedRow>(key3, None).unwrap();
+        builder.append_row::<CompactedRow>(key3, None)?;
 
         // Build and verify
-        builder.close().unwrap();
-        let bytes = builder.build().unwrap();
+        builder.close()?;
+        let bytes = builder.build()?;
 
         let batch = KvRecordBatch::new(bytes, 0);
         assert!(batch.is_valid());
-        assert_eq!(batch.record_count().unwrap(), 3);
-        assert_eq!(batch.writer_id().unwrap(), 100);
-        assert_eq!(batch.batch_sequence().unwrap(), 5);
-
-        // Read back and verify records
-        let records: Vec<_> = batch.records().unwrap().collect();
-        assert_eq!(records.len(), 3);
-
-        // Verify first record
-        let record1 = records[0].as_ref().unwrap();
-        assert_eq!(record1.key().as_ref(), key1);
-        let row1 = CompactedRow::from_bytes(&row_type, record1.value().unwrap());
-        assert_eq!(row1.get_int(0), 42);
-        assert_eq!(row1.get_string(1), "hello");
-
-        // Verify second record
-        let record2 = records[1].as_ref().unwrap();
-        assert_eq!(record2.key().as_ref(), key2);
-        let row2 = CompactedRow::from_bytes(&row_type, record2.value().unwrap());
-        assert_eq!(row2.get_int(0), 100);
-        assert_eq!(row2.get_string(1), "world");
-
-        // Verify deletion record
-        let record3 = records[2].as_ref().unwrap();
-        assert_eq!(record3.key().as_ref(), key3);
-        assert!(record3.value().is_none());
-    }
-
-    #[test]
-    fn test_kv_format_validation() {
-        let mut row_writer = CompactedRowWriter::new(1);
-        row_writer.write_int(42);
-
-        let row_type = RowType::with_data_types([DataTypes::int()].to_vec());
-        let row = &CompactedRow::from_bytes(&row_type, row_writer.buffer());
-
-        // INDEXED format should reject append_row
-        let mut indexed_builder = KvRecordBatchBuilder::new(1, 4096, KvFormat::INDEXED);
-        let result = indexed_builder.append_row(b"key", Some(row));
-        assert!(result.is_err());
-        assert_eq!(result.unwrap_err().kind(), io::ErrorKind::InvalidInput);
+        assert_eq!(batch.record_count()?, 3);
+        assert_eq!(batch.writer_id()?, 100);
+        assert_eq!(batch.batch_sequence()?, 5);
+
+        // Create ReadContext for reading typed rows
+        let types = vec![DataTypes::int(), DataTypes::string()];
+        let read_context = crate::record::kv::test_util::TestReadContext::compacted(types);
+
+        // Read back and verify records using idiomatic for-loop
+        let records = batch.records(&read_context)?;
+        let decoder = records.decoder_arc();
+        let mut record_count = 0;
+
+        for rec in records {
+            let rec = rec?;
+            record_count += 1;
+
+            match record_count {
+                1 => {
+                    assert_eq!(rec.key().as_ref(), key1);
+                    let row = rec.row(&*decoder).unwrap();
+                    assert_eq!(row.get_int(0), 42);
+                    assert_eq!(row.get_string(1), "hello");
+                }
+                2 => {
+                    assert_eq!(rec.key().as_ref(), key2);
+                    let row = rec.row(&*decoder).unwrap();
+                    assert_eq!(row.get_int(0), 100);
+                    assert_eq!(row.get_string(1), "world");
+                }
+                3 => {
+                    assert_eq!(rec.key().as_ref(), key3);
+                    assert!(rec.is_deletion());
+                }
+                _ => panic!("Unexpected record count"),
+            }
+        }
 
-        // COMPACTED format should accept append_row
-        let mut compacted_builder = KvRecordBatchBuilder::new(1, 4096, KvFormat::COMPACTED);
-        let result = compacted_builder.append_row(b"key", Some(row));
-        assert!(result.is_ok());
+        assert_eq!(record_count, 3);
+        Ok(())
     }
 }
diff --git a/crates/fluss/src/record/kv/kv_record_read_context.rs b/crates/fluss/src/record/kv/kv_record_read_context.rs
new file mode 100644
index 00000000..2049c326
--- /dev/null
+++ b/crates/fluss/src/record/kv/kv_record_read_context.rs
@@ -0,0 +1,179 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Default implementation of ReadContext with decoder caching.
+
+use super::ReadContext;
+use crate::error::{Error, Result};
+use crate::metadata::{KvFormat, Schema};
+use crate::row::{RowDecoder, RowDecoderFactory};
+use std::collections::HashMap;
+use std::sync::{Arc, Mutex};
+
+/// Trait for fetching schemas by ID.
+///
+/// This trait abstracts schema retrieval, allowing different implementations
+/// (e.g., from metadata store, cache, or test mocks).
+pub trait SchemaGetter: Send + Sync {
+    /// Get the schema for the given schema ID.
+    ///
+    /// # Arguments
+    /// * `schema_id` - The schema ID to fetch
+    ///
+    /// # Returns
+    /// An Arc-wrapped Schema for the specified ID, or an error if the schema
+    /// cannot be fetched (missing ID, network error, etc.)
+    fn get_schema(&self, schema_id: i16) -> Result<Arc<Schema>>;
+}
+
+/// Default implementation of ReadContext with decoder caching.
+///
+/// This implementation caches RowDecoders by schema ID for performance,
+/// avoiding repeated schema lookups and decoder creation.
+///
+/// Reference: org.apache.fluss.record.KvRecordReadContext
+pub struct KvRecordReadContext {
+    kv_format: KvFormat,
+    schema_getter: Arc<dyn SchemaGetter>,
+    row_decoder_cache: Mutex<HashMap<i16, Arc<dyn RowDecoder>>>,
+}
+
+impl KvRecordReadContext {
+    /// Create a new KvRecordReadContext.
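+    ///
+    /// # Example
+    ///
+    /// A construction sketch; `MyGetter` is a placeholder for any
+    /// `SchemaGetter` implementation:
+    ///
+    /// ```ignore
+    /// let ctx = KvRecordReadContext::new(KvFormat::COMPACTED, Arc::new(MyGetter));
+    /// let decoder = ctx.get_row_decoder(schema_id)?;
+    /// ```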
+    ///
+    /// # Arguments
+    /// * `kv_format` - The KV format (COMPACTED or INDEXED)
+    /// * `schema_getter` - The schema getter for fetching schemas by ID
+    ///
+    /// # Returns
+    /// A new KvRecordReadContext instance
+    pub fn new(kv_format: KvFormat, schema_getter: Arc<dyn SchemaGetter>) -> Self {
+        Self {
+            kv_format,
+            schema_getter,
+            row_decoder_cache: Mutex::new(HashMap::new()),
+        }
+    }
+}
+
+impl ReadContext for KvRecordReadContext {
+    fn get_row_decoder(&self, schema_id: i16) -> Result<Arc<dyn RowDecoder>> {
+        // First check: fast path
+        {
+            let cache = self
+                .row_decoder_cache
+                .lock()
+                .unwrap_or_else(|poisoned| poisoned.into_inner());
+            if let Some(decoder) = cache.get(&schema_id) {
+                return Ok(Arc::clone(decoder));
+            }
+        } // Release lock before expensive operations
+
+        // Build decoder outside the lock to avoid blocking other threads
+        let schema = self.schema_getter.get_schema(schema_id)?;
+        let row_type = match schema.row_type() {
+            crate::metadata::DataType::Row(row_type) => row_type.clone(),
+            other => {
+                return Err(Error::IoUnexpectedError {
+                    message: format!(
+                        "Schema {} has invalid row type: expected Row, got {:?}",
+                        schema_id, other
+                    ),
+                    source: std::io::Error::new(
+                        std::io::ErrorKind::InvalidData,
+                        "Invalid row type",
+                    ),
+                });
+            }
+        };
+
+        // Create decoder outside lock
+        let decoder = RowDecoderFactory::create(self.kv_format.clone(), row_type)?;
+
+        // Second check: insert only if another thread didn't beat us to it
+        {
+            let mut cache = self
+                .row_decoder_cache
+                .lock()
+                .unwrap_or_else(|poisoned| poisoned.into_inner());
+            // Check again - another thread might have inserted while we were building
+            if let Some(existing) = cache.get(&schema_id) {
+                return Ok(Arc::clone(existing));
+            }
+            cache.insert(schema_id, Arc::clone(&decoder));
+        }
+
+        Ok(decoder)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::metadata::{DataType, DataTypes, Schema};
+
+    struct MockSchemaGetter {
+        schema: Arc<Schema>,
+    }
+
+    impl MockSchemaGetter {
+        fn new(data_types: Vec<DataType>) -> Self {
+            let mut builder = Schema::builder();
+            for (i, dt) in data_types.iter().enumerate() {
+                builder = builder.column(&format!("field{}", i), dt.clone());
+            }
+            let schema = builder.build().expect("Failed to build schema");
+
+            Self {
+                schema: Arc::new(schema),
+            }
+        }
+    }
+
+    impl SchemaGetter for MockSchemaGetter {
+        fn get_schema(&self, _schema_id: i16) -> Result<Arc<Schema>> {
+            Ok(Arc::clone(&self.schema))
+        }
+    }
+
+    #[test]
+    fn test_kv_record_read_context() {
+        // Test decoder caching for same schema ID
+        let schema_getter = Arc::new(MockSchemaGetter::new(vec![
+            DataTypes::int(),
+            DataTypes::string(),
+        ]));
+        let read_context = KvRecordReadContext::new(KvFormat::COMPACTED, schema_getter);
+
+        // Get decoder twice - should return the same instance (cached)
+        let decoder1 = read_context.get_row_decoder(42).unwrap();
+        let decoder2 = read_context.get_row_decoder(42).unwrap();
+
+        // Verify same instance (Arc pointer equality)
+        assert!(Arc::ptr_eq(&decoder1, &decoder2));
+
+        // Test different schema IDs get different decoders
+        let schema_getter = Arc::new(MockSchemaGetter::new(vec![DataTypes::int()]));
+        let read_context = KvRecordReadContext::new(KvFormat::COMPACTED, schema_getter);
+
+        let decoder1 = read_context.get_row_decoder(10).unwrap();
+        let decoder2 = read_context.get_row_decoder(20).unwrap();
+
+        // Should be different instances
+        assert!(!Arc::ptr_eq(&decoder1, &decoder2));
+    }
+}
diff --git a/crates/fluss/src/record/kv/mod.rs b/crates/fluss/src/record/kv/mod.rs
index ecb762df..857c5e5f 100644
--- a/crates/fluss/src/record/kv/mod.rs
+++ b/crates/fluss/src/record/kv/mod.rs
@@ -20,10 +20,17 @@
 mod kv_record;
 mod kv_record_batch;
 mod kv_record_batch_builder;
+mod kv_record_read_context;
+mod read_context;
+
+#[cfg(test)]
+mod test_util;
 
 pub use kv_record::{KvRecord, LENGTH_LENGTH as KV_RECORD_LENGTH_LENGTH};
 pub use kv_record_batch::*;
 pub use kv_record_batch_builder::*;
+pub use kv_record_read_context::{KvRecordReadContext, SchemaGetter};
+pub use read_context::ReadContext;
 
 /// Current KV magic value
 pub const CURRENT_KV_MAGIC_VALUE: u8 = 0;
diff --git a/crates/fluss/src/record/kv/read_context.rs b/crates/fluss/src/record/kv/read_context.rs
new file mode 100644
index 00000000..63502613
--- /dev/null
+++ b/crates/fluss/src/record/kv/read_context.rs
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Read context for KV record batches.
+//!
+//! Provides schema and decoder information needed for typed record reading.
+
+use crate::error::Result;
+use crate::row::RowDecoder;
+use std::sync::Arc;
+
+/// Context for reading KV records with type information.
+///
+/// The ReadContext provides access to RowDecoders based on schema IDs,
+/// enabling typed deserialization of KV record values.
+///
+/// Reference: org.apache.fluss.record.KvRecordBatch.ReadContext
+pub trait ReadContext: Send + Sync {
+    /// Get the row decoder for the given schema ID.
+    ///
+    /// The decoder is typically cached, so repeated calls with the same
+    /// schema ID should return the same decoder instance.
+    ///
+    /// # Arguments
+    /// * `schema_id` - The schema ID for which to get the decoder
+    ///
+    /// # Returns
+    /// An Arc-wrapped RowDecoder for the specified schema, or an error if
+    /// the schema is invalid or cannot be retrieved
+    fn get_row_decoder(&self, schema_id: i16) -> Result<Arc<dyn RowDecoder>>;
+}
diff --git a/crates/fluss/src/record/kv/test_util.rs b/crates/fluss/src/record/kv/test_util.rs
new file mode 100644
index 00000000..50ab911d
--- /dev/null
+++ b/crates/fluss/src/record/kv/test_util.rs
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Test utilities for KV record reading.
+
+use super::ReadContext;
+use crate::error::Result;
+use crate::metadata::{DataType, KvFormat, RowType};
+use crate::row::{RowDecoder, RowDecoderFactory};
+use std::sync::Arc;
+
+/// Simple test-only ReadContext that creates decoders directly from data types.
+///
+/// This bypasses the production Schema/SchemaGetter machinery for simpler tests.
+pub(crate) struct TestReadContext {
+    kv_format: KvFormat,
+    data_types: Vec<DataType>,
+}
+
+impl TestReadContext {
+    /// Create a test context for COMPACTED format (most common case).
+    pub(crate) fn compacted(data_types: Vec<DataType>) -> Self {
+        Self {
+            kv_format: KvFormat::COMPACTED,
+            data_types,
+        }
+    }
+}
+
+impl ReadContext for TestReadContext {
+    fn get_row_decoder(&self, _schema_id: i16) -> Result<Arc<dyn RowDecoder>> {
+        // Directly create decoder from data types - no Schema needed!
+        let row_type = RowType::with_data_types(self.data_types.clone());
+        RowDecoderFactory::create(self.kv_format.clone(), row_type)
+    }
+}
diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs
index 3477f1de..536409ef 100644
--- a/crates/fluss/src/row/mod.rs
+++ b/crates/fluss/src/row/mod.rs
@@ -23,11 +23,13 @@
 pub mod binary;
 pub mod compacted;
 pub mod encode;
 mod field_getter;
+mod row_decoder;
 
 pub use column::*;
 pub use compacted::CompactedRow;
 pub use datum::*;
 pub use encode::KeyEncoder;
+pub use row_decoder::{CompactedRowDecoder, RowDecoder, RowDecoderFactory};
 
 pub trait BinaryRow: InternalRow {
     /// Returns the binary representation of this row as a byte slice.
diff --git a/crates/fluss/src/row/row_decoder.rs b/crates/fluss/src/row/row_decoder.rs
new file mode 100644
index 00000000..9f9b4217
--- /dev/null
+++ b/crates/fluss/src/row/row_decoder.rs
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Row decoder for deserializing binary row formats.
+//!
+//! Mirrors the Java org.apache.fluss.row.decode package.
+
+use crate::error::{Error, Result};
+use crate::metadata::{KvFormat, RowType};
+use crate::row::compacted::{CompactedRow, CompactedRowDeserializer};
+use std::sync::Arc;
+
+/// Decoder for creating BinaryRow from bytes.
+///
+/// This trait provides an abstraction for decoding different row formats
+/// (COMPACTED, INDEXED, etc.) from binary data.
+///
+/// Reference: org.apache.fluss.row.decode.RowDecoder
+pub trait RowDecoder: Send + Sync {
+    /// Decode bytes into a CompactedRow.
+    ///
+    /// The lifetime 'a ties the returned row to the input data, ensuring
+    /// the data remains valid as long as the row is used.
+    fn decode<'a>(&self, data: &'a [u8]) -> CompactedRow<'a>;
+}
+
+/// Decoder for CompactedRow format.
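+///
+/// # Example
+///
+/// A decode sketch mirroring this file's tests; `row_type` must match the
+/// writer's field order:
+///
+/// ```ignore
+/// let decoder = CompactedRowDecoder::new(row_type);
+/// let row = decoder.decode(&data);
+/// assert_eq!(row.get_int(0), 42);
+/// ```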
+/// +/// Uses the existing CompactedRow infrastructure for decoding. +/// This is a thin wrapper that implements the RowDecoder trait. +/// +/// Reference: org.apache.fluss.row.decode.CompactedRowDecoder +pub struct CompactedRowDecoder { + field_count: usize, + deserializer: Arc>, +} + +impl CompactedRowDecoder { + /// Create a new CompactedRowDecoder with the given row type. + pub fn new(row_type: RowType) -> Self { + let field_count = row_type.fields().len(); + let deserializer = Arc::new(CompactedRowDeserializer::new_from_owned(row_type)); + + Self { + field_count, + deserializer, + } + } +} + +impl RowDecoder for CompactedRowDecoder { + fn decode<'a>(&self, data: &'a [u8]) -> CompactedRow<'a> { + // Use existing CompactedRow::deserialize() infrastructure + CompactedRow::deserialize(Arc::clone(&self.deserializer), self.field_count, data) + } +} + +/// Factory for creating RowDecoders based on KvFormat. +/// +/// Reference: org.apache.fluss.row.decode.RowDecoder.create() +pub struct RowDecoderFactory; + +impl RowDecoderFactory { + /// Create a RowDecoder for the given format and row type. + pub fn create(kv_format: KvFormat, row_type: RowType) -> Result> { + match kv_format { + KvFormat::COMPACTED => Ok(Arc::new(CompactedRowDecoder::new(row_type))), + KvFormat::INDEXED => Err(Error::UnsupportedOperation { + message: "INDEXED format is not yet supported".to_string(), + }), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::metadata::DataTypes; + use crate::row::InternalRow; + use crate::row::binary::BinaryWriter; + use crate::row::compacted::CompactedRowWriter; + + #[test] + fn test_compacted_row_decoder() { + // Write a CompactedRow + let mut writer = CompactedRowWriter::new(2); + writer.write_int(42); + writer.write_string("hello"); + + let data = writer.to_bytes(); + + // Create decoder with RowType + let row_type = RowType::with_data_types(vec![DataTypes::int(), DataTypes::string()]); + let decoder = CompactedRowDecoder::new(row_type); + + // Decode + let row = decoder.decode(&data); + + // Verify + assert_eq!(row.get_field_count(), 2); + assert_eq!(row.get_int(0), 42); + assert_eq!(row.get_string(1), "hello"); + } + + #[test] + fn test_row_decoder_factory() { + let row_type = RowType::with_data_types(vec![DataTypes::int(), DataTypes::string()]); + let decoder = RowDecoderFactory::create(KvFormat::COMPACTED, row_type).unwrap(); + + // Write a row + let mut writer = CompactedRowWriter::new(2); + writer.write_int(100); + writer.write_string("world"); + let data = writer.to_bytes(); + + // Decode + let row = decoder.decode(&data); + + // Verify + assert_eq!(row.get_int(0), 100); + assert_eq!(row.get_string(1), "world"); + } +}