Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 3 additions & 5 deletions bench-vortex/src/engines/df/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use datafusion::execution::SessionStateBuilder;
use datafusion::execution::cache::cache_manager::CacheManagerConfig;
use datafusion::execution::cache::cache_unit::{DefaultFileStatisticsCache, DefaultListFilesCache};
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::physical_plan::collect;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_common::GetExt;
use datafusion_physical_plan::ExecutionPlan;
Expand Down Expand Up @@ -135,11 +134,10 @@ pub async fn execute_query(
ctx: &SessionContext,
query: &str,
) -> anyhow::Result<(Vec<RecordBatch>, Arc<dyn ExecutionPlan>)> {
let plan = ctx.sql(query).await?;
let (state, plan) = plan.into_parts();
let physical_plan = state.create_physical_plan(&plan).await?;
let df = ctx.sql(query).await?;
let physical_plan = df.clone().create_physical_plan().await?;
let result = df.collect().await?;

let result = collect(physical_plan.clone(), state.task_ctx()).await?;
Ok((result, physical_plan))
}

Expand Down
46 changes: 40 additions & 6 deletions vortex-datafusion/src/convert/exprs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@
use std::sync::Arc;

use arrow_schema::{DataType, Schema};
use datafusion_common::ScalarValue;
use datafusion_expr::Operator as DFOperator;
use datafusion_functions::core::getfield::GetFieldFunc;
use datafusion_physical_expr::{PhysicalExpr, ScalarFunctionExpr};
use datafusion_physical_expr_common::physical_expr::{PhysicalExprRef, is_dynamic_physical_expr};
use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef, ScalarFunctionExpr};
use datafusion_physical_expr_common::physical_expr::{
is_dynamic_physical_expr, snapshot_physical_expr,
};
use datafusion_physical_plan::expressions as df_expr;
use itertools::Itertools;
use vortex::compute::LikeOptions;
Expand All @@ -22,13 +25,44 @@ use vortex::scalar::Scalar;

use crate::convert::{FromDataFusion, TryFromDataFusion};

fn is_lit_true(e: &PhysicalExprRef) -> bool {
e.as_any()
.downcast_ref::<df_expr::Literal>()
.is_some_and(|l| matches!(l.value(), ScalarValue::Boolean(Some(true))))
}

