Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: List stage files recursively #5992

Merged
merged 4 commits into from
Jun 15, 2022
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
List files recursively
Signed-off-by: Xuanwo <github@xuanwo.io>
  • Loading branch information
Xuanwo committed Jun 15, 2022
commit dc19b2fa604c43b80e861d9f5dd69678ba65536d
2 changes: 1 addition & 1 deletion common/io/src/operator.rs
Original file line number Diff line number Diff line change
@@ -122,7 +122,7 @@ pub async fn init_s3_operator(cfg: &StorageS3Config) -> Result<Operator> {

// Enable virtual host style
if cfg.enable_virtual_host_style {
builder.enable_virtual_host_style()
builder.enable_virtual_host_style();
}

Ok(Operator::new(builder.finish().await?))
70 changes: 37 additions & 33 deletions query/src/interpreters/interpreter_common.rs
Original file line number Diff line number Diff line change
@@ -23,8 +23,8 @@ use common_meta_types::GrantObject;
use common_meta_types::StageFile;
use common_meta_types::StageType;
use common_meta_types::UserStageInfo;
use common_tracing::tracing::warn;
use futures::TryStreamExt;
use opendal::ObjectMode;
use regex::Regex;

use crate::sessions::QueryContext;
@@ -79,47 +79,51 @@ pub async fn list_files(
}
}

/// List files from DAL recursively.
///
/// - If the input path is a dir, we will list it recursively.
/// - Otherwise, we will append the file itself, and also try to list `path/`.
/// - If the path does not exist, we will try to list `path/` too.
pub async fn list_files_from_dal(
ctx: &Arc<QueryContext>,
stage: &UserStageInfo,
path: &str,
pattern: &str,
) -> Result<Vec<StageFile>> {
let op = StageSource::get_op(ctx, stage).await?;
let files = if path.ends_with('/') {
let mut list = vec![];
let mut dirs = vec![path.to_string()];
while let Some(dir) = dirs.pop() {
let mut objects = op.object(&dir).list().await?;
while let Some(de) = objects.try_next().await? {
let meta = de.metadata().await?;
let path = de.path().to_string();
match de.mode() {
ObjectMode::FILE => {
list.push((path, meta));
}
ObjectMode::DIR => {
dirs.push(path);
}
ObjectMode::Unknown => continue,
}
}
let mut files = Vec::new();

// - If the path itself is a dir, return directly.
// - Otherwise, return the path suffixed with `/`.
// - If other errors happen, we will ignore them by returning None.
let dir_path = match op.object(path).metadata().await {
Ok(meta) if meta.mode().is_dir() => Some(path.to_string()),
Ok(meta) if !meta.mode().is_dir() => {
files.push((path.to_string(), meta));

Some(format!("{path}/"))
}
list
} else {
let o = op.object(path);
match o.metadata().await {
Ok(meta) => {
if meta.mode().is_dir() {
vec![]
} else {
vec![(o.path().to_string(), meta)]
Err(e) if e.kind() == io::ErrorKind::NotFound => Some(format!("{path}/")),
Err(e) => return Err(e.into()),
_ => None,
};

// Check whether this dir is valid, and if so list it recursively.
if let Some(dir) = dir_path {
match op.object(&dir).metadata().await {
Ok(_) => {
let mut ds = op.batch().walk_top_down(path)?;
while let Some(de) = ds.try_next().await? {
if de.mode().is_file() {
let path = de.path().to_string();
let meta = de.metadata().await?;
files.push((path, meta));
}
}
}
Err(e) if e.kind() == io::ErrorKind::NotFound => vec![],
Err(e) => return Err(e.into()),
}
};
Err(e) => warn!("ignore listing {path}/, because: {:?}", e),
};
}

let regex = if !pattern.is_empty() {
Some(Regex::new(pattern).map_err(|e| {
@@ -201,7 +205,7 @@ pub async fn list_files_from_meta_api(
if path.ends_with('/') {
name.starts_with(path)
} else {
name == path
name.starts_with(&format!("{path}/")) || name == path
}
})
.filter(|file| {