Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 2393: Support glob patterns for files #2394

Merged
merged 27 commits into from
May 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
3e0b209
implement globbing on ObjectStore
timvw May 3, 2022
c9c9a19
remove unused code
timvw May 3, 2022
7b39083
update list_file_with_suffix to use glob_file
timvw May 3, 2022
5750e43
reworked code such that glob_file matches list_file and glob_file_wit…
timvw May 3, 2022
7930206
rework the way we figure out what the greatest common base path is
timvw May 3, 2022
eb1e32e
refactor tests on longested_search_path_without_glob_pattern
timvw May 3, 2022
79e45a9
added comment on / value
timvw May 3, 2022
bbc4bbf
remove unused use stmt
timvw May 3, 2022
5244250
rework implementation to find largest common path
timvw May 3, 2022
77f07f9
revert accidental/temp changes
timvw May 3, 2022
445c95b
added tests to verify globbing
timvw May 3, 2022
485f59d
find inspiration in glob crate to better deal with windows
timvw May 4, 2022
8589ef6
when running on windows, the expected path is slightly different (\ i…
timvw May 4, 2022
1cfe2d6
fixed clippy issue
timvw May 4, 2022
c8cbc73
added section on checks that are executed during a PR build
timvw May 4, 2022
b00c430
updated section (and script) to make explicit this is about formatting
timvw May 4, 2022
b4edeb9
replace with simple break
timvw May 4, 2022
a49a63a
make filter_suffix not-async as it does not need to be async
timvw May 4, 2022
e3ffac3
no need to collect
timvw May 4, 2022
b0da3a7
attempt to make tests more understandable
timvw May 4, 2022
1f8f502
actually format the code instead of only verifying
timvw May 4, 2022
2a26116
added test with ** as glob pattern as well
timvw May 4, 2022
4035050
remove changes related to code formatting
timvw May 4, 2022
ca1ef1e
remove unneeded empty line
timvw May 4, 2022
130af0f
run cargo fmt
timvw May 4, 2022
8cedd50
Update data-access/src/object_store/mod.rs
timvw May 4, 2022
223afed
use try_filter as suggested in pr review
timvw May 4, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions data-access/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ path = "src/lib.rs"
async-trait = "0.1.41"
chrono = { version = "0.4", default-features = false }
futures = "0.3"
glob = "0.3.0"
parking_lot = "0.12"
tempfile = "3"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] }
26 changes: 26 additions & 0 deletions data-access/src/object_store/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,4 +230,30 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn test_globbing() -> Result<()> {
timvw marked this conversation as resolved.
Show resolved Hide resolved
let tmp = tempdir()?;
let a1_path = tmp.path().join("a1.txt");
let a2_path = tmp.path().join("a2.txt");
let b1_path = tmp.path().join("b1.txt");
File::create(&a1_path)?;
File::create(&a2_path)?;
File::create(&b1_path)?;

let glob = format!("{}/a*.txt", tmp.path().to_str().unwrap());
let mut all_files = HashSet::new();
let mut files = LocalFileSystem.glob_file(&glob).await?;
while let Some(file) = files.next().await {
let file = file?;
assert_eq!(file.size(), 0);
all_files.insert(file.path().to_owned());
}

assert_eq!(all_files.len(), 2);
assert!(all_files.contains(a1_path.to_str().unwrap()));
assert!(all_files.contains(a2_path.to_str().unwrap()));

Ok(())
}
}
161 changes: 151 additions & 10 deletions data-access/src/object_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,15 @@ pub mod local;

use std::fmt::Debug;
use std::io::Read;
use std::path;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::Arc;

use async_trait::async_trait;
use futures::{AsyncRead, Stream, StreamExt};
use futures::future::ready;
use futures::{AsyncRead, Stream, StreamExt, TryStreamExt};
use glob::Pattern;

use crate::{FileMeta, ListEntry, Result, SizedFile};

Expand Down Expand Up @@ -80,15 +84,42 @@ pub trait ObjectStore: Sync + Send + Debug {
prefix: &str,
suffix: &str,
) -> Result<FileMetaStream> {
let file_stream = self.list_file(prefix).await?;
let suffix = suffix.to_owned();
Ok(Box::pin(file_stream.filter(move |fr| {
let has_suffix = match fr {
Ok(f) => f.path().ends_with(&suffix),
Err(_) => true,
timvw marked this conversation as resolved.
Show resolved Hide resolved
};
async move { has_suffix }
})))
self.glob_file_with_suffix(prefix, suffix).await
}

