Skip to content

Commit

Permalink
Get all but copy working
Browse files Browse the repository at this point in the history
  • Loading branch information
wjones127 committed Apr 17, 2022
1 parent e9c52c9 commit 7c19385
Show file tree
Hide file tree
Showing 3 changed files with 344 additions and 21 deletions.
2 changes: 1 addition & 1 deletion data-access/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ path = "src/lib.rs"

[dependencies]
async-trait = "0.1.41"
chrono = { version = "0.4", default-features = false }
chrono = { version = "0.4"}
futures = "0.3"
parking_lot = "0.12"
tempfile = "3"
Expand Down
323 changes: 307 additions & 16 deletions data-access/src/object_store/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,20 @@
//! Object store that represents the Local File System.
use std::fs::{self, File, Metadata};
use std::io::{self, Write};
use std::io::{BufReader, Read, Seek, SeekFrom};
use std::io::{self, BufReader, Read, Seek, SeekFrom, Write};
use std::pin::Pin;
use std::sync::Arc;

use async_trait::async_trait;
use chrono::{DateTime, Utc};
use futures::{stream, AsyncRead, StreamExt};
use tokio::io::AsyncWrite;
use tokio::fs::File as AsyncFile;
use tokio::{fs::File as AsyncFile, io::AsyncWrite};

use crate::{FileMeta, Result, SizedFile};

use super::{
FileMetaStream, ListEntryStream, ObjectReader, ObjectWriter, ObjectReaderStream, ObjectStore,
path_without_scheme
path_without_scheme, FileMetaStream, ListEntryStream, ObjectReader,
ObjectReaderStream, ObjectStore, ObjectWriter,
};

pub static LOCAL_SCHEME: &str = "file";
Expand Down Expand Up @@ -60,10 +59,66 @@ impl ObjectStore for LocalFileSystem {
Ok(Arc::new(LocalFileReader::new(file)?))
}

fn file_writer(&self, path: String) -> Result<Arc<dyn ObjectWriter>> {
let path = path_without_scheme(&path).to_string();
fn file_writer(&self, path: &str) -> Result<Arc<dyn ObjectWriter>> {
let path = path_without_scheme(path).to_string();
Ok(Arc::new(LocalFileWriter::new(path)?))
}

async fn create_dir(&self, path: &str, recursive: bool) -> Result<()> {
let res = match recursive {
false => tokio::fs::create_dir(path).await,
true => tokio::fs::create_dir_all(path).await,
};
match res {
Ok(()) => Ok(()),
Err(ref e) if e.kind() == io::ErrorKind::AlreadyExists => Ok(()),
Err(err) => Err(err),
}
}

async fn remove_dir_all(&self, path: &str) -> Result<()> {
tokio::fs::remove_dir_all(path).await
}

async fn remove_dir_contents(&self, path: &str) -> Result<()> {
let mut entries = tokio::fs::read_dir(path).await?;
while let Some(entry) = entries.next_entry().await? {
if entry.file_type().await?.is_dir() {
self.remove_dir_all(entry.path().to_str().unwrap()).await?;
} else {
self.remove_file(entry.path().to_str().unwrap()).await?;
}
}
Ok(())
}

async fn remove_file(&self, path: &str) -> Result<()> {
let res = tokio::fs::remove_file(path).await;
match res {
Ok(()) => Ok(()),
Err(e) if e.kind() == io::ErrorKind::PermissionDenied => {
// If path is a directory, we should return InvalidInput instead
if tokio::fs::metadata(path).await?.is_dir() {
Err(io::Error::new(
io::ErrorKind::InvalidInput,
"was a directory",
))
} else {
Err(e)
}
}
Err(err) => Err(err),
}
}

async fn rename(&self, source: &str, dest: &str) -> Result<()> {
tokio::fs::rename(source, dest).await
}

async fn copy(&self, source: &str, dest: &str) -> Result<()> {
tokio::fs::copy(source, dest).await?;
Ok(())
}
}

struct LocalFileReader {
Expand Down Expand Up @@ -190,12 +245,12 @@ pub fn local_unpartitioned_file(file: String) -> FileMeta {
size: metadata.len(),
path: file,
},
last_modified: metadata.modified().map(chrono::DateTime::from).ok(),
last_modified: metadata.modified().map(DateTime::<Utc>::from).ok(),
}
}

struct LocalFileWriter {
path: String
path: String,
}

impl LocalFileWriter {
Expand All @@ -222,19 +277,20 @@ mod tests {
use super::*;
use futures::StreamExt;
use std::collections::HashSet;
use std::fs::create_dir;
use std::fs::File;
use std::fs::{create_dir, read_dir};
use tempfile::tempdir;

#[tokio::test]
async fn test_recursive_listing() -> Result<()> {
// tmp/a.txt
// tmp/x/b.txt
// tmp/y/c.txt
let tmp = tempdir()?;
let x_path = tmp.path().join("x");
let y_path = tmp.path().join("y");
let a_path = tmp.path().join("a.txt");
let tmp_dir = tempdir()?;
let tmp = tmp_dir.path();
let x_path = tmp.join("x");
let y_path = tmp.join("y");
let a_path = tmp.join("a.txt");
let b_path = x_path.join("b.txt");
let c_path = y_path.join("c.txt");
create_dir(&x_path)?;
Expand All @@ -244,7 +300,7 @@ mod tests {
File::create(&c_path)?;

let mut all_files = HashSet::new();
let mut files = list_all(tmp.path().to_str().unwrap().to_string()).await?;
let mut files = list_all(tmp.to_str().unwrap().to_string()).await?;
while let Some(file) = files.next().await {
let file = file?;
assert_eq!(file.size(), 0);
Expand All @@ -258,4 +314,239 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn test_create_dir() -> Result<()> {
let tmp_dir = tempdir()?;
let tmp = tmp_dir.path();

let fs = LocalFileSystem;

// Create directory succeeds
let z_path = tmp.join("z");
fs.create_dir(z_path.to_str().unwrap(), false).await?;
assert!(z_path.exists());

// Create recursive directory succeeds
let rec_path = tmp.join("w").join("a");
fs.create_dir(rec_path.to_str().unwrap(), true).await?;
assert!(rec_path.exists());

// Returns Ok if already exists
fs.create_dir(tmp.to_str().unwrap(), false).await?;

Ok(())
}

#[tokio::test]
async fn test_remove_dir() -> Result<()> {
// tmp/a.txt
// tmp/x/b.txt
let tmp_dir = tempdir()?;
let tmp = tmp_dir.path();
let x_path = tmp.join("x");
let a_path = tmp.join("a.txt");
let b_path = x_path.join("b.txt");
create_dir(&x_path)?;
File::create(&a_path)?;
File::create(&b_path)?;

let fs = LocalFileSystem;

// Delete contents tmp means tmp is empty
fs.remove_dir_contents(tmp.to_str().unwrap()).await?;
assert!(tmp.exists());
assert_eq!(read_dir(tmp)?.count(), 0);

// Delete tmp means no tmp
fs.remove_dir_all(tmp.to_str().unwrap()).await?;
assert!(!tmp.exists());

Ok(())
}

#[tokio::test]
async fn test_remove_file() -> Result<()> {
// tmp/a.txt
// tmp/x
let tmp_dir = tempdir()?;
let tmp = tmp_dir.path();
let x_path = tmp.join("x");
let a_path = tmp.join("a.txt");
create_dir(&x_path)?;
File::create(&a_path)?;

let fs = LocalFileSystem;

// Delete existing file works
fs.remove_file(a_path.to_str().unwrap()).await?;
assert!(!a_path.exists());

// Delete non-existent file errors
let res = fs
.remove_file(tmp.join("missing.txt").to_str().unwrap())
.await;
assert!(res.is_err());
assert_eq!(res.unwrap_err().kind(), std::io::ErrorKind::NotFound);

// Delete file on directory errors
let res = fs.remove_file(x_path.to_str().unwrap()).await;
assert!(res.is_err());
assert_eq!(res.unwrap_err().kind(), std::io::ErrorKind::InvalidInput);

Ok(())
}

#[tokio::test]
async fn test_rename() -> Result<()> {
// tmp/a.txt
// tmp/b.txt
// tmp/x/b.txt
// tmp/empty/
// tmp/y/c.txt
// tmp/y/z/d.txt
let tmp_dir = tempdir()?;
let tmp = tmp_dir.path();
let x_path = tmp.join("x");
let empty_path = tmp.join("empty");
let y_path = tmp.join("y");
let z_path = y_path.join("z");
let a_path = tmp.join("a.txt");
let b_path = tmp.join("b.txt");
let x_b_path = x_path.join("b.txt");
let c_path = y_path.join("c.txt");
let d_path = z_path.join("d.txt");
create_dir(&x_path)?;
create_dir(&empty_path)?;
create_dir(&y_path)?;
create_dir(&z_path)?;
File::create(&a_path)?;
File::create(&b_path)?;
File::create(&x_b_path)?;
File::create(&c_path)?;
File::create(&d_path)?;

let fs = LocalFileSystem;

// Can rename a file, and it will exist at dest and not at source
let a2_path = tmp.join("a2.txt");
fs.rename(a_path.to_str().unwrap(), a2_path.to_str().unwrap())
.await?;
assert!(!a_path.exists());
assert!(a2_path.exists());

// rename replaces files
let test_content = b"test";
let mut f = File::create(&a_path)?;
f.write(test_content)?;
f.flush()?;
fs.rename(a_path.to_str().unwrap(), b_path.to_str().unwrap())
.await?;
assert!(!a_path.exists());
assert!(b_path.exists());
let mut f = File::open(&b_path)?;
let mut actual_content = Vec::new();
f.read_to_end(&mut actual_content)?;
assert_eq!(actual_content, test_content);

// Can rename a directory, and it will recursively copy contents
let dest_path = tmp.join("v");
fs.rename(y_path.to_str().unwrap(), dest_path.to_str().unwrap())
.await?;
assert!(!y_path.exists());
assert!(dest_path.exists());
assert!(dest_path.join("c.txt").exists());
assert!(dest_path.join("z").join("d.txt").exists());

// rename errors if it would overwrite non-empty directory
let res = fs
.rename(dest_path.to_str().unwrap(), x_path.to_str().unwrap())
.await;
assert!(res.is_err());
// We cannot test for specific error. See: https://diziet.dreamwidth.org/9894.html

// rename succeeds if it would overwrite an empty directory
fs.rename(dest_path.to_str().unwrap(), empty_path.to_str().unwrap())
.await?;

Ok(())
}

#[tokio::test]
async fn test_copy() -> Result<()> {
// tmp/a.txt
// tmp/b.txt
// tmp/x/b.txt
// tmp/empty/
// tmp/y/c.txt
// tmp/y/z/d.txt
let tmp_dir = tempdir()?;
let tmp = tmp_dir.path();
let x_path = tmp.join("x");
let empty_path = tmp.join("empty");
let y_path = tmp.join("y");
let z_path = y_path.join("z");
let a_path = tmp.join("a.txt");
let b_path = tmp.join("b.txt");
let x_b_path = x_path.join("b.txt");
let c_path = y_path.join("c.txt");
let d_path = z_path.join("d.txt");
create_dir(&x_path)?;
create_dir(&empty_path)?;
create_dir(&y_path)?;
create_dir(&z_path)?;
File::create(&a_path)?;
File::create(&b_path)?;
File::create(&x_b_path)?;
File::create(&c_path)?;
File::create(&d_path)?;

let fs = LocalFileSystem;

// Can copy a file, and it will exist at dest and source
let a2_path = tmp.join("a2.txt");
fs.copy(a_path.to_str().unwrap(), a2_path.to_str().unwrap())
.await?;
assert!(a_path.exists());
assert!(a2_path.exists());

// Copy replaces files
let test_content = b"test";
let mut f = File::create(&a_path)?;
f.write(test_content)?;
f.flush()?;
fs.copy(a_path.to_str().unwrap(), b_path.to_str().unwrap())
.await?;
assert!(a_path.exists());
assert!(b_path.exists());
let mut f = File::open(&b_path)?;
let mut actual_content = Vec::new();
f.read_to_end(&mut actual_content)?;
assert_eq!(actual_content, test_content);

// Can copy a directory, and it will recursively copy contents
let dest_path = tmp.join("v");
fs.copy(y_path.to_str().unwrap(), dest_path.to_str().unwrap())
.await?;
assert!(y_path.exists());
assert!(dest_path.exists());
assert!(dest_path.join("c.txt").exists());
assert!(dest_path.join("z").join("d.txt").exists());

// Copy errors if it would overwrite a non-empty directory
let res = fs
.copy(dest_path.to_str().unwrap(), x_path.to_str().unwrap())
.await;
assert!(res.is_err());
assert_eq!(res.unwrap_err().kind(), std::io::ErrorKind::AlreadyExists);

// Copy errors if it would overwrite a non-empty directory
let res = fs
.copy(dest_path.to_str().unwrap(), empty_path.to_str().unwrap())
.await;
assert!(res.is_err());
assert_eq!(res.unwrap_err().kind(), std::io::ErrorKind::AlreadyExists);

Ok(())
}
}
Loading

0 comments on commit 7c19385

Please sign in to comment.