diff --git a/bindings/python/example/example.py b/bindings/python/example/example.py index 0b1e67d..7c03511 100644 --- a/bindings/python/example/example.py +++ b/bindings/python/example/example.py @@ -79,6 +79,19 @@ async def main(): except Exception as e: print(f"Failed to get table info: {e}") + # Demo: List offsets + print("\n--- Testing list_offsets() ---") + try: + # Query latest offsets using OffsetType constant (recommended for type safety) + offsets = await admin.list_offsets( + table_path, + bucket_ids=[0], + offset_type=fluss.OffsetType.LATEST + ) + print(f"Latest offsets for table (before writes): {offsets}") + except Exception as e: + print(f"Failed to list offsets: {e}") + # Get the table instance table = await conn.get_table(table_path) print(f"Got table: {table}") @@ -88,7 +101,7 @@ async def main(): print(f"Created append writer: {append_writer}") try: - # Test 1: Write PyArrow Table + # Demo: Write PyArrow Table print("\n--- Testing PyArrow Table write ---") pa_table = pa.Table.from_arrays( [ @@ -103,7 +116,7 @@ async def main(): append_writer.write_arrow(pa_table) print("Successfully wrote PyArrow Table") - # Test 2: Write PyArrow RecordBatch + # Demo: Write PyArrow RecordBatch print("\n--- Testing PyArrow RecordBatch write ---") pa_record_batch = pa.RecordBatch.from_arrays( [ @@ -118,7 +131,7 @@ async def main(): append_writer.write_arrow_batch(pa_record_batch) print("Successfully wrote PyArrow RecordBatch") - # Test 3: Write Pandas DataFrame + # Demo: Write Pandas DataFrame print("\n--- Testing Pandas DataFrame write ---") df = pd.DataFrame( { @@ -137,6 +150,19 @@ async def main(): append_writer.flush() print("Successfully flushed data") + # Demo: Check offsets after writes + print("\n--- Checking offsets after writes ---") + try: + # Query with string constant (alternative API - both strings and constants are supported) + offsets = await admin.list_offsets( + table_path, + bucket_ids=[0], + offset_type="latest" # Can also use "earliest" or "timestamp" + ) + print(f"Latest offsets after writing 7 records: {offsets}") + except Exception as e: + print(f"Failed to list offsets: {e}") + except Exception as e: print(f"Error during writing: {e}") @@ -200,6 +226,15 @@ async def main(): except Exception as e: print(f"Error during projection: {e}") + # Demo: Drop table + print("\n--- Testing drop_table() ---") + try: + # Drop the table (ignore_if_not_exists=True means no error if already dropped) + await admin.drop_table(table_path, ignore_if_not_exists=True) + print(f"Successfully dropped table: {table_path}") + except Exception as e: + print(f"Failed to drop table: {e}") + # Close connection conn.close() print("\nConnection closed") diff --git a/bindings/python/src/admin.rs b/bindings/python/src/admin.rs index fa189eb..434d745 100644 --- a/bindings/python/src/admin.rs +++ b/bindings/python/src/admin.rs @@ -38,7 +38,7 @@ impl FlussAdmin { ) -> PyResult> { let ignore = ignore_if_exists.unwrap_or(false); - let core_table_path = table_path.to_core().clone(); + let core_table_path = table_path.to_core(); let core_descriptor = table_descriptor.to_core().clone(); let admin = self.__admin.clone(); @@ -58,7 +58,7 @@ impl FlussAdmin { py: Python<'py>, table_path: &TablePath, ) -> PyResult> { - let core_table_path = table_path.to_core().clone(); + let core_table_path = table_path.to_core(); let admin = self.__admin.clone(); future_into_py(py, async move { @@ -80,7 +80,7 @@ impl FlussAdmin { py: Python<'py>, table_path: &TablePath, ) -> PyResult> { - let core_table_path = table_path.to_core().clone(); + let core_table_path = table_path.to_core(); let admin = self.__admin.clone(); future_into_py(py, async move { @@ -96,6 +96,104 @@ impl FlussAdmin { }) } + /// Drop a table + #[pyo3(signature = (table_path, ignore_if_not_exists=false))] + pub fn drop_table<'py>( + &self, + py: Python<'py>, + table_path: &TablePath, + ignore_if_not_exists: bool, + ) -> PyResult> { + let core_table_path = table_path.to_core(); + let admin = self.__admin.clone(); + + future_into_py(py, async move { + admin + .drop_table(&core_table_path, ignore_if_not_exists) + .await + .map_err(|e| FlussError::new_err(format!("Failed to drop table: {e}")))?; + + Python::attach(|py| Ok(py.None())) + }) + } + + /// List offsets for buckets. + /// + /// Args: + /// table_path: Path to the table + /// bucket_ids: List of bucket IDs to query + /// offset_type: Type of offset to retrieve: + /// - "earliest" or OffsetType.EARLIEST: Start of the log + /// - "latest" or OffsetType.LATEST: End of the log + /// - "timestamp" or OffsetType.TIMESTAMP: Offset at given timestamp (requires timestamp arg) + /// timestamp: Required when offset_type is "timestamp", ignored otherwise + /// + /// Returns: + /// dict[int, int]: Mapping of bucket_id -> offset + /// + /// Example: + /// >>> offsets = await admin.list_offsets(table_path, [0, 1], "latest") + /// >>> print(offsets) # {0: 100, 1: 150} + #[pyo3(signature = (table_path, bucket_ids, offset_type, timestamp=None))] + pub fn list_offsets<'py>( + &self, + py: Python<'py>, + table_path: &TablePath, + bucket_ids: Vec, + offset_type: &str, + timestamp: Option, + ) -> PyResult> { + use fcore::rpc::message::OffsetSpec; + + // Validate bucket IDs + for &bucket_id in &bucket_ids { + if bucket_id < 0 { + return Err(FlussError::new_err(format!( + "Invalid bucket_id: {}. Bucket IDs must be non-negative", + bucket_id + ))); + } + } + + let core_table_path = table_path.to_core(); + let admin = self.__admin.clone(); + + // Parse offset_type (case-insensitive, no allocation) + let offset_spec = match offset_type { + s if s.eq_ignore_ascii_case("earliest") => OffsetSpec::Earliest, + s if s.eq_ignore_ascii_case("latest") => OffsetSpec::Latest, + s if s.eq_ignore_ascii_case("timestamp") => { + let ts = timestamp.ok_or_else(|| { + FlussError::new_err( + "timestamp must be provided when offset_type='timestamp'".to_string(), + ) + })?; + OffsetSpec::Timestamp(ts) + } + _ => { + return Err(FlussError::new_err(format!( + "Invalid offset_type: '{}'. Must be 'earliest', 'latest', or 'timestamp'", + offset_type + ))); + } + }; + + future_into_py(py, async move { + let offsets = admin + .list_offsets(&core_table_path, &bucket_ids, offset_spec) + .await + .map_err(|e| FlussError::new_err(format!("Failed to list offsets: {e}")))?; + + Python::attach(|py| { + let dict = pyo3::types::PyDict::new(py); + for (bucket_id, offset) in offsets { + dict.set_item(bucket_id, offset)?; + } + Ok(dict.unbind()) + }) + }) + } + fn __repr__(&self) -> String { "FlussAdmin()".to_string() } diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs index 49d5179..d842dd0 100644 --- a/bindings/python/src/lib.rs +++ b/bindings/python/src/lib.rs @@ -44,6 +44,23 @@ static TOKIO_RUNTIME: LazyLock = LazyLock::new(|| { .expect("Failed to create Tokio runtime") }); +/// Offset type constants for list_offsets() +#[pyclass] +#[derive(Clone)] +pub struct OffsetType; + +#[pymethods] +impl OffsetType { + #[classattr] + const EARLIEST: &'static str = "earliest"; + + #[classattr] + const LATEST: &'static str = "latest"; + + #[classattr] + const TIMESTAMP: &'static str = "timestamp"; +} + #[pymodule] fn _fluss(m: &Bound<'_, PyModule>) -> PyResult<()> { // Register all classes @@ -59,6 +76,7 @@ fn _fluss(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; // Register exception types m.add_class::()?;