Skip to content

Commit

Permalink
Fix http list dir
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <github@xuanwo.io>
  • Loading branch information
Xuanwo committed Jul 19, 2022
1 parent f1f10e5 commit 3d9098e
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 36 deletions.
115 changes: 90 additions & 25 deletions src/services/http/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,14 @@ impl Builder {
self
}

/// Insert `path` and every ancestor directory of `path` into the index.
///
/// For `a/b/c.csv` the keys `a/`, `a/b/` and `a/b/c.csv` are inserted, so a
/// listing can walk down from a parent directory even when that directory
/// was never inserted on its own.
///
/// NOTE(review): assumes `path` carries no leading `/`; otherwise a bare `/`
/// key is inserted as well — confirm against callers.
pub(crate) fn insert_path(&mut self, path: &str) {
    // Every `/` closes one directory level: the prefix up to and including
    // it is that directory's key. (Slicing from `idx + 1` would index the
    // trailing remainder such as `b/c.csv`, which is not an ancestor.)
    for (idx, _) in path.match_indices('/') {
        let dir = &path[..=idx];
        debug!("insert path {} into index", dir);
        self.index.insert(dir.to_string(), ());
    }
    // For a path ending in `/` this repeats the last insert above; the
    // stored value is `()`, so nothing is lost by inserting it again.
    self.index.insert(path.to_string(), ());
}

/// Insert index into backend.
pub fn insert_index(&mut self, key: &str) -> &mut Self {
if key.is_empty() {
Expand All @@ -117,7 +125,8 @@ impl Builder {
key.to_string()
};

self.index.insert(key, ());
self.insert_path(&key);

self
}

Expand All @@ -130,7 +139,7 @@ impl Builder {
k.to_string()
};

self.index.insert(k, ());
self.insert_path(&k);
}
self
}
Expand Down Expand Up @@ -183,14 +192,31 @@ impl Builder {
}

/// Backend is used to serve `Accessor` support for http.
#[derive(Debug, Clone)]
#[derive(Clone)]
pub struct Backend {
/// Endpoint of the http service.
/// NOTE(review): exact format (scheme, trailing slash) is not visible here — confirm.
endpoint: String,
/// Root prefix prepended to object paths; expected to be normalized like `/abc/`.
root: String,
/// Client handle for outgoing requests.
/// NOTE(review): its usage is outside this view — confirm.
client: HttpClient,
/// Index of known paths (keys only, the value is `()`).
/// Queried by prefix when listing and extended after create/write;
/// behind `Arc<Mutex<..>>` so clones of the backend share one index.
index: Arc<Mutex<Trie<String, ()>>>,
}

impl Debug for Backend {
    /// Render the backend for diagnostics.
    ///
    /// `endpoint`, `root` and `client` are printed as-is; the index is
    /// summarised by its entry count instead of dumping every key.
    ///
    /// # Panics
    ///
    /// Panics if the index mutex is poisoned.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("Backend");
        out.field("endpoint", &self.endpoint);
        out.field("root", &self.root);
        out.field("client", &self.client);

        // The guard is a temporary of this statement, so the lock is
        // released before the summary is handed to the formatter.
        let index_summary = format!(
            "length = {}",
            self.index.lock().expect("lock must succeed").len()
        );
        out.field("index", &index_summary);

        out.finish()
    }
}

