Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Service HTTP deosn't handle dir correctly #455

Merged
merged 6 commits into from
Jul 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ use std::fmt::Formatter;
use std::io::Error;
use std::io::ErrorKind;
use std::io::Result;
use std::ops::{Range, RangeBounds};
use std::ops::Range;
use std::ops::RangeBounds;

use anyhow::anyhow;
use time::Duration;
Expand Down
158 changes: 121 additions & 37 deletions src/services/http/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use std::collections::HashSet;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::io::Error;
Expand All @@ -29,21 +30,26 @@ use anyhow::anyhow;
use async_trait::async_trait;
use futures::TryStreamExt;
use http::StatusCode;
use log::debug;
use log::error;
use log::info;
use log::{debug, warn};
use log::warn;
use radix_trie::Trie;
use radix_trie::TrieCommon;

use crate::error::other;
use crate::error::BackendError;
use crate::error::ObjectError;
use crate::io_util::new_http_channel;
use crate::io_util::parse_content_length;
use crate::io_util::parse_content_md5;
use crate::io_util::parse_error_kind as parse_http_error_kind;
use crate::io_util::parse_error_response;
use crate::io_util::parse_etag;
use crate::io_util::parse_last_modified;
use crate::io_util::percent_encode_path;
use crate::io_util::HttpBodyWriter;
use crate::io_util::HttpClient;
use crate::io_util::{new_http_channel, parse_content_md5};
use crate::io_util::{parse_content_length, percent_encode_path};
use crate::io_util::{parse_error_kind as parse_http_error_kind, HttpBodyWriter};
use crate::io_util::{parse_error_response, parse_etag};
use crate::ops::BytesRange;
use crate::ops::OpCreate;
use crate::ops::OpDelete;
Expand Down Expand Up @@ -105,6 +111,20 @@ impl Builder {
self
}

pub(crate) fn insert_path(&mut self, path: &str) {
for (idx, _) in path.match_indices('/') {
let p = path[idx + 1..].to_string();
if self.index.get(&p).is_none() {
debug!("insert path {} into index", p);
self.index.insert(p, ());
}
}
if self.index.get(path).is_none() {
debug!("insert path {} into index", path);
self.index.insert(path.to_string(), ());
}
}

/// Insert index into backend.
pub fn insert_index(&mut self, key: &str) -> &mut Self {
if key.is_empty() {
Expand All @@ -117,7 +137,8 @@ impl Builder {
key.to_string()
};

self.index.insert(key, ());
self.insert_path(&key);

self
}

Expand All @@ -130,7 +151,7 @@ impl Builder {
k.to_string()
};

self.index.insert(k, ());
self.insert_path(&k);
}
self
}
Expand Down Expand Up @@ -183,14 +204,31 @@ impl Builder {
}

/// Backend is used to serve `Accessor` support for http.
#[derive(Debug, Clone)]
#[derive(Clone)]
pub struct Backend {
endpoint: String,
root: String,
client: HttpClient,
index: Arc<Mutex<Trie<String, ()>>>,
}

impl Debug for Backend {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Backend")
.field("endpoint", &self.endpoint)
.field("root", &self.root)
.field("client", &self.client)
.field(
"index",
&format!(
"length = {}",
self.index.lock().expect("lock must succeed").len()
),
)
.finish()
}
}

impl Backend {
/// Create a new builder for s3.
pub fn build() -> Builder {
Expand Down Expand Up @@ -222,6 +260,30 @@ impl Backend {
// root must be normalized like `/abc/`
format!("{}{}", self.root, path)
}

pub(crate) fn get_index_path(&self, path: &str) -> String {
match path.strip_prefix('/') {
Some(strip) => strip.to_string(),
None => path.to_string(),
}
}

pub(crate) fn insert_path(&self, path: &str) {
let mut index = self.index.lock().expect("lock must succeed");

for (idx, _) in path.match_indices('/') {
let p = path[idx + 1..].to_string();

if index.get(&p).is_none() {
debug!("insert path {} into index", p);
index.insert(p, ());
}
}
if index.get(path).is_none() {
debug!("insert path {} into index", path);
index.insert(path.to_string(), ());
}
}
}

#[async_trait]
Expand Down Expand Up @@ -250,10 +312,7 @@ impl Accessor for Backend {
match resp.status() {
StatusCode::CREATED | StatusCode::OK => {
debug!("object {} create finished", args.path());
self.index
.lock()
.expect("lock succeed")
.insert(args.path().to_string(), ());
self.insert_path(&self.get_index_path(args.path()));
Ok(())
}
_ => {
Expand Down Expand Up @@ -317,11 +376,7 @@ impl Accessor for Backend {
parse_error_kind,
);

// TODO: we should only update index while put succeed.
self.index
.lock()
.expect("lock succeed")
.insert(args.path().to_string(), ());
self.insert_path(&self.get_index_path(args.path()));

Ok(Box::new(bs))
}
Expand Down Expand Up @@ -410,35 +465,59 @@ impl Accessor for Backend {
}

async fn list(&self, args: &OpList) -> Result<DirStreamer> {
let mut path = args.path().to_string();
let mut path = args.path();
if path == "/" {
path.clear();
path = ""
}

let paths = match self.index.lock().expect("lock succeed").subtrie(&path) {
debug!("object {} list start", path);

let paths = match self.index.lock().expect("lock succeed").subtrie(path) {
None => {
return Err(Error::new(
ErrorKind::NotFound,
ObjectError::new("list", &path, anyhow!("no such dir")),
ObjectError::new("list", path, anyhow!("no such dir")),
))
}
Some(trie) => trie
.keys()
// Make sure k is at the same level with input path.
.filter(|k| match k[path.len()..].find('/') {
None => true,
Some(idx) => idx + 1 + path.len() == k.len(),
.filter_map(|k| {
let k = k.as_str();

// `/xyz` should not belong to `/abc`
if !k.starts_with(&path) {
return None;
}

// We should remove `/abc` if self
if k == path {
return None;
}

match k[path.len()..].find('/') {
// File `/abc/def.csv` must belong to `/abc`
None => Some(k.to_string()),
Some(idx) => {
// The index of first `/` after `/abc`.
let dir_idx = idx + 1 + path.len();

if dir_idx == k.len() {
// Dir `/abc/def/` belongs to `/abc/`
Some(k.to_string())
} else {
None
}
}
}
})
.map(|v| v.to_string())
.filter(|v| v != &path)
.collect::<Vec<_>>(),
.collect::<HashSet<_>>(),
};

debug!("dir object {} listed keys: {paths:?}", path);
debug!("dir object {path:?} listed keys: {paths:?}");
Ok(Box::new(DirStream {
backend: Arc::new(self.clone()),
path,
paths,
path: path.to_string(),
paths: paths.into_iter().collect(),
idx: 0,
}))
}
Expand Down Expand Up @@ -668,22 +747,27 @@ mod tests {

let mock_server = MockServer::start().await;

let mut expected = vec!["another/", "hello", "world"];

let mut builder = Backend::build();
builder.endpoint(&mock_server.uri());
builder.root("/");
builder.insert_index("/hello");
builder.insert_index("/world");
builder.insert_index("/another/");
for s in expected.iter() {
builder.insert_index(s);
}

let op = Operator::new(builder.finish().await?);

let bs = op.object("/").list().await?;
let paths = bs.try_collect::<Vec<_>>().await?;
let paths = paths
let mut paths = paths
.into_iter()
.map(|v| v.path().to_string())
.collect::<Vec<_>>();

assert_eq!(paths, vec!["another/", "hello", "world"]);
paths.sort_unstable();
expected.sort_unstable();
assert_eq!(paths, expected);
Ok(())
}
}
40 changes: 32 additions & 8 deletions src/services/memory/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::collections::HashMap;
use std::collections::HashSet;
use std::io::Error;
use std::io::ErrorKind;
use std::io::Result;
Expand Down Expand Up @@ -206,21 +207,44 @@ impl Accessor for Backend {

let paths = map
.iter()
.map(|(k, _)| k.clone())
// Make sure k is start with input path.
.filter(|k| k.starts_with(&path) && k != &path)
// Make sure k is at the same level with input path.
.filter(|k| match k[path.len()..].find('/') {
None => true,
Some(idx) => idx + 1 + path.len() == k.len(),
.filter_map(|(k, _)| {
let k = k.as_str();
// `/xyz` should not belong to `/abc`
if !k.starts_with(&path) {
return None;
}

// We should remove `/abc` if self
if k == path {
return None;
}

match k[path.len()..].find('/') {
// File `/abc/def.csv` must belong to `/abc`
None => Some(k.to_string()),
Some(idx) => {
// The index of first `/` after `/abc`.
let dir_idx = idx + 1 + path.len();

if dir_idx == k.len() {
// Dir `/abc/def/` belongs to `/abc/`
Some(k.to_string())
} else {
// File/Dir `/abc/def/xyz` deoesn't belongs to `/abc`.
// But we need to list `/abc/def` out so that we can walk down.
Some(k[..dir_idx].to_string())
}
}
}
})
.collect::<Vec<_>>();
.collect::<HashSet<_>>();

debug!("dir object {} listed keys: {paths:?}", path);
Ok(Box::new(DirStream {
backend: Arc::new(self.clone()),
path,
paths,
paths: paths.into_iter().collect(),
idx: 0,
}))
}
Expand Down
Loading