diff --git a/rust/lance-datafusion/src/projection.rs b/rust/lance-datafusion/src/projection.rs index 1c504615ed5..1c87c1b09db 100644 --- a/rust/lance-datafusion/src/projection.rs +++ b/rust/lance-datafusion/src/projection.rs @@ -428,8 +428,89 @@ impl ProjectionPlan { )) } + /// Returns the column name if expr is a simple column reference, None otherwise + /// + /// A simple column reference is an unqualified column (relation is None) with no + /// computation or nested field access. These can be optimized with direct column + /// reordering instead of using DataFusion execution. + fn is_simple_column_ref(expr: &Expr) -> Option<&str> { + match expr { + Expr::Column(Column { + name, + relation: None, + .. + }) => Some(name.as_str()), + _ => None, + } + } + + /// Try to project batch using simple column reordering (no DataFusion) + /// + /// This is a fast path optimization for projections that consist entirely of + /// simple column references. Instead of creating a full DataFusion execution + /// plan, this directly reorders the Arc-wrapped column arrays. + /// + /// Returns: + /// - Some(Ok(batch)) if successful + /// - Some(Err(e)) if error during construction + /// - None if not a simple projection (caller should use DataFusion) + fn try_simple_project_batch(&self, batch: &RecordBatch) -> Option> { + // Empty projections need special handling in DataFusion (can't create RecordBatch with 0 columns) + if self.requested_output_expr.is_empty() { + return None; + } + + // Check if all expressions are simple column references + let column_names: Vec<&str> = self + .requested_output_expr + .iter() + .map(|output_col| Self::is_simple_column_ref(&output_col.expr)) + .collect::>>()?; // Returns None if any expr is complex + + // Build output schema and column array + let input_schema = batch.schema(); + let mut output_fields = Vec::with_capacity(column_names.len()); + let mut output_columns = Vec::with_capacity(column_names.len()); + + for (output_col, &column_name) in self.requested_output_expr.iter().zip(column_names.iter()) + { + // Find column in input batch (return None if not found -> let DataFusion error) + let index = input_schema.index_of(column_name).ok()?; + let field = input_schema.field(index); + + // Create field with output name (handles aliasing: SELECT col AS alias) + let output_field = if &output_col.name != field.name() { + ArrowField::new( + &output_col.name, + field.data_type().clone(), + field.is_nullable(), + ) + } else { + (*field).clone() + }; + + output_fields.push(output_field); + output_columns.push(batch.column(index).clone()); // Arc clone - cheap! + } + + // Build output batch + let output_schema = Arc::new(ArrowSchema::new(output_fields)); + Some( + RecordBatch::try_new(output_schema, output_columns).map_err(|e| Error::Internal { + message: format!("Failed to create projected batch: {}", e), + location: location!(), + }), + ) + } + #[instrument(skip_all, level = "debug")] pub async fn project_batch(&self, batch: RecordBatch) -> Result { + // Try fast path for simple column projections + if let Some(result) = self.try_simple_project_batch(&batch) { + return result; + } + + // Fall back to DataFusion for complex expressions let src = Arc::new(OneShotExec::from_batch(batch)); let physical_exprs = self.to_physical_exprs(&self.physical_projection.to_arrow_schema())?; let projection = Arc::new(ProjectionExec::try_new(physical_exprs, src)?); @@ -477,4 +558,234 @@ mod tests { let output_field = output.field_with_name("meta").unwrap(); assert!(is_json_field(output_field)); } + + use arrow_array::{Int32Array, RecordBatch, StringArray}; + use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema}; + use lance_core::datatypes::{Field as LanceField, Schema as LanceSchema}; + + fn create_test_batch() -> RecordBatch { + let schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("a", DataType::Int32, false), + ArrowField::new("b", DataType::Int32, false), + ArrowField::new("c", DataType::Utf8, false), + ])); + + let a = Arc::new(Int32Array::from(vec![1, 2, 3])); + let b = Arc::new(Int32Array::from(vec![4, 5, 6])); + let c = Arc::new(StringArray::from(vec!["foo", "bar", "baz"])); + + RecordBatch::try_new(schema, vec![a, b, c]).unwrap() + } + + fn create_test_schema() -> Arc { + Arc::new(LanceSchema { + fields: vec![ + LanceField::new_arrow("a", DataType::Int32, false).unwrap(), + LanceField::new_arrow("b", DataType::Int32, false).unwrap(), + LanceField::new_arrow("c", DataType::Utf8, false).unwrap(), + ], + metadata: Default::default(), + }) + } + + #[tokio::test] + async fn test_simple_projection_reordering() { + let batch = create_test_batch(); + let schema = create_test_schema(); + let plan = ProjectionPlan { + physical_projection: Projection::full(schema).with_blob_version(BlobVersion::default()), + must_add_row_offset: false, + requested_output_expr: vec![ + OutputColumn { + expr: Expr::Column(Column::from_name("c")), + name: "c".to_string(), + }, + OutputColumn { + expr: Expr::Column(Column::from_name("a")), + name: "a".to_string(), + }, + OutputColumn { + expr: Expr::Column(Column::from_name("b")), + name: "b".to_string(), + }, + ], + }; + + let result = plan.project_batch(batch.clone()).await.unwrap(); + + // Verify schema order + assert_eq!(result.schema().field(0).name(), "c"); + assert_eq!(result.schema().field(1).name(), "a"); + assert_eq!(result.schema().field(2).name(), "b"); + + // Verify data + let c_col = result + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(c_col.value(0), "foo"); + + let a_col = result + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(a_col.value(0), 1); + } + + #[tokio::test] + async fn test_simple_projection_subset() { + let batch = create_test_batch(); + let schema = create_test_schema(); + let plan = ProjectionPlan { + physical_projection: Projection::full(schema).with_blob_version(BlobVersion::default()), + must_add_row_offset: false, + requested_output_expr: vec![ + OutputColumn { + expr: Expr::Column(Column::from_name("b")), + name: "b".to_string(), + }, + OutputColumn { + expr: Expr::Column(Column::from_name("c")), + name: "c".to_string(), + }, + ], + }; + + let result = plan.project_batch(batch).await.unwrap(); + + // Verify only 2 columns + assert_eq!(result.num_columns(), 2); + assert_eq!(result.schema().field(0).name(), "b"); + assert_eq!(result.schema().field(1).name(), "c"); + } + + #[tokio::test] + async fn test_simple_projection_duplicate() { + let batch = create_test_batch(); + let schema = create_test_schema(); + let plan = ProjectionPlan { + physical_projection: Projection::full(schema).with_blob_version(BlobVersion::default()), + must_add_row_offset: false, + requested_output_expr: vec![ + OutputColumn { + expr: Expr::Column(Column::from_name("a")), + name: "a".to_string(), + }, + OutputColumn { + expr: Expr::Column(Column::from_name("a")), + name: "a".to_string(), + }, + OutputColumn { + expr: Expr::Column(Column::from_name("b")), + name: "b".to_string(), + }, + ], + }; + + let result = plan.project_batch(batch).await.unwrap(); + + // Verify column is duplicated + assert_eq!(result.num_columns(), 3); + let a1 = result + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let a2 = result + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(a1.value(0), 1); + assert_eq!(a2.value(0), 1); + } + + #[tokio::test] + async fn test_simple_projection_aliasing() { + let batch = create_test_batch(); + let schema = create_test_schema(); + let plan = ProjectionPlan { + physical_projection: Projection::full(schema).with_blob_version(BlobVersion::default()), + must_add_row_offset: false, + requested_output_expr: vec![ + OutputColumn { + expr: Expr::Column(Column::from_name("a")), + name: "x".to_string(), + }, + OutputColumn { + expr: Expr::Column(Column::from_name("b")), + name: "y".to_string(), + }, + ], + }; + + let result = plan.project_batch(batch).await.unwrap(); + + // Verify aliased names + assert_eq!(result.schema().field(0).name(), "x"); + assert_eq!(result.schema().field(1).name(), "y"); + + // Verify data is correct + let x_col = result + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(x_col.value(0), 1); + } + + #[tokio::test] + async fn test_empty_projection() { + let batch = create_test_batch(); + let schema = create_test_schema(); + let plan = ProjectionPlan { + physical_projection: Projection::full(schema).with_blob_version(BlobVersion::default()), + must_add_row_offset: false, + requested_output_expr: vec![], + }; + + let result = plan.project_batch(batch.clone()).await.unwrap(); + + // Verify empty schema but same row count + assert_eq!(result.num_columns(), 0); + assert_eq!(result.num_rows(), batch.num_rows()); + } + + #[tokio::test] + async fn test_complex_projection_uses_datafusion() { + let batch = create_test_batch(); + let schema = create_test_schema(); + + // Create a projection with a complex expression (would require actual DataFusion setup) + // For now, we'll just verify that qualified columns fall back to DataFusion + let plan = ProjectionPlan { + physical_projection: Projection::full(schema).with_blob_version(BlobVersion::default()), + must_add_row_offset: false, + requested_output_expr: vec![OutputColumn { + expr: Expr::Column(Column::new(Some("table"), "a")), + name: "a".to_string(), + }], + }; + + // This should fall back to DataFusion (which will fail without proper setup) + // but the key is that it doesn't use the fast path + assert!(plan.try_simple_project_batch(&batch).is_none()); + } + + #[tokio::test] + async fn test_is_simple_column_ref() { + // Test simple column reference + let simple = Expr::Column(Column::from_name("a")); + assert_eq!(ProjectionPlan::is_simple_column_ref(&simple), Some("a")); + + // Test qualified column reference (should not be simple) + let qualified = Expr::Column(Column::new(Some("table"), "a")); + assert!(ProjectionPlan::is_simple_column_ref(&qualified).is_none()); + + // Test other expression types (should not be simple) + let literal = Expr::Literal(datafusion_common::ScalarValue::Int32(Some(1)), None); + assert!(ProjectionPlan::is_simple_column_ref(&literal).is_none()); + } }