/// Returns all the files matching `glob_pattern`
async fn glob_file(&self, glob_pattern: &str) -> Result<FileMetaStream> {
if !contains_glob_start_char(glob_pattern) {
self.list_file(glob_pattern).await
} else {
let start_path = find_longest_search_path_without_glob_pattern(glob_pattern);
let file_stream = self.list_file(&start_path).await?;
let pattern = Pattern::new(glob_pattern).unwrap();
Ok(Box::pin(file_stream.filter(move |fr| {
let matches_pattern = match fr {
Ok(f) => pattern.matches(f.path()),
Err(_) => true,
};
async move { matches_pattern }
})))
}
}

/// Calls `glob_file` with a suffix filter
async fn glob_file_with_suffix(
&self,
glob_pattern: &str,
suffix: &str,
) -> Result<FileMetaStream> {
let files_to_consider = match contains_glob_start_char(glob_pattern) {
true => self.glob_file(glob_pattern).await,
false => self.list_file(glob_pattern).await,
}?;

match suffix.is_empty() {
true => Ok(files_to_consider),
false => filter_suffix(files_to_consider, suffix),
}
}

/// Returns all the files in `prefix` if the `prefix` is already a leaf dir,
Expand All @@ -102,3 +133,113 @@ pub trait ObjectStore: Sync + Send + Debug {
/// Get object reader for one file
fn file_reader(&self, file: SizedFile) -> Result<Arc<dyn ObjectReader>>;
}

const GLOB_START_CHARS: [char; 3] = ['?', '*', '['];

/// Determine whether the path contains a globbing character
fn contains_glob_start_char(path: &str) -> bool {
path.chars().any(|c| GLOB_START_CHARS.contains(&c))
}

/// Filters the file_stream to only contain files that end with suffix
fn filter_suffix(file_stream: FileMetaStream, suffix: &str) -> Result<FileMetaStream> {
let suffix = suffix.to_owned();
Ok(Box::pin(
file_stream.try_filter(move |f| ready(f.path().ends_with(&suffix))),
))
}

fn find_longest_search_path_without_glob_pattern(glob_pattern: &str) -> String {
// in case the glob_pattern is not actually a glob pattern, take the entire thing
if !contains_glob_start_char(glob_pattern) {
glob_pattern.to_string()
} else {
// take all the components of the path (left-to-right) which do not contain a glob pattern
let components_in_glob_pattern = Path::new(glob_pattern).components();
let mut path_buf_for_longest_search_path_without_glob_pattern = PathBuf::new();
for component_in_glob_pattern in components_in_glob_pattern {
let component_as_str =
component_in_glob_pattern.as_os_str().to_str().unwrap();
if contains_glob_start_char(component_as_str) {
break;
}
path_buf_for_longest_search_path_without_glob_pattern
.push(component_in_glob_pattern);
}

let mut result = path_buf_for_longest_search_path_without_glob_pattern
.to_str()
.unwrap()
.to_string();

// when we're not at the root, append a separator
timvw marked this conversation as resolved.
Show resolved Hide resolved
if path_buf_for_longest_search_path_without_glob_pattern
.components()
.count()
> 1
{
result.push(path::MAIN_SEPARATOR);
}
result
}
}

