2 changes: 1 addition & 1 deletion .gitignore
@@ -1,5 +1,5 @@
/data
/target
/**/target
recipe.json
profile.json.gz
.envrc.private
72 changes: 48 additions & 24 deletions objectstore-service/src/backend/bigtable.rs
@@ -1,7 +1,7 @@
use std::fmt;
use std::time::{Duration, SystemTime};

use anyhow::{Context, Result};
use anyhow::Result;
use bigtable_rs::bigtable::BigTableConnection;
use bigtable_rs::google::bigtable::v2::{self, mutation};
use futures_util::{StreamExt, TryStreamExt, stream};
@@ -11,6 +11,7 @@ use tokio::runtime::Handle;
use crate::PayloadStream;
use crate::backend::common::Backend;
use crate::id::ObjectId;
use crate::{ServiceError, ServiceResult};

/// Connection timeout used for the initial connection to BigTable.
const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
@@ -97,7 +98,7 @@ impl BigTableBackend {
path: Vec<u8>,
mutations: I,
action: &str,
) -> Result<v2::MutateRowResponse>
) -> ServiceResult<v2::MutateRowResponse>
where
I: IntoIterator<Item = mutation::Mutation>,
{
@@ -125,8 +126,9 @@ impl BigTableBackend {
if response.is_err() {
merni::counter!("bigtable.mutate_failures": 1, "action" => action);
}
return response.with_context(|| {
format!("failed mutating bigtable row performing a `{action}`")
return response.map_err(|e| ServiceError::Generic {
context: format!("Bigtable: failed mutating row performing a `{action}`"),
cause: Some(Box::new(e)),
});
}
retry_count += 1;
@@ -141,19 +143,13 @@ impl BigTableBackend {
metadata: &Metadata,
payload: Vec<u8>,
action: &str,
) -> Result<v2::MutateRowResponse> {
) -> ServiceResult<v2::MutateRowResponse> {
// TODO: Inject the access time from the request.
let access_time = SystemTime::now();
let (family, timestamp_micros) = match metadata.expiration_policy {
ExpirationPolicy::Manual => (FAMILY_MANUAL, -1),
ExpirationPolicy::TimeToLive(ttl) => (
FAMILY_GC,
ttl_to_micros(ttl, access_time).context("TTL out of range")?,
),
ExpirationPolicy::TimeToIdle(tti) => (
FAMILY_GC,
ttl_to_micros(tti, access_time).context("TTL out of range")?,
),
ExpirationPolicy::TimeToLive(ttl) => (FAMILY_GC, ttl_to_micros(ttl, access_time)?),
ExpirationPolicy::TimeToIdle(tti) => (FAMILY_GC, ttl_to_micros(tti, access_time)?),
};

let mutations = [
@@ -169,7 +165,10 @@ impl BigTableBackend {
family_name: family.to_owned(),
column_qualifier: COLUMN_METADATA.to_owned(),
timestamp_micros,
value: serde_json::to_vec(metadata).with_context(|| "failed to encode metadata")?,
value: serde_json::to_vec(metadata).map_err(|cause| ServiceError::Serde {
context: "failed to serialize metadata".to_string(),
cause,
})?,
}),
];
self.mutate(path, mutations, action).await
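Note: the `ServiceError::Generic` and `ServiceError::Serde` variants constructed in the hunks above are imported from the crate root, but their definition is not part of this diff. A minimal sketch of what the call sites imply — the field names and types are taken from the constructors above, while the `thiserror` derive and exact trait bounds are assumptions:

```rust
// Hypothetical sketch of the error plumbing referenced by this diff; the real
// definitions live elsewhere in the objectstore-service crate and may differ.
use thiserror::Error;

#[derive(Debug, Error)]
pub enum ServiceError {
    /// Catch-all backend failure with a human-readable context and an optional source.
    #[error("{context}")]
    Generic {
        context: String,
        #[source]
        cause: Option<Box<dyn std::error::Error + Send + Sync>>,
    },

    /// JSON (de)serialization failure, keeping the typed `serde_json` error.
    #[error("{context}")]
    Serde {
        context: String,
        #[source]
        cause: serde_json::Error,
    },
}

/// Convenience alias used by the backend trait and its implementations.
pub type ServiceResult<T> = Result<T, ServiceError>;
```

Keeping `cause` as an `Option` lets pure logic errors (such as the TTL overflow further down) be reported without a source, while RPC and serialization failures stay attached for error chains.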
@@ -188,7 +187,7 @@ impl Backend for BigTableBackend {
id: &ObjectId,
metadata: &Metadata,
mut stream: PayloadStream,
) -> Result<()> {
) -> ServiceResult<()> {
tracing::debug!("Writing to Bigtable backend");
let path = id.as_storage_path().to_string().into_bytes();

@@ -202,7 +201,7 @@ impl Backend for BigTableBackend {
}

#[tracing::instrument(level = "trace", fields(?id), skip_all)]
async fn get_object(&self, id: &ObjectId) -> Result<Option<(Metadata, PayloadStream)>> {
async fn get_object(&self, id: &ObjectId) -> ServiceResult<Option<(Metadata, PayloadStream)>> {
tracing::debug!("Reading from Bigtable backend");
let path = id.as_storage_path().to_string().into_bytes();
let rows = v2::RowSet {
@@ -226,7 +225,10 @@ impl Backend for BigTableBackend {
if response.is_err() {
merni::counter!("bigtable.read_failures": 1);
}
break response?;
break response.map_err(|e| ServiceError::Generic {
context: "Bigtable: failed to read rows".to_string(),
cause: Some(Box::new(e)),
})?;
}
retry_count += 1;
merni::counter!("bigtable.read_retry": 1);
@@ -252,8 +254,12 @@ impl Backend for BigTableBackend {
// TODO: Log if the timestamp is invalid.
}
self::COLUMN_METADATA => {
metadata = serde_json::from_slice(&cell.value)
.with_context(|| "failed to decode metadata")?;
metadata = serde_json::from_slice(&cell.value).map_err(|cause| {
ServiceError::Serde {
context: "failed to deserialize metadata".to_string(),
cause,
}
})?;
}
_ => {
// TODO: Log unknown column
@@ -288,7 +294,7 @@ impl Backend for BigTableBackend {
}

#[tracing::instrument(level = "trace", fields(?id), skip_all)]
async fn delete_object(&self, id: &ObjectId) -> Result<()> {
async fn delete_object(&self, id: &ObjectId) -> ServiceResult<()> {
tracing::debug!("Deleting from Bigtable backend");

let path = id.as_storage_path().to_string().into_bytes();
@@ -306,13 +312,31 @@
/// The TTL is anchored at the provided `from` timestamp, which defaults to `SystemTime::now()`. As
/// required by BigTable, the resulting timestamp has millisecond precision, with the trailing
/// digits set to 0.
fn ttl_to_micros(ttl: Duration, from: SystemTime) -> Option<i64> {
let deadline = from.checked_add(ttl)?;
fn ttl_to_micros(ttl: Duration, from: SystemTime) -> ServiceResult<i64> {
let deadline = from.checked_add(ttl).ok_or_else(|| ServiceError::Generic {
context: format!(
"TTL duration overflow: {} plus {}s cannot be represented as SystemTime",
humantime::format_rfc3339_seconds(from),
ttl.as_secs()
),
cause: None,
})?;
let millis = deadline
.duration_since(SystemTime::UNIX_EPOCH)
.ok()?
.map_err(|e| ServiceError::Generic {
context: format!(
"unable to get duration since UNIX_EPOCH for SystemTime {}",
humantime::format_rfc3339_seconds(deadline)
),
cause: Some(Box::new(e)),
})?
.as_millis();
(millis * 1000).try_into().ok()
(millis * 1000)
.try_into()
.map_err(|e| ServiceError::Generic {
context: format!("failed to convert {}ms to i64 microseconds", millis),
cause: Some(Box::new(e)),
})
}

/// Converts a microsecond-precision unix timestamp to a `SystemTime`.
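To make the millisecond truncation performed by `ttl_to_micros` concrete, here is a small standalone sketch that mirrors its arithmetic (the function itself is private to the module, so this recomputes the same steps rather than calling it; the concrete numbers are illustrative only):

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

fn main() {
    // Anchor the TTL at a fixed point: 2_000_000_000 s and ~123.46 ms past the epoch.
    let from: SystemTime = UNIX_EPOCH + Duration::new(2_000_000_000, 123_456_789);
    let ttl = Duration::from_secs(3600);

    // Same steps as ttl_to_micros: add the TTL, take milliseconds since the
    // epoch, then scale to microseconds.
    let deadline = from.checked_add(ttl).expect("TTL overflow");
    let millis = deadline
        .duration_since(UNIX_EPOCH)
        .expect("deadline before epoch")
        .as_millis();
    let micros: i64 = (millis * 1000).try_into().expect("out of i64 range");

    // The sub-millisecond remainder (456_789 ns) is dropped, so the microsecond
    // value ends in three zeros, as BigTable requires for cell timestamps.
    assert_eq!(micros, (2_000_000_000 + 3600) * 1_000_000 + 123_000);
    println!("timestamp_micros = {micros}");
}
```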
8 changes: 4 additions & 4 deletions objectstore-service/src/backend/common.rs
@@ -1,9 +1,9 @@
use std::fmt::Debug;

use anyhow::Result;
use objectstore_types::Metadata;

use crate::PayloadStream;
use crate::ServiceResult;
use crate::id::ObjectId;

/// User agent string used for outgoing requests.
@@ -25,13 +25,13 @@ pub trait Backend: Debug + Send + Sync + 'static {
id: &ObjectId,
metadata: &Metadata,
stream: PayloadStream,
) -> Result<()>;
) -> ServiceResult<()>;

/// Retrieves an object at the given path, returning its metadata and a stream of bytes.
async fn get_object(&self, id: &ObjectId) -> Result<Option<(Metadata, PayloadStream)>>;
async fn get_object(&self, id: &ObjectId) -> ServiceResult<Option<(Metadata, PayloadStream)>>;

/// Deletes the object at the given path.
async fn delete_object(&self, id: &ObjectId) -> Result<()>;
async fn delete_object(&self, id: &ObjectId) -> ServiceResult<()>;
}

/// Creates a reqwest client with required defaults.
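Since every `Backend` method now returns `ServiceResult`, callers can attach context without going through `anyhow`. A hypothetical helper illustrating that, written against the crate's own types (`require_object` and its error message are invented for this example, and it assumes the trait's `async fn`s are directly awaitable):

```rust
use objectstore_types::Metadata;

use crate::backend::common::Backend;
use crate::id::ObjectId;
use crate::{PayloadStream, ServiceError, ServiceResult};

/// Fetches an object and converts "not found" into an error, so downstream
/// code only has to handle a single failure path.
pub async fn require_object(
    backend: &impl Backend,
    id: &ObjectId,
) -> ServiceResult<(Metadata, PayloadStream)> {
    backend
        .get_object(id)
        .await?
        .ok_or_else(|| ServiceError::Generic {
            context: format!("object {id:?} does not exist"),
            cause: None,
        })
}
```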