21 changes: 21 additions & 0 deletions bindings/python/example/example.py
@@ -175,6 +175,27 @@ async def main():

        # TODO: support to_duckdb()

        # Test the new poll() method for incremental reading
        print("\n--- Testing poll() method ---")

Copilot AI commented on Jan 15, 2026:

    The example calls subscribe() again before polling, but there's no explanation of why this is necessary or whether it resets the scanner state. Consider adding a comment explaining this step, similar to line 163 which says '# Reset subscription'.

    Suggested change:
          print("\n--- Testing poll() method ---")
        + # Reset subscription before polling to ensure we start from a known position

        log_scanner.subscribe(None, None)

        # Poll with a timeout of 5000ms (5 seconds)
        # Note: poll() returns an empty table (not an error) on timeout
        try:
            poll_result = log_scanner.poll(5000)
            print(f"Number of rows: {poll_result.num_rows}")

            if poll_result.num_rows > 0:
                poll_df = poll_result.to_pandas()
                print(f"Polled data:\n{poll_df}")
            else:
                print("Empty result (no records available)")
                # Empty table still has schema
                print(f"Schema: {poll_result.schema}")

        except Exception as e:
            print(f"Error during poll: {e}")

    except Exception as e:
        print(f"Error during scanning: {e}")

56 changes: 55 additions & 1 deletion bindings/python/src/table.rs
@@ -18,8 +18,9 @@
 use crate::TOKIO_RUNTIME;
 use crate::*;
 use arrow::array::RecordBatch;
-use arrow_pyarrow::FromPyArrow;
+use arrow_pyarrow::{FromPyArrow, ToPyArrow};
 use fluss::client::EARLIEST_OFFSET;
+use fluss::record::to_arrow_schema;
 use fluss::rpc::message::OffsetSpec;
 use pyo3_async_runtimes::tokio::future_into_py;
 use std::sync::Arc;
@@ -321,6 +322,59 @@ impl LogScanner {
        Ok(df)
    }

    /// Poll for new records with the specified timeout
    ///
    /// Args:
    ///     timeout_ms: Timeout in milliseconds to wait for records
    ///
    /// Returns:
    ///     PyArrow Table containing the polled records
    ///
    /// Note:
    ///     - Returns an empty table (with the correct schema) if no records are available
    ///     - When the timeout expires, returns an empty table (NOT an error)
    fn poll(&self, py: Python, timeout_ms: i64) -> PyResult<Py<PyAny>> {
        use std::time::Duration;

Copilot AI commented on Jan 15, 2026:

    The `use std::time::Duration` import is redundant. The Duration type is already used in the to_arrow() method at line 253 without a local import, suggesting it's available in scope. Consider removing this redundant import for consistency.
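
A minimal sketch of that cleanup, assuming Duration is in fact (or can be made) available at module scope — the placement below is an assumption, not taken from the actual file:

    // Assumed placement: the import block at the top of table.rs,
    // next to the existing `use std::sync::Arc;`.
    use std::time::Duration;

    // The method-local `use std::time::Duration;` inside poll() is then
    // deleted; Duration::from_millis(timeout_ms as u64) resolves against
    // the module-level import unchanged.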

        if timeout_ms < 0 {
            return Err(FlussError::new_err(format!(
                "timeout_ms must be non-negative, got: {timeout_ms}"
            )));
        }

        let timeout = Duration::from_millis(timeout_ms as u64);
        let scan_records = py
            .detach(|| TOKIO_RUNTIME.block_on(async { self.inner.poll(timeout).await }))

A contributor commented:

    Actually, I'm thinking, can we use scanner#poll_batches directly?

The contributor (author) replied:

    makes sense, thank you!


            .map_err(|e| FlussError::new_err(e.to_string()))?;

        // Convert records to Arrow batches per bucket
        let mut arrow_batches = Vec::new();
        for (_bucket, records) in scan_records.into_records_by_buckets() {
            let mut batches = Utils::convert_scan_records_to_arrow(records);
            arrow_batches.append(&mut batches);
        }
        if arrow_batches.is_empty() {
            return self.create_empty_table(py);
        }

        Utils::combine_batches_to_table(py, arrow_batches)
    }
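
Following the scanner#poll_batches suggestion in the review thread above, a hypothetical sketch of that refactor — the poll_batches name, signature, and return type are assumptions for illustration, not confirmed Fluss client API:

    // Hypothetical: assumes self.inner.poll_batches(timeout) yields Arrow
    // RecordBatches directly, which would drop the per-bucket conversion
    // loop from poll() above.
    fn poll(&self, py: Python, timeout_ms: i64) -> PyResult<Py<PyAny>> {
        if timeout_ms < 0 {
            return Err(FlussError::new_err(format!(
                "timeout_ms must be non-negative, got: {timeout_ms}"
            )));
        }
        let timeout = Duration::from_millis(timeout_ms as u64);
        let batches: Vec<RecordBatch> = py
            .detach(|| TOKIO_RUNTIME.block_on(async { self.inner.poll_batches(timeout).await }))
            .map_err(|e| FlussError::new_err(e.to_string()))?;
        if batches.is_empty() {
            return self.create_empty_table(py);
        }
        Utils::combine_batches_to_table(py, batches)
    }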

    /// Create an empty PyArrow table with the correct schema
    fn create_empty_table(&self, py: Python) -> PyResult<Py<PyAny>> {
        let arrow_schema = to_arrow_schema(self.table_info.get_row_type());
        let py_schema = arrow_schema
            .to_pyarrow(py)
            .map_err(|e| FlussError::new_err(format!("Failed to convert schema: {e}")))?;

        let pyarrow = py.import("pyarrow")?;
        let empty_table = pyarrow
            .getattr("Table")?
            .call_method1("from_batches", (Vec::<Py<PyAny>>::new(), py_schema))?;

        Ok(empty_table.into())
    }

    fn __repr__(&self) -> String {
        format!("LogScanner(table={})", self.table_info.table_path)
    }