diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs b/crates/fluss/src/client/table/log_fetch_buffer.rs
index e9bac53f..791c0e0a 100644
--- a/crates/fluss/src/client/table/log_fetch_buffer.rs
+++ b/crates/fluss/src/client/table/log_fetch_buffer.rs
@@ -18,7 +18,7 @@
 use arrow::array::RecordBatch;
 use parking_lot::Mutex;
 
-use crate::error::Result;
+use crate::error::{ApiError, Error, Result};
 use crate::metadata::TableBucket;
 use crate::record::{
     LogRecordBatch, LogRecordIterator, LogRecordsBatches, ReadContext, ScanRecord,
@@ -29,12 +29,38 @@
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::time::Duration;
 use tokio::sync::Notify;
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+pub(crate) enum FetchErrorAction {
+    Ignore,
+    LogOffsetOutOfRange,
+    Authorization,
+    CorruptMessage,
+    Unexpected,
+}
+
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+pub(crate) enum FetchErrorLogLevel {
+    Debug,
+    Warn,
+}
+
+#[derive(Clone, Debug)]
+pub(crate) struct FetchErrorContext {
+    pub(crate) action: FetchErrorAction,
+    pub(crate) log_level: FetchErrorLogLevel,
+    pub(crate) log_message: String,
+}
+
 /// Represents a completed fetch that can be consumed
 pub trait CompletedFetch: Send + Sync {
     fn table_bucket(&self) -> &TableBucket;
+    fn api_error(&self) -> Option<&ApiError>;
+    fn fetch_error_context(&self) -> Option<&FetchErrorContext>;
+    fn take_error(&mut self) -> Option<Error>;
     fn fetch_records(&mut self, max_records: usize) -> Result<Vec<ScanRecord>>;
     fn fetch_batches(&mut self, max_batches: usize) -> Result<Vec<RecordBatch>>;
     fn is_consumed(&self) -> bool;
+    fn records_read(&self) -> usize;
     fn drain(&mut self);
     fn size_in_bytes(&self) -> usize;
     fn high_watermark(&self) -> i64;
@@ -52,6 +78,7 @@ pub trait PendingFetch: Send + Sync {
 
 /// Thread-safe buffer for completed fetches
 pub struct LogFetchBuffer {
+    read_context: ReadContext,
     completed_fetches: Mutex<VecDeque<Box<dyn CompletedFetch>>>,
     pending_fetches: Mutex<HashMap<TableBucket, VecDeque<Box<dyn PendingFetch>>>>,
     next_in_line_fetch: Mutex<Option<Box<dyn CompletedFetch>>>,
@@ -60,8 +87,9 @@
 }
 
 impl LogFetchBuffer {
-    pub fn new() -> Self {
+    pub fn new(read_context: ReadContext) -> Self {
         Self {
+            read_context,
             completed_fetches: Mutex::new(VecDeque::new()),
             pending_fetches: Mutex::new(HashMap::new()),
             next_in_line_fetch: Mutex::new(None),
@@ -75,26 +103,28 @@
         self.completed_fetches.lock().is_empty()
     }
 
-    /// Wait for the buffer to become non-empty, with timeout
-    /// Returns true if data became available, false if timeout
-    pub async fn await_not_empty(&self, timeout: Duration) -> bool {
+    /// Wait for the buffer to become non-empty, with timeout.
+    /// Returns true if data became available, false if timeout.
+    pub async fn await_not_empty(&self, timeout: Duration) -> Result<bool> {
         let deadline = std::time::Instant::now() + timeout;
 
         loop {
            // Check if buffer is not empty
            if !self.is_empty() {
-                return true;
+                return Ok(true);
            }

            // Check if woken up
            if self.woken_up.swap(false, Ordering::Acquire) {
-                return true;
+                return Err(Error::WakeupError {
+                    message: "The await operation was interrupted by wakeup.".to_string(),
+                });
            }

            // Check if timeout
            let now = std::time::Instant::now();
            if now >= deadline {
-                return false;
+                return Ok(false);
            }

            // Wait for notification with remaining time
@@ -102,7 +132,7 @@
            let notified = self.not_empty_notify.notified();

            tokio::select! {
                _ = tokio::time::sleep(remaining) => {
-                    return false; // Timeout
+                    return Ok(false); // Timeout
                }
                _ = notified => {
                    // Got notification, check again
@@ -119,6 +149,26 @@
         self.not_empty_notify.notify_waiters();
     }
 
+    pub(crate) fn add_api_error(
+        &self,
+        table_bucket: TableBucket,
+        api_error: ApiError,
+        fetch_error_context: FetchErrorContext,
+        fetch_offset: i64,
+    ) {
+        let error_fetch = DefaultCompletedFetch::from_api_error(
+            table_bucket,
+            api_error,
+            fetch_error_context,
+            fetch_offset,
+            self.read_context.clone(),
+        );
+        self.completed_fetches
+            .lock()
+            .push_back(Box::new(error_fetch));
+        self.not_empty_notify.notify_waiters();
+    }
+
     /// Add a pending fetch to the buffer
     pub fn pend(&self, pending_fetch: Box<dyn PendingFetch>) {
         let table_bucket = pending_fetch.table_bucket().clone();
@@ -136,6 +186,7 @@
         // holding both locks simultaneously.
         let mut completed_to_push: Vec<Box<dyn CompletedFetch>> = Vec::new();
         let mut has_completed = false;
+        let mut pending_error: Option<Error> = None;
         {
             let mut pending_map = self.pending_fetches.lock();
             if let Some(pendings) = pending_map.get_mut(table_bucket) {
@@ -148,8 +199,9 @@
                             has_completed = true;
                         }
                         Err(e) => {
-                            // todo: handle exception?
-                            log::error!("Error when completing: {e}");
+                            pending_error = Some(e);
+                            has_completed = true;
+                            break;
                         }
                     }
                 } else {
@@ -162,11 +214,22 @@
             }
         }
 
+        if let Some(error) = pending_error {
+            let error_fetch = DefaultCompletedFetch::from_error(
+                table_bucket.clone(),
+                error,
+                -1,
+                self.read_context.clone(),
+            );
+            completed_to_push.push(Box::new(error_fetch));
+        }
+
         if !completed_to_push.is_empty() {
             let mut completed_queue = self.completed_fetches.lock();
             for completed in completed_to_push {
                 completed_queue.push_back(completed);
             }
+            has_completed = true;
         }
 
         if has_completed {
@@ -228,12 +291,6 @@
     }
 }
 
-impl Default for LogFetchBuffer {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
 /// A wrapper that makes a completed fetch look like a pending fetch
 struct CompletedPendingFetch {
     completed_fetch: Box<dyn CompletedFetch>,
@@ -262,6 +319,9 @@
 /// Default implementation of CompletedFetch for in-memory log records
 pub struct DefaultCompletedFetch {
     table_bucket: TableBucket,
+    api_error: Option<ApiError>,
+    fetch_error_context: Option<FetchErrorContext>,
+    error: Option<Error>,
     log_record_batch: LogRecordsBatches,
     read_context: ReadContext,
     next_fetch_offset: i64,
@@ -272,6 +332,9 @@
     records_read: usize,
     current_record_iterator: Option<LogRecordIterator>,
     current_record_batch: Option<LogRecordBatch>,
+    last_record: Option<ScanRecord>,
+    cached_record_error: Option<String>,
+    corrupt_last_record: bool,
 }
 
 impl DefaultCompletedFetch {
@@ -282,9 +345,12 @@
         read_context: ReadContext,
         fetch_offset: i64,
         high_watermark: i64,
-    ) -> Result<Self> {
-        Ok(Self {
+    ) -> Self {
+        Self {
             table_bucket,
+            api_error: None,
+            fetch_error_context: None,
+            error: None,
             log_record_batch,
             read_context,
             next_fetch_offset: fetch_offset,
@@ -295,7 +361,65 @@
             records_read: 0,
             current_record_iterator: None,
             current_record_batch: None,
-        })
+            last_record: None,
+            cached_record_error: None,
+            corrupt_last_record: false,
+        }
+    }
+
+    pub(crate) fn from_error(
+        table_bucket: TableBucket,
+        error: Error,
+        fetch_offset: i64,
+        read_context: ReadContext,
+    ) -> Self {
+        Self {
+            table_bucket,
+            api_error: None,
+            fetch_error_context: None,
+            error: Some(error),
+            log_record_batch: LogRecordsBatches::new(Vec::new()),
+            read_context,
+            next_fetch_offset: fetch_offset,
+            high_watermark: -1,
+            size_in_bytes: 0,
+            consumed: false,
+            initialized: false,
+            records_read: 0,
+            current_record_iterator: None,
+            current_record_batch: None,
+            last_record: None,
+            cached_record_error: None,
+            corrupt_last_record: false,
+        }
+    }
+
+    pub(crate) fn from_api_error(
+        table_bucket: TableBucket,
+        api_error: ApiError,
+        fetch_error_context: FetchErrorContext,
+        fetch_offset: i64,
+        read_context: ReadContext,
+    ) -> Self {
+        Self {
+            table_bucket,
+            api_error: Some(api_error),
+            fetch_error_context: Some(fetch_error_context),
+            error: None,
+            log_record_batch: LogRecordsBatches::new(Vec::new()),
+            read_context,
+            next_fetch_offset: fetch_offset,
+            high_watermark: -1,
+            size_in_bytes: 0,
+            consumed: false,
+            initialized: false,
+            records_read: 0,
+            current_record_iterator: None,
+            current_record_batch: None,
+            last_record: None,
+            cached_record_error: None,
+            corrupt_last_record: false,
+        }
     }
 
     /// Get the next fetched record, handling batch iteration and record skipping
@@ -322,6 +446,19 @@
         }
     }
 
+    fn fetch_error(&self) -> Error {
+        let mut message = format!(
+            "Received exception when fetching the next record from {table_bucket}. If needed, please seek past the record to continue scanning.",
+            table_bucket = self.table_bucket
+        );
+        if let Some(cause) = self.cached_record_error.as_deref() {
+            message.push_str(&format!(" Cause: {cause}"));
+        }
+        Error::UnexpectedError {
+            message,
+            source: None,
+        }
+    }
     /// Get the next batch directly without row iteration
     fn next_fetched_batch(&mut self) -> Result<Option<RecordBatch>> {
         loop {
@@ -360,8 +497,36 @@
         &self.table_bucket
     }
 
+    fn api_error(&self) -> Option<&ApiError> {
+        self.api_error.as_ref()
+    }
+
+    fn fetch_error_context(&self) -> Option<&FetchErrorContext> {
+        self.fetch_error_context.as_ref()
+    }
+
+    fn take_error(&mut self) -> Option<Error> {
+        self.error.take()
+    }
+
     fn fetch_records(&mut self, max_records: usize) -> Result<Vec<ScanRecord>> {
-        // todo: handle corrupt_last_record
+        if let Some(error) = self.error.take() {
+            return Err(error);
+        }
+
+        if let Some(api_error) = self.api_error.as_ref() {
+            return Err(Error::FlussAPIError {
+                api_error: ApiError {
+                    code: api_error.code,
+                    message: api_error.message.clone(),
+                },
+            });
+        }
+
+        if self.corrupt_last_record {
+            return Err(self.fetch_error());
+        }
+
         if self.consumed {
             return Ok(Vec::new());
         }
@@ -369,19 +534,53 @@
         let mut scan_records = Vec::new();
 
         for _ in 0..max_records {
-            if let Some(record) = self.next_fetched_record()? {
-                self.next_fetch_offset = record.offset() + 1;
-                self.records_read += 1;
-                scan_records.push(record);
-            } else {
-                break;
+            if self.cached_record_error.is_none() {
+                self.corrupt_last_record = true;
+                match self.next_fetched_record() {
+                    Ok(Some(record)) => {
+                        self.corrupt_last_record = false;
+                        self.last_record = Some(record);
+                    }
+                    Ok(None) => {
+                        self.corrupt_last_record = false;
+                        self.last_record = None;
+                    }
+                    Err(e) => {
+                        self.cached_record_error = Some(e.to_string());
+                    }
+                }
             }
+
+            let Some(record) = self.last_record.take() else {
+                break;
+            };
+
+            self.next_fetch_offset = record.offset() + 1;
+            self.records_read += 1;
+            scan_records.push(record);
+        }
+
+        if self.cached_record_error.is_some() && scan_records.is_empty() {
+            return Err(self.fetch_error());
         }
 
         Ok(scan_records)
     }
 
     fn fetch_batches(&mut self, max_batches: usize) -> Result<Vec<RecordBatch>> {
+        if let Some(error) = self.error.take() {
+            return Err(error);
+        }
+
+        if let Some(api_error) = self.api_error.as_ref() {
+            return Err(Error::FlussAPIError {
+                api_error: ApiError {
+                    code: api_error.code,
+                    message: api_error.message.clone(),
+                },
+            });
+        }
+
         if self.consumed {
             return Ok(Vec::new());
         }
@@ -402,8 +601,18 @@
         self.consumed
     }
 
+    fn records_read(&self) -> usize {
+        self.records_read
+    }
+
     fn drain(&mut self) {
         self.consumed = true;
+        self.api_error = None;
+        self.fetch_error_context = None;
+        self.error = None;
+        self.cached_record_error = None;
+        self.corrupt_last_record = false;
+        self.last_record = None;
     }
 
     fn size_in_bytes(&self) -> usize {
@@ -426,3 +635,118 @@
         self.next_fetch_offset
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::client::WriteRecord;
+    use crate::compression::{
+        ArrowCompressionInfo, ArrowCompressionType, DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
+    };
+    use crate::metadata::{DataField, DataTypes, TablePath};
+    use crate::record::{MemoryLogRecordsArrowBuilder, ReadContext, to_arrow_schema};
+    use crate::row::GenericRow;
+    use std::sync::Arc;
+    use std::time::Duration;
+
+    fn test_read_context() -> ReadContext {
+        let row_type = DataTypes::row(vec![DataField::new(
+            "id".to_string(),
+            DataTypes::int(),
+            None,
+        )]);
+        ReadContext::new(to_arrow_schema(&row_type), false)
+    }
+
+    struct ErrorPendingFetch {
+        table_bucket: TableBucket,
+    }
+
+    impl PendingFetch for ErrorPendingFetch {
+        fn table_bucket(&self) -> &TableBucket {
+            &self.table_bucket
+        }
+
+        fn is_completed(&self) -> bool {
+            true
+        }
+
+        fn to_completed_fetch(self: Box<Self>) -> Result<Box<dyn CompletedFetch>> {
+            Err(Error::UnexpectedError {
+                message: "pending fetch failure".to_string(),
+                source: None,
+            })
+        }
+    }
+
+    #[tokio::test]
+    async fn await_not_empty_returns_wakeup_error() {
+        let buffer = LogFetchBuffer::new(test_read_context());
+        buffer.wakeup();
+
+        let result = buffer.await_not_empty(Duration::from_millis(10)).await;
+        assert!(matches!(result, Err(Error::WakeupError { ..
}))); + } + + #[tokio::test] + async fn await_not_empty_returns_pending_error() { + let buffer = LogFetchBuffer::new(test_read_context()); + let table_bucket = TableBucket::new(1, 0); + buffer.pend(Box::new(ErrorPendingFetch { + table_bucket: table_bucket.clone(), + })); + buffer.try_complete(&table_bucket); + + let result = buffer.await_not_empty(Duration::from_millis(10)).await; + assert!(matches!(result, Ok(true))); + + let mut completed = buffer.poll().expect("completed fetch"); + assert!(completed.take_error().is_some()); + } + + #[test] + fn default_completed_fetch_reads_records() -> Result<()> { + let row_type = DataTypes::row(vec![ + DataField::new("id".to_string(), DataTypes::int(), None), + DataField::new("name".to_string(), DataTypes::string(), None), + ]); + let table_path = Arc::new(TablePath::new("db".to_string(), "tbl".to_string())); + + let mut builder = MemoryLogRecordsArrowBuilder::new( + 1, + &row_type, + false, + ArrowCompressionInfo { + compression_type: ArrowCompressionType::None, + compression_level: DEFAULT_NON_ZSTD_COMPRESSION_LEVEL, + }, + ); + + let mut row = GenericRow::new(); + row.set_field(0, 1_i32); + row.set_field(1, "alice"); + let record = WriteRecord::new(table_path, row); + builder.append(&record)?; + + let data = builder.build()?; + let log_records = LogRecordsBatches::new(data.clone()); + let read_context = ReadContext::new(to_arrow_schema(&row_type), false); + let mut fetch = DefaultCompletedFetch::new( + TableBucket::new(1, 0), + log_records, + data.len(), + read_context, + 0, + 0, + ); + + let records = fetch.fetch_records(10)?; + assert_eq!(records.len(), 1); + assert_eq!(records[0].offset(), 0); + + let empty = fetch.fetch_records(10)?; + assert!(empty.is_empty()); + + Ok(()) + } +} diff --git a/crates/fluss/src/client/table/remote_log.rs b/crates/fluss/src/client/table/remote_log.rs index d9abd19c..01425157 100644 --- a/crates/fluss/src/client/table/remote_log.rs +++ b/crates/fluss/src/client/table/remote_log.rs @@ -409,7 +409,7 @@ impl PendingFetch for RemotePendingFetch { self.read_context, self.fetch_offset, self.high_watermark, - )?; + ); Ok(Box::new(completed_fetch)) } diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index 7d22324d..3e7d61ff 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -17,7 +17,7 @@ use arrow::array::RecordBatch; use arrow_schema::SchemaRef; -use log::{debug, error, warn}; +use log::{debug, warn}; use parking_lot::{Mutex, RwLock}; use std::collections::{HashMap, HashSet}; use std::slice::from_ref; @@ -29,16 +29,17 @@ use crate::client::connection::FlussConnection; use crate::client::credentials::CredentialsCache; use crate::client::metadata::Metadata; use crate::client::table::log_fetch_buffer::{ - CompletedFetch, DefaultCompletedFetch, LogFetchBuffer, + CompletedFetch, DefaultCompletedFetch, FetchErrorAction, FetchErrorContext, FetchErrorLogLevel, + LogFetchBuffer, }; use crate::client::table::remote_log::{ RemoteLogDownloader, RemoteLogFetchInfo, RemotePendingFetch, }; -use crate::error::{Error, Result, RpcError}; +use crate::error::{ApiError, Error, FlussError, Result}; use crate::metadata::{TableBucket, TableInfo, TablePath}; -use crate::proto::{FetchLogRequest, PbFetchLogReqForBucket, PbFetchLogReqForTable}; +use crate::proto::{ErrorResponse, FetchLogRequest, PbFetchLogReqForBucket, PbFetchLogReqForTable}; use crate::record::{LogRecordsBatches, ReadContext, ScanRecord, ScanRecords, to_arrow_schema}; -use 
crate::rpc::{RpcClient, message};
+use crate::rpc::{RpcClient, RpcError, message};
 use crate::util::FairBucketStatusMap;
 
 const LOG_FETCH_MAX_BYTES: i32 = 16 * 1024 * 1024;
@@ -318,7 +319,7 @@
                 .log_fetcher
                 .log_fetch_buffer
                 .await_not_empty(remaining)
-                .await;
+                .await?;
 
             if !has_data {
                 // Timeout while waiting
@@ -396,7 +397,7 @@
                 .log_fetcher
                 .log_fetch_buffer
                 .await_not_empty(remaining)
-                .await;
+                .await?;
 
             if !has_data {
                 return Ok(Vec::new());
@@ -448,6 +449,8 @@
 struct LogFetcher {
     conns: Arc<FlussConnection>,
     metadata: Arc<Metadata>,
+    table_path: TablePath,
+    is_partitioned: bool,
     log_scanner_status: Arc<LogScannerStatus>,
     read_context: ReadContext,
     remote_read_context: ReadContext,
@@ -457,8 +460,6 @@
     credentials_cache: Arc<CredentialsCache>,
     log_fetch_buffer: Arc<LogFetchBuffer>,
     nodes_with_pending_fetch_requests: Arc<Mutex<HashSet<i32>>>,
-    table_path: TablePath,
-    is_partitioned: bool,
 }
 
 impl LogFetcher {
@@ -471,24 +472,25 @@
     ) -> Result<Self> {
         let full_arrow_schema = to_arrow_schema(table_info.get_row_type());
         let read_context =
-            Self::create_read_context(full_arrow_schema.clone(), projected_fields.clone(), false);
+            Self::create_read_context(full_arrow_schema.clone(), projected_fields.clone(), false)?;
         let remote_read_context =
-            Self::create_read_context(full_arrow_schema, projected_fields.clone(), true);
+            Self::create_read_context(full_arrow_schema, projected_fields.clone(), true)?;
         let tmp_dir = TempDir::with_prefix("fluss-remote-logs")?;
+        let log_fetch_buffer = Arc::new(LogFetchBuffer::new(read_context.clone()));
 
         Ok(LogFetcher {
             conns: conns.clone(),
             metadata: metadata.clone(),
+            table_path: table_info.table_path.clone(),
+            is_partitioned: table_info.is_partitioned(),
             log_scanner_status,
             read_context,
             remote_read_context,
             remote_log_downloader: Arc::new(RemoteLogDownloader::new(tmp_dir)?),
             credentials_cache: Arc::new(CredentialsCache::new(conns.clone(), metadata.clone())),
-            log_fetch_buffer: Arc::new(LogFetchBuffer::new()),
+            log_fetch_buffer,
             nodes_with_pending_fetch_requests: Arc::new(Mutex::new(HashSet::new())),
-            table_path: table_info.table_path.clone(),
-            is_partitioned: table_info.is_partitioned(),
         })
     }
 
@@ -496,23 +498,79 @@
         full_arrow_schema: SchemaRef,
         projected_fields: Option<Vec<usize>>,
         is_from_remote: bool,
-    ) -> ReadContext {
+    ) -> Result<ReadContext> {
         match projected_fields {
-            None => ReadContext::new(full_arrow_schema, is_from_remote),
+            None => Ok(ReadContext::new(full_arrow_schema, is_from_remote)),
             Some(fields) => {
                 ReadContext::with_projection_pushdown(full_arrow_schema, fields, is_from_remote)
             }
         }
     }
 
-    async fn check_and_update_metadata(&self) -> Result<()> {
-        if self.is_partitioned {
-            // TODO: Implement partition-aware metadata refresh for buckets whose leaders are unknown.
-            // The implementation will likely need to collect partition IDs for such buckets and
-            // perform targeted metadata updates. Until then, we avoid computing unused partition_ids.
- return Ok(()); + fn describe_fetch_error( + error: FlussError, + table_bucket: &TableBucket, + fetch_offset: i64, + error_message: &str, + ) -> FetchErrorContext { + match error { + FlussError::NotLeaderOrFollower + | FlussError::LogStorageException + | FlussError::KvStorageException + | FlussError::StorageException + | FlussError::FencedLeaderEpochException => FetchErrorContext { + action: FetchErrorAction::Ignore, + log_level: FetchErrorLogLevel::Debug, + log_message: format!( + "Error in fetch for bucket {table_bucket}: {error:?}: {error_message}" + ), + }, + FlussError::UnknownTableOrBucketException => FetchErrorContext { + action: FetchErrorAction::Ignore, + log_level: FetchErrorLogLevel::Warn, + log_message: format!( + "Received unknown table or bucket error in fetch for bucket {table_bucket}" + ), + }, + FlussError::LogOffsetOutOfRangeException => FetchErrorContext { + action: FetchErrorAction::LogOffsetOutOfRange, + log_level: FetchErrorLogLevel::Debug, + log_message: format!( + "The fetching offset {fetch_offset} is out of range for bucket {table_bucket}: {error_message}" + ), + }, + FlussError::AuthorizationException => FetchErrorContext { + action: FetchErrorAction::Authorization, + log_level: FetchErrorLogLevel::Debug, + log_message: format!( + "Authorization error while fetching offset {fetch_offset} for bucket {table_bucket}: {error_message}" + ), + }, + FlussError::UnknownServerError => FetchErrorContext { + action: FetchErrorAction::Ignore, + log_level: FetchErrorLogLevel::Warn, + log_message: format!( + "Unknown server error while fetching offset {fetch_offset} for bucket {table_bucket}: {error_message}" + ), + }, + FlussError::CorruptMessage => FetchErrorContext { + action: FetchErrorAction::CorruptMessage, + log_level: FetchErrorLogLevel::Debug, + log_message: format!( + "Encountered corrupt message when fetching offset {fetch_offset} for bucket {table_bucket}: {error_message}" + ), + }, + _ => FetchErrorContext { + action: FetchErrorAction::Unexpected, + log_level: FetchErrorLogLevel::Debug, + log_message: format!( + "Unexpected error code {error:?} while fetching at offset {fetch_offset} from bucket {table_bucket}: {error_message}" + ), + }, } + } + async fn check_and_update_metadata(&self) -> Result<()> { let need_update = self .fetchable_buckets() .iter() @@ -522,6 +580,26 @@ impl LogFetcher { return Ok(()); } + if self.is_partitioned { + // Fallback to full table metadata refresh until partition-aware updates are available. + self.metadata + .update_tables_metadata(&HashSet::from([&self.table_path])) + .await + .or_else(|e| { + if let Error::RpcError { source, .. } = &e + && matches!(source, RpcError::ConnectionError(_) | RpcError::Poisoned(_)) + { + warn!( + "Retrying after encountering error while updating table metadata: {e}" + ); + Ok(()) + } else { + Err(e) + } + })?; + return Ok(()); + } + // TODO: Handle PartitionNotExist error self.metadata .update_tables_metadata(&HashSet::from([&self.table_path])) @@ -561,7 +639,6 @@ impl LogFetcher { let creds_cache = self.credentials_cache.clone(); let nodes_with_pending = self.nodes_with_pending_fetch_requests.clone(); let metadata = self.metadata.clone(); - // Spawn async task to handle the fetch request // Note: These tasks are not explicitly tracked or cancelled when LogFetcher is dropped. 
// This is acceptable because: @@ -607,7 +684,7 @@ impl LogFetcher { } }; - if let Err(e) = Self::handle_fetch_response( + Self::handle_fetch_response( fetch_response, &log_fetch_buffer, &log_scanner_status, @@ -616,10 +693,7 @@ impl LogFetcher { &remote_log_downloader, &creds_cache, ) - .await - { - error!("Fail to handle fetch response: {e:?}"); - } + .await; }); } @@ -644,7 +718,7 @@ impl LogFetcher { remote_read_context: &ReadContext, remote_log_downloader: &Arc, credentials_cache: &Arc, - ) -> Result<()> { + ) { for pb_fetch_log_resp in fetch_response.tables_resp { let table_id = pb_fetch_log_resp.table_id; let fetch_log_for_buckets = pb_fetch_log_resp.buckets_resp; @@ -661,11 +735,45 @@ impl LogFetcher { continue; }; + if let Some(error_code) = fetch_log_for_bucket.error_code + && error_code != FlussError::None.code() + { + let api_error: ApiError = ErrorResponse { + error_code, + error_message: fetch_log_for_bucket.error_message.clone(), + } + .into(); + + let error = FlussError::for_code(error_code); + let error_context = Self::describe_fetch_error( + error, + &table_bucket, + fetch_offset, + api_error.message.as_str(), + ); + log_scanner_status.move_bucket_to_end(table_bucket.clone()); + match error_context.log_level { + FetchErrorLogLevel::Debug => { + debug!("{}", error_context.log_message); + } + FetchErrorLogLevel::Warn => { + warn!("{}", error_context.log_message); + } + } + log_fetch_buffer.add_api_error( + table_bucket.clone(), + api_error, + error_context, + fetch_offset, + ); + continue; + } + // Check if this is a remote log fetch if let Some(ref remote_log_fetch_info) = fetch_log_for_bucket.remote_log_fetch_info { // set remote fs props - let remote_fs_props = credentials_cache.get_or_refresh().await?; + let remote_fs_props = credentials_cache.get_or_refresh().await.unwrap(); remote_log_downloader.set_remote_fs_props(remote_fs_props); let remote_fetch_info = @@ -688,26 +796,18 @@ impl LogFetcher { let size_in_bytes = records.len(); let log_record_batch = LogRecordsBatches::new(records); - match DefaultCompletedFetch::new( + let completed_fetch = DefaultCompletedFetch::new( table_bucket.clone(), log_record_batch, size_in_bytes, read_context.clone(), fetch_offset, high_watermark, - ) { - Ok(completed_fetch) => { - log_fetch_buffer.add(Box::new(completed_fetch)); - } - Err(e) => { - // todo: handle error - log::warn!("Failed to create completed fetch: {e:?}"); - } - } + ); + log_fetch_buffer.add(Box::new(completed_fetch)); } } } - Ok(()) } fn pending_remote_fetches( @@ -763,69 +863,91 @@ impl LogFetcher { let mut result: HashMap> = HashMap::new(); let mut records_remaining = MAX_POLL_RECORDS; - while records_remaining > 0 { - // Get the next in line fetch, or get a new one from buffer - let next_in_line = self.log_fetch_buffer.next_in_line_fetch(); - - if next_in_line.is_none() || next_in_line.as_ref().unwrap().is_consumed() { - // Get a new fetch from buffer - if let Some(completed_fetch) = self.log_fetch_buffer.poll() { - // Initialize the fetch if not already initialized - if !completed_fetch.is_initialized() { - let size_in_bytes = completed_fetch.size_in_bytes(); - match self.initialize_fetch(completed_fetch) { - Ok(initialized) => { - self.log_fetch_buffer.set_next_in_line_fetch(initialized); - continue; + let collect_result: Result<()> = { + while records_remaining > 0 { + // Get the next in line fetch, or get a new one from buffer + let next_in_line = self.log_fetch_buffer.next_in_line_fetch(); + + if next_in_line.is_none() || 
next_in_line.as_ref().unwrap().is_consumed() { + // Get a new fetch from buffer + if let Some(completed_fetch) = self.log_fetch_buffer.poll() { + // Initialize the fetch if not already initialized + if !completed_fetch.is_initialized() { + let size_in_bytes = completed_fetch.size_in_bytes(); + match self.initialize_fetch(completed_fetch) { + Ok(initialized) => { + self.log_fetch_buffer.set_next_in_line_fetch(initialized); + continue; + } + Err(e) => { + // Remove a completedFetch upon a parse with exception if + // (1) it contains no records, and + // (2) there are no fetched records with actual content preceding this + // exception. + if result.is_empty() && size_in_bytes == 0 { + // todo: do we need to consider it like java ? + // self.log_fetch_buffer.poll(); + } + return Err(e); + } } + } else { + self.log_fetch_buffer + .set_next_in_line_fetch(Some(completed_fetch)); + } + // Note: poll() already removed the fetch from buffer, so no need to call poll() + } else { + // No more fetches available + break; + } + } else { + // Fetch records from next_in_line + if let Some(mut next_fetch) = next_in_line { + let records = match self + .fetch_records_from_fetch(&mut next_fetch, records_remaining) + { + Ok(records) => records, Err(e) => { - // Remove a completedFetch upon a parse with exception if - // (1) it contains no records, and - // (2) there are no fetched records with actual content preceding this - // exception. - if result.is_empty() && size_in_bytes == 0 { - // todo: do we need to consider it like java ? - // self.log_fetch_buffer.poll(); + if !next_fetch.is_consumed() { + self.log_fetch_buffer + .set_next_in_line_fetch(Some(next_fetch)); } return Err(e); } + }; + + if !records.is_empty() { + let table_bucket = next_fetch.table_bucket().clone(); + // Merge with existing records for this bucket + let existing = result.entry(table_bucket).or_default(); + let records_count = records.len(); + existing.extend(records); + + records_remaining = records_remaining.saturating_sub(records_count); } - } else { - self.log_fetch_buffer - .set_next_in_line_fetch(Some(completed_fetch)); + + // If the fetch is not fully consumed, put it back for the next round + if !next_fetch.is_consumed() { + self.log_fetch_buffer + .set_next_in_line_fetch(Some(next_fetch)); + } + // If consumed, next_fetch will be dropped here (which is correct) } - // Note: poll() already removed the fetch from buffer, so no need to call poll() - } else { - // No more fetches available - break; } - } else { - // Fetch records from next_in_line - if let Some(mut next_fetch) = next_in_line { - let records = - self.fetch_records_from_fetch(&mut next_fetch, records_remaining)?; - - if !records.is_empty() { - let table_bucket = next_fetch.table_bucket().clone(); - // Merge with existing records for this bucket - let existing = result.entry(table_bucket).or_default(); - let records_count = records.len(); - existing.extend(records); - - records_remaining = records_remaining.saturating_sub(records_count); - } + } + Ok(()) + }; - // If the fetch is not fully consumed, put it back for the next round - if !next_fetch.is_consumed() { - self.log_fetch_buffer - .set_next_in_line_fetch(Some(next_fetch)); - } - // If consumed, next_fetch will be dropped here (which is correct) + match collect_result { + Ok(()) => Ok(result), + Err(e) => { + if result.is_empty() { + Err(e) + } else { + Ok(result) } } } - - Ok(result) } /// Initialize a completed fetch, checking offset match and updating high watermark @@ -833,12 +955,63 @@ impl LogFetcher { 
&self, mut completed_fetch: Box, ) -> Result>> { - // todo: handle error in initialize fetch - let table_bucket = completed_fetch.table_bucket(); + if let Some(error) = completed_fetch.take_error() { + return Err(error); + } + + let table_bucket = completed_fetch.table_bucket().clone(); let fetch_offset = completed_fetch.next_fetch_offset(); + if let Some(api_error) = completed_fetch.api_error() { + let error = FlussError::for_code(api_error.code); + let error_message = api_error.message.as_str(); + self.log_scanner_status + .move_bucket_to_end(table_bucket.clone()); + let action = completed_fetch + .fetch_error_context() + .map(|context| context.action) + .unwrap_or(FetchErrorAction::Unexpected); + match action { + FetchErrorAction::Ignore => { + return Ok(None); + } + FetchErrorAction::LogOffsetOutOfRange => { + return Err(Error::UnexpectedError { + message: format!( + "The fetching offset {fetch_offset} is out of range: {error_message}" + ), + source: None, + }); + } + FetchErrorAction::Authorization => { + return Err(Error::FlussAPIError { + api_error: ApiError { + code: api_error.code, + message: api_error.message.to_string(), + }, + }); + } + FetchErrorAction::CorruptMessage => { + return Err(Error::UnexpectedError { + message: format!( + "Encountered corrupt message when fetching offset {fetch_offset} for bucket {table_bucket}: {error_message}" + ), + source: None, + }); + } + FetchErrorAction::Unexpected => { + return Err(Error::UnexpectedError { + message: format!( + "Unexpected error code {error:?} while fetching at offset {fetch_offset} from bucket {table_bucket}: {error_message}" + ), + source: None, + }); + } + } + } + // Check if bucket is still subscribed - let Some(current_offset) = self.log_scanner_status.get_bucket_offset(table_bucket) else { + let Some(current_offset) = self.log_scanner_status.get_bucket_offset(&table_bucket) else { warn!( "Discarding stale fetch response for bucket {table_bucket:?} since the bucket has been unsubscribed" ); @@ -857,7 +1030,7 @@ impl LogFetcher { let high_watermark = completed_fetch.high_watermark(); if high_watermark >= 0 { self.log_scanner_status - .update_high_watermark(table_bucket, high_watermark); + .update_high_watermark(&table_bucket, high_watermark); } completed_fetch.set_initialized(); @@ -894,6 +1067,11 @@ impl LogFetcher { .update_offset(&table_bucket, next_fetch_offset); } + if next_in_line_fetch.is_consumed() && next_in_line_fetch.records_read() > 0 { + self.log_scanner_status + .move_bucket_to_end(table_bucket.clone()); + } + Ok(records) } else { // These records aren't next in line, ignore them @@ -915,58 +1093,70 @@ impl LogFetcher { let mut batches_remaining = MAX_BATCHES; let mut bytes_consumed: usize = 0; - while batches_remaining > 0 && bytes_consumed < MAX_BYTES { - let next_in_line = self.log_fetch_buffer.next_in_line_fetch(); + let collect_result: Result<()> = { + while batches_remaining > 0 && bytes_consumed < MAX_BYTES { + let next_in_line = self.log_fetch_buffer.next_in_line_fetch(); - match next_in_line { - Some(mut next_fetch) if !next_fetch.is_consumed() => { - let batches = - self.fetch_batches_from_fetch(&mut next_fetch, batches_remaining)?; - let batch_count = batches.len(); + match next_in_line { + Some(mut next_fetch) if !next_fetch.is_consumed() => { + let batches = + self.fetch_batches_from_fetch(&mut next_fetch, batches_remaining)?; + let batch_count = batches.len(); - if !batches.is_empty() { - // Track bytes consumed (soft cap - may exceed by one fetch) - let batch_bytes: usize = - 
batches.iter().map(|b| b.get_array_memory_size()).sum(); - bytes_consumed += batch_bytes; + if !batches.is_empty() { + // Track bytes consumed (soft cap - may exceed by one fetch) + let batch_bytes: usize = + batches.iter().map(|b| b.get_array_memory_size()).sum(); + bytes_consumed += batch_bytes; - result.extend(batches); - batches_remaining = batches_remaining.saturating_sub(batch_count); - } + result.extend(batches); + batches_remaining = batches_remaining.saturating_sub(batch_count); + } - if !next_fetch.is_consumed() { - self.log_fetch_buffer - .set_next_in_line_fetch(Some(next_fetch)); + if !next_fetch.is_consumed() { + self.log_fetch_buffer + .set_next_in_line_fetch(Some(next_fetch)); + } } - } - _ => { - if let Some(completed_fetch) = self.log_fetch_buffer.poll() { - if !completed_fetch.is_initialized() { - let size_in_bytes = completed_fetch.size_in_bytes(); - match self.initialize_fetch(completed_fetch) { - Ok(initialized) => { - self.log_fetch_buffer.set_next_in_line_fetch(initialized); - continue; - } - Err(e) => { - if result.is_empty() && size_in_bytes == 0 { + _ => { + if let Some(completed_fetch) = self.log_fetch_buffer.poll() { + if !completed_fetch.is_initialized() { + let size_in_bytes = completed_fetch.size_in_bytes(); + match self.initialize_fetch(completed_fetch) { + Ok(initialized) => { + self.log_fetch_buffer.set_next_in_line_fetch(initialized); continue; } - return Err(e); + Err(e) => { + if result.is_empty() && size_in_bytes == 0 { + continue; + } + return Err(e); + } } + } else { + self.log_fetch_buffer + .set_next_in_line_fetch(Some(completed_fetch)); } } else { - self.log_fetch_buffer - .set_next_in_line_fetch(Some(completed_fetch)); + break; } - } else { - break; } } } - } + Ok(()) + }; - Ok(result) + match collect_result { + Ok(()) => Ok(result), + Err(e) => { + if result.is_empty() { + Err(e) + } else { + Ok(result) + } + } + } } fn fetch_batches_from_fetch( @@ -1231,3 +1421,175 @@ impl BucketScanStatus { *self.high_watermark.write() = high_watermark } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::client::WriteRecord; + use crate::client::metadata::Metadata; + use crate::compression::{ + ArrowCompressionInfo, ArrowCompressionType, DEFAULT_NON_ZSTD_COMPRESSION_LEVEL, + }; + use crate::metadata::{TableInfo, TablePath}; + use crate::record::MemoryLogRecordsArrowBuilder; + use crate::row::{Datum, GenericRow}; + use crate::rpc::FlussError; + use crate::test_utils::{build_cluster_arc, build_table_info}; + + fn build_records(table_info: &TableInfo, table_path: Arc) -> Result> { + let mut builder = MemoryLogRecordsArrowBuilder::new( + 1, + table_info.get_row_type(), + false, + ArrowCompressionInfo { + compression_type: ArrowCompressionType::None, + compression_level: DEFAULT_NON_ZSTD_COMPRESSION_LEVEL, + }, + ); + let record = WriteRecord::new( + table_path, + GenericRow { + values: vec![Datum::Int32(1)], + }, + ); + builder.append(&record)?; + builder.build() + } + + #[tokio::test] + async fn collect_fetches_updates_offset() -> Result<()> { + let table_path = TablePath::new("db".to_string(), "tbl".to_string()); + let table_info = build_table_info(table_path.clone(), 1, 1); + let cluster = build_cluster_arc(&table_path, 1, 1); + let metadata = Arc::new(Metadata::new_for_test(cluster)); + let status = Arc::new(LogScannerStatus::new()); + let fetcher = LogFetcher::new( + table_info.clone(), + Arc::new(RpcClient::new()), + metadata, + status.clone(), + None, + )?; + + let bucket = TableBucket::new(1, 0); + status.assign_scan_bucket(bucket.clone(), 0); + + 
let data = build_records(&table_info, Arc::new(table_path))?; + let log_records = LogRecordsBatches::new(data.clone()); + let read_context = ReadContext::new(to_arrow_schema(table_info.get_row_type()), false); + let completed = + DefaultCompletedFetch::new(bucket.clone(), log_records, data.len(), read_context, 0, 0); + fetcher.log_fetch_buffer.add(Box::new(completed)); + + let fetched = fetcher.collect_fetches()?; + assert_eq!(fetched.get(&bucket).unwrap().len(), 1); + assert_eq!(status.get_bucket_offset(&bucket), Some(1)); + Ok(()) + } + + #[test] + fn fetch_records_from_fetch_drains_unassigned_bucket() -> Result<()> { + let table_path = TablePath::new("db".to_string(), "tbl".to_string()); + let table_info = build_table_info(table_path.clone(), 1, 1); + let cluster = build_cluster_arc(&table_path, 1, 1); + let metadata = Arc::new(Metadata::new_for_test(cluster)); + let status = Arc::new(LogScannerStatus::new()); + let fetcher = LogFetcher::new( + table_info.clone(), + Arc::new(RpcClient::new()), + metadata, + status, + None, + )?; + + let bucket = TableBucket::new(1, 0); + let data = build_records(&table_info, Arc::new(table_path))?; + let log_records = LogRecordsBatches::new(data.clone()); + let read_context = ReadContext::new(to_arrow_schema(table_info.get_row_type()), false); + let mut completed: Box = Box::new(DefaultCompletedFetch::new( + bucket, + log_records, + data.len(), + read_context, + 0, + 0, + )); + + let records = fetcher.fetch_records_from_fetch(&mut completed, 10)?; + assert!(records.is_empty()); + assert!(completed.is_consumed()); + Ok(()) + } + + #[tokio::test] + async fn prepare_fetch_log_requests_skips_pending() -> Result<()> { + let table_path = TablePath::new("db".to_string(), "tbl".to_string()); + let table_info = build_table_info(table_path.clone(), 1, 1); + let cluster = build_cluster_arc(&table_path, 1, 1); + let metadata = Arc::new(Metadata::new_for_test(cluster)); + let status = Arc::new(LogScannerStatus::new()); + status.assign_scan_bucket(TableBucket::new(1, 0), 0); + let fetcher = LogFetcher::new( + table_info, + Arc::new(RpcClient::new()), + metadata, + status, + None, + )?; + + fetcher.nodes_with_pending_fetch_requests.lock().insert(1); + + let requests = fetcher.prepare_fetch_log_requests().await; + assert!(requests.is_empty()); + Ok(()) + } + + #[tokio::test] + async fn handle_fetch_response_sets_error() -> Result<()> { + let table_path = TablePath::new("db".to_string(), "tbl".to_string()); + let table_info = build_table_info(table_path.clone(), 1, 1); + let cluster = build_cluster_arc(&table_path, 1, 1); + let metadata = Arc::new(Metadata::new_for_test(cluster)); + let status = Arc::new(LogScannerStatus::new()); + status.assign_scan_bucket(TableBucket::new(1, 0), 5); + let fetcher = LogFetcher::new( + table_info.clone(), + Arc::new(RpcClient::new()), + metadata.clone(), + status.clone(), + None, + )?; + + let response = crate::proto::FetchLogResponse { + tables_resp: vec![crate::proto::PbFetchLogRespForTable { + table_id: 1, + buckets_resp: vec![crate::proto::PbFetchLogRespForBucket { + partition_id: None, + bucket_id: 0, + error_code: Some(FlussError::AuthorizationException.code()), + error_message: Some("denied".to_string()), + high_watermark: None, + log_start_offset: None, + remote_log_fetch_info: None, + records: None, + }], + }], + }; + + LogFetcher::handle_fetch_response( + response, + &fetcher.log_fetch_buffer, + &fetcher.log_scanner_status, + &fetcher.read_context, + &fetcher.remote_read_context, + &fetcher.remote_log_downloader, + 
&fetcher.credentials_cache, + ) + .await; + + let completed = fetcher.log_fetch_buffer.poll().expect("completed fetch"); + let api_error = completed.api_error().expect("api error"); + assert_eq!(api_error.code, FlussError::AuthorizationException.code()); + Ok(()) + } +} diff --git a/crates/fluss/src/client/write/sender.rs b/crates/fluss/src/client/write/sender.rs index cb03a2c4..ffac0af8 100644 --- a/crates/fluss/src/client/write/sender.rs +++ b/crates/fluss/src/client/write/sender.rs @@ -455,7 +455,7 @@ mod tests { use crate::row::{Datum, GenericRow}; use crate::rpc::FlussError; use crate::test_utils::build_cluster_arc; - use std::collections::HashSet; + use std::collections::{HashMap, HashSet}; async fn build_ready_batch( accumulator: &RecordAccumulator, diff --git a/crates/fluss/src/error.rs b/crates/fluss/src/error.rs index e04fde14..a351c441 100644 --- a/crates/fluss/src/error.rs +++ b/crates/fluss/src/error.rs @@ -98,6 +98,11 @@ pub enum Error { )] IoUnsupported { message: String }, + #[snafu( + visibility(pub(crate)), + display("Fluss hitting wakeup error {}.", message) + )] + WakeupError { message: String }, #[snafu( visibility(pub(crate)), display("Fluss hitting unsupported operation error {}.", message) diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs index 89fb7b9c..c166ebe8 100644 --- a/crates/fluss/src/record/arrow.rs +++ b/crates/fluss/src/record/arrow.rs @@ -17,7 +17,7 @@ use crate::client::{Record, WriteRecord}; use crate::compression::ArrowCompressionInfo; -use crate::error::Result; +use crate::error::{Error, Result}; use crate::metadata::DataType; use crate::record::{ChangeType, ScanRecord}; use crate::row::{ColumnarRow, GenericRow}; @@ -446,7 +446,7 @@ impl LogRecordBatch { } pub fn ensure_valid(&self) -> Result<()> { - // todo + // TODO enable validation once checksum handling is corrected. Ok(()) } @@ -780,8 +780,10 @@ impl ReadContext { arrow_schema: SchemaRef, projected_fields: Vec, is_from_remote: bool, - ) -> ReadContext { - let target_schema = Self::project_schema(arrow_schema.clone(), projected_fields.as_slice()); + ) -> Result { + Self::validate_projection(&arrow_schema, projected_fields.as_slice())?; + let target_schema = + Self::project_schema(arrow_schema.clone(), projected_fields.as_slice())?; // the logic is little bit of hard to understand, to refactor it to follow // java side let (need_do_reorder, sorted_fields) = { @@ -804,16 +806,20 @@ impl ReadContext { // Calculate reordering indexes to transform from sorted order to user-requested order let mut reordering_indexes = Vec::with_capacity(projected_fields.len()); for &original_idx in &projected_fields { - let pos = sorted_fields - .binary_search(&original_idx) - .expect("projection index should exist in sorted list"); + let pos = sorted_fields.binary_search(&original_idx).map_err(|_| { + Error::IllegalArgument { + message: format!( + "Projection index {original_idx} is invalid for the current schema." 
+ ), + } + })?; reordering_indexes.push(pos); } Projection { ordered_schema: Self::project_schema( arrow_schema.clone(), sorted_fields.as_slice(), - ), + )?, projected_fields, ordered_fields: sorted_fields, reordering_indexes, @@ -824,7 +830,7 @@ impl ReadContext { ordered_schema: Self::project_schema( arrow_schema.clone(), projected_fields.as_slice(), - ), + )?, ordered_fields: projected_fields.clone(), projected_fields, reordering_indexes: vec![], @@ -833,21 +839,34 @@ impl ReadContext { } }; - ReadContext { + Ok(ReadContext { target_schema, full_schema: arrow_schema, projection: Some(project), is_from_remote, + }) + } + + fn validate_projection(schema: &SchemaRef, projected_fields: &[usize]) -> Result<()> { + let field_count = schema.fields().len(); + for &index in projected_fields { + if index >= field_count { + return Err(Error::IllegalArgument { + message: format!( + "Projection index {index} is out of bounds for schema with {field_count} fields." + ), + }); + } } + Ok(()) } - pub fn project_schema(schema: SchemaRef, projected_fields: &[usize]) -> SchemaRef { - // todo: handle the exception - SchemaRef::new( - schema - .project(projected_fields) - .expect("can't project schema"), - ) + pub fn project_schema(schema: SchemaRef, projected_fields: &[usize]) -> Result { + Ok(SchemaRef::new(schema.project(projected_fields).map_err( + |e| Error::IllegalArgument { + message: format!("Invalid projection: {e}"), + }, + )?)) } pub fn project_fields(&self) -> Option<&[usize]> { @@ -1035,6 +1054,8 @@ pub struct MyVec(pub StreamReader); #[cfg(test)] mod tests { use super::*; + use crate::error::Error; + use crate::metadata::DataField; use crate::metadata::DataTypes; #[test] @@ -1207,6 +1228,18 @@ mod tests { ); } + #[test] + fn projection_rejects_out_of_bounds_index() { + let row_type = DataTypes::row(vec![ + DataField::new("id".to_string(), DataTypes::int(), None), + DataField::new("name".to_string(), DataTypes::string(), None), + ]); + let schema = to_arrow_schema(&row_type); + let result = ReadContext::with_projection_pushdown(schema, vec![0, 2], false); + + assert!(matches!(result, Err(Error::IllegalArgument { .. 
}))); + } + fn le_bytes(vals: &[u32]) -> Vec { let mut out = Vec::with_capacity(vals.len() * 4); for &v in vals { diff --git a/crates/fluss/src/record/mod.rs b/crates/fluss/src/record/mod.rs index 35928ea0..79310803 100644 --- a/crates/fluss/src/record/mod.rs +++ b/crates/fluss/src/record/mod.rs @@ -181,3 +181,65 @@ impl IntoIterator for ScanRecords { .into_iter() } } + +#[cfg(test)] +mod tests { + use super::*; + use ::arrow::array::{Int32Array, RecordBatch}; + use ::arrow::datatypes::{DataType, Field, Schema}; + use std::sync::Arc; + + fn make_row(values: Vec, row_id: usize) -> ColumnarRow { + let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)])); + let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(values))]) + .expect("record batch"); + ColumnarRow::new_with_row_id(Arc::new(batch), row_id) + } + + #[test] + fn change_type_round_trip() { + let cases = [ + (ChangeType::AppendOnly, "+A", 0), + (ChangeType::Insert, "+I", 1), + (ChangeType::UpdateBefore, "-U", 2), + (ChangeType::UpdateAfter, "+U", 3), + (ChangeType::Delete, "-D", 4), + ]; + + for (change_type, short, byte) in cases { + assert_eq!(change_type.short_string(), short); + assert_eq!(change_type.to_byte_value(), byte); + assert_eq!(ChangeType::from_byte_value(byte).unwrap(), change_type); + } + + let err = ChangeType::from_byte_value(9).unwrap_err(); + assert!(err.contains("Unsupported byte value")); + } + + #[test] + fn scan_records_counts_and_iterates() { + let bucket0 = TableBucket::new(1, 0); + let bucket1 = TableBucket::new(1, 1); + let record0 = ScanRecord::new(make_row(vec![10, 11], 0), 5, 7, ChangeType::Insert); + let record1 = ScanRecord::new(make_row(vec![10, 11], 1), 6, 8, ChangeType::Delete); + + let mut records = HashMap::new(); + records.insert(bucket0.clone(), vec![record0.clone(), record1.clone()]); + + let scan_records = ScanRecords::new(records); + assert_eq!(scan_records.records(&bucket0).len(), 2); + assert!(scan_records.records(&bucket1).is_empty()); + assert_eq!(scan_records.count(), 2); + + let collected: Vec<_> = scan_records.into_iter().collect(); + assert_eq!(collected.len(), 2); + } + + #[test] + fn scan_record_default_values() { + let record = ScanRecord::new_default(make_row(vec![1], 0)); + assert_eq!(record.offset(), -1); + assert_eq!(record.timestamp(), -1); + assert_eq!(record.change_type(), &ChangeType::Insert); + } +} diff --git a/crates/fluss/src/row/column.rs b/crates/fluss/src/row/column.rs index 31f0fdf2..90437c11 100644 --- a/crates/fluss/src/row/column.rs +++ b/crates/fluss/src/row/column.rs @@ -166,3 +166,67 @@ impl InternalRow for ColumnarRow { .value(self.row_id) } } + +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::{ + BinaryArray, BooleanArray, FixedSizeBinaryArray, Float32Array, Float64Array, Int8Array, + Int16Array, Int32Array, Int64Array, StringArray, + }; + use arrow::datatypes::{DataType, Field, Schema}; + + #[test] + fn columnar_row_reads_values() { + let schema = Arc::new(Schema::new(vec![ + Field::new("b", DataType::Boolean, false), + Field::new("i8", DataType::Int8, false), + Field::new("i16", DataType::Int16, false), + Field::new("i32", DataType::Int32, false), + Field::new("i64", DataType::Int64, false), + Field::new("f32", DataType::Float32, false), + Field::new("f64", DataType::Float64, false), + Field::new("s", DataType::Utf8, false), + Field::new("bin", DataType::Binary, false), + Field::new("char", DataType::FixedSizeBinary(2), false), + ])); + + let batch = RecordBatch::try_new( + schema, + vec![ + 
Arc::new(BooleanArray::from(vec![true])), + Arc::new(Int8Array::from(vec![1])), + Arc::new(Int16Array::from(vec![2])), + Arc::new(Int32Array::from(vec![3])), + Arc::new(Int64Array::from(vec![4])), + Arc::new(Float32Array::from(vec![1.25])), + Arc::new(Float64Array::from(vec![2.5])), + Arc::new(StringArray::from(vec!["hello"])), + Arc::new(BinaryArray::from(vec![b"data".as_slice()])), + Arc::new( + FixedSizeBinaryArray::try_from_sparse_iter_with_size( + vec![Some(b"ab".as_slice())].into_iter(), + 2, + ) + .expect("fixed array"), + ), + ], + ) + .expect("record batch"); + + let mut row = ColumnarRow::new(Arc::new(batch)); + assert_eq!(row.get_field_count(), 10); + assert!(row.get_boolean(0)); + assert_eq!(row.get_byte(1), 1); + assert_eq!(row.get_short(2), 2); + assert_eq!(row.get_int(3), 3); + assert_eq!(row.get_long(4), 4); + assert_eq!(row.get_float(5), 1.25); + assert_eq!(row.get_double(6), 2.5); + assert_eq!(row.get_string(7), "hello"); + assert_eq!(row.get_bytes(8), b"data"); + assert_eq!(row.get_char(9, 2), "ab"); + row.set_row_id(0); + assert_eq!(row.get_row_id(), 0); + } +} diff --git a/crates/fluss/src/rpc/message/list_offsets.rs b/crates/fluss/src/rpc/message/list_offsets.rs index 9ab1f143..fcecb418 100644 --- a/crates/fluss/src/rpc/message/list_offsets.rs +++ b/crates/fluss/src/rpc/message/list_offsets.rs @@ -17,9 +17,9 @@ use crate::{impl_read_version_type, impl_write_version_type, proto}; -use crate::error::Error; use crate::error::Result as FlussResult; -use crate::proto::ListOffsetsResponse; +use crate::error::{Error, FlussError}; +use crate::proto::{ErrorResponse, ListOffsetsResponse}; use crate::rpc::frame::ReadError; use crate::rpc::api_key::ApiKey; @@ -108,22 +108,48 @@ impl ListOffsetsResponse { self.buckets_resp .iter() .map(|resp| { - if resp.error_code.is_some() { - // todo: consider use another suitable error - Err(Error::UnexpectedError { + if let Some(error_code) = resp.error_code + && error_code != FlussError::None.code() + { + let api_error = ErrorResponse { + error_code, + error_message: resp.error_message.clone(), + } + .into(); + return Err(Error::FlussAPIError { api_error }); + } + // if no error msg, offset must exists + resp.offset + .map(|offset| (resp.bucket_id, offset)) + .ok_or_else(|| Error::UnexpectedError { message: format!( - "Missing offset, error message: {}", - resp.error_message - .as_deref() - .unwrap_or("unknown server exception") + "Missing offset for bucket {} without error code.", + resp.bucket_id ), source: None, }) - } else { - // if no error msg, offset must exists - Ok((resp.bucket_id, resp.offset.unwrap())) - } }) .collect() } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::proto::{ListOffsetsResponse, PbListOffsetsRespForBucket}; + + #[test] + fn offsets_returns_api_error_on_error_code() { + let response = ListOffsetsResponse { + buckets_resp: vec![PbListOffsetsRespForBucket { + bucket_id: 1, + error_code: Some(FlussError::TableNotExist.code()), + error_message: Some("missing".to_string()), + offset: None, + }], + }; + + let result = response.offsets(); + assert!(matches!(result, Err(Error::FlussAPIError { .. 
}))); + } +} diff --git a/crates/fluss/src/util/mod.rs b/crates/fluss/src/util/mod.rs index 5f67290e..43d92a83 100644 --- a/crates/fluss/src/util/mod.rs +++ b/crates/fluss/src/util/mod.rs @@ -183,3 +183,57 @@ impl Default for FairBucketStatusMap { Self::new() } } + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::Arc; + + #[test] + fn fair_bucket_status_map_tracks_order_and_size() { + let bucket0 = TableBucket::new(1, 0); + let bucket1 = TableBucket::new(1, 1); + + let mut map = FairBucketStatusMap::new(); + map.update_and_move_to_end(bucket0.clone(), 10); + map.update_and_move_to_end(bucket1.clone(), 20); + assert_eq!(map.size(), 2); + + let values: Vec = map + .bucket_status_values() + .into_iter() + .map(|value| **value) + .collect(); + assert_eq!(values, vec![10, 20]); + + map.move_to_end(bucket0.clone()); + let values: Vec = map + .bucket_status_values() + .into_iter() + .map(|value| **value) + .collect(); + assert_eq!(values, vec![20, 10]); + } + + #[test] + fn fair_bucket_status_map_mutations() { + let bucket0 = TableBucket::new(1, 0); + let bucket1 = TableBucket::new(2, 1); + + let mut map = FairBucketStatusMap::new(); + let mut input = HashMap::new(); + input.insert(bucket0.clone(), Arc::new(1)); + input.insert(bucket1.clone(), Arc::new(2)); + map.set(input); + + assert!(map.contains(&bucket0)); + assert!(map.contains(&bucket1)); + assert_eq!(map.bucket_set().len(), 2); + + map.remove(&bucket1); + assert_eq!(map.size(), 1); + + map.clear(); + assert_eq!(map.size(), 0); + } +}