Rename methods
Signed-off-by: Alessandro Passaro <alexpax@amazon.co.uk>
passaro committed Nov 29, 2024
1 parent bde6d23 commit 5ebbb31
Showing 5 changed files with 44 additions and 31 deletions.
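For orientation, every change in this commit is one of two renames on the Uploader type. A minimal before/after sketch of the call sites, drawn from the hunks below (variable names are illustrative):

    // Before this commit:
    let atomic = uploader.put(&bucket, &key).await.unwrap();
    let append = uploader.start_upload(bucket, key, offset, initial_etag);

    // After this commit:
    let atomic = uploader.start_atomic_upload(&bucket, &key).await.unwrap();
    let append = uploader.start_incremental_upload(bucket, key, offset, initial_etag);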
4 changes: 2 additions & 2 deletions mountpoint-s3/examples/upload_benchmark.rs
@@ -165,7 +165,7 @@ where

let bucket = args.bucket.clone();
let key = args.key.clone();
- let mut upload_request = uploader.put(&bucket, &key).await.unwrap();
+ let mut upload_request = uploader.start_atomic_upload(&bucket, &key).await.unwrap();

let mut total_bytes_written = 0;
let target_size = args.object_size;
@@ -198,7 +198,7 @@ where

let bucket = args.bucket.clone();
let key = args.key.clone();
- let mut upload_request = uploader.start_upload(bucket.clone(), key.clone(), 0, None);
+ let mut upload_request = uploader.start_incremental_upload(bucket.clone(), key.clone(), 0, None);

let mut total_bytes_written = 0;
let target_size = args.object_size;
13 changes: 8 additions & 5 deletions mountpoint-s3/src/fs/handles.rs
@@ -107,17 +107,20 @@ where
lookup.stat.etag.as_ref().map(|e| e.into())
};
let current_offset = if is_truncate { 0 } else { lookup.stat.size as u64 };
- let request =
-     fs.uploader
-         .start_upload(bucket.to_owned(), key.to_owned(), current_offset, initial_etag.clone());
+ let request = fs.uploader.start_incremental_upload(
+     bucket.to_owned(),
+     key.to_owned(),
+     current_offset,
+     initial_etag.clone(),
+ );
FileHandleState::Write(UploadState::AppendInProgress {
request,
handle,
initial_etag,
written_bytes: 0,
})
} else {
- match fs.uploader.put(bucket, key).await {
+ match fs.uploader.start_atomic_upload(bucket, key).await {
Err(e) => return Err(err!(libc::EIO, source:e, "put failed to start")),
Ok(request) => FileHandleState::Write(UploadState::MPUInProgress { request, handle }),
}
@@ -244,7 +247,7 @@
Ok(etag) => {
// Restart append request.
let initial_etag = etag.or(initial_etag);
- let request = fs.uploader.start_upload(
+ let request = fs.uploader.start_incremental_upload(
fs.bucket.clone(),
key.to_owned(),
current_offset,
8 changes: 4 additions & 4 deletions mountpoint-s3/src/upload.rs
@@ -90,17 +90,17 @@ where
}
}

- /// Start a new put request to the specified object.
- pub async fn put(
+ /// Start a new atomic upload.
+ pub async fn start_atomic_upload(
&self,
bucket: &str,
key: &str,
) -> Result<UploadRequest<Client>, UploadError<Client::ClientError>> {
UploadRequest::new(self, bucket, key).await
}

- /// Start a new appendable upload to the specified object.
- pub fn start_upload(
+ /// Start a new incremental upload.
+ pub fn start_incremental_upload(
&self,
bucket: String,
key: String,
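The renamed methods keep their original shapes: start_atomic_upload is async and fallible because it initiates a multipart upload up front, while start_incremental_upload is synchronous and takes the resume offset plus the object's current ETag. A hedged usage sketch of the two paths, with setup elided; the write/complete calls are assumed from the test code in the following files:

    // Atomic path: write the whole object through a multipart upload.
    let mut atomic = uploader.start_atomic_upload(&bucket, &key).await?;
    atomic.write(0, b"hello").await?;
    atomic.complete().await?;

    // Incremental path: append to an existing object starting at `offset`,
    // passing its current ETag so a concurrent overwrite is detected.
    let mut append = uploader.start_incremental_upload(bucket, key, offset, Some(etag));
    append.write(offset, b"world").await?;
    append.complete().await?;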
17 changes: 10 additions & 7 deletions mountpoint-s3/src/upload/atomic.rs
@@ -225,7 +225,7 @@ mod tests {
..Default::default()
}));
let uploader = new_uploader_for_test(client.clone(), None, ServerSideEncryption::default(), true);
- let request = uploader.put(bucket, key).await.unwrap();
+ let request = uploader.start_atomic_upload(bucket, key).await.unwrap();

assert!(!client.contains_key(key));
assert!(client.is_upload_in_progress(key));
@@ -255,7 +255,7 @@
true,
);

- let mut request = uploader.put(bucket, key).await.unwrap();
+ let mut request = uploader.start_atomic_upload(bucket, key).await.unwrap();

let data = b"foo";
let mut offset = 0;
@@ -306,7 +306,7 @@

// First request fails on first write.
{
- let mut request = uploader.put(bucket, key).await.unwrap();
+ let mut request = uploader.start_atomic_upload(bucket, key).await.unwrap();

let data = b"foo";
request.write(0, data).await.expect_err("first write should fail");
@@ -316,7 +316,7 @@

// Second request fails on complete (after one write).
{
- let mut request = uploader.put(bucket, key).await.unwrap();
+ let mut request = uploader.start_atomic_upload(bucket, key).await.unwrap();

let data = b"foo";
_ = request.write(0, data).await.unwrap();
@@ -344,7 +344,7 @@
..Default::default()
}));
let uploader = new_uploader_for_test(client.clone(), None, ServerSideEncryption::default(), true);
- let mut request = uploader.put(bucket, key).await.unwrap();
+ let mut request = uploader.start_atomic_upload(bucket, key).await.unwrap();

let successful_writes = PART_SIZE * MAX_S3_MULTIPART_UPLOAD_PARTS / write_size;
let data = vec![0xaa; write_size];
@@ -382,7 +382,7 @@
.server_side_encryption
.corrupt_data(sse_type_corrupted.map(String::from), key_id_corrupted.map(String::from));
let err = uploader
.put("bucket", "hello")
.start_atomic_upload("bucket", "hello")
.await
.expect_err("sse checksum must be checked");
assert!(matches!(
@@ -408,6 +408,9 @@
ServerSideEncryption::new(Some("aws:kms".to_string()), Some("some_key".to_string())),
true,
);
- uploader.put(bucket, key).await.expect("put with sse should succeed");
+ uploader
+     .start_atomic_upload(bucket, key)
+     .await
+     .expect("put with sse should succeed");
}
}
33 changes: 20 additions & 13 deletions mountpoint-s3/src/upload/incremental.rs
@@ -543,7 +543,8 @@ mod tests {
let uploader = new_uploader_for_test(client.clone(), buffer_size, None, None);
let mut offset = existing_object.as_ref().map_or(0, |object| object.len() as u64);
let initial_etag = existing_object.map(|object| object.etag());
- let mut upload_request = uploader.start_upload(bucket.to_owned(), key.to_owned(), offset, initial_etag);
+ let mut upload_request =
+     uploader.start_incremental_upload(bucket.to_owned(), key.to_owned(), offset, initial_etag);

// Write some data
let append_data = [0xaa; 128];
@@ -615,7 +616,8 @@
let uploader = new_uploader_for_test(client.clone(), buffer_size, None, None);
let mut offset = existing_object.as_ref().map_or(0, |object| object.len() as u64);
let initial_etag = existing_object.map(|object| object.etag());
- let mut upload_request = uploader.start_upload(bucket.to_owned(), key.to_owned(), offset, initial_etag);
+ let mut upload_request =
+     uploader.start_incremental_upload(bucket.to_owned(), key.to_owned(), offset, initial_etag);

// Write some data and verify that buffer length should not grow larger than configured capacity
let append_data = [0xaa; 384];
@@ -694,7 +696,8 @@

let buffer_size = 256;
let uploader = new_uploader_for_test(client.clone(), buffer_size, None, default_checksum_algorithm.clone());
- let mut upload_request = uploader.start_upload(bucket.to_owned(), key.to_owned(), offset, initial_etag);
+ let mut upload_request =
+     uploader.start_incremental_upload(bucket.to_owned(), key.to_owned(), offset, initial_etag);

// Write some data
let append_data = [0xaa; 384];
@@ -754,7 +757,8 @@
let uploader = new_uploader_for_test(client.clone(), buffer_size, None, None);
let initial_offset = existing_object.as_ref().map_or(0, |object| object.len() as u64);
let initial_etag = existing_object.map(|object| object.etag());
- let upload_request = uploader.start_upload(bucket.to_owned(), key.to_owned(), initial_offset, initial_etag);
+ let upload_request =
+     uploader.start_incremental_upload(bucket.to_owned(), key.to_owned(), initial_offset, initial_etag);
// Wait for the upload to complete
upload_request
.complete()
@@ -791,7 +795,7 @@
let initial_offset = (existing_object.len() - 1) as u64;
let initial_etag = existing_object.etag();
let mut upload_request =
-     uploader.start_upload(bucket.to_owned(), key.to_owned(), initial_offset, Some(initial_etag));
+     uploader.start_incremental_upload(bucket.to_owned(), key.to_owned(), initial_offset, Some(initial_etag));

let append_data = [0xaa; 128];
upload_request
@@ -837,7 +841,7 @@
let initial_offset = existing_object.len() as u64;
let initial_etag = existing_object.etag();
let mut upload_request =
-     uploader.start_upload(bucket.to_owned(), key.to_owned(), initial_offset, Some(initial_etag));
+     uploader.start_incremental_upload(bucket.to_owned(), key.to_owned(), initial_offset, Some(initial_etag));

// Write data more than the buffer capacity as the first append should succeed
let append_data = [0xab; 384];
@@ -880,7 +884,8 @@
// Test append with a wrong offset
let mut offset = (existing_object.len() - 1) as u64;
let initial_etag = existing_object.etag();
- let mut upload_request = uploader.start_upload(bucket.to_owned(), key.to_owned(), offset, Some(initial_etag));
+ let mut upload_request =
+     uploader.start_incremental_upload(bucket.to_owned(), key.to_owned(), offset, Some(initial_etag));

// Keep writing and it should fail eventually
let mut write_success_count = 0;
@@ -945,7 +950,8 @@
let uploader = new_uploader_for_test(failure_client, buffer_size, None, None);
let mut offset = existing_object.len() as u64;
let initial_etag = existing_object.etag();
- let mut upload_request = uploader.start_upload(bucket.to_owned(), key.to_owned(), offset, Some(initial_etag));
+ let mut upload_request =
+     uploader.start_incremental_upload(bucket.to_owned(), key.to_owned(), offset, Some(initial_etag));

// Keep writing and it should fail eventually
let mut write_success_count = 0;
@@ -1010,7 +1016,8 @@
// Start appending
let mut offset = existing_object.len() as u64;
let initial_etag = existing_object.etag();
- let mut upload_request = uploader.start_upload(bucket.to_owned(), key.to_owned(), offset, Some(initial_etag));
+ let mut upload_request =
+     uploader.start_incremental_upload(bucket.to_owned(), key.to_owned(), offset, Some(initial_etag));

// Replace the existing object
let replacing_object = MockObject::from(vec![0xcc; 20]).with_computed_checksums(&[ChecksumAlgorithm::Crc32c]);
@@ -1066,7 +1073,7 @@

let buffer_size = 256;
let uploader = new_uploader_for_test(client.clone(), buffer_size, None, None);
- let mut upload_request = uploader.start_upload(bucket.to_owned(), key.to_owned(), 0, None);
+ let mut upload_request = uploader.start_incremental_upload(bucket.to_owned(), key.to_owned(), 0, None);

// Write some data
let append_data = [0xaa; 128];
@@ -1116,7 +1123,7 @@
let initial_offset = existing_object.len() as u64;
let initial_etag = existing_object.etag();
let mut upload_request =
-     uploader.start_upload(bucket.to_owned(), key.to_owned(), initial_offset, Some(initial_etag));
+     uploader.start_incremental_upload(bucket.to_owned(), key.to_owned(), initial_offset, Some(initial_etag));

let append_data = [0xaa; 128];
upload_request
@@ -1155,7 +1162,7 @@
let initial_offset = existing_object.len() as u64;
let initial_etag = existing_object.etag();
let mut upload_request =
-     uploader.start_upload(bucket.to_owned(), key.to_owned(), initial_offset, Some(initial_etag));
+     uploader.start_incremental_upload(bucket.to_owned(), key.to_owned(), initial_offset, Some(initial_etag));

let append_data = [0xaa; 128];
expected_content.extend_from_slice(&append_data);
@@ -1203,7 +1210,7 @@
);

let mut offset = 0;
- let mut upload_request = uploader.start_upload(bucket.to_owned(), key.to_owned(), offset, None);
+ let mut upload_request = uploader.start_incremental_upload(bucket.to_owned(), key.to_owned(), offset, None);
let mut expected_content = Vec::new();

// Write enough data to fill multiple parts
