bug-fix: Send safety for sync function #571

Merged · 12 commits · Oct 2, 2023
3 changes: 2 additions & 1 deletion crates/esthri/src/compression.rs
@@ -38,8 +38,9 @@ pub async fn compress_to_tempfile(path: &Path) -> Result<(TempFile, u64)> {
     };
     let mut dest = TempFile::new(dir, Some(".gz")).await?;
     let new_size = io::copy(&mut src, dest.file_mut()).await?;
+    let m = tokio::fs::metadata(dest.path()).await?;
     dest.rewind().await?;
-    debug!("new file size: {}", new_size);
+    debug!("new file size: {}", m.len());
     Ok((dest, new_size))
 }
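For context: the fix logs the size reported by filesystem metadata rather than the byte count returned by `io::copy`. A minimal standalone sketch of the same pattern, assuming tokio with the `fs` and `io-util` features (file names are illustrative, not esthri's):

```rust
use tokio::io::AsyncWriteExt;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let mut src = tokio::fs::File::open("input.bin").await?;
    let mut dst = tokio::fs::File::create("output.bin").await?;
    // Bytes that moved through the copy...
    let copied = tokio::io::copy(&mut src, &mut dst).await?;
    dst.flush().await?;
    // ...versus what the filesystem says actually landed on disk.
    let on_disk = tokio::fs::metadata("output.bin").await?.len();
    println!("copied {copied} bytes; file is {on_disk} bytes on disk");
    Ok(())
}
```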

4 changes: 2 additions & 2 deletions crates/esthri/src/ops/delete.rs
@@ -15,7 +15,7 @@
 use futures::{future, stream::Stream, Future, StreamExt, TryFutureExt};
 use aws_sdk_s3::error::SdkError;
 use aws_sdk_s3::types::{Delete, ObjectIdentifier};
 use aws_sdk_s3::Client;
-use log::{debug, info};
+use log::{debug, error, info};
 use log_derive::logfn;
 
 use crate::errors::Result;
@@ -103,7 +103,7 @@ pub fn delete_streaming<'a>(
             }))
         }
         Err(err) => {
-            println!("nothing found in delete_streaming keys");
+            error!("nothing found in delete_streaming keys");
             future::Either::Right(future::ready(Err(err)))
         }
     }
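The `println!` to `error!` swap routes the message through the `log` facade instead of raw stdout, so it respects the level filter and destination of whatever logger the binary installs. A minimal sketch, assuming the env_logger backend (esthri's actual logger setup is not shown in this diff):

```rust
use log::{debug, error};

fn main() {
    // RUST_LOG controls what gets through; println! would bypass this entirely.
    env_logger::init();
    debug!("listing keys for deletion");
    error!("nothing found in delete_streaming keys");
}
```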
135 changes: 56 additions & 79 deletions crates/esthri/src/ops/sync.rs
@@ -10,6 +10,7 @@
  * WARRANTIES OF MERCHANTABILITY AND/OR FITNESS FOR A PARTICULAR PURPOSE.
  */
 
+use async_stream::stream;
 use std::{
     borrow::Cow,
     io::ErrorKind,
@@ -44,7 +45,7 @@ use crate::{
 };
 
 struct MappedPathResult {
-    file_path: Box<dyn AsRef<Path>>,
+    file_path: Box<dyn AsRef<Path> + Send + Sync>,
     source_path: PathBuf,
     local_etag: Result<Option<String>>,
     metadata: Option<ListingMetadata>,
@@ -286,12 +287,12 @@
 {
     input_stream.map(move |params| async move {
         let (source_path, metadata) = params?;
-        let file_path: Box<dyn AsRef<Path>> = Box::new(source_path.clone());
+        let file_path: Box<dyn AsRef<Path> + Send + Sync> = Box::new(source_path.clone());
Comment (Contributor Author): Adds + Send + Sync to fix passing sync() to a thread.
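For context on why the added bounds matter: `tokio::spawn` on the multi-threaded runtime requires a `Send` future, and a future is only `Send` if everything it holds across an `.await` is `Send`. A standalone sketch of the failure mode, with illustrative names (not esthri's API):

```rust
use std::path::{Path, PathBuf};

// With plain `Box<dyn AsRef<Path>>` this alias is neither Send nor Sync,
// and any future holding one across an `.await` cannot be spawned.
type SharedPath = Box<dyn AsRef<Path> + Send + Sync>;

async fn touch(path: SharedPath) {
    // The box is alive across this await point, so its auto-trait
    // bounds decide whether the whole future is Send.
    tokio::task::yield_now().await;
    let p: &Path = (*path).as_ref();
    println!("{}", p.display());
}

#[tokio::main]
async fn main() {
    let path: SharedPath = Box::new(PathBuf::from("/tmp/example"));
    // tokio::spawn requires Future + Send + 'static; without the
    // `+ Send + Sync` bounds above, this line fails to compile.
    tokio::spawn(touch(path)).await.unwrap();
}
```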
         let (file_path, source_path, local_etag) = match sync_cmd {
             SyncCmd::Up => {
                 let local_etag = if metadata.is_some() {
-                    compute_etag(&source_path).await.map(Option::Some)
+                    compute_etag(&source_path).await.map(Some)
                 } else {
                     Ok(None)
                 };
@@ -302,7 +303,7 @@
                 // should be compressed
                 let (tmp, _) = compress_to_tempfile(&source_path).await?;
                 let local_etag = if metadata.is_some() {
-                    compute_etag(tmp.path()).await.map(Option::Some)
+                    compute_etag(tmp.path()).await.map(Some)
                 } else {
                     Ok(None)
                 };
@@ -330,11 +331,11 @@
                 // the local file to see if it matches with
                 // the compressed version
                 let (tmp, _) = compress_to_tempfile(&source_path).await?;
-                let local_etag = compute_etag(tmp.path()).await.map(Option::Some);
+                let local_etag = compute_etag(tmp.path()).await.map(Some);
                 (file_path, source_path, local_etag)
             }
         } else {
-            let local_etag = compute_etag(&source_path).await.map(Option::Some);
+            let local_etag = compute_etag(&source_path).await.map(Some);
             (file_path, source_path, local_etag)
         }
     }
@@ -374,7 +375,7 @@
         .map(move |clones| async move {
             let (s3, bucket, key, directory, entry) = clones;
             let MappedPathResult {
-                file_path: filepath,
+                file_path,
                 source_path,
                 local_etag,
                 metadata: object_info,
@@ -384,17 +385,14 @@
             let stripped_path = path.strip_prefix(&directory);
             let stripped_path = match stripped_path {
                 Err(e) => {
-                    warn!(
-                        "unexpected: failed to strip prefix: {}, {:?}, {:?}",
-                        e, &path, &directory
-                    );
+                    warn!("unexpected: failed to strip prefix: {e}, {path:?}, {directory:?}");
                     return Ok(());
                 }
                 Ok(result) => result,
             };
             let remote_path = remote_path.join(stripped_path);
             let remote_path = remote_path.to_string_lossy();
-            debug!("checking remote: {}", remote_path);
+            debug!("checking remote: {remote_path}");
             let local_etag = local_etag?;
             let metadata = if transparent_compression {
                 Some(crate::compression::compressed_file_metadata())
@@ -406,25 +404,19 @@
             let remote_etag = object_info.e_tag;
             let local_etag = local_etag.unwrap();
             if remote_etag != local_etag {
-                info!(
-                    "etag mis-match: {}, remote_etag={}, local_etag={}",
-                    remote_path, remote_etag, local_etag
-                );
-                let f = File::open(&*filepath).await?;
+                info!("etag mis-match: {remote_path}, remote_etag={remote_etag}, local_etag={local_etag}");
+                let f = File::open(&*file_path).await?;
                 let reader = BufReader::new(f);
-                let size = fs::metadata(&*filepath).await?.len();
+                let size = fs::metadata(&*file_path).await?.len();
                 upload_from_reader(&s3, bucket, remote_path, reader, size, metadata).await?;
             } else {
-                debug!(
-                    "etags matched: {}, remote_etag={}, local_etag={}",
-                    remote_path, remote_etag, local_etag
-                );
+                debug!("etags matched: {remote_path}, remote_etag={remote_etag}, local_etag={local_etag}");
             }
         } else {
-            info!("file did not exist remotely: {}", remote_path);
-            let f = File::open(&*filepath).await?;
+            info!("file did not exist remotely: {remote_path}");
+            let f = File::open(&*file_path).await?;
             let reader = BufReader::new(f);
-            let size = fs::metadata(&*filepath).await?.len();
+            let size = fs::metadata(&*file_path).await?.len();
             upload_from_reader(&s3, bucket, remote_path, reader, size, metadata).await?;
         }
         Ok(())
@@ -450,7 +442,7 @@ async fn sync_local_to_remote(
             Err(e) => {
                 unreachable!(
                     "unexpected: failed to strip prefix: {}, {:?}, {:?}",
-                    e, &path, &directory
+                    e, path, directory
                 );
             }
         };
@@ -475,19 +467,17 @@
     };
 
     let etag_stream = translate_paths(dirent_stream, cmd).buffer_unordered(task_count);
-    let sync_tasks = local_to_remote_sync_tasks(
+    local_to_remote_sync_tasks(
         s3.clone(),
         bucket.into(),
         key.into(),
         directory.into(),
         etag_stream,
         compressed,
-    );
-
-    sync_tasks
-        .buffer_unordered(task_count)
-        .try_for_each_concurrent(task_count, |_| future::ready(Ok(())))
-        .await?;
+    )
+    .buffer_unordered(task_count)
+    .try_for_each_concurrent(task_count, |_| future::ready(Ok(())))
+    .await?;
 
     if delete {
         sync_delete_local(s3, bucket, key, directory, filters, task_count).await
@@ -506,22 +496,19 @@ async fn sync_delete_local(
     task_count: usize,
 ) -> Result<()> {
     let stream = flattened_object_listing(s3, bucket, key, directory, filters, false);
-    let delete_paths_stream = stream.try_filter_map(|object_info| async {
-        let (path, _, s3_metadata) = object_info;
-        if let Some(s3_metadata) = s3_metadata {
-            let fs_metadata = fs::metadata(&path).await;
-            if let Err(err) = fs_metadata {
-                if err.kind() == ErrorKind::NotFound {
-                    let obj_path = String::from(key) + &s3_metadata.s3_suffix;
-                    Ok(Some(obj_path))
-                } else {
-                    Err(err.into())
+    let delete_paths_stream = stream.try_filter_map(move |obj| {
+        let key = key.to_string();
Comment (Contributor Author): this ensures Send is satisfied; the borrowed &str lifetimes otherwise prevent it.
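A standalone sketch of that pattern (illustrative names, not esthri's API): a future that captures a `&str` borrows from its environment and cannot be `Send + 'static` for spawning across threads, while cloning into an owned `String` first removes the borrow.

```rust
// Hypothetical helper, not from this PR: the returned future must be
// Send + 'static, e.g. so it can be handed to tokio::spawn.
fn make_delete_check(key: &str) -> impl std::future::Future<Output = String> + Send + 'static {
    // Capturing `key` directly would tie the future to the &str's lifetime.
    // Cloning to an owned String means the async block owns all its data.
    let key = key.to_string();
    async move { format!("{key}/suffix") }
}

#[tokio::main]
async fn main() {
    let fut = make_delete_check("prefix");
    println!("{}", tokio::spawn(fut).await.unwrap());
}
```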
+        async {
+            let (path, _, s3_metadata) = obj;
+            if let Some(s3_metadata) = s3_metadata {
+                match fs::metadata(&path).await {
+                    Ok(_) => Ok(None),
+                    Err(e) if e.kind() != ErrorKind::NotFound => Err(e.into()),
+                    Err(_) => Ok(Some(key + &s3_metadata.s3_suffix)),
+                }
+            } else {
+                Err(Error::MetadataNone)
+            }
+        }
-                }
-            } else {
-                Ok(None)
-            }
-        } else {
-            Err(Error::MetadataNone)
-        }
     });
     pin_mut!(delete_paths_stream);
@@ -643,45 +630,35 @@ async fn sync_across(
 /// Delete all files in `directory` which do not exist within a `bucket`'s corresponding `directory` key prefix
 async fn sync_delete_across(
     s3: &Client,
-    source_bucket: &str,
-    source_prefix: &str,
-    destination_bucket: &str,
-    destination_prefix: &str,
+    src_bucket: &str,
+    src_prefix: &str,
+    dst_bucket: &str,
+    dst_prefix: &str,
     filters: &[GlobFilter],
 ) -> Result<()> {
+    let task_count = Config::global().concurrent_sync_tasks();
     // Identify all files in destination bucket
-    let all_files_in_destination_bucket_stream = flattened_object_listing(
-        s3,
-        destination_bucket,
-        destination_prefix,
-        std::path::Path::new(source_prefix),
-        filters,
-        false,
-    );
-
+    let dir = Path::new(&src_prefix);
+    let bucket_stream = flattened_object_listing(s3, dst_bucket, dst_prefix, dir, filters, false);
     // For each file, perform a head_object_request on the source directory to determine if file only exists in the destination
-    let delete_paths_stream = all_files_in_destination_bucket_stream.try_filter_map(
-        |(_path, key, _s3_metadata)| async move {
-            let filename = key.strip_prefix(destination_prefix).unwrap();
-            let source_key = source_prefix.to_string() + filename;
-            if head_object_request(s3, source_bucket, &source_key, None)
-                .await?
-                .is_none()
-            {
-                Ok(Some(source_key))
-            } else {
-                Ok(None)
-            }
-        },
-    );
+    let delete_paths_stream = bucket_stream.try_filter_map(move |(_, key, _)| {
+        let s3 = s3.clone();
+        let src_bucket = src_bucket.to_string();
+        let src_prefix = src_prefix.to_string();
+        let dst_prefix = dst_prefix.to_string();
Comment on lines +645 to +648 (Contributor Author): also satisfying the Send trait.
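The same owned-capture pattern, extended to the client handle: `aws_sdk_s3::Client` is cheap to clone (it shares its internals), so cloning per item is the idiomatic way to get an owned, `Send` capture. A sketch with hypothetical names, not esthri's API:

```rust
use aws_sdk_s3::Client;
use std::future::Future;

// Hypothetical per-key task builder: the returned future must be
// Send + 'static so a runtime may move it between worker threads.
fn source_key_task(
    s3: &Client,
    src_prefix: &str,
    key: String,
) -> impl Future<Output = String> + Send + 'static {
    // Clone/own every capture up front; the async block then borrows nothing.
    let _s3 = s3.clone();
    let src_prefix = src_prefix.to_string();
    async move {
        // A real task would issue a head_object call through `_s3` here.
        src_prefix + &key
    }
}
```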
+        async move {
+            let filename = key.strip_prefix(&dst_prefix).unwrap();
+            let source_key = src_prefix + filename;
+            let head_object_info = head_object_request(&s3, &src_bucket, &source_key, None).await?;
+            Ok(head_object_info.map(|_| source_key))
+        }
+    });
 
     // Delete files that exist in destination, but not in source
     pin_mut!(delete_paths_stream);
-    crate::delete_streaming(s3, destination_bucket, delete_paths_stream)
-        .buffer_unordered(Config::global().concurrent_sync_tasks())
-        .try_for_each_concurrent(Config::global().concurrent_sync_tasks(), |_| {
-            future::ready(Ok(()))
-        })
+    crate::delete_streaming(s3, &dst_bucket, delete_paths_stream)
+        .buffer_unordered(task_count)
+        .try_for_each_concurrent(task_count, |_| future::ready(Ok(())))
         .await
 }

@@ -700,7 +677,7 @@ fn flattened_object_listing<'a>(
         Cow::Owned(format!("{}/", key))
     };
 
-    async_stream::stream! {
+    stream! {
        let mut stream = list_objects_stream(s3, bucket, prefix);
        loop {
            let entries = match stream.try_next().await {
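The listing above is built with the async-stream crate's `stream!` macro, which turns an imperative loop that `yield`s items into an `impl Stream`. A minimal self-contained sketch of that pattern:

```rust
use async_stream::stream;
use futures::{pin_mut, Stream, StreamExt};

// An imperative loop that yields values becomes an impl Stream.
fn count_to(n: u32) -> impl Stream<Item = u32> {
    stream! {
        for i in 1..=n {
            yield i;
        }
    }
}

#[tokio::main]
async fn main() {
    let s = count_to(3);
    pin_mut!(s); // the stream must be pinned before iteration
    while let Some(v) = s.next().await {
        println!("{v}");
    }
}
```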
3 changes: 1 addition & 2 deletions crates/esthri/src/tempfile.rs
@@ -68,8 +68,7 @@ impl TempFile {
     pub fn path(&self) -> &Path {
         &self.path
     }
-
-    pub fn into_path(self) -> Box<dyn AsRef<Path>> {
+    pub fn into_path(self) -> Box<dyn AsRef<Path> + Send + Sync> {
         Box::new(self.path)
     }
 }
1 change: 1 addition & 0 deletions crates/esthri/tests/integration/main.rs
@@ -4,6 +4,7 @@
 mod delete_test;
 mod download_test;
 mod list_object_test;
 mod presign;
+mod send_test;
 mod sync_test;
 mod upload_test;

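The PR registers a new send_test module, though its body is not shown in this diff. A plausible minimal shape for such a compile-time check (hypothetical code, not the PR's actual test) is a function generic over `T: Send` that fails to compile if the sync future ever stops being Send:

```rust
// Hypothetical sketch: require_send only compiles if T is Send.
fn require_send<T: Send>(_t: &T) {}

#[test]
fn sync_future_is_send() {
    // Stand-in future; the real test would construct esthri's sync() future.
    let fut = async { 42 };
    require_send(&fut);
}
```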
4 changes: 2 additions & 2 deletions crates/esthri/tests/integration/presign.rs
@@ -48,7 +48,7 @@ async fn test_presign_put() {
     let s3_key = esthri_test::randomised_name(&format!("test_upload/{}", filename));
     let bucket = esthri_test::TEST_BUCKET;
     let opts = EsthriPutOptParamsBuilder::default().build().unwrap();
-    let presigned_url = presign_put(&s3, bucket, &s3_key, None, opts.clone())
+    let presigned_url = presign_put(s3, bucket, &s3_key, None, opts.clone())
         .await
         .unwrap();
     upload_file_presigned(&Client::new(), &presigned_url, &filepath, opts)
@@ -84,7 +84,7 @@ async fn test_presign_delete() {
         .await
         .unwrap()
         .is_some());
-    let presigned_url = presign_delete(&s3, bucket, s3_key, None).await.unwrap();
+    let presigned_url = presign_delete(s3, bucket, s3_key, None).await.unwrap();
     delete_file_presigned(&Client::new(), &presigned_url)
         .await
         .unwrap();