Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 89 additions & 77 deletions crates/fluss/src/record/kv/kv_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,16 @@
use bytes::{BufMut, Bytes, BytesMut};
use std::io;

use crate::row::RowDecoder;
use crate::row::compacted::CompactedRow;
use crate::util::varint::{
read_unsigned_varint_bytes, size_of_unsigned_varint, write_unsigned_varint_buf,
};

/// Size in bytes of the record's leading length field
/// (the `Length => Int32` entry of the record schema).
pub const LENGTH_LENGTH: usize = 4;

/// A key-value record.
/// A key-value record containing raw key and value bytes.
///
/// The schema is:
/// - Length => Int32
Expand All @@ -43,34 +45,39 @@ pub const LENGTH_LENGTH: usize = 4;
/// - Value => bytes (BinaryRow, written directly without length prefix)
///
/// When the value is None (deletion), no Value bytes are present.
// Reference implementation:
// https://github.com/apache/fluss/blob/main/fluss-common/src/main/java/org/apache/fluss/record/KvRecord.java
///
/// This struct stores only raw bytes. To decode the value into a typed row,
/// use the `row()` method with a RowDecoder (typically obtained from the iterator).
///
/// Reference implementation:
/// https://github.com/apache/fluss/blob/main/fluss-common/src/main/java/org/apache/fluss/record/KvRecord.java
#[derive(Debug, Clone)]
pub struct KvRecord {
key: Bytes,
value: Option<Bytes>,
value_bytes: Option<Bytes>,
size_in_bytes: usize,
}

impl KvRecord {
/// Create a new KvRecord with the given key and optional value.
pub fn new(key: Bytes, value: Option<Bytes>) -> Self {
let size_in_bytes = Self::size_of(&key, value.as_deref());
Self {
key,
value,
size_in_bytes,
}
}

/// Returns a reference to the record's raw key bytes.
///
/// The key is handed out by reference; nothing is copied or decoded here.
pub fn key(&self) -> &Bytes {
&self.key
}

/// Get the value bytes (None indicates a deletion).
pub fn value(&self) -> Option<&Bytes> {
self.value.as_ref()
/// Get the raw value bytes (for testing).
#[cfg(test)]
pub(crate) fn value_bytes(&self) -> Option<&Bytes> {
self.value_bytes.as_ref()
}

/// Builds a typed row view over this record's value using `decoder`.
///
/// Returns `None` when the record carries no value bytes (a deletion record).
/// Otherwise the raw value bytes are passed to `decoder.decode`, and the
/// resulting `CompactedRow` borrows from `self` for the lifetime `'a`.
/// Field parsing inside the returned row is lazy (performed on first access).
pub fn row<'a>(&'a self, decoder: &dyn RowDecoder) -> Option<CompactedRow<'a>> {
    // Bail out early for deletion records; the slice borrows from `&'a self`.
    let raw: &'a [u8] = self.value_bytes.as_deref()?;
    Some(decoder.decode(raw))
}

/// Calculate the total size of the record when serialized (including length prefix).
Expand Down Expand Up @@ -121,8 +128,7 @@ impl KvRecord {
/// Read a KV record from bytes at the given position.
///
/// Returns the KvRecord and the number of bytes consumed.
///
/// TODO: Connect KvReadContext and return CompactedRow records.
/// The record contains only raw bytes; use `row()` with a RowDecoder to decode the value.
pub fn read_from(bytes: &Bytes, position: usize) -> io::Result<(Self, usize)> {
if bytes.len() < position.saturating_add(LENGTH_LENGTH) {
return Err(io::Error::new(
Expand Down Expand Up @@ -183,11 +189,10 @@ impl KvRecord {
let key = bytes.slice(current_offset..key_end);
current_offset = key_end;

// Read value bytes directly
let value = if current_offset < record_end {
// Read value bytes directly (don't decode yet - will decode on-demand)
let value_bytes = if current_offset < record_end {
// Value is present: all remaining bytes are the value
let value_bytes = bytes.slice(current_offset..record_end);
Some(value_bytes)
Some(bytes.slice(current_offset..record_end))
} else {
// No remaining bytes: this is a deletion record
None
Expand All @@ -196,7 +201,7 @@ impl KvRecord {
Ok((
Self {
key,
value,
value_bytes,
size_in_bytes: total_size,
},
total_size,
Expand All @@ -207,37 +212,37 @@ impl KvRecord {
pub fn get_size_in_bytes(&self) -> usize {
// Stored on the record when it is built or parsed; not recomputed here.
self.size_in_bytes
}

/// Returns `true` when this record has no value bytes, which marks it as a deletion.
pub fn is_deletion(&self) -> bool {
self.value_bytes.is_none()
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_kv_record_size_calculation() {
fn test_kv_record_basic_operations() {
let key = b"test_key";
let value = b"test_value";

// With value (no value length varint)
// Test size calculation with value
let size_with_value = KvRecord::size_of(key, Some(value));
assert_eq!(
size_with_value,
LENGTH_LENGTH + size_of_unsigned_varint(key.len() as u32) + key.len() + value.len()
);

// Without value
// Test size calculation without value (deletion)
let size_without_value = KvRecord::size_of(key, None);
assert_eq!(
size_without_value,
LENGTH_LENGTH + size_of_unsigned_varint(key.len() as u32) + key.len()
);
}

#[test]
fn test_kv_record_write_read_round_trip() {
let key = b"my_key";
let value = b"my_value_data";

// Test write/read round trip with value
let mut buf = BytesMut::new();
let written = KvRecord::write_to_buf(&mut buf, key, Some(value)).unwrap();

Expand All @@ -246,40 +251,70 @@ mod tests {

assert_eq!(written, read_size);
assert_eq!(record.key().as_ref(), key);
assert_eq!(record.value().unwrap().as_ref(), value);
assert_eq!(record.value_bytes().unwrap().as_ref(), value);
assert_eq!(record.get_size_in_bytes(), written);
}

#[test]
fn test_kv_record_deletion() {
let key = b"delete_me";
assert!(!record.is_deletion());

// Write deletion record (no value)
// Test deletion record (no value)
let delete_key = b"delete_me";
let mut buf = BytesMut::new();
let written = KvRecord::write_to_buf(&mut buf, key, None).unwrap();
let written = KvRecord::write_to_buf(&mut buf, delete_key, None).unwrap();

let bytes = buf.freeze();
let (record, read_size) = KvRecord::read_from(&bytes, 0).unwrap();

assert_eq!(written, read_size);
assert_eq!(record.key().as_ref(), key);
assert!(record.value().is_none());
assert_eq!(record.key().as_ref(), delete_key);
assert!(record.is_deletion());
assert!(record.value_bytes().is_none());
}

#[test]
fn test_kv_record_with_large_key() {
let key = vec![0u8; 1024];
let value = vec![1u8; 4096];
fn test_kv_record_multiple_records() {
// Test multiple regular-sized records in buffer
let records = vec![
(b"key1".as_slice(), Some(b"value1".as_slice())),
(b"key2".as_slice(), None), // Deletion
(b"key3".as_slice(), Some(b"value3".as_slice())),
];

let mut buf = BytesMut::new();
let written = KvRecord::write_to_buf(&mut buf, &key, Some(&value)).unwrap();
for (key, value) in &records {
KvRecord::write_to_buf(&mut buf, key, *value).unwrap();
}

let bytes = buf.freeze();
let mut offset = 0;
for (expected_key, expected_value) in &records {
let (record, size) = KvRecord::read_from(&bytes, offset).unwrap();
assert_eq!(record.key().as_ref(), *expected_key);
match expected_value {
Some(v) => {
assert_eq!(record.value_bytes().unwrap().as_ref(), *v);
assert!(!record.is_deletion());
}
None => {
assert!(record.is_deletion());
assert!(record.value_bytes().is_none());
}
}
offset += size;
}
assert_eq!(offset, bytes.len());

// Test large keys and values
let large_key = vec![0u8; 1024];
let large_value = vec![1u8; 4096];

let mut buf = BytesMut::new();
let written = KvRecord::write_to_buf(&mut buf, &large_key, Some(&large_value)).unwrap();

let bytes = buf.freeze();
let (record, read_size) = KvRecord::read_from(&bytes, 0).unwrap();

assert_eq!(written, read_size);
assert_eq!(record.key().len(), key.len());
assert_eq!(record.value().unwrap().len(), value.len());
assert_eq!(record.key().len(), large_key.len());
assert_eq!(record.value_bytes().unwrap().len(), large_value.len());
}

#[test]
Expand All @@ -291,7 +326,9 @@ mod tests {
let bytes = buf.freeze();
let result = KvRecord::read_from(&bytes, 0);
assert!(result.is_err());
assert_eq!(result.unwrap_err().kind(), io::ErrorKind::InvalidData);
if let Err(e) = result {
assert_eq!(e.kind(), io::ErrorKind::InvalidData);
}

// Test overflow length
let mut buf = BytesMut::new();
Expand All @@ -307,33 +344,8 @@ mod tests {
let bytes = buf.freeze();
let result = KvRecord::read_from(&bytes, 0);
assert!(result.is_err());
assert_eq!(result.unwrap_err().kind(), io::ErrorKind::UnexpectedEof);
}

#[test]
fn test_multiple_records_in_buffer() {
let records = vec![
(b"key1".as_slice(), Some(b"value1".as_slice())),
(b"key2".as_slice(), None),
(b"key3".as_slice(), Some(b"value3".as_slice())),
];

let mut buf = BytesMut::new();
for (key, value) in &records {
KvRecord::write_to_buf(&mut buf, key, *value).unwrap();
if let Err(e) = result {
assert_eq!(e.kind(), io::ErrorKind::UnexpectedEof);
}

let bytes = buf.freeze();
let mut offset = 0;
for (expected_key, expected_value) in &records {
let (record, size) = KvRecord::read_from(&bytes, offset).unwrap();
assert_eq!(record.key().as_ref(), *expected_key);
match expected_value {
Some(v) => assert_eq!(record.value().unwrap().as_ref(), *v),
None => assert!(record.value().is_none()),
}
offset += size;
}
assert_eq!(offset, bytes.len());
}
}
Loading
Loading