From 9cfdba4c699ef05cf15debfd5f2b7fe8ebb0b6e2 Mon Sep 17 00:00:00 2001 From: Thomas Cameron Date: Mon, 11 Dec 2023 23:17:11 +0900 Subject: [PATCH] Sort filenames when reading parquet to ensure consistent schema (#6629) * update * FIXed * add parquet * update * update * update * try2 * update * update * Add comments * cargo fmt --------- Co-authored-by: Andrew Lamb --- .../src/datasource/file_format/parquet.rs | 85 +++++++++++++++++-- 1 file changed, 80 insertions(+), 5 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 09e54558f12e..9db320fb9da4 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -164,6 +164,16 @@ fn clear_metadata( }) } +async fn fetch_schema_with_location( + store: &dyn ObjectStore, + file: &ObjectMeta, + metadata_size_hint: Option, +) -> Result<(Path, Schema)> { + let loc_path = file.location.clone(); + let schema = fetch_schema(store, file, metadata_size_hint).await?; + Ok((loc_path, schema)) +} + #[async_trait] impl FileFormat for ParquetFormat { fn as_any(&self) -> &dyn Any { @@ -176,13 +186,32 @@ impl FileFormat for ParquetFormat { store: &Arc, objects: &[ObjectMeta], ) -> Result { - let schemas: Vec<_> = futures::stream::iter(objects) - .map(|object| fetch_schema(store.as_ref(), object, self.metadata_size_hint)) + let mut schemas: Vec<_> = futures::stream::iter(objects) + .map(|object| { + fetch_schema_with_location( + store.as_ref(), + object, + self.metadata_size_hint, + ) + }) .boxed() // Workaround https://github.com/rust-lang/rust/issues/64552 .buffered(state.config_options().execution.meta_fetch_concurrency) .try_collect() .await?; + // Schema inference adds fields based the order they are seen + // which depends on the order the files are processed. For some + // object stores (like local file systems) the order returned from list + // is not deterministic. Thus, to ensure deterministic schema inference + // sort the files first. + // https://github.com/apache/arrow-datafusion/pull/6629 + schemas.sort_by(|(location1, _), (location2, _)| location1.cmp(location2)); + + let schemas = schemas + .into_iter() + .map(|(_, schema)| schema) + .collect::>(); + let schema = if self.skip_metadata(state.config_options()) { Schema::try_merge(clear_metadata(schemas)) } else { @@ -1124,12 +1153,21 @@ pub(crate) mod test_util { batches: Vec, multi_page: bool, ) -> Result<(Vec, Vec)> { + // we need the tmp files to be sorted as some tests rely on the how the returning files are ordered + // https://github.com/apache/arrow-datafusion/pull/6629 + let tmp_files = { + let mut tmp_files: Vec<_> = (0..batches.len()) + .map(|_| NamedTempFile::new().expect("creating temp file")) + .collect(); + tmp_files.sort_by(|a, b| a.path().cmp(b.path())); + tmp_files + }; + // Each batch writes to their own file let files: Vec<_> = batches .into_iter() - .map(|batch| { - let mut output = NamedTempFile::new().expect("creating temp file"); - + .zip(tmp_files.into_iter()) + .map(|(batch, mut output)| { let builder = WriterProperties::builder(); let props = if multi_page { builder.set_data_page_row_count_limit(ROWS_PER_PAGE) @@ -1155,6 +1193,7 @@ pub(crate) mod test_util { .collect(); let meta: Vec<_> = files.iter().map(local_unpartitioned_file).collect(); + Ok((meta, files)) } @@ -1254,6 +1293,42 @@ mod tests { Ok(()) } + #[tokio::test] + async fn is_schema_stable() -> Result<()> { + let c1: ArrayRef = + Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")])); + + let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None])); + + let batch1 = + RecordBatch::try_from_iter(vec![("a", c1.clone()), ("b", c1.clone())]) + .unwrap(); + let batch2 = + RecordBatch::try_from_iter(vec![("c", c2.clone()), ("d", c2.clone())]) + .unwrap(); + + let store = Arc::new(LocalFileSystem::new()) as _; + let (meta, _files) = store_parquet(vec![batch1, batch2], false).await?; + + let session = SessionContext::new(); + let ctx = session.state(); + let format = ParquetFormat::default(); + let schema = format.infer_schema(&ctx, &store, &meta).await.unwrap(); + + let order: Vec<_> = ["a", "b", "c", "d"] + .into_iter() + .map(|i| i.to_string()) + .collect(); + let coll: Vec<_> = schema + .all_fields() + .into_iter() + .map(|i| i.name().to_string()) + .collect(); + assert_eq!(coll, order); + + Ok(()) + } + #[derive(Debug)] struct RequestCountingObjectStore { inner: Arc,