Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use PhysicalExtensionCodec consistently #10075

Merged
4 commits merged on Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 44 additions & 27 deletions datafusion/proto/src/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,11 @@ use std::collections::HashMap;
use std::convert::{TryFrom, TryInto};
use std::sync::Arc;

use crate::common::proto_error;
use crate::convert_required;
use crate::logical_plan::{self, csv_writer_options_from_proto};
use crate::protobuf::physical_expr_node::ExprType;
use crate::protobuf::{self, copy_to_node};

use arrow::compute::SortOptions;
use chrono::{TimeZone, Utc};
use object_store::path::Path;
use object_store::ObjectMeta;

use datafusion::arrow::datatypes::Schema;
use datafusion::datasource::file_format::csv::CsvSink;
use datafusion::datasource::file_format::json::JsonSink;
Expand Down Expand Up @@ -57,13 +55,15 @@ use datafusion_common::file_options::json_writer::JsonWriterOptions;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::stats::Precision;
use datafusion_common::{not_impl_err, DataFusionError, JoinSide, Result, ScalarValue};

use chrono::{TimeZone, Utc};
use datafusion_expr::ScalarFunctionDefinition;
use object_store::path::Path;
use object_store::ObjectMeta;

use super::{DefaultPhysicalExtensionCodec, PhysicalExtensionCodec};
use crate::common::proto_error;
use crate::convert_required;
use crate::logical_plan::{self, csv_writer_options_from_proto};
use crate::protobuf::physical_expr_node::ExprType;
use crate::protobuf::{self, copy_to_node};

use super::PhysicalExtensionCodec;

