Skip to content

Commit

Permalink
Issue 2393: Support glob patterns for files (#2394)
Browse files Browse the repository at this point in the history
* implement globbing on ObjectStore

* remove unused code

* update list_file_with_suffix to use glob_file

* reworked code such that glob_file matches list_file and glob_file_with_suffix matches list_file_with_suffix

* rework the way we figure out what the greatest common base path is

* refactor tests on longest_search_path_without_glob_pattern

* added comment on / value

* remove unused use stmt

* rework implementation to find largest common path

* revert accidental/temp changes

* added tests to verify globbing

* find inspiration in glob crate to better deal with windows

* when running on windows, the expected path is slightly different (\ instead of /).

* fixed clippy issue

* added section on checks that are executed during a PR build

* updated section (and script) to make explicit this is about formatting

* replace with simple break

* make filter_suffix not-async as it does not need to be async

* no need to collect

* attempt to make tests more understandable

* actually format the code instead of only verifying

* added test with ** as glob pattern as well

* remove changes related to code formatting

* remove unneeded empty line

* run cargo fmt

* Update data-access/src/object_store/mod.rs

Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com>

* use try_filter as suggested in pr review

Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com>
  • Loading branch information
timvw and tustvold authored May 5, 2022
1 parent 7304719 commit fbeb726
Show file tree
Hide file tree
Showing 6 changed files with 217 additions and 14 deletions.
1 change: 1 addition & 0 deletions data-access/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ path = "src/lib.rs"
async-trait = "0.1.41"
chrono = { version = "0.4", default-features = false }
futures = "0.3"
glob = "0.3.0"
parking_lot = "0.12"
tempfile = "3"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] }
26 changes: 26 additions & 0 deletions data-access/src/object_store/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,4 +230,30 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn test_globbing() -> Result<()> {
    // Create three empty files; only the two names starting with "a"
    // should survive the "a*.txt" glob below.
    let tmp = tempdir()?;
    let a1_path = tmp.path().join("a1.txt");
    let a2_path = tmp.path().join("a2.txt");
    let b1_path = tmp.path().join("b1.txt");
    File::create(&a1_path)?;
    File::create(&a2_path)?;
    File::create(&b1_path)?;

    let glob = format!("{}/a*.txt", tmp.path().to_str().unwrap());
    let mut all_files = HashSet::new();
    let mut files = LocalFileSystem.glob_file(&glob).await?;
    while let Some(file) = files.next().await {
        let file = file?;
        // Freshly created files have not been written to, so size is 0.
        assert_eq!(file.size(), 0);
        all_files.insert(file.path().to_owned());
    }

    // Exactly a1.txt and a2.txt match; b1.txt must have been filtered out.
    assert_eq!(all_files.len(), 2);
    assert!(all_files.contains(a1_path.to_str().unwrap()));
    assert!(all_files.contains(a2_path.to_str().unwrap()));

    Ok(())
}
}
161 changes: 151 additions & 10 deletions data-access/src/object_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,15 @@ pub mod local;

use std::fmt::Debug;
use std::io::Read;
use std::path;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::Arc;

use async_trait::async_trait;
use futures::{AsyncRead, Stream, StreamExt};
use futures::future::ready;
use futures::{AsyncRead, Stream, StreamExt, TryStreamExt};
use glob::Pattern;

use crate::{FileMeta, ListEntry, Result, SizedFile};

Expand Down Expand Up @@ -80,15 +84,42 @@ pub trait ObjectStore: Sync + Send + Debug {
prefix: &str,
suffix: &str,
) -> Result<FileMetaStream> {
let file_stream = self.list_file(prefix).await?;
let suffix = suffix.to_owned();
Ok(Box::pin(file_stream.filter(move |fr| {
let has_suffix = match fr {
Ok(f) => f.path().ends_with(&suffix),
Err(_) => true,
};
async move { has_suffix }
})))
self.glob_file_with_suffix(prefix, suffix).await
}

/// Returns all the files matching `glob_pattern`
async fn glob_file(&self, glob_pattern: &str) -> Result<FileMetaStream> {
    if !contains_glob_start_char(glob_pattern) {
        // No glob metacharacters at all: treat the input as a plain
        // path/prefix and delegate directly to a normal listing.
        self.list_file(glob_pattern).await
    } else {
        // List from the longest literal prefix of the pattern, then keep
        // only the entries whose full path matches the glob.
        let start_path = find_longest_search_path_without_glob_pattern(glob_pattern);
        let file_stream = self.list_file(&start_path).await?;
        // NOTE(review): `unwrap` panics on a syntactically invalid glob
        // even though this function returns `Result` — consider mapping
        // the `PatternError` into the crate error type instead.
        let pattern = Pattern::new(glob_pattern).unwrap();
        Ok(Box::pin(file_stream.filter(move |fr| {
            // Errors are deliberately kept (treated as a "match") so they
            // still reach the stream consumer instead of being dropped.
            let matches_pattern = match fr {
                Ok(f) => pattern.matches(f.path()),
                Err(_) => true,
            };
            async move { matches_pattern }
        })))
    }
}

