Skip to content

Commit

Permalink
Filter exact list prefix matches for MemoryStore and HttpStore (#3712) (
Browse files Browse the repository at this point in the history
#3713)

* Filter exact list prefix matches for MemoryStore and HttpStore (#3712)

* Update object_store/src/lib.rs

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

---------

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
  • Loading branch information
tustvold and alamb authored Feb 13, 2023
1 parent 5ffc0a8 commit 38a79ae
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 6 deletions.
14 changes: 12 additions & 2 deletions object_store/src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use itertools::Itertools;
use snafu::{OptionExt, ResultExt, Snafu};
use tokio::io::AsyncWrite;
use url::Url;
Expand Down Expand Up @@ -163,6 +164,7 @@ impl ObjectStore for HttpStore {
&self,
prefix: Option<&Path>,
) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
let prefix_len = prefix.map(|p| p.as_ref().len()).unwrap_or_default();
let status = self.client.list(prefix, "infinity").await?;
Ok(futures::stream::iter(
status
Expand All @@ -172,7 +174,9 @@ impl ObjectStore for HttpStore {
.map(|response| {
response.check_ok()?;
response.object_meta(self.client.base_url())
}),
})
// Filter out exact prefix matches
.filter_ok(move |r| r.location.as_ref().len() > prefix_len),
)
.boxed())
}
Expand All @@ -186,7 +190,13 @@ impl ObjectStore for HttpStore {
for response in status.response {
response.check_ok()?;
match response.is_dir() {
false => objects.push(response.object_meta(self.client.base_url())?),
false => {
let meta = response.object_meta(self.client.base_url())?;
// Filter out exact prefix matches
if meta.location.as_ref().len() > prefix_len {
objects.push(meta);
}
}
true => {
let path = response.path(self.client.base_url())?;
// Exclude the current object
Expand Down
20 changes: 20 additions & 0 deletions object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -911,9 +911,29 @@ mod tests {
let content_list = flatten_list_stream(storage, Some(&prefix)).await.unwrap();
assert_eq!(content_list, &[location1.clone()]);

let result = storage.list_with_delimiter(Some(&prefix)).await.unwrap();
assert_eq!(result.objects.len(), 1);
assert_eq!(result.objects[0].location, location1);
assert_eq!(result.common_prefixes, &[]);

// Listing an existing path (file) should return an empty list:
// https://github.com/apache/arrow-rs/issues/3712
let content_list = flatten_list_stream(storage, Some(&location1))
.await
.unwrap();
assert_eq!(content_list, &[]);

let list = storage.list_with_delimiter(Some(&location1)).await.unwrap();
assert_eq!(list.objects, &[]);
assert_eq!(list.common_prefixes, &[]);

let prefix = Path::from("foo/x");
let content_list = flatten_list_stream(storage, Some(&prefix)).await.unwrap();
assert_eq!(content_list, &[]);

let list = storage.list_with_delimiter(Some(&prefix)).await.unwrap();
assert_eq!(list.objects, &[]);
assert_eq!(list.common_prefixes, &[]);
}

pub(crate) async fn list_with_delimiter(storage: &DynObjectStore) {
Expand Down
21 changes: 17 additions & 4 deletions object_store/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,21 @@ impl ObjectStore for InMemory {
&self,
prefix: Option<&Path>,
) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
let root = Path::default();
let prefix = prefix.unwrap_or(&root);
let last_modified = Utc::now();

let storage = self.storage.read();
let values: Vec<_> = storage
.iter()
.filter(move |(key, _)| prefix.map(|p| key.prefix_matches(p)).unwrap_or(true))
.map(move |(key, value)| {
.range((prefix)..)
.take_while(|(key, _)| key.as_ref().starts_with(prefix.as_ref()))
.filter(|(key, _)| {
// Don't return for exact prefix match
key.prefix_match(prefix)
.map(|mut x| x.next().is_some())
.unwrap_or(false)
})
.map(|(key, value)| {
Ok(ObjectMeta {
location: key.clone(),
last_modified,
Expand All @@ -195,14 +203,19 @@ impl ObjectStore for InMemory {
// response. Otherwise, we just collect the common prefixes.
let mut objects = vec![];
for (k, v) in self.storage.read().range((prefix)..) {
if !k.as_ref().starts_with(prefix.as_ref()) {
break;
}

let mut parts = match k.prefix_match(prefix) {
Some(parts) => parts,
None => break,
None => continue,
};

// Pop first element
let common_prefix = match parts.next() {
Some(p) => p,
// Should only return children of the prefix
None => continue,
};

Expand Down

0 comments on commit 38a79ae

Please sign in to comment.