impl From<&protobuf::PhysicalColumn> for Column {
fn from(c: &protobuf::PhysicalColumn) -> Column {
Expand All @@ -76,9 +76,10 @@ impl From<&protobuf::PhysicalColumn> for Column {
/// # Arguments
///
/// * `proto` - Input proto with physical sort expression node
/// * `registry` - A registry knows how to build logical expressions out of user-defined function' names
/// * `registry` - A registry that knows how to build logical expressions out of user-defined function names
/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
/// when performing type coercion.
/// * `codec` - An extension codec used to decode custom UDFs.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thank you for improving the comments here as well ❤️

pub fn parse_physical_sort_expr(
proto: &protobuf::PhysicalSortExprNode,
registry: &dyn FunctionRegistry,
Expand All @@ -102,9 +103,10 @@ pub fn parse_physical_sort_expr(
/// # Arguments
///
/// * `proto` - Input proto with vector of physical sort expression node
/// * `registry` - A registry knows how to build logical expressions out of user-defined function' names
/// * `registry` - A registry that knows how to build logical expressions out of user-defined function names
/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
/// when performing type coercion.
/// * `codec` - An extension codec used to decode custom UDFs.
pub fn parse_physical_sort_exprs(
proto: &[protobuf::PhysicalSortExprNode],
registry: &dyn FunctionRegistry,
Expand All @@ -123,25 +125,26 @@ pub fn parse_physical_sort_exprs(
///
/// # Arguments
///
/// * `proto` - Input proto with physical window exprression node.
/// * `proto` - Input proto with physical window expression node.
/// * `name` - Name of the window expression.
/// * `registry` - A registry knows how to build logical expressions out of user-defined function' names
/// * `registry` - A registry that knows how to build logical expressions out of user-defined function names
/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
/// when performing type coercion.
/// * `codec` - An extension codec used to decode custom UDFs.
pub fn parse_physical_window_expr(
proto: &protobuf::PhysicalWindowExprNode,
registry: &dyn FunctionRegistry,
input_schema: &Schema,
codec: &dyn PhysicalExtensionCodec,
) -> Result<Arc<dyn WindowExpr>> {
let codec = DefaultPhysicalExtensionCodec {};
let window_node_expr =
parse_physical_exprs(&proto.args, registry, input_schema, &codec)?;
parse_physical_exprs(&proto.args, registry, input_schema, codec)?;

let partition_by =
parse_physical_exprs(&proto.partition_by, registry, input_schema, &codec)?;
parse_physical_exprs(&proto.partition_by, registry, input_schema, codec)?;

let order_by =
parse_physical_sort_exprs(&proto.order_by, registry, input_schema, &codec)?;
parse_physical_sort_exprs(&proto.order_by, registry, input_schema, codec)?;

let window_frame = proto
.window_frame
Expand Down Expand Up @@ -187,9 +190,10 @@ where
/// # Arguments
///
/// * `proto` - Input proto with physical expression node
/// * `registry` - A registry knows how to build logical expressions out of user-defined function' names
/// * `registry` - A registry that knows how to build logical expressions out of user-defined function names
/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
/// when performing type coercion.
/// * `codec` - An extension codec used to decode custom UDFs.
pub fn parse_physical_expr(
proto: &protobuf::PhysicalExprNode,
registry: &dyn FunctionRegistry,
Expand All @@ -213,13 +217,15 @@ pub fn parse_physical_expr(
registry,
"left",
input_schema,
codec,
)?,
logical_plan::from_proto::from_proto_binary_op(&binary_expr.op)?,
parse_required_physical_expr(
binary_expr.r.as_deref(),
registry,
"right",
input_schema,
codec,
)?,
)),
ExprType::AggregateExpr(_) => {
Expand All @@ -241,6 +247,7 @@ pub fn parse_physical_expr(
registry,
"expr",
input_schema,
codec,
)?))
}
ExprType::IsNotNullExpr(e) => {
Expand All @@ -249,20 +256,23 @@ pub fn parse_physical_expr(
registry,
"expr",
input_schema,
codec,
)?))
}
ExprType::NotExpr(e) => Arc::new(NotExpr::new(parse_required_physical_expr(
e.expr.as_deref(),
registry,
"expr",
input_schema,
codec,
)?)),
ExprType::Negative(e) => {
Arc::new(NegativeExpr::new(parse_required_physical_expr(
e.expr.as_deref(),
registry,
"expr",
input_schema,
codec,
)?))
}
ExprType::InList(e) => in_list(
Expand All @@ -271,6 +281,7 @@ pub fn parse_physical_expr(
registry,
"expr",
input_schema,
codec,
)?,
parse_physical_exprs(&e.list, registry, input_schema, codec)?,
&e.negated,
Expand All @@ -290,12 +301,14 @@ pub fn parse_physical_expr(
registry,
"when_expr",
input_schema,
codec,
)?,
parse_required_physical_expr(
e.then_expr.as_ref(),
registry,
"then_expr",
input_schema,
codec,
)?,
))
})
Expand All @@ -311,6 +324,7 @@ pub fn parse_physical_expr(
registry,
"expr",
input_schema,
codec,
)?,
convert_required!(e.arrow_type)?,
None,
Expand All @@ -321,6 +335,7 @@ pub fn parse_physical_expr(
registry,
"expr",
input_schema,
codec,
)?,
convert_required!(e.arrow_type)?,
)),
Expand Down Expand Up @@ -371,12 +386,14 @@ pub fn parse_physical_expr(
registry,
"expr",
input_schema,
codec,
)?,
parse_required_physical_expr(
like_expr.pattern.as_deref(),
registry,
"pattern",
input_schema,
codec,
)?,
)),
};
Expand All @@ -389,9 +406,9 @@ fn parse_required_physical_expr(
registry: &dyn FunctionRegistry,
field: &str,
input_schema: &Schema,
codec: &dyn PhysicalExtensionCodec,
) -> Result<Arc<dyn PhysicalExpr>> {
let codec = DefaultPhysicalExtensionCodec {};
expr.map(|e| parse_physical_expr(e, registry, input_schema, &codec))
expr.map(|e| parse_physical_expr(e, registry, input_schema, codec))
.transpose()?
.ok_or_else(|| {
DataFusionError::Internal(format!("Missing required field {field:?}"))
Expand Down Expand Up @@ -433,15 +450,15 @@ pub fn parse_protobuf_hash_partitioning(
partitioning: Option<&protobuf::PhysicalHashRepartition>,
registry: &dyn FunctionRegistry,
input_schema: &Schema,
codec: &dyn PhysicalExtensionCodec,
) -> Result<Option<Partitioning>> {
match partitioning {
Some(hash_part) => {
let codec = DefaultPhysicalExtensionCodec {};
let expr = parse_physical_exprs(
&hash_part.hash_expr,
registry,
input_schema,
&codec,
codec,
)?;

Ok(Some(Partitioning::Hash(
Expand All @@ -456,6 +473,7 @@ pub fn parse_protobuf_hash_partitioning(
pub fn parse_protobuf_file_scan_config(
proto: &protobuf::FileScanExecConf,
registry: &dyn FunctionRegistry,
codec: &dyn PhysicalExtensionCodec,
) -> Result<FileScanConfig> {
let schema: Arc<Schema> = Arc::new(convert_required!(proto.schema)?);
let projection = proto
Expand Down Expand Up @@ -489,7 +507,7 @@ pub fn parse_protobuf_file_scan_config(
.collect::<Result<Vec<_>>>()?;

// Remove partition columns from the schema after recreating table_partition_cols
// because the partition columns are not in the file. They are present to allow the
// because the partition columns are not in the file. They are present to allow
// the partition column types to be reconstructed after serde.
let file_schema = Arc::new(Schema::new(
schema
Expand All @@ -502,12 +520,11 @@ pub fn parse_protobuf_file_scan_config(

let mut output_ordering = vec![];
for node_collection in &proto.output_ordering {
let codec = DefaultPhysicalExtensionCodec {};
let sort_expr = parse_physical_sort_exprs(
&node_collection.physical_sort_expr_nodes,
registry,
&schema,
&codec,
codec,
)?;
output_ordering.push(sort_expr);
}
Expand Down
Loading