
chore: Remove references to SessionState from physical_plan #5455

Merged 1 commit on Mar 7, 2023
11 changes: 7 additions & 4 deletions datafusion/core/src/dataframe.rs
@@ -315,7 +315,7 @@ impl DataFrame {
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let ctx = SessionContext::new();
-/// let df = ctx.read_csv("tests/tpch-csv/customer.csv", CsvReadOptions::new()).await?;
+/// let df = ctx.read_csv("tests/tpch-csv/customer.csv", CsvReadOptions::new()).await?;
/// df.describe().await.unwrap();
///
/// # Ok(())
@@ -906,7 +906,8 @@ impl DataFrame {
/// Write a `DataFrame` to a CSV file.
pub async fn write_csv(self, path: &str) -> Result<()> {
let plan = self.session_state.create_physical_plan(&self.plan).await?;
-plan_to_csv(&self.session_state, plan, path).await
+let task_ctx = Arc::new(self.task_ctx());
+plan_to_csv(task_ctx, plan, path).await
}

/// Write a `DataFrame` to a Parquet file.
@@ -916,13 +917,15 @@ impl DataFrame {
writer_properties: Option<WriterProperties>,
) -> Result<()> {
let plan = self.session_state.create_physical_plan(&self.plan).await?;
-plan_to_parquet(&self.session_state, plan, path, writer_properties).await
+let task_ctx = Arc::new(self.task_ctx());
+plan_to_parquet(task_ctx, plan, path, writer_properties).await
}

/// Executes a query and writes the results to a partitioned JSON file.
pub async fn write_json(self, path: impl AsRef<str>) -> Result<()> {
let plan = self.session_state.create_physical_plan(&self.plan).await?;
-plan_to_json(&self.session_state, plan, path).await
+let task_ctx = Arc::new(self.task_ctx());
+plan_to_json(task_ctx, plan, path).await
}

/// Add an additional column to the DataFrame.
9 changes: 3 additions & 6 deletions datafusion/core/src/execution/context.rs
@@ -1059,8 +1059,7 @@ impl SessionContext {
plan: Arc<dyn ExecutionPlan>,
path: impl AsRef<str>,
) -> Result<()> {
-let state = self.state.read().clone();
-plan_to_csv(&state, plan, path).await
+plan_to_csv(self.task_ctx(), plan, path).await
}

/// Executes a query and writes the results to a partitioned JSON file.
@@ -1069,8 +1068,7 @@
plan: Arc<dyn ExecutionPlan>,
path: impl AsRef<str>,
) -> Result<()> {
-let state = self.state.read().clone();
-plan_to_json(&state, plan, path).await
+plan_to_json(self.task_ctx(), plan, path).await
}

/// Executes a query and writes the results to a partitioned Parquet file.
@@ -1080,8 +1078,7 @@
path: impl AsRef<str>,
writer_properties: Option<WriterProperties>,
) -> Result<()> {
-let state = self.state.read().clone();
-plan_to_parquet(&state, plan, path, writer_properties).await
+plan_to_parquet(self.task_ctx(), plan, path, writer_properties).await
}

/// Get a new TaskContext to run in this session
7 changes: 3 additions & 4 deletions datafusion/core/src/physical_plan/file_format/csv.rs
@@ -19,7 +19,7 @@

use crate::datasource::file_format::file_type::FileCompressionType;
use crate::error::{DataFusionError, Result};
-use crate::execution::context::{SessionState, TaskContext};
+use crate::execution::context::TaskContext;
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::file_format::file_stream::{
FileOpenFuture, FileOpener, FileStream,
@@ -280,7 +280,7 @@ impl FileOpener for CsvOpener {
}

pub async fn plan_to_csv(
-state: &SessionState,
+task_ctx: Arc<TaskContext>,
Contributor Author commented:

The point of the PR is to remove the use of SessionState and hoist the creation of TaskContext into SessionContext.
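For a sense of what that hoisting means for callers, here is a rough usage sketch (my illustration, not code from this PR; it assumes the post-PR API in which SessionContext::task_ctx() returns an Arc<TaskContext> and DataFrame exposes create_physical_plan):

```rust
use datafusion::error::Result;
use datafusion::prelude::{CsvReadOptions, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    let df = ctx
        .read_csv("tests/tpch-csv/customer.csv", CsvReadOptions::new())
        .await?;

    // The physical plan no longer touches SessionState; it executes against
    // the TaskContext snapshot that the SessionContext hands out.
    let plan = df.create_physical_plan().await?;
    let _stream = plan.execute(0, ctx.task_ctx())?;
    // A real caller would now drain the stream (e.g. with futures::StreamExt).

    Ok(())
}
```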

plan: Arc<dyn ExecutionPlan>,
path: impl AsRef<str>,
) -> Result<()> {
@@ -300,8 +300,7 @@ pub async fn plan_to_csv(
let path = fs_path.join(filename);
let file = fs::File::create(path)?;
let mut writer = csv::Writer::new(file);
-let task_ctx = Arc::new(TaskContext::from(state));
-let stream = plan.execute(i, task_ctx)?;
+let stream = plan.execute(i, task_ctx.clone())?;
Contributor Author commented:

Note that TaskContext is simply a clone of the state on SessionState: https://github.com/apache/arrow-datafusion/blob/a95e0ec2fd929aae1c2f67148243eb4825d81a3b/datafusion/core/src/execution/context.rs#L2157-L2173

so making it once and cloning it is probably better than making multiple TaskContexts (each of which holds a bunch of Arcs).
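A toy sketch of that trade-off (TaskContext here is a hypothetical stand-in, not DataFusion's real struct): build the context once, then hand each partition a cheap Arc clone instead of re-assembling the bundle of Arcs per partition.

```rust
use std::sync::Arc;

// Hypothetical stand-in for the real TaskContext, which bundles session id,
// config, runtime environment, and function registries (many behind Arcs).
struct TaskContext {
    session_id: String,
}

fn execute_all_partitions(partition_count: usize) {
    // Create the context once, outside the per-partition loop...
    let task_ctx = Arc::new(TaskContext {
        session_id: "session-1".to_string(),
    });

    for i in 0..partition_count {
        // ...then each partition gets an Arc::clone: one atomic ref-count
        // bump, rather than a freshly constructed TaskContext every time.
        let ctx = Arc::clone(&task_ctx);
        println!("partition {i} runs under session {}", ctx.session_id);
    }
}

fn main() {
    execute_all_partitions(4);
}
```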


let handle: JoinHandle<Result<()>> = task::spawn(async move {
stream
7 changes: 3 additions & 4 deletions datafusion/core/src/physical_plan/file_format/json.rs
@@ -18,7 +18,6 @@
//! Execution plan for reading line-delimited JSON files
use crate::datasource::file_format::file_type::FileCompressionType;
use crate::error::{DataFusionError, Result};
-use crate::execution::context::SessionState;
use crate::execution::context::TaskContext;
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::file_format::file_stream::{
@@ -224,7 +223,7 @@ impl FileOpener for JsonOpener {
}

pub async fn plan_to_json(
-state: &SessionState,
+task_ctx: Arc<TaskContext>,
plan: Arc<dyn ExecutionPlan>,
path: impl AsRef<str>,
) -> Result<()> {
@@ -244,8 +243,7 @@ pub async fn plan_to_json(
let path = fs_path.join(filename);
let file = fs::File::create(path)?;
let mut writer = json::LineDelimitedWriter::new(file);
-let task_ctx = Arc::new(TaskContext::from(state));
-let stream = plan.execute(i, task_ctx)?;
+let stream = plan.execute(i, task_ctx.clone())?;
let handle: JoinHandle<Result<()>> = task::spawn(async move {
stream
.map(|batch| writer.write(batch?))
@@ -277,6 +275,7 @@ mod tests {
use crate::datasource::file_format::{json::JsonFormat, FileFormat};
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
+use crate::execution::context::SessionState;
use crate::physical_plan::file_format::chunked_store::ChunkedStore;
use crate::prelude::NdJsonReadOptions;
use crate::prelude::*;
8 changes: 4 additions & 4 deletions datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -36,7 +36,7 @@ use crate::physical_plan::file_format::FileMeta;
use crate::{
datasource::listing::FileRange,
error::{DataFusionError, Result},
-execution::context::{SessionState, TaskContext},
+execution::context::TaskContext,
physical_optimizer::pruning::PruningPredicate,
physical_plan::{
expressions::PhysicalSortExpr,
@@ -704,7 +704,7 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {

/// Executes a query and writes the results to a partitioned Parquet file.
pub async fn plan_to_parquet(
-state: &SessionState,
+task_ctx: Arc<TaskContext>,
plan: Arc<dyn ExecutionPlan>,
path: impl AsRef<str>,
writer_properties: Option<WriterProperties>,
@@ -726,8 +726,7 @@
let file = fs::File::create(path)?;
let mut writer =
ArrowWriter::try_new(file, plan.schema(), writer_properties.clone())?;
-let task_ctx = Arc::new(TaskContext::from(state));
-let stream = plan.execute(i, task_ctx)?;
+let stream = plan.execute(i, task_ctx.clone())?;
let handle: tokio::task::JoinHandle<Result<()>> =
tokio::task::spawn(async move {
stream
@@ -803,6 +802,7 @@ mod tests {
use crate::datasource::file_format::test_util::scan_format;
use crate::datasource::listing::{FileRange, PartitionedFile};
use crate::datasource::object_store::ObjectStoreUrl;
+use crate::execution::context::SessionState;
Contributor Author commented:

This is for tests, which I think are OK to depend on datafusion core.

use crate::execution::options::CsvReadOptions;
use crate::physical_plan::displayable;
use crate::physical_plan::file_format::partition_type_wrap;