Skip to content

Commit

Permalink
Handle symlinks in LocalFileSystem (apache#2206)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Aug 1, 2022
1 parent d4f038a commit 28ff596
Show file tree
Hide file tree
Showing 2 changed files with 157 additions and 31 deletions.
166 changes: 150 additions & 16 deletions object_store/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,56 +68,56 @@ pub(crate) enum Error {
#[snafu(display("Unable to create dir {}: {}", path.display(), source))]
UnableToCreateDir {
source: io::Error,
path: std::path::PathBuf,
path: PathBuf,
},

#[snafu(display("Unable to create file {}: {}", path.display(), err))]
UnableToCreateFile {
path: std::path::PathBuf,
path: PathBuf,
err: io::Error,
},

#[snafu(display("Unable to delete file {}: {}", path.display(), source))]
UnableToDeleteFile {
source: io::Error,
path: std::path::PathBuf,
path: PathBuf,
},

#[snafu(display("Unable to open file {}: {}", path.display(), source))]
UnableToOpenFile {
source: io::Error,
path: std::path::PathBuf,
path: PathBuf,
},

#[snafu(display("Unable to read data from file {}: {}", path.display(), source))]
UnableToReadBytes {
source: io::Error,
path: std::path::PathBuf,
path: PathBuf,
},

#[snafu(display("Out of range of file {}, expected: {}, actual: {}", path.display(), expected, actual))]
OutOfRange {
path: std::path::PathBuf,
path: PathBuf,
expected: usize,
actual: usize,
},

#[snafu(display("Unable to copy file from {} to {}: {}", from.display(), to.display(), source))]
UnableToCopyFile {
from: std::path::PathBuf,
to: std::path::PathBuf,
from: PathBuf,
to: PathBuf,
source: io::Error,
},

NotFound {
path: std::path::PathBuf,
path: PathBuf,
source: io::Error,
},

#[snafu(display("Error seeking file {}: {}", path.display(), source))]
Seek {
source: io::Error,
path: std::path::PathBuf,
path: PathBuf,
},

#[snafu(display("Unable to convert URL \"{}\" to filesystem path", url))]
Expand Down Expand Up @@ -170,6 +170,16 @@ impl From<Error> for super::Error {
///
/// If not called from a tokio context, this will perform IO on the current thread with
/// no additional complexity or overheads
///
/// # Symlinks
///
/// [`LocalFileSystem`] will follow symlinks as normal; however, it is worth noting:
///
/// * Broken symlinks will be silently ignored by listing operations
/// * No effort is made to prevent breaking symlinks when deleting files
/// * Mutating a file through one or more symlinks will mutate the underlying file
/// * Deleting a path that resolves to a symlink will only delete the symlink
///
#[derive(Debug)]
pub struct LocalFileSystem {
config: Arc<Config>,
Expand Down Expand Up @@ -214,10 +224,11 @@ impl LocalFileSystem {

impl Config {
/// Return filesystem path of the given location
fn path_to_filesystem(&self, location: &Path) -> Result<std::path::PathBuf> {
fn path_to_filesystem(&self, location: &Path) -> Result<PathBuf> {
let mut url = self.root.clone();
url.path_segments_mut()
.expect("url path")
.pop_if_empty()
.extend(location.parts());

url.to_file_path()
Expand Down Expand Up @@ -371,7 +382,8 @@ impl ObjectStore for LocalFileSystem {

let walkdir = WalkDir::new(&root_path)
// Don't include the root directory itself
.min_depth(1);
.min_depth(1)
.follow_links(true);

let s = walkdir.into_iter().flat_map(move |result_dir_entry| {
match convert_walkdir_result(result_dir_entry) {
Expand Down Expand Up @@ -433,7 +445,10 @@ impl ObjectStore for LocalFileSystem {
let resolved_prefix = config.path_to_filesystem(&prefix)?;

maybe_spawn_blocking(move || {
let walkdir = WalkDir::new(&resolved_prefix).min_depth(1).max_depth(1);
let walkdir = WalkDir::new(&resolved_prefix)
.min_depth(1)
.max_depth(1)
.follow_links(true);

let mut common_prefixes = BTreeSet::new();
let mut objects = Vec::new();
Expand Down Expand Up @@ -732,7 +747,7 @@ impl AsyncWrite for LocalUpload {
}
}

fn open_file(path: &std::path::PathBuf) -> Result<File> {
fn open_file(path: &PathBuf) -> Result<File> {
let file = File::open(path).map_err(|e| {
if e.kind() == std::io::ErrorKind::NotFound {
Error::NotFound {
Expand All @@ -749,7 +764,7 @@ fn open_file(path: &std::path::PathBuf) -> Result<File> {
Ok(file)
}

fn open_writable_file(path: &std::path::PathBuf) -> Result<File> {
fn open_writable_file(path: &PathBuf) -> Result<File> {
match File::create(&path) {
Ok(f) => Ok(f),
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
Expand Down Expand Up @@ -861,7 +876,8 @@ mod tests {
},
Error as ObjectStoreError, ObjectStore,
};
use tempfile::TempDir;
use futures::TryStreamExt;
use tempfile::{NamedTempFile, TempDir};
use tokio::io::AsyncWriteExt;

#[tokio::test]
Expand Down Expand Up @@ -1030,6 +1046,124 @@ mod tests {
}
}

/// Lists everything under `prefix` and asserts that the sorted object
/// locations are exactly `expected`.
///
/// `expected` must already be in sorted order, because the listed locations
/// are sorted before the comparison.
async fn check_list(
    integration: &LocalFileSystem,
    prefix: Option<&Path>,
    expected: &[&str],
) {
    // Drain the listing stream first so any listing error fails the test here
    let stream = integration.list(prefix).await.unwrap();
    let entries: Vec<_> = stream.try_collect().await.unwrap();

    // Listing order is not relied upon, so normalise it before comparing
    let mut locations: Vec<_> = entries
        .iter()
        .map(|meta| meta.location.as_ref())
        .collect();
    locations.sort_unstable();

    assert_eq!(&locations, expected)
}

/// Exercises symlink handling: flat listing, `head`, delimiter listing,
/// deletion and writes, using an out-of-tree file symlink, an in-tree
/// directory symlink and a broken symlink.
///
/// Restricted to unix targets because it creates links with
/// `std::os::unix::fs::symlink`.
#[tokio::test]
#[cfg(target_family = "unix")]
async fn test_symlink() {
let root = TempDir::new().unwrap();
let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();

// Seed the store with a regular file at `a/file.parquet`
let subdir = root.path().join("a");
std::fs::create_dir(&subdir).unwrap();
let file = subdir.join("file.parquet");
std::fs::write(file, "test").unwrap();

check_list(&integration, None, &["a/file.parquet"]).await;
integration
.head(&Path::from("a/file.parquet"))
.await
.unwrap();

// Follow out of tree symlink: `test.parquet` points at a temporary file
// created outside of `root`
let other = NamedTempFile::new().unwrap();
std::os::unix::fs::symlink(other.path(), root.path().join("test.parquet"))
.unwrap();

// Should return test.parquet even though technically out of tree
check_list(&integration, None, &["a/file.parquet", "test.parquet"]).await;

// Can fetch test.parquet
integration.head(&Path::from("test.parquet")).await.unwrap();

// Follow in tree symlink: directory `b` is a symlink to directory `a`,
// so the same file is listed under both prefixes
std::os::unix::fs::symlink(&subdir, root.path().join("b")).unwrap();
check_list(
&integration,
None,
&["a/file.parquet", "b/file.parquet", "test.parquet"],
)
.await;
check_list(&integration, Some(&Path::from("b")), &["b/file.parquet"]).await;

// Can fetch through symlink
integration
.head(&Path::from("b/file.parquet"))
.await
.unwrap();

// Ignore broken symlink: `c` points at `foo.parquet`, which is never created
std::os::unix::fs::symlink(
root.path().join("foo.parquet"),
root.path().join("c"),
)
.unwrap();

// Listing is unchanged: the broken link `c` does not appear
check_list(
&integration,
None,
&["a/file.parquet", "b/file.parquet", "test.parquet"],
)
.await;

// Delimited listing at the root: the real directory `a` and the symlinked
// directory `b` are both common prefixes, `test.parquet` is the only
// object, and the broken link `c` appears in neither
let mut r = integration.list_with_delimiter(None).await.unwrap();
r.common_prefixes.sort_unstable();
assert_eq!(r.common_prefixes.len(), 2);
assert_eq!(r.common_prefixes[0].as_ref(), "a");
assert_eq!(r.common_prefixes[1].as_ref(), "b");
assert_eq!(r.objects.len(), 1);
assert_eq!(r.objects[0].location.as_ref(), "test.parquet");

// Delimited listing inside `a` returns just its single file
let r = integration
.list_with_delimiter(Some(&Path::from("a")))
.await
.unwrap();
assert_eq!(r.common_prefixes.len(), 0);
assert_eq!(r.objects.len(), 1);
assert_eq!(r.objects[0].location.as_ref(), "a/file.parquet");

// Deleting a symlink doesn't delete the source file
integration
.delete(&Path::from("test.parquet"))
.await
.unwrap();
assert!(other.path().exists());

check_list(&integration, None, &["a/file.parquet", "b/file.parquet"]).await;

// Deleting through the directory symlink removes the underlying file, so it
// disappears from both `a/` and `b/`
integration
.delete(&Path::from("b/file.parquet"))
.await
.unwrap();

check_list(&integration, None, &[]).await;

// Writing through the directory symlink creates the file in the linked
// directory, so it is listed under both `a/` and `b/`
integration
.put(&Path::from("b/file.parquet"), Bytes::from(vec![0, 1, 2]))
.await
.unwrap();

check_list(&integration, None, &["a/file.parquet", "b/file.parquet"]).await;
}

#[tokio::test]
async fn invalid_path() {
let root = TempDir::new().unwrap();
Expand Down
22 changes: 7 additions & 15 deletions object_store/src/path/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ impl Path {

/// Convert a filesystem path to a [`Path`] relative to the filesystem root
///
/// This will return an error if the path does not exist, or contains illegal
/// This will return an error if the path contains illegal
/// character sequences as defined by [`Path::parse`]
pub fn from_filesystem_path(
path: impl AsRef<std::path::Path>,
Expand All @@ -173,9 +173,8 @@ impl Path {

/// Convert a filesystem path to a [`Path`] relative to the provided base
///
/// This will return an error if the path does not exist on the local filesystem,
/// contains illegal character sequences as defined by [`Path::parse`], or `base`
/// does not refer to a parent path of `path`
/// This will return an error if the path contains illegal character sequences
/// as defined by [`Path::parse`], or `base` does not refer to a parent path of `path`
pub(crate) fn from_filesystem_path_with_base(
path: impl AsRef<std::path::Path>,
base: Option<&Url>,
Expand Down Expand Up @@ -295,20 +294,13 @@ where
}
}

/// Given a filesystem path, convert it to its canonical URL representation,
/// returning an error if the file doesn't exist on the local filesystem
/// Given a filesystem path, convert it to a URL representation
pub(crate) fn filesystem_path_to_url(
path: impl AsRef<std::path::Path>,
) -> Result<Url, Error> {
let path = path.as_ref().canonicalize().context(CanonicalizeSnafu {
path: path.as_ref(),
})?;

match path.is_dir() {
true => Url::from_directory_path(&path),
false => Url::from_file_path(&path),
}
.map_err(|_| Error::InvalidPath { path })
Url::from_file_path(&path).map_err(|_| Error::InvalidPath {
path: path.as_ref().into(),
})
}

#[cfg(test)]
Expand Down

0 comments on commit 28ff596

Please sign in to comment.