diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 1c5047852203..7af4bc5539f5 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -387,7 +387,7 @@ async fn get_table(
     let config = ListingTableConfig::new(table_path).with_listing_options(options);

     let config = match table_format {
-        "parquet" => config.infer_schema(&state).await?,
+        "parquet" => config.infer_schema(&state.task_ctx()).await?,
         "tbl" => config.with_schema(Arc::new(get_tbl_tpch_table_schema(table))),
         "csv" => config.with_schema(Arc::new(get_tpch_table_schema(table))),
         _ => unreachable!(),
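
This first hunk is the shape of the whole migration: call sites that used to hand a `&SessionState` to schema inference now hand it a `&TaskContext`, which the state can mint on demand. A minimal sketch of the new calling convention, assuming this patch is applied (the helper name and path are hypothetical):

use std::sync::Arc;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{ListingOptions, ListingTableConfig, ListingTableUrl};
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

// Hypothetical helper mirroring the tpch.rs call site above.
async fn infer_parquet_config(ctx: &SessionContext, path: &str) -> Result<ListingTableConfig> {
    let table_path = ListingTableUrl::parse(path)?;
    let options = ListingOptions::new(Arc::new(ParquetFormat::default()));
    let config = ListingTableConfig::new(table_path).with_listing_options(options);
    // Before this patch: config.infer_schema(&ctx.state()).await
    // After: schema inference needs only the lighter-weight TaskContext.
    config.infer_schema(&ctx.state().task_ctx()).await
}
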
diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index e385f9f6483b..3e441a429a80 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -301,7 +301,7 @@ checksum = "b9ccdd8f2a161be9bd5c023df56f1b2a0bd1d83872ae53b71a84a12c9bf6e842"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.12",
+ "syn 2.0.13",
 ]

 [[package]]
@@ -576,9 +576,9 @@ checksum = "13418e745008f7349ec7e449155f419a61b92b58a99cc3616942b926825ec76b"

 [[package]]
 name = "core-foundation-sys"
-version = "0.8.3"
+version = "0.8.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc"
+checksum = "e496a50fda8aacccc86d7529e2c1e0892dbd0f898a6b5645b5561b89c3210efa"

 [[package]]
 name = "cpufeatures"
@@ -659,7 +659,7 @@
 dependencies = [
  "proc-macro2",
  "quote",
  "scratch",
- "syn 2.0.12",
+ "syn 2.0.13",
 ]

 [[package]]
@@ -676,7 +676,7 @@ checksum = "2345488264226bf682893e25de0769f3360aac9957980ec49361b083ddaa5bc5"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.12",
+ "syn 2.0.13",
 ]

 [[package]]
@@ -1103,7 +1103,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.12",
+ "syn 2.0.13",
 ]

 [[package]]
@@ -1507,9 +1507,9 @@ checksum = "348108ab3fba42ec82ff6e9564fc4ca0247bdccdc68dd8af9764bbc79c3c8ffb"

 [[package]]
 name = "libmimalloc-sys"
-version = "0.1.30"
+version = "0.1.31"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dd8c7cbf8b89019683667e347572e6d55a7df7ea36b0c4ce69961b0cde67b174"
+checksum = "ef2c45001fb108f37d41bed8efd715769acb14674c1ce3e266ef0e317ef5f877"
 dependencies = [
  "cc",
  "libc",
@@ -1597,9 +1597,9 @@ checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"

 [[package]]
 name = "mimalloc"
-version = "0.1.34"
+version = "0.1.35"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9dcb174b18635f7561a0c6c9fc2ce57218ac7523cf72c50af80e2d79ab8f3ba1"
+checksum = "92666043c712f7f5c756d07443469ddcda6dd971cc15258bb7f3c3216fd1b7aa"
 dependencies = [
  "libmimalloc-sys",
 ]

@@ -1741,9 +1741,9 @@ dependencies = [

 [[package]]
 name = "object_store"
-version = "0.5.5"
+version = "0.5.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e1ea8f683b4f89a64181393742c041520a1a87e9775e6b4c0dd5a3281af05fc6"
+checksum = "ec9cd6ca25e796a49fa242876d1c4de36a24a6da5258e9f0bc062dbf5e81c53b"
 dependencies = [
  "async-trait",
  "base64",
@@ -1969,18 +1969,18 @@ checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068"

 [[package]]
 name = "proc-macro2"
-version = "1.0.54"
+version = "1.0.56"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e472a104799c74b514a57226160104aa483546de37e839ec50e3c2e41dd87534"
+checksum = "2b63bdb0cd06f1f4dedf69b254734f9b45af66e4a031e42a7480257d9898b435"
 dependencies = [
  "unicode-ident",
 ]

 [[package]]
 name = "quick-xml"
-version = "0.27.1"
+version = "0.28.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ffc053f057dd768a56f62cd7e434c42c831d296968997e9ac1f76ea7c2d14c41"
+checksum = "e5c1a97b1bc42b1d550bfb48d4262153fe400a12bab1511821736f7eac76d7e2"
 dependencies = [
  "memchr",
  "serde",
@@ -2148,9 +2148,9 @@ dependencies = [

 [[package]]
 name = "rustix"
-version = "0.37.5"
+version = "0.37.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0e78cc525325c06b4a7ff02db283472f3c042b7ff0c391f96c6d5ac6f4f91b75"
+checksum = "d097081ed288dfe45699b72f5b5d648e5f15d64d900c7080273baa20c16a6849"
 dependencies = [
  "bitflags",
  "errno",
@@ -2276,7 +2276,7 @@ checksum = "4c614d17805b093df4b147b51339e7e44bf05ef59fba1e45d83500bcfb4d8585"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.12",
+ "syn 2.0.13",
 ]

 [[package]]
@@ -2455,9 +2455,9 @@ dependencies = [

 [[package]]
 name = "syn"
-version = "2.0.12"
+version = "2.0.13"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "79d9531f94112cfc3e4c8f5f02cb2b58f72c97b7efd85f70203cc6d8efda5927"
+checksum = "4c9da457c5285ac1f936ebd076af6dac17a61cfe7826f2076b4d015cf47bc8ec"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -2509,7 +2509,7 @@ checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.12",
+ "syn 2.0.13",
 ]

 [[package]]
@@ -2573,7 +2573,7 @@ checksum = "61a573bdc87985e9d6ddeed1b3d864e8a302c847e40d647746df2f1de209d1ce"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.12",
+ "syn 2.0.13",
 ]

 [[package]]
diff --git a/datafusion-cli/src/catalog.rs b/datafusion-cli/src/catalog.rs
index dbd6751a4f76..dc74726d0d60 100644
--- a/datafusion-cli/src/catalog.rs
+++ b/datafusion-cli/src/catalog.rs
@@ -147,9 +147,9 @@ impl SchemaProvider for DynamicFileSchemaProvider {

         // if the inner schema provider didn't have a table by
         // that name, try to treat it as a listing table
-        let state = self.state.upgrade()?.read().clone();
+        let task_ctx = self.state.upgrade()?.read().task_ctx();
         let config = ListingTableConfig::new(ListingTableUrl::parse(name).ok()?)
-            .infer(&state)
+            .infer(&task_ctx)
            .await
            .ok()?;
         Some(Arc::new(ListingTable::try_new(config).ok()?))
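
Note what the datafusion-cli change above buys: instead of cloning the entire `SessionState` out of the lock, the provider snapshots just a `TaskContext`. A sketch of that pattern in isolation, assuming the parking_lot `RwLock` that datafusion-cli already uses (the free function is hypothetical):

use std::sync::{Arc, Weak};
use datafusion::datasource::listing::{ListingTable, ListingTableConfig, ListingTableUrl};
use datafusion::execution::context::SessionState;
use parking_lot::RwLock;

// Hypothetical free function showing the weak-upgrade pattern.
async fn infer_table(
    state: &Weak<RwLock<SessionState>>,
    name: &str,
) -> Option<Arc<ListingTable>> {
    // Upgrade fails (returns None) once the owning session is dropped; the
    // read guard is released at the end of this statement, before any await.
    let task_ctx = state.upgrade()?.read().task_ctx();
    let config = ListingTableConfig::new(ListingTableUrl::parse(name).ok()?)
        .infer(&task_ctx)
        .await
        .ok()?;
    Some(Arc::new(ListingTable::try_new(config).ok()?))
}
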
diff --git a/datafusion-examples/examples/catalog.rs b/datafusion-examples/examples/catalog.rs
index 30cc2c8bd618..8edf37d1367d 100644
--- a/datafusion-examples/examples/catalog.rs
+++ b/datafusion-examples/examples/catalog.rs
@@ -32,7 +32,7 @@ use datafusion::{
         TableProvider,
     },
     error::Result,
-    execution::context::SessionState,
+    execution::context::TaskContext,
     prelude::SessionContext,
 };
 use std::sync::RwLock;
@@ -53,6 +53,7 @@ async fn main() -> Result<()> {
         .unwrap();
     let mut ctx = SessionContext::new();
     let state = ctx.state();
+    let task_ctx = state.task_ctx();
     let catlist = Arc::new(CustomCatalogList::new());
     // use our custom catalog list for context. each context has a single catalog list.
     // context will by default have MemoryCatalogList
@@ -61,7 +62,7 @@
     // intitialize our catalog and schemas
     let catalog = DirCatalog::new();
     let parquet_schema = DirSchema::create(
-        &state,
+        &task_ctx,
         DirSchemaOpts {
             format: Arc::new(ParquetFormat::default()),
             dir: &repo_dir.join("parquet-testing").join("data"),
@@ -70,7 +71,7 @@
     )
     .await?;
     let csv_schema = DirSchema::create(
-        &state,
+        &task_ctx,
         DirSchemaOpts {
             format: Arc::new(CsvFormat::default()),
             dir: &repo_dir.join("testing").join("data").join("csv"),
@@ -138,7 +139,10 @@ struct DirSchema {
     tables: RwLock<HashMap<String, Arc<dyn TableProvider>>>,
 }
 impl DirSchema {
-    async fn create(state: &SessionState, opts: DirSchemaOpts<'_>) -> Result<Arc<Self>> {
+    async fn create(
+        task_ctx: &TaskContext,
+        opts: DirSchemaOpts<'_>,
+    ) -> Result<Arc<Self>> {
         let DirSchemaOpts { ext, dir, format } = opts;
         let mut tables = HashMap::new();
         let listdir = std::fs::read_dir(dir).unwrap();
@@ -153,7 +157,7 @@ impl DirSchema {
             let opts = ListingOptions::new(format.clone());
             let conf = ListingTableConfig::new(table_path)
                 .with_listing_options(opts)
-                .infer_schema(state)
+                .infer_schema(task_ctx)
                 .await?;
             let table = ListingTable::try_new(conf)?;
             tables.insert(filename, Arc::new(table) as Arc<dyn TableProvider>);
diff --git a/datafusion-examples/examples/flight_server.rs b/datafusion-examples/examples/flight_server.rs
index cb7b7c28d909..0b27d6367cec 100644
--- a/datafusion-examples/examples/flight_server.rs
+++ b/datafusion-examples/examples/flight_server.rs
@@ -59,7 +59,7 @@ impl FlightService for FlightServiceImpl {
         let ctx = SessionContext::new();

         let schema = listing_options
-            .infer_schema(&ctx.state(), &table_path)
+            .infer_schema(&ctx.task_ctx(), &table_path)
             .await
             .unwrap();
diff --git a/datafusion/core/src/catalog/schema.rs b/datafusion/core/src/catalog/schema.rs
index 9d3b47546e39..c5c90a51b6bc 100644
--- a/datafusion/core/src/catalog/schema.rs
+++ b/datafusion/core/src/catalog/schema.rs
@@ -180,7 +180,7 @@ mod tests {
         let ctx = SessionContext::new();

         let config = ListingTableConfig::new(table_path)
-            .infer(&ctx.state())
+            .infer(&ctx.task_ctx())
            .await
            .unwrap();
         let table = ListingTable::try_new(config).unwrap();
diff --git a/datafusion/core/src/datasource/file_format/avro.rs b/datafusion/core/src/datasource/file_format/avro.rs
index 422733308996..9cbd31016f82 100644
--- a/datafusion/core/src/datasource/file_format/avro.rs
+++ b/datafusion/core/src/datasource/file_format/avro.rs
@@ -23,13 +23,13 @@ use std::sync::Arc;
 use arrow::datatypes::Schema;
 use arrow::{self, datatypes::SchemaRef};
 use async_trait::async_trait;
+use datafusion_execution::TaskContext;
 use datafusion_physical_expr::PhysicalExpr;
 use object_store::{GetResult, ObjectMeta, ObjectStore};

 use super::FileFormat;
 use crate::avro_to_arrow::read_avro_schema_from_reader;
 use crate::error::Result;
-use crate::execution::context::SessionState;
 use crate::physical_plan::file_format::{AvroExec, FileScanConfig};
 use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::Statistics;
@@ -48,7 +48,7 @@ impl FileFormat for AvroFormat {

     async fn infer_schema(
         &self,
-        _state: &SessionState,
+        _task_ctx: &TaskContext,
         store: &Arc<dyn ObjectStore>,
         objects: &[ObjectMeta],
     ) -> Result<SchemaRef> {
@@ -70,7 +70,7 @@ impl FileFormat for AvroFormat {

     async fn infer_stats(
         &self,
-        _state: &SessionState,
+        _task_ctx: &TaskContext,
         _store: &Arc<dyn ObjectStore>,
         _table_schema: SchemaRef,
         _object: &ObjectMeta,
@@ -80,7 +80,7 @@ impl FileFormat for AvroFormat {
     async fn create_physical_plan(
         &self,
-        _state: &SessionState,
+        _task_ctx: &TaskContext,
         conf: FileScanConfig,
         _filters: Option<&Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
@@ -94,6 +94,7 @@
 mod tests {
     use super::*;
     use crate::datasource::file_format::test_util::scan_format;
+    use crate::execution::context::SessionState;
     use crate::physical_plan::collect;
     use crate::prelude::{SessionConfig, SessionContext};
     use datafusion_common::cast::{
@@ -359,7 +360,15 @@ mod tests {
         let testdata = crate::test_util::arrow_test_data();
         let store_root = format!("{testdata}/avro");
         let format = AvroFormat {};
-        scan_format(state, &format, &store_root, file_name, projection, limit).await
+        scan_format(
+            &state.task_ctx(),
+            &format,
+            &store_root,
+            file_name,
+            projection,
+            limit,
+        )
+        .await
     }
 }
@@ -379,7 +388,9 @@ mod tests {
         let format = AvroFormat {};
         let testdata = crate::test_util::arrow_test_data();
         let filename = "avro/alltypes_plain.avro";
-        let result = scan_format(&state, &format, &testdata, filename, None, None).await;
+        let result =
+            scan_format(&state.task_ctx(), &format, &testdata, filename, None, None)
+                .await;
         assert!(matches!(
             result,
             Err(DataFusionError::NotImplemented(msg))
diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index fcab651e3514..0bd3c5bc2e2c 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -37,10 +37,10 @@ use super::FileFormat;
 use crate::datasource::file_format::file_type::FileCompressionType;
 use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
 use crate::error::Result;
-use crate::execution::context::SessionState;
 use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
 use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::Statistics;
+use datafusion_execution::TaskContext;

 /// The default file extension of csv files
 pub const DEFAULT_CSV_EXTENSION: &str = ".csv";
@@ -115,7 +115,7 @@ impl FileFormat for CsvFormat {

     async fn infer_schema(
         &self,
-        _state: &SessionState,
+        _task_ctx: &TaskContext,
         store: &Arc<dyn ObjectStore>,
         objects: &[ObjectMeta],
     ) -> Result<SchemaRef> {
@@ -142,7 +142,7 @@ impl FileFormat for CsvFormat {

     async fn infer_stats(
         &self,
-        _state: &SessionState,
+        _task_ctx: &TaskContext,
         _store: &Arc<dyn ObjectStore>,
         _table_schema: SchemaRef,
         _object: &ObjectMeta,
@@ -152,7 +152,7 @@ impl FileFormat for CsvFormat {
     async fn create_physical_plan(
         &self,
-        _state: &SessionState,
+        _task_ctx: &TaskContext,
         conf: FileScanConfig,
         _filters: Option<&Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
@@ -296,6 +296,7 @@ mod tests {
     use super::super::test_util::scan_format;
     use super::*;
     use crate::datasource::file_format::test_util::VariableStream;
+    use crate::execution::context::SessionState;
     use crate::physical_plan::collect;
     use crate::prelude::{SessionConfig, SessionContext};
     use bytes::Bytes;
@@ -430,7 +431,7 @@ mod tests {
         };
         let inferred_schema = csv_format
             .infer_schema(
-                &state,
+                &state.task_ctx(),
                &(variable_object_store.clone() as Arc<dyn ObjectStore>),
                &[object_meta],
             )
@@ -467,8 +468,9 @@ mod tests {
         projection: Option<Vec<usize>>,
         limit: Option<usize>,
     ) -> Result<Vec<RecordBatch>> {
+        let task_ctx = state.task_ctx();
         let root = format!("{}/csv", crate::test_util::arrow_test_data());
         let format = CsvFormat::default();
-        scan_format(state, &format, &root, file_name, projection, limit).await
+        scan_format(&task_ctx, &format, &root, file_name, projection, limit).await
     }
 }
diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs
index a66edab888bf..98ed99660809 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -37,10 +37,10 @@ use super::FileScanConfig;
 use crate::datasource::file_format::file_type::FileCompressionType;
 use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
 use crate::error::Result;
-use crate::execution::context::SessionState;
 use crate::physical_plan::file_format::NdJsonExec;
 use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::Statistics;
+use datafusion_execution::TaskContext;

 /// The default file extension of json files
 pub const DEFAULT_JSON_EXTENSION: &str = ".json";
@@ -87,7 +87,7 @@ impl FileFormat for JsonFormat {

     async fn infer_schema(
         &self,
-        _state: &SessionState,
+        _task_ctx: &TaskContext,
         store: &Arc<dyn ObjectStore>,
         objects: &[ObjectMeta],
     ) -> Result<SchemaRef> {
@@ -131,7 +131,7 @@ impl FileFormat for JsonFormat {

     async fn infer_stats(
         &self,
-        _state: &SessionState,
+        _task_ctx: &TaskContext,
         _store: &Arc<dyn ObjectStore>,
         _table_schema: SchemaRef,
         _object: &ObjectMeta,
@@ -141,7 +141,7 @@ impl FileFormat for JsonFormat {
     async fn create_physical_plan(
         &self,
-        _state: &SessionState,
+        _task_ctx: &TaskContext,
         conf: FileScanConfig,
         _filters: Option<&Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
@@ -158,6 +158,7 @@ mod tests {
     use object_store::local::LocalFileSystem;

     use super::*;
+    use crate::execution::context::SessionState;
     use crate::physical_plan::collect;
     use crate::prelude::{SessionConfig, SessionContext};
     use crate::test::object_store::local_unpartitioned_file;
@@ -256,21 +257,22 @@ mod tests {
         projection: Option<Vec<usize>>,
         limit: Option<usize>,
     ) -> Result<Vec<RecordBatch>> {
+        let task_ctx = state.task_ctx();
         let filename = "tests/jsons/2.json";
         let format = JsonFormat::default();
-        scan_format(state, &format, ".", filename, projection, limit).await
+        scan_format(&task_ctx, &format, ".", filename, projection, limit).await
     }

     #[tokio::test]
     async fn infer_schema_with_limit() {
         let session = SessionContext::new();
-        let ctx = session.state();
+        let task_ctx = session.state().task_ctx();
         let store = Arc::new(LocalFileSystem::new()) as _;
         let filename = "tests/jsons/schema_infer_limit.json";
         let format = JsonFormat::default().with_schema_infer_max_rec(Some(3));

         let file_schema = format
-            .infer_schema(&ctx, &store, &[local_unpartitioned_file(filename)])
+            .infer_schema(&task_ctx, &store, &[local_unpartitioned_file(filename)])
             .await
             .expect("Schema inference");
diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index 52da7285e373..b04b6fc479f4 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -36,8 +36,8 @@ use crate::error::Result;
 use crate::physical_plan::file_format::FileScanConfig;
 use crate::physical_plan::{ExecutionPlan, Statistics};

-use crate::execution::context::SessionState;
 use async_trait::async_trait;
+use datafusion_execution::TaskContext;
 use datafusion_physical_expr::PhysicalExpr;
 use object_store::{ObjectMeta, ObjectStore};

@@ -58,7 +58,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
     /// the files have schemas that cannot be merged.
     async fn infer_schema(
         &self,
-        state: &SessionState,
+        state: &TaskContext,
         store: &Arc<dyn ObjectStore>,
         objects: &[ObjectMeta],
     ) -> Result<SchemaRef>;
@@ -72,7 +72,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
     /// TODO: should the file source return statistics for only columns referred to in the table schema?
     async fn infer_stats(
         &self,
-        state: &SessionState,
+        state: &TaskContext,
         store: &Arc<dyn ObjectStore>,
         table_schema: SchemaRef,
         object: &ObjectMeta,
@@ -82,7 +82,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
     /// according to this file format.
     async fn create_physical_plan(
         &self,
-        state: &SessionState,
+        state: &TaskContext,
         conf: FileScanConfig,
         filters: Option<&Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn ExecutionPlan>>;
@@ -106,7 +106,7 @@ pub(crate) mod test_util {
     use tokio::io::AsyncWrite;

     pub async fn scan_format(
-        state: &SessionState,
+        state: &TaskContext,
         format: &dyn FileFormat,
         store_root: &str,
         file_name: &str,
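
With the trait rewritten as above, a `FileFormat` can be driven end to end from nothing but a `TaskContext`. A sketch modeled on the JSON test earlier in this diff, assuming the patch is applied (the file path is hypothetical):

use std::sync::Arc;
use datafusion::datasource::file_format::json::JsonFormat;
use datafusion::datasource::file_format::FileFormat;
use datafusion::error::Result;
use datafusion::prelude::SessionContext;
use object_store::local::LocalFileSystem;
use object_store::{path::Path, ObjectStore};

async fn infer_json_schema() -> Result<()> {
    let ctx = SessionContext::new();
    let task_ctx = ctx.task_ctx();
    let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());

    // Look up the file's ObjectMeta; a listing would normally supply these.
    let location = Path::from("data/example.json");
    let meta = store.head(&location).await?;

    let format = JsonFormat::default().with_schema_infer_max_rec(Some(100));
    let schema = format.infer_schema(&task_ctx, &store, &[meta]).await?;
    println!("inferred {} fields", schema.fields().len());
    Ok(())
}
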
diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs
index e51edf829e85..01b9d672e754 100644
--- a/datafusion/core/src/datasource/file_format/options.rs
+++ b/datafusion/core/src/datasource/file_format/options.rs
@@ -37,7 +37,8 @@ use crate::datasource::{
     listing::ListingOptions,
 };
 use crate::error::Result;
-use crate::execution::context::{SessionConfig, SessionState};
+use crate::execution::context::SessionConfig;
+use datafusion_execution::TaskContext;

 /// Options that control the reading of CSV files.
 ///
@@ -351,16 +352,14 @@ pub trait ReadOptions<'a> {
     /// Infer and resolve the schema from the files/sources provided.
     async fn get_resolved_schema(
         &self,
-        config: &SessionConfig,
-        state: SessionState,
+        ctx: &TaskContext,
         table_path: ListingTableUrl,
     ) -> Result<SchemaRef>;

     /// helper function to reduce repetitive code. Infers the schema from sources if not provided. Infinite data sources not supported through this function.
     async fn _get_resolved_schema(
         &'a self,
-        config: &SessionConfig,
-        state: SessionState,
+        ctx: &TaskContext,
         table_path: ListingTableUrl,
         schema: Option<&'a Schema>,
         infinite: bool,
@@ -371,8 +370,8 @@ pub trait ReadOptions<'a> {
         match (schema, infinite) {
             (Some(s), _) => Ok(Arc::new(s.to_owned())),
             (None, false) => Ok(self
-                .to_listing_options(config)
-                .infer_schema(&state, &table_path)
+                .to_listing_options(ctx.session_config())
+                .infer_schema(ctx, &table_path)
                 .await?),
             (None, true) => Err(DataFusionError::Plan(
                 "Schema inference for infinite data sources is not supported."
@@ -402,11 +401,10 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {

     async fn get_resolved_schema(
         &self,
-        config: &SessionConfig,
-        state: SessionState,
+        task_ctx: &TaskContext,
         table_path: ListingTableUrl,
     ) -> Result<SchemaRef> {
-        self._get_resolved_schema(config, state, table_path, self.schema, self.infinite)
+        self._get_resolved_schema(task_ctx, table_path, self.schema, self.infinite)
             .await
     }
 }
@@ -426,14 +424,13 @@ impl ReadOptions<'_> for ParquetReadOptions<'_> {

     async fn get_resolved_schema(
         &self,
-        config: &SessionConfig,
-        state: SessionState,
+        task_ctx: &TaskContext,
         table_path: ListingTableUrl,
     ) -> Result<SchemaRef> {
         // with parquet we resolve the schema in all cases
         Ok(self
-            .to_listing_options(config)
-            .infer_schema(&state, &table_path)
+            .to_listing_options(task_ctx.session_config())
+            .infer_schema(task_ctx, &table_path)
             .await?)
     }
 }
@@ -453,11 +450,10 @@ impl ReadOptions<'_> for NdJsonReadOptions<'_> {

     async fn get_resolved_schema(
         &self,
-        config: &SessionConfig,
-        state: SessionState,
+        task_ctx: &TaskContext,
         table_path: ListingTableUrl,
     ) -> Result<SchemaRef> {
-        self._get_resolved_schema(config, state, table_path, self.schema, self.infinite)
+        self._get_resolved_schema(task_ctx, table_path, self.schema, self.infinite)
             .await
     }
 }
@@ -476,11 +472,10 @@ impl ReadOptions<'_> for AvroReadOptions<'_> {

     async fn get_resolved_schema(
         &self,
-        config: &SessionConfig,
-        state: SessionState,
+        task_ctx: &TaskContext,
         table_path: ListingTableUrl,
     ) -> Result<SchemaRef> {
-        self._get_resolved_schema(config, state, table_path, self.schema, self.infinite)
+        self._get_resolved_schema(task_ctx, table_path, self.schema, self.infinite)
             .await
     }
 }
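
For `ReadOptions` callers, the separate `SessionConfig` argument disappears because the config already rides inside the `TaskContext` (via `ctx.session_config()` above). A usage sketch under this patch (the CSV path is hypothetical):

use datafusion::datasource::file_format::options::ReadOptions;
use datafusion::datasource::listing::ListingTableUrl;
use datafusion::error::Result;
use datafusion::prelude::{CsvReadOptions, SessionContext};

async fn resolve_csv_schema() -> Result<()> {
    let ctx = SessionContext::new();
    let task_ctx = ctx.task_ctx();
    let table_path = ListingTableUrl::parse("data/example.csv")?;

    let options = CsvReadOptions::new().has_header(true);
    // One argument instead of (&SessionConfig, SessionState).
    let schema = options.get_resolved_schema(&task_ctx, table_path).await?;
    println!("{schema:?}");
    Ok(())
}
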
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index ba18e9f62c25..7ff8a8a9128a 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -43,10 +43,10 @@ use crate::config::ConfigOptions;

 use crate::datasource::{create_max_min_accs, get_col_stats};
 use crate::error::Result;
-use crate::execution::context::SessionState;
 use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
 use crate::physical_plan::file_format::{ParquetExec, SchemaAdapter};
 use crate::physical_plan::{Accumulator, ExecutionPlan, Statistics};
+use datafusion_execution::TaskContext;

 /// The default file extension of parquet files
 pub const DEFAULT_PARQUET_EXTENSION: &str = ".parquet";
@@ -147,7 +147,7 @@ impl FileFormat for ParquetFormat {

     async fn infer_schema(
         &self,
-        state: &SessionState,
+        task_ctx: &TaskContext,
         store: &Arc<dyn ObjectStore>,
         objects: &[ObjectMeta],
     ) -> Result<SchemaRef> {
@@ -158,7 +158,7 @@ impl FileFormat for ParquetFormat {
             schemas.push(schema)
         }

-        let schema = if self.skip_metadata(state.config_options()) {
+        let schema = if self.skip_metadata(task_ctx.options()) {
             Schema::try_merge(clear_metadata(schemas))
         } else {
             Schema::try_merge(schemas)
@@ -169,7 +169,7 @@ impl FileFormat for ParquetFormat {

     async fn infer_stats(
         &self,
-        _state: &SessionState,
+        _task_ctx: &TaskContext,
         store: &Arc<dyn ObjectStore>,
         table_schema: SchemaRef,
         object: &ObjectMeta,
@@ -186,7 +186,7 @@ impl FileFormat for ParquetFormat {

     async fn create_physical_plan(
         &self,
-        state: &SessionState,
+        task_ctx: &TaskContext,
         conf: FileScanConfig,
         filters: Option<&Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
@@ -194,14 +194,14 @@ impl FileFormat for ParquetFormat {
         // If disable pruning then set the predicate to None, thus readers
         // will not prune data based on the statistics.
         let predicate = self
-            .enable_pruning(state.config_options())
+            .enable_pruning(task_ctx.options())
             .then(|| filters.cloned())
             .flatten();

         Ok(Arc::new(ParquetExec::new(
             conf,
             predicate,
-            self.metadata_size_hint(state.config_options()),
+            self.metadata_size_hint(task_ctx.options()),
         )))
     }
 }
@@ -612,6 +612,7 @@ pub(crate) mod test_util {

 #[cfg(test)]
 mod tests {
     use super::super::test_util::scan_format;
+    use crate::execution::context::SessionState;
     use crate::physical_plan::collect;
     use std::fmt::{Display, Formatter};
     use std::ops::Range;
@@ -658,10 +659,12 @@ mod tests {
         let store = Arc::new(LocalFileSystem::new()) as _;
         let (meta, _files) = store_parquet(vec![batch1, batch2], false).await?;

-        let session = SessionContext::new();
-        let ctx = session.state();
+        let ctx = SessionContext::new();
         let format = ParquetFormat::default();
-        let schema = format.infer_schema(&ctx, &store, &meta).await.unwrap();
+        let schema = format
+            .infer_schema(&ctx.task_ctx(), &store, &meta)
+            .await
+            .unwrap();

         let stats =
             fetch_statistics(store.as_ref(), schema.clone(), &meta[0], None).await?;
@@ -808,10 +811,10 @@ mod tests {
         assert_eq!(store.request_count(), 2);

         let session = SessionContext::new();
-        let ctx = session.state();
+        let task_ctx = session.task_ctx();
         let format = ParquetFormat::default().with_metadata_size_hint(Some(9));
         let schema = format
-            .infer_schema(&ctx, &store.upcast(), &meta)
+            .infer_schema(&task_ctx, &store.upcast(), &meta)
             .await
             .unwrap();

@@ -841,7 +844,7 @@ mod tests {
         let format = ParquetFormat::default().with_metadata_size_hint(Some(size_hint));
         let schema = format
-            .infer_schema(&ctx, &store.upcast(), &meta)
+            .infer_schema(&task_ctx, &store.upcast(), &meta)
             .await
             .unwrap();
         let stats = fetch_statistics(
@@ -1293,8 +1296,9 @@ mod tests {
         projection: Option<Vec<usize>>,
         limit: Option<usize>,
     ) -> Result<Vec<RecordBatch>> {
+        let task_ctx = state.task_ctx();
         let testdata = crate::test_util::parquet_test_data();
         let format = ParquetFormat::default();
-        scan_format(state, &format, &testdata, file_name, projection, limit).await
+        scan_format(&task_ctx, &format, &testdata, file_name, projection, limit).await
     }
 }
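
Every knob `ParquetFormat` reads above (`skip_metadata`, `enable_pruning`, `metadata_size_hint`) now comes through `TaskContext::options()`, the accessor added at the end of this diff. A small sketch of reading the same `ConfigOptions` fields directly:

use datafusion::prelude::SessionContext;

fn inspect_parquet_options(ctx: &SessionContext) {
    let task_ctx = ctx.task_ctx();
    let options = task_ctx.options();
    // The fields create_physical_plan consults:
    println!("pruning enabled: {}", options.execution.parquet.pruning);
    println!("target partitions: {}", options.execution.target_partitions);
}
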
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index f85492d8c2ed..d7dbe4f64093 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -42,17 +42,18 @@ use crate::datasource::{
     listing::ListingTableUrl,
     TableProvider, TableType,
 };
+use crate::execution::context::SessionState;
 use crate::logical_expr::TableProviderFilterPushDown;
 use crate::physical_plan;
 use crate::{
     error::{DataFusionError, Result},
-    execution::context::SessionState,
     logical_expr::Expr,
     physical_plan::{
         empty::EmptyExec, file_format::FileScanConfig, project_schema, ExecutionPlan,
         Statistics,
     },
 };
+use datafusion_execution::TaskContext;

 use super::PartitionedFile;

@@ -149,8 +150,8 @@ impl ListingTableConfig {
     }

     /// Infer `ListingOptions` based on `table_path` suffix.
-    pub async fn infer_options(self, state: &SessionState) -> Result<Self> {
-        let store = state
+    pub async fn infer_options(self, task_ctx: &TaskContext) -> Result<Self> {
+        let store = task_ctx
             .runtime_env()
             .object_store(self.table_paths.get(0).unwrap())?;

@@ -168,7 +169,7 @@ impl ListingTableConfig {

         let listing_options = ListingOptions::new(format)
             .with_file_extension(file_extension)
-            .with_target_partitions(state.config().target_partitions());
+            .with_target_partitions(task_ctx.options().execution.target_partitions);

         Ok(Self {
             table_paths: self.table_paths,
@@ -178,11 +179,11 @@ impl ListingTableConfig {
     }

     /// Infer the [`SchemaRef`] based on `table_path` suffix.  Requires `self.options` to be set prior to using.
-    pub async fn infer_schema(self, state: &SessionState) -> Result<Self> {
+    pub async fn infer_schema(self, task_ctx: &TaskContext) -> Result<Self> {
         match self.options {
             Some(options) => {
                 let schema = options
-                    .infer_schema(state, self.table_paths.get(0).unwrap())
+                    .infer_schema(task_ctx, self.table_paths.get(0).unwrap())
                     .await?;

                 Ok(Self {
@@ -198,8 +199,11 @@ impl ListingTableConfig {
     }

     /// Convenience wrapper for calling `infer_options` and `infer_schema`
-    pub async fn infer(self, state: &SessionState) -> Result<Self> {
-        self.infer_options(state).await?.infer_schema(state).await
+    pub async fn infer(self, task_ctx: &TaskContext) -> Result<Self> {
+        self.infer_options(task_ctx)
+            .await?
+            .infer_schema(task_ctx)
+            .await
     }
 }
@@ -430,17 +434,17 @@ impl ListingOptions {
     /// locally or ask a remote service to do it (e.g a scheduler).
     pub async fn infer_schema<'a>(
         &'a self,
-        state: &SessionState,
+        task_ctx: &TaskContext,
         table_path: &'a ListingTableUrl,
     ) -> Result<SchemaRef> {
-        let store = state.runtime_env().object_store(table_path)?;
+        let store = task_ctx.runtime_env().object_store(table_path)?;

         let files: Vec<_> = table_path
             .list_all_files(store.as_ref(), &self.file_extension)
             .try_collect()
             .await?;

-        self.format.infer_schema(state, &store, &files).await
+        self.format.infer_schema(task_ctx, &store, &files).await
     }
 }
@@ -523,7 +527,7 @@ impl StatisticsCache {
 ///
 /// // Resolve the schema
 /// let resolved_schema = listing_options
-///    .infer_schema(&session_state, &table_path)
+///    .infer_schema(&session_state.task_ctx(), &table_path)
///    .await?;
 ///
 /// let config = ListingTableConfig::new(table_path)
@@ -672,8 +676,10 @@ impl TableProvider for ListingTable {
         filters: &[Expr],
         limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
+        let task_ctx = &state.task_ctx();
+
         let (partitioned_file_lists, statistics) =
-            self.list_files_for_scan(state, filters, limit).await?;
+            self.list_files_for_scan(task_ctx, filters, limit).await?;

         // if no files need to be read, return an `EmptyExec`
         if partitioned_file_lists.is_empty() {
@@ -716,7 +722,7 @@ impl TableProvider for ListingTable {
         self.options
             .format
             .create_physical_plan(
-                state,
+                task_ctx,
                 FileScanConfig {
                     object_store_url: self.table_paths.get(0).unwrap().object_store(),
                     file_schema: Arc::clone(&self.file_schema),
@@ -766,11 +772,11 @@ impl ListingTable {
     /// be distributed to different threads / executors.
     async fn list_files_for_scan<'a>(
         &'a self,
-        ctx: &'a SessionState,
+        task_ctx: &'a TaskContext,
         filters: &'a [Expr],
         limit: Option<usize>,
     ) -> Result<(Vec<Vec<PartitionedFile>>, Statistics)> {
-        let store = ctx
+        let store = task_ctx
             .runtime_env()
             .object_store(self.table_paths.get(0).unwrap())?;
         // list files (with partitions)
@@ -798,7 +804,7 @@ impl ListingTable {
                     .options
                     .format
                     .infer_stats(
-                        ctx,
+                        task_ctx,
                         &store,
                         self.file_schema.clone(),
                         &part_file.object_meta,
@@ -902,7 +908,7 @@ mod tests {
         let state = ctx.state();

         let opt = ListingOptions::new(Arc::new(ParquetFormat::default()));
-        let schema = opt.infer_schema(&state, &table_path).await?;
+        let schema = opt.infer_schema(&state.task_ctx(), &table_path).await?;
         let config = ListingTableConfig::new(table_path)
             .with_listing_options(opt)
             .with_schema(schema);
@@ -926,7 +932,7 @@ mod tests {
         let opt = ListingOptions::new(Arc::new(ParquetFormat::default()))
             .with_collect_stat(false);
-        let schema = opt.infer_schema(&state, &table_path).await?;
+        let schema = opt.infer_schema(&state.task_ctx(), &table_path).await?;
         let config = ListingTableConfig::new(table_path)
             .with_listing_options(opt)
             .with_schema(schema);
@@ -948,7 +954,10 @@ mod tests {
         let ctx = SessionContext::new();
         let state = ctx.state();
         let options = ListingOptions::new(Arc::new(ParquetFormat::default()));
-        let schema = options.infer_schema(&state, &table_path).await.unwrap();
+        let schema = options
+            .infer_schema(&state.task_ctx(), &table_path)
+            .await
+            .unwrap();

         use crate::physical_plan::expressions::col as physical_col;
         use std::ops::Add;
@@ -1318,7 +1327,7 @@ mod tests {
         let table_path = ListingTableUrl::parse(filename).unwrap();
         let config = ListingTableConfig::new(table_path)
-            .infer(&ctx.state())
+            .infer(&ctx.task_ctx())
             .await?;
         let table = ListingTable::try_new(config)?;
         Ok(Arc::new(table))
@@ -1350,7 +1359,9 @@ mod tests {
         let table = ListingTable::try_new(config)?;

-        let (file_list, _) = table.list_files_for_scan(&ctx.state(), &[], None).await?;
+        let (file_list, _) = table
+            .list_files_for_scan(&ctx.task_ctx(), &[], None)
+            .await?;

         assert_eq!(file_list.len(), output_partitioning);
@@ -1386,7 +1397,9 @@ mod tests {
         let table = ListingTable::try_new(config)?;

-        let (file_list, _) = table.list_files_for_scan(&ctx.state(), &[], None).await?;
+        let (file_list, _) = table
+            .list_files_for_scan(&ctx.task_ctx(), &[], None)
+            .await?;

         assert_eq!(file_list.len(), output_partitioning);
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs
index 6c7b058520d8..b4085ba61896 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -59,6 +59,7 @@ impl TableProviderFactory for ListingTableFactory {
         state: &SessionState,
         cmd: &CreateExternalTable,
     ) -> datafusion_common::Result<Arc<dyn TableProvider>> {
+        let task_ctx = state.task_ctx();
         let file_compression_type = FileCompressionType::from(cmd.file_compression_type);
         let file_type = FileType::from_str(cmd.file_type.as_str()).map_err(|_| {
             DataFusionError::Execution(format!("Unknown FileType {}", cmd.file_type))
@@ -139,7 +140,7 @@ impl TableProviderFactory for ListingTableFactory {
         let table_path = ListingTableUrl::parse(&cmd.location)?;
         let resolved_schema = match provided_schema {
-            None => options.infer_schema(state, &table_path).await?,
+            None => options.infer_schema(&task_ctx, &table_path).await?,
             Some(s) => s,
         };
         let config = ListingTableConfig::new(table_path)
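
Both listing paths above now reach their object store through the `TaskContext`'s `RuntimeEnv` rather than through `SessionState`. In isolation, the resolution step looks like this (a sketch; it assumes a store is registered for the URL's scheme, as the local filesystem is by default):

use datafusion::datasource::listing::ListingTableUrl;
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

fn resolve_store(ctx: &SessionContext) -> Result<()> {
    let task_ctx = ctx.task_ctx();
    let url = ListingTableUrl::parse("file:///tmp/data/")?;
    // Errors if no object store is registered for the scheme.
    let _store = task_ctx.runtime_env().object_store(&url)?;
    Ok(())
}
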
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index eb9f6ba3c41d..32c088877ac7 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -711,11 +711,12 @@ impl SessionContext {
         table_paths: P,
         options: impl ReadOptions<'a>,
     ) -> Result<DataFrame> {
+        let task_ctx = self.state().task_ctx();
         let table_paths = table_paths.to_urls()?;
         let session_config = self.copied_config();
         let listing_options = options.to_listing_options(&session_config);
         let resolved_schema = options
-            .get_resolved_schema(&session_config, self.state(), table_paths[0].clone())
+            .get_resolved_schema(&task_ctx, table_paths[0].clone())
             .await?;
         let config = ListingTableConfig::new_with_multi_paths(table_paths)
             .with_listing_options(listing_options)
@@ -841,10 +842,11 @@ impl SessionContext {
         provided_schema: Option<SchemaRef>,
         sql_definition: Option<String>,
     ) -> Result<()> {
+        let task_ctx = self.task_ctx();
         let table_path = ListingTableUrl::parse(table_path)?;
         let resolved_schema = match (provided_schema, options.infinite_source) {
             (Some(s), _) => s,
-            (None, false) => options.infer_schema(&self.state(), &table_path).await?,
+            (None, false) => options.infer_schema(&task_ctx, &table_path).await?,
             (None, true) => {
                 return Err(DataFusionError::Plan(
                     "Schema inference for infinite data sources is not supported."
@@ -1188,7 +1190,14 @@ impl QueryPlanner for DefaultQueryPlanner {
     }
 }

-/// Execution context for registering data sources and executing queries
+/// Holds state needed for planning queries within the context of a Session.
+///
+/// * [`SessionContext`] is the main API for creating and executing queries and plans.
+///
+/// * [`SessionState`] contains lower level information needed to register data sources and plan queries
+///
+/// * [`TaskContext`] contains the portion of the session state needed for executing queries.
+///
 #[derive(Clone)]
 pub struct SessionState {
     /// UUID for the session
@@ -1217,7 +1226,7 @@ pub struct SessionState {
     /// `CREATE EXTERNAL TABLE ... STORED AS <FORMAT>` for custom file
     /// formats other than those built into DataFusion
     table_factories: HashMap<String, Arc<dyn TableProviderFactory>>,
-    /// Runtime environment
+    /// Runtime environment for managing memory, disk, object_stores, etc
     runtime_env: Arc<RuntimeEnv>,
 }
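
The new doc comment above pins down the layering this PR relies on; in code, the three layers are one accessor apart (a sketch):

use datafusion::prelude::SessionContext;

fn layers() {
    // Main user-facing API: create and execute queries and plans.
    let ctx = SessionContext::new();
    // Planning-time state: catalogs, table factories, config.
    let state = ctx.state();
    // Execution-time state: only what running a plan needs.
    let _task_ctx = state.task_ctx();
}
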
diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs
index ed27dfac0317..ee6dcba7cdba 100644
--- a/datafusion/core/src/physical_plan/file_format/avro.rs
+++ b/datafusion/core/src/physical_plan/file_format/avro.rs
@@ -245,6 +245,7 @@ mod tests {
     async fn test_with_stores(store: Arc<dyn ObjectStore>) -> Result<()> {
         let session_ctx = SessionContext::new();
         let state = session_ctx.state();
+        let task_ctx = session_ctx.task_ctx();

         let url = Url::parse("file://").unwrap();
         state
@@ -256,7 +257,7 @@ mod tests {
         let meta = local_unpartitioned_file(filename);

         let file_schema = AvroFormat {}
-            .infer_schema(&state, &store, &[meta.clone()])
+            .infer_schema(&task_ctx, &store, &[meta.clone()])
             .await?;

         let avro_exec = AvroExec::new(FileScanConfig {
@@ -321,7 +322,7 @@ mod tests {
         let object_store_url = ObjectStoreUrl::local_filesystem();
         let meta = local_unpartitioned_file(filename);
         let actual_schema = AvroFormat {}
-            .infer_schema(&state, &object_store, &[meta.clone()])
+            .infer_schema(&state.task_ctx(), &object_store, &[meta.clone()])
             .await?;

         let mut fields = actual_schema.fields().clone();
@@ -394,7 +395,7 @@ mod tests {
         let object_store_url = ObjectStoreUrl::local_filesystem();
         let meta = local_unpartitioned_file(filename);
         let file_schema = AvroFormat {}
-            .infer_schema(&state, &object_store, &[meta.clone()])
+            .infer_schema(&state.task_ctx(), &object_store, &[meta.clone()])
             .await?;

         let mut partitioned_file = PartitionedFile::from(meta);
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs
index ebbae7417889..5a3a8674454d 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -298,8 +298,9 @@ mod tests {
         state: &SessionState,
         file_compression_type: FileCompressionType,
     ) -> (ObjectStoreUrl, Vec<Vec<PartitionedFile>>, SchemaRef) {
+        let task_ctx = state.task_ctx();
         let store_url = ObjectStoreUrl::local_filesystem();
-        let store = state.runtime_env().object_store(&store_url).unwrap();
+        let store = task_ctx.runtime_env().object_store(&store_url).unwrap();

         let filename = "1.json";
         let file_groups = partitioned_file_groups(
@@ -319,7 +320,7 @@ mod tests {
             .object_meta;
         let schema = JsonFormat::default()
             .with_file_compression_type(file_compression_type.to_owned())
-            .infer_schema(state, &store, &[meta.clone()])
+            .infer_schema(&task_ctx, &store, &[meta.clone()])
             .await
             .unwrap();
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 92be32f47649..a6b9606ee717 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -1530,7 +1530,7 @@ mod tests {
         let state = session_ctx.state();
         let task_ctx = state.task_ctx();
         let parquet_exec = scan_format(
-            &state,
+            &task_ctx,
             &ParquetFormat::default(),
             &testdata,
             filename,
@@ -1619,7 +1619,7 @@ mod tests {
         let store = Arc::new(LocalFileSystem::new()) as _;

         let file_schema = ParquetFormat::default()
-            .infer_schema(&state, &store, &[meta.clone()])
+            .infer_schema(&state.task_ctx(), &store, &[meta.clone()])
             .await?;

         let group_empty = vec![vec![file_range(&meta, 0, 5)]];
@@ -1651,7 +1651,7 @@ mod tests {
         let meta = local_unpartitioned_file(filename);

         let schema = ParquetFormat::default()
-            .infer_schema(&state, &store, &[meta.clone()])
+            .infer_schema(&task_ctx, &store, &[meta.clone()])
             .await
             .unwrap();
diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs
index baf9d2d36a17..14e17d44fd3a 100644
--- a/datafusion/core/tests/parquet/page_pruning.rs
+++ b/datafusion/core/tests/parquet/page_pruning.rs
@@ -49,7 +49,7 @@ async fn get_parquet_exec(state: &SessionState, filter: Expr) -> ParquetExec {
    };

    let schema = ParquetFormat::default()
-        .infer_schema(state, &store, &[meta.clone()])
+        .infer_schema(&state.task_ctx(), &store, &[meta.clone()])
        .await
        .unwrap();
diff --git a/datafusion/core/tests/path_partition.rs b/datafusion/core/tests/path_partition.rs
index b93ad02aa2c4..7a47cef6c8dc 100644
--- a/datafusion/core/tests/path_partition.rs
+++ b/datafusion/core/tests/path_partition.rs
@@ -576,7 +576,7 @@ async fn register_partitioned_alltypes_parquet(
        ListingTableUrl::parse(format!("mirror:///{}", store_paths[0])).unwrap();

    let file_schema = options
-        .infer_schema(&ctx.state(), &store_path)
+        .infer_schema(&ctx.task_ctx(), &store_path)
        .await
        .expect("Parquet schema inference failed");
diff --git a/datafusion/core/tests/row.rs b/datafusion/core/tests/row.rs
index 5eeb237e187e..f62a8121dafb 100644
--- a/datafusion/core/tests/row.rs
+++ b/datafusion/core/tests/row.rs
@@ -92,18 +92,19 @@ async fn get_exec(
    let meta = object_store.head(&path).await.unwrap();

+    let task_ctx = state.task_ctx();
    let file_schema = format
-        .infer_schema(state, &object_store, &[meta.clone()])
+        .infer_schema(&task_ctx, &object_store, &[meta.clone()])
        .await
        .expect("Schema inference");
    let statistics = format
-        .infer_stats(state, &object_store, file_schema.clone(), &meta)
+        .infer_stats(&task_ctx, &object_store, file_schema.clone(), &meta)
        .await
        .expect("Stats inference");
    let file_groups = vec![vec![meta.into()]];
    let exec = format
        .create_physical_plan(
-            state,
+            &task_ctx,
            FileScanConfig {
                object_store_url,
                file_schema,
diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs
index 67736edf6804..bd006920fed7 100644
--- a/datafusion/execution/src/runtime_env.rs
+++ b/datafusion/execution/src/runtime_env.rs
@@ -32,7 +32,8 @@ use std::sync::Arc;
 use url::Url;

 #[derive(Clone)]
-/// Execution runtime environment.
+/// Execution runtime environment for managing access to external
+/// resources such as memory, disk and object stores.
 pub struct RuntimeEnv {
     /// Runtime memory management
     pub memory_pool: Arc<dyn MemoryPool>,
diff --git a/datafusion/execution/src/task.rs b/datafusion/execution/src/task.rs
index 9f73f767af05..6d07edda6633 100644
--- a/datafusion/execution/src/task.rs
+++ b/datafusion/execution/src/task.rs
@@ -32,6 +32,10 @@ use crate::{
 };

 /// Task Execution Context
+///
+/// Contains all information needed to execute a DataFusion plan. It
+/// does not contain information needed by the planner such as table
+/// sources.
 pub struct TaskContext {
     /// Session Id
     session_id: String,
@@ -103,6 +107,11 @@ impl TaskContext {
         &self.session_config
     }

+    /// Return the [`ConfigOptions`] associated with the Task
+    pub fn options(&self) -> &ConfigOptions {
+        self.session_config.options()
+    }
+
     /// Return the `session_id` of this [TaskContext]
     pub fn session_id(&self) -> String {
         self.session_id.clone()
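
The `options()` accessor above closes the loop: code that previously asked the `SessionState` for `config_options()` can ask the `TaskContext` instead. A usage sketch:

use datafusion_execution::TaskContext;

fn batch_size(task_ctx: &TaskContext) -> usize {
    // Equivalent to the old state.config_options().execution.batch_size.
    task_ctx.options().execution.batch_size
}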