diff --git a/.gitignore b/.gitignore
index 445d69b1..749e3a90 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,5 +1,5 @@
 /data
-/target
+/**/target
 recipe.json
 profile.json.gz
 .envrc.private
diff --git a/objectstore-service/src/backend/bigtable.rs b/objectstore-service/src/backend/bigtable.rs
index cf3d6701..72732b61 100644
--- a/objectstore-service/src/backend/bigtable.rs
+++ b/objectstore-service/src/backend/bigtable.rs
@@ -1,7 +1,7 @@
 use std::fmt;
 use std::time::{Duration, SystemTime};
 
-use anyhow::{Context, Result};
+use anyhow::Result;
 use bigtable_rs::bigtable::BigTableConnection;
 use bigtable_rs::google::bigtable::v2::{self, mutation};
 use futures_util::{StreamExt, TryStreamExt, stream};
@@ -11,6 +11,7 @@ use tokio::runtime::Handle;
 use crate::PayloadStream;
 use crate::backend::common::Backend;
 use crate::id::ObjectId;
+use crate::{ServiceError, ServiceResult};
 
 /// Connection timeout used for the initial connection to Bigtable.
 const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
@@ -97,7 +98,7 @@ impl BigTableBackend {
         path: Vec<u8>,
         mutations: I,
         action: &str,
-    ) -> Result<()>
+    ) -> ServiceResult<()>
     where
         I: IntoIterator<Item = mutation::Mutation>,
     {
@@ -125,8 +126,9 @@ impl BigTableBackend {
             if response.is_err() {
                 merni::counter!("bigtable.mutate_failures": 1, "action" => action);
             }
-            return response.with_context(|| {
-                format!("failed mutating bigtable row performing a `{action}`")
+            return response.map_err(|e| ServiceError::Generic {
+                context: format!("Bigtable: failed mutating row performing a `{action}`"),
+                cause: Some(Box::new(e)),
             });
         }
         retry_count += 1;
@@ -141,19 +143,13 @@ impl BigTableBackend {
         metadata: &Metadata,
         payload: Vec<u8>,
         action: &str,
-    ) -> Result<()> {
+    ) -> ServiceResult<()> {
         // TODO: Inject the access time from the request.
         let access_time = SystemTime::now();
         let (family, timestamp_micros) = match metadata.expiration_policy {
             ExpirationPolicy::Manual => (FAMILY_MANUAL, -1),
-            ExpirationPolicy::TimeToLive(ttl) => (
-                FAMILY_GC,
-                ttl_to_micros(ttl, access_time).context("TTL out of range")?,
-            ),
-            ExpirationPolicy::TimeToIdle(tti) => (
-                FAMILY_GC,
-                ttl_to_micros(tti, access_time).context("TTL out of range")?,
-            ),
+            ExpirationPolicy::TimeToLive(ttl) => (FAMILY_GC, ttl_to_micros(ttl, access_time)?),
+            ExpirationPolicy::TimeToIdle(tti) => (FAMILY_GC, ttl_to_micros(tti, access_time)?),
         };
 
         let mutations = [
@@ -169,7 +165,10 @@ impl BigTableBackend {
                 family_name: family.to_owned(),
                 column_qualifier: COLUMN_METADATA.to_owned(),
                 timestamp_micros,
-                value: serde_json::to_vec(metadata).with_context(|| "failed to encode metadata")?,
+                value: serde_json::to_vec(metadata).map_err(|cause| ServiceError::Serde {
+                    context: "failed to serialize metadata".to_string(),
+                    cause,
+                })?,
             }),
         ];
         self.mutate(path, mutations, action).await
@@ -188,7 +187,7 @@ impl Backend for BigTableBackend {
         id: &ObjectId,
         metadata: &Metadata,
         mut stream: PayloadStream,
-    ) -> Result<()> {
+    ) -> ServiceResult<()> {
         tracing::debug!("Writing to Bigtable backend");
 
         let path = id.as_storage_path().to_string().into_bytes();
@@ -202,7 +201,7 @@ impl Backend for BigTableBackend {
     }
 
     #[tracing::instrument(level = "trace", fields(?id), skip_all)]
-    async fn get_object(&self, id: &ObjectId) -> Result<Option<(Metadata, PayloadStream)>> {
+    async fn get_object(&self, id: &ObjectId) -> ServiceResult<Option<(Metadata, PayloadStream)>> {
         tracing::debug!("Reading from Bigtable backend");
         let path = id.as_storage_path().to_string().into_bytes();
         let rows = v2::RowSet {
@@ -226,7 +225,10 @@ impl Backend for BigTableBackend {
             if response.is_err() {
                 merni::counter!("bigtable.read_failures": 1);
             }
-            break response?;
+            break response.map_err(|e| ServiceError::Generic {
+                context: "Bigtable: failed to read rows".to_string(),
+                cause: Some(Box::new(e)),
+            })?;
         }
         retry_count += 1;
         merni::counter!("bigtable.read_retry": 1);
@@ -252,8 +254,12 @@ impl Backend for BigTableBackend {
                     // TODO: Log if the timestamp is invalid.
                 }
                 self::COLUMN_METADATA => {
-                    metadata = serde_json::from_slice(&cell.value)
-                        .with_context(|| "failed to decode metadata")?;
+                    metadata = serde_json::from_slice(&cell.value).map_err(|cause| {
+                        ServiceError::Serde {
+                            context: "failed to deserialize metadata".to_string(),
+                            cause,
+                        }
+                    })?;
                 }
                 _ => {
                     // TODO: Log unknown column
@@ -288,7 +294,7 @@ impl Backend for BigTableBackend {
     }
 
     #[tracing::instrument(level = "trace", fields(?id), skip_all)]
-    async fn delete_object(&self, id: &ObjectId) -> Result<()> {
+    async fn delete_object(&self, id: &ObjectId) -> ServiceResult<()> {
         tracing::debug!("Deleting from Bigtable backend");
 
         let path = id.as_storage_path().to_string().into_bytes();
@@ -306,13 +312,31 @@
 /// The TTL is anchored at the provided `from` timestamp, which defaults to `SystemTime::now()`. As
 /// required by BigTable, the resulting timestamp has millisecond precision, with the last digits at
 /// 0.
-fn ttl_to_micros(ttl: Duration, from: SystemTime) -> Option<i64> {
-    let deadline = from.checked_add(ttl)?;
+fn ttl_to_micros(ttl: Duration, from: SystemTime) -> ServiceResult<i64> {
+    let deadline = from.checked_add(ttl).ok_or_else(|| ServiceError::Generic {
+        context: format!(
+            "TTL duration overflow: {} plus {}s cannot be represented as SystemTime",
+            humantime::format_rfc3339_seconds(from),
+            ttl.as_secs()
+        ),
+        cause: None,
+    })?;
     let millis = deadline
         .duration_since(SystemTime::UNIX_EPOCH)
-        .ok()?
+        .map_err(|e| ServiceError::Generic {
+            context: format!(
+                "unable to get duration since UNIX_EPOCH for SystemTime {}",
+                humantime::format_rfc3339_seconds(deadline)
+            ),
+            cause: Some(Box::new(e)),
+        })?
        .as_millis();
-    (millis * 1000).try_into().ok()
+    (millis * 1000)
+        .try_into()
+        .map_err(|e| ServiceError::Generic {
+            context: format!("failed to convert {}ms to i64 microseconds", millis),
+            cause: Some(Box::new(e)),
+        })
 }
 
 /// Converts a microsecond-precision unix timestamp to a `SystemTime`.
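Editor's note: the `ttl_to_micros` rewrite above preserves the original conversion and only changes how failures are reported. For reference, the removed `Option`-based logic distills to this self-contained sketch (plain `std`, no `ServiceResult`), which also demonstrates the millisecond-precision guarantee from the doc comment:

```rust
use std::time::{Duration, SystemTime};

// The pre-change conversion, extracted from the removed lines:
// deadline = from + ttl, truncated to millisecond precision, then scaled
// to microseconds so the last three digits are always zero.
fn ttl_to_micros_sketch(ttl: Duration, from: SystemTime) -> Option<i64> {
    let deadline = from.checked_add(ttl)?;
    let millis = deadline
        .duration_since(SystemTime::UNIX_EPOCH)
        .ok()?
        .as_millis();
    (millis * 1000).try_into().ok()
}

fn main() {
    let micros = ttl_to_micros_sketch(Duration::from_secs(3600), SystemTime::UNIX_EPOCH).unwrap();
    assert_eq!(micros, 3_600_000_000); // one hour in micros, millisecond-aligned
    assert_eq!(micros % 1000, 0);
}
```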
diff --git a/objectstore-service/src/backend/common.rs b/objectstore-service/src/backend/common.rs
index df3e0038..31636c13 100644
--- a/objectstore-service/src/backend/common.rs
+++ b/objectstore-service/src/backend/common.rs
@@ -1,9 +1,9 @@
 use std::fmt::Debug;
 
-use anyhow::Result;
 use objectstore_types::Metadata;
 
 use crate::PayloadStream;
+use crate::ServiceResult;
 use crate::id::ObjectId;
 
 /// User agent string used for outgoing requests.
@@ -25,13 +25,13 @@ pub trait Backend: Debug + Send + Sync + 'static {
         id: &ObjectId,
         metadata: &Metadata,
         stream: PayloadStream,
-    ) -> Result<()>;
+    ) -> ServiceResult<()>;
 
     /// Retrieves an object at the given path, returning its metadata and a stream of bytes.
-    async fn get_object(&self, id: &ObjectId) -> Result<Option<(Metadata, PayloadStream)>>;
+    async fn get_object(&self, id: &ObjectId) -> ServiceResult<Option<(Metadata, PayloadStream)>>;
 
     /// Deletes the object at the given path.
-    async fn delete_object(&self, id: &ObjectId) -> Result<()>;
+    async fn delete_object(&self, id: &ObjectId) -> ServiceResult<()>;
 }
 
 /// Creates a reqwest client with required defaults.
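Editor's note: with `Backend` now returning `ServiceResult`, callers can branch on error kinds without downcasting through `anyhow`. A minimal sketch of what that enables; the `is_retryable` helper and its policy are hypothetical, not part of this change:

```rust
use objectstore_service::ServiceError;

// Hypothetical retry policy: transport-level failures may be retried,
// data-shape errors (serde, metadata) never are.
fn is_retryable(err: &ServiceError) -> bool {
    match err {
        ServiceError::Io(_) | ServiceError::GcpAuth(_) => true,
        ServiceError::Reqwest { cause, .. } => cause.is_timeout() || cause.is_connect(),
        ServiceError::Serde { .. } | ServiceError::Metadata(_) => false,
        ServiceError::Generic { .. } => false,
    }
}
```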
diff --git a/objectstore-service/src/backend/gcs.rs b/objectstore-service/src/backend/gcs.rs
index daa3ee1e..26af4ab8 100644
--- a/objectstore-service/src/backend/gcs.rs
+++ b/objectstore-service/src/backend/gcs.rs
@@ -13,6 +13,7 @@ use serde::{Deserialize, Serialize};
 use crate::PayloadStream;
 use crate::backend::common::{self, Backend};
 use crate::id::ObjectId;
+use crate::{ServiceError, ServiceResult};
 
 /// Default endpoint used to access the GCS JSON API.
 const DEFAULT_ENDPOINT: &str = "https://storage.googleapis.com";
@@ -109,7 +110,7 @@ impl GcsObject {
     }
 
     /// Converts GCS JSON object metadata to our Metadata type.
-    pub fn into_metadata(mut self) -> Result<Metadata> {
+    pub fn into_metadata(mut self) -> ServiceResult<Metadata> {
         // Remove ignored metadata keys that are set by the GCS emulator.
         self.metadata.remove(&GcsMetaKey::EmulatorIgnored);
 
@@ -122,7 +123,14 @@ impl GcsObject {
         let content_type = self.content_type;
         let compression = self.content_encoding.map(|s| s.parse()).transpose()?;
-        let size = self.size.map(|size| size.parse()).transpose()?;
+        let size = self
+            .size
+            .map(|size| size.parse())
+            .transpose()
+            .map_err(|e| ServiceError::Generic {
+                context: "GCS: failed to parse size from object metadata".to_string(),
+                cause: Some(Box::new(e)),
+            })?;
         let time_created = self.time_created;
 
         // At this point, all built-in metadata should have been removed from self.metadata.
@@ -131,7 +139,13 @@ impl GcsObject {
             if let GcsMetaKey::Custom(custom_key) = key {
                 custom.insert(custom_key, value);
             } else {
-                anyhow::bail!("unexpected metadata");
+                return Err(ServiceError::Generic {
+                    context: format!(
+                        "GCS: unexpected built-in metadata key in object metadata: {}",
+                        key
+                    ),
+                    cause: None,
+                });
             }
         }
@@ -233,23 +247,35 @@ impl GcsBackend {
     }
 
     /// Formats the GCS object (metadata) URL for the given key.
-    fn object_url(&self, id: &ObjectId) -> Result<Url> {
+    fn object_url(&self, id: &ObjectId) -> ServiceResult<Url> {
         let mut url = self.endpoint.clone();
         let path = id.as_storage_path().to_string();
         url.path_segments_mut()
-            .map_err(|()| anyhow::anyhow!("invalid GCS endpoint path"))?
+            .map_err(|()| ServiceError::Generic {
+                context: format!(
+                    "GCS: invalid endpoint URL, {} cannot be a base",
+                    self.endpoint
+                ),
+                cause: None,
+            })?
             .extend(&["storage", "v1", "b", &self.bucket, "o", &path]);
         Ok(url)
     }
 
     /// Formats the GCS upload URL for the given upload type.
-    fn upload_url(&self, id: &ObjectId, upload_type: &str) -> Result<Url> {
+    fn upload_url(&self, id: &ObjectId, upload_type: &str) -> ServiceResult<Url> {
         let mut url = self.endpoint.clone();
         url.path_segments_mut()
-            .map_err(|()| anyhow::anyhow!("invalid GCS endpoint path"))?
+            .map_err(|()| ServiceError::Generic {
+                context: format!(
+                    "GCS: invalid endpoint URL, {} cannot be a base",
+                    self.endpoint
+                ),
+                cause: None,
+            })?
             .extend(&["upload", "storage", "v1", "b", &self.bucket, "o"]);
 
         url.query_pairs_mut()
@@ -260,7 +286,7 @@ impl GcsBackend {
     }
 
     /// Creates a request builder with the appropriate authentication.
-    async fn request(&self, method: Method, url: impl IntoUrl) -> Result<RequestBuilder> {
+    async fn request(&self, method: Method, url: impl IntoUrl) -> ServiceResult<RequestBuilder> {
         let mut builder = self.client.request(method, url);
         if let Some(provider) = &self.token_provider {
             let token = provider.token(TOKEN_SCOPES).await?;
@@ -269,7 +295,11 @@ impl GcsBackend {
         Ok(builder)
     }
 
-    async fn update_custom_time(&self, object_url: Url, custom_time: SystemTime) -> Result<()> {
+    async fn update_custom_time(
+        &self,
+        object_url: Url,
+        custom_time: SystemTime,
+    ) -> ServiceResult<()> {
         #[derive(Debug, Serialize)]
         #[serde(rename_all = "camelCase")]
         struct CustomTimeRequest {
@@ -281,9 +311,16 @@ impl GcsBackend {
             .await?
             .json(&CustomTimeRequest { custom_time })
             .send()
-            .await?
+            .await
+            .map_err(|cause| ServiceError::Reqwest {
+                context: "GCS: failed to send update custom time request".to_string(),
+                cause,
+            })?
             .error_for_status()
-            .context("failed to update expiration time for object with TTI")?;
+            .map_err(|cause| ServiceError::Reqwest {
+                context: "GCS: failed to update expiration time for object with TTI".to_string(),
+                cause,
+            })?;
 
         Ok(())
     }
@@ -310,22 +347,33 @@ impl Backend for GcsBackend {
         id: &ObjectId,
         metadata: &Metadata,
         stream: PayloadStream,
-    ) -> Result<()> {
+    ) -> ServiceResult<()> {
         tracing::debug!("Writing to GCS backend");
         let gcs_metadata = GcsObject::from_metadata(metadata);
 
         // NB: Ensure the order of these fields and that a content-type is attached to them. Both
         // are required by the GCS API.
+        let metadata_json =
+            serde_json::to_string(&gcs_metadata).map_err(|cause| ServiceError::Serde {
+                context: "failed to serialize metadata for GCS upload".to_string(),
+                cause,
+            })?;
+
         let multipart = multipart::Form::new()
             .part(
                 "metadata",
-                multipart::Part::text(serde_json::to_string(&gcs_metadata)?)
-                    .mime_str("application/json")?,
+                multipart::Part::text(metadata_json)
+                    .mime_str("application/json")
+                    .expect("application/json is a valid mime type"),
             )
             .part(
                 "media",
                 multipart::Part::stream(Body::wrap_stream(stream))
-                    .mime_str(&metadata.content_type)?,
+                    .mime_str(&metadata.content_type)
+                    .map_err(|e| ServiceError::Generic {
+                        context: format!("invalid mime type: {}", &metadata.content_type),
+                        cause: Some(Box::new(e)),
+                    })?,
             );
 
         // GCS requires a multipart/related request. Its body looks identical to
@@ -338,36 +386,55 @@ impl Backend for GcsBackend {
             .multipart(multipart)
             .header(header::CONTENT_TYPE, content_type)
             .send()
-            .await?
+            .await
+            .map_err(|cause| ServiceError::Reqwest {
+                context: "GCS: failed to send multipart upload request".to_string(),
+                cause,
+            })?
             .error_for_status()
-            .context("failed to upload object via multipart")?;
+            .map_err(|cause| ServiceError::Reqwest {
+                context: "GCS: failed to upload object via multipart".to_string(),
+                cause,
+            })?;
 
         Ok(())
     }
 
     #[tracing::instrument(level = "trace", fields(?id), skip_all)]
-    async fn get_object(&self, id: &ObjectId) -> Result<Option<(Metadata, PayloadStream)>> {
+    async fn get_object(&self, id: &ObjectId) -> ServiceResult<Option<(Metadata, PayloadStream)>> {
         tracing::debug!("Reading from GCS backend");
         let object_url = self.object_url(id)?;
         let metadata_response = self
             .request(Method::GET, object_url.clone())
             .await?
             .send()
-            .await?;
+            .await
+            .map_err(|cause| ServiceError::Reqwest {
+                context: "GCS: failed to send get metadata request".to_string(),
+                cause,
+            })?;
 
         if metadata_response.status() == StatusCode::NOT_FOUND {
             tracing::debug!("Object not found");
             return Ok(None);
         }
 
-        let metadata_response = metadata_response
-            .error_for_status()
-            .context("failed to get object metadata")?;
-
-        let gcs_metadata: GcsObject = metadata_response
-            .json()
-            .await
-            .context("failed to parse object metadata")?;
+        let metadata_response =
+            metadata_response
+                .error_for_status()
+                .map_err(|cause| ServiceError::Reqwest {
+                    context: "GCS: failed to get object metadata".to_string(),
+                    cause,
+                })?;
+
+        let gcs_metadata: GcsObject =
+            metadata_response
+                .json()
+                .await
+                .map_err(|cause| ServiceError::Reqwest {
+                    context: "GCS: failed to parse object metadata response".to_string(),
+                    cause,
+                })?;
 
         // TODO: Store custom_time directly in metadata.
         let expire_at = gcs_metadata.custom_time;
@@ -398,9 +465,16 @@ impl Backend for GcsBackend {
             .request(Method::GET, download_url)
             .await?
             .send()
-            .await?
+            .await
+            .map_err(|cause| ServiceError::Reqwest {
+                context: "GCS: failed to send get payload request".to_string(),
+                cause,
+            })?
             .error_for_status()
-            .context("failed to get object payload")?;
+            .map_err(|cause| ServiceError::Reqwest {
+                context: "GCS: failed to get object payload".to_string(),
+                cause,
+            })?;
 
         let stream = payload_response
             .bytes_stream()
@@ -411,20 +485,27 @@ impl Backend for GcsBackend {
     }
 
     #[tracing::instrument(level = "trace", fields(?id), skip_all)]
-    async fn delete_object(&self, id: &ObjectId) -> Result<()> {
+    async fn delete_object(&self, id: &ObjectId) -> ServiceResult<()> {
         tracing::debug!("Deleting from GCS backend");
         let response = self
             .request(Method::DELETE, self.object_url(id)?)
             .await?
             .send()
-            .await?;
+            .await
+            .map_err(|cause| ServiceError::Reqwest {
+                context: "GCS: failed to send delete request".to_string(),
+                cause,
+            })?;
 
         // Do not error for objects that do not exist
         if response.status() != StatusCode::NOT_FOUND {
             tracing::debug!("Object not found");
             response
                 .error_for_status()
-                .context("failed to delete object")?;
+                .map_err(|cause| ServiceError::Reqwest {
+                    context: "GCS: failed to delete object".to_string(),
+                    cause,
+                })?;
         }
 
         Ok(())
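Editor's note: the GCS backend now repeats the `.map_err(|cause| ServiceError::Reqwest { .. })` pattern at every `send()`/`error_for_status()` call site. A possible follow-up, not part of this diff, is a small extension trait; `ReqwestContext` is an invented name:

```rust
use objectstore_service::{ServiceError, ServiceResult};

// Sketch: fold the repeated map_err boilerplate into one reusable adapter.
trait ReqwestContext<T> {
    fn reqwest_context(self, context: &str) -> ServiceResult<T>;
}

impl<T> ReqwestContext<T> for Result<T, reqwest::Error> {
    fn reqwest_context(self, context: &str) -> ServiceResult<T> {
        self.map_err(|cause| ServiceError::Reqwest {
            context: context.to_string(),
            cause,
        })
    }
}

// Call sites would then shrink to, e.g.:
//     .send().await.reqwest_context("GCS: failed to send get metadata request")?
```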
diff --git a/objectstore-service/src/backend/local_fs.rs b/objectstore-service/src/backend/local_fs.rs
index 2287a5b3..fb6a0bd4 100644
--- a/objectstore-service/src/backend/local_fs.rs
+++ b/objectstore-service/src/backend/local_fs.rs
@@ -2,7 +2,6 @@ use std::io::ErrorKind;
 use std::path::{Path, PathBuf};
 use std::pin::pin;
 
-use anyhow::Result;
 use futures_util::StreamExt;
 use objectstore_types::Metadata;
 use tokio::fs::OpenOptions;
@@ -12,6 +11,7 @@ use tokio_util::io::{ReaderStream, StreamReader};
 use crate::PayloadStream;
 use crate::backend::common::Backend;
 use crate::id::ObjectId;
+use crate::{ServiceError, ServiceResult};
 
 #[derive(Debug)]
 pub struct LocalFsBackend {
@@ -36,7 +36,7 @@ impl Backend for LocalFsBackend {
         id: &ObjectId,
         metadata: &Metadata,
         stream: PayloadStream,
-    ) -> anyhow::Result<()> {
+    ) -> ServiceResult<()> {
         let path = self.path.join(id.as_storage_path().to_string());
         tracing::debug!(path=%path.display(), "Writing to local_fs backend");
         tokio::fs::create_dir_all(path.parent().unwrap()).await?;
@@ -50,7 +50,11 @@ impl Backend for LocalFsBackend {
         let mut reader = pin!(StreamReader::new(stream));
         let mut writer = BufWriter::new(file);
 
-        let metadata_json = serde_json::to_string(metadata)?;
+        let metadata_json =
+            serde_json::to_string(metadata).map_err(|cause| ServiceError::Serde {
+                context: "failed to serialize metadata".to_string(),
+                cause,
+            })?;
         writer.write_all(metadata_json.as_bytes()).await?;
         writer.write_all(b"\n").await?;
@@ -65,7 +69,7 @@ impl Backend for LocalFsBackend {
 
     // TODO: Return `Ok(None)` if object is found but past expiry
     #[tracing::instrument(level = "trace", fields(?id), skip_all)]
-    async fn get_object(&self, id: &ObjectId) -> Result<Option<(Metadata, PayloadStream)>> {
+    async fn get_object(&self, id: &ObjectId) -> ServiceResult<Option<(Metadata, PayloadStream)>> {
         tracing::debug!("Reading from local_fs backend");
         let path = self.path.join(id.as_storage_path().to_string());
         let file = match OpenOptions::new().read(true).open(path).await {
@@ -80,14 +84,20 @@ impl Backend for LocalFsBackend {
         let mut reader = BufReader::new(file);
         let mut metadata_line = String::new();
         reader.read_line(&mut metadata_line).await?;
-        let metadata: Metadata = serde_json::from_str(metadata_line.trim_end())?;
+        let metadata: Metadata =
+            serde_json::from_str(metadata_line.trim_end()).map_err(|cause| {
+                ServiceError::Serde {
+                    context: "failed to deserialize metadata".to_string(),
+                    cause,
+                }
+            })?;
 
         let stream = ReaderStream::new(reader);
         Ok(Some((metadata, stream.boxed())))
     }
 
     #[tracing::instrument(level = "trace", fields(?id), skip_all)]
-    async fn delete_object(&self, id: &ObjectId) -> anyhow::Result<()> {
+    async fn delete_object(&self, id: &ObjectId) -> ServiceResult<()> {
         tracing::debug!("Deleting from local_fs backend");
         let path = self.path.join(id.as_storage_path().to_string());
         let result = tokio::fs::remove_file(path).await;
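Editor's note: for context, `local_fs` stores each object as one JSON metadata line followed by the raw payload. A synchronous sketch of reading that layout back, using `serde_json::Value` as a stand-in for the real `objectstore_types::Metadata`; the async code above does the same split with `read_line` plus `ReaderStream`:

```rust
use std::io::{BufRead, BufReader, Read};

// Sketch of the on-disk format used by the local_fs backend:
// <metadata as one JSON line>\n<raw payload bytes>
fn read_object(path: &std::path::Path) -> std::io::Result<(serde_json::Value, Vec<u8>)> {
    let mut reader = BufReader::new(std::fs::File::open(path)?);

    // First line: the serialized metadata.
    let mut line = String::new();
    reader.read_line(&mut line)?;
    let metadata = serde_json::from_str(line.trim_end())
        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;

    // Everything after the newline is the payload.
    let mut payload = Vec::new();
    reader.read_to_end(&mut payload)?;
    Ok((metadata, payload))
}
```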
diff --git a/objectstore-service/src/backend/s3_compatible.rs b/objectstore-service/src/backend/s3_compatible.rs
index b9fa0a07..a93de9d4 100644
--- a/objectstore-service/src/backend/s3_compatible.rs
+++ b/objectstore-service/src/backend/s3_compatible.rs
@@ -1,7 +1,6 @@
 use std::time::{Duration, SystemTime};
 use std::{fmt, io};
 
-use anyhow::{Context, Result};
 use futures_util::{StreamExt, TryStreamExt};
 use objectstore_types::{ExpirationPolicy, Metadata};
 use reqwest::{Body, IntoUrl, Method, RequestBuilder, StatusCode};
@@ -9,6 +8,7 @@ use reqwest::{Body, IntoUrl, Method, RequestBuilder, StatusCode};
 use crate::PayloadStream;
 use crate::backend::common::{self, Backend};
 use crate::id::ObjectId;
+use crate::{ServiceError, ServiceResult};
 
 /// Prefix used for custom metadata in headers for the GCS backend.
 ///
@@ -27,7 +27,7 @@ pub trait Token: Send + Sync {
 }
 
 pub trait TokenProvider: Send + Sync + 'static {
-    fn get_token(&self) -> impl Future<Output = Result<impl Token>> + Send;
+    fn get_token(&self) -> impl Future<Output = anyhow::Result<impl Token>> + Send;
 }
 
 // this only exists because we have to provide *some* kind of provider
@@ -36,7 +36,7 @@ pub struct NoToken;
 impl TokenProvider for NoToken {
     #[allow(refining_impl_trait_internal)]
     // otherwise, returning `!` will not implement the required traits
-    async fn get_token(&self) -> Result<NoToken> {
+    async fn get_token(&self) -> anyhow::Result<NoToken> {
         unimplemented!()
     }
 }
@@ -78,16 +78,25 @@ where
     T: TokenProvider,
 {
     /// Creates a request builder with the appropriate authentication.
-    async fn request(&self, method: Method, url: impl IntoUrl) -> Result<RequestBuilder> {
+    async fn request(&self, method: Method, url: impl IntoUrl) -> ServiceResult<RequestBuilder> {
         let mut builder = self.client.request(method, url);
         if let Some(provider) = &self.token_provider {
-            builder = builder.bearer_auth(provider.get_token().await?.as_str());
+            builder = builder.bearer_auth(
+                provider
+                    .get_token()
+                    .await
+                    .map_err(|err| ServiceError::Generic {
+                        context: "S3: failed to get authentication token".to_owned(),
+                        cause: Some(err.into()),
+                    })?
+                    .as_str(),
+            );
         }
         Ok(builder)
     }
 
     /// Issues a request to update the metadata for the given object.
-    async fn update_metadata(&self, id: &ObjectId, metadata: &Metadata) -> Result<()> {
+    async fn update_metadata(&self, id: &ObjectId, metadata: &Metadata) -> ServiceResult<()> {
         // NB: Meta updates require copy + REPLACE along with *all* metadata. See
         // https://cloud.google.com/storage/docs/xml-api/put-object-copy
         self.request(Method::PUT, self.object_url(id))
@@ -99,9 +108,16 @@ where
             .header("x-goog-metadata-directive", "REPLACE")
             .headers(metadata.to_headers(GCS_CUSTOM_PREFIX, true)?)
             .send()
-            .await?
+            .await
+            .map_err(|cause| ServiceError::Reqwest {
+                context: "S3: failed to send TTI update request".to_string(),
+                cause,
+            })?
             .error_for_status()
-            .context("failed to update expiration time for object with TTI")?;
+            .map_err(|cause| ServiceError::Reqwest {
+                context: "S3: failed to update expiration time for object with TTI".to_string(),
+                cause,
+            })?;
 
         Ok(())
     }
@@ -140,26 +156,41 @@ impl Backend for S3CompatibleBackend {
         id: &ObjectId,
         metadata: &Metadata,
         stream: PayloadStream,
-    ) -> Result<()> {
+    ) -> ServiceResult<()> {
         tracing::debug!("Writing to s3_compatible backend");
         self.request(Method::PUT, self.object_url(id))
             .await?
             .headers(metadata.to_headers(GCS_CUSTOM_PREFIX, true)?)
             .body(Body::wrap_stream(stream))
             .send()
-            .await?
+            .await
+            .map_err(|cause| ServiceError::Reqwest {
+                context: "S3: failed to send put request".to_string(),
+                cause,
+            })?
             .error_for_status()
-            .context("failed to put object")?;
+            .map_err(|cause| ServiceError::Reqwest {
+                context: "S3: failed to put object".to_string(),
+                cause,
+            })?;
 
         Ok(())
     }
 
     #[tracing::instrument(level = "trace", fields(?id), skip_all)]
-    async fn get_object(&self, id: &ObjectId) -> Result<Option<(Metadata, PayloadStream)>> {
+    async fn get_object(&self, id: &ObjectId) -> ServiceResult<Option<(Metadata, PayloadStream)>> {
         tracing::debug!("Reading from s3_compatible backend");
         let object_url = self.object_url(id);
-        let response = self.request(Method::GET, &object_url).await?.send().await?;
+        let response = self
+            .request(Method::GET, &object_url)
+            .await?
+            .send()
+            .await
+            .map_err(|cause| ServiceError::Reqwest {
+                context: "S3: failed to send get request".to_string(),
+                cause,
+            })?;
         if response.status() == StatusCode::NOT_FOUND {
             tracing::debug!("Object not found");
             return Ok(None);
@@ -167,7 +198,10 @@ impl Backend for S3CompatibleBackend {
 
         let response = response
             .error_for_status()
-            .context("failed to get object")?;
+            .map_err(|cause| ServiceError::Reqwest {
+                context: "S3: failed to get object".to_string(),
+                cause,
+            })?;
 
         let headers = response.headers();
         // TODO: Populate size in metadata
@@ -197,20 +231,27 @@ impl Backend for S3CompatibleBackend {
     }
 
     #[tracing::instrument(level = "trace", fields(?id), skip_all)]
-    async fn delete_object(&self, id: &ObjectId) -> Result<()> {
+    async fn delete_object(&self, id: &ObjectId) -> ServiceResult<()> {
         tracing::debug!("Deleting from s3_compatible backend");
         let response = self
             .request(Method::DELETE, self.object_url(id))
             .await?
             .send()
-            .await?;
+            .await
+            .map_err(|cause| ServiceError::Reqwest {
+                context: "S3: failed to send delete request".to_string(),
+                cause,
+            })?;
 
         // Do not error for objects that do not exist.
         if response.status() != StatusCode::NOT_FOUND {
             tracing::debug!("Object not found");
             response
                 .error_for_status()
-                .context("failed to delete object")?;
+                .map_err(|cause| ServiceError::Reqwest {
+                    context: "S3: failed to delete object".to_string(),
+                    cause,
+                })?;
         }
 
         Ok(())
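Editor's note: for reference, a hypothetical `TokenProvider` implementation beyond `NoToken`, assuming the trait signature reconstructed above and an `as_str` accessor on `Token` (both inferred from how `request()` uses the token; `StaticToken` is invented):

```rust
// A provider that serves a fixed token, mirroring the shape of the
// `NoToken` impl above. All names here are illustrative only.
struct StaticToken(String);

impl Token for StaticToken {
    fn as_str(&self) -> &str {
        &self.0
    }
}

struct StaticTokenProvider(String);

impl TokenProvider for StaticTokenProvider {
    #[allow(refining_impl_trait_internal)]
    async fn get_token(&self) -> anyhow::Result<StaticToken> {
        Ok(StaticToken(self.0.clone()))
    }
}
```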
diff --git a/objectstore-service/src/error.rs b/objectstore-service/src/error.rs
new file mode 100644
index 00000000..74476d47
--- /dev/null
+++ b/objectstore-service/src/error.rs
@@ -0,0 +1,54 @@
+use thiserror::Error;
+
+/// Error type for service operations.
+#[derive(Debug, Error)]
+pub enum ServiceError {
+    /// IO errors related to payload streaming or file operations.
+    #[error("i/o error: {0}")]
+    Io(#[from] std::io::Error),
+
+    /// Errors related to de/serialization.
+    #[error("serde error: {context}")]
+    Serde {
+        /// Context describing what was being serialized/deserialized.
+        context: String,
+        /// The underlying serde error.
+        #[source]
+        cause: serde_json::Error,
+    },
+
+    /// All errors stemming from the reqwest client, used in multiple backends to send requests to
+    /// e.g. GCP APIs.
+    /// These can be network errors encountered when sending the requests, but can also indicate
+    /// errors returned by the API itself.
+    #[error("reqwest error: {context}")]
+    Reqwest {
+        /// Context describing the request that failed.
+        context: String,
+        /// The underlying reqwest error.
+        #[source]
+        cause: reqwest::Error,
+    },
+
+    /// Errors related to de/serialization and parsing of object metadata.
+    #[error("metadata error: {0}")]
+    Metadata(#[from] objectstore_types::Error),
+
+    /// Errors encountered when attempting to authenticate with GCP.
+    #[error("GCP authentication error: {0}")]
+    GcpAuth(#[from] gcp_auth::Error),
+
+    /// Any other error stemming from one of the storage backends, which might be specific to that
+    /// backend or to a certain operation.
+    #[error("storage backend error: {context}")]
+    Generic {
+        /// Context describing the operation that failed.
+        context: String,
+        /// The underlying error, if available.
+        #[source]
+        cause: Option<Box<dyn std::error::Error + Send + Sync>>,
+    },
+}
+
+/// Result type for service operations.
+pub type ServiceResult<T> = Result<T, ServiceError>;
diff --git a/objectstore-service/src/lib.rs b/objectstore-service/src/lib.rs
index 3566044a..9bb1d8f8 100644
--- a/objectstore-service/src/lib.rs
+++ b/objectstore-service/src/lib.rs
@@ -7,6 +7,8 @@
 // TODO(ja): Re-organize modules
 mod backend;
+mod error;
+pub use error::{ServiceError, ServiceResult};
 pub mod id;
 
 use std::path::Path;
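Editor's note: one behavioral difference worth flagging with this migration: `anyhow` prints the whole context chain with `{:#}`, while a `thiserror` enum's `Display` shows only the top-level message; the causes hang off `source()`. A hypothetical helper for log sites that still want the full chain:

```rust
use std::error::Error as _;

use objectstore_service::ServiceError;

// Sketch: walk the #[source] chain so "reqwest error: GCS: failed to get
// object metadata" is followed by the underlying reqwest/hyper causes.
fn format_chain(err: &ServiceError) -> String {
    let mut out = err.to_string();
    let mut source = err.source();
    while let Some(cause) = source {
        out.push_str(": ");
        out.push_str(&cause.to_string());
        source = cause.source();
    }
    out
}
```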