/// Calls `glob_file` with a suffix filter
///
/// Returns every file matching `glob_pattern`; when `suffix` is non-empty,
/// only files whose path ends with `suffix` are kept.
async fn glob_file_with_suffix(
    &self,
    glob_pattern: &str,
    suffix: &str,
) -> Result<FileMetaStream> {
    // `glob_file` already falls back to a plain `list_file` when the
    // pattern contains no glob metacharacters, so there is no need to
    // duplicate that dispatch here.
    let files_to_consider = self.glob_file(glob_pattern).await?;

    if suffix.is_empty() {
        // Empty suffix matches everything — skip the filtering stage.
        Ok(files_to_consider)
    } else {
        filter_suffix(files_to_consider, suffix)
    }
}

/// Returns all the files in `prefix` if the `prefix` is already a leaf dir,
Expand All @@ -102,3 +133,113 @@ pub trait ObjectStore: Sync + Send + Debug {
/// Get object reader for one file
fn file_reader(&self, file: SizedFile) -> Result<Arc<dyn ObjectReader>>;
}

const GLOB_START_CHARS: [char; 3] = ['?', '*', '['];

/// Determine whether the path contains a globbing character
fn contains_glob_start_char(path: &str) -> bool {
    // True as soon as any glob metacharacter occurs anywhere in `path`.
    GLOB_START_CHARS
        .iter()
        .any(|glob_char| path.contains(*glob_char))
}

/// Filters the file_stream to only contain files that end with suffix
fn filter_suffix(file_stream: FileMetaStream, suffix: &str) -> Result<FileMetaStream> {
    // Own the suffix so the filtering closure can outlive this call.
    let wanted_suffix = suffix.to_owned();
    let filtered =
        file_stream.try_filter(move |meta| ready(meta.path().ends_with(&wanted_suffix)));
    Ok(Box::pin(filtered))
}

/// Returns the longest leading part of `glob_pattern` that contains no
/// glob metacharacter, suitable as a starting prefix for listing files.
fn find_longest_search_path_without_glob_pattern(glob_pattern: &str) -> String {
    // Not actually a glob pattern: the whole input already is the path.
    if !contains_glob_start_char(glob_pattern) {
        return glob_pattern.to_string();
    }

    // Walk the components left to right and accumulate them until the
    // first component that carries a glob metacharacter.
    let mut base = PathBuf::new();
    for component in Path::new(glob_pattern).components() {
        let component_str = component.as_os_str().to_str().unwrap();
        if contains_glob_start_char(component_str) {
            break;
        }
        base.push(component);
    }

    let mut longest = base.to_str().unwrap().to_string();

    // Append a trailing separator unless we ended up at the root (the
    // root alone counts as a single component), so the result can be
    // used directly as a listing prefix.
    if base.components().count() > 1 {
        longest.push(path::MAIN_SEPARATOR);
    }
    longest
}

#[cfg(test)]
mod tests {
    use super::*;

    // Checks the glob-metacharacter detector on plain paths vs. patterns.
    #[tokio::test]
    async fn test_is_glob_path() -> Result<()> {
        assert!(!contains_glob_start_char("/"));
        assert!(!contains_glob_start_char("/test"));
        assert!(!contains_glob_start_char("/test/"));
        assert!(contains_glob_start_char("/test*"));
        Ok(())
    }

    // Helper: asserts the longest literal base path computed for `input`,
    // naming the failing input in the assertion message.
    fn test_longest_base_path(input: &str, expected: &str) {
        assert_eq!(
            find_longest_search_path_without_glob_pattern(input),
            expected,
            "testing find_longest_search_path_without_glob_pattern with {}",
            input
        );
    }

