diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs index 6e772e7c8499..5aaf79997b63 100644 --- a/datafusion/proto/src/logical_plan/from_proto.rs +++ b/datafusion/proto/src/logical_plan/from_proto.rs @@ -876,7 +876,7 @@ pub fn parse_expr( .ok_or_else(|| Error::required("value"))? .try_into()?; - let expr = parse_required_expr(&field.expr, registry, "expr")?; + let expr = parse_required_expr(field.expr.as_deref(), registry, "expr")?; Ok(Expr::GetIndexedField(GetIndexedField::new( Box::new(expr), @@ -926,7 +926,7 @@ pub fn parse_expr( datafusion_expr::window_function::WindowFunction::AggregateFunction( aggr_function, ), - vec![parse_required_expr(&expr.expr, registry, "expr")?], + vec![parse_required_expr(expr.expr.as_deref(), registry, "expr")?], partition_by, order_by, window_frame, @@ -937,7 +937,7 @@ pub fn parse_expr( .ok_or_else(|| Error::unknown("BuiltInWindowFunction", *i))? .into(); - let args = parse_optional_expr(&expr.expr, registry)? + let args = parse_optional_expr(expr.expr.as_deref(), registry)? .map(|e| vec![e]) .unwrap_or_else(Vec::new); @@ -963,64 +963,104 @@ pub fn parse_expr( .map(|e| parse_expr(e, registry)) .collect::, _>>()?, expr.distinct, - parse_optional_expr(&expr.filter, registry)?.map(Box::new), + parse_optional_expr(expr.filter.as_deref(), registry)?.map(Box::new), ))) } ExprType::Alias(alias) => Ok(Expr::Alias( - Box::new(parse_required_expr(&alias.expr, registry, "expr")?), + Box::new(parse_required_expr( + alias.expr.as_deref(), + registry, + "expr", + )?), alias.alias.clone(), )), ExprType::IsNullExpr(is_null) => Ok(Expr::IsNull(Box::new(parse_required_expr( - &is_null.expr, + is_null.expr.as_deref(), registry, "expr", )?))), ExprType::IsNotNullExpr(is_not_null) => Ok(Expr::IsNotNull(Box::new( - parse_required_expr(&is_not_null.expr, registry, "expr")?, + parse_required_expr(is_not_null.expr.as_deref(), registry, "expr")?, ))), ExprType::NotExpr(not) => Ok(Expr::Not(Box::new(parse_required_expr( - ¬.expr, registry, "expr", + not.expr.as_deref(), + registry, + "expr", )?))), ExprType::IsTrue(msg) => Ok(Expr::IsTrue(Box::new(parse_required_expr( - &msg.expr, registry, "expr", + msg.expr.as_deref(), + registry, + "expr", )?))), ExprType::IsFalse(msg) => Ok(Expr::IsFalse(Box::new(parse_required_expr( - &msg.expr, registry, "expr", + msg.expr.as_deref(), + registry, + "expr", )?))), ExprType::IsUnknown(msg) => Ok(Expr::IsUnknown(Box::new(parse_required_expr( - &msg.expr, registry, "expr", + msg.expr.as_deref(), + registry, + "expr", )?))), ExprType::IsNotTrue(msg) => Ok(Expr::IsNotTrue(Box::new(parse_required_expr( - &msg.expr, registry, "expr", + msg.expr.as_deref(), + registry, + "expr", )?))), ExprType::IsNotFalse(msg) => Ok(Expr::IsNotFalse(Box::new(parse_required_expr( - &msg.expr, registry, "expr", + msg.expr.as_deref(), + registry, + "expr", )?))), ExprType::IsNotUnknown(msg) => Ok(Expr::IsNotUnknown(Box::new( - parse_required_expr(&msg.expr, registry, "expr")?, + parse_required_expr(msg.expr.as_deref(), registry, "expr")?, ))), ExprType::Between(between) => Ok(Expr::Between(Between::new( - Box::new(parse_required_expr(&between.expr, registry, "expr")?), + Box::new(parse_required_expr( + between.expr.as_deref(), + registry, + "expr", + )?), between.negated, - Box::new(parse_required_expr(&between.low, registry, "expr")?), - Box::new(parse_required_expr(&between.high, registry, "expr")?), + Box::new(parse_required_expr( + between.low.as_deref(), + registry, + "expr", + )?), + Box::new(parse_required_expr( + between.high.as_deref(), + registry, + "expr", + )?), ))), ExprType::Like(like) => Ok(Expr::Like(Like::new( like.negated, - Box::new(parse_required_expr(&like.expr, registry, "expr")?), - Box::new(parse_required_expr(&like.pattern, registry, "pattern")?), + Box::new(parse_required_expr(like.expr.as_deref(), registry, "expr")?), + Box::new(parse_required_expr( + like.pattern.as_deref(), + registry, + "pattern", + )?), parse_escape_char(&like.escape_char)?, ))), ExprType::Ilike(like) => Ok(Expr::ILike(Like::new( like.negated, - Box::new(parse_required_expr(&like.expr, registry, "expr")?), - Box::new(parse_required_expr(&like.pattern, registry, "pattern")?), + Box::new(parse_required_expr(like.expr.as_deref(), registry, "expr")?), + Box::new(parse_required_expr( + like.pattern.as_deref(), + registry, + "pattern", + )?), parse_escape_char(&like.escape_char)?, ))), ExprType::SimilarTo(like) => Ok(Expr::SimilarTo(Like::new( like.negated, - Box::new(parse_required_expr(&like.expr, registry, "expr")?), - Box::new(parse_required_expr(&like.pattern, registry, "pattern")?), + Box::new(parse_required_expr(like.expr.as_deref(), registry, "expr")?), + Box::new(parse_required_expr( + like.pattern.as_deref(), + registry, + "pattern", + )?), parse_escape_char(&like.escape_char)?, ))), ExprType::Case(case) => { @@ -1028,45 +1068,45 @@ pub fn parse_expr( .when_then_expr .iter() .map(|e| { - let when_expr = parse_required_expr_inner( - e.when_expr.as_ref(), - registry, - "when_expr", - )?; - let then_expr = parse_required_expr_inner( - e.then_expr.as_ref(), - registry, - "then_expr", - )?; + let when_expr = + parse_required_expr(e.when_expr.as_ref(), registry, "when_expr")?; + let then_expr = + parse_required_expr(e.then_expr.as_ref(), registry, "then_expr")?; Ok((Box::new(when_expr), Box::new(then_expr))) }) .collect::, Box)>, Error>>()?; Ok(Expr::Case(Case::new( - parse_optional_expr(&case.expr, registry)?.map(Box::new), + parse_optional_expr(case.expr.as_deref(), registry)?.map(Box::new), when_then_expr, - parse_optional_expr(&case.else_expr, registry)?.map(Box::new), + parse_optional_expr(case.else_expr.as_deref(), registry)?.map(Box::new), ))) } ExprType::Cast(cast) => { - let expr = Box::new(parse_required_expr(&cast.expr, registry, "expr")?); + let expr = + Box::new(parse_required_expr(cast.expr.as_deref(), registry, "expr")?); let data_type = cast.arrow_type.as_ref().required("arrow_type")?; Ok(Expr::Cast(Cast::new(expr, data_type))) } ExprType::TryCast(cast) => { - let expr = Box::new(parse_required_expr(&cast.expr, registry, "expr")?); + let expr = + Box::new(parse_required_expr(cast.expr.as_deref(), registry, "expr")?); let data_type = cast.arrow_type.as_ref().required("arrow_type")?; Ok(Expr::TryCast(TryCast::new(expr, data_type))) } ExprType::Sort(sort) => Ok(Expr::Sort(Sort::new( - Box::new(parse_required_expr(&sort.expr, registry, "expr")?), + Box::new(parse_required_expr(sort.expr.as_deref(), registry, "expr")?), sort.asc, sort.nulls_first, ))), ExprType::Negative(negative) => Ok(Expr::Negative(Box::new( - parse_required_expr(&negative.expr, registry, "expr")?, + parse_required_expr(negative.expr.as_deref(), registry, "expr")?, ))), ExprType::InList(in_list) => Ok(Expr::InList { - expr: Box::new(parse_required_expr(&in_list.expr, registry, "expr")?), + expr: Box::new(parse_required_expr( + in_list.expr.as_deref(), + registry, + "expr", + )?), list: in_list .list .iter() @@ -1295,7 +1335,8 @@ pub fn parse_expr( .iter() .map(|expr| parse_expr(expr, registry)) .collect::, Error>>()?, - filter: parse_optional_expr(&pb.filter, registry)?.map(Box::new), + filter: parse_optional_expr(pb.filter.as_deref(), registry)? + .map(Box::new), }) } @@ -1389,27 +1430,16 @@ pub fn from_proto_binary_op(op: &str) -> Result { } fn parse_optional_expr( - p: &Option>, + p: Option<&protobuf::LogicalExprNode>, registry: &dyn FunctionRegistry, ) -> Result, Error> { match p { - Some(expr) => parse_expr(expr.as_ref(), registry).map(Some), + Some(expr) => parse_expr(expr, registry).map(Some), None => Ok(None), } } fn parse_required_expr( - p: &Option>, - registry: &dyn FunctionRegistry, - field: impl Into, -) -> Result { - match p { - Some(expr) => parse_expr(expr.as_ref(), registry), - None => Err(Error::required(field)), - } -} - -fn parse_required_expr_inner( p: Option<&protobuf::LogicalExprNode>, registry: &dyn FunctionRegistry, field: impl Into, diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index f1f7f8844df2..d1141c850f9c 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -76,24 +76,34 @@ pub(crate) fn parse_physical_expr( } ExprType::Literal(scalar) => Arc::new(Literal::new(scalar.try_into()?)), ExprType::BinaryExpr(binary_expr) => Arc::new(BinaryExpr::new( - parse_required_physical_box_expr( - &binary_expr.l, + parse_required_physical_expr( + binary_expr.l.as_deref(), registry, "left", input_schema, )?, logical_plan::from_proto::from_proto_binary_op(&binary_expr.op)?, - parse_required_physical_box_expr( - &binary_expr.r, + parse_required_physical_expr( + binary_expr.r.as_deref(), registry, "right", input_schema, )?, )), ExprType::DateTimeIntervalExpr(expr) => Arc::new(DateTimeIntervalExpr::try_new( - parse_required_physical_box_expr(&expr.l, registry, "left", input_schema)?, + parse_required_physical_expr( + expr.l.as_deref(), + registry, + "left", + input_schema, + )?, logical_plan::from_proto::from_proto_binary_op(&expr.op)?, - parse_required_physical_box_expr(&expr.r, registry, "right", input_schema)?, + parse_required_physical_expr( + expr.r.as_deref(), + registry, + "right", + input_schema, + )?, input_schema, )?), ExprType::AggregateExpr(_) => { @@ -111,23 +121,43 @@ pub(crate) fn parse_physical_expr( "Cannot convert sort expr node to physical expression".to_owned(), )); } - ExprType::IsNullExpr(e) => Arc::new(IsNullExpr::new( - parse_required_physical_box_expr(&e.expr, registry, "expr", input_schema)?, - )), - ExprType::IsNotNullExpr(e) => Arc::new(IsNotNullExpr::new( - parse_required_physical_box_expr(&e.expr, registry, "expr", input_schema)?, - )), - ExprType::NotExpr(e) => Arc::new(NotExpr::new(parse_required_physical_box_expr( - &e.expr, + ExprType::IsNullExpr(e) => { + Arc::new(IsNullExpr::new(parse_required_physical_expr( + e.expr.as_deref(), + registry, + "expr", + input_schema, + )?)) + } + ExprType::IsNotNullExpr(e) => { + Arc::new(IsNotNullExpr::new(parse_required_physical_expr( + e.expr.as_deref(), + registry, + "expr", + input_schema, + )?)) + } + ExprType::NotExpr(e) => Arc::new(NotExpr::new(parse_required_physical_expr( + e.expr.as_deref(), registry, "expr", input_schema, )?)), - ExprType::Negative(e) => Arc::new(NegativeExpr::new( - parse_required_physical_box_expr(&e.expr, registry, "expr", input_schema)?, - )), + ExprType::Negative(e) => { + Arc::new(NegativeExpr::new(parse_required_physical_expr( + e.expr.as_deref(), + registry, + "expr", + input_schema, + )?)) + } ExprType::InList(e) => Arc::new(InListExpr::new( - parse_required_physical_box_expr(&e.expr, registry, "expr", input_schema)?, + parse_required_physical_expr( + e.expr.as_deref(), + registry, + "expr", + input_schema, + )?, e.list .iter() .map(|x| parse_physical_expr(x, registry, input_schema)) @@ -165,12 +195,22 @@ pub(crate) fn parse_physical_expr( .transpose()?, )?), ExprType::Cast(e) => Arc::new(CastExpr::new( - parse_required_physical_box_expr(&e.expr, registry, "expr", input_schema)?, + parse_required_physical_expr( + e.expr.as_deref(), + registry, + "expr", + input_schema, + )?, convert_required!(e.arrow_type)?, DEFAULT_DATAFUSION_CAST_OPTIONS, )), ExprType::TryCast(e) => Arc::new(TryCastExpr::new( - parse_required_physical_box_expr(&e.expr, registry, "expr", input_schema)?, + parse_required_physical_expr( + e.expr.as_deref(), + registry, + "expr", + input_schema, + )?, convert_required!(e.arrow_type)?, )), ExprType::ScalarFunction(e) => { @@ -221,14 +261,14 @@ pub(crate) fn parse_physical_expr( ExprType::LikeExpr(like_expr) => Arc::new(LikeExpr::new( like_expr.negated, like_expr.case_insensitive, - parse_required_physical_box_expr( - &like_expr.expr, + parse_required_physical_expr( + like_expr.expr.as_deref(), registry, "expr", input_schema, )?, - parse_required_physical_box_expr( - &like_expr.pattern, + parse_required_physical_expr( + like_expr.pattern.as_deref(), registry, "pattern", input_schema, @@ -239,28 +279,13 @@ pub(crate) fn parse_physical_expr( Ok(pexpr) } -fn parse_required_physical_box_expr( - expr: &Option>, - registry: &dyn FunctionRegistry, - field: &str, - input_schema: &Schema, -) -> Result, DataFusionError> { - expr.as_ref() - .map(|e| parse_physical_expr(e.as_ref(), registry, input_schema)) - .transpose()? - .ok_or_else(|| { - DataFusionError::Internal(format!("Missing required field {field:?}")) - }) -} - fn parse_required_physical_expr( expr: Option<&protobuf::PhysicalExprNode>, registry: &dyn FunctionRegistry, field: &str, input_schema: &Schema, ) -> Result, DataFusionError> { - expr.as_ref() - .map(|e| parse_physical_expr(e, registry, input_schema)) + expr.map(|e| parse_physical_expr(e, registry, input_schema)) .transpose()? .ok_or_else(|| { DataFusionError::Internal(format!("Missing required field {field:?}"))