-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Concurrent Parquet Schema Inference #6366
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,6 +26,7 @@ use async_trait::async_trait; | |
use bytes::{BufMut, BytesMut}; | ||
use datafusion_common::DataFusionError; | ||
use datafusion_physical_expr::PhysicalExpr; | ||
use futures::{StreamExt, TryStreamExt}; | ||
use hashbrown::HashMap; | ||
use object_store::{ObjectMeta, ObjectStore}; | ||
use parquet::arrow::parquet_to_arrow_schema; | ||
|
@@ -151,12 +152,12 @@ impl FileFormat for ParquetFormat { | |
store: &Arc<dyn ObjectStore>, | ||
objects: &[ObjectMeta], | ||
) -> Result<SchemaRef> { | ||
let mut schemas = Vec::with_capacity(objects.len()); | ||
for object in objects { | ||
let schema = | ||
fetch_schema(store.as_ref(), object, self.metadata_size_hint).await?; | ||
schemas.push(schema) | ||
} | ||
let schemas: Vec<_> = futures::stream::iter(objects) | ||
.map(|object| fetch_schema(store.as_ref(), object, self.metadata_size_hint)) | ||
.boxed() // Workaround https://github.com/rust-lang/rust/issues/64552 | ||
.buffered(32) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the What do you think about potentially using the same value as in #6183:
(I sort of imagine some day someone will want to make that a configuration knob rather than a constant so using the same constant in the code will make it easier to find where they are used) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll make this change as a follow up There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
.try_collect() | ||
.await?; | ||
|
||
let schema = if self.skip_metadata(state.config_options()) { | ||
Schema::try_merge(clear_metadata(schemas)) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No I cannot explain why this works... But for some unknown reason it placates the compiler
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I found using streams was a similarly frustrating experience where the compiler give you an opaque error message (and opaque is being polite)