Skip to content

Commit

Permalink
fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Jul 23, 2022
1 parent 5f3bf35 commit 5875b13
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 61 deletions.
45 changes: 26 additions & 19 deletions object_store/src/aws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ use std::ops::Range;
use std::{
convert::TryFrom, fmt, num::NonZeroUsize, ops::Deref, sync::Arc, time::Duration,
};
use std::{convert::TryFrom, fmt, num::NonZeroUsize, ops::Deref, sync::Arc, time::Duration};
use std::{
convert::TryFrom, fmt, num::NonZeroUsize, ops::Deref, sync::Arc, time::Duration,
};
use tokio::io::AsyncWrite;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tracing::{debug, warn};
Expand Down Expand Up @@ -356,7 +358,11 @@ impl ObjectStore for AmazonS3 {
Ok((upload_id, Box::new(CloudMultiPartUpload::new(inner, 8))))
}

async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()> {
async fn abort_multipart(
&self,
location: &Path,
multipart_id: &MultipartId,
) -> Result<()> {
let request_factory = move || rusoto_s3::AbortMultipartUploadRequest {
bucket: self.bucket_name.clone(),
key: location.to_string(),
Expand Down Expand Up @@ -988,23 +994,24 @@ impl CloudMultiPartUploadImpl for S3MultiPartUpload {
&self,
completed_parts: Vec<Option<UploadPart>>,
) -> BoxFuture<'static, Result<(), io::Error>> {
let parts = completed_parts
.into_iter()
.enumerate()
.map(|(part_number, maybe_part)| match maybe_part {
Some(part) => Ok(rusoto_s3::CompletedPart {
e_tag: Some(part.content_id),
part_number: Some(
(part_number + 1)
.try_into()
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?,
),
}),
None => Err(io::Error::new(
io::ErrorKind::Other,
format!("Missing information for upload part {:?}", part_number),
)),
});
let parts =
completed_parts
.into_iter()
.enumerate()
.map(|(part_number, maybe_part)| match maybe_part {
Some(part) => {
Ok(rusoto_s3::CompletedPart {
e_tag: Some(part.content_id),
part_number: Some((part_number + 1).try_into().map_err(
|err| io::Error::new(io::ErrorKind::Other, err),
)?),
})
}
None => Err(io::Error::new(
io::ErrorKind::Other,
format!("Missing information for upload part {:?}", part_number),
)),
});

// Get values to move into future; we don't want a reference to Self
let bucket = self.bucket.clone();
Expand Down
33 changes: 20 additions & 13 deletions object_store/src/azure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,11 @@ impl ObjectStore for MicrosoftAzure {
Ok((String::new(), Box::new(CloudMultiPartUpload::new(inner, 8))))
}

async fn abort_multipart(&self, _location: &Path, _multipart_id: &MultipartId) -> Result<()> {
async fn abort_multipart(
&self,
_location: &Path,
_multipart_id: &MultipartId,
) -> Result<()> {
// There is no way to drop blocks that have been uploaded. Instead, they simply
// expire in 7 days.
Ok(())
Expand Down Expand Up @@ -639,18 +643,21 @@ impl CloudMultiPartUploadImpl for AzureMultiPartUpload {
&self,
completed_parts: Vec<Option<UploadPart>>,
) -> BoxFuture<'static, Result<(), io::Error>> {
let parts = completed_parts
.into_iter()
.enumerate()
.map(|(part_number, maybe_part)| match maybe_part {
Some(part) => Ok(azure_storage_blobs::blob::BlobBlockType::Uncommitted(
azure_storage_blobs::BlockId::new(part.content_id),
)),
None => Err(io::Error::new(
io::ErrorKind::Other,
format!("Missing information for upload part {:?}", part_number),
)),
});
let parts =
completed_parts
.into_iter()
.enumerate()
.map(|(part_number, maybe_part)| match maybe_part {
Some(part) => {
Ok(azure_storage_blobs::blob::BlobBlockType::Uncommitted(
azure_storage_blobs::BlockId::new(part.content_id),
))
}
None => Err(io::Error::new(
io::ErrorKind::Other,
format!("Missing information for upload part {:?}", part_number),
)),
});

let client = Arc::clone(&self.container_client);
let location = self.location.clone();
Expand Down
21 changes: 17 additions & 4 deletions object_store/src/gcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,11 @@ impl GoogleCloudStorageClient {
}

/// Cleanup unused parts <https://cloud.google.com/storage/docs/xml-api/delete-multipart>
async fn multipart_cleanup(&self, path: &str, multipart_id: &MultipartId) -> Result<()> {
async fn multipart_cleanup(
&self,
path: &str,
multipart_id: &MultipartId,
) -> Result<()> {
let token = self.get_token().await?;
let url = format!("{}/{}/{}", self.base_url, self.bucket_name_encoded, path);

Expand Down Expand Up @@ -510,7 +514,9 @@ fn reqwest_error_as_io(err: reqwest::Error) -> io::Error {
} else if err.is_status() {
match err.status() {
Some(StatusCode::NOT_FOUND) => io::Error::new(io::ErrorKind::NotFound, err),
Some(StatusCode::BAD_REQUEST) => io::Error::new(io::ErrorKind::InvalidInput, err),
Some(StatusCode::BAD_REQUEST) => {
io::Error::new(io::ErrorKind::InvalidInput, err)
}
Some(_) => io::Error::new(io::ErrorKind::Other, err),
None => io::Error::new(io::ErrorKind::Other, err),
}
Expand Down Expand Up @@ -570,7 +576,10 @@ impl CloudMultiPartUploadImpl for GCSMultipartUpload {
.headers()
.get("ETag")
.ok_or_else(|| {
io::Error::new(io::ErrorKind::InvalidData, "response headers missing ETag")
io::Error::new(
io::ErrorKind::InvalidData,
"response headers missing ETag",
)
})?
.to_str()
.map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))?
Expand Down Expand Up @@ -663,7 +672,11 @@ impl ObjectStore for GoogleCloudStorage {
Ok((upload_id, Box::new(CloudMultiPartUpload::new(inner, 8))))
}

async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()> {
async fn abort_multipart(
&self,
location: &Path,
multipart_id: &MultipartId,
) -> Result<()> {
self.client
.multipart_cleanup(location.as_ref(), multipart_id)
.await?;
Expand Down
10 changes: 8 additions & 2 deletions object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,11 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
///
/// See documentation for individual stores for exact behavior, as capabilities
/// vary by object store.
async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()>;
async fn abort_multipart(
&self,
location: &Path,
multipart_id: &MultipartId,
) -> Result<()>;

/// Return the bytes that are stored at the specified location.
async fn get(&self, location: &Path) -> Result<GetResult>;
Expand Down Expand Up @@ -596,7 +600,9 @@ mod tests {
Ok(())
}

pub(crate) async fn list_uses_directories_correctly(storage: &DynObjectStore) -> Result<()> {
pub(crate) async fn list_uses_directories_correctly(
storage: &DynObjectStore,
) -> Result<()> {
delete_fixtures(storage).await;

let content_list = flatten_list_stream(storage, None).await?;
Expand Down
65 changes: 44 additions & 21 deletions object_store/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,9 @@ impl ObjectStore for LocalFileSystem {

match std::fs::metadata(&staging_path) {
Err(err) if err.kind() == io::ErrorKind::NotFound => break staging_path,
Err(err) => return Err(Error::UnableToCopyDataToFile { source: err }.into()),
Err(err) => {
return Err(Error::UnableToCopyDataToFile { source: err }.into())
}
Ok(_) => multipart_id += 1,
}
};
Expand All @@ -277,7 +279,11 @@ impl ObjectStore for LocalFileSystem {
))
}

async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()> {
async fn abort_multipart(
&self,
location: &Path,
multipart_id: &MultipartId,
) -> Result<()> {
let dest = self.config.path_to_filesystem(location)?;
let staging_path: PathBuf = get_upload_stage_path(&dest, multipart_id);

Expand Down Expand Up @@ -547,7 +553,11 @@ struct LocalUpload {
}

impl LocalUpload {
pub fn new(dest: PathBuf, multipart_id: MultipartId, file: Arc<std::fs::File>) -> Self {
pub fn new(
dest: PathBuf,
multipart_id: MultipartId,
file: Arc<std::fs::File>,
) -> Self {
Self {
inner_state: LocalUploadState::Idle(file),
dest,
Expand All @@ -562,12 +572,13 @@ impl AsyncWrite for LocalUpload {
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<Result<usize, io::Error>> {
let invalid_state = |condition: &str| -> std::task::Poll<Result<usize, io::Error>> {
Poll::Ready(Err(io::Error::new(
io::ErrorKind::InvalidInput,
format!("Tried to write to file {}.", condition),
)))
};
let invalid_state =
|condition: &str| -> std::task::Poll<Result<usize, io::Error>> {
Poll::Ready(Err(io::Error::new(
io::ErrorKind::InvalidInput,
format!("Tried to write to file {}.", condition),
)))
};

if let Ok(runtime) = tokio::runtime::Handle::try_current() {
let mut data: Vec<u8> = buf.to_vec();
Expand All @@ -585,7 +596,9 @@ impl AsyncWrite for LocalUpload {
runtime
.spawn_blocking(move || (&*file2).write_all(&data))
.map(move |res| match res {
Err(err) => Err(io::Error::new(io::ErrorKind::Other, err)),
Err(err) => {
Err(io::Error::new(io::ErrorKind::Other, err))
}
Ok(res) => res.map(move |_| data_len),
}),
),
Expand All @@ -594,7 +607,8 @@ impl AsyncWrite for LocalUpload {
LocalUploadState::Writing(file, inner_write) => {
match inner_write.poll_unpin(cx) {
Poll::Ready(res) => {
self.inner_state = LocalUploadState::Idle(Arc::clone(file));
self.inner_state =
LocalUploadState::Idle(Arc::clone(file));
return Poll::Ready(res);
}
Poll::Pending => {
Expand Down Expand Up @@ -641,12 +655,14 @@ impl AsyncWrite for LocalUpload {
// We are moving file into the future, and it will be dropped on it's completion, closing the file.
let file = Arc::clone(file);
self.inner_state = LocalUploadState::ShuttingDown(Box::pin(
runtime
.spawn_blocking(move || (*file).sync_all())
.map(move |res| match res {
Err(err) => Err(io::Error::new(io::ErrorKind::Other, err)),
runtime.spawn_blocking(move || (*file).sync_all()).map(
move |res| match res {
Err(err) => {
Err(io::Error::new(io::ErrorKind::Other, err))
}
Ok(res) => res,
}),
},
),
));
}
LocalUploadState::ShuttingDown(fut) => match fut.poll_unpin(cx) {
Expand All @@ -657,9 +673,13 @@ impl AsyncWrite for LocalUpload {
let dest = self.dest.clone();
self.inner_state = LocalUploadState::Committing(Box::pin(
runtime
.spawn_blocking(move || std::fs::rename(&staging_path, &dest))
.spawn_blocking(move || {
std::fs::rename(&staging_path, &dest)
})
.map(move |res| match res {
Err(err) => Err(io::Error::new(io::ErrorKind::Other, err)),
Err(err) => {
Err(io::Error::new(io::ErrorKind::Other, err))
}
Ok(res) => res,
}),
));
Expand Down Expand Up @@ -736,7 +756,8 @@ fn open_writable_file(path: &std::path::PathBuf) -> Result<File> {
let parent = path
.parent()
.context(UnableToCreateFileSnafu { path: &path, err })?;
std::fs::create_dir_all(&parent).context(UnableToCreateDirSnafu { path: parent })?;
std::fs::create_dir_all(&parent)
.context(UnableToCreateDirSnafu { path: parent })?;

match File::create(&path) {
Ok(f) => Ok(f),
Expand Down Expand Up @@ -1044,10 +1065,12 @@ mod tests {
let location = Path::from("some_file");

let data = Bytes::from("arbitrary data");
let (multipart_id, mut writer) = integration.put_multipart(&location).await.unwrap();
let (multipart_id, mut writer) =
integration.put_multipart(&location).await.unwrap();
writer.write_all(&data).await.unwrap();

let (multipart_id_2, mut writer_2) = integration.put_multipart(&location).await.unwrap();
let (multipart_id_2, mut writer_2) =
integration.put_multipart(&location).await.unwrap();
assert_ne!(multipart_id, multipart_id_2);
writer_2.write_all(&data).await.unwrap();

Expand Down
6 changes: 5 additions & 1 deletion object_store/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,11 @@ impl ObjectStore for InMemory {
))
}

async fn abort_multipart(&self, _location: &Path, _multipart_id: &MultipartId) -> Result<()> {
async fn abort_multipart(
&self,
_location: &Path,
_multipart_id: &MultipartId,
) -> Result<()> {
// Nothing to clean up
Ok(())
}
Expand Down
6 changes: 5 additions & 1 deletion object_store/src/throttle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,11 @@ impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
Err(super::Error::NotImplemented)
}

async fn abort_multipart(&self, _location: &Path, _multipart_id: &MultipartId) -> Result<()> {
async fn abort_multipart(
&self,
_location: &Path,
_multipart_id: &MultipartId,
) -> Result<()> {
Err(super::Error::NotImplemented)
}

Expand Down

0 comments on commit 5875b13

Please sign in to comment.