From 880cfc6d23c0143e5002bb447c145bc08997773d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 26 Jan 2023 18:14:20 -0700 Subject: [PATCH 1/3] Add DataFrame methods for accessing plans --- src/dataframe.rs | 18 +++++++++++++++++ src/lib.rs | 2 ++ src/physical_plan.rs | 46 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 66 insertions(+) create mode 100644 src/physical_plan.rs diff --git a/src/dataframe.rs b/src/dataframe.rs index b56cadf4..addf2436 100644 --- a/src/dataframe.rs +++ b/src/dataframe.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +use crate::logical::PyLogicalPlan; +use crate::physical_plan::PyExecutionPlan; use crate::utils::wait_for_future; use crate::{errors::DataFusionError, expression::PyExpr}; use datafusion::arrow::datatypes::Schema; @@ -203,6 +205,22 @@ impl PyDataFrame { pretty::print_batches(&batches).map_err(|err| PyArrowException::new_err(err.to_string())) } + /// Get the unoptimized logical plan for this `DataFrame` + fn logical_plan(&self) -> PyResult { + Ok(self.df.as_ref().logical_plan().clone().into()) + } + + /// Get the optimized logical plan for this `DataFrame` + fn optimized_logical_plan(&self) -> PyResult { + Ok(self.df.as_ref().clone().into_optimized_plan()?.into()) + } + + /// Get the execution plan for this `DataFrame` + fn execution_plan(&self, py: Python) -> PyResult { + let plan = wait_for_future(py, self.df.as_ref().clone().create_physical_plan())?; + Ok(plan.into()) + } + /// Repartition a `DataFrame` based on a logical partitioning scheme. 
fn repartition(&self, num: usize) -> PyResult { let new_df = self diff --git a/src/lib.rs b/src/lib.rs index 21b47f44..eda74ff0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -35,6 +35,7 @@ mod expression; #[allow(clippy::borrow_deref_ref)] mod functions; pub mod logical; +pub mod physical_plan; mod pyarrow_filter_expression; pub mod store; pub mod substrait; @@ -65,6 +66,7 @@ fn _internal(py: Python, m: &PyModule) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; // Register the functions as a submodule let funcs = PyModule::new(py, "functions")?; diff --git a/src/physical_plan.rs b/src/physical_plan.rs new file mode 100644 index 00000000..948df52e --- /dev/null +++ b/src/physical_plan.rs @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use datafusion::physical_plan::ExecutionPlan; +use std::sync::Arc; + +use pyo3::prelude::*; + +#[pyclass(name = "ExecutionPlan", module = "substrait", subclass)] +#[derive(Debug, Clone)] +pub struct PyExecutionPlan { + pub(crate) plan: Arc, +} + +impl PyExecutionPlan { + /// Creates a new `PyExecutionPlan` wrapping the given plan + pub fn new(plan: Arc) -> Self { + Self { plan } + } +} + +impl From for Arc { + fn from(plan: PyExecutionPlan) -> Arc { + plan.plan + } +} + +impl From> for PyExecutionPlan { + fn from(plan: Arc) -> PyExecutionPlan { + PyExecutionPlan { plan } + } +} From f71ca496068880e5c56990486f886f08d6d0da67 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 26 Jan 2023 18:21:12 -0700 Subject: [PATCH 2/3] add tests --- datafusion/tests/test_dataframe.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/datafusion/tests/test_dataframe.py b/datafusion/tests/test_dataframe.py index aac5db29..3ecb8365 100644 --- a/datafusion/tests/test_dataframe.py +++ b/datafusion/tests/test_dataframe.py @@ -258,6 +258,21 @@ def test_explain(df): df.explain() +def test_logical_plan(df): + plan = df.logical_plan() + assert plan is not None + + +def test_optimized_logical_plan(df): + plan = df.optimized_logical_plan() + assert plan is not None + + +def test_execution_plan(df): + plan = df.execution_plan() + assert plan is not None + + def test_repartition(df): df.repartition(2) From eae9bc7dd56ad2f7b3763abdd2e6f498fbe330e1 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 26 Jan 2023 18:28:44 -0700 Subject: [PATCH 3/3] change module names --- src/logical.rs | 2 +- src/physical_plan.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/logical.rs b/src/logical.rs index 8c3acf71..304cdf99 100644 --- a/src/logical.rs +++ b/src/logical.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use datafusion_expr::LogicalPlan; use pyo3::prelude::*; -#[pyclass(name = "LogicalPlan", module = "substrait", subclass)] +#[pyclass(name = "LogicalPlan", module 
= "datafusion", subclass)] #[derive(Debug, Clone)] pub struct PyLogicalPlan { pub(crate) plan: Arc, diff --git a/src/physical_plan.rs b/src/physical_plan.rs index 948df52e..b4c68b9f 100644 --- a/src/physical_plan.rs +++ b/src/physical_plan.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use pyo3::prelude::*; -#[pyclass(name = "ExecutionPlan", module = "substrait", subclass)] +#[pyclass(name = "ExecutionPlan", module = "datafusion", subclass)] #[derive(Debug, Clone)] pub struct PyExecutionPlan { pub(crate) plan: Arc,