311 changes: 311 additions & 0 deletions rust/lance-datafusion/src/projection.rs
@@ -428,8 +428,89 @@ impl ProjectionPlan {
))
}

/// Returns the column name if `expr` is a simple column reference, `None` otherwise
///
/// A simple column reference is an unqualified column (relation is None) with no
/// computation or nested field access. These can be optimized with direct column
/// reordering instead of using DataFusion execution.
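///
/// For illustration only (a hedged sketch, not part of the original change), the
/// following shows which expressions qualify; `col` is DataFusion's
/// `datafusion_expr::col` helper:
///
/// ```ignore
/// use datafusion_common::Column;
/// use datafusion_expr::{col, Expr};
///
/// // Unqualified column reference: simple, yields Some("a")
/// let simple = Expr::Column(Column::from_name("a"));
/// // Table-qualified column: not simple (relation is Some)
/// let qualified = Expr::Column(Column::new(Some("t"), "a"));
/// // Computed expression: not simple
/// let computed = col("a") + col("b");
/// ```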
fn is_simple_column_ref(expr: &Expr) -> Option<&str> {
match expr {
Expr::Column(Column {
name,
relation: None,
..
}) => Some(name.as_str()),
_ => None,
}
}

/// Try to project a batch using simple column reordering (no DataFusion)
///
/// This is a fast path optimization for projections that consist entirely of
/// simple column references. Instead of creating a full DataFusion execution
/// plan, this directly reorders the Arc-wrapped column arrays.
///
/// Returns:
/// - Some(Ok(batch)) if successful
/// - Some(Err(e)) if error during construction
/// - None if not a simple projection (caller should use DataFusion)
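///
/// As an illustrative sketch only (mirroring how `project_batch` below consumes
/// this contract; `plan` and `batch` are assumed to be in scope):
///
/// ```ignore
/// match plan.try_simple_project_batch(&batch) {
///     // Fast path handled it (either successfully or with a construction error)
///     Some(result) => return result,
///     // Not a simple projection: fall through to the DataFusion path
///     None => { /* build OneShotExec + ProjectionExec instead */ }
/// }
/// ```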
fn try_simple_project_batch(&self, batch: &RecordBatch) -> Option<Result<RecordBatch>> {
// Empty projections skip the fast path: RecordBatch::try_new can't build a zero-column batch while preserving the row count, so let DataFusion handle them
if self.requested_output_expr.is_empty() {
return None;
}

// Check if all expressions are simple column references
let column_names: Vec<&str> = self
.requested_output_expr
.iter()
.map(|output_col| Self::is_simple_column_ref(&output_col.expr))
.collect::<Option<Vec<_>>>()?; // Returns None if any expr is complex

// Build output schema and column array
let input_schema = batch.schema();
let mut output_fields = Vec::with_capacity(column_names.len());
let mut output_columns = Vec::with_capacity(column_names.len());

for (output_col, &column_name) in self.requested_output_expr.iter().zip(column_names.iter())
{
// Find the column in the input batch; if it's missing, return None and let DataFusion surface the error
let index = input_schema.index_of(column_name).ok()?;
let field = input_schema.field(index);

// Create field with output name (handles aliasing: SELECT col AS alias)
let output_field = if &output_col.name != field.name() {
ArrowField::new(
&output_col.name,
field.data_type().clone(),
field.is_nullable(),
)
} else {
(*field).clone()
};

output_fields.push(output_field);
output_columns.push(batch.column(index).clone()); // Arc clone - cheap!
}

// Build output batch
let output_schema = Arc::new(ArrowSchema::new(output_fields));
Some(
RecordBatch::try_new(output_schema, output_columns).map_err(|e| Error::Internal {
message: format!("Failed to create projected batch: {}", e),
location: location!(),
}),
)
}

#[instrument(skip_all, level = "debug")]
pub async fn project_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
// Try fast path for simple column projections
if let Some(result) = self.try_simple_project_batch(&batch) {
return result;
}

// Fall back to DataFusion for complex expressions
let src = Arc::new(OneShotExec::from_batch(batch));
let physical_exprs = self.to_physical_exprs(&self.physical_projection.to_arrow_schema())?;
let projection = Arc::new(ProjectionExec::try_new(physical_exprs, src)?);
@@ -477,4 +558,234 @@ mod tests {
let output_field = output.field_with_name("meta").unwrap();
assert!(is_json_field(output_field));
}

use arrow_array::{Int32Array, RecordBatch, StringArray};
use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema};
use lance_core::datatypes::{Field as LanceField, Schema as LanceSchema};

fn create_test_batch() -> RecordBatch {
let schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new("a", DataType::Int32, false),
ArrowField::new("b", DataType::Int32, false),
ArrowField::new("c", DataType::Utf8, false),
]));

let a = Arc::new(Int32Array::from(vec![1, 2, 3]));
let b = Arc::new(Int32Array::from(vec![4, 5, 6]));
let c = Arc::new(StringArray::from(vec!["foo", "bar", "baz"]));

RecordBatch::try_new(schema, vec![a, b, c]).unwrap()
}

fn create_test_schema() -> Arc<LanceSchema> {
Arc::new(LanceSchema {
fields: vec![
LanceField::new_arrow("a", DataType::Int32, false).unwrap(),
LanceField::new_arrow("b", DataType::Int32, false).unwrap(),
LanceField::new_arrow("c", DataType::Utf8, false).unwrap(),
],
metadata: Default::default(),
})
}

#[tokio::test]
async fn test_simple_projection_reordering() {
let batch = create_test_batch();
let schema = create_test_schema();
let plan = ProjectionPlan {
physical_projection: Projection::full(schema).with_blob_version(BlobVersion::default()),
must_add_row_offset: false,
requested_output_expr: vec![
OutputColumn {
expr: Expr::Column(Column::from_name("c")),
name: "c".to_string(),
},
OutputColumn {
expr: Expr::Column(Column::from_name("a")),
name: "a".to_string(),
},
OutputColumn {
expr: Expr::Column(Column::from_name("b")),
name: "b".to_string(),
},
],
};

let result = plan.project_batch(batch.clone()).await.unwrap();

// Verify schema order
assert_eq!(result.schema().field(0).name(), "c");
assert_eq!(result.schema().field(1).name(), "a");
assert_eq!(result.schema().field(2).name(), "b");

// Verify data
let c_col = result
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!(c_col.value(0), "foo");

let a_col = result
.column(1)
.as_any()
.downcast_ref::<Int32Array>()
.unwrap();
assert_eq!(a_col.value(0), 1);
}

#[tokio::test]
async fn test_simple_projection_subset() {
let batch = create_test_batch();
let schema = create_test_schema();
let plan = ProjectionPlan {
physical_projection: Projection::full(schema).with_blob_version(BlobVersion::default()),
must_add_row_offset: false,
requested_output_expr: vec![
OutputColumn {
expr: Expr::Column(Column::from_name("b")),
name: "b".to_string(),
},
OutputColumn {
expr: Expr::Column(Column::from_name("c")),
name: "c".to_string(),
},
],
};

let result = plan.project_batch(batch).await.unwrap();

// Verify only 2 columns
assert_eq!(result.num_columns(), 2);
assert_eq!(result.schema().field(0).name(), "b");
assert_eq!(result.schema().field(1).name(), "c");
}

#[tokio::test]
async fn test_simple_projection_duplicate() {
let batch = create_test_batch();
let schema = create_test_schema();
let plan = ProjectionPlan {
physical_projection: Projection::full(schema).with_blob_version(BlobVersion::default()),
must_add_row_offset: false,
requested_output_expr: vec![
OutputColumn {
expr: Expr::Column(Column::from_name("a")),
name: "a".to_string(),
},
OutputColumn {
expr: Expr::Column(Column::from_name("a")),
name: "a".to_string(),
},
OutputColumn {
expr: Expr::Column(Column::from_name("b")),
name: "b".to_string(),
},
],
};

let result = plan.project_batch(batch).await.unwrap();

// Verify column is duplicated
assert_eq!(result.num_columns(), 3);
let a1 = result
.column(0)
.as_any()
.downcast_ref::<Int32Array>()
.unwrap();
let a2 = result
.column(1)
.as_any()
.downcast_ref::<Int32Array>()
.unwrap();
assert_eq!(a1.value(0), 1);
assert_eq!(a2.value(0), 1);
}

#[tokio::test]
async fn test_simple_projection_aliasing() {
let batch = create_test_batch();
let schema = create_test_schema();
let plan = ProjectionPlan {
physical_projection: Projection::full(schema).with_blob_version(BlobVersion::default()),
must_add_row_offset: false,
requested_output_expr: vec![
OutputColumn {
expr: Expr::Column(Column::from_name("a")),
name: "x".to_string(),
},
OutputColumn {
expr: Expr::Column(Column::from_name("b")),
name: "y".to_string(),
},
],
};

let result = plan.project_batch(batch).await.unwrap();

// Verify aliased names
assert_eq!(result.schema().field(0).name(), "x");
assert_eq!(result.schema().field(1).name(), "y");

// Verify data is correct
let x_col = result
.column(0)
.as_any()
.downcast_ref::<Int32Array>()
.unwrap();
assert_eq!(x_col.value(0), 1);
}

#[tokio::test]
async fn test_empty_projection() {
let batch = create_test_batch();
let schema = create_test_schema();
let plan = ProjectionPlan {
physical_projection: Projection::full(schema).with_blob_version(BlobVersion::default()),
must_add_row_offset: false,
requested_output_expr: vec![],
};

let result = plan.project_batch(batch.clone()).await.unwrap();

// Verify empty schema but same row count
assert_eq!(result.num_columns(), 0);
assert_eq!(result.num_rows(), batch.num_rows());
}

#[tokio::test]
async fn test_complex_projection_uses_datafusion() {
let batch = create_test_batch();
let schema = create_test_schema();

// Exercising a truly complex expression end-to-end would require a full DataFusion
// setup; here we only verify that a qualified column reference falls back to DataFusion
let plan = ProjectionPlan {
physical_projection: Projection::full(schema).with_blob_version(BlobVersion::default()),
must_add_row_offset: false,
requested_output_expr: vec![OutputColumn {
expr: Expr::Column(Column::new(Some("table"), "a")),
name: "a".to_string(),
}],
};

// The key assertion is that this projection does not take the fast path; the
// DataFusion fallback itself is not exercised here (it would fail without proper setup)
assert!(plan.try_simple_project_batch(&batch).is_none());
}

#[tokio::test]
async fn test_is_simple_column_ref() {
// Test simple column reference
let simple = Expr::Column(Column::from_name("a"));
assert_eq!(ProjectionPlan::is_simple_column_ref(&simple), Some("a"));

// Test qualified column reference (should not be simple)
let qualified = Expr::Column(Column::new(Some("table"), "a"));
assert!(ProjectionPlan::is_simple_column_ref(&qualified).is_none());

// Test other expression types (should not be simple)
let literal = Expr::Literal(datafusion_common::ScalarValue::Int32(Some(1)), None);
assert!(ProjectionPlan::is_simple_column_ref(&literal).is_none());
}
}