diff --git a/bindings/python/example/example.py b/bindings/python/example/example.py index 0523f943..9e5f9b3f 100644 --- a/bindings/python/example/example.py +++ b/bindings/python/example/example.py @@ -178,6 +178,28 @@ async def main(): except Exception as e: print(f"Error during scanning: {e}") + # Demo: Column projection + print("\n--- Testing Column Projection ---") + try: + # Project specific columns by index (C++ parity) + print("\n1. Projection by index [0, 1] (id, name):") + scanner_index = await table.new_log_scanner_with_projection([0, 1]) + scanner_index.subscribe(None, None) + df_projected = scanner_index.to_pandas() + print(df_projected.head()) + print(f" Projected {df_projected.shape[1]} columns: {list(df_projected.columns)}") + + # Project specific columns by name (Python-specific, more idiomatic!) + print("\n2. Projection by name ['name', 'score'] (Pythonic):") + scanner_names = await table.new_log_scanner_with_column_names(["name", "score"]) + scanner_names.subscribe(None, None) + df_named = scanner_names.to_pandas() + print(df_named.head()) + print(f" Projected {df_named.shape[1]} columns: {list(df_named.columns)}") + + except Exception as e: + print(f"Error during projection: {e}") + # Close connection conn.close() print("\nConnection closed") diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs index 8a116485..36bb9b5e 100644 --- a/bindings/python/src/table.rs +++ b/bindings/python/src/table.rs @@ -70,9 +70,112 @@ impl FlussTable { let table_scan = fluss_table.new_scan(); let rust_scanner = table_scan.create_log_scanner().map_err(|e| { - PyErr::new::(format!( - "Failed to create log scanner: {e:?}" - )) + FlussError::new_err(format!("Failed to create log scanner: {e}")) + })?; + + let admin = conn + .get_admin() + .await + .map_err(|e| FlussError::new_err(e.to_string()))?; + + let py_scanner = LogScanner::from_core(rust_scanner, admin, table_info.clone()); + Python::attach(|py| Py::new(py, py_scanner)) + }) + } + + /// Create a new log scanner with column projection (by index). + /// + /// Args: + /// column_indices: List of column indices to include in the scan (0-based) + /// + /// Returns: + /// LogScanner with projection applied + /// + /// Example: + /// >>> scanner = await table.new_log_scanner_with_projection([0, 2, 4]) + pub fn new_log_scanner_with_projection<'py>( + &self, + py: Python<'py>, + column_indices: Vec, + ) -> PyResult> { + // Validate early with Python-friendly error + if column_indices.is_empty() { + return Err(FlussError::new_err( + "column_indices cannot be empty".to_string(), + )); + } + + let conn = self.connection.clone(); + let metadata = self.metadata.clone(); + let table_info = self.table_info.clone(); + + future_into_py(py, async move { + let fluss_table = + fcore::client::FlussTable::new(&conn, metadata.clone(), table_info.clone()); + + let table_scan = fluss_table.new_scan(); + let table_scan = table_scan + .project(&column_indices) + .map_err(|e| FlussError::new_err(format!("Failed to project columns: {e}")))?; + + let rust_scanner = table_scan.create_log_scanner().map_err(|e| { + FlussError::new_err(format!("Failed to create log scanner: {e}")) + })?; + + let admin = conn + .get_admin() + .await + .map_err(|e| FlussError::new_err(e.to_string()))?; + + let py_scanner = LogScanner::from_core(rust_scanner, admin, table_info.clone()); + Python::attach(|py| Py::new(py, py_scanner)) + }) + } + + /// Create a new log scanner with column projection (by name). + /// + /// This is the more Pythonic way to specify projections using column names + /// instead of indices. + /// + /// Args: + /// column_names: List of column names to include in the scan + /// + /// Returns: + /// LogScanner with projection applied + /// + /// Example: + /// >>> scanner = await table.new_log_scanner_with_column_names(["id", "name", "email"]) + pub fn new_log_scanner_with_column_names<'py>( + &self, + py: Python<'py>, + column_names: Vec, + ) -> PyResult> { + // Validate early with Python-friendly error + if column_names.is_empty() { + return Err(FlussError::new_err( + "column_names cannot be empty".to_string(), + )); + } + + let conn = self.connection.clone(); + let metadata = self.metadata.clone(); + let table_info = self.table_info.clone(); + + future_into_py(py, async move { + let fluss_table = + fcore::client::FlussTable::new(&conn, metadata.clone(), table_info.clone()); + + // Convert Vec to Vec<&str> for the API + // Safe: project_by_name validates names immediately, doesn't store refs + let column_name_refs: Vec<&str> = column_names.iter().map(|s| s.as_str()).collect(); + + let table_scan = fluss_table.new_scan(); + let table_scan = table_scan + .project_by_name(&column_name_refs) + .map_err(|e| FlussError::new_err(format!("Failed to project columns: {e}")))?; + + let rust_scanner = table_scan.create_log_scanner().map_err(|e| { + FlussError::new_err(format!("Failed to create log scanner: {e}")) })?; let admin = conn