#[cfg(test)]
mod tests {
use super::*;

#[tokio::test]
async fn test_is_glob_path() -> Result<()> {
assert!(!contains_glob_start_char("/"));
assert!(!contains_glob_start_char("/test"));
assert!(!contains_glob_start_char("/test/"));
assert!(contains_glob_start_char("/test*"));
Ok(())
}

fn test_longest_base_path(input: &str, expected: &str) {
assert_eq!(
find_longest_search_path_without_glob_pattern(input),
timvw marked this conversation as resolved.
Show resolved Hide resolved
expected,
"testing find_longest_search_path_without_glob_pattern with {}",
input
);
}

#[tokio::test]
async fn test_find_longest_search_path_without_glob_pattern() -> Result<()> {
// no glob patterns, thus we get the full path (as-is)
test_longest_base_path("/", "/");
test_longest_base_path("/a.txt", "/a.txt");
test_longest_base_path("/a", "/a");
test_longest_base_path("/a/", "/a/");
test_longest_base_path("/a/b", "/a/b");
test_longest_base_path("/a/b/", "/a/b/");
test_longest_base_path("/a/b.txt", "/a/b.txt");
test_longest_base_path("/a/b/c.txt", "/a/b/c.txt");
// glob patterns, thus we build the longest path (os-specific)
use path::MAIN_SEPARATOR;
test_longest_base_path("/*.txt", &format!("{MAIN_SEPARATOR}"));
test_longest_base_path(
"/a/*b.txt",
&format!("{MAIN_SEPARATOR}a{MAIN_SEPARATOR}"),
);
test_longest_base_path(
"/a/*/b.txt",
&format!("{MAIN_SEPARATOR}a{MAIN_SEPARATOR}"),
);
test_longest_base_path(
"/a/b/[123]/file*.txt",
&format!("{MAIN_SEPARATOR}a{MAIN_SEPARATOR}b{MAIN_SEPARATOR}"),
);
test_longest_base_path(
"/a/b*.txt",
&format!("{MAIN_SEPARATOR}a{MAIN_SEPARATOR}"),
);
test_longest_base_path(
"/a/b/**/c*.txt",
&format!("{MAIN_SEPARATOR}a{MAIN_SEPARATOR}b{MAIN_SEPARATOR}"),
);
Ok(())
}
}
6 changes: 3 additions & 3 deletions datafusion/core/src/datasource/listing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ pub async fn pruned_partition_list(
if table_partition_cols.is_empty() {
return Ok(Box::pin(
store
.list_file_with_suffix(table_path, file_extension)
.glob_file_with_suffix(table_path, file_extension)
.await?
.map(|f| {
Ok(PartitionedFile {
Expand All @@ -196,7 +196,7 @@ pub async fn pruned_partition_list(
let table_partition_cols_stream = table_partition_cols.to_vec();
Ok(Box::pin(
store
.list_file_with_suffix(table_path, file_extension)
.glob_file_with_suffix(table_path, file_extension)
.await?
.filter_map(move |f| {
let stream_path = stream_path.clone();
Expand Down Expand Up @@ -231,7 +231,7 @@ pub async fn pruned_partition_list(
// parse the partition values and serde them as a RecordBatch to filter them
// TODO avoid collecting but have a streaming memory table instead
let batches: Vec<RecordBatch> = store
.list_file_with_suffix(table_path, file_extension)
.glob_file_with_suffix(table_path, file_extension)
.await?
// TODO we set an arbitrary high batch size here, it does not matter as we list
// all the files anyway. This number will need to be adjusted according to the object
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ impl ListingOptions {
path: &'a str,
) -> Result<SchemaRef> {
let file_stream = object_store
.list_file_with_suffix(path, &self.file_extension)
.glob_file_with_suffix(path, &self.file_extension)
.await?
.map(move |file_meta| object_store.file_reader(file_meta?.sized_file));
let file_schema = self.format.infer_schema(Box::pin(file_stream)).await?;
Expand Down
35 changes: 35 additions & 0 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1600,6 +1600,7 @@ mod tests {
use crate::logical_plan::{binary_expr, lit, Operator};
use crate::physical_plan::functions::make_scalar_function;
use crate::test;
use crate::test_util::parquet_test_data;
use crate::variable::VarType;
use crate::{
assert_batches_eq, assert_batches_sorted_eq,
Expand Down Expand Up @@ -2124,7 +2125,41 @@ mod tests {
"+----------+",
];
assert_batches_eq!(expected, &result);
Ok(())
}

#[tokio::test]
async fn read_with_glob_path() -> Result<()> {
let ctx = SessionContext::new();

let df = ctx
.read_parquet(
format!("{}/alltypes_plain*.parquet", parquet_test_data()),
ParquetReadOptions::default(),
)
.await?;
let results = df.collect().await?;
let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum();
// alltypes_plain.parquet = 8 rows, alltypes_plain.snappy.parquet = 2 rows, alltypes_dictionary.parquet = 2 rows
assert_eq!(total_rows, 10);
Ok(())
}

#[tokio::test]
async fn read_from_registered_table_with_glob_path() -> Result<()> {
let ctx = SessionContext::new();

ctx.register_parquet(
"test",
&format!("{}/alltypes_plain*.parquet", parquet_test_data()),
ParquetReadOptions::default(),
)
.await?;
let df = ctx.sql("SELECT * FROM test").await?;
let results = df.collect().await?;
let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum();
// alltypes_plain.parquet = 8 rows, alltypes_plain.snappy.parquet = 2 rows, alltypes_dictionary.parquet = 2 rows
assert_eq!(total_rows, 10);
Ok(())
}

Expand Down