2 changes: 1 addition & 1 deletion benchmarks/src/imdb/run.rs
@@ -570,7 +570,7 @@ mod tests {
let plan = ctx.sql(&query).await?;
let plan = plan.create_physical_plan().await?;
let bytes = physical_plan_to_bytes(plan.clone())?;
- let plan2 = physical_plan_from_bytes(&bytes, &ctx)?;
+ let plan2 = physical_plan_from_bytes(&bytes, &ctx.task_ctx())?;
let plan_formatted = format!("{}", displayable(plan.as_ref()).indent(false));
let plan2_formatted =
format!("{}", displayable(plan2.as_ref()).indent(false));
2 changes: 1 addition & 1 deletion benchmarks/src/tpch/run.rs
@@ -424,7 +424,7 @@ mod tests {
let plan = ctx.sql(&query).await?;
let plan = plan.create_physical_plan().await?;
let bytes = physical_plan_to_bytes(plan.clone())?;
- let plan2 = physical_plan_from_bytes(&bytes, &ctx)?;
+ let plan2 = physical_plan_from_bytes(&bytes, &ctx.task_ctx())?;
let plan_formatted = format!("{}", displayable(plan.as_ref()).indent(false));
let plan2_formatted =
format!("{}", displayable(plan2.as_ref()).indent(false));
14 changes: 6 additions & 8 deletions datafusion-examples/examples/composed_extension_codec.rs
@@ -32,12 +32,11 @@

use std::any::Any;
use std::fmt::Debug;
- use std::ops::Deref;
use std::sync::Arc;

use datafusion::common::internal_err;
use datafusion::common::Result;
- use datafusion::logical_expr::registry::FunctionRegistry;
+ use datafusion::execution::TaskContext;
use datafusion::physical_plan::{DisplayAs, ExecutionPlan};
use datafusion::prelude::SessionContext;
use datafusion_proto::physical_plan::{
@@ -71,9 +70,8 @@ async fn main() {
.expect("to proto");

// deserialize proto back to execution plan
- let runtime = ctx.runtime_env();
let result_exec_plan: Arc<dyn ExecutionPlan> = proto
- .try_into_physical_plan(&ctx, runtime.deref(), &composed_codec)
+ .try_into_physical_plan(&ctx.task_ctx(), &composed_codec)
.expect("from proto");

// assert that the original and deserialized execution plans are equal
@@ -124,7 +122,7 @@ impl ExecutionPlan for ParentExec {
fn execute(
&self,
_partition: usize,
- _context: Arc<datafusion::execution::TaskContext>,
+ _context: Arc<TaskContext>,
) -> Result<datafusion::physical_plan::SendableRecordBatchStream> {
unreachable!()
}
@@ -139,7 +137,7 @@ impl PhysicalExtensionCodec for ParentPhysicalExtensionCodec {
&self,
buf: &[u8],
inputs: &[Arc<dyn ExecutionPlan>],
- _registry: &dyn FunctionRegistry,
+ _ctx: &TaskContext,
) -> Result<Arc<dyn ExecutionPlan>> {
if buf == "ParentExec".as_bytes() {
Ok(Arc::new(ParentExec {
@@ -200,7 +198,7 @@ impl ExecutionPlan for ChildExec {
fn execute(
&self,
_partition: usize,
- _context: Arc<datafusion::execution::TaskContext>,
+ _context: Arc<TaskContext>,
) -> Result<datafusion::physical_plan::SendableRecordBatchStream> {
unreachable!()
}
@@ -215,7 +213,7 @@ impl PhysicalExtensionCodec for ChildPhysicalExtensionCodec {
&self,
buf: &[u8],
_inputs: &[Arc<dyn ExecutionPlan>],
- _registry: &dyn FunctionRegistry,
+ _ctx: &TaskContext,
) -> Result<Arc<dyn ExecutionPlan>> {
if buf == "ChildExec".as_bytes() {
Ok(Arc::new(ChildExec {}))
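For downstream crates that implement PhysicalExtensionCodec, the migration is mechanical: try_decode now receives a &TaskContext instead of a &dyn FunctionRegistry, which also makes the session configuration and runtime environment reachable while decoding. Below is a minimal, hedged sketch of a decode hook under the new signature; MyExec is a hypothetical extension node and only the decode side is shown.

use std::sync::Arc;

use datafusion::common::{internal_err, Result};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::ExecutionPlan;

// Sketch of a PhysicalExtensionCodec::try_decode body under the new signature.
// `MyExec` is a hypothetical node; a real codec would construct and return it.
fn decode_my_exec(
    buf: &[u8],
    _inputs: &[Arc<dyn ExecutionPlan>],
    ctx: &TaskContext,
) -> Result<Arc<dyn ExecutionPlan>> {
    // Session options and the runtime environment are now available during decoding.
    let _batch_size = ctx.session_config().options().execution.batch_size;
    let _runtime = ctx.runtime_env();

    if buf == b"MyExec" {
        // e.g. Ok(Arc::new(MyExec::new(..)))
        internal_err!("MyExec construction elided in this sketch")
    } else {
        internal_err!("unknown extension node")
    }
}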
5 changes: 3 additions & 2 deletions datafusion/ffi/src/plan_properties.rs
@@ -181,6 +181,7 @@ impl TryFrom<FFI_PlanProperties> for PlanProperties {

// TODO Extend FFI to get the registry and codex
let default_ctx = SessionContext::new();
+ let task_context = default_ctx.task_ctx();
let codex = DefaultPhysicalExtensionCodec {};

let ffi_orderings = unsafe { (ffi_props.output_ordering)(&ffi_props) };
@@ -190,7 +191,7 @@ impl TryFrom<FFI_PlanProperties> for PlanProperties {
.map_err(|e| DataFusionError::External(Box::new(e)))?;
let sort_exprs = parse_physical_sort_exprs(
&proto_output_ordering.physical_sort_expr_nodes,
- &default_ctx,
+ &task_context,
&schema,
&codex,
)?;
@@ -202,7 +203,7 @@ impl TryFrom<FFI_PlanProperties> for PlanProperties {
.map_err(|e| DataFusionError::External(Box::new(e)))?;
let partitioning = parse_protobuf_partitioning(
Some(&proto_output_partitioning),
- &default_ctx,
+ &task_context,
&schema,
&codex,
)?
5 changes: 3 additions & 2 deletions datafusion/ffi/src/udaf/accumulator_args.rs
@@ -116,16 +116,17 @@ impl TryFrom<FFI_AccumulatorArgs> for ForeignAccumulatorArgs {
let schema = Schema::try_from(&value.schema.0)?;

let default_ctx = SessionContext::new();
+ let task_ctx = default_ctx.task_ctx();
let codex = DefaultPhysicalExtensionCodec {};

let order_bys = parse_physical_sort_exprs(
&proto_def.ordering_req,
- &default_ctx,
+ &task_ctx,
&schema,
&codex,
)?;

- let exprs = parse_physical_exprs(&proto_def.expr, &default_ctx, &schema, &codex)?;
+ let exprs = parse_physical_exprs(&proto_def.expr, &task_ctx, &schema, &codex)?;

Ok(Self {
return_field,
2 changes: 1 addition & 1 deletion datafusion/ffi/src/udwf/partition_evaluator_args.rs
@@ -148,7 +148,7 @@ impl TryFrom<FFI_PartitionEvaluatorArgs> for ForeignPartitionEvaluatorArgs {
.map_err(|e| DataFusionError::Execution(e.to_string()))?
.iter()
.map(|expr_node| {
- parse_physical_expr(expr_node, &default_ctx, &schema, &codec)
+ parse_physical_expr(expr_node, &default_ctx.task_ctx(), &schema, &codec)
})
.collect::<Result<Vec<_>>>()?;

9 changes: 5 additions & 4 deletions datafusion/proto/src/bytes/mod.rs
@@ -24,6 +24,7 @@ use crate::physical_plan::{
AsExecutionPlan, DefaultPhysicalExtensionCodec, PhysicalExtensionCodec,
};
use crate::protobuf;
+ use datafusion::execution::TaskContext;
use datafusion_common::{plan_datafusion_err, Result};
use datafusion_expr::{
create_udaf, create_udf, create_udwf, AggregateUDF, Expr, LogicalPlan, Volatility,
@@ -316,13 +317,13 @@ pub fn physical_plan_from_json(
let back: protobuf::PhysicalPlanNode = serde_json::from_str(json)
.map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?;
let extension_codec = DefaultPhysicalExtensionCodec {};
- back.try_into_physical_plan(ctx, &ctx.runtime_env(), &extension_codec)
+ back.try_into_physical_plan(&ctx.task_ctx(), &extension_codec)
}

/// Deserialize a PhysicalPlan from bytes
pub fn physical_plan_from_bytes(
bytes: &[u8],
- ctx: &SessionContext,
+ ctx: &TaskContext,
) -> Result<Arc<dyn ExecutionPlan>> {
let extension_codec = DefaultPhysicalExtensionCodec {};
physical_plan_from_bytes_with_extension_codec(bytes, ctx, &extension_codec)
@@ -331,10 +332,10 @@ pub fn physical_plan_from_bytes(
/// Deserialize a PhysicalPlan from bytes
pub fn physical_plan_from_bytes_with_extension_codec(
bytes: &[u8],
- ctx: &SessionContext,
+ ctx: &TaskContext,
extension_codec: &dyn PhysicalExtensionCodec,
) -> Result<Arc<dyn ExecutionPlan>> {
let protobuf = protobuf::PhysicalPlanNode::decode(bytes)
.map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;
- protobuf.try_into_physical_plan(ctx, &ctx.runtime_env(), extension_codec)
+ protobuf.try_into_physical_plan(ctx, extension_codec)
}
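The byte-oriented helpers keep their shape; only the context argument changes. A minimal, hedged round-trip sketch with the new signature, mirroring the benchmark tests above; it assumes a table named "t" has already been registered on the SessionContext.

use std::sync::Arc;

use datafusion::common::Result;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use datafusion_proto::bytes::{physical_plan_from_bytes, physical_plan_to_bytes};

// Round-trip a physical plan through its byte representation.
async fn roundtrip_plan(ctx: &SessionContext) -> Result<Arc<dyn ExecutionPlan>> {
    let plan = ctx.sql("SELECT * FROM t").await?.create_physical_plan().await?;
    let bytes = physical_plan_to_bytes(plan.clone())?;
    // Deserialization now takes a TaskContext rather than the SessionContext itself.
    let plan2 = physical_plan_from_bytes(&bytes, &ctx.task_ctx())?;
    assert_eq!(format!("{plan:?}"), format!("{plan2:?}"));
    Ok(plan2)
}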
2 changes: 1 addition & 1 deletion datafusion/proto/src/lib.rs
@@ -115,7 +115,7 @@
//! let bytes = physical_plan_to_bytes(physical_plan.clone())?;
//!
//! // Decode bytes from somewhere (over network, etc.) back to ExecutionPlan
- //! let physical_round_trip = physical_plan_from_bytes(&bytes, &ctx)?;
+ //! let physical_round_trip = physical_plan_from_bytes(&bytes, &ctx.task_ctx())?;
//! assert_eq!(format!("{:?}", physical_plan), format!("{:?}", physical_round_trip));
//! # Ok(())
//! # }
29 changes: 12 additions & 17 deletions datafusion/proto/src/physical_plan/from_proto.rs
@@ -38,7 +38,7 @@ use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{
FileGroup, FileScanConfig, FileScanConfigBuilder, FileSinkConfig, FileSource,
};
- use datafusion::execution::FunctionRegistry;
+ use datafusion::execution::{FunctionRegistry, TaskContext};
use datafusion::logical_expr::WindowFunctionDefinition;
use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr, ScalarFunctionExpr};
use datafusion::physical_plan::expressions::{
@@ -47,8 +47,6 @@ use datafusion::physical_plan::expressions::{
};
use datafusion::physical_plan::windows::{create_window_expr, schema_add_window_field};
use datafusion::physical_plan::{Partitioning, PhysicalExpr, WindowExpr};
- use datafusion::prelude::SessionContext;
- use datafusion_common::config::ConfigOptions;
use datafusion_common::{not_impl_err, DataFusionError, Result};
use datafusion_proto_common::common::proto_error;

@@ -76,7 +74,7 @@ impl From<&protobuf::PhysicalColumn> for Column {
/// * `codec` - An extension codec used to decode custom UDFs.
pub fn parse_physical_sort_expr(
proto: &protobuf::PhysicalSortExprNode,
- ctx: &SessionContext,
+ ctx: &TaskContext,
input_schema: &Schema,
codec: &dyn PhysicalExtensionCodec,
) -> Result<PhysicalSortExpr> {
@@ -103,7 +101,7 @@ pub fn parse_physical_sort_expr(
/// * `codec` - An extension codec used to decode custom UDFs.
pub fn parse_physical_sort_exprs(
proto: &[protobuf::PhysicalSortExprNode],
- ctx: &SessionContext,
+ ctx: &TaskContext,
input_schema: &Schema,
codec: &dyn PhysicalExtensionCodec,
) -> Result<Vec<PhysicalSortExpr>> {
@@ -125,7 +123,7 @@ pub fn parse_physical_sort_exprs(
/// * `codec` - An extension codec used to decode custom UDFs.
pub fn parse_physical_window_expr(
proto: &protobuf::PhysicalWindowExprNode,
- ctx: &SessionContext,
+ ctx: &TaskContext,
input_schema: &Schema,
codec: &dyn PhysicalExtensionCodec,
) -> Result<Arc<dyn WindowExpr>> {
@@ -186,7 +184,7 @@ pub fn parse_physical_window_expr(

pub fn parse_physical_exprs<'a, I>(
protos: I,
- ctx: &SessionContext,
+ ctx: &TaskContext,
input_schema: &Schema,
codec: &dyn PhysicalExtensionCodec,
) -> Result<Vec<Arc<dyn PhysicalExpr>>>
@@ -210,7 +208,7 @@ where
/// * `codec` - An extension codec used to decode custom UDFs.
pub fn parse_physical_expr(
proto: &protobuf::PhysicalExprNode,
- ctx: &SessionContext,
+ ctx: &TaskContext,
input_schema: &Schema,
codec: &dyn PhysicalExtensionCodec,
) -> Result<Arc<dyn PhysicalExpr>> {
@@ -364,11 +362,8 @@ pub fn parse_physical_expr(
let scalar_fun_def = Arc::clone(&udf);

let args = parse_physical_exprs(&e.args, ctx, input_schema, codec)?;
- let config_options =
- match ctx.state().execution_props().config_options.as_ref() {
- Some(config_options) => Arc::clone(config_options),
- None => Arc::new(ConfigOptions::default()),
- };

+ let config_options = Arc::clone(ctx.session_config().options());

Arc::new(
ScalarFunctionExpr::new(
@@ -419,7 +414,7 @@ pub fn parse_physical_expr(

fn parse_required_physical_expr(
expr: Option<&protobuf::PhysicalExprNode>,
- ctx: &SessionContext,
+ ctx: &TaskContext,
field: &str,
input_schema: &Schema,
codec: &dyn PhysicalExtensionCodec,
@@ -433,7 +428,7 @@ fn parse_required_physical_expr(

pub fn parse_protobuf_hash_partitioning(
partitioning: Option<&protobuf::PhysicalHashRepartition>,
- ctx: &SessionContext,
+ ctx: &TaskContext,
input_schema: &Schema,
codec: &dyn PhysicalExtensionCodec,
) -> Result<Option<Partitioning>> {
@@ -453,7 +448,7 @@ pub fn parse_protobuf_hash_partitioning(

pub fn parse_protobuf_partitioning(
partitioning: Option<&protobuf::Partitioning>,
- ctx: &SessionContext,
+ ctx: &TaskContext,
input_schema: &Schema,
codec: &dyn PhysicalExtensionCodec,
) -> Result<Option<Partitioning>> {
@@ -491,7 +486,7 @@ pub fn parse_protobuf_partitioning(

pub fn parse_protobuf_file_scan_config(
proto: &protobuf::FileScanExecConf,
- ctx: &SessionContext,
+ ctx: &TaskContext,
codec: &dyn PhysicalExtensionCodec,
file_source: Arc<dyn FileSource>,
) -> Result<FileScanConfig> {
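Callers of the lower-level parse_* helpers follow the same pattern: wherever a &SessionContext was passed before, a &TaskContext (typically obtained via SessionContext::task_ctx()) is passed now. A hedged sketch for a single serialized expression; it assumes expr_bytes holds an encoded protobuf::PhysicalExprNode, that prost is available for decoding, and that no custom UDFs or runtime state are needed, so a fresh SessionContext's task context is sufficient.

use std::sync::Arc;

use datafusion::arrow::datatypes::Schema;
use datafusion::common::{DataFusionError, Result};
use datafusion::physical_plan::PhysicalExpr;
use datafusion::prelude::SessionContext;
use datafusion_proto::physical_plan::from_proto::parse_physical_expr;
use datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec;
use datafusion_proto::protobuf;
use prost::Message;

// Decode one expression node against a known input schema.
fn decode_expr(expr_bytes: &[u8], schema: &Schema) -> Result<Arc<dyn PhysicalExpr>> {
    let node = protobuf::PhysicalExprNode::decode(expr_bytes)
        .map_err(|e| DataFusionError::External(Box::new(e)))?;
    let ctx = SessionContext::new();
    let codec = DefaultPhysicalExtensionCodec {};
    parse_physical_expr(&node, &ctx.task_ctx(), schema, &codec)
}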