bug-fix: Send safety for sync function (#571)
Adds send tests and attempts to fix some Send issues
adrian-kong authored Oct 2, 2023
1 parent bfebc10 commit 046257b
Showing 8 changed files with 157 additions and 91 deletions.
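
The substance of the change is making the sync futures Send: boxed dyn AsRef<Path> trait objects that are held across .await points gain + Send + Sync bounds, and stream closures now move owned clones instead of borrowing. The new send_test.rs is not shown on this page, so the following is only a rough, self-contained sketch of the compile-time Send-assertion pattern such tests commonly use, and of why the bound on the boxed path matters (all names and the example path are hypothetical):

use std::future::ready;
use std::path::{Path, PathBuf};

// Compile-time check: the argument's type must implement Send.
fn assert_send<T: Send>(_: &T) {}

// Keeps a boxed path trait object alive across an .await point, so the
// trait object's auto traits decide whether the whole future is Send.
async fn holds_boxed_path(p: Box<dyn AsRef<Path> + Send + Sync>) -> PathBuf {
    ready(()).await;
    (*p).as_ref().to_path_buf()
}

fn main() {
    let fut = holds_boxed_path(Box::new(PathBuf::from("/tmp/example")));
    // Passes only because the boxed trait object is Send; with a plain
    // Box<dyn AsRef<Path>> the future would not be Send and this call
    // would fail to compile.
    assert_send(&fut);
}
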
4 changes: 2 additions & 2 deletions crates/esthri/src/ops/delete.rs
@@ -15,7 +15,7 @@ use futures::{future, stream::Stream, Future, StreamExt, TryFutureExt};
use aws_sdk_s3::error::SdkError;
use aws_sdk_s3::types::{Delete, ObjectIdentifier};
use aws_sdk_s3::Client;
use log::{debug, info};
use log::{debug, error, info};
use log_derive::logfn;

use crate::errors::Result;
@@ -103,7 +103,7 @@ pub fn delete_streaming<'a>(
}))
}
Err(err) => {
println!("nothing found in delete_streaming keys");
error!("nothing found in delete_streaming keys");
future::Either::Right(future::ready(Err(err)))
}
}
135 changes: 56 additions & 79 deletions crates/esthri/src/ops/sync.rs
@@ -10,6 +10,7 @@
* WARRANTIES OF MERCHANTABILITY AND/OR FITNESS FOR A PARTICULAR PURPOSE.
*/

use async_stream::stream;
use std::{
borrow::Cow,
io::ErrorKind,
@@ -44,7 +45,7 @@ use crate::{
};

struct MappedPathResult {
file_path: Box<dyn AsRef<Path>>,
file_path: Box<dyn AsRef<Path> + Send + Sync>,
source_path: PathBuf,
local_etag: Result<Option<String>>,
metadata: Option<ListingMetadata>,
@@ -286,12 +287,12 @@ where
{
input_stream.map(move |params| async move {
let (source_path, metadata) = params?;
let file_path: Box<dyn AsRef<Path>> = Box::new(source_path.clone());
let file_path: Box<dyn AsRef<Path> + Send + Sync> = Box::new(source_path.clone());

let (file_path, source_path, local_etag) = match sync_cmd {
SyncCmd::Up => {
let local_etag = if metadata.is_some() {
compute_etag(&source_path).await.map(Option::Some)
compute_etag(&source_path).await.map(Some)
} else {
Ok(None)
};
@@ -302,7 +303,7 @@ where
// should be compressed
let (tmp, _) = compress_to_tempfile(&source_path).await?;
let local_etag = if metadata.is_some() {
compute_etag(tmp.path()).await.map(Option::Some)
compute_etag(tmp.path()).await.map(Some)
} else {
Ok(None)
};
@@ -330,11 +331,11 @@ where
// the local file to see if it matches with
// the compressed version
let (tmp, _) = compress_to_tempfile(&source_path).await?;
let local_etag = compute_etag(tmp.path()).await.map(Option::Some);
let local_etag = compute_etag(tmp.path()).await.map(Some);
(file_path, source_path, local_etag)
}
} else {
let local_etag = compute_etag(&source_path).await.map(Option::Some);
let local_etag = compute_etag(&source_path).await.map(Some);
(file_path, source_path, local_etag)
}
}
@@ -374,7 +375,7 @@ where
.map(move |clones| async move {
let (s3, bucket, key, directory, entry) = clones;
let MappedPathResult {
file_path: filepath,
file_path,
source_path,
local_etag,
metadata: object_info,
@@ -384,17 +385,14 @@ where
let stripped_path = path.strip_prefix(&directory);
let stripped_path = match stripped_path {
Err(e) => {
warn!(
"unexpected: failed to strip prefix: {}, {:?}, {:?}",
e, &path, &directory
);
warn!("unexpected: failed to strip prefix: {e}, {path:?}, {directory:?}");
return Ok(());
}
Ok(result) => result,
};
let remote_path = remote_path.join(stripped_path);
let remote_path = remote_path.to_string_lossy();
debug!("checking remote: {}", remote_path);
debug!("checking remote: {remote_path}");
let local_etag = local_etag?;
let metadata = if transparent_compression {
Some(crate::compression::compressed_file_metadata())
@@ -406,25 +404,19 @@ where
let remote_etag = object_info.e_tag;
let local_etag = local_etag.unwrap();
if remote_etag != local_etag {
info!(
"etag mis-match: {}, remote_etag={}, local_etag={}",
remote_path, remote_etag, local_etag
);
let f = File::open(&*filepath).await?;
info!("etag mis-match: {remote_path}, remote_etag={remote_etag}, local_etag={local_etag}");
let f = File::open(&*file_path).await?;
let reader = BufReader::new(f);
let size = fs::metadata(&*filepath).await?.len();
let size = fs::metadata(&*file_path).await?.len();
upload_from_reader(&s3, bucket, remote_path, reader, size, metadata).await?;
} else {
debug!(
"etags matched: {}, remote_etag={}, local_etag={}",
remote_path, remote_etag, local_etag
);
debug!("etags matched: {remote_path}, remote_etag={remote_etag}, local_etag={local_etag}");
}
} else {
info!("file did not exist remotely: {}", remote_path);
let f = File::open(&*filepath).await?;
info!("file did not exist remotely: {remote_path}");
let f = File::open(&*file_path).await?;
let reader = BufReader::new(f);
let size = fs::metadata(&*filepath).await?.len();
let size = fs::metadata(&*file_path).await?.len();
upload_from_reader(&s3, bucket, remote_path, reader, size, metadata).await?;
}
Ok(())
@@ -450,7 +442,7 @@ async fn sync_local_to_remote(
Err(e) => {
unreachable!(
"unexpected: failed to strip prefix: {}, {:?}, {:?}",
e, &path, &directory
e, path, directory
);
}
};
@@ -475,19 +467,17 @@
};

let etag_stream = translate_paths(dirent_stream, cmd).buffer_unordered(task_count);
let sync_tasks = local_to_remote_sync_tasks(
local_to_remote_sync_tasks(
s3.clone(),
bucket.into(),
key.into(),
directory.into(),
etag_stream,
compressed,
);

sync_tasks
.buffer_unordered(task_count)
.try_for_each_concurrent(task_count, |_| future::ready(Ok(())))
.await?;
)
.buffer_unordered(task_count)
.try_for_each_concurrent(task_count, |_| future::ready(Ok(())))
.await?;

if delete {
sync_delete_local(s3, bucket, key, directory, filters, task_count).await
@@ -506,22 +496,19 @@ async fn sync_delete_local(
task_count: usize,
) -> Result<()> {
let stream = flattened_object_listing(s3, bucket, key, directory, filters, false);
let delete_paths_stream = stream.try_filter_map(|object_info| async {
let (path, _, s3_metadata) = object_info;
if let Some(s3_metadata) = s3_metadata {
let fs_metadata = fs::metadata(&path).await;
if let Err(err) = fs_metadata {
if err.kind() == ErrorKind::NotFound {
let obj_path = String::from(key) + &s3_metadata.s3_suffix;
Ok(Some(obj_path))
} else {
Err(err.into())
let delete_paths_stream = stream.try_filter_map(move |obj| {
let key = key.to_string();
async {
let (path, _, s3_metadata) = obj;
if let Some(s3_metadata) = s3_metadata {
match fs::metadata(&path).await {
Ok(_) => Ok(None),
Err(e) if e.kind() != ErrorKind::NotFound => Err(e.into()),
Err(_) => Ok(Some(key + &s3_metadata.s3_suffix)),
}
} else {
Ok(None)
Err(Error::MetadataNone)
}
} else {
Err(Error::MetadataNone)
}
});
pin_mut!(delete_paths_stream);
@@ -643,45 +630,35 @@ async fn sync_across(
/// Delete all files in `directory` which do not exist within a `bucket's` corresponding `directory` key prefix
async fn sync_delete_across(
s3: &Client,
source_bucket: &str,
source_prefix: &str,
destination_bucket: &str,
destination_prefix: &str,
src_bucket: &str,
src_prefix: &str,
dst_bucket: &str,
dst_prefix: &str,
filters: &[GlobFilter],
) -> Result<()> {
let task_count = Config::global().concurrent_sync_tasks();
// Identify all files in destination bucket
let all_files_in_destination_bucket_stream = flattened_object_listing(
s3,
destination_bucket,
destination_prefix,
std::path::Path::new(source_prefix),
filters,
false,
);

let dir = Path::new(&src_prefix);
let bucket_stream = flattened_object_listing(s3, dst_bucket, dst_prefix, dir, filters, false);
// For each file, perform a head_object_request on the source directory to determine if file only exists in the destination
let delete_paths_stream = all_files_in_destination_bucket_stream.try_filter_map(
|(_path, key, _s3_metadata)| async move {
let filename = key.strip_prefix(destination_prefix).unwrap();
let source_key = source_prefix.to_string() + filename;
if head_object_request(s3, source_bucket, &source_key, None)
.await?
.is_none()
{
Ok(Some(source_key))
} else {
Ok(None)
}
},
);
let delete_paths_stream = bucket_stream.try_filter_map(move |(_, key, _)| {
let s3 = s3.clone();
let src_bucket = src_bucket.to_string();
let src_prefix = src_prefix.to_string();
let dst_prefix = dst_prefix.to_string();
async move {
let filename = key.strip_prefix(&dst_prefix).unwrap();
let source_key = src_prefix + filename;
let head_object_info = head_object_request(&s3, &src_bucket, &source_key, None).await?;
Ok(head_object_info.map(|_| source_key))
}
});

// Delete files that exist in destination, but not in source
pin_mut!(delete_paths_stream);
crate::delete_streaming(s3, destination_bucket, delete_paths_stream)
.buffer_unordered(Config::global().concurrent_sync_tasks())
.try_for_each_concurrent(Config::global().concurrent_sync_tasks(), |_| {
future::ready(Ok(()))
})
crate::delete_streaming(s3, &dst_bucket, delete_paths_stream)
.buffer_unordered(task_count)
.try_for_each_concurrent(task_count, |_| future::ready(Ok(())))
.await
}

@@ -700,7 +677,7 @@ fn flattened_object_listing<'a>(
Cow::Owned(format!("{}/", key))
};

async_stream::stream! {
stream! {
let mut stream = list_objects_stream(s3, bucket, prefix);
loop {
let entries = match stream.try_next().await {
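
Several hunks above (local_to_remote_sync_tasks, sync_delete_local, sync_delete_across) share the same shape: clone borrowed data into owned values and hand them to a move closure, so the async block owns everything it captures. As a hedged sketch with made-up names and simplified types (not esthri's real signatures), the pattern looks roughly like this:

use futures::{Stream, TryStreamExt};
use std::io;

// `key` is cloned into an owned String and moved into the closure, so the
// async block captures only owned, Send data; this makes it straightforward
// for the compiler to prove the combined stream is Send.
fn keys_to_delete(
    key: &str,
    listing: impl Stream<Item = Result<String, io::Error>> + Send + 'static,
) -> impl Stream<Item = Result<String, io::Error>> + Send {
    let key = key.to_string();
    listing.try_filter_map(move |suffix| {
        let key = key.clone();
        async move { Ok::<_, io::Error>(Some(key + &suffix)) }
    })
}
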
3 changes: 1 addition & 2 deletions crates/esthri/src/tempfile.rs
@@ -68,8 +68,7 @@ impl TempFile {
pub fn path(&self) -> &Path {
&self.path
}

pub fn into_path(self) -> Box<dyn AsRef<Path>> {
pub fn into_path(self) -> Box<dyn AsRef<Path> + Send + Sync> {
Box::new(self.path)
}
}
1 change: 1 addition & 0 deletions crates/esthri/tests/integration/main.rs
@@ -4,6 +4,7 @@ mod delete_test;
mod download_test;
mod list_object_test;
mod presign;
mod send_test;
mod sync_test;
mod upload_test;

4 changes: 2 additions & 2 deletions crates/esthri/tests/integration/presign.rs
@@ -48,7 +48,7 @@ async fn test_presign_put() {
let s3_key = esthri_test::randomised_name(&format!("test_upload/{}", filename));
let bucket = esthri_test::TEST_BUCKET;
let opts = EsthriPutOptParamsBuilder::default().build().unwrap();
let presigned_url = presign_put(&s3, bucket, &s3_key, None, opts.clone())
let presigned_url = presign_put(s3, bucket, &s3_key, None, opts.clone())
.await
.unwrap();
upload_file_presigned(&Client::new(), &presigned_url, &filepath, opts)
@@ -84,7 +84,7 @@ async fn test_presign_delete() {
.await
.unwrap()
.is_some());
let presigned_url = presign_delete(&s3, bucket, s3_key, None).await.unwrap();
let presigned_url = presign_delete(s3, bucket, s3_key, None).await.unwrap();
delete_file_presigned(&Client::new(), &presigned_url)
.await
.unwrap();