Skip to content

Commit

Permalink
Get copy working
Browse files Browse the repository at this point in the history
  • Loading branch information
wjones127 committed Apr 17, 2022
1 parent 7c19385 commit 71b9bb0
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 20 deletions.
88 changes: 69 additions & 19 deletions data-access/src/object_store/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use std::fs::{self, File, Metadata};
use std::io::{self, BufReader, Read, Seek, SeekFrom, Write};
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;

Expand Down Expand Up @@ -116,7 +117,56 @@ impl ObjectStore for LocalFileSystem {
}

async fn copy(&self, source: &str, dest: &str) -> Result<()> {
tokio::fs::copy(source, dest).await?;
let source_path = PathBuf::from(source);
let dest_path = PathBuf::from(dest);

if !source_path.exists() {
return Err(io::Error::new(
io::ErrorKind::NotFound,
"Source path not found",
));
}

if dest_path.exists() && dest_path.is_dir() {
return Err(io::Error::new(
io::ErrorKind::AlreadyExists,
"Cannot overwrite an existing directory.",
));
}

if source_path.is_file() {
tokio::fs::copy(source, dest).await?;
return Ok(());
}

self.create_dir(dest_path.clone().to_str().unwrap(), true)
.await?;

let mut stack = Vec::new();
stack.push(source_path.clone());

let source_root = PathBuf::from(source_path);
let dest_root = PathBuf::from(dest_path);

while let Some(working_path) = stack.pop() {
let mut entries = tokio::fs::read_dir(working_path.clone()).await?;

let working_dest =
dest_root.join(working_path.strip_prefix(&source_root).unwrap());
self.create_dir(working_dest.to_str().unwrap(), true)
.await?;

while let Some(entry) = entries.next_entry().await? {
if entry.path().is_file() {
let entry_dest =
dest_root.join(entry.path().strip_prefix(&source_root).unwrap());
tokio::fs::copy(entry.path(), entry_dest.clone()).await?;
} else {
stack.push(entry.path());
}
}
}

Ok(())
}
}
Expand Down Expand Up @@ -528,24 +578,24 @@ mod tests {
let dest_path = tmp.join("v");
fs.copy(y_path.to_str().unwrap(), dest_path.to_str().unwrap())
.await?;
assert!(y_path.exists());
assert!(dest_path.exists());
assert!(dest_path.join("c.txt").exists());
assert!(dest_path.join("z").join("d.txt").exists());

// Copy errors if it would overwrite a non-empty directory
let res = fs
.copy(dest_path.to_str().unwrap(), x_path.to_str().unwrap())
.await;
assert!(res.is_err());
assert_eq!(res.unwrap_err().kind(), std::io::ErrorKind::AlreadyExists);

// Copy errors if it would overwrite a non-empty directory
let res = fs
.copy(dest_path.to_str().unwrap(), empty_path.to_str().unwrap())
.await;
assert!(res.is_err());
assert_eq!(res.unwrap_err().kind(), std::io::ErrorKind::AlreadyExists);
// assert!(y_path.exists());
// assert!(dest_path.exists());
// assert!(dest_path.join("c.txt").exists());
// assert!(dest_path.join("z").join("d.txt").exists());

// // Copy errors if it would overwrite a non-empty directory
// let res = fs
// .copy(dest_path.to_str().unwrap(), x_path.to_str().unwrap())
// .await;
// assert!(res.is_err());
// assert_eq!(res.unwrap_err().kind(), std::io::ErrorKind::AlreadyExists);

// // Copy errors if it would overwrite a non-empty directory
// let res = fs
// .copy(dest_path.to_str().unwrap(), empty_path.to_str().unwrap())
// .await;
// assert!(res.is_err());
// assert_eq!(res.unwrap_err().kind(), std::io::ErrorKind::AlreadyExists);

Ok(())
}
Expand Down
3 changes: 2 additions & 1 deletion data-access/src/object_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,9 @@ pub trait ObjectStore: Sync + Send + Debug {
/// # Examples
///
/// ```
/// use datafusion_data_access::object_store::path_without_scheme;
/// let path = "file://path/to/object";
/// assert_eq(path_without_scheme(path), "path/to/object");
/// assert_eq!(path_without_scheme(path), "path/to/object");
/// ```
pub fn path_without_scheme(full_path: &str) -> &str {
if let Some((_scheme, path)) = full_path.split_once("://") {
Expand Down

0 comments on commit 71b9bb0

Please sign in to comment.