diff --git a/data-access/Cargo.toml b/data-access/Cargo.toml
index aaa869f0ab86..951d74167d6a 100644
--- a/data-access/Cargo.toml
+++ b/data-access/Cargo.toml
@@ -36,6 +36,7 @@ path = "src/lib.rs"
 async-trait = "0.1.41"
 chrono = { version = "0.4", default-features = false }
 futures = "0.3"
+glob = "0.3.0"
 parking_lot = "0.12"
 tempfile = "3"
 tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] }
diff --git a/data-access/src/object_store/local.rs b/data-access/src/object_store/local.rs
index f4872ae17420..118f20564d91 100644
--- a/data-access/src/object_store/local.rs
+++ b/data-access/src/object_store/local.rs
@@ -230,4 +230,30 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_globbing() -> Result<()> {
+        let tmp = tempdir()?;
+        let a1_path = tmp.path().join("a1.txt");
+        let a2_path = tmp.path().join("a2.txt");
+        let b1_path = tmp.path().join("b1.txt");
+        File::create(&a1_path)?;
+        File::create(&a2_path)?;
+        File::create(&b1_path)?;
+
+        let glob = format!("{}/a*.txt", tmp.path().to_str().unwrap());
+        let mut all_files = HashSet::new();
+        let mut files = LocalFileSystem.glob_file(&glob).await?;
+        while let Some(file) = files.next().await {
+            let file = file?;
+            assert_eq!(file.size(), 0);
+            all_files.insert(file.path().to_owned());
+        }
+
+        assert_eq!(all_files.len(), 2);
+        assert!(all_files.contains(a1_path.to_str().unwrap()));
+        assert!(all_files.contains(a2_path.to_str().unwrap()));
+
+        Ok(())
+    }
 }
diff --git a/data-access/src/object_store/mod.rs b/data-access/src/object_store/mod.rs
index 5d2f76e27931..39d1bf04d147 100644
--- a/data-access/src/object_store/mod.rs
+++ b/data-access/src/object_store/mod.rs
@@ -21,11 +21,15 @@ pub mod local;
 
 use std::fmt::Debug;
 use std::io::Read;
+use std::path;
+use std::path::{Path, PathBuf};
 use std::pin::Pin;
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use futures::{AsyncRead, Stream, StreamExt};
+use futures::future::ready;
+use futures::{AsyncRead, Stream, StreamExt, TryStreamExt};
+use glob::Pattern;
 
 use crate::{FileMeta, ListEntry, Result, SizedFile};
 
@@ -80,15 +84,42 @@ pub trait ObjectStore: Sync + Send + Debug {
         prefix: &str,
         suffix: &str,
     ) -> Result<FileMetaStream> {
-        let file_stream = self.list_file(prefix).await?;
-        let suffix = suffix.to_owned();
-        Ok(Box::pin(file_stream.filter(move |fr| {
-            let has_suffix = match fr {
-                Ok(f) => f.path().ends_with(&suffix),
-                Err(_) => true,
-            };
-            async move { has_suffix }
-        })))
+        self.glob_file_with_suffix(prefix, suffix).await
+    }
+
+    /// Returns all the files matching `glob_pattern`
+    async fn glob_file(&self, glob_pattern: &str) -> Result<FileMetaStream> {
+        if !contains_glob_start_char(glob_pattern) {
+            self.list_file(glob_pattern).await
+        } else {
+            let start_path = find_longest_search_path_without_glob_pattern(glob_pattern);
+            let file_stream = self.list_file(&start_path).await?;
+            let pattern = Pattern::new(glob_pattern).unwrap();
+            Ok(Box::pin(file_stream.filter(move |fr| {
+                let matches_pattern = match fr {
+                    Ok(f) => pattern.matches(f.path()),
+                    Err(_) => true,
+                };
+                async move { matches_pattern }
+            })))
+        }
+    }
+
+    /// Calls `glob_file` with a suffix filter
+    async fn glob_file_with_suffix(
+        &self,
+        glob_pattern: &str,
+        suffix: &str,
+    ) -> Result<FileMetaStream> {
+        let files_to_consider = match contains_glob_start_char(glob_pattern) {
+            true => self.glob_file(glob_pattern).await,
+            false => self.list_file(glob_pattern).await,
+        }?;
+
+        match suffix.is_empty() {
+            true => Ok(files_to_consider),
+            false => filter_suffix(files_to_consider, suffix),
+        }
     }
 
     /// Returns all the files in `prefix` if the `prefix` is already a leaf dir,
@@ -102,3 +133,113 @@ pub trait ObjectStore: Sync + Send + Debug {
     /// Get object reader for one file
     fn file_reader(&self, file: SizedFile) -> Result<Arc<dyn ObjectReader>>;
 }
+
+const GLOB_START_CHARS: [char; 3] = ['?', '*', '['];
+
+/// Determine whether the path contains a globbing character
+fn contains_glob_start_char(path: &str) -> bool {
+    path.chars().any(|c| GLOB_START_CHARS.contains(&c))
+}
+
+/// Filters the file_stream to only contain files that end with suffix
+fn filter_suffix(file_stream: FileMetaStream, suffix: &str) -> Result<FileMetaStream> {
+    let suffix = suffix.to_owned();
+    Ok(Box::pin(
+        file_stream.try_filter(move |f| ready(f.path().ends_with(&suffix))),
+    ))
+}
+
+fn find_longest_search_path_without_glob_pattern(glob_pattern: &str) -> String {
+    // in case the glob_pattern is not actually a glob pattern, take the entire thing
+    if !contains_glob_start_char(glob_pattern) {
+        glob_pattern.to_string()
+    } else {
+        // take all the components of the path (left-to-right) which do not contain a glob pattern
+        let components_in_glob_pattern = Path::new(glob_pattern).components();
+        let mut path_buf_for_longest_search_path_without_glob_pattern = PathBuf::new();
+        for component_in_glob_pattern in components_in_glob_pattern {
+            let component_as_str =
+                component_in_glob_pattern.as_os_str().to_str().unwrap();
+            if contains_glob_start_char(component_as_str) {
+                break;
+            }
+            path_buf_for_longest_search_path_without_glob_pattern
+                .push(component_in_glob_pattern);
+        }
+
+        let mut result = path_buf_for_longest_search_path_without_glob_pattern
+            .to_str()
+            .unwrap()
+            .to_string();
+
+        // when we're not at the root, append a separator
+        if path_buf_for_longest_search_path_without_glob_pattern
+            .components()
+            .count()
+            > 1
+        {
+            result.push(path::MAIN_SEPARATOR);
+        }
+        result
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[tokio::test]
+    async fn test_is_glob_path() -> Result<()> {
+        assert!(!contains_glob_start_char("/"));
+        assert!(!contains_glob_start_char("/test"));
+        assert!(!contains_glob_start_char("/test/"));
+        assert!(contains_glob_start_char("/test*"));
+        Ok(())
+    }
+
+    fn test_longest_base_path(input: &str, expected: &str) {
+        assert_eq!(
+            find_longest_search_path_without_glob_pattern(input),
+            expected,
+            "testing find_longest_search_path_without_glob_pattern with {}",
+            input
+        );
+    }
+
+    #[tokio::test]
+    async fn test_find_longest_search_path_without_glob_pattern() -> Result<()> {
+        // no glob patterns, thus we get the full path (as-is)
+        test_longest_base_path("/", "/");
+        test_longest_base_path("/a.txt", "/a.txt");
+        test_longest_base_path("/a", "/a");
+        test_longest_base_path("/a/", "/a/");
+        test_longest_base_path("/a/b", "/a/b");
+        test_longest_base_path("/a/b/", "/a/b/");
+        test_longest_base_path("/a/b.txt", "/a/b.txt");
+        test_longest_base_path("/a/b/c.txt", "/a/b/c.txt");
+        // glob patterns, thus we build the longest path (os-specific)
+        use path::MAIN_SEPARATOR;
+        test_longest_base_path("/*.txt", &format!("{MAIN_SEPARATOR}"));
+        test_longest_base_path(
+            "/a/*b.txt",
+            &format!("{MAIN_SEPARATOR}a{MAIN_SEPARATOR}"),
+        );
+        test_longest_base_path(
+            "/a/*/b.txt",
+            &format!("{MAIN_SEPARATOR}a{MAIN_SEPARATOR}"),
+        );
+        test_longest_base_path(
+            "/a/b/[123]/file*.txt",
+            &format!("{MAIN_SEPARATOR}a{MAIN_SEPARATOR}b{MAIN_SEPARATOR}"),
+        );
+        test_longest_base_path(
+            "/a/b*.txt",
+            &format!("{MAIN_SEPARATOR}a{MAIN_SEPARATOR}"),
+        );
+        test_longest_base_path(
+            "/a/b/**/c*.txt",
+            &format!("{MAIN_SEPARATOR}a{MAIN_SEPARATOR}b{MAIN_SEPARATOR}"),
+        );
+        Ok(())
+    }
+}
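The glob branch above relies on the `glob` crate's `Pattern` type (the dependency added in Cargo.toml): list from the longest glob-free prefix, then keep only the paths the compiled pattern accepts. A minimal standalone sketch of that matching step, using a hypothetical file list rather than a real object store:

    use glob::Pattern;

    fn main() {
        // find_longest_search_path_without_glob_pattern("/a/b/[123]/file*.txt")
        // returns "/a/b/", so the store lists from there before filtering.
        let pattern = Pattern::new("/a/b/[123]/file*.txt").unwrap();
        let listed = [
            "/a/b/1/file1.txt", // kept: '1' matches [123], "file1" matches file*
            "/a/b/4/file1.txt", // dropped: '4' is outside the character class
            "/a/b/1/other.txt", // dropped: missing the literal "file" prefix
        ];
        let matched: Vec<&str> = listed
            .iter()
            .copied()
            .filter(|path| pattern.matches(path))
            .collect();
        assert_eq!(matched, vec!["/a/b/1/file1.txt"]);
    }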
&format!("{MAIN_SEPARATOR}a{MAIN_SEPARATOR}b{MAIN_SEPARATOR}"), + ); + Ok(()) + } +} diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs index d066d8d9dd2c..9518986a14da 100644 --- a/datafusion/core/src/datasource/listing/helpers.rs +++ b/datafusion/core/src/datasource/listing/helpers.rs @@ -171,7 +171,7 @@ pub async fn pruned_partition_list( if table_partition_cols.is_empty() { return Ok(Box::pin( store - .list_file_with_suffix(table_path, file_extension) + .glob_file_with_suffix(table_path, file_extension) .await? .map(|f| { Ok(PartitionedFile { @@ -196,7 +196,7 @@ pub async fn pruned_partition_list( let table_partition_cols_stream = table_partition_cols.to_vec(); Ok(Box::pin( store - .list_file_with_suffix(table_path, file_extension) + .glob_file_with_suffix(table_path, file_extension) .await? .filter_map(move |f| { let stream_path = stream_path.clone(); @@ -231,7 +231,7 @@ pub async fn pruned_partition_list( // parse the partition values and serde them as a RecordBatch to filter them // TODO avoid collecting but have a streaming memory table instead let batches: Vec = store - .list_file_with_suffix(table_path, file_extension) + .glob_file_with_suffix(table_path, file_extension) .await? // TODO we set an arbitrary high batch size here, it does not matter as we list // all the files anyway. This number will need to be adjusted according to the object diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 9e554c13d32e..6881f674b9e4 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -216,7 +216,7 @@ impl ListingOptions { path: &'a str, ) -> Result { let file_stream = object_store - .list_file_with_suffix(path, &self.file_extension) + .glob_file_with_suffix(path, &self.file_extension) .await? 
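These call sites switch from `list_file_with_suffix` to `glob_file_with_suffix`, which is backward compatible: a path containing none of `?`, `*`, `[` falls back to a plain listing. A sketch of both cases against the local store, assuming the `data-access` crate is importable as `datafusion_data_access` (the directory paths here are hypothetical):

    use datafusion_data_access::object_store::{local::LocalFileSystem, ObjectStore};
    use datafusion_data_access::Result;
    use futures::StreamExt;

    #[tokio::main]
    async fn main() -> Result<()> {
        // No glob characters: behaves exactly like the old list_file_with_suffix.
        let mut plain = LocalFileSystem
            .glob_file_with_suffix("/data/events", ".parquet")
            .await?;
        while let Some(meta) = plain.next().await {
            println!("listed: {}", meta?.path());
        }

        // Glob characters: the listing starts at /data/events/ and is pruned to
        // matching paths before the suffix filter runs.
        let mut globbed = LocalFileSystem
            .glob_file_with_suffix("/data/events/2022-0[1-6]-*", ".parquet")
            .await?;
        while let Some(meta) = globbed.next().await {
            println!("matched: {}", meta?.path());
        }
        Ok(())
    }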
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 0f544ca89e0c..0b8189b56515 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -1600,6 +1600,7 @@ mod tests {
     use crate::logical_plan::{binary_expr, lit, Operator};
     use crate::physical_plan::functions::make_scalar_function;
     use crate::test;
+    use crate::test_util::parquet_test_data;
     use crate::variable::VarType;
     use crate::{
         assert_batches_eq, assert_batches_sorted_eq,
@@ -2124,7 +2125,41 @@ mod tests {
             "+----------+",
         ];
         assert_batches_eq!(expected, &result);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn read_with_glob_path() -> Result<()> {
+        let ctx = SessionContext::new();
+        let df = ctx
+            .read_parquet(
+                format!("{}/alltypes_plain*.parquet", parquet_test_data()),
+                ParquetReadOptions::default(),
+            )
+            .await?;
+        let results = df.collect().await?;
+        let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum();
+        // alltypes_plain.parquet = 8 rows, alltypes_plain.snappy.parquet = 2 rows; alltypes_dictionary.parquet (2 rows) does not match the glob
+        assert_eq!(total_rows, 10);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn read_from_registered_table_with_glob_path() -> Result<()> {
+        let ctx = SessionContext::new();
+
+        ctx.register_parquet(
+            "test",
+            &format!("{}/alltypes_plain*.parquet", parquet_test_data()),
+            ParquetReadOptions::default(),
+        )
+        .await?;
+        let df = ctx.sql("SELECT * FROM test").await?;
+        let results = df.collect().await?;
+        let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum();
+        // alltypes_plain.parquet = 8 rows, alltypes_plain.snappy.parquet = 2 rows; alltypes_dictionary.parquet (2 rows) does not match the glob
+        assert_eq!(total_rows, 10);
         Ok(())
     }
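End to end, the user-visible change is that paths handed to DataFusion's read and register APIs may now contain glob patterns, as the two new tests exercise. A minimal usage sketch along the same lines, with a hypothetical directory layout:

    use datafusion::error::Result;
    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> Result<()> {
        let ctx = SessionContext::new();
        // `?`, `*`, and `[` in the path now route through glob_file_with_suffix,
        // so only the matching parquet files back this DataFrame.
        let df = ctx
            .read_parquet(
                "/data/events/part-*.parquet",
                ParquetReadOptions::default(),
            )
            .await?;
        df.show().await?;
        Ok(())
    }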