    #[tokio::test]
    async fn test_find_longest_search_path_without_glob_pattern() -> Result<()> {
        // no glob patterns, thus we get the full path (as-is)
        test_longest_base_path("/", "/");
        test_longest_base_path("/a.txt", "/a.txt");
        test_longest_base_path("/a", "/a");
        test_longest_base_path("/a/", "/a/");
        test_longest_base_path("/a/b", "/a/b");
        test_longest_base_path("/a/b/", "/a/b/");
        test_longest_base_path("/a/b.txt", "/a/b.txt");
        test_longest_base_path("/a/b/c.txt", "/a/b/c.txt");
        // glob patterns, thus we build the longest path (os-specific)
        use path::MAIN_SEPARATOR;
        test_longest_base_path("/*.txt", &format!("{MAIN_SEPARATOR}"));
        test_longest_base_path(
            "/a/*b.txt",
            &format!("{MAIN_SEPARATOR}a{MAIN_SEPARATOR}"),
        );
        test_longest_base_path(
            "/a/*/b.txt",
            &format!("{MAIN_SEPARATOR}a{MAIN_SEPARATOR}"),
        );
        test_longest_base_path(
            "/a/b/[123]/file*.txt",
            &format!("{MAIN_SEPARATOR}a{MAIN_SEPARATOR}b{MAIN_SEPARATOR}"),
        );
        test_longest_base_path(
            "/a/b*.txt",
            &format!("{MAIN_SEPARATOR}a{MAIN_SEPARATOR}"),
        );
        test_longest_base_path(
            "/a/b/**/c*.txt",
            &format!("{MAIN_SEPARATOR}a{MAIN_SEPARATOR}b{MAIN_SEPARATOR}"),
        );
        Ok(())
    }
}
6 changes: 3 additions & 3 deletions datafusion/core/src/datasource/listing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ pub async fn pruned_partition_list(
if table_partition_cols.is_empty() {
return Ok(Box::pin(
store
.list_file_with_suffix(table_path, file_extension)
.glob_file_with_suffix(table_path, file_extension)
.await?
.map(|f| {
Ok(PartitionedFile {
Expand All @@ -196,7 +196,7 @@ pub async fn pruned_partition_list(
let table_partition_cols_stream = table_partition_cols.to_vec();
Ok(Box::pin(
store
.list_file_with_suffix(table_path, file_extension)
.glob_file_with_suffix(table_path, file_extension)
.await?
.filter_map(move |f| {
let stream_path = stream_path.clone();
Expand Down Expand Up @@ -231,7 +231,7 @@ pub async fn pruned_partition_list(
// parse the partition values and serde them as a RecordBatch to filter them
// TODO avoid collecting but have a streaming memory table instead
let batches: Vec<RecordBatch> = store
.list_file_with_suffix(table_path, file_extension)
.glob_file_with_suffix(table_path, file_extension)
.await?
// TODO we set an arbitrary high batch size here, it does not matter as we list
// all the files anyway. This number will need to be adjusted according to the object
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ impl ListingOptions {
path: &'a str,
) -> Result<SchemaRef> {
let file_stream = object_store
.list_file_with_suffix(path, &self.file_extension)
.glob_file_with_suffix(path, &self.file_extension)
.await?
.map(move |file_meta| object_store.file_reader(file_meta?.sized_file));
let file_schema = self.format.infer_schema(Box::pin(file_stream)).await?;
Expand Down
35 changes: 35 additions & 0 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1600,6 +1600,7 @@ mod tests {
use crate::logical_plan::{binary_expr, lit, Operator};
use crate::physical_plan::functions::make_scalar_function;
use crate::test;
use crate::test_util::parquet_test_data;
use crate::variable::VarType;
use crate::{
assert_batches_eq, assert_batches_sorted_eq,
Expand Down Expand Up @@ -2124,7 +2125,41 @@ mod tests {
"+----------+",
];
assert_batches_eq!(expected, &result);
Ok(())
}

#[tokio::test]
async fn read_with_glob_path() -> Result<()> {
    let ctx = SessionContext::new();

    let df = ctx
        .read_parquet(
            format!("{}/alltypes_plain*.parquet", parquet_test_data()),
            ParquetReadOptions::default(),
        )
        .await?;
    let results = df.collect().await?;
    let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum();
    // The glob matches alltypes_plain.parquet (8 rows) and
    // alltypes_plain.snappy.parquet (2 rows); alltypes_dictionary.parquet
    // does not match "alltypes_plain*", hence 8 + 2 = 10.
    assert_eq!(total_rows, 10);
    Ok(())
}

#[tokio::test]
async fn read_from_registered_table_with_glob_path() -> Result<()> {
    let ctx = SessionContext::new();

    // Register a table backed by a glob path, then query it via SQL.
    ctx.register_parquet(
        "test",
        &format!("{}/alltypes_plain*.parquet", parquet_test_data()),
        ParquetReadOptions::default(),
    )
    .await?;
    let df = ctx.sql("SELECT * FROM test").await?;
    let results = df.collect().await?;
    let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum();
    // The glob matches alltypes_plain.parquet (8 rows) and
    // alltypes_plain.snappy.parquet (2 rows); alltypes_dictionary.parquet
    // does not match "alltypes_plain*", hence 8 + 2 = 10.
    assert_eq!(total_rows, 10);
    Ok(())
}

Expand Down

0 comments on commit fbeb726

Please sign in to comment.