From e9db691818c2eca959d23cf422f1b135bfd3fc85 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Thu, 2 Mar 2023 14:41:47 -0500
Subject: [PATCH] chore: Remove references to SessionState from physical_plan

---
 datafusion/core/src/dataframe.rs                      | 11 +++++++----
 datafusion/core/src/execution/context.rs              |  9 +++------
 datafusion/core/src/physical_plan/file_format/csv.rs  |  7 +++----
 datafusion/core/src/physical_plan/file_format/json.rs |  7 +++----
 .../core/src/physical_plan/file_format/parquet.rs     |  8 ++++----
 5 files changed, 20 insertions(+), 22 deletions(-)

diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs
index 30e14a2afa59..73f07074581f 100644
--- a/datafusion/core/src/dataframe.rs
+++ b/datafusion/core/src/dataframe.rs
@@ -315,7 +315,7 @@ impl DataFrame {
     /// # #[tokio::main]
     /// # async fn main() -> Result<()> {
     /// let ctx = SessionContext::new();
-    /// let df = ctx.read_csv("tests/tpch-csv/customer.csv", CsvReadOptions::new()).await?;
+    /// let df = ctx.read_csv("tests/tpch-csv/customer.csv", CsvReadOptions::new()).await?;
     /// df.describe().await.unwrap();
     ///
     /// # Ok(())
@@ -906,7 +906,8 @@ impl DataFrame {
     /// Write a `DataFrame` to a CSV file.
     pub async fn write_csv(self, path: &str) -> Result<()> {
         let plan = self.session_state.create_physical_plan(&self.plan).await?;
-        plan_to_csv(&self.session_state, plan, path).await
+        let task_ctx = Arc::new(self.task_ctx());
+        plan_to_csv(task_ctx, plan, path).await
     }
 
     /// Write a `DataFrame` to a Parquet file.
@@ -916,13 +917,15 @@ impl DataFrame {
         writer_properties: Option<WriterProperties>,
     ) -> Result<()> {
         let plan = self.session_state.create_physical_plan(&self.plan).await?;
-        plan_to_parquet(&self.session_state, plan, path, writer_properties).await
+        let task_ctx = Arc::new(self.task_ctx());
+        plan_to_parquet(task_ctx, plan, path, writer_properties).await
     }
 
     /// Executes a query and writes the results to a partitioned JSON file.
     pub async fn write_json(self, path: impl AsRef<str>) -> Result<()> {
         let plan = self.session_state.create_physical_plan(&self.plan).await?;
-        plan_to_json(&self.session_state, plan, path).await
+        let task_ctx = Arc::new(self.task_ctx());
+        plan_to_json(task_ctx, plan, path).await
     }
 
     /// Add an additional column to the DataFrame.
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 5b39e54dbe86..ce20425a5ba1 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -1059,8 +1059,7 @@ impl SessionContext {
         plan: Arc<dyn ExecutionPlan>,
         path: impl AsRef<str>,
     ) -> Result<()> {
-        let state = self.state.read().clone();
-        plan_to_csv(&state, plan, path).await
+        plan_to_csv(self.task_ctx(), plan, path).await
     }
 
     /// Executes a query and writes the results to a partitioned JSON file.
@@ -1069,8 +1068,7 @@ impl SessionContext {
         plan: Arc<dyn ExecutionPlan>,
         path: impl AsRef<str>,
     ) -> Result<()> {
-        let state = self.state.read().clone();
-        plan_to_json(&state, plan, path).await
+        plan_to_json(self.task_ctx(), plan, path).await
     }
 
     /// Executes a query and writes the results to a partitioned Parquet file.
@@ -1080,8 +1078,7 @@ impl SessionContext {
         path: impl AsRef<str>,
         writer_properties: Option<WriterProperties>,
     ) -> Result<()> {
-        let state = self.state.read().clone();
-        plan_to_parquet(&state, plan, path, writer_properties).await
+        plan_to_parquet(self.task_ctx(), plan, path, writer_properties).await
     }
 
     /// Get a new TaskContext to run in this session
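Note: the caller-facing write APIs are unchanged by this patch; only the plumbing
beneath them moves from &SessionState to Arc<TaskContext>. A minimal sketch of an
unaffected caller (the output path "/tmp/example_csv" is illustrative, not from the
patch):

    use datafusion::error::Result;
    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> Result<()> {
        let ctx = SessionContext::new();
        let df = ctx
            .read_csv("tests/tpch-csv/customer.csv", CsvReadOptions::new())
            .await?;
        // write_csv now builds an Arc<TaskContext> internally instead of
        // handing &SessionState down to plan_to_csv.
        df.write_csv("/tmp/example_csv").await?;
        Ok(())
    }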
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs
index 9197d8f3babf..346c9915815d 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -19,7 +19,7 @@
 
 use crate::datasource::file_format::file_type::FileCompressionType;
 use crate::error::{DataFusionError, Result};
-use crate::execution::context::{SessionState, TaskContext};
+use crate::execution::context::TaskContext;
 use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::file_format::file_stream::{
     FileOpenFuture, FileOpener, FileStream,
@@ -280,7 +280,7 @@ impl FileOpener for CsvOpener {
 }
 
 pub async fn plan_to_csv(
-    state: &SessionState,
+    task_ctx: Arc<TaskContext>,
     plan: Arc<dyn ExecutionPlan>,
     path: impl AsRef<str>,
 ) -> Result<()> {
@@ -300,8 +300,7 @@ pub async fn plan_to_csv(
         let path = fs_path.join(filename);
         let file = fs::File::create(path)?;
         let mut writer = csv::Writer::new(file);
-        let task_ctx = Arc::new(TaskContext::from(state));
-        let stream = plan.execute(i, task_ctx)?;
+        let stream = plan.execute(i, task_ctx.clone())?;
         let handle: JoinHandle<Result<()>> =
             task::spawn(async move {
                 stream
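Note: with the new signature, plan_to_csv can also be driven directly from a
physical plan. A minimal sketch, assuming plan_to_csv is reachable under
datafusion::physical_plan::file_format (it is declared pub in csv.rs; the exact
re-export path is an assumption) and using an illustrative output directory:

    use datafusion::error::Result;
    use datafusion::physical_plan::file_format::plan_to_csv; // assumed re-export path
    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> Result<()> {
        let ctx = SessionContext::new();
        let df = ctx
            .read_csv("tests/tpch-csv/customer.csv", CsvReadOptions::new())
            .await?;
        let plan = df.create_physical_plan().await?;
        // task_ctx() hands back an Arc<TaskContext>; no SessionState clone
        // is needed anymore.
        plan_to_csv(ctx.task_ctx(), plan, "/tmp/example_out").await?;
        Ok(())
    }

The Arc is cloned once per output partition inside the loop, so a single
TaskContext serves every partition's execute() call.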
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs
index 3556774a8002..6f26080122c6 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -18,7 +18,6 @@
 //! Execution plan for reading line-delimited JSON files
 use crate::datasource::file_format::file_type::FileCompressionType;
 use crate::error::{DataFusionError, Result};
-use crate::execution::context::SessionState;
 use crate::execution::context::TaskContext;
 use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::file_format::file_stream::{
@@ -224,7 +223,7 @@ impl FileOpener for JsonOpener {
 }
 
 pub async fn plan_to_json(
-    state: &SessionState,
+    task_ctx: Arc<TaskContext>,
     plan: Arc<dyn ExecutionPlan>,
     path: impl AsRef<str>,
 ) -> Result<()> {
@@ -244,8 +243,7 @@ pub async fn plan_to_json(
         let path = fs_path.join(filename);
         let file = fs::File::create(path)?;
         let mut writer = json::LineDelimitedWriter::new(file);
-        let task_ctx = Arc::new(TaskContext::from(state));
-        let stream = plan.execute(i, task_ctx)?;
+        let stream = plan.execute(i, task_ctx.clone())?;
         let handle: JoinHandle<Result<()>> = task::spawn(async move {
             stream
                 .map(|batch| writer.write(batch?))
@@ -277,6 +275,7 @@ mod tests {
     use crate::datasource::file_format::{json::JsonFormat, FileFormat};
     use crate::datasource::listing::PartitionedFile;
     use crate::datasource::object_store::ObjectStoreUrl;
+    use crate::execution::context::SessionState;
     use crate::physical_plan::file_format::chunked_store::ChunkedStore;
     use crate::prelude::NdJsonReadOptions;
     use crate::prelude::*;
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 84ccdcb411c5..cbd43fabe011 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -36,7 +36,7 @@ use crate::physical_plan::file_format::FileMeta;
 use crate::{
     datasource::listing::FileRange,
     error::{DataFusionError, Result},
-    execution::context::{SessionState, TaskContext},
+    execution::context::TaskContext,
     physical_optimizer::pruning::PruningPredicate,
     physical_plan::{
         expressions::PhysicalSortExpr,
@@ -704,7 +704,7 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
 
 /// Executes a query and writes the results to a partitioned Parquet file.
 pub async fn plan_to_parquet(
-    state: &SessionState,
+    task_ctx: Arc<TaskContext>,
     plan: Arc<dyn ExecutionPlan>,
     path: impl AsRef<str>,
     writer_properties: Option<WriterProperties>,
@@ -726,8 +726,7 @@ pub async fn plan_to_parquet(
         let file = fs::File::create(path)?;
         let mut writer =
             ArrowWriter::try_new(file, plan.schema(), writer_properties.clone())?;
-        let task_ctx = Arc::new(TaskContext::from(state));
-        let stream = plan.execute(i, task_ctx)?;
+        let stream = plan.execute(i, task_ctx.clone())?;
         let handle: tokio::task::JoinHandle<Result<()>> =
             tokio::task::spawn(async move {
                 stream
@@ -803,6 +802,7 @@ mod tests {
     use crate::datasource::file_format::test_util::scan_format;
     use crate::datasource::listing::{FileRange, PartitionedFile};
     use crate::datasource::object_store::ObjectStoreUrl;
+    use crate::execution::context::SessionState;
     use crate::execution::options::CsvReadOptions;
     use crate::physical_plan::displayable;
     use crate::physical_plan::file_format::partition_type_wrap;
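Note: plan_to_parquet follows the same pattern, with optional WriterProperties
threaded through. A hedged sketch, assuming the parquet crate re-export at
datafusion::parquet and the same assumed re-export path for plan_to_parquet;
the output directory is illustrative:

    use datafusion::error::Result;
    use datafusion::parquet::file::properties::WriterProperties;
    use datafusion::physical_plan::file_format::plan_to_parquet; // assumed re-export path
    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> Result<()> {
        let ctx = SessionContext::new();
        let df = ctx
            .read_csv("tests/tpch-csv/customer.csv", CsvReadOptions::new())
            .await?;
        let plan = df.create_physical_plan().await?;
        // Default writer settings; pass None to accept ArrowWriter defaults.
        let props = WriterProperties::builder().build();
        // The single Arc<TaskContext> is cloned per partition inside
        // plan_to_parquet, mirroring the csv and json writers.
        plan_to_parquet(ctx.task_ctx(), plan, "/tmp/example_parquet", Some(props)).await?;
        Ok(())
    }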