impl Backend {
/// Create a new builder for http.
pub fn build() -> Builder {
Expand Down Expand Up @@ -222,6 +248,28 @@ impl Backend {
// root must be normalized like `/abc/`
format!("{}{}", self.root, path)
}

/// Turn an operation path into its index key by dropping a single leading
/// `/` when present; any other path is returned unchanged (as an owned copy).
pub(crate) fn get_index_path(&self, path: &str) -> String {
    path.strip_prefix('/').unwrap_or(path).to_string()
}

/// Insert `path` and every ancestor directory of `path` into the shared index.
///
/// For `a/b/c.csv` the keys `a/`, `a/b/` and `a/b/c.csv` are inserted, so a
/// listing can walk down from a parent directory even when that directory
/// was never created on its own.
///
/// NOTE(review): assumes `path` carries no leading `/` (callers in view
/// pass it through `get_index_path` first) — confirm for any other caller.
///
/// # Panics
///
/// Panics if the index mutex is poisoned.
pub(crate) fn insert_path(&self, path: &str) {
    // Lock once for the whole update instead of once per inserted key.
    let mut index = self.index.lock().expect("lock must succeed");

    // Every `/` closes one directory level: the prefix up to and including
    // it is that directory's key. (Slicing from `idx + 1` would index the
    // trailing remainder such as `b/c.csv`, which is not an ancestor.)
    for (idx, _) in path.match_indices('/') {
        let dir = &path[..=idx];
        debug!("insert path {} into index", dir);
        index.insert(dir.to_string(), ());
    }
    // For a path ending in `/` this repeats the last insert above; the
    // stored value is `()`, so nothing is lost by inserting it again.
    index.insert(path.to_string(), ());
}
}

#[async_trait]
Expand Down Expand Up @@ -250,10 +298,7 @@ impl Accessor for Backend {
match resp.status() {
StatusCode::CREATED | StatusCode::OK => {
debug!("object {} create finished", args.path());
self.index
.lock()
.expect("lock succeed")
.insert(args.path().to_string(), ());
self.insert_path(&self.get_index_path(args.path()));
Ok(())
}
_ => {
Expand Down Expand Up @@ -317,11 +362,7 @@ impl Accessor for Backend {
parse_error_kind,
);

// TODO: we should only update index while put succeed.
self.index
.lock()
.expect("lock succeed")
.insert(args.path().to_string(), ());
self.insert_path(&self.get_index_path(args.path()));

Ok(Box::new(bs))
}
Expand Down Expand Up @@ -410,12 +451,14 @@ impl Accessor for Backend {
}

async fn list(&self, args: &OpList) -> Result<DirStreamer> {
let mut path = args.path().to_string();
let mut path = args.path();
if path == "/" {
path.clear();
path = ""
}

let paths = match self.index.lock().expect("lock succeed").subtrie(&path) {
debug!("object {} list start", path);

let paths = match self.index.lock().expect("lock succeed").subtrie(path) {
None => {
return Err(Error::new(
ErrorKind::NotFound,
Expand All @@ -424,21 +467,43 @@ impl Accessor for Backend {
}
Some(trie) => trie
.keys()
// Make sure k is at the same level with input path.
.filter(|k| match k[path.len()..].find('/') {
None => true,
Some(idx) => idx + 1 + path.len() == k.len(),
.filter_map(|k| {
let k = k.as_str();

// `/xyz` should not belong to `/abc`
if !k.starts_with(&path) {
return None;
}

// We should remove `/abc` if self
if k == path {
return None;
}

match k[path.len()..].find('/') {
// File `/abc/def.csv` must belong to `/abc`
None => Some(k.to_string()),
Some(idx) => {
// The index of first `/` after `/abc`.
let dir_idx = idx + 1 + path.len();

if dir_idx == k.len() {
// Dir `/abc/def/` belongs to `/abc/`
Some(k.to_string())
} else {
None
}
}
}
})
.map(|v| v.to_string())
.filter(|v| v != &path)
.collect::<Vec<_>>(),
.collect::<HashSet<_>>(),
};

debug!("dir object {} listed keys: {paths:?}", path);
debug!("dir object {path:?} listed keys: {paths:?}");
Ok(Box::new(DirStream {
backend: Arc::new(self.clone()),
path,
paths,
path: path.to_string(),
paths: paths.into_iter().collect(),
idx: 0,
}))
}
Expand Down
41 changes: 32 additions & 9 deletions src/services/memory/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::io::Error;
use std::io::ErrorKind;
use std::io::Result;
Expand Down Expand Up @@ -206,21 +206,44 @@ impl Accessor for Backend {

let paths = map
.iter()
.map(|(k, _)| k.clone())
// Make sure k is start with input path.
.filter(|k| k.starts_with(&path) && k != &path)
// Make sure k is at the same level with input path.
.filter(|k| match k[path.len()..].find('/') {
None => true,
Some(idx) => idx + 1 + path.len() == k.len(),
.filter_map(|(k, _)| {
let k = k.as_str();
// `/xyz` should not belong to `/abc`
if !k.starts_with(&path) {
return None;
}

// We should remove `/abc` if self
if k == &path {
return None;
}

match k[path.len()..].find('/') {
// File `/abc/def.csv` must belong to `/abc`
None => Some(k.to_string()),
Some(idx) => {
// The index of first `/` after `/abc`.
let dir_idx = idx + 1 + path.len();

if dir_idx == k.len() {
// Dir `/abc/def/` belongs to `/abc/`
Some(k.to_string())
} else {
// File/Dir `/abc/def/xyz` doesn't belong to `/abc`.
// But we need to list `/abc/def` out so that we can walk down.
Some(k[..dir_idx].to_string())
}
}
}
})
.collect::<Vec<_>>();
.collect::<HashSet<_>>();

debug!("dir object {} listed keys: {paths:?}", path);
Ok(Box::new(DirStream {
backend: Arc::new(self.clone()),
path,
paths,
paths: paths.into_iter().collect(),
idx: 0,
}))
}
Expand Down
36 changes: 34 additions & 2 deletions tests/behavior/behavior.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ macro_rules! behavior_tests {

test_walk_bottom_up,
test_walk_top_down,
test_walk_top_down_within_empty_dir,
test_remove_all,

test_presign_read,
Expand Down Expand Up @@ -956,7 +957,7 @@ async fn test_walk_top_down(op: Operator) -> Result<()> {
/// Return the index of the first element of `vs` equal to `s`.
///
/// # Panics
///
/// Panics with a message naming `s` and the searched list when `s` is absent.
fn get_position(vs: &[String], s: &str) -> usize {
    vs.iter()
        .position(|v| v == s)
        // Build the message only on failure; `panic!` interpolates the
        // captured names, which a plain `expect("{s} ...")` literal does not.
        .unwrap_or_else(|| panic!("{s} is not found in {vs:?}"))
}

assert!(get_position(&actual, "x/x/x/x/") > get_position(&actual, "x/x/x/"));
Expand All @@ -969,6 +970,37 @@ async fn test_walk_top_down(op: Operator) -> Result<()> {
Ok(())
}

// Walk top down within empty dir should output as expected.
//
// Only `x/` and `x/x/x/x/` are created explicitly; the walk must still report
// the intermediate directories `x/x/` and `x/x/x/` on the way down.
async fn test_walk_top_down_within_empty_dir(op: Operator) -> Result<()> {
    // Paths created up front; the levels in between are left implicit.
    let created = ["x/", "x/x/x/x/"];
    for path in created {
        op.object(path).create().await?;
    }

    let w = op.batch().walk_top_down("x/")?;
    let mut actual = w
        .try_collect::<Vec<_>>()
        .await?
        .into_iter()
        .map(|v| v.path().to_string())
        .collect::<Vec<_>>();

    debug!("walk top down: {:?}", actual);

    // Index of `s` in `vs`; panics with a descriptive message when missing.
    fn get_position(vs: &[String], s: &str) -> usize {
        vs.iter()
            .position(|v| v == s)
            .unwrap_or_else(|| panic!("{s} is not found in {vs:?}"))
    }

    // Top-down order: the root of the walk must come before its deepest child.
    assert!(get_position(&actual, "x/x/x/x/") > get_position(&actual, "x/"));

    // Order among siblings is unspecified, so compare the sorted listing.
    actual.sort_unstable();
    assert_eq!(actual, vec!["x/", "x/x/", "x/x/x/", "x/x/x/x/"]);
    Ok(())
}

// Walk bottom up should output as expected
async fn test_walk_bottom_up(op: Operator) -> Result<()> {
let mut expected = vec![
Expand All @@ -991,7 +1023,7 @@ async fn test_walk_bottom_up(op: Operator) -> Result<()> {
/// Return the index of the first element of `vs` equal to `s`.
///
/// # Panics
///
/// Panics with a message naming `s` and the searched list when `s` is absent.
fn get_position(vs: &[String], s: &str) -> usize {
    match vs.iter().position(|v| v == s) {
        Some(idx) => idx,
        // `panic!` interpolates the captured names; the message is only
        // built when the lookup actually fails.
        None => panic!("{s} is not found in {vs:?}"),
    }
}

assert!(get_position(&actual, "x/x/x/x/") < get_position(&actual, "x/x/x/"));
Expand Down

1 comment on commit 3d9098e

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for opendal ready!

✅ Preview
https://opendal-lyg0l9zmq-databend.vercel.app

Built with commit 3d9098e.
This pull request is being automatically deployed with vercel-action

Please sign in to comment.