Skip to content

Commit

Permalink
Add missing protobuf serialisation functionality GetIndexedFieldExpr. (
Browse files Browse the repository at this point in the history
…#5324)

This is required in order for `ballista` to serialise the indexing
expressions for nested lists. Otherwise, `ballista` cannot execute
any SQL that contains indexing expressions.
  • Loading branch information
ahmedriza authored Feb 17, 2023
1 parent d6d9c6f commit c5108ae
Show file tree
Hide file tree
Showing 6 changed files with 195 additions and 5 deletions.
9 changes: 8 additions & 1 deletion datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1023,6 +1023,8 @@ message PhysicalExprNode {
PhysicalDateTimeIntervalExprNode date_time_interval_expr = 17;

PhysicalLikeExprNode like_expr = 18;

PhysicalGetIndexedFieldExprNode get_indexed_field_expr = 19;
}
}

Expand Down Expand Up @@ -1345,4 +1347,9 @@ message ColumnStats {
ScalarValue max_value = 2;
uint32 null_count = 3;
uint32 distinct_count = 4;
}
}

message PhysicalGetIndexedFieldExprNode {
PhysicalExprNode arg = 1;
ScalarValue key = 2;
}
122 changes: 122 additions & 0 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 13 additions & 1 deletion datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions datafusion/proto/src/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use datafusion::execution::FunctionRegistry;
use datafusion::logical_expr::window_function::WindowFunction;
use datafusion::physical_expr::expressions::DateTimeIntervalExpr;
use datafusion::physical_expr::{PhysicalSortExpr, ScalarFunctionExpr};
use datafusion::physical_plan::expressions::GetIndexedFieldExpr;
use datafusion::physical_plan::expressions::LikeExpr;
use datafusion::physical_plan::file_format::FileScanConfig;
use datafusion::physical_plan::{
Expand Down Expand Up @@ -282,6 +283,17 @@ pub fn parse_physical_expr(
input_schema,
)?,
)),
ExprType::GetIndexedFieldExpr(get_indexed_field_expr) => {
Arc::new(GetIndexedFieldExpr::new(
parse_required_physical_expr(
get_indexed_field_expr.arg.as_deref(),
registry,
"arg",
input_schema,
)?,
convert_required!(get_indexed_field_expr.key)?,
))
}
};

Ok(pexpr)
Expand Down
28 changes: 27 additions & 1 deletion datafusion/proto/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1218,7 +1218,7 @@ mod roundtrip_tests {
use datafusion::physical_expr::expressions::DateTimeIntervalExpr;
use datafusion::physical_expr::ScalarFunctionExpr;
use datafusion::physical_plan::aggregates::PhysicalGroupBy;
use datafusion::physical_plan::expressions::like;
use datafusion::physical_plan::expressions::{like, GetIndexedFieldExpr};
use datafusion::physical_plan::functions;
use datafusion::physical_plan::functions::make_scalar_function;
use datafusion::physical_plan::projection::ProjectionExec;
Expand Down Expand Up @@ -1628,4 +1628,30 @@ mod roundtrip_tests {
)?);
roundtrip_test(plan)
}

#[test]
fn roundtrip_get_indexed_field() -> Result<()> {
let fields = vec![
Field::new("id", DataType::Int64, true),
Field::new(
"a",
DataType::List(Box::new(Field::new("item", DataType::Float64, true))),
true,
),
];

let schema = Schema::new(fields);
let input = Arc::new(EmptyExec::new(false, Arc::new(schema.clone())));

let col_a = col("a", &schema)?;
let key = ScalarValue::Int64(Some(1));
let get_indexed_field_expr = Arc::new(GetIndexedFieldExpr::new(col_a, key));

let plan = Arc::new(ProjectionExec::try_new(
vec![(get_indexed_field_expr, "result".to_string())],
input,
)?);

roundtrip_test(plan)
}
}
15 changes: 13 additions & 2 deletions datafusion/proto/src/physical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ use datafusion::physical_plan::expressions::{
use datafusion::physical_plan::{AggregateExpr, PhysicalExpr};

use crate::protobuf;
use crate::protobuf::PhysicalSortExprNode;
use crate::protobuf::{PhysicalSortExprNode, ScalarValue};
use datafusion::logical_expr::BuiltinScalarFunction;
use datafusion::physical_expr::expressions::DateTimeIntervalExpr;
use datafusion::physical_expr::expressions::{DateTimeIntervalExpr, GetIndexedFieldExpr};
use datafusion::physical_expr::ScalarFunctionExpr;
use datafusion::physical_plan::joins::utils::JoinSide;
use datafusion_common::DataFusionError;
Expand Down Expand Up @@ -342,6 +342,17 @@ impl TryFrom<Arc<dyn PhysicalExpr>> for protobuf::PhysicalExprNode {
}),
)),
})
} else if let Some(expr) = expr.downcast_ref::<GetIndexedFieldExpr>() {
Ok(protobuf::PhysicalExprNode {
expr_type: Some(
protobuf::physical_expr_node::ExprType::GetIndexedFieldExpr(
Box::new(protobuf::PhysicalGetIndexedFieldExprNode {
arg: Some(Box::new(expr.arg().to_owned().try_into()?)),
key: Some(ScalarValue::try_from(expr.key())?),
}),
),
),
})
} else {
Err(DataFusionError::Internal(format!(
"physical_plan::to_proto() unsupported expression {value:?}"
Expand Down

0 comments on commit c5108ae

Please sign in to comment.