Optionally skip metadata from schema when merging parquet files (#2985)
* Optionally Strip metadata from schema when merging parquet files

* Improve comments
alamb authored Aug 1, 2022
1 parent c7fa789 commit 833f588
Showing 7 changed files with 289 additions and 66 deletions.
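Before the file-by-file diff, here is a minimal usage sketch (not part of the commit) of how the new option is exposed through `ParquetReadOptions`; the `data/` path and the `main` wrapper are illustrative only, and the import paths follow the module layout shown in the diff below.

```rust
use datafusion::error::Result;
use datafusion::execution::options::ParquetReadOptions;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // New default: schema-level and field-level metadata is cleared before the
    // per-file parquet schemas are merged, so files that differ only in
    // embedded metadata no longer conflict.
    let df = ctx
        .read_parquet("data/", ParquetReadOptions::default())
        .await?;

    // Opt out to keep the previous behavior (metadata participates in the merge).
    let df_keep_meta = ctx
        .read_parquet("data/", ParquetReadOptions::default().skip_metadata(false))
        .await?;

    let _ = (df, df_keep_meta);
    Ok(())
}
```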
13 changes: 6 additions & 7 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default.

41 changes: 40 additions & 1 deletion datafusion/core/src/datasource/file_format/parquet.rs
@@ -54,13 +54,15 @@ pub const DEFAULT_PARQUET_EXTENSION: &str = ".parquet";
pub struct ParquetFormat {
enable_pruning: bool,
metadata_size_hint: Option<usize>,
skip_metadata: bool,
}

impl Default for ParquetFormat {
fn default() -> Self {
Self {
enable_pruning: true,
metadata_size_hint: None,
skip_metadata: true,
}
}
}
@@ -90,6 +92,37 @@ impl ParquetFormat {
pub fn metadata_size_hint(&self) -> Option<usize> {
self.metadata_size_hint
}

/// Tell the parquet reader to skip any metadata that may be in
/// the file Schema. This can help avoid schema conflicts due to
/// metadata. Defaults to true.
pub fn with_skip_metadata(mut self, skip_metadata: bool) -> Self {
self.skip_metadata = skip_metadata;
self
}

/// Returns true if schema metadata will be cleared prior to
/// schema merging.
pub fn skip_metadata(&self) -> bool {
self.skip_metadata
}
}

/// Clears all metadata (Schema level and field level) on an iterator
/// of Schemas
fn clear_metadata(
schemas: impl IntoIterator<Item = Schema>,
) -> impl Iterator<Item = Schema> {
schemas.into_iter().map(|schema| {
let fields = schema
.fields()
.iter()
.map(|field| {
field.clone().with_metadata(None) // clear meta
})
.collect::<Vec<_>>();
Schema::new(fields)
})
}

#[async_trait]
@@ -109,7 +142,13 @@ impl FileFormat for ParquetFormat {
fetch_schema(store.as_ref(), object, self.metadata_size_hint).await?;
schemas.push(schema)
}
let schema = Schema::try_merge(schemas)?;

let schema = if self.skip_metadata {
Schema::try_merge(clear_metadata(schemas))
} else {
Schema::try_merge(schemas)
}?;

Ok(Arc::new(schema))
}

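For context on why `clear_metadata` is needed: arrow's `Schema::try_merge` rejects schemas that carry conflicting values for the same metadata key, so two parquet files that differ only in embedded metadata could not previously be read as one table. A standalone sketch of that failure mode (the `writer`/`job-*` metadata values are made up; the arrow API is the one this diff already relies on):

```rust
use std::collections::HashMap;

use datafusion::arrow::datatypes::{DataType, Field, Schema};

fn main() {
    let fields = vec![Field::new("id", DataType::Int32, true)];

    let mut meta_a = HashMap::new();
    meta_a.insert("writer".to_string(), "job-a".to_string());
    let mut meta_b = HashMap::new();
    meta_b.insert("writer".to_string(), "job-b".to_string());

    let a = Schema::new_with_metadata(fields.clone(), meta_a);
    let b = Schema::new_with_metadata(fields.clone(), meta_b);

    // Same fields, but the "writer" key disagrees: the merge fails.
    assert!(Schema::try_merge([a.clone(), b.clone()]).is_err());

    // Clearing the metadata first (what clear_metadata does, plus the
    // field-level equivalent) lets the merge succeed.
    let stripped = [a, b].map(|s| Schema::new(s.fields().clone()));
    assert!(Schema::try_merge(stripped).is_ok());
}
```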
23 changes: 19 additions & 4 deletions datafusion/core/src/execution/options.rs
@@ -142,17 +142,23 @@ pub struct ParquetReadOptions<'a> {
pub file_extension: &'a str,
/// Partition Columns
pub table_partition_cols: Vec<String>,
/// Should DataFusion parquet reader using the predicate to prune data,
/// Should DataFusion parquet reader use the predicate to prune data,
/// overridden by value on execution::context::SessionConfig
pub parquet_pruning: bool,
/// Tell the parquet reader to skip any metadata that may be in
/// the file Schema. This can help avoid schema conflicts due to
/// metadata. Defaults to true.
pub skip_metadata: bool,
}

impl<'a> Default for ParquetReadOptions<'a> {
fn default() -> Self {
let format_default = ParquetFormat::default();
Self {
file_extension: DEFAULT_PARQUET_EXTENSION,
table_partition_cols: vec![],
parquet_pruning: ParquetFormat::default().enable_pruning(),
parquet_pruning: format_default.enable_pruning(),
skip_metadata: format_default.skip_metadata(),
}
}
}
@@ -164,6 +170,14 @@ impl<'a> ParquetReadOptions<'a> {
self
}

/// Tell the parquet reader to skip any metadata that may be in
/// the file Schema. This can help avoid schema conflicts due to
/// metadata. Defaults to true.
pub fn skip_metadata(mut self, skip_metadata: bool) -> Self {
self.skip_metadata = skip_metadata;
self
}

/// Specify table_partition_cols for partition pruning
pub fn table_partition_cols(mut self, table_partition_cols: Vec<String>) -> Self {
self.table_partition_cols = table_partition_cols;
@@ -172,8 +186,9 @@

/// Helper to convert these user facing options to `ListingTable` options
pub fn to_listing_options(&self, target_partitions: usize) -> ListingOptions {
let file_format =
ParquetFormat::default().with_enable_pruning(self.parquet_pruning);
let file_format = ParquetFormat::default()
.with_enable_pruning(self.parquet_pruning)
.with_skip_metadata(self.skip_metadata);

ListingOptions {
format: Arc::new(file_format),
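Code that constructs `ListingOptions` directly, rather than going through `ParquetReadOptions`, can set the same flag on `ParquetFormat` itself; a short sketch using only the builder methods visible in this diff (the import path assumes the `datafusion` core crate layout shown above):

```rust
use datafusion::datasource::file_format::parquet::ParquetFormat;

fn main() {
    // After this change the default is to skip metadata.
    assert!(ParquetFormat::default().skip_metadata());

    // Keep file metadata during schema merging (the previous behavior).
    let format = ParquetFormat::default()
        .with_enable_pruning(true)
        .with_skip_metadata(false);
    assert!(!format.skip_metadata());
}
```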
4 changes: 3 additions & 1 deletion datafusion/core/src/physical_plan/file_format/mod.rs
@@ -130,7 +130,9 @@ impl FileScanConfig {
column_statistics: Some(table_cols_stats),
};

let table_schema = Arc::new(Schema::new(table_fields));
let table_schema = Arc::new(
Schema::new(table_fields).with_metadata(self.file_schema.metadata().clone()),
);

(table_schema, table_stats)
}
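The `FileScanConfig` change above is needed because building a new `Schema` from projected fields drops the schema-level metadata, so it now gets copied back explicitly. A small sketch of that underlying behavior (illustrative field and metadata names):

```rust
use std::collections::HashMap;

use datafusion::arrow::datatypes::{DataType, Field, Schema};

fn main() {
    let mut meta = HashMap::new();
    meta.insert("origin".to_string(), "some-writer".to_string());

    let file_schema = Schema::new_with_metadata(
        vec![Field::new("id", DataType::Int32, true)],
        meta,
    );

    // Rebuilding a schema from its fields loses the metadata...
    let projected = Schema::new(file_schema.fields().clone());
    assert!(projected.metadata().is_empty());

    // ...unless it is re-attached, which is what FileScanConfig now does.
    let projected = projected.with_metadata(file_schema.metadata().clone());
    assert_eq!(projected.metadata(), file_schema.metadata());
}
```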
1 change: 1 addition & 0 deletions datafusion/core/tests/sql/mod.rs
@@ -109,6 +109,7 @@ pub mod decimal;
mod explain;
mod idenfifers;
pub mod information_schema;
mod parquet_schema;
mod partitioned_csv;
mod subqueries;
#[cfg(feature = "unicode_expressions")]
54 changes: 1 addition & 53 deletions datafusion/core/tests/sql/parquet.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::{collections::HashMap, fs, path::Path};
use std::{fs, path::Path};

use ::parquet::arrow::ArrowWriter;
use tempfile::TempDir;
@@ -173,58 +173,6 @@ async fn parquet_list_columns() {
assert_eq!(result.value(3), "xyz");
}

#[tokio::test]
async fn schema_merge_ignores_metadata() {
// Create two parquet files in same table with same schema but different metadata
let tmp_dir = TempDir::new().unwrap();
let table_dir = tmp_dir.path().join("parquet_test");
let table_path = Path::new(&table_dir);

let mut non_empty_metadata: HashMap<String, String> = HashMap::new();
non_empty_metadata.insert("testing".to_string(), "metadata".to_string());

let fields = vec![
Field::new("id", DataType::Int32, true),
Field::new("name", DataType::Utf8, true),
];
let schemas = vec![
Arc::new(Schema::new_with_metadata(
fields.clone(),
non_empty_metadata.clone(),
)),
Arc::new(Schema::new(fields.clone())),
];

if let Ok(()) = fs::create_dir(table_path) {
for (i, schema) in schemas.iter().enumerate().take(2) {
let filename = format!("part-{}.parquet", i);
let path = table_path.join(&filename);
let file = fs::File::create(path).unwrap();
let mut writer = ArrowWriter::try_new(file, schema.clone(), None).unwrap();

// create mock record batch
let ids = Arc::new(Int32Array::from_slice(&[i as i32]));
let names = Arc::new(StringArray::from_slice(&["test"]));
let rec_batch =
RecordBatch::try_new(schema.clone(), vec![ids, names]).unwrap();

writer.write(&rec_batch).unwrap();
writer.close().unwrap();
}
}

// Read the parquet files into a dataframe to confirm results
// (no errors)
let ctx = SessionContext::new();
let df = ctx
.read_parquet(table_dir.to_str().unwrap(), ParquetReadOptions::default())
.await
.unwrap();
let result = df.collect().await.unwrap();

assert_eq!(result[0].schema().metadata(), result[1].schema().metadata());
}

#[tokio::test]
async fn parquet_query_with_max_min() {
let tmp_dir = TempDir::new().unwrap();