bug-fix: Send safety for sync function #571

Merged
12 commits merged on Oct 2, 2023
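
The property this PR establishes can be asserted at compile time. A hedged sketch follows (require_send and fake_sync are made-up stand-ins, not the contents of the send_test module added below): if the sync future is Send it can be handed to another thread, and if not, the check fails to build.

```rust
// Hedged sketch only: require_send and fake_sync are hypothetical stand-ins,
// not the contents of the send_test module added by this PR.
fn require_send<T: Send>(value: T) -> T {
    value
}

async fn fake_sync(bucket: String, prefix: String) -> Result<(), std::io::Error> {
    // Placeholder body; the real sync() streams files to and from S3.
    let _ = (bucket, prefix);
    Ok(())
}

fn main() {
    // If the future held a non-Send value (e.g. a plain Box<dyn AsRef<Path>>)
    // across an await point, this call would fail to compile.
    let fut = require_send(fake_sync("bucket".into(), "prefix/".into()));
    drop(fut);
}
```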
4 changes: 2 additions & 2 deletions crates/esthri/src/ops/delete.rs
@@ -15,7 +15,7 @@ use futures::{future, stream::Stream, Future, StreamExt, TryFutureExt};
use aws_sdk_s3::error::SdkError;
use aws_sdk_s3::types::{Delete, ObjectIdentifier};
use aws_sdk_s3::Client;
use log::{debug, info};
use log::{debug, error, info};
use log_derive::logfn;

use crate::errors::Result;
@@ -103,7 +103,7 @@ pub fn delete_streaming<'a>(
}))
}
Err(err) => {
println!("nothing found in delete_streaming keys");
error!("nothing found in delete_streaming keys");
future::Either::Right(future::ready(Err(err)))
}
}
135 changes: 56 additions & 79 deletions crates/esthri/src/ops/sync.rs
@@ -10,6 +10,7 @@
* WARRANTIES OF MERCHANTABILITY AND/OR FITNESS FOR A PARTICULAR PURPOSE.
*/

use async_stream::stream;
use std::{
borrow::Cow,
io::ErrorKind,
@@ -44,7 +45,7 @@ use crate::{
};

struct MappedPathResult {
file_path: Box<dyn AsRef<Path>>,
file_path: Box<dyn AsRef<Path> + Send + Sync>,
source_path: PathBuf,
local_etag: Result<Option<String>>,
metadata: Option<ListingMetadata>,
@@ -286,12 +287,12 @@ where
{
input_stream.map(move |params| async move {
let (source_path, metadata) = params?;
let file_path: Box<dyn AsRef<Path>> = Box::new(source_path.clone());
let file_path: Box<dyn AsRef<Path> + Send + Sync> = Box::new(source_path.clone());
Contributor Author

Adds + Send + Sync so that sync() can be passed to another thread.
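
A minimal sketch of why the bound matters, assuming the Tokio multi-threaded runtime esthri already uses (file_len and main below are hypothetical, not code from this PR): a future that holds a plain Box<dyn AsRef<Path>> across an .await is not Send and cannot be passed to tokio::spawn, while the + Send + Sync version can.

```rust
// Minimal sketch, not from this PR: file_len and main are hypothetical, and a
// Tokio multi-threaded runtime (as esthri already uses) is assumed.
use std::path::{Path, PathBuf};

async fn file_len(p: Box<dyn AsRef<Path> + Send + Sync>) -> std::io::Result<u64> {
    // `p` is held across this await point, so the whole future is only Send
    // (and therefore spawnable) if the boxed trait object is Send + Sync too.
    let meta = tokio::fs::metadata(&*p).await?;
    Ok(meta.len())
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let boxed: Box<dyn AsRef<Path> + Send + Sync> = Box::new(PathBuf::from("Cargo.toml"));
    // With the added bounds this future is Send and can cross threads;
    // with a plain Box<dyn AsRef<Path>> this tokio::spawn would not compile.
    let len = tokio::spawn(file_len(boxed)).await.unwrap()?;
    println!("{len} bytes");
    Ok(())
}
```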


let (file_path, source_path, local_etag) = match sync_cmd {
SyncCmd::Up => {
let local_etag = if metadata.is_some() {
compute_etag(&source_path).await.map(Option::Some)
compute_etag(&source_path).await.map(Some)
} else {
Ok(None)
};
@@ -302,7 +303,7 @@ where
// should be compressed
let (tmp, _) = compress_to_tempfile(&source_path).await?;
let local_etag = if metadata.is_some() {
compute_etag(tmp.path()).await.map(Option::Some)
compute_etag(tmp.path()).await.map(Some)
} else {
Ok(None)
};
@@ -330,11 +331,11 @@ where
// the local file to see if it matches with
// the compressed version
let (tmp, _) = compress_to_tempfile(&source_path).await?;
let local_etag = compute_etag(tmp.path()).await.map(Option::Some);
let local_etag = compute_etag(tmp.path()).await.map(Some);
(file_path, source_path, local_etag)
}
} else {
let local_etag = compute_etag(&source_path).await.map(Option::Some);
let local_etag = compute_etag(&source_path).await.map(Some);
(file_path, source_path, local_etag)
}
}
@@ -374,7 +375,7 @@ where
.map(move |clones| async move {
let (s3, bucket, key, directory, entry) = clones;
let MappedPathResult {
file_path: filepath,
file_path,
source_path,
local_etag,
metadata: object_info,
@@ -384,17 +385,14 @@ where
let stripped_path = path.strip_prefix(&directory);
let stripped_path = match stripped_path {
Err(e) => {
warn!(
"unexpected: failed to strip prefix: {}, {:?}, {:?}",
e, &path, &directory
);
warn!("unexpected: failed to strip prefix: {e}, {path:?}, {directory:?}");
return Ok(());
}
Ok(result) => result,
};
let remote_path = remote_path.join(stripped_path);
let remote_path = remote_path.to_string_lossy();
debug!("checking remote: {}", remote_path);
debug!("checking remote: {remote_path}");
let local_etag = local_etag?;
let metadata = if transparent_compression {
Some(crate::compression::compressed_file_metadata())
@@ -406,25 +404,19 @@ where
let remote_etag = object_info.e_tag;
let local_etag = local_etag.unwrap();
if remote_etag != local_etag {
info!(
"etag mis-match: {}, remote_etag={}, local_etag={}",
remote_path, remote_etag, local_etag
);
let f = File::open(&*filepath).await?;
info!("etag mis-match: {remote_path}, remote_etag={remote_etag}, local_etag={local_etag}");
let f = File::open(&*file_path).await?;
let reader = BufReader::new(f);
let size = fs::metadata(&*filepath).await?.len();
let size = fs::metadata(&*file_path).await?.len();
upload_from_reader(&s3, bucket, remote_path, reader, size, metadata).await?;
} else {
debug!(
"etags matched: {}, remote_etag={}, local_etag={}",
remote_path, remote_etag, local_etag
);
debug!("etags matched: {remote_path}, remote_etag={remote_etag}, local_etag={local_etag}");
}
} else {
info!("file did not exist remotely: {}", remote_path);
let f = File::open(&*filepath).await?;
info!("file did not exist remotely: {remote_path}");
let f = File::open(&*file_path).await?;
let reader = BufReader::new(f);
let size = fs::metadata(&*filepath).await?.len();
let size = fs::metadata(&*file_path).await?.len();
upload_from_reader(&s3, bucket, remote_path, reader, size, metadata).await?;
}
Ok(())
@@ -450,7 +442,7 @@ async fn sync_local_to_remote(
Err(e) => {
unreachable!(
"unexpected: failed to strip prefix: {}, {:?}, {:?}",
e, &path, &directory
e, path, directory
);
}
};
@@ -475,19 +467,17 @@ async fn sync_local_to_remote(
};

let etag_stream = translate_paths(dirent_stream, cmd).buffer_unordered(task_count);
let sync_tasks = local_to_remote_sync_tasks(
local_to_remote_sync_tasks(
s3.clone(),
bucket.into(),
key.into(),
directory.into(),
etag_stream,
compressed,
);

sync_tasks
.buffer_unordered(task_count)
.try_for_each_concurrent(task_count, |_| future::ready(Ok(())))
.await?;
)
.buffer_unordered(task_count)
.try_for_each_concurrent(task_count, |_| future::ready(Ok(())))
.await?;

if delete {
sync_delete_local(s3, bucket, key, directory, filters, task_count).await
@@ -506,22 +496,19 @@ async fn sync_delete_local(
task_count: usize,
) -> Result<()> {
let stream = flattened_object_listing(s3, bucket, key, directory, filters, false);
let delete_paths_stream = stream.try_filter_map(|object_info| async {
let (path, _, s3_metadata) = object_info;
if let Some(s3_metadata) = s3_metadata {
let fs_metadata = fs::metadata(&path).await;
if let Err(err) = fs_metadata {
if err.kind() == ErrorKind::NotFound {
let obj_path = String::from(key) + &s3_metadata.s3_suffix;
Ok(Some(obj_path))
} else {
Err(err.into())
let delete_paths_stream = stream.try_filter_map(move |obj| {
let key = key.to_string();
Contributor Author

Cloning the key to an owned String ensures Send is satisfied; the borrowed &str lifetime was the blocker.
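
The same clone-before-async pattern in isolation, as a hedged standalone sketch (collect_with_prefix is hypothetical): by converting the borrowed &str into an owned String before entering the async block, each item future owns its data and the resulting stream stays Send.

```rust
// Standalone hedged sketch; collect_with_prefix is hypothetical and only shows
// the clone-before-async pattern used above.
use futures::{stream, TryStreamExt};

async fn collect_with_prefix(prefix: &str) -> std::io::Result<Vec<String>> {
    let items = stream::iter(vec![Ok::<_, std::io::Error>("a"), Ok("b")]);
    let joined = items.try_filter_map(move |suffix| {
        // Own the key up front so the item future captures a String rather
        // than borrowing the caller's &str.
        let prefix = prefix.to_string();
        async move { Ok::<_, std::io::Error>(Some(prefix + suffix)) }
    });
    joined.try_collect().await
}

fn main() {
    let out = futures::executor::block_on(collect_with_prefix("logs/")).unwrap();
    assert_eq!(out, vec!["logs/a".to_string(), "logs/b".to_string()]);
}
```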

async {
let (path, _, s3_metadata) = obj;
if let Some(s3_metadata) = s3_metadata {
match fs::metadata(&path).await {
Ok(_) => Ok(None),
Err(e) if e.kind() != ErrorKind::NotFound => Err(e.into()),
Err(_) => Ok(Some(key + &s3_metadata.s3_suffix)),
}
} else {
Ok(None)
Err(Error::MetadataNone)
}
} else {
Err(Error::MetadataNone)
}
});
pin_mut!(delete_paths_stream);
@@ -643,45 +630,35 @@ async fn sync_across(
/// Delete all files in `directory` which do not exist within a `bucket's` corresponding `directory` key prefix
async fn sync_delete_across(
s3: &Client,
source_bucket: &str,
source_prefix: &str,
destination_bucket: &str,
destination_prefix: &str,
src_bucket: &str,
src_prefix: &str,
dst_bucket: &str,
dst_prefix: &str,
filters: &[GlobFilter],
) -> Result<()> {
let task_count = Config::global().concurrent_sync_tasks();
// Identify all files in destination bucket
let all_files_in_destination_bucket_stream = flattened_object_listing(
s3,
destination_bucket,
destination_prefix,
std::path::Path::new(source_prefix),
filters,
false,
);

let dir = Path::new(&src_prefix);
let bucket_stream = flattened_object_listing(s3, dst_bucket, dst_prefix, dir, filters, false);
// For each file, perform a head_object_request on the source directory to determine if file only exists in the destination
let delete_paths_stream = all_files_in_destination_bucket_stream.try_filter_map(
|(_path, key, _s3_metadata)| async move {
let filename = key.strip_prefix(destination_prefix).unwrap();
let source_key = source_prefix.to_string() + filename;
if head_object_request(s3, source_bucket, &source_key, None)
.await?
.is_none()
{
Ok(Some(source_key))
} else {
Ok(None)
}
},
);
let delete_paths_stream = bucket_stream.try_filter_map(move |(_, key, _)| {
let s3 = s3.clone();
let src_bucket = src_bucket.to_string();
let src_prefix = src_prefix.to_string();
let dst_prefix = dst_prefix.to_string();
Comment on lines +645 to +648
Contributor Author

These clones also satisfy the Send bound.
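
A hedged sketch of why cloning per item is acceptable (FakeClient below is hypothetical; the real aws_sdk_s3::Client is similarly cheap to clone because its internals are shared behind a reference count): moving a clone of the client plus owned Strings into each async move block keeps every per-item future Send + 'static.

```rust
// Hedged sketch with a hypothetical FakeClient; the real aws_sdk_s3::Client is
// likewise cheap to clone, so moving a clone plus owned Strings into each
// async move block costs roughly a refcount bump per item.
use std::sync::Arc;

#[derive(Clone)]
struct FakeClient {
    shared: Arc<Vec<u8>>, // stands in for the SDK's shared state
}

impl FakeClient {
    async fn head(&self, key: &str) -> Option<usize> {
        // Pretend lookup; real code would call head_object_request.
        self.shared.is_empty().then(|| key.len())
    }
}

#[tokio::main]
async fn main() {
    let client = FakeClient { shared: Arc::new(Vec::new()) };
    let mut handles = Vec::new();
    for key in ["a", "bb", "ccc"] {
        let client = client.clone(); // cheap: bumps the Arc refcount
        let key = key.to_string();   // own the key so the future is 'static
        handles.push(tokio::spawn(async move { client.head(&key).await }));
    }
    for handle in handles {
        println!("{:?}", handle.await.unwrap());
    }
}
```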

async move {
let filename = key.strip_prefix(&dst_prefix).unwrap();
let source_key = src_prefix + filename;
let head_object_info = head_object_request(&s3, &src_bucket, &source_key, None).await?;
Ok(head_object_info.map(|_| source_key))
}
});

// Delete files that exist in destination, but not in source
pin_mut!(delete_paths_stream);
crate::delete_streaming(s3, destination_bucket, delete_paths_stream)
.buffer_unordered(Config::global().concurrent_sync_tasks())
.try_for_each_concurrent(Config::global().concurrent_sync_tasks(), |_| {
future::ready(Ok(()))
})
crate::delete_streaming(s3, &dst_bucket, delete_paths_stream)
.buffer_unordered(task_count)
.try_for_each_concurrent(task_count, |_| future::ready(Ok(())))
.await
}

@@ -700,7 +677,7 @@ fn flattened_object_listing<'a>(
Cow::Owned(format!("{}/", key))
};

async_stream::stream! {
stream! {
let mut stream = list_objects_stream(s3, bucket, prefix);
loop {
let entries = match stream.try_next().await {
3 changes: 1 addition & 2 deletions crates/esthri/src/tempfile.rs
@@ -68,8 +68,7 @@ impl TempFile {
pub fn path(&self) -> &Path {
&self.path
}

pub fn into_path(self) -> Box<dyn AsRef<Path>> {
pub fn into_path(self) -> Box<dyn AsRef<Path> + Send + Sync> {
Box::new(self.path)
}
}
1 change: 1 addition & 0 deletions crates/esthri/tests/integration/main.rs
@@ -4,6 +4,7 @@ mod delete_test;
mod download_test;
mod list_object_test;
mod presign;
mod send_test;
mod sync_test;
mod upload_test;

4 changes: 2 additions & 2 deletions crates/esthri/tests/integration/presign.rs
@@ -48,7 +48,7 @@ async fn test_presign_put() {
let s3_key = esthri_test::randomised_name(&format!("test_upload/{}", filename));
let bucket = esthri_test::TEST_BUCKET;
let opts = EsthriPutOptParamsBuilder::default().build().unwrap();
let presigned_url = presign_put(&s3, bucket, &s3_key, None, opts.clone())
let presigned_url = presign_put(s3, bucket, &s3_key, None, opts.clone())
.await
.unwrap();
upload_file_presigned(&Client::new(), &presigned_url, &filepath, opts)
@@ -84,7 +84,7 @@ async fn test_presign_delete() {
.await
.unwrap()
.is_some());
let presigned_url = presign_delete(&s3, bucket, s3_key, None).await.unwrap();
let presigned_url = presign_delete(s3, bucket, s3_key, None).await.unwrap();
delete_file_presigned(&Client::new(), &presigned_url)
.await
.unwrap();