/// Tries to convert the expressions into a vortex conjunction. Will return Ok(None) iff the input conjunction is empty.
pub(crate) fn make_vortex_predicate(
predicate: &[&Arc<dyn PhysicalExpr>],
predicate: &[Arc<dyn PhysicalExpr>],
) -> VortexResult<Option<Expression>> {
let exprs = predicate
.iter()
.map(|e| Expression::try_from_df(e.as_ref()))
.filter_map(|expr| {
// Handle dynamic expressions by snapshotting them first
let expr_to_convert = if is_dynamic_physical_expr(expr) {
// If snapshot fails, filter out this expression
let snapshot = snapshot_physical_expr(expr.clone()).ok()?;

// Filter out literal true expressions (they don't add constraints)
if is_lit_true(&snapshot) {
return None;
}

snapshot
} else {
expr.clone()
};

// Try to convert to Vortex expression
match Expression::try_from_df(expr_to_convert.as_ref()) {
Ok(vortex_expr) => Some(Ok(vortex_expr)),
Err(_) => {
// If we fail to convert the expression to Vortex, it's safe
// to drop it as we don't declare it as pushed down
None
}
}
})
.collect::<VortexResult<Vec<_>>>()?;

Ok(exprs.into_iter().reduce(and))
Expand Down Expand Up @@ -323,15 +357,15 @@ mod tests {
#[test]
fn test_make_vortex_predicate_single() {
let col_expr = Arc::new(df_expr::Column::new("test", 0)) as Arc<dyn PhysicalExpr>;
let result = make_vortex_predicate(&[&col_expr]).unwrap();
let result = make_vortex_predicate(&[col_expr]).unwrap();
assert!(result.is_some());
}

#[test]
fn test_make_vortex_predicate_multiple() {
let col1 = Arc::new(df_expr::Column::new("col1", 0)) as Arc<dyn PhysicalExpr>;
let col2 = Arc::new(df_expr::Column::new("col2", 1)) as Arc<dyn PhysicalExpr>;
let result = make_vortex_predicate(&[&col1, &col2]).unwrap();
let result = make_vortex_predicate(&[col1, col2]).unwrap();
assert!(result.is_some());
// Result should be an AND expression combining the two columns
}
Expand Down
47 changes: 20 additions & 27 deletions vortex-datafusion/src/persistent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ fn register_vortex_format_factory(

#[cfg(test)]
mod tests {

use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
Expand Down Expand Up @@ -195,75 +196,67 @@ mod tests {

#[tokio::test]
async fn create_table_ordered_by() -> anyhow::Result<()> {
let dir = TempDir::new().unwrap();
let dir = TempDir::new()?;

let factory: VortexFormatFactory = VortexFormatFactory::new();
let mut session_state_builder = SessionStateBuilder::new().with_default_features();
register_vortex_format_factory(factory, &mut session_state_builder);
let session = SessionContext::new_with_state(session_state_builder.build());

// Vortex
session
.sql(&format!(
"CREATE EXTERNAL TABLE my_tbl_vx \
"CREATE EXTERNAL TABLE my_tbl \
(c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
STORED AS vortex \
STORED AS VORTEX \
WITH ORDER (c1 ASC)
LOCATION '{}/vx/'",
LOCATION '{}/'",
dir.path().to_str().unwrap()
))
.await?;

session
.sql("INSERT INTO my_tbl_vx VALUES ('air', 5), ('balloon', 42)")
.sql("INSERT INTO my_tbl VALUES ('air', 10), ('alabama', 20), ('balloon', 30)")
.await?
.collect()
.await?;

session
.sql("INSERT INTO my_tbl_vx VALUES ('zebra', 5)")
.sql("INSERT INTO my_tbl VALUES ('kangaroo', 11), ('zebra', 21)")
.await?
.collect()
.await?;

session
.sql("INSERT INTO my_tbl_vx VALUES ('texas', 2000), ('alabama', 2000)")
.await?
.collect()
.await?;
let df = session.sql("SELECT * FROM my_tbl ORDER BY c1 ASC").await?;

let df = session
.sql("SELECT * FROM my_tbl_vx ORDER BY c1 ASC limit 3")
.await?;
let (state, plan) = df.clone().into_parts();
let physical_plan = state.create_physical_plan(&plan).await?;
let physical_plan = df.clone().create_physical_plan().await?;

insta::assert_snapshot!(DisplayableExecutionPlan::new(physical_plan.as_ref())
.tree_render().to_string(), @r"
┌───────────────────────────┐
│ SortPreservingMergeExec │
│ -------------------- │
│ c1 ASC NULLS LASTlimit: │
│ 3 │
│ c1 ASC NULLS LAST │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│ DataSourceExec │
│ -------------------- │
│ files: 3
│ files: 2
│ format: vortex │
└───────────────────────────┘
");

let r = df.collect().await?;

insta::assert_snapshot!(pretty_format_batches(&r)?.to_string(), @r"
+---------+------+
| c1 | c2 |
+---------+------+
| air | 5 |
| alabama | 2000 |
| balloon | 42 |
+---------+------+
+----------+----+
| c1 | c2 |
+----------+----+
| air | 10 |
| alabama | 20 |
| balloon | 30 |
| kangaroo | 11 |
| zebra | 21 |
+----------+----+
");

Ok(())
Expand Down
36 changes: 17 additions & 19 deletions vortex-datafusion/src/persistent/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,9 @@ pub(crate) struct VortexOpener {
pub object_store: Arc<dyn ObjectStore>,
/// Projection by index of the file's columns
pub projection: Option<Arc<[usize]>>,
/// Filter expression optimized for pushdown into Vortex scan operations.
/// This may be a subset of file_pruning_predicate containing only expressions
/// that Vortex can efficiently evaluate.
pub filter: Option<PhysicalExprRef>,
/// Filter expression used by DataFusion's FilePruner to eliminate files based on
/// statistics and partition values without opening them.
pub file_pruning_predicate: Option<PhysicalExprRef>,
pub predicate: Option<PhysicalExprRef>,
pub expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
pub schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
/// Hive-style partitioning columns
Expand Down Expand Up @@ -149,8 +145,7 @@ impl FileOpener for VortexOpener {
let session = self.session.clone();
let object_store = self.object_store.clone();
let projection = self.projection.clone();
let mut filter = self.filter.clone();
let file_pruning_predicate = self.file_pruning_predicate.clone();
let mut predicate = self.predicate.clone();
let expr_adapter_factory = self.expr_adapter_factory.clone();
let partition_fields = self.partition_fields.clone();
let file_cache = self.file_cache.clone();
Expand Down Expand Up @@ -178,7 +173,8 @@ impl FileOpener for VortexOpener {
// opening them based on:
// - Partition column values (e.g., date=2024-01-01)
// - File-level statistics (min/max values per column)
let mut file_pruner = file_pruning_predicate
let mut file_pruner = predicate
.clone()
.map(|predicate| {
// Only create pruner if we have dynamic expressions or file statistics
// to work with. Static predicates without stats won't benefit from pruning.
Expand Down Expand Up @@ -225,15 +221,16 @@ impl FileOpener for VortexOpener {

// The adapter rewrites the expression to the local file schema, allowing
// for schema evolution and divergence between the table's schema and individual files.
filter = filter
.map(|filter| {
predicate = predicate
.clone()
.map(|expr| {
let logical_file_schema =
compute_logical_file_schema(&physical_file_schema, &logical_schema);

let expr = expr_adapter_factory
.create(logical_file_schema, physical_file_schema.clone())
.with_partition_values(partition_values)
.rewrite(filter)?;
.rewrite(expr)?;

// Expression might now reference columns that don't exist in the file, so we can give it
// another simplification pass.
Expand Down Expand Up @@ -294,11 +291,15 @@ impl FileOpener for VortexOpener {
);
}

let filter = filter
let filter = predicate
.and_then(|f| {
let exprs = split_conjunction(&f)
.into_iter()
.filter(|expr| can_be_pushed_down(expr, &predicate_file_schema))
.filter(|expr| {
is_dynamic_physical_expr(expr)
|| can_be_pushed_down(expr, &predicate_file_schema)
})
.cloned()
.collect::<Vec<_>>();

make_vortex_predicate(&exprs).transpose()
Expand Down Expand Up @@ -521,8 +522,7 @@ mod tests {
session: SESSION.clone(),
object_store: object_store.clone(),
projection: Some([0].into()),
filter: Some(filter),
file_pruning_predicate: None,
predicate: Some(filter),
expr_adapter_factory: expr_adapter_factory.clone(),
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
partition_fields: vec![Arc::new(Field::new("part", DataType::Int32, false))],
Expand Down Expand Up @@ -602,8 +602,7 @@ mod tests {
session: SESSION.clone(),
object_store: object_store.clone(),
projection: Some([0].into()),
filter: Some(filter),
file_pruning_predicate: None,
predicate: Some(filter),
expr_adapter_factory: expr_adapter_factory.clone(),
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
partition_fields: vec![],
Expand Down Expand Up @@ -710,11 +709,10 @@ mod tests {
session: SESSION.clone(),
object_store: object_store.clone(),
projection: None,
filter: Some(logical2physical(
predicate: Some(logical2physical(
&col("my_struct").is_not_null(),
&table_schema,
)),
file_pruning_predicate: None,
expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _),
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
partition_fields: vec![],
Expand Down
Loading
Loading