Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow filesystem backend put_obj to overwrite existing #376

Merged
merged 4 commits into from
Aug 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions rust/src/storage/file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,18 +114,21 @@ impl StorageBackend for FileStorageBackend {
f.sync_all().await?;
drop(f);

match self.rename_obj(tmp_path, path).await {
// atomic rename with swap=true only possible if both paths exists.
let swap = Path::new(path).exists();

match rename::atomic_rename(tmp_path, path, swap) {
Ok(_) => Ok(()),
Err(e) => {
// If rename failed, clean up the temp file.
self.delete_obj(tmp_path).await?;
//self.delete_obj(tmp_path).await?;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😞

Err(e)
}
}
}

async fn rename_obj(&self, src: &str, dst: &str) -> Result<(), StorageError> {
rename::atomic_rename(src, dst)
rename::atomic_rename(src, dst, false)
}

async fn delete_obj(&self, path: &str) -> Result<(), StorageError> {
Expand Down
60 changes: 49 additions & 11 deletions rust/src/storage/file/rename.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::StorageError;
mod imp {
use super::*;

pub fn atomic_rename(from: &str, to: &str) -> Result<(), StorageError> {
pub fn atomic_rename(from: &str, to: &str, _swap: bool) -> Result<(), StorageError> {
// doing best effort in windows since there is no native atomic rename support
if std::fs::metadata(to).is_ok() {
return Err(StorageError::AlreadyExists(to.to_string()));
Expand All @@ -24,10 +24,10 @@ mod imp {
CString::new(p).map_err(|e| StorageError::Generic(format!("{}", e)))
}

pub fn atomic_rename(from: &str, to: &str) -> Result<(), StorageError> {
pub fn atomic_rename(from: &str, to: &str, swap: bool) -> Result<(), StorageError> {
let cs_from = to_c_string(from)?;
let cs_to = to_c_string(to)?;
let ret = unsafe { platform_specific_rename(cs_from.as_ptr(), cs_to.as_ptr()) };
let ret = unsafe { platform_specific_rename(cs_from.as_ptr(), cs_to.as_ptr(), swap) };

if ret != 0 {
let e = errno::errno();
Expand All @@ -45,29 +45,39 @@ mod imp {
}

#[allow(unused_variables)]
unsafe fn platform_specific_rename(from: *const libc::c_char, to: *const libc::c_char) -> i32 {
unsafe fn platform_specific_rename(
from: *const libc::c_char,
to: *const libc::c_char,
swap: bool,
) -> i32 {
cfg_if::cfg_if! {
if #[cfg(all(target_os = "linux", target_env = "gnu"))] {
cfg_if::cfg_if! {
if #[cfg(glibc_renameat2)] {
libc::renameat2(libc::AT_FDCWD, from, libc::AT_FDCWD, to, libc::RENAME_NOREPLACE)
let flag = if swap { libc::RENAME_EXCHANGE } else { libc::RENAME_NOREPLACE };
libc::renameat2(libc::AT_FDCWD, from, libc::AT_FDCWD, to, flag)
} else {
// target has old glibc (< 2.28), we would need to invoke syscall manually
unimplemented!()
}
}
} else if #[cfg(target_os = "macos")] {
libc::renamex_np(from, to, libc::RENAME_EXCL)
let flag = if swap { libc::RENAME_SWAP } else { libc::RENAME_EXCL };
libc::renamex_np(from, to, flag)
} else {
unimplemented!()
}
}
}
}

/// Atomically renames `from` to `to`.
/// - if `swap` is `true` then both `from` and `to` have to exist;
/// - if `swap` is `false` then `from` has to exist, but `to` is not;
/// otherwise the operation will fail.
#[inline]
pub fn atomic_rename(from: &str, to: &str) -> Result<(), StorageError> {
imp::atomic_rename(from, to)
pub fn atomic_rename(from: &str, to: &str, swap: bool) -> Result<(), StorageError> {
imp::atomic_rename(from, to, swap)
}

#[cfg(test)]
Expand All @@ -85,7 +95,7 @@ mod tests {
let c = &tmp_dir.path().join("c");

// unsuccessful move not_exists to C, not_exists is missing
match atomic_rename("not_exists", c.to_str().unwrap()) {
match atomic_rename("not_exists", c.to_str().unwrap(), false) {
Err(StorageError::Io { source: e }) => {
cfg_if::cfg_if! {
if #[cfg(target_os = "windows")] {
Expand Down Expand Up @@ -114,18 +124,46 @@ mod tests {
// successful move A to C
assert!(a.exists());
assert!(!c.exists());
atomic_rename(a.to_str().unwrap(), c.to_str().unwrap()).unwrap();
atomic_rename(a.to_str().unwrap(), c.to_str().unwrap(), false).unwrap();
assert!(!a.exists());
assert!(c.exists());

// unsuccessful move B to C, C already exists, B is not deleted
assert!(b.exists());
match atomic_rename(b.to_str().unwrap(), c.to_str().unwrap()) {
match atomic_rename(b.to_str().unwrap(), c.to_str().unwrap(), false) {
Err(StorageError::AlreadyExists(p)) => assert_eq!(p, c.to_str().unwrap()),
_ => panic!("unexpected"),
}
assert!(b.exists());
assert_eq!(std::fs::read_to_string(c).unwrap(), "a");

// until https://github.com/delta-io/delta-rs/issues/377 is resolved
cfg_if::cfg_if! {
if #[cfg(target_os = "windows")] {
if true {
return;
}
}
}

// successful swaps B to C
atomic_rename(b.to_str().unwrap(), c.to_str().unwrap(), true).unwrap();
assert!(b.exists());
assert!(c.exists());
assert_eq!(std::fs::read_to_string(b).unwrap(), "a");
assert_eq!(std::fs::read_to_string(c).unwrap(), "b");

// unsuccessful swap C to D, D does not exist
let d = &tmp_dir.path().join("d");
assert!(!d.exists());
match atomic_rename(c.to_str().unwrap(), d.to_str().unwrap(), true) {
Err(StorageError::Io { source }) => {
assert!(source.to_string().starts_with("failed to rename"));
}
_ => panic!("unexpected"),
}
assert!(c.exists());
assert!(!d.exists());
}

fn create_file(dir: &Path, name: &str) -> PathBuf {
Expand Down