378 changes: 351 additions & 27 deletions crates/fluss/src/client/table/log_fetch_buffer.rs

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion crates/fluss/src/client/table/remote_log.rs
@@ -409,7 +409,7 @@ impl PendingFetch for RemotePendingFetch {
self.read_context,
self.fetch_offset,
self.high_watermark,
)?;
);

Ok(Box::new(completed_fetch))
}
628 changes: 495 additions & 133 deletions crates/fluss/src/client/table/scanner.rs

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion crates/fluss/src/client/write/sender.rs
@@ -455,7 +455,7 @@ mod tests {
use crate::row::{Datum, GenericRow};
use crate::rpc::FlussError;
use crate::test_utils::build_cluster_arc;
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};

async fn build_ready_batch(
accumulator: &RecordAccumulator,
5 changes: 5 additions & 0 deletions crates/fluss/src/error.rs
@@ -98,6 +98,11 @@ pub enum Error {
)]
IoUnsupported { message: String },

#[snafu(
visibility(pub(crate)),
display("Fluss hitting wakeup error {}.", message)
)]
WakeupError { message: String },
#[snafu(
visibility(pub(crate)),
display("Fluss hitting unsupported operation error {}.", message)
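For reference, the new WakeupError variant gives the client a typed error for wakeup interruptions. A minimal sketch of how code inside the crate might raise it — the helper function and message below are illustrative assumptions, not part of this change:

use crate::error::Error;

// Hypothetical helper: abort a blocking fetch when the scanner has been woken up.
fn ensure_not_woken(woken: bool) -> Result<(), Error> {
    if woken {
        // Construct the new variant directly; with snafu, a generated WakeupSnafu
        // context selector could be used instead.
        return Err(Error::WakeupError {
            message: "log fetch interrupted by wakeup".to_string(),
        });
    }
    Ok(())
}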
67 changes: 50 additions & 17 deletions crates/fluss/src/record/arrow.rs
@@ -17,7 +17,7 @@

use crate::client::{Record, WriteRecord};
use crate::compression::ArrowCompressionInfo;
use crate::error::Result;
use crate::error::{Error, Result};
use crate::metadata::DataType;
use crate::record::{ChangeType, ScanRecord};
use crate::row::{ColumnarRow, GenericRow};
@@ -446,7 +446,7 @@ impl LogRecordBatch {
}

pub fn ensure_valid(&self) -> Result<()> {
// todo
// TODO enable validation once checksum handling is corrected.
Ok(())
}

@@ -780,8 +780,10 @@ impl ReadContext {
arrow_schema: SchemaRef,
projected_fields: Vec<usize>,
is_from_remote: bool,
) -> ReadContext {
let target_schema = Self::project_schema(arrow_schema.clone(), projected_fields.as_slice());
) -> Result<ReadContext> {
Self::validate_projection(&arrow_schema, projected_fields.as_slice())?;
let target_schema =
Self::project_schema(arrow_schema.clone(), projected_fields.as_slice())?;
// the logic here is a bit hard to follow; TODO: refactor it to match the
// Java side
let (need_do_reorder, sorted_fields) = {
@@ -804,16 +806,20 @@
// Calculate reordering indexes to transform from sorted order to user-requested order
let mut reordering_indexes = Vec::with_capacity(projected_fields.len());
for &original_idx in &projected_fields {
let pos = sorted_fields
.binary_search(&original_idx)
.expect("projection index should exist in sorted list");
let pos = sorted_fields.binary_search(&original_idx).map_err(|_| {
Error::IllegalArgument {
message: format!(
"Projection index {original_idx} is invalid for the current schema."
),
}
})?;
reordering_indexes.push(pos);
}
Projection {
ordered_schema: Self::project_schema(
arrow_schema.clone(),
sorted_fields.as_slice(),
),
)?,
projected_fields,
ordered_fields: sorted_fields,
reordering_indexes,
@@ -824,7 +830,7 @@
ordered_schema: Self::project_schema(
arrow_schema.clone(),
projected_fields.as_slice(),
),
)?,
ordered_fields: projected_fields.clone(),
projected_fields,
reordering_indexes: vec![],
@@ -833,21 +839,34 @@
}
};

ReadContext {
Ok(ReadContext {
target_schema,
full_schema: arrow_schema,
projection: Some(project),
is_from_remote,
})
}

fn validate_projection(schema: &SchemaRef, projected_fields: &[usize]) -> Result<()> {
let field_count = schema.fields().len();
for &index in projected_fields {
if index >= field_count {
return Err(Error::IllegalArgument {
message: format!(
"Projection index {index} is out of bounds for schema with {field_count} fields."
),
});
}
}
Ok(())
}

pub fn project_schema(schema: SchemaRef, projected_fields: &[usize]) -> SchemaRef {
// todo: handle the exception
SchemaRef::new(
schema
.project(projected_fields)
.expect("can't project schema"),
)
pub fn project_schema(schema: SchemaRef, projected_fields: &[usize]) -> Result<SchemaRef> {
Ok(SchemaRef::new(schema.project(projected_fields).map_err(
|e| Error::IllegalArgument {
message: format!("Invalid projection: {e}"),
},
)?))
}

pub fn project_fields(&self) -> Option<&[usize]> {
@@ -1035,6 +1054,8 @@ pub struct MyVec<T>(pub StreamReader<T>);
#[cfg(test)]
mod tests {
use super::*;
use crate::error::Error;
use crate::metadata::DataField;
use crate::metadata::DataTypes;

#[test]
@@ -1207,6 +1228,18 @@ mod tests {
);
}

#[test]
fn projection_rejects_out_of_bounds_index() {
let row_type = DataTypes::row(vec![
DataField::new("id".to_string(), DataTypes::int(), None),
DataField::new("name".to_string(), DataTypes::string(), None),
]);
let schema = to_arrow_schema(&row_type);
let result = ReadContext::with_projection_pushdown(schema, vec![0, 2], false);

assert!(matches!(result, Err(Error::IllegalArgument { .. })));
}

fn le_bytes(vals: &[u32]) -> Vec<u8> {
let mut out = Vec::with_capacity(vals.len() * 4);
for &v in vals {
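Taken together, the arrow.rs changes make projection pushdown fallible: ReadContext::with_projection_pushdown and project_schema now return Result and report invalid indices as Error::IllegalArgument instead of panicking. A rough caller-side sketch under that assumption — the wrapper function and the module paths in the use statements are illustrative:

use crate::error::Result;
use crate::metadata::{DataField, DataTypes};
use crate::record::arrow::{to_arrow_schema, ReadContext}; // assumed module path

// Illustrative wrapper: build a projected ReadContext, propagating invalid
// projections to the caller instead of panicking.
fn projected_read_context(projected_fields: Vec<usize>) -> Result<ReadContext> {
    let row_type = DataTypes::row(vec![
        DataField::new("id".to_string(), DataTypes::int(), None),
        DataField::new("name".to_string(), DataTypes::string(), None),
    ]);
    let arrow_schema = to_arrow_schema(&row_type);
    // An out-of-bounds index (e.g. 2 for this two-field schema) now yields
    // Err(Error::IllegalArgument { .. }) rather than a panic.
    ReadContext::with_projection_pushdown(arrow_schema, projected_fields, false)
}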
62 changes: 62 additions & 0 deletions crates/fluss/src/record/mod.rs
@@ -181,3 +181,65 @@ impl IntoIterator for ScanRecords {
.into_iter()
}
}

#[cfg(test)]
mod tests {
use super::*;
use ::arrow::array::{Int32Array, RecordBatch};
use ::arrow::datatypes::{DataType, Field, Schema};
use std::sync::Arc;

fn make_row(values: Vec<i32>, row_id: usize) -> ColumnarRow {
let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));
let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(values))])
.expect("record batch");
ColumnarRow::new_with_row_id(Arc::new(batch), row_id)
}

#[test]
fn change_type_round_trip() {
let cases = [
(ChangeType::AppendOnly, "+A", 0),
(ChangeType::Insert, "+I", 1),
(ChangeType::UpdateBefore, "-U", 2),
(ChangeType::UpdateAfter, "+U", 3),
(ChangeType::Delete, "-D", 4),
];

for (change_type, short, byte) in cases {
assert_eq!(change_type.short_string(), short);
assert_eq!(change_type.to_byte_value(), byte);
assert_eq!(ChangeType::from_byte_value(byte).unwrap(), change_type);
}

let err = ChangeType::from_byte_value(9).unwrap_err();
assert!(err.contains("Unsupported byte value"));
}

#[test]
fn scan_records_counts_and_iterates() {
let bucket0 = TableBucket::new(1, 0);
let bucket1 = TableBucket::new(1, 1);
let record0 = ScanRecord::new(make_row(vec![10, 11], 0), 5, 7, ChangeType::Insert);
let record1 = ScanRecord::new(make_row(vec![10, 11], 1), 6, 8, ChangeType::Delete);

let mut records = HashMap::new();
records.insert(bucket0.clone(), vec![record0.clone(), record1.clone()]);

let scan_records = ScanRecords::new(records);
assert_eq!(scan_records.records(&bucket0).len(), 2);
assert!(scan_records.records(&bucket1).is_empty());
assert_eq!(scan_records.count(), 2);

let collected: Vec<_> = scan_records.into_iter().collect();
assert_eq!(collected.len(), 2);
}

#[test]
fn scan_record_default_values() {
let record = ScanRecord::new_default(make_row(vec![1], 0));
assert_eq!(record.offset(), -1);
assert_eq!(record.timestamp(), -1);
assert_eq!(record.change_type(), &ChangeType::Insert);
}
}
64 changes: 64 additions & 0 deletions crates/fluss/src/row/column.rs
@@ -166,3 +166,67 @@ impl InternalRow for ColumnarRow {
.value(self.row_id)
}
}

#[cfg(test)]
mod tests {
use super::*;
use arrow::array::{
BinaryArray, BooleanArray, FixedSizeBinaryArray, Float32Array, Float64Array, Int8Array,
Int16Array, Int32Array, Int64Array, StringArray,
};
use arrow::datatypes::{DataType, Field, Schema};

#[test]
fn columnar_row_reads_values() {
let schema = Arc::new(Schema::new(vec![
Field::new("b", DataType::Boolean, false),
Field::new("i8", DataType::Int8, false),
Field::new("i16", DataType::Int16, false),
Field::new("i32", DataType::Int32, false),
Field::new("i64", DataType::Int64, false),
Field::new("f32", DataType::Float32, false),
Field::new("f64", DataType::Float64, false),
Field::new("s", DataType::Utf8, false),
Field::new("bin", DataType::Binary, false),
Field::new("char", DataType::FixedSizeBinary(2), false),
]));

let batch = RecordBatch::try_new(
schema,
vec![
Arc::new(BooleanArray::from(vec![true])),
Arc::new(Int8Array::from(vec![1])),
Arc::new(Int16Array::from(vec![2])),
Arc::new(Int32Array::from(vec![3])),
Arc::new(Int64Array::from(vec![4])),
Arc::new(Float32Array::from(vec![1.25])),
Arc::new(Float64Array::from(vec![2.5])),
Arc::new(StringArray::from(vec!["hello"])),
Arc::new(BinaryArray::from(vec![b"data".as_slice()])),
Arc::new(
FixedSizeBinaryArray::try_from_sparse_iter_with_size(
vec![Some(b"ab".as_slice())].into_iter(),
2,
)
.expect("fixed array"),
),
],
)
.expect("record batch");

let mut row = ColumnarRow::new(Arc::new(batch));
assert_eq!(row.get_field_count(), 10);
assert!(row.get_boolean(0));
assert_eq!(row.get_byte(1), 1);
assert_eq!(row.get_short(2), 2);
assert_eq!(row.get_int(3), 3);
assert_eq!(row.get_long(4), 4);
assert_eq!(row.get_float(5), 1.25);
assert_eq!(row.get_double(6), 2.5);
assert_eq!(row.get_string(7), "hello");
assert_eq!(row.get_bytes(8), b"data");
assert_eq!(row.get_char(9, 2), "ab");
row.set_row_id(0);
assert_eq!(row.get_row_id(), 0);
}
}
52 changes: 39 additions & 13 deletions crates/fluss/src/rpc/message/list_offsets.rs
@@ -17,9 +17,9 @@

use crate::{impl_read_version_type, impl_write_version_type, proto};

use crate::error::Error;
use crate::error::Result as FlussResult;
use crate::proto::ListOffsetsResponse;
use crate::error::{Error, FlussError};
use crate::proto::{ErrorResponse, ListOffsetsResponse};
use crate::rpc::frame::ReadError;

use crate::rpc::api_key::ApiKey;
@@ -108,22 +108,48 @@ impl ListOffsetsResponse {
self.buckets_resp
.iter()
.map(|resp| {
if resp.error_code.is_some() {
// todo: consider use another suitable error
Err(Error::UnexpectedError {
if let Some(error_code) = resp.error_code
&& error_code != FlussError::None.code()
{
let api_error = ErrorResponse {
error_code,
error_message: resp.error_message.clone(),
}
.into();
return Err(Error::FlussAPIError { api_error });
}
// if there is no error code, the offset must exist
resp.offset
.map(|offset| (resp.bucket_id, offset))
.ok_or_else(|| Error::UnexpectedError {
message: format!(
"Missing offset, error message: {}",
resp.error_message
.as_deref()
.unwrap_or("unknown server exception")
"Missing offset for bucket {} without error code.",
resp.bucket_id
),
source: None,
})
} else {
// if no error msg, offset must exists
Ok((resp.bucket_id, resp.offset.unwrap()))
}
})
.collect()
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::proto::{ListOffsetsResponse, PbListOffsetsRespForBucket};

#[test]
fn offsets_returns_api_error_on_error_code() {
let response = ListOffsetsResponse {
buckets_resp: vec![PbListOffsetsRespForBucket {
bucket_id: 1,
error_code: Some(FlussError::TableNotExist.code()),
error_message: Some("missing".to_string()),
offset: None,
}],
};

let result = response.offsets();
assert!(matches!(result, Err(Error::FlussAPIError { .. })));
}
}
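With this change, ListOffsetsResponse::offsets() surfaces server-reported failures as Error::FlussAPIError and reserves Error::UnexpectedError for a malformed response (a missing offset with no error code). A sketch of how a caller might branch on the result — the handler function is illustrative, and the exact collection type returned is whatever offsets() is declared to produce:

use crate::error::Error;
use crate::proto::ListOffsetsResponse;

// Illustrative handler: distinguish server-reported errors from malformed responses.
fn handle_offsets(response: ListOffsetsResponse) {
    match response.offsets() {
        Ok(bucket_offsets) => {
            // One (bucket_id, offset) entry per bucket in the response.
            let _ = bucket_offsets;
        }
        Err(Error::FlussAPIError { api_error }) => {
            // The server reported an error code, e.g. TableNotExist.
            let _ = api_error;
        }
        Err(other) => {
            // Offset missing without an error code: treated as an unexpected error.
            let _ = other;
        }
    }
}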