fix: traverse directories in local list_objs (#681)
* fix: traverse directories in local

* docs: remove comment rename_no_replace not supported on windows
roeap authored Jul 11, 2022
1 parent 9229dee commit 431d0ea
Showing 7 changed files with 140 additions and 66 deletions.
21 changes: 21 additions & 0 deletions Cargo.lock


17 changes: 12 additions & 5 deletions rust/Cargo.toml
@@ -46,9 +46,15 @@ rusoto_s3 = { version = "0.48", default-features = false, optional = true }
rusoto_sts = { version = "0.48", default-features = false, optional = true }
rusoto_dynamodb = { version = "0.48", default-features = false, optional = true }
maplit = { version = "1", optional = true }
hyper = { version = "0.14.19", default-features = false, optional = true}
hyper-rustls = { version = "0.23.0", default-features = false, optional = true, features = ["http2", "rustls-native-certs", "tokio-runtime"] }
hyper-proxy = { version = "0.9.1", default-features = false, optional = true, features = ["rustls"] }
hyper = { version = "0.14.19", default-features = false, optional = true }
hyper-rustls = { version = "0.23.0", default-features = false, optional = true, features = [
"http2",
"rustls-native-certs",
"tokio-runtime",
] }
hyper-proxy = { version = "0.9.1", default-features = false, optional = true, features = [
"rustls",
] }

# Glue
rusoto_glue = { version = "0.48", default-features = false, optional = true }
@@ -68,6 +74,7 @@ crossbeam = { version = "0", optional = true }

cfg-if = "1"
async-trait = "0.1"
walkdir = "2"

# NOTE: disable rust-dataframe integration since it currently doesn't have a
# version published in crates.io
@@ -98,7 +105,7 @@ s3 = [
"dynamodb_lock/native-tls",
"hyper",
"hyper-rustls",
"hyper-proxy"
"hyper-proxy",
]
s3-rustls = [
"rusoto_core/rustls",
@@ -110,7 +117,7 @@ s3-rustls = [
"dynamodb_lock/rustls",
"hyper",
"hyper-rustls",
"hyper-proxy"
"hyper-proxy",
]
gcs = ["async-stream", "tame-gcs", "tame-oauth", "reqwest"]
glue = ["s3", "rusoto_glue"]
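
The newly added walkdir dependency drives the recursive traversal introduced in rust/src/storage/file/mod.rs below. A minimal, standalone sketch of the crate's iterator API (hypothetical path, not from this repository):

use walkdir::WalkDir;

fn main() {
    // Recursively walk the tree; min_depth(1) skips the root entry itself,
    // mirroring how the backend configures it below.
    for entry in WalkDir::new("/tmp/delta-table").min_depth(1) {
        match entry {
            Ok(e) if e.file_type().is_file() => println!("{}", e.path().display()),
            Ok(_) => {} // non-file entries fall through
            Err(err) => eprintln!("walk error: {err}"),
        }
    }
}
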
9 changes: 3 additions & 6 deletions rust/src/storage/azure/mod.rs
@@ -10,13 +10,13 @@ use azure_identity::{
};
use azure_storage::storage_shared_key_credential::StorageSharedKeyCredential;
use azure_storage_datalake::prelude::*;
use futures::stream::{self, Stream};
use futures::stream::{self, BoxStream};
use futures::{future::Either, StreamExt};
use log::debug;
use std::collections::HashMap;
use std::fmt;
use std::fmt::Debug;
use std::sync::Arc;
use std::{fmt, pin::Pin};

/// Storage option keys to use when creating [crate::storage::azure::AzureStorageOptions].
/// The same key should be used whether passing a key in the hashmap or setting it as an environment variable.
@@ -348,10 +348,7 @@ impl StorageBackend for AdlsGen2Backend {
async fn list_objs<'a>(
&'a self,
path: &'a str,
) -> Result<
Pin<Box<dyn Stream<Item = Result<ObjectMeta, StorageError>> + Send + 'a>>,
StorageError,
> {
) -> Result<BoxStream<'a, Result<ObjectMeta, StorageError>>, StorageError> {
debug!("Listing objects under {}", path);
let obj = parse_uri(path)?.into_adlsgen2_object()?;
self.validate_container(&obj)?;
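
The signature change here (repeated in every backend below) is purely a respelling: futures::stream::BoxStream<'a, T> is a type alias for the pinned, boxed stream trait object that was previously written out by hand. A sketch of the equivalence:

use futures::stream::{BoxStream, Stream};
use std::pin::Pin;

// The futures crate defines, roughly:
// pub type BoxStream<'a, T> = Pin<Box<dyn Stream<Item = T> + Send + 'a>>;

// Both spellings name the same type, so this identity function compiles as-is.
fn same_type<'a, T>(s: BoxStream<'a, T>) -> Pin<Box<dyn Stream<Item = T> + Send + 'a>> {
    s
}
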
118 changes: 85 additions & 33 deletions rust/src/storage/file/mod.rs
@@ -2,18 +2,16 @@
//!
//! The local file storage backend is multi-writer safe.

use super::{ObjectMeta, StorageBackend, StorageError};
use chrono::DateTime;
use futures::{stream::BoxStream, StreamExt};
use std::collections::VecDeque;
use std::io;
use std::path::{Path, PathBuf};
use std::pin::Pin;

use chrono::DateTime;
use futures::{Stream, TryStreamExt};
use tokio::fs;
use tokio::io::AsyncWriteExt;
use tokio_stream::wrappers::ReadDirStream;

use super::{ObjectMeta, StorageBackend, StorageError};
use uuid::Uuid;
use walkdir::WalkDir;

mod rename;

@@ -27,7 +25,6 @@ mod rename;
/// * xfs requires >= Linux 4.0
/// * ext2, minix, reiserfs, jfs, vfat, and bpf requires >= Linux 4.9
/// * Darwin is supported but not fully tested.
/// * Windows is not supported because we are not using native atomic file rename system call.
/// Patches welcome.
/// * Support for other platforms are not implemented at the moment.
#[derive(Default, Debug)]
@@ -87,33 +84,72 @@ impl StorageBackend for FileStorageBackend {
async fn list_objs<'a>(
&'a self,
path: &'a str,
) -> Result<
Pin<Box<dyn Stream<Item = Result<ObjectMeta, StorageError>> + Send + 'a>>,
StorageError,
> {
let readdir = ReadDirStream::new(fs::read_dir(path).await?);

Ok(Box::pin(readdir.err_into().and_then(|entry| async move {
let path = String::from(
entry
.path()
.to_str()
.ok_or_else(|| StorageError::Generic("invalid path".to_string()))?,
);
match entry.metadata().await {
Ok(meta) => {
Ok(ObjectMeta {
path,
modified: meta.modified()?.into(),
size: Some(meta.len().try_into().map_err(|_| {
StorageError::Generic("cannot convert to i64".to_string())
})?),
) -> Result<BoxStream<'a, Result<ObjectMeta, StorageError>>, StorageError> {
let walkdir = WalkDir::new(path)
// Don't include the root directory itself
.min_depth(1);

let meta_iter = walkdir.into_iter().filter_map(move |result_dir_entry| {
match convert_walkdir_result(result_dir_entry) {
Err(e) => Some(Err(e)),
Ok(None) => None,
Ok(entry @ Some(_)) => entry
.filter(|dir_entry| dir_entry.file_type().is_file())
.map(|entry| {
let file_path =
String::from(entry.path().to_str().ok_or_else(|| {
StorageError::Generic("invalid path".to_string())
})?);
match entry.metadata() {
Ok(meta) => Ok(ObjectMeta {
path: file_path,
modified: meta.modified()?.into(),
size: Some(meta.len().try_into().map_err(|_| {
StorageError::Generic("cannot convert to i64".to_string())
})?),
}),
Err(err)
if err.io_error().map(|e| e.kind())
== Some(io::ErrorKind::NotFound) =>
{
Err(StorageError::NotFound)
}
Err(err) => Err(StorageError::WalkDir { source: err }),
}
}),
}
});

// list in batches of CHUNK_SIZE
const CHUNK_SIZE: usize = 1024;

let buffer = VecDeque::with_capacity(CHUNK_SIZE);
let stream = futures::stream::try_unfold(
(meta_iter, buffer),
|(mut meta_iter, mut buffer)| async move {
if buffer.is_empty() {
(meta_iter, buffer) = tokio::task::spawn_blocking(move || {
for _ in 0..CHUNK_SIZE {
match meta_iter.next() {
Some(r) => buffer.push_back(r),
None => break,
}
}
(meta_iter, buffer)
})
.await
.map_err(|err| StorageError::Generic(err.to_string()))?;
}
Err(err) if err.kind() == io::ErrorKind::NotFound => Err(StorageError::NotFound),
Err(err) => Err(StorageError::Io { source: err }),
}
})))

match buffer.pop_front() {
Some(Err(e)) => Err(e),
Some(Ok(meta)) => Ok(Some((meta, (meta_iter, buffer)))),
None => Ok(None),
}
},
);

Ok(stream.boxed())
}

async fn put_obj(&self, path: &str, obj_bytes: &[u8]) -> Result<(), StorageError> {
@@ -152,6 +188,22 @@ impl StorageBackend for FileStorageBackend {
}
}

/// Converts a walkdir result, mapping not-found errors to `None`.
fn convert_walkdir_result(
res: std::result::Result<walkdir::DirEntry, walkdir::Error>,
) -> Result<Option<walkdir::DirEntry>, StorageError> {
match res {
Ok(entry) => Ok(Some(entry)),
Err(walkdir_err) => match walkdir_err.io_error() {
Some(io_err) => match io_err.kind() {
io::ErrorKind::NotFound => Ok(None),
_ => Err(StorageError::Generic(io_err.to_string())),
},
None => Err(StorageError::Generic(walkdir_err.to_string())),
},
}
}

#[cfg(test)]
mod tests {
use super::super::parse_uri;
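
This is the heart of the fix: fs::read_dir lists only the immediate children of path, while WalkDir traverses the whole tree, so objects in nested directories are now included. Because WalkDir is a blocking iterator, the new code bridges it into an async stream with futures::stream::try_unfold, refilling a VecDeque buffer 1024 entries at a time inside tokio::task::spawn_blocking so directory I/O never stalls the async executor. A simplified, standalone sketch of that bridging pattern (hypothetical helper, generic over any blocking iterator, with String standing in for StorageError; requires a Tokio runtime):

use std::collections::VecDeque;

fn blocking_iter_to_stream<I>(iter: I) -> impl futures::Stream<Item = Result<I::Item, String>>
where
    I: Iterator + Send + 'static,
    I::Item: Send + 'static,
{
    const CHUNK_SIZE: usize = 1024;
    let buffer = VecDeque::with_capacity(CHUNK_SIZE);
    futures::stream::try_unfold((iter, buffer), |(mut iter, mut buffer)| async move {
        if buffer.is_empty() {
            // Pull the next chunk of items on a dedicated blocking thread.
            (iter, buffer) = tokio::task::spawn_blocking(move || {
                buffer.extend(iter.by_ref().take(CHUNK_SIZE));
                (iter, buffer)
            })
            .await
            .map_err(|e| e.to_string())?;
        }
        // Yield one buffered item per poll; None ends the stream.
        Ok(buffer.pop_front().map(|item| (item, (iter, buffer))))
    })
}

Chunking amortizes the cost of each spawn_blocking hop across up to 1024 entries instead of paying it once per file.
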
8 changes: 2 additions & 6 deletions rust/src/storage/gcs/mod.rs
@@ -14,9 +14,8 @@ pub(crate) use client::GCSStorageBackend;
pub(crate) use error::GCSClientError;
pub(crate) use object::GCSObject;

use futures::Stream;
use futures::stream::BoxStream;
use std::convert::TryInto;
use std::pin::Pin;

use log::debug;

@@ -70,10 +69,7 @@ impl StorageBackend for GCSStorageBackend {
async fn list_objs<'a>(
&'a self,
path: &'a str,
) -> Result<
Pin<Box<dyn Stream<Item = Result<ObjectMeta, StorageError>> + Send + 'a>>,
StorageError,
> {
) -> Result<BoxStream<'a, Result<ObjectMeta, StorageError>>, StorageError> {
let prefix = parse_uri(path)?.into_gcs_object()?;
let obj_meta_stream = async_stream::stream! {
for await meta in self.list(prefix) {
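
The GCS backend already produces its stream with the async-stream crate, so only the return-type spelling changes here. For reference, a sketch of the stream!/for await pattern used in the body above (hypothetical transform, not from this repository):

use async_stream::stream;
use futures::stream::{BoxStream, StreamExt};

// Adapt one stream into another: `for await` drains the inner stream
// and `yield` emits the transformed items.
fn double_all(input: BoxStream<'_, i32>) -> BoxStream<'_, i32> {
    let doubled = stream! {
        for await item in input {
            yield item * 2;
        }
    };
    doubled.boxed()
}
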
24 changes: 14 additions & 10 deletions rust/src/storage/mod.rs
@@ -1,15 +1,14 @@
//! Object storage backend abstraction layer for Delta Table transaction logs and data

#[cfg(any(feature = "s3", feature = "s3-rustls"))]
use hyper::http::uri::InvalidUri;
use std::fmt::Debug;
use std::pin::Pin;

#[cfg(feature = "azure")]
use azure_core::error::{Error as AzureError, ErrorKind as AzureErrorKind};
use chrono::{DateTime, Utc};
use futures::Stream;
use futures::stream::BoxStream;
#[cfg(any(feature = "s3", feature = "s3-rustls"))]
use hyper::http::uri::InvalidUri;
use std::collections::HashMap;
use std::fmt::Debug;
use walkdir::Error as WalkDirError;

#[cfg(feature = "azure")]
pub mod azure;
@@ -278,6 +277,14 @@ pub enum StorageError {
/// The raw error returned when trying to read the local file.
source: std::io::Error,
},

#[error("Failed to walk directory: {source}")]
/// Error raised when failing to traverse a directory
WalkDir {
/// The raw error returned when trying to walk the directory.
#[from]
source: WalkDirError,
},
/// The file system represented by the scheme is not known.
#[error("File system not supported")]
FileSystemNotSupported,
@@ -519,10 +526,7 @@ pub trait StorageBackend: Send + Sync + Debug {
async fn list_objs<'a>(
&'a self,
path: &'a str,
) -> Result<
Pin<Box<dyn Stream<Item = Result<ObjectMeta, StorageError>> + Send + 'a>>,
StorageError,
>;
) -> Result<BoxStream<'a, Result<ObjectMeta, StorageError>>, StorageError>;

/// Create new object with `obj_bytes` as content.
///
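
The new WalkDir variant carries thiserror's #[from] attribute, which generates an impl From<walkdir::Error> for StorageError so the ? operator converts walkdir failures without an explicit map_err. A standalone sketch of the mechanism (hypothetical MyError enum; assumes the thiserror and walkdir crates):

use thiserror::Error;

#[derive(Error, Debug)]
enum MyError {
    #[error("Failed to walk directory: {source}")]
    WalkDir {
        // `#[from]` generates `impl From<walkdir::Error> for MyError`.
        #[from]
        source: walkdir::Error,
    },
}

fn first_entry(path: &str) -> Result<Option<walkdir::DirEntry>, MyError> {
    let mut entries = walkdir::WalkDir::new(path).into_iter();
    // `?` converts the walkdir::Error via the generated From impl.
    Ok(entries.next().transpose()?)
}
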
9 changes: 3 additions & 6 deletions rust/src/storage/s3/mod.rs
@@ -1,11 +1,11 @@
//! AWS S3 storage backend. It only supports a single writer and is not multi-writer safe.

use std::collections::HashMap;
use std::fmt;
use std::fmt::Debug;
use std::{fmt, pin::Pin};

use chrono::{DateTime, FixedOffset, Utc};
use futures::Stream;
use futures::stream::BoxStream;

use log::debug;
use rusoto_core::{HttpClient, HttpConfig, Region, RusotoError};
@@ -714,10 +714,7 @@ impl StorageBackend for S3StorageBackend {
async fn list_objs<'a>(
&'a self,
path: &'a str,
) -> Result<
Pin<Box<dyn Stream<Item = Result<ObjectMeta, StorageError>> + Send + 'a>>,
StorageError,
> {
) -> Result<BoxStream<'_, Result<ObjectMeta, StorageError>>, StorageError> {
let uri = parse_uri(path)?.into_s3object()?;

/// This enum is used to represent 3 states in our object metadata streaming logic:
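
With every backend now returning BoxStream, callers consume list_objs uniformly through StreamExt. A hypothetical caller-side sketch (assumes the async-trait-based StorageBackend, ObjectMeta, and StorageError from this crate are in scope):

use futures::StreamExt;

// Drain the stream returned by `list_objs`, counting the objects found.
async fn count_objects(
    backend: &dyn StorageBackend,
    path: &str,
) -> Result<usize, StorageError> {
    let mut objects = backend.list_objs(path).await?;
    let mut count = 0;
    while let Some(meta) = objects.next().await {
        let _meta = meta?; // each item is a Result<ObjectMeta, StorageError>
        count += 1;
    }
    Ok(count)
}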
