From b9224eb05891166d40fe7a287b1eba0dd0c145d4 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 10 May 2024 19:47:59 +0100 Subject: [PATCH 1/2] Add additional WriteMultipart tests (#5743) --- object_store/src/integration.rs | 12 ++++++ object_store/src/upload.rs | 69 +++++++++++++++++++++++++++++++++ 2 files changed, 81 insertions(+) diff --git a/object_store/src/integration.rs b/object_store/src/integration.rs index d08c4509f366..31b074f4987a 100644 --- a/object_store/src/integration.rs +++ b/object_store/src/integration.rs @@ -799,6 +799,18 @@ pub async fn stream_get(storage: &DynObjectStore) { let meta = storage.head(&location).await.unwrap(); assert_eq!(meta.size, 6); + let location = Path::from("test_dir/test_put_part_mixed.txt"); + let upload = storage.put_multipart(&location).await.unwrap(); + let mut write = WriteMultipart::new(upload); + write.put(vec![0; 2].into()); + write.write(&[1, 2, 3]); + write.put(vec![4, 5, 6, 7].into()); + write.finish().await.unwrap(); + + let r = storage.get(&location).await.unwrap(); + let r = r.bytes().await.unwrap(); + assert_eq!(r.as_ref(), &[0, 0, 1, 2, 3, 4, 5, 6, 7]); + // We can abort an empty write let location = Path::from("test_dir/test_abort_upload.txt"); let mut upload = storage.put_multipart(&location).await.unwrap(); diff --git a/object_store/src/upload.rs b/object_store/src/upload.rs index 9805df0dda73..728cf45b5067 100644 --- a/object_store/src/upload.rs +++ b/object_store/src/upload.rs @@ -217,9 +217,13 @@ impl WriteMultipart { #[cfg(test)] mod tests { + use std::sync::Arc; use std::time::Duration; use futures::FutureExt; + use parking_lot::Mutex; + use rand::prelude::StdRng; + use rand::{Rng, SeedableRng}; use crate::memory::InMemory; use crate::path::Path; @@ -246,4 +250,69 @@ mod tests { assert!(write.wait_for_capacity(10).now_or_never().is_none()); write.wait_for_capacity(10).await.unwrap() } + + #[derive(Debug, Default)] + struct InstrumentedUpload { + chunks: Arc>>, + } + + #[async_trait] + impl MultipartUpload for InstrumentedUpload { + fn put_part(&mut self, data: PutPayload) -> UploadPart { + self.chunks.lock().push(data); + futures::future::ready(Ok(())).boxed() + } + + async fn complete(&mut self) -> Result { + Ok(PutResult { + e_tag: None, + version: None, + }) + } + + async fn abort(&mut self) -> Result<()> { + unimplemented!() + } + } + + #[tokio::test] + async fn test_write_multipart() { + let mut rng = StdRng::seed_from_u64(42); + + for method in [0.0, 0.5, 1.0] { + for _ in 0..10 { + for chunk_size in [1, 17, 23] { + let upload = Box::new(InstrumentedUpload::default()); + let chunks = Arc::clone(&upload.chunks); + let mut write = WriteMultipart::new_with_chunk_size(upload, chunk_size); + + let mut expected = Vec::with_capacity(1024); + + for _ in 0..50 { + let chunk_size = rng.gen_range(0..30); + let data: Vec<_> = (0..chunk_size).map(|_| rng.gen()).collect(); + expected.extend_from_slice(&data); + + match rng.gen_bool(method) { + true => write.put(data.into()), + false => write.write(&data), + } + } + write.finish().await.unwrap(); + + let chunks = chunks.lock(); + + let actual: Vec<_> = chunks.iter().flatten().flatten().copied().collect(); + assert_eq!(expected, actual); + + for chunk in chunks.iter().take(chunks.len() - 1) { + assert_eq!(chunk.content_length(), chunk_size) + } + + let last_chunk = chunks.last().unwrap().content_length(); + assert!(last_chunk <= chunk_size, "{chunk_size}"); + } + } + } + } } From 87aaa32465ee0b820a968824976d8c73dd280edf Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 10 May 2024 19:57:42 +0100 Subject: [PATCH 2/2] Clippy --- object_store/src/upload.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/object_store/src/upload.rs b/object_store/src/upload.rs index 728cf45b5067..e5f683a034ac 100644 --- a/object_store/src/upload.rs +++ b/object_store/src/upload.rs @@ -282,7 +282,7 @@ mod tests { for method in [0.0, 0.5, 1.0] { for _ in 0..10 { for chunk_size in [1, 17, 23] { - let upload = Box::new(InstrumentedUpload::default()); + let upload = Box::::default(); let chunks = Arc::clone(&upload.chunks); let mut write = WriteMultipart::new_with_chunk_size(upload, chunk_size);