diff --git a/Cargo.toml b/Cargo.toml index 7c1e13500cde..94c16170dac1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,7 +51,7 @@ arrow-flight = { version = "38.0.0", features = ["flight-sql-experimental"] } arrow-buffer = { version = "38.0.0", default-features = false } arrow-schema = { version = "38.0.0", default-features = false } arrow-array = { version = "38.0.0", default-features = false, features = ["chrono-tz"] } -parquet = { version = "38.0.0", features = ["arrow", "async"] } +parquet = { version = "38.0.0", features = ["arrow", "async", "object_store"] } [profile.release] codegen-units = 1 diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index dd541ab2f877..dcc0329b4b00 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -478,9 +478,9 @@ dependencies = [ [[package]] name = "clap" -version = "3.2.23" +version = "3.2.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "71655c45cb9845d3270c9d6df84ebe72b4dad3c2ba3f7023ad47c144e4e473a5" +checksum = "eef2b3ded6a26dfaec672a742c93c8cf6b689220324da509ec5caa20de55dc83" dependencies = [ "atty", "bitflags", @@ -495,9 +495,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "3.2.18" +version = "3.2.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea0c8bce528c4be4da13ea6fead8965e95b6073585a2f05204bd8f4119f82a65" +checksum = "d756c5824fc5c0c1ee8e36000f576968dbcb2081def956c83fad6f40acd46f96" dependencies = [ "heck", "proc-macro-error", @@ -1511,9 +1511,9 @@ checksum = "348108ab3fba42ec82ff6e9564fc4ca0247bdccdc68dd8af9764bbc79c3c8ffb" [[package]] name = "libmimalloc-sys" -version = "0.1.32" +version = "0.1.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43a558e3d911bc3c7bfc8c78bc580b404d6e51c1cefbf656e176a94b49b0df40" +checksum = "f4ac0e912c8ef1b735e92369695618dc5b1819f5a7bf3f167301a3ba1cea515e" dependencies = [ "cc", "libc", @@ -1530,9 +1530,9 @@ dependencies = [ [[package]] name = "linux-raw-sys" -version = "0.3.3" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b085a4f2cde5781fc4b1717f2e86c62f5cda49de7ba99a7c2eae02b61c9064c" +checksum = "36eb31c1778188ae1e64398743890d0877fef36d11521ac60406b42016e8c2cf" [[package]] name = "lock_api" @@ -1601,9 +1601,9 @@ checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" [[package]] name = "mimalloc" -version = "0.1.36" +version = "0.1.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d88dad3f985ec267a3fcb7a1726f5cb1a7e8cad8b646e70a84f967210df23da" +checksum = "4e2894987a3459f3ffb755608bd82188f8ed00d0ae077f1edea29c068d639d98" dependencies = [ "libmimalloc-sys", ] @@ -1839,6 +1839,7 @@ dependencies = [ "lz4", "num", "num-bigint", + "object_store", "paste", "seq-macro", "snap", @@ -2158,9 +2159,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.37.13" +version = "0.37.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f79bef90eb6d984c72722595b5b1348ab39275a5e5123faca6863bf07d75a4e0" +checksum = "a0661814f891c57c930a610266415528da53c4933e6dea5fb350cbfe048a9ece" dependencies = [ "bitflags", "errno", @@ -2559,9 +2560,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.27.0" +version = "1.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0de47a4eecbe11f498978a9b29d792f0d2692d1dd003650c24c76510e3bc001" +checksum = "c3c786bf8134e5a3a166db9b29ab8f48134739014a3eca7bc6bfa95d673b136f" dependencies = [ "autocfg", "bytes", @@ -2572,14 +2573,14 @@ dependencies = [ "pin-project-lite", "socket2", "tokio-macros", - "windows-sys 0.45.0", + "windows-sys 0.48.0", ] [[package]] name = "tokio-macros" -version = "2.0.0" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61a573bdc87985e9d6ddeed1b3d864e8a302c847e40d647746df2f1de209d1ce" +checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", @@ -2599,9 +2600,9 @@ dependencies = [ [[package]] name = "tokio-stream" -version = "0.1.12" +version = "0.1.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8fb52b74f05dbf495a8fba459fdc331812b96aa086d9eb78101fa0d4569c3313" +checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842" dependencies = [ "futures-core", "pin-project-lite", @@ -2610,9 +2611,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.7" +version = "0.7.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5427d89453009325de0d8f342c9490009f76e999cb7672d77e46267448f7e6b2" +checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d" dependencies = [ "bytes", "futures-core", @@ -2630,11 +2631,10 @@ checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" [[package]] name = "tracing" -version = "0.1.37" +version = "0.1.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" +checksum = "cf9cf6a813d3f40c88b0b6b6f29a5c95c6cdbf97c1f9cc53fb820200f5ad814d" dependencies = [ - "cfg-if", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -2642,13 +2642,13 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.23" +version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4017f8f45139870ca7e672686113917c71c7a6e02d4924eda67186083c03081a" +checksum = "0f57e3ca2a01450b1a921183a9c9cbfda207fd822cef4ccb00a65402cbba7a74" dependencies = [ "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.15", ] [[package]] diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index 6d9e35c32e37..c5285c71b7bf 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -28,7 +28,6 @@ use std::ops::Range; use std::sync::Arc; use crate::config::ConfigOptions; -use crate::datasource::file_format::parquet::fetch_parquet_metadata; use crate::physical_plan::file_format::file_stream::{ FileOpenFuture, FileOpener, FileStream, }; @@ -49,15 +48,14 @@ use crate::{ use arrow::error::ArrowError; use bytes::Bytes; use futures::future::BoxFuture; -use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt}; +use futures::{StreamExt, TryStreamExt}; use itertools::Itertools; use log::debug; -use object_store::{ObjectMeta, ObjectStore}; +use object_store::ObjectStore; use parquet::arrow::arrow_reader::ArrowReaderOptions; -use parquet::arrow::async_reader::AsyncFileReader; +use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader}; use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask}; use parquet::basic::{ConvertedType, LogicalType}; -use parquet::errors::ParquetError; use parquet::file::{metadata::ParquetMetaData, properties::WriterProperties}; use parquet::schema::types::ColumnDescriptor; @@ -624,10 +622,8 @@ impl DefaultParquetFileReaderFactory { /// Implements [`AsyncFileReader`] for a parquet file in object storage struct ParquetFileReader { - store: Arc, - meta: ObjectMeta, file_metrics: ParquetFileMetrics, - metadata_size_hint: Option, + inner: ParquetObjectReader, } impl AsyncFileReader for ParquetFileReader { @@ -636,13 +632,7 @@ impl AsyncFileReader for ParquetFileReader { range: Range, ) -> BoxFuture<'_, parquet::errors::Result> { self.file_metrics.bytes_scanned.add(range.end - range.start); - - self.store - .get_range(&self.meta.location, range) - .map_err(|e| { - ParquetError::General(format!("AsyncChunkReader::get_bytes error: {e}")) - }) - .boxed() + self.inner.get_bytes(range) } fn get_byte_ranges( @@ -654,37 +644,13 @@ impl AsyncFileReader for ParquetFileReader { { let total = ranges.iter().map(|r| r.end - r.start).sum(); self.file_metrics.bytes_scanned.add(total); - - async move { - self.store - .get_ranges(&self.meta.location, &ranges) - .await - .map_err(|e| { - ParquetError::General(format!( - "AsyncChunkReader::get_byte_ranges error: {e}" - )) - }) - } - .boxed() + self.inner.get_byte_ranges(ranges) } fn get_metadata( &mut self, ) -> BoxFuture<'_, parquet::errors::Result>> { - Box::pin(async move { - let metadata = fetch_parquet_metadata( - self.store.as_ref(), - &self.meta, - self.metadata_size_hint, - ) - .await - .map_err(|e| { - ParquetError::General(format!( - "AsyncChunkReader::get_metadata error: {e}" - )) - })?; - Ok(Arc::new(metadata)) - }) + self.inner.get_metadata() } } @@ -701,11 +667,15 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory { file_meta.location().as_ref(), metrics, ); + let store = Arc::clone(&self.store); + let mut inner = ParquetObjectReader::new(store, file_meta.object_meta); + + if let Some(hint) = metadata_size_hint { + inner = inner.with_footer_size_hint(hint) + }; Ok(Box::new(ParquetFileReader { - meta: file_meta.object_meta, - store: Arc::clone(&self.store), - metadata_size_hint, + inner, file_metrics, })) }