From c1d568c849851761f75cbfb0d8ad8af0fb65418e Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Mon, 15 Dec 2025 18:02:37 +0100 Subject: [PATCH 01/30] wip --- Cargo.lock | 32 +++++-- objectstore-server/Cargo.toml | 5 +- objectstore-server/src/auth/service.rs | 11 +-- objectstore-server/src/endpoints/batch.rs | 112 ++++++++++++++++++++++ objectstore-server/src/endpoints/mod.rs | 5 +- objectstore-server/src/error.rs | 8 ++ objectstore-service/src/id.rs | 4 +- objectstore-service/src/lib.rs | 4 + 8 files changed, 163 insertions(+), 18 deletions(-) create mode 100644 objectstore-server/src/endpoints/batch.rs diff --git a/Cargo.lock b/Cargo.lock index 75d81fbd..49ed150d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -295,9 +295,9 @@ dependencies = [ [[package]] name = "axum" -version = "0.8.6" +version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a18ed336352031311f4e0b4dd2ff392d4fbb370777c9d18d7fc9d7359f73871" +checksum = "5b098575ebe77cb6d14fc7f32749631a6e44edbef6b796f89b020e99ba20d425" dependencies = [ "axum-core", "bytes", @@ -312,6 +312,7 @@ dependencies = [ "matchit", "memchr", "mime", + "multer", "percent-encoding", "pin-project-lite", "serde_core", @@ -347,21 +348,22 @@ dependencies = [ [[package]] name = "axum-extra" -version = "0.10.3" +version = "0.12.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9963ff19f40c6102c76756ef0a46004c0d58957d87259fc9208ff8441c12ab96" +checksum = "dbfe9f610fe4e99cf0cfcd03ccf8c63c28c616fe714d80475ef731f3b13dd21b" dependencies = [ "axum", "axum-core", "bytes", + "fastrand", + "futures-core", "futures-util", "http 1.3.1", "http-body", "http-body-util", "mime", + "multer", "pin-project-lite", - "rustversion", - "serde_core", "tower-layer", "tower-service", "tracing", @@ -2037,6 +2039,23 @@ dependencies = [ "uuid", ] +[[package]] +name = "multer" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83e87776546dc87511aa5ee218730c92b666d7264ab6ed41f9d215af9cd5224b" +dependencies = [ + "bytes", + "encoding_rs", + "futures-util", + "http 1.3.1", + "httparse", + "memchr", + "mime", + "spin", + "version_check", +] + [[package]] name = "multimap" version = "0.10.1" @@ -2203,6 +2222,7 @@ dependencies = [ "console", "elegant-departure", "figment", + "futures", "futures-util", "humantime", "humantime-serde", diff --git a/objectstore-server/Cargo.toml b/objectstore-server/Cargo.toml index 820326e3..b76215a6 100644 --- a/objectstore-server/Cargo.toml +++ b/objectstore-server/Cargo.toml @@ -13,11 +13,12 @@ publish = false [dependencies] anyhow = { workspace = true } argh = "0.1.13" -axum = "0.8.4" -axum-extra = "0.10.1" +axum = { version = "0.8.4", features = ["multipart"] } +axum-extra = { version = "0.12.2", features = ["multipart"] } console = "0.16.1" elegant-departure = { version = "0.3.2", features = ["tokio"] } figment = { version = "0.10.19", features = ["env", "test", "yaml"] } +futures = { workspace = true } futures-util = { workspace = true } humantime = { workspace = true } humantime-serde = { workspace = true } diff --git a/objectstore-server/src/auth/service.rs b/objectstore-server/src/auth/service.rs index 26c4cdb8..e6d354e5 100644 --- a/objectstore-server/src/auth/service.rs +++ b/objectstore-server/src/auth/service.rs @@ -1,5 +1,5 @@ use objectstore_service::id::{ObjectContext, ObjectId}; -use objectstore_service::{PayloadStream, StorageService}; +use objectstore_service::{DeleteResult, GetResult, 
InsertResult, PayloadStream, StorageService}; use objectstore_types::{Metadata, Permission}; use crate::auth::AuthContext; @@ -56,7 +56,7 @@ impl AuthAwareService { key: Option, metadata: &Metadata, stream: PayloadStream, - ) -> anyhow::Result { + ) -> InsertResult { self.assert_authorized(Permission::ObjectWrite, &context)?; self.service .insert_object(context, key, metadata, stream) @@ -64,16 +64,13 @@ impl AuthAwareService { } /// Auth-aware wrapper around [`StorageService::get_object`]. - pub async fn get_object( - &self, - id: &ObjectId, - ) -> anyhow::Result> { + pub async fn get_object(&self, id: &ObjectId) -> GetResult { self.assert_authorized(Permission::ObjectRead, id.context())?; self.service.get_object(id).await } /// Auth-aware wrapper around [`StorageService::delete_object`]. - pub async fn delete_object(&self, id: &ObjectId) -> anyhow::Result<()> { + pub async fn delete_object(&self, id: &ObjectId) -> DeleteResult { self.assert_authorized(Permission::ObjectDelete, id.context())?; self.service.delete_object(id).await } diff --git a/objectstore-server/src/endpoints/batch.rs b/objectstore-server/src/endpoints/batch.rs new file mode 100644 index 00000000..cded7324 --- /dev/null +++ b/objectstore-server/src/endpoints/batch.rs @@ -0,0 +1,112 @@ +use std::os::unix::fs::OpenOptionsExt; + +use axum::extract::{DefaultBodyLimit, Multipart}; +use axum::http::StatusCode; +use axum::response::{IntoResponse, IntoResponseParts}; +use axum::routing; +use axum::{Json, Router}; +use axum_extra::response::multiple::MultipartForm; +use futures::TryStreamExt; +use futures_util::StreamExt; +use objectstore_service::id::{ObjectContext, ObjectId}; +use objectstore_service::{DeleteResult, GetResult}; +use objectstore_types::Metadata; +use serde::{Deserialize, Serialize}; + +use crate::auth::AuthAwareService; +use crate::error::ApiResult; +use crate::extractors::Xt; +use crate::state::ServiceState; + +pub fn router() -> Router { + Router::new() + .route("/objects:batch/{usecase}/{scopes}/", routing::post(batch)) + .layer(DefaultBodyLimit::max(500 * 1_000_000)) +} + +#[derive(Deserialize, Debug)] +#[serde(tag = "op")] +enum Operation { + Get(String), + Insert(Option), + Delete(String), +} + +#[derive(Deserialize, Debug)] +struct RequestManifest { + operations: Vec, +} + +#[derive(Serialize, Debug)] +struct Response {} + +pub trait IntoPart { + fn into_part(self) -> axum_extra::response::multiple::Part; +} + +impl IntoPart for GetResult { + fn into_part(self) -> axum_extra::response::multiple::Part { + todo!() + } +} + +#[derive(Deserialize, Debug)] +struct ResultManifest {} + +impl IntoPart for ResultManifest { + fn into_part(self) -> axum_extra::response::multiple::Part { + todo!() + } +} + +async fn batch( + service: AuthAwareService, + Xt(context): Xt, + mut multipart: Multipart, +) -> ApiResult { + let Some(manifest) = multipart.next_field().await? 
else { + return Ok((StatusCode::BAD_REQUEST, "expected manifest").into_response()); + }; + + // TODO: enforce max size on the manifest + let manifest = manifest.bytes().await?; + let manifest = str::from_utf8(&manifest) + .map_err(|err| anyhow::Error::new(err).context("failed to parse manifest as UTF-8"))?; + let manifest: RequestManifest = serde_json::from_str(manifest) + .map_err(|err| anyhow::Error::new(err).context("failed to deserialize manifest"))?; + + let result_manifest = ResultManifest {}; + let mut parts = vec![]; + for operation in manifest.operations { + let result = match operation { + Operation::Get(key) => { + let result = service + .get_object(&ObjectId::new(context.clone(), key)) + .await; + parts.push(result.into_part()); + } + Operation::Insert(key) => { + service.insert_object( + context.clone(), + key, + &Metadata::default(), + multipart + .next_field() + .await + .unwrap() + .unwrap() + .map_err(|err| std::io::Error::other(err)) + .boxed(), + ); + } + Operation::Delete(key) => { + service + .delete_object(&ObjectId::new(context.clone(), key)) + .await; + } + }; + } + parts.insert(0, result_manifest.into_part()); + + Ok(MultipartForm::with_parts(parts).into_response()) +} diff --git a/objectstore-server/src/endpoints/mod.rs b/objectstore-server/src/endpoints/mod.rs index 2f6df835..1b9854c7 100644 --- a/objectstore-server/src/endpoints/mod.rs +++ b/objectstore-server/src/endpoints/mod.rs @@ -6,11 +6,14 @@ use axum::Router; use crate::state::ServiceState; +mod batch; mod health; mod objects; pub fn routes() -> Router { - let routes_v1 = Router::new().merge(objects::router()); + let routes_v1 = Router::new() + .merge(objects::router()) + .merge(batch::router()); Router::new() .merge(health::router()) diff --git a/objectstore-server/src/error.rs b/objectstore-server/src/error.rs index 8da1c300..5a3a1d5a 100644 --- a/objectstore-server/src/error.rs +++ b/objectstore-server/src/error.rs @@ -1,8 +1,10 @@ //! //! This is mostly adapted from +use axum::extract::multipart::MultipartError; use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; +use objectstore_service::DeleteResult; pub enum AnyhowResponse { Error(anyhow::Error), @@ -40,3 +42,9 @@ impl From for AnyhowResponse { Self::Error(err) } } + +impl From for AnyhowResponse { + fn from(value: MultipartError) -> Self { + Self::Error(value.into()) + } +} diff --git a/objectstore-service/src/id.rs b/objectstore-service/src/id.rs index 622cf652..fdd34177 100644 --- a/objectstore-service/src/id.rs +++ b/objectstore-service/src/id.rs @@ -11,9 +11,9 @@ use std::fmt; use objectstore_types::scope::{Scope, Scopes}; -/// Defines where an object belongs within the object store. +/// Defines where an object, or batch of objects, belongs within the object store. /// -/// This is part of the full object identifier, see [`ObjectId`]. +/// This is part of the full object identifier for single objects, see [`ObjectId`]. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct ObjectContext { /// The usecase, or "product" this object belongs to. diff --git a/objectstore-service/src/lib.rs b/objectstore-service/src/lib.rs index 3566044a..0ddf4ab7 100644 --- a/objectstore-service/src/lib.rs +++ b/objectstore-service/src/lib.rs @@ -86,6 +86,10 @@ pub enum StorageConfig<'a> { }, } +pub type GetResult = anyhow::Result>; +pub type InsertResult = anyhow::Result; +pub type DeleteResult = anyhow::Result<()>; + impl StorageService { /// Creates a new `StorageService` with the specified configuration. 
pub async fn new( From 61f661314661c753b6f7f8a095d43e8525035e9e Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Tue, 16 Dec 2025 14:06:34 +0100 Subject: [PATCH 02/30] pass around stream of inserts --- Cargo.lock | 2 + objectstore-server/Cargo.toml | 2 + objectstore-server/src/auth/service.rs | 16 ++++- objectstore-server/src/endpoints/batch.rs | 76 ++++++++--------------- objectstore-service/src/lib.rs | 31 +++++++++ 5 files changed, 77 insertions(+), 50 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 49ed150d..5fe17d0c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2219,11 +2219,13 @@ dependencies = [ "argh", "axum", "axum-extra", + "bytes", "console", "elegant-departure", "figment", "futures", "futures-util", + "http 1.3.1", "humantime", "humantime-serde", "jsonwebtoken", diff --git a/objectstore-server/Cargo.toml b/objectstore-server/Cargo.toml index b76215a6..fb691efe 100644 --- a/objectstore-server/Cargo.toml +++ b/objectstore-server/Cargo.toml @@ -15,11 +15,13 @@ anyhow = { workspace = true } argh = "0.1.13" axum = { version = "0.8.4", features = ["multipart"] } axum-extra = { version = "0.12.2", features = ["multipart"] } +bytes = { workspace = true } console = "0.16.1" elegant-departure = { version = "0.3.2", features = ["tokio"] } figment = { version = "0.10.19", features = ["env", "test", "yaml"] } futures = { workspace = true } futures-util = { workspace = true } +http = { workspace = true } humantime = { workspace = true } humantime-serde = { workspace = true } jsonwebtoken = { version = "10.2.0", features = ["rust_crypto"] } diff --git a/objectstore-server/src/auth/service.rs b/objectstore-server/src/auth/service.rs index e6d354e5..c89a0c2b 100644 --- a/objectstore-server/src/auth/service.rs +++ b/objectstore-server/src/auth/service.rs @@ -1,5 +1,9 @@ +use bytes::Bytes; +use futures::Stream; use objectstore_service::id::{ObjectContext, ObjectId}; -use objectstore_service::{DeleteResult, GetResult, InsertResult, PayloadStream, StorageService}; +use objectstore_service::{ + BatchInsertResult, DeleteResult, GetResult, InsertResult, PayloadStream, StorageService, +}; use objectstore_types::{Metadata, Permission}; use crate::auth::AuthContext; @@ -74,4 +78,14 @@ impl AuthAwareService { self.assert_authorized(Permission::ObjectDelete, id.context())?; self.service.delete_object(id).await } + + /// Auth-aware wrapper around [`StorageService::insert_objects`]. 
+ pub async fn insert_objects( + &self, + context: ObjectContext, + inserts: impl Stream>, + ) -> BatchInsertResult { + self.assert_authorized(Permission::ObjectWrite, &context)?; + self.service.insert_objects(context, inserts).await + } } diff --git a/objectstore-server/src/endpoints/batch.rs b/objectstore-server/src/endpoints/batch.rs index cded7324..9886be21 100644 --- a/objectstore-server/src/endpoints/batch.rs +++ b/objectstore-server/src/endpoints/batch.rs @@ -2,14 +2,15 @@ use std::os::unix::fs::OpenOptionsExt; use axum::extract::{DefaultBodyLimit, Multipart}; use axum::http::StatusCode; -use axum::response::{IntoResponse, IntoResponseParts}; +use axum::response::IntoResponse; +use axum::response::Response; use axum::routing; use axum::{Json, Router}; -use axum_extra::response::multiple::MultipartForm; +use axum_extra::response::multiple::{MultipartForm, Part}; use futures::TryStreamExt; -use futures_util::StreamExt; +use futures::stream::{self, StreamExt, unfold}; use objectstore_service::id::{ObjectContext, ObjectId}; -use objectstore_service::{DeleteResult, GetResult}; +use objectstore_service::{DeleteResult, GetResult, InsertResult}; use objectstore_types::Metadata; use serde::{Deserialize, Serialize}; @@ -37,28 +38,6 @@ struct RequestManifest { operations: Vec, } -#[derive(Serialize, Debug)] -struct Response {} - -pub trait IntoPart { - fn into_part(self) -> axum_extra::response::multiple::Part; -} - -impl IntoPart for GetResult { - fn into_part(self) -> axum_extra::response::multiple::Part { - todo!() - } -} - -#[derive(Deserialize, Debug)] -struct ResultManifest {} - -impl IntoPart for ResultManifest { - fn into_part(self) -> axum_extra::response::multiple::Part { - todo!() - } -} - async fn batch( service: AuthAwareService, Xt(context): Xt, @@ -75,38 +54,37 @@ async fn batch( let manifest: RequestManifest = serde_json::from_str(manifest) .map_err(|err| anyhow::Error::new(err).context("failed to deserialize manifest"))?; - let result_manifest = ResultManifest {}; - let mut parts = vec![]; - for operation in manifest.operations { - let result = match operation { + let inserts = unfold(multipart, |mut m| async move { + match m.next_field().await { + Ok(Some(field)) => { + let metadata = Metadata::from_headers(field.headers(), "").unwrap(); + let bytes = field + .bytes() + .await + .map_err(|e| anyhow::Error::new(e)) + .unwrap(); + Some((Ok((metadata, bytes)), m)) + } + Ok(None) => None, + Err(_) => todo!(), + } + }); + let insert_results = service.insert_objects(context.clone(), inserts).await; + + for operation in manifest.operations.into_iter() { + match operation { Operation::Get(key) => { let result = service .get_object(&ObjectId::new(context.clone(), key)) .await; - parts.push(result.into_part()); - } - Operation::Insert(key) => { - service.insert_object( - context.clone(), - key, - &Metadata::default(), - multipart - .next_field() - .await - .unwrap() - .unwrap() - .map_err(|err| std::io::Error::other(err)) - .boxed(), - ); } Operation::Delete(key) => { - service + let result = service .delete_object(&ObjectId::new(context.clone(), key)) .await; } + _ => (), }; } - parts.insert(0, result_manifest.into_part()); - - Ok(MultipartForm::with_parts(parts).into_response()) + Ok(StatusCode::OK.into_response()) } diff --git a/objectstore-service/src/lib.rs b/objectstore-service/src/lib.rs index 0ddf4ab7..531672b1 100644 --- a/objectstore-service/src/lib.rs +++ b/objectstore-service/src/lib.rs @@ -15,6 +15,7 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::time::Instant; 
use bytes::{Bytes, BytesMut}; +use futures_util::Stream; use futures_util::{StreamExt, TryStreamExt, stream::BoxStream}; use objectstore_types::Metadata; @@ -88,6 +89,7 @@ pub enum StorageConfig<'a> { pub type GetResult = anyhow::Result>; pub type InsertResult = anyhow::Result; +pub type BatchInsertResult = anyhow::Result>; pub type DeleteResult = anyhow::Result<()>; impl StorageService { @@ -278,6 +280,35 @@ impl StorageService { Ok(()) } + + /// TODO + pub async fn insert_objects( + &self, + context: ObjectContext, + inserts: impl Stream>, + ) -> BatchInsertResult { + let mut inserts = Box::pin(inserts); + + let mut results = Vec::new(); + while let Some(item) = inserts.next().await { + let result = match item { + Ok((metadata, bytes)) => { + let id = ObjectId::optional(context.clone(), None); + let stream = futures_util::stream::once(async { Ok(bytes) }).boxed(); + + self.0 + .high_volume_backend + .put_object(&id, &metadata, stream) + .await?; + + Ok(id) + } + Err(e) => Err(e), + }; + results.push(result); + } + Ok(results) + } } fn is_tombstoned(result: &Option<(Metadata, PayloadStream)>) -> bool { From 2cea755f917e99a357723d704ada8002d22abe82 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 17 Dec 2025 10:29:25 +0100 Subject: [PATCH 03/30] wip --- Cargo.lock | 14 ++ objectstore-server/Cargo.toml | 4 +- objectstore-server/src/endpoints/batch.rs | 85 ++---------- objectstore-server/src/extractors/batch.rs | 146 +++++++++++++++++++++ objectstore-server/src/extractors/mod.rs | 3 + objectstore-service/src/id.rs | 10 +- objectstore-service/src/lib.rs | 2 +- 7 files changed, 184 insertions(+), 80 deletions(-) create mode 100644 objectstore-server/src/extractors/batch.rs diff --git a/Cargo.lock b/Cargo.lock index 5fe17d0c..15a8cfb3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -300,6 +300,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b098575ebe77cb6d14fc7f32749631a6e44edbef6b796f89b020e99ba20d425" dependencies = [ "axum-core", + "axum-macros", "bytes", "form_urlencoded", "futures-util", @@ -369,6 +370,17 @@ dependencies = [ "tracing", ] +[[package]] +name = "axum-macros" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "604fde5e028fea851ce1d8570bbdc034bec850d157f7569d10f347d06808c05c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "backtrace" version = "0.3.76" @@ -2231,6 +2243,8 @@ dependencies = [ "jsonwebtoken", "merni", "mimalloc", + "mime", + "multer", "nix", "num_cpus", "objectstore-service", diff --git a/objectstore-server/Cargo.toml b/objectstore-server/Cargo.toml index fb691efe..c7183078 100644 --- a/objectstore-server/Cargo.toml +++ b/objectstore-server/Cargo.toml @@ -13,7 +13,7 @@ publish = false [dependencies] anyhow = { workspace = true } argh = "0.1.13" -axum = { version = "0.8.4", features = ["multipart"] } +axum = { version = "0.8.4", features = ["multipart", "macros"] } axum-extra = { version = "0.12.2", features = ["multipart"] } bytes = { workspace = true } console = "0.16.1" @@ -27,6 +27,8 @@ humantime-serde = { workspace = true } jsonwebtoken = { version = "10.2.0", features = ["rust_crypto"] } merni = { workspace = true } mimalloc = { workspace = true } +mime = "0.3.17" +multer = "3.1.0" num_cpus = "1.17.0" objectstore-service = { workspace = true } objectstore-types = { workspace = true } diff --git a/objectstore-server/src/endpoints/batch.rs b/objectstore-server/src/endpoints/batch.rs index 9886be21..4954e8b8 
100644 --- a/objectstore-server/src/endpoints/batch.rs +++ b/objectstore-server/src/endpoints/batch.rs @@ -1,22 +1,13 @@ -use std::os::unix::fs::OpenOptionsExt; - -use axum::extract::{DefaultBodyLimit, Multipart}; +use axum::Router; +use axum::extract::DefaultBodyLimit; use axum::http::StatusCode; -use axum::response::IntoResponse; -use axum::response::Response; +use axum::response::{IntoResponse, Response}; use axum::routing; -use axum::{Json, Router}; -use axum_extra::response::multiple::{MultipartForm, Part}; -use futures::TryStreamExt; -use futures::stream::{self, StreamExt, unfold}; -use objectstore_service::id::{ObjectContext, ObjectId}; -use objectstore_service::{DeleteResult, GetResult, InsertResult}; -use objectstore_types::Metadata; -use serde::{Deserialize, Serialize}; +use objectstore_service::id::ObjectContext; use crate::auth::AuthAwareService; use crate::error::ApiResult; -use crate::extractors::Xt; +use crate::extractors::{BatchRequest, Xt}; use crate::state::ServiceState; pub fn router() -> Router { @@ -25,66 +16,10 @@ pub fn router() -> Router { .layer(DefaultBodyLimit::max(500 * 1_000_000)) } -#[derive(Deserialize, Debug)] -#[serde(tag = "op")] -enum Operation { - Get(String), - Insert(Option), - Delete(String), -} - -#[derive(Deserialize, Debug)] -struct RequestManifest { - operations: Vec, -} - async fn batch( - service: AuthAwareService, - Xt(context): Xt, - mut multipart: Multipart, -) -> ApiResult { - let Some(manifest) = multipart.next_field().await? else { - return Ok((StatusCode::BAD_REQUEST, "expected manifest").into_response()); - }; - - // TODO: enforce max size on the manifest - let manifest = manifest.bytes().await?; - let manifest = str::from_utf8(&manifest) - .map_err(|err| anyhow::Error::new(err).context("failed to parse manifest as UTF-8"))?; - let manifest: RequestManifest = serde_json::from_str(manifest) - .map_err(|err| anyhow::Error::new(err).context("failed to deserialize manifest"))?; - - let inserts = unfold(multipart, |mut m| async move { - match m.next_field().await { - Ok(Some(field)) => { - let metadata = Metadata::from_headers(field.headers(), "").unwrap(); - let bytes = field - .bytes() - .await - .map_err(|e| anyhow::Error::new(e)) - .unwrap(); - Some((Ok((metadata, bytes)), m)) - } - Ok(None) => None, - Err(_) => todo!(), - } - }); - let insert_results = service.insert_objects(context.clone(), inserts).await; - - for operation in manifest.operations.into_iter() { - match operation { - Operation::Get(key) => { - let result = service - .get_object(&ObjectId::new(context.clone(), key)) - .await; - } - Operation::Delete(key) => { - let result = service - .delete_object(&ObjectId::new(context.clone(), key)) - .await; - } - _ => (), - }; - } - Ok(StatusCode::OK.into_response()) + _service: AuthAwareService, + Xt(_context): Xt, + _request: BatchRequest, +) -> ApiResult { + Ok(StatusCode::NOT_IMPLEMENTED.into_response()) } diff --git a/objectstore-server/src/extractors/batch.rs b/objectstore-server/src/extractors/batch.rs new file mode 100644 index 00000000..7fd436fe --- /dev/null +++ b/objectstore-server/src/extractors/batch.rs @@ -0,0 +1,146 @@ +use std::{fmt::Debug, path::Path, pin::Pin}; + +use anyhow::Context; +use axum::{ + RequestExt, Router, + body::{Body, Bytes}, + extract::{FromRequest, Request}, + http::{ + StatusCode, + header::{HeaderValue, USER_AGENT}, + }, + response::{IntoResponse, Response}, + routing::get, +}; +use bytes::Buf; +use futures::{Stream, stream}; +use http::header::CONTENT_TYPE; +use multer::{Constraints, Multipart, 
SizeLimit}; +use objectstore_service::{BACKEND_SIZE_THRESHOLD, id::ObjectKey}; +use objectstore_types::Metadata; +use serde::Deserialize; + +use crate::error::AnyhowResponse; + +#[derive(Deserialize, Debug)] +#[serde(tag = "op")] +pub enum Operation { + Get(ObjectKey), + Insert(Option), + Delete(ObjectKey), +} + +#[derive(Deserialize, Debug)] +pub struct Manifest { + pub operations: Vec, +} + +pub struct BatchRequest { + pub manifest: Manifest, + pub parts: Pin> + Send>>, +} + +const MANIFEST_FIELD_NAME: &'static str = "manifest"; + +impl Debug for BatchRequest { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("BatchRequest") + .field("manifest", &self.manifest) + .finish() + } +} + +impl FromRequest for BatchRequest +where + S: Send + Sync, +{ + type Rejection = AnyhowResponse; + + async fn from_request(request: Request, _: &S) -> Result { + let Some(content_type) = request + .headers() + .get(CONTENT_TYPE) + .and_then(|ct| ct.to_str().ok()) + else { + return Err((StatusCode::BAD_REQUEST, "expected valid Content-Type") + .into_response() + .into()); + }; + + let Ok(mime) = content_type.parse::() else { + return Err((StatusCode::BAD_REQUEST, "expected valid Content-Type") + .into_response() + .into()); + }; + if !(mime.type_() == mime::MULTIPART && mime.subtype() == "mixed") { + return Err(( + StatusCode::BAD_REQUEST, + "expected Content-Type: multipart/mixed", + ) + .into_response() + .into()); + } + + // XXX: `multer::parse_boundary` requires the content-type to be `multipart/form-data` + let content_type = content_type.replace("multipart/mixed", "multipart/form-data"); + let boundary = + multer::parse_boundary(content_type).context("failed to parse multipart boundary")?; + + let mut parts = Multipart::with_constraints( + request.into_body().into_data_stream(), + boundary, + Constraints::new().size_limit( + SizeLimit::new() + // 200 MiB: BigTable's maximum size for a single mutation + .whole_stream(200 * 1024 * 1024) + // A single operation serializes to (minimum) roughly 14 bytes, so this roughly + // means we accept a maximum of 10_000 operations per batch request + .for_field(MANIFEST_FIELD_NAME, 14 * 10_000) + // Each payload needs to be within the maximum size supported by BigTable + .per_field(BACKEND_SIZE_THRESHOLD as u64), + ), + ); + + let manifest = parts + .next_field() + .await + .context("failed to parse multipart part")? 
+ .ok_or( + ( + StatusCode::BAD_REQUEST, + "expected at least one multipart part", + ) + .into_response(), + )?; + let manifest = manifest + .bytes() + .await + .context("failed to extract manifest")?; + let manifest = serde_json::from_reader::<_, Manifest>(manifest.reader()) + .context("failed to parse manifest")?; + + let parts = Box::pin(stream::unfold(parts, |mut m| async move { + match m.next_field().await { + Ok(Some(field)) => { + let metadata = match Metadata::from_headers(field.headers(), "") { + Ok(metadata) => metadata, + Err(err) => { + return Some((Err(err.into()), m)); + } + }; + let bytes = match field.bytes().await { + Ok(bytes) => bytes, + Err(err) => { + return Some((Err(err.into()), m)); + } + }; + Some((Ok((metadata, bytes)), m)) + } + Ok(None) => None, + Err(err) => Some((Err(err).context("failed to parse multipart part"), m)), + } + })); + + Ok(Self { manifest, parts }) + } +} diff --git a/objectstore-server/src/extractors/mod.rs b/objectstore-server/src/extractors/mod.rs index 5b270335..2bd79ed0 100644 --- a/objectstore-server/src/extractors/mod.rs +++ b/objectstore-server/src/extractors/mod.rs @@ -1,3 +1,4 @@ +mod batch; mod id; mod service; @@ -8,3 +9,5 @@ mod service; /// `Xt` for this to work. #[derive(Debug)] pub struct Xt(pub T); + +pub use batch::{BatchRequest, Manifest, Operation}; diff --git a/objectstore-service/src/id.rs b/objectstore-service/src/id.rs index fdd34177..7219d35d 100644 --- a/objectstore-service/src/id.rs +++ b/objectstore-service/src/id.rs @@ -10,11 +10,12 @@ use std::fmt; use objectstore_types::scope::{Scope, Scopes}; +use serde::Deserialize; /// Defines where an object, or batch of objects, belongs within the object store. /// /// This is part of the full object identifier for single objects, see [`ObjectId`]. -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize)] pub struct ObjectContext { /// The usecase, or "product" this object belongs to. /// @@ -62,7 +63,7 @@ pub struct ObjectContext { /// /// This consists of a usecase and the scopes, which make up the object's context and define where /// the object belongs within objectstore, as well as the unique key within the context. -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize)] pub struct ObjectId { /// The usecase and scopes this object belongs to. pub context: ObjectContext, @@ -73,9 +74,12 @@ pub struct ObjectId { /// a key makes a unique identifier. /// /// Keys can be assigned by the service. For this, use [`ObjectId::random`]. - pub key: String, + pub key: ObjectKey, } +/// A key that uniquely identifies an object within its usecase and scopes. +pub type ObjectKey = String; + impl ObjectId { /// Creates a new `ObjectId` with the given `context` and `key`. pub fn new(context: ObjectContext, key: String) -> Self { diff --git a/objectstore-service/src/lib.rs b/objectstore-service/src/lib.rs index 531672b1..3ecca695 100644 --- a/objectstore-service/src/lib.rs +++ b/objectstore-service/src/lib.rs @@ -23,7 +23,7 @@ use crate::backend::common::BoxedBackend; use crate::id::{ObjectContext, ObjectId}; /// The threshold up until which we will go to the "high volume" backend. 
-const BACKEND_SIZE_THRESHOLD: usize = 1024 * 1024; // 1 MiB +pub const BACKEND_SIZE_THRESHOLD: usize = 1024 * 1024; // 1 MiB enum BackendChoice { HighVolume, From 4a20eab48d38b964883cf590e9e689f7e3b6fa93 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 17 Dec 2025 10:31:12 +0100 Subject: [PATCH 04/30] wip --- Cargo.lock | 12 ------------ objectstore-server/Cargo.toml | 2 +- objectstore-service/src/id.rs | 5 ++--- 3 files changed, 3 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 15a8cfb3..291b876b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -300,7 +300,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b098575ebe77cb6d14fc7f32749631a6e44edbef6b796f89b020e99ba20d425" dependencies = [ "axum-core", - "axum-macros", "bytes", "form_urlencoded", "futures-util", @@ -370,17 +369,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "axum-macros" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "604fde5e028fea851ce1d8570bbdc034bec850d157f7569d10f347d06808c05c" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "backtrace" version = "0.3.76" diff --git a/objectstore-server/Cargo.toml b/objectstore-server/Cargo.toml index c7183078..0e5917b2 100644 --- a/objectstore-server/Cargo.toml +++ b/objectstore-server/Cargo.toml @@ -13,7 +13,7 @@ publish = false [dependencies] anyhow = { workspace = true } argh = "0.1.13" -axum = { version = "0.8.4", features = ["multipart", "macros"] } +axum = { version = "0.8.4", features = ["multipart"] } axum-extra = { version = "0.12.2", features = ["multipart"] } bytes = { workspace = true } console = "0.16.1" diff --git a/objectstore-service/src/id.rs b/objectstore-service/src/id.rs index 7219d35d..5154ba72 100644 --- a/objectstore-service/src/id.rs +++ b/objectstore-service/src/id.rs @@ -10,12 +10,11 @@ use std::fmt; use objectstore_types::scope::{Scope, Scopes}; -use serde::Deserialize; /// Defines where an object, or batch of objects, belongs within the object store. /// /// This is part of the full object identifier for single objects, see [`ObjectId`]. -#[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct ObjectContext { /// The usecase, or "product" this object belongs to. /// @@ -63,7 +62,7 @@ pub struct ObjectContext { /// /// This consists of a usecase and the scopes, which make up the object's context and define where /// the object belongs within objectstore, as well as the unique key within the context. -#[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct ObjectId { /// The usecase and scopes this object belongs to. 
pub context: ObjectContext, From 7cc9f12ba3cdf9df849d42318c57aad40971cff9 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 17 Dec 2025 10:39:46 +0100 Subject: [PATCH 05/30] wip --- objectstore-server/src/auth/service.rs | 4 ++- objectstore-server/src/error.rs | 1 - objectstore-server/src/extractors/batch.rs | 13 +++------ objectstore-service/src/lib.rs | 34 +++++++--------------- 4 files changed, 17 insertions(+), 35 deletions(-) diff --git a/objectstore-server/src/auth/service.rs b/objectstore-server/src/auth/service.rs index c89a0c2b..94f73e9f 100644 --- a/objectstore-server/src/auth/service.rs +++ b/objectstore-server/src/auth/service.rs @@ -1,3 +1,5 @@ +use std::pin::Pin; + use bytes::Bytes; use futures::Stream; use objectstore_service::id::{ObjectContext, ObjectId}; @@ -83,7 +85,7 @@ impl AuthAwareService { pub async fn insert_objects( &self, context: ObjectContext, - inserts: impl Stream>, + inserts: Pin> + Send>>, ) -> BatchInsertResult { self.assert_authorized(Permission::ObjectWrite, &context)?; self.service.insert_objects(context, inserts).await diff --git a/objectstore-server/src/error.rs b/objectstore-server/src/error.rs index 5a3a1d5a..346cee3a 100644 --- a/objectstore-server/src/error.rs +++ b/objectstore-server/src/error.rs @@ -4,7 +4,6 @@ use axum::extract::multipart::MultipartError; use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; -use objectstore_service::DeleteResult; pub enum AnyhowResponse { Error(anyhow::Error), diff --git a/objectstore-server/src/extractors/batch.rs b/objectstore-server/src/extractors/batch.rs index 7fd436fe..a91f922c 100644 --- a/objectstore-server/src/extractors/batch.rs +++ b/objectstore-server/src/extractors/batch.rs @@ -1,16 +1,11 @@ -use std::{fmt::Debug, path::Path, pin::Pin}; +use std::{fmt::Debug, pin::Pin}; use anyhow::Context; use axum::{ - RequestExt, Router, - body::{Body, Bytes}, + body::Bytes, extract::{FromRequest, Request}, - http::{ - StatusCode, - header::{HeaderValue, USER_AGENT}, - }, - response::{IntoResponse, Response}, - routing::get, + http::StatusCode, + response::IntoResponse, }; use bytes::Buf; use futures::{Stream, stream}; diff --git a/objectstore-service/src/lib.rs b/objectstore-service/src/lib.rs index 3ecca695..51576512 100644 --- a/objectstore-service/src/lib.rs +++ b/objectstore-service/src/lib.rs @@ -10,6 +10,7 @@ mod backend; pub mod id; use std::path::Path; +use std::pin::Pin; use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; use std::time::Instant; @@ -87,11 +88,16 @@ pub enum StorageConfig<'a> { }, } +/// Result type for get operations. pub type GetResult = anyhow::Result>; +/// Result type for insert operations. pub type InsertResult = anyhow::Result; -pub type BatchInsertResult = anyhow::Result>; +/// Result type for delete operations. pub type DeleteResult = anyhow::Result<()>; +/// Result type for batch insert operations. +pub type BatchInsertResult = anyhow::Result>; + impl StorageService { /// Creates a new `StorageService` with the specified configuration. 
pub async fn new( @@ -284,30 +290,10 @@ impl StorageService { /// TODO pub async fn insert_objects( &self, - context: ObjectContext, - inserts: impl Stream>, + _context: ObjectContext, + _inserts: Pin> + Send>>, ) -> BatchInsertResult { - let mut inserts = Box::pin(inserts); - - let mut results = Vec::new(); - while let Some(item) = inserts.next().await { - let result = match item { - Ok((metadata, bytes)) => { - let id = ObjectId::optional(context.clone(), None); - let stream = futures_util::stream::once(async { Ok(bytes) }).boxed(); - - self.0 - .high_volume_backend - .put_object(&id, &metadata, stream) - .await?; - - Ok(id) - } - Err(e) => Err(e), - }; - results.push(result); - } - Ok(results) + todo!(); } } From f0c4148e4c96659f6d65bf4a3ea977637d707c2a Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 17 Dec 2025 10:45:36 +0100 Subject: [PATCH 06/30] wip --- objectstore-server/src/extractors/batch.rs | 13 ++++++------- objectstore-service/src/lib.rs | 3 +++ 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/objectstore-server/src/extractors/batch.rs b/objectstore-server/src/extractors/batch.rs index a91f922c..4bd7d170 100644 --- a/objectstore-server/src/extractors/batch.rs +++ b/objectstore-server/src/extractors/batch.rs @@ -1,17 +1,16 @@ -use std::{fmt::Debug, pin::Pin}; +use std::fmt::Debug; use anyhow::Context; use axum::{ - body::Bytes, extract::{FromRequest, Request}, http::StatusCode, response::IntoResponse, }; use bytes::Buf; -use futures::{Stream, stream}; +use futures::stream; use http::header::CONTENT_TYPE; use multer::{Constraints, Multipart, SizeLimit}; -use objectstore_service::{BACKEND_SIZE_THRESHOLD, id::ObjectKey}; +use objectstore_service::{BACKEND_SIZE_THRESHOLD, InsertStream, id::ObjectKey}; use objectstore_types::Metadata; use serde::Deserialize; @@ -32,7 +31,7 @@ pub struct Manifest { pub struct BatchRequest { pub manifest: Manifest, - pub parts: Pin> + Send>>, + pub inserts: InsertStream, } const MANIFEST_FIELD_NAME: &'static str = "manifest"; @@ -114,7 +113,7 @@ where let manifest = serde_json::from_reader::<_, Manifest>(manifest.reader()) .context("failed to parse manifest")?; - let parts = Box::pin(stream::unfold(parts, |mut m| async move { + let inserts = Box::pin(stream::unfold(parts, |mut m| async move { match m.next_field().await { Ok(Some(field)) => { let metadata = match Metadata::from_headers(field.headers(), "") { @@ -136,6 +135,6 @@ where } })); - Ok(Self { manifest, parts }) + Ok(Self { manifest, inserts }) } } diff --git a/objectstore-service/src/lib.rs b/objectstore-service/src/lib.rs index 51576512..aec6f98e 100644 --- a/objectstore-service/src/lib.rs +++ b/objectstore-service/src/lib.rs @@ -95,6 +95,9 @@ pub type InsertResult = anyhow::Result; /// Result type for delete operations. pub type DeleteResult = anyhow::Result<()>; +/// Type alias to represent a stream of insert operations. +pub type InsertStream = + Pin> + Send>>; /// Result type for batch insert operations. pub type BatchInsertResult = anyhow::Result>; From 3fa62d42858837aecef9fa0b7b643860db1b1133 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 17 Dec 2025 10:46:08 +0100 Subject: [PATCH 07/30] wip --- objectstore-server/src/error.rs | 7 ------- 1 file changed, 7 deletions(-) diff --git a/objectstore-server/src/error.rs b/objectstore-server/src/error.rs index 346cee3a..8da1c300 100644 --- a/objectstore-server/src/error.rs +++ b/objectstore-server/src/error.rs @@ -1,7 +1,6 @@ //! //! 
This is mostly adapted from -use axum::extract::multipart::MultipartError; use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; @@ -41,9 +40,3 @@ impl From for AnyhowResponse { Self::Error(err) } } - -impl From for AnyhowResponse { - fn from(value: MultipartError) -> Self { - Self::Error(value.into()) - } -} From 3fea59f1de4e26dd91cebd1f5af3614a948f621b Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 17 Dec 2025 10:48:12 +0100 Subject: [PATCH 08/30] wip --- objectstore-server/src/auth/service.rs | 5 +++-- objectstore-service/src/lib.rs | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/objectstore-server/src/auth/service.rs b/objectstore-server/src/auth/service.rs index 94f73e9f..9d4a9f34 100644 --- a/objectstore-server/src/auth/service.rs +++ b/objectstore-server/src/auth/service.rs @@ -4,7 +4,8 @@ use bytes::Bytes; use futures::Stream; use objectstore_service::id::{ObjectContext, ObjectId}; use objectstore_service::{ - BatchInsertResult, DeleteResult, GetResult, InsertResult, PayloadStream, StorageService, + BatchInsertResult, DeleteResult, GetResult, InsertResult, InsertStream, PayloadStream, + StorageService, }; use objectstore_types::{Metadata, Permission}; @@ -85,7 +86,7 @@ impl AuthAwareService { pub async fn insert_objects( &self, context: ObjectContext, - inserts: Pin> + Send>>, + inserts: InsertStream, ) -> BatchInsertResult { self.assert_authorized(Permission::ObjectWrite, &context)?; self.service.insert_objects(context, inserts).await diff --git a/objectstore-service/src/lib.rs b/objectstore-service/src/lib.rs index aec6f98e..a8fef168 100644 --- a/objectstore-service/src/lib.rs +++ b/objectstore-service/src/lib.rs @@ -294,7 +294,7 @@ impl StorageService { pub async fn insert_objects( &self, _context: ObjectContext, - _inserts: Pin> + Send>>, + _inserts: InsertStream, ) -> BatchInsertResult { todo!(); } From 2431473fd01531b26f1cc10d8f212266797c0788 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 17 Dec 2025 10:55:32 +0100 Subject: [PATCH 09/30] wip --- objectstore-server/src/endpoints/batch.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/objectstore-server/src/endpoints/batch.rs b/objectstore-server/src/endpoints/batch.rs index 4954e8b8..3a9133a1 100644 --- a/objectstore-server/src/endpoints/batch.rs +++ b/objectstore-server/src/endpoints/batch.rs @@ -1,5 +1,4 @@ use axum::Router; -use axum::extract::DefaultBodyLimit; use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; use axum::routing; @@ -11,9 +10,7 @@ use crate::extractors::{BatchRequest, Xt}; use crate::state::ServiceState; pub fn router() -> Router { - Router::new() - .route("/objects:batch/{usecase}/{scopes}/", routing::post(batch)) - .layer(DefaultBodyLimit::max(500 * 1_000_000)) + Router::new().route("/objects:batch/{usecase}/{scopes}/", routing::post(batch)) } async fn batch( From 491518480dbf3b4dbd6209c904d1d31ee4a4a2c5 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 17 Dec 2025 10:56:19 +0100 Subject: [PATCH 10/30] wip --- objectstore-server/src/auth/service.rs | 4 ---- objectstore-server/src/extractors/batch.rs | 2 +- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/objectstore-server/src/auth/service.rs b/objectstore-server/src/auth/service.rs index 9d4a9f34..b8769cff 100644 --- a/objectstore-server/src/auth/service.rs +++ b/objectstore-server/src/auth/service.rs @@ -1,7 +1,3 @@ -use 
std::pin::Pin; - -use bytes::Bytes; -use futures::Stream; use objectstore_service::id::{ObjectContext, ObjectId}; use objectstore_service::{ BatchInsertResult, DeleteResult, GetResult, InsertResult, InsertStream, PayloadStream, diff --git a/objectstore-server/src/extractors/batch.rs b/objectstore-server/src/extractors/batch.rs index 4bd7d170..fdca2a51 100644 --- a/objectstore-server/src/extractors/batch.rs +++ b/objectstore-server/src/extractors/batch.rs @@ -34,7 +34,7 @@ pub struct BatchRequest { pub inserts: InsertStream, } -const MANIFEST_FIELD_NAME: &'static str = "manifest"; +const MANIFEST_FIELD_NAME: &str = "manifest"; impl Debug for BatchRequest { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { From 8e9b4f57bce7ce31a29fec3584d18fb4a3c92511 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 17 Dec 2025 11:08:15 +0100 Subject: [PATCH 11/30] remove limits --- objectstore-server/src/extractors/batch.rs | 16 +--------------- 1 file changed, 1 insertion(+), 15 deletions(-) diff --git a/objectstore-server/src/extractors/batch.rs b/objectstore-server/src/extractors/batch.rs index fdca2a51..fc870a51 100644 --- a/objectstore-server/src/extractors/batch.rs +++ b/objectstore-server/src/extractors/batch.rs @@ -79,21 +79,7 @@ where let content_type = content_type.replace("multipart/mixed", "multipart/form-data"); let boundary = multer::parse_boundary(content_type).context("failed to parse multipart boundary")?; - - let mut parts = Multipart::with_constraints( - request.into_body().into_data_stream(), - boundary, - Constraints::new().size_limit( - SizeLimit::new() - // 200 MiB: BigTable's maximum size for a single mutation - .whole_stream(200 * 1024 * 1024) - // A single operation serializes to (minimum) roughly 14 bytes, so this roughly - // means we accept a maximum of 10_000 operations per batch request - .for_field(MANIFEST_FIELD_NAME, 14 * 10_000) - // Each payload needs to be within the maximum size supported by BigTable - .per_field(BACKEND_SIZE_THRESHOLD as u64), - ), - ); + let mut parts = Multipart::new(request.into_body().into_data_stream(), boundary); let manifest = parts .next_field() From f52b7c1e62372df803867c7480abd9e661447510 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 17 Dec 2025 11:08:46 +0100 Subject: [PATCH 12/30] wip --- objectstore-service/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/objectstore-service/src/lib.rs b/objectstore-service/src/lib.rs index a8fef168..838be53b 100644 --- a/objectstore-service/src/lib.rs +++ b/objectstore-service/src/lib.rs @@ -24,7 +24,7 @@ use crate::backend::common::BoxedBackend; use crate::id::{ObjectContext, ObjectId}; /// The threshold up until which we will go to the "high volume" backend. 
-pub const BACKEND_SIZE_THRESHOLD: usize = 1024 * 1024; // 1 MiB +const BACKEND_SIZE_THRESHOLD: usize = 1024 * 1024; // 1 MiB enum BackendChoice { HighVolume, From 0413b61c7a0459038d1da6cab54fcac24c8c1e14 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 17 Dec 2025 11:09:17 +0100 Subject: [PATCH 13/30] wip --- objectstore-server/src/extractors/batch.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/objectstore-server/src/extractors/batch.rs b/objectstore-server/src/extractors/batch.rs index fc870a51..64775b04 100644 --- a/objectstore-server/src/extractors/batch.rs +++ b/objectstore-server/src/extractors/batch.rs @@ -9,8 +9,8 @@ use axum::{ use bytes::Buf; use futures::stream; use http::header::CONTENT_TYPE; -use multer::{Constraints, Multipart, SizeLimit}; -use objectstore_service::{BACKEND_SIZE_THRESHOLD, InsertStream, id::ObjectKey}; +use multer::Multipart; +use objectstore_service::{InsertStream, id::ObjectKey}; use objectstore_types::Metadata; use serde::Deserialize; @@ -34,8 +34,6 @@ pub struct BatchRequest { pub inserts: InsertStream, } -const MANIFEST_FIELD_NAME: &str = "manifest"; - impl Debug for BatchRequest { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("BatchRequest") From a6686daefd67cf487fa63e167b64b5ccba819d91 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 17 Dec 2025 11:36:05 +0100 Subject: [PATCH 14/30] add test --- objectstore-server/src/error.rs | 1 + objectstore-server/src/extractors/batch.rs | 82 ++++++++++++++++++++-- 2 files changed, 78 insertions(+), 5 deletions(-) diff --git a/objectstore-server/src/error.rs b/objectstore-server/src/error.rs index 8da1c300..799a035c 100644 --- a/objectstore-server/src/error.rs +++ b/objectstore-server/src/error.rs @@ -4,6 +4,7 @@ use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; +#[derive(Debug)] pub enum AnyhowResponse { Error(anyhow::Error), Response(Response), diff --git a/objectstore-server/src/extractors/batch.rs b/objectstore-server/src/extractors/batch.rs index 64775b04..bcc41b67 100644 --- a/objectstore-server/src/extractors/batch.rs +++ b/objectstore-server/src/extractors/batch.rs @@ -16,15 +16,15 @@ use serde::Deserialize; use crate::error::AnyhowResponse; -#[derive(Deserialize, Debug)] +#[derive(Deserialize, Debug, PartialEq)] #[serde(tag = "op")] pub enum Operation { - Get(ObjectKey), - Insert(Option), - Delete(ObjectKey), + Get { key: ObjectKey }, + Insert { key: Option }, + Delete { key: ObjectKey }, } -#[derive(Deserialize, Debug)] +#[derive(Deserialize, Debug, PartialEq)] pub struct Manifest { pub operations: Vec, } @@ -122,3 +122,75 @@ where Ok(Self { manifest, inserts }) } } + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use super::*; + use axum::body::Body; + use axum::http::{Request, header::CONTENT_TYPE}; + use futures::StreamExt; + use objectstore_types::{ExpirationPolicy, HEADER_EXPIRATION}; + + #[tokio::test] + async fn test_valid_request_works() { + let manifest = r#"{"operations":[{"op":"Insert"},{"op":"Get","key":"abc123"},{"op":"Insert","key":"xyz789"},{"op":"Delete","key":"def456"}]}"#; + let insert1_data = b"first blob data"; + let insert2_data = b"second blob data"; + let expiration = ExpirationPolicy::TimeToLive(Duration::from_hours(1)); + let body = format!( + "--boundary\r\n\ + Content-Type: application/json\r\n\ + \r\n\ + {manifest}\r\n\ + --boundary\r\n\ + Content-Type: application/octet-stream\r\n\ + \r\n\ + 
{insert1}\r\n\ + --boundary\r\n\ + Content-Type: text/plain\r\n\ + {HEADER_EXPIRATION}: {expiration}\r\n\ + \r\n\ + {insert2}\r\n\ + --boundary--\r\n", + insert1 = String::from_utf8_lossy(insert1_data), + insert2 = String::from_utf8_lossy(insert2_data), + ); + + let request = Request::builder() + .header(CONTENT_TYPE, "multipart/mixed; boundary=boundary") + .body(Body::from(body)) + .unwrap(); + + let batch_request = BatchRequest::from_request(request, &()).await.unwrap(); + + let expected_manifest = Manifest { + operations: vec![ + Operation::Insert { key: None }, + Operation::Get { + key: "abc123".to_string(), + }, + Operation::Insert { + key: Some("xyz789".to_string()), + }, + Operation::Delete { + key: "def456".to_string(), + }, + ], + }; + assert_eq!(batch_request.manifest, expected_manifest); + + let inserts: Vec<_> = batch_request.inserts.collect().await; + assert_eq!(inserts.len(), 2); + + let (metadata1, bytes1) = inserts[0].as_ref().unwrap(); + assert_eq!(metadata1.content_type, "application/octet-stream"); + assert_eq!(bytes1.as_ref(), insert1_data); + + let (metadata2, bytes2) = inserts[1].as_ref().unwrap(); + assert_eq!(metadata2.content_type, "text/plain"); + assert_eq!(metadata2.expiration_policy, expiration); + assert_eq!(bytes2.as_ref(), insert2_data); + } +} From bc7b1e7a04b81a03fd6d6bb0d52277bd2aca7daa Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 17 Dec 2025 11:46:51 +0100 Subject: [PATCH 15/30] missing debug impls --- objectstore-server/src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/objectstore-server/src/lib.rs b/objectstore-server/src/lib.rs index a7e3653f..8d5c01c0 100644 --- a/objectstore-server/src/lib.rs +++ b/objectstore-server/src/lib.rs @@ -2,6 +2,7 @@ //! //! This builds on top of the [`objectstore_service`], and exposes the underlying storage layer as //! an `HTTP` layer which can serve files directly to *external clients* and our SDK. 
+#![warn(missing_debug_implementations)] pub mod auth; pub mod cli; From a042ad5fe84100703baf73950d4cfc40d380f5ac Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 17 Dec 2025 11:54:14 +0100 Subject: [PATCH 16/30] rename --- objectstore-server/src/extractors/batch.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/objectstore-server/src/extractors/batch.rs b/objectstore-server/src/extractors/batch.rs index bcc41b67..1592b076 100644 --- a/objectstore-server/src/extractors/batch.rs +++ b/objectstore-server/src/extractors/batch.rs @@ -97,25 +97,25 @@ where let manifest = serde_json::from_reader::<_, Manifest>(manifest.reader()) .context("failed to parse manifest")?; - let inserts = Box::pin(stream::unfold(parts, |mut m| async move { - match m.next_field().await { + let inserts = Box::pin(stream::unfold(parts, |mut parts| async move { + match parts.next_field().await { Ok(Some(field)) => { let metadata = match Metadata::from_headers(field.headers(), "") { Ok(metadata) => metadata, Err(err) => { - return Some((Err(err.into()), m)); + return Some((Err(err.into()), parts)); } }; let bytes = match field.bytes().await { Ok(bytes) => bytes, Err(err) => { - return Some((Err(err.into()), m)); + return Some((Err(err.into()), parts)); } }; - Some((Ok((metadata, bytes)), m)) + Some((Ok((metadata, bytes)), parts)) } Ok(None) => None, - Err(err) => Some((Err(err).context("failed to parse multipart part"), m)), + Err(err) => Some((Err(err).context("failed to parse multipart part"), parts)), } })); From 8b01f51635e51fe1ae3f0eb6be4f926861c7854e Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 17 Dec 2025 12:01:41 +0100 Subject: [PATCH 17/30] skill issues --- Cargo.lock | 23 ++++++++++++++++++ objectstore-server/Cargo.toml | 1 + objectstore-server/src/extractors/batch.rs | 27 ++++++---------------- 3 files changed, 31 insertions(+), 20 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 291b876b..1fa65102 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -238,6 +238,28 @@ dependencies = [ "tokio", ] +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "async-trait" version = "0.1.89" @@ -2217,6 +2239,7 @@ version = "0.1.0" dependencies = [ "anyhow", "argh", + "async-stream", "axum", "axum-extra", "bytes", diff --git a/objectstore-server/Cargo.toml b/objectstore-server/Cargo.toml index 0e5917b2..a6124f38 100644 --- a/objectstore-server/Cargo.toml +++ b/objectstore-server/Cargo.toml @@ -13,6 +13,7 @@ publish = false [dependencies] anyhow = { workspace = true } argh = "0.1.13" +async-stream = "0.3.6" axum = { version = "0.8.4", features = ["multipart"] } axum-extra = { version = "0.12.2", features = ["multipart"] } bytes = { workspace = true } diff --git a/objectstore-server/src/extractors/batch.rs b/objectstore-server/src/extractors/batch.rs index 1592b076..90825fc1 100644 --- a/objectstore-server/src/extractors/batch.rs +++ b/objectstore-server/src/extractors/batch.rs @@ -1,6 +1,7 @@ use 
std::fmt::Debug; use anyhow::Context; +use async_stream::try_stream; use axum::{ extract::{FromRequest, Request}, http::StatusCode, @@ -97,27 +98,13 @@ where let manifest = serde_json::from_reader::<_, Manifest>(manifest.reader()) .context("failed to parse manifest")?; - let inserts = Box::pin(stream::unfold(parts, |mut parts| async move { - match parts.next_field().await { - Ok(Some(field)) => { - let metadata = match Metadata::from_headers(field.headers(), "") { - Ok(metadata) => metadata, - Err(err) => { - return Some((Err(err.into()), parts)); - } - }; - let bytes = match field.bytes().await { - Ok(bytes) => bytes, - Err(err) => { - return Some((Err(err.into()), parts)); - } - }; - Some((Ok((metadata, bytes)), parts)) - } - Ok(None) => None, - Err(err) => Some((Err(err).context("failed to parse multipart part"), parts)), + let inserts = Box::pin(async_stream::try_stream! { + while let Some(field) = parts.next_field().await? { + let metadata = Metadata::from_headers(field.headers(), "")?; + let bytes = field.bytes().await?; + yield (metadata, bytes); } - })); + }); Ok(Self { manifest, inserts }) } From eee86fb876feb921d8b57f73d626e2da19e25f12 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Wed, 17 Dec 2025 12:24:22 +0100 Subject: [PATCH 18/30] rename all lowercase --- objectstore-server/src/extractors/batch.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/objectstore-server/src/extractors/batch.rs b/objectstore-server/src/extractors/batch.rs index 90825fc1..1d05201b 100644 --- a/objectstore-server/src/extractors/batch.rs +++ b/objectstore-server/src/extractors/batch.rs @@ -18,7 +18,7 @@ use serde::Deserialize; use crate::error::AnyhowResponse; #[derive(Deserialize, Debug, PartialEq)] -#[serde(tag = "op")] +#[serde(tag = "op", rename_all = "lowercase")] pub enum Operation { Get { key: ObjectKey }, Insert { key: Option }, @@ -122,7 +122,7 @@ mod tests { #[tokio::test] async fn test_valid_request_works() { - let manifest = r#"{"operations":[{"op":"Insert"},{"op":"Get","key":"abc123"},{"op":"Insert","key":"xyz789"},{"op":"Delete","key":"def456"}]}"#; + let manifest = r#"{"operations":[{"op":"insert"},{"op":"get","key":"abc123"},{"op":"insert","key":"xyz789"},{"op":"delete","key":"def456"}]}"#; let insert1_data = b"first blob data"; let insert2_data = b"second blob data"; let expiration = ExpirationPolicy::TimeToLive(Duration::from_hours(1)); From a57951bd6bdd690c065efe6d65d2310c4be21530 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Thu, 18 Dec 2025 14:55:39 +0100 Subject: [PATCH 19/30] refactor to new structure --- objectstore-server/src/extractors/batch.rs | 194 +++++++++++++-------- objectstore-server/src/extractors/mod.rs | 2 +- 2 files changed, 123 insertions(+), 73 deletions(-) diff --git a/objectstore-server/src/extractors/batch.rs b/objectstore-server/src/extractors/batch.rs index 1d05201b..9a687124 100644 --- a/objectstore-server/src/extractors/batch.rs +++ b/objectstore-server/src/extractors/batch.rs @@ -1,48 +1,96 @@ -use std::fmt::Debug; +use std::{fmt::Debug, pin::Pin}; use anyhow::Context; -use async_stream::try_stream; use axum::{ extract::{FromRequest, Request}, http::StatusCode, response::IntoResponse, }; -use bytes::Buf; -use futures::stream; +use bytes::Bytes; +use futures::Stream; use http::header::CONTENT_TYPE; -use multer::Multipart; -use objectstore_service::{InsertStream, id::ObjectKey}; +use multer::Field; +use multer::{Constraints, Multipart, SizeLimit}; +use 
objectstore_service::id::ObjectKey;
 use objectstore_types::Metadata;
-use serde::Deserialize;
 use crate::error::AnyhowResponse;
 
-#[derive(Deserialize, Debug, PartialEq)]
-#[serde(tag = "op", rename_all = "lowercase")]
+#[derive(Debug)]
+pub struct GetOperation {
+    pub key: ObjectKey,
+}
+
+#[derive(Debug)]
+pub struct InsertOperation {
+    pub key: Option<ObjectKey>,
+    pub metadata: Metadata,
+    pub payload: Bytes,
+}
+
+#[derive(Debug)]
+pub struct DeleteOperation {
+    pub key: ObjectKey,
+}
+
+#[derive(Debug)]
 pub enum Operation {
-    Get { key: ObjectKey },
-    Insert { key: Option },
-    Delete { key: ObjectKey },
+    Get(GetOperation),
+    Insert(InsertOperation),
+    Delete(DeleteOperation),
 }
 
-#[derive(Deserialize, Debug, PartialEq)]
-pub struct Manifest {
-    pub operations: Vec,
+impl Operation {
+    async fn try_from_field(field: Field<'_>) -> anyhow::Result<Self> {
+        let kind = field
+            .headers()
+            .get(HEADER_BATCH_OPERATION_KIND)
+            .ok_or(anyhow::anyhow!(
+                "missing {HEADER_BATCH_OPERATION_KIND} header"
+            ))?;
+        let kind = kind
+            .to_str()
+            .context(format!("invalid {HEADER_BATCH_OPERATION_KIND} header"))?
+            .to_lowercase();
+
+        let key = match field.headers().get(HEADER_BATCH_OPERATION_KEY) {
+            Some(key) => Some(key.to_str().context("invalid object key")?.to_owned()),
+            None => None,
+        };
+
+        let operation = match kind.as_str() {
+            "get" => {
+                let key = key.context("missing object key for get operation")?;
+                Operation::Get(GetOperation { key })
+            }
+            "insert" => Operation::Insert(InsertOperation {
+                key,
+                metadata: Metadata::from_headers(field.headers(), "")?,
+                payload: field.bytes().await?,
+            }),
+            "delete" => {
+                let key = key.context("missing object key for delete operation")?;
+                Operation::Delete(DeleteOperation { key })
+            }
+            _ => anyhow::bail!("invalid {HEADER_BATCH_OPERATION_KIND} header"),
+        };
+        Ok(operation)
+    }
 }
 
 pub struct BatchRequest {
-    pub manifest: Manifest,
-    pub inserts: InsertStream,
+    pub operations: Pin> + Send>>,
 }
 
 impl Debug for BatchRequest {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("BatchRequest")
-            .field("manifest", &self.manifest)
-            .finish()
+        f.debug_struct("BatchRequest").finish()
     }
 }
 
+pub const HEADER_BATCH_OPERATION_KIND: &str = "x-sn-batch-operation-kind";
+pub const HEADER_BATCH_OPERATION_KEY: &str = "x-sn-batch-operation-key";
+
 impl FromRequest for BatchRequest
 where
     S: Send + Sync,
@@ -78,35 +126,28 @@ where
         let content_type = content_type.replace("multipart/mixed", "multipart/form-data");
         let boundary =
             multer::parse_boundary(content_type).context("failed to parse multipart boundary")?;
-        let mut parts = Multipart::new(request.into_body().into_data_stream(), boundary);
-
-        let manifest = parts
-            .next_field()
-            .await
-            .context("failed to parse multipart part")?
-            .ok_or(
-                (
-                    StatusCode::BAD_REQUEST,
-                    "expected at least one multipart part",
-                )
-                    .into_response(),
-            )?;
-        let manifest = manifest
-            .bytes()
-            .await
-            .context("failed to extract manifest")?;
-        let manifest = serde_json::from_reader::<_, Manifest>(manifest.reader())
-            .context("failed to parse manifest")?;
-
-        let inserts = Box::pin(async_stream::try_stream! {
+        let mut parts = Multipart::with_constraints(
+            request.into_body().into_data_stream(),
+            boundary,
+            Constraints::new().size_limit(
+                // TODO(lcian): tentative limits that should be tested
+                SizeLimit::new()
+                    .per_field(1024 * 1024) // 1 MB
+                    .whole_stream(1024 * 1024 * 1024), // 1 GB
+            ),
+        );
+        let operations = Box::pin(async_stream::try_stream!
{ + let mut count = 0; while let Some(field) = parts.next_field().await? { - let metadata = Metadata::from_headers(field.headers(), "")?; - let bytes = field.bytes().await?; - yield (metadata, bytes); + if count >= 1000 { + Err(anyhow::anyhow!("exceeded limit of 1000 operations per batch request"))?; + } + count += 1; + yield Operation::try_from_field(field).await?; } }); - Ok(Self { manifest, inserts }) + Ok(Self { operations }) } } @@ -122,24 +163,33 @@ mod tests { #[tokio::test] async fn test_valid_request_works() { - let manifest = r#"{"operations":[{"op":"insert"},{"op":"get","key":"abc123"},{"op":"insert","key":"xyz789"},{"op":"delete","key":"def456"}]}"#; let insert1_data = b"first blob data"; let insert2_data = b"second blob data"; let expiration = ExpirationPolicy::TimeToLive(Duration::from_hours(1)); let body = format!( "--boundary\r\n\ - Content-Type: application/json\r\n\ + {HEADER_BATCH_OPERATION_KEY}: test0\r\n\ + {HEADER_BATCH_OPERATION_KIND}: get\r\n\ + \r\n\ \r\n\ - {manifest}\r\n\ --boundary\r\n\ + {HEADER_BATCH_OPERATION_KEY}: test1\r\n\ + {HEADER_BATCH_OPERATION_KIND}: insert\r\n\ Content-Type: application/octet-stream\r\n\ \r\n\ {insert1}\r\n\ --boundary\r\n\ - Content-Type: text/plain\r\n\ + {HEADER_BATCH_OPERATION_KEY}: test2\r\n\ + {HEADER_BATCH_OPERATION_KIND}: insert\r\n\ {HEADER_EXPIRATION}: {expiration}\r\n\ + Content-Type: text/plain\r\n\ \r\n\ {insert2}\r\n\ + --boundary\r\n\ + {HEADER_BATCH_OPERATION_KEY}: test3\r\n\ + {HEADER_BATCH_OPERATION_KIND}: delete\r\n\ + \r\n\ + \r\n\ --boundary--\r\n", insert1 = String::from_utf8_lossy(insert1_data), insert2 = String::from_utf8_lossy(insert2_data), @@ -152,32 +202,32 @@ mod tests { let batch_request = BatchRequest::from_request(request, &()).await.unwrap(); - let expected_manifest = Manifest { - operations: vec![ - Operation::Insert { key: None }, - Operation::Get { - key: "abc123".to_string(), - }, - Operation::Insert { - key: Some("xyz789".to_string()), - }, - Operation::Delete { - key: "def456".to_string(), - }, - ], + let operations: Vec<_> = batch_request.operations.collect().await; + assert_eq!(operations.len(), 4); + + let Operation::Get(get_op) = &operations[0].as_ref().unwrap() else { + panic!("expected get operation"); }; - assert_eq!(batch_request.manifest, expected_manifest); + assert_eq!(get_op.key, "test0"); - let inserts: Vec<_> = batch_request.inserts.collect().await; - assert_eq!(inserts.len(), 2); + let Operation::Insert(insert_op1) = &operations[1].as_ref().unwrap() else { + panic!("expected insert operation"); + }; + assert_eq!(insert_op1.key.as_ref().unwrap(), "test1"); + assert_eq!(insert_op1.metadata.content_type, "application/octet-stream"); + assert_eq!(insert_op1.payload.as_ref(), insert1_data); - let (metadata1, bytes1) = inserts[0].as_ref().unwrap(); - assert_eq!(metadata1.content_type, "application/octet-stream"); - assert_eq!(bytes1.as_ref(), insert1_data); + let Operation::Insert(insert_op2) = &operations[2].as_ref().unwrap() else { + panic!("expected insert operation"); + }; + assert_eq!(insert_op2.key.as_ref().unwrap(), "test2"); + assert_eq!(insert_op2.metadata.content_type, "text/plain"); + assert_eq!(insert_op2.metadata.expiration_policy, expiration); + assert_eq!(insert_op2.payload.as_ref(), insert2_data); - let (metadata2, bytes2) = inserts[1].as_ref().unwrap(); - assert_eq!(metadata2.content_type, "text/plain"); - assert_eq!(metadata2.expiration_policy, expiration); - assert_eq!(bytes2.as_ref(), insert2_data); + let Operation::Delete(delete_op) = 
&operations[3].as_ref().unwrap() else { + panic!("expected delete operation"); + }; + assert_eq!(delete_op.key, "test3"); } } diff --git a/objectstore-server/src/extractors/mod.rs b/objectstore-server/src/extractors/mod.rs index 2bd79ed0..8cc4d34b 100644 --- a/objectstore-server/src/extractors/mod.rs +++ b/objectstore-server/src/extractors/mod.rs @@ -10,4 +10,4 @@ mod service; #[derive(Debug)] pub struct Xt(pub T); -pub use batch::{BatchRequest, Manifest, Operation}; +pub use batch::{BatchRequest, Operation, GetOperation, InsertOperation, DeleteOperation}; From 557301e32f4f6caf2fc53709b0225b72287c3b66 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Fri, 19 Dec 2025 13:57:31 +0100 Subject: [PATCH 20/30] improve --- objectstore-server/src/extractors/batch.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/objectstore-server/src/extractors/batch.rs b/objectstore-server/src/extractors/batch.rs index 9a687124..b7dafc74 100644 --- a/objectstore-server/src/extractors/batch.rs +++ b/objectstore-server/src/extractors/batch.rs @@ -7,7 +7,7 @@ use axum::{ response::IntoResponse, }; use bytes::Bytes; -use futures::Stream; +use futures::{StreamExt, stream::BoxStream}; use http::header::CONTENT_TYPE; use multer::Field; use multer::{Constraints, Multipart, SizeLimit}; @@ -79,7 +79,7 @@ impl Operation { } pub struct BatchRequest { - pub operations: Pin> + Send>>, + pub operations: BoxStream<'static, anyhow::Result>, } impl Debug for BatchRequest { @@ -136,7 +136,7 @@ where .whole_stream(1024 * 1024 * 1024), // 1 GB ), ); - let operations = Box::pin(async_stream::try_stream! { + let operations = async_stream::try_stream! { let mut count = 0; while let Some(field) = parts.next_field().await? { if count >= 1000 { @@ -145,7 +145,8 @@ where count += 1; yield Operation::try_from_field(field).await?; } - }); + } + .boxed(); Ok(Self { operations }) } From d187534628a8ec594607b87cfea1cdcc74679ba6 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Thu, 18 Dec 2025 12:06:36 +0100 Subject: [PATCH 21/30] wip --- Cargo.lock | 1 + objectstore-server/Cargo.toml | 1 + objectstore-server/src/auth/service.rs | 27 +++++-- objectstore-server/src/endpoints/batch.rs | 95 +++++++++++++++++++++-- objectstore-server/src/extractors/mod.rs | 2 +- objectstore-service/src/lib.rs | 65 ++++++++++++++-- 6 files changed, 173 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1fa65102..0293e481 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2251,6 +2251,7 @@ dependencies = [ "http 1.3.1", "humantime", "humantime-serde", + "itertools 0.14.0", "jsonwebtoken", "merni", "mimalloc", diff --git a/objectstore-server/Cargo.toml b/objectstore-server/Cargo.toml index a6124f38..376e9fd3 100644 --- a/objectstore-server/Cargo.toml +++ b/objectstore-server/Cargo.toml @@ -23,6 +23,7 @@ figment = { version = "0.10.19", features = ["env", "test", "yaml"] } futures = { workspace = true } futures-util = { workspace = true } http = { workspace = true } +itertools = "0.14.0" humantime = { workspace = true } humantime-serde = { workspace = true } jsonwebtoken = { version = "10.2.0", features = ["rust_crypto"] } diff --git a/objectstore-server/src/auth/service.rs b/objectstore-server/src/auth/service.rs index b8769cff..d594a079 100644 --- a/objectstore-server/src/auth/service.rs +++ b/objectstore-server/src/auth/service.rs @@ -1,7 +1,7 @@ -use objectstore_service::id::{ObjectContext, ObjectId}; +use objectstore_service::id::{ObjectContext, 
ObjectId, ObjectKey}; use objectstore_service::{ - BatchInsertResult, DeleteResult, GetResult, InsertResult, InsertStream, PayloadStream, - StorageService, + BatchDeleteResult, BatchGetResult, BatchInsertResult, DeleteResult, GetResult, InsertResult, + InsertStream, PayloadStream, StorageService, }; use objectstore_types::{Metadata, Permission}; @@ -81,10 +81,27 @@ impl AuthAwareService { /// Auth-aware wrapper around [`StorageService::insert_objects`]. pub async fn insert_objects( &self, - context: ObjectContext, + context: &ObjectContext, + keys: &[Option], inserts: InsertStream, ) -> BatchInsertResult { self.assert_authorized(Permission::ObjectWrite, &context)?; - self.service.insert_objects(context, inserts).await + self.service.insert_objects(context, keys, inserts).await + } + + /// Auth-aware wrapper around [`StorageService::get_objects`]. + pub async fn get_objects(&self, context: &ObjectContext, keys: &[ObjectKey]) -> BatchGetResult { + self.assert_authorized(Permission::ObjectRead, context)?; + self.service.get_objects(context, keys).await + } + + /// Auth-aware wrapper around [`StorageService::delete_objects`]. + pub async fn delete_objects( + &self, + context: &ObjectContext, + keys: &[ObjectKey], + ) -> BatchDeleteResult { + self.assert_authorized(Permission::ObjectDelete, context)?; + self.service.delete_objects(context, keys).await } } diff --git a/objectstore-server/src/endpoints/batch.rs b/objectstore-server/src/endpoints/batch.rs index 3a9133a1..f204838c 100644 --- a/objectstore-server/src/endpoints/batch.rs +++ b/objectstore-server/src/endpoints/batch.rs @@ -1,11 +1,19 @@ +use std::fmt::Debug; + use axum::Router; -use axum::http::StatusCode; +use axum::body::Body; use axum::response::{IntoResponse, Response}; use axum::routing; -use objectstore_service::id::ObjectContext; +use futures::StreamExt; +use http::header::CONTENT_TYPE; +use http::{HeaderMap, HeaderValue}; +use objectstore_service::id::{ObjectContext, ObjectKey}; +use objectstore_types::Metadata; +use serde::Serialize; use crate::auth::AuthAwareService; use crate::error::ApiResult; +use crate::extractors::Operation; use crate::extractors::{BatchRequest, Xt}; use crate::state::ServiceState; @@ -13,10 +21,85 @@ pub fn router() -> Router { Router::new().route("/objects:batch/{usecase}/{scopes}/", routing::post(batch)) } +#[derive(Serialize, Debug, PartialEq, Clone)] +pub enum OperationResult { + Get { + status: u8, + metadata: Option, + }, + Insert { + status: u8, + }, + Delete { + status: u8, + }, +} + +#[derive(Serialize, Debug, PartialEq)] +#[serde(tag = "results")] +pub struct ResponseManifest { + pub results: Vec, +} + async fn batch( - _service: AuthAwareService, - Xt(_context): Xt, - _request: BatchRequest, + service: AuthAwareService, + Xt(context): Xt, + request: BatchRequest, ) -> ApiResult { - Ok(StatusCode::NOT_IMPLEMENTED.into_response()) + let (gets, inserts, deletes): (Vec<_>, Vec<_>, Vec<_>) = + request.manifest.operations.into_iter().enumerate().fold( + (vec![], vec![], vec![]), + |mut acc, (i, op)| { + match op { + Operation::Get { key } => acc.0.push((i, key)), + Operation::Insert { key } => acc.1.push((i, key)), + Operation::Delete { key } => acc.2.push((i, key)), + } + acc + }, + ); + + let get_keys: Vec = gets.iter().map(|(_, key)| key.clone()).collect(); + let get_results = service.get_objects(&context, &get_keys).await?; + + let insert_keys: Vec> = inserts.iter().map(|(_, key)| key.clone()).collect(); + let insert_results = service + .insert_objects(&context, &insert_keys, request.inserts) + 
.await?; + + let delete_keys: Vec = deletes.iter().map(|(_, key)| key.clone()).collect(); + let delete_results = service.delete_objects(&context, &delete_keys).await?; + + let mut results = vec![None; request.manifest.operations.len()]; + let mut streams = vec![]; + for ((i, _), res) in gets.into_iter().zip(get_results) { + results[i] = None; + } + for ((i, _), res) in inserts.into_iter().zip(insert_results) { + results[i] = None; + } + for ((i, _), res) in deletes.into_iter().zip(delete_results) { + results[i] = None; + } + let results = results.into_iter().map(|r| r.unwrap()).collect(); + let manifest = ResponseManifest { results }; + + let r = rand::random::(); + let boundary = format!("os-boundary-{r:032x}"); + + let headers = HeaderMap::new(); + headers.insert( + CONTENT_TYPE, + HeaderValue::from_str(&format!("multipart/mixed; boundary={boundary}")).unwrap(), + ); + + let body_stream = async_stream::stream! { + yield Ok(serde_json::to_vec(&manifest)?); + for result in get_results { + //yield result.unwrap(); + todo!(); + } + }; + + Ok((headers, Body::from_stream(body_stream)).into_response()) } diff --git a/objectstore-server/src/extractors/mod.rs b/objectstore-server/src/extractors/mod.rs index 8cc4d34b..839463f0 100644 --- a/objectstore-server/src/extractors/mod.rs +++ b/objectstore-server/src/extractors/mod.rs @@ -10,4 +10,4 @@ mod service; #[derive(Debug)] pub struct Xt(pub T); -pub use batch::{BatchRequest, Operation, GetOperation, InsertOperation, DeleteOperation}; +pub use batch::{BatchRequest, DeleteOperation, GetOperation, InsertOperation, Operation}; diff --git a/objectstore-service/src/lib.rs b/objectstore-service/src/lib.rs index 838be53b..91ace183 100644 --- a/objectstore-service/src/lib.rs +++ b/objectstore-service/src/lib.rs @@ -21,7 +21,7 @@ use futures_util::{StreamExt, TryStreamExt, stream::BoxStream}; use objectstore_types::Metadata; use crate::backend::common::BoxedBackend; -use crate::id::{ObjectContext, ObjectId}; +use crate::id::{ObjectContext, ObjectId, ObjectKey}; /// The threshold up until which we will go to the "high volume" backend. const BACKEND_SIZE_THRESHOLD: usize = 1024 * 1024; // 1 MiB @@ -96,10 +96,15 @@ pub type InsertResult = anyhow::Result; pub type DeleteResult = anyhow::Result<()>; /// Type alias to represent a stream of insert operations. -pub type InsertStream = +pub type PayloadMetadataStream = Pin> + Send>>; /// Result type for batch insert operations. pub type BatchInsertResult = anyhow::Result>; +/// Result type for batch get operations. +/// TODO: change this +pub type BatchGetResult = anyhow::Result>; +/// Result type for batch delete operations. +pub type BatchDeleteResult = anyhow::Result>; impl StorageService { /// Creates a new `StorageService` with the specified configuration. @@ -290,13 +295,61 @@ impl StorageService { Ok(()) } - /// TODO + /// Batch inserts multiple objects. 
pub async fn insert_objects( &self, - _context: ObjectContext, - _inserts: InsertStream, + context: &ObjectContext, + keys: &[Option], + mut inserts: PayloadMetadataStream, ) -> BatchInsertResult { - todo!(); + let mut results = Vec::new(); + let mut key_idx = 0; + + while let Some(item) = inserts.next().await { + let result = match item { + Ok((metadata, bytes)) => { + let key = keys.get(key_idx).and_then(|k| k.clone()); + let stream = futures_util::stream::once(async { Ok(bytes) }).boxed(); + self.insert_object(context.clone(), key, &metadata, stream) + .await + } + Err(e) => Err(e), + }; + results.push(result); + key_idx += 1; + } + + Ok(results) + } + + /// Batch retrieve multiple objects by their keys. + pub async fn get_objects(&self, context: &ObjectContext, keys: &[ObjectKey]) -> BatchGetResult { + let mut results = Vec::new(); + + for key in keys { + let id = ObjectId::new(context.clone(), key.clone()); + let result = self.get_object(&id).await; + results.push(result); + } + + Ok(results) + } + + /// Batch deletes multiple objects by their keys. + pub async fn delete_objects( + &self, + context: &ObjectContext, + keys: &[ObjectKey], + ) -> BatchDeleteResult { + let mut results = Vec::new(); + + for key in keys { + let id = ObjectId::new(context.clone(), key.clone()); + let result = self.delete_object(&id).await; + results.push(result); + } + + Ok(results) } } From 4f36e61c395f0407565b2dc2c3745ae28fdf12d7 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Thu, 18 Dec 2025 15:25:46 +0100 Subject: [PATCH 22/30] wip --- objectstore-server/src/auth/service.rs | 27 -------- objectstore-server/src/endpoints/batch.rs | 77 ++++++++--------------- objectstore-service/src/lib.rs | 57 ----------------- 3 files changed, 26 insertions(+), 135 deletions(-) diff --git a/objectstore-server/src/auth/service.rs b/objectstore-server/src/auth/service.rs index d594a079..d9820940 100644 --- a/objectstore-server/src/auth/service.rs +++ b/objectstore-server/src/auth/service.rs @@ -77,31 +77,4 @@ impl AuthAwareService { self.assert_authorized(Permission::ObjectDelete, id.context())?; self.service.delete_object(id).await } - - /// Auth-aware wrapper around [`StorageService::insert_objects`]. - pub async fn insert_objects( - &self, - context: &ObjectContext, - keys: &[Option], - inserts: InsertStream, - ) -> BatchInsertResult { - self.assert_authorized(Permission::ObjectWrite, &context)?; - self.service.insert_objects(context, keys, inserts).await - } - - /// Auth-aware wrapper around [`StorageService::get_objects`]. - pub async fn get_objects(&self, context: &ObjectContext, keys: &[ObjectKey]) -> BatchGetResult { - self.assert_authorized(Permission::ObjectRead, context)?; - self.service.get_objects(context, keys).await - } - - /// Auth-aware wrapper around [`StorageService::delete_objects`]. 
- pub async fn delete_objects( - &self, - context: &ObjectContext, - keys: &[ObjectKey], - ) -> BatchDeleteResult { - self.assert_authorized(Permission::ObjectDelete, context)?; - self.service.delete_objects(context, keys).await - } } diff --git a/objectstore-server/src/endpoints/batch.rs b/objectstore-server/src/endpoints/batch.rs index f204838c..df86b5b8 100644 --- a/objectstore-server/src/endpoints/batch.rs +++ b/objectstore-server/src/endpoints/batch.rs @@ -7,7 +7,7 @@ use axum::routing; use futures::StreamExt; use http::header::CONTENT_TYPE; use http::{HeaderMap, HeaderValue}; -use objectstore_service::id::{ObjectContext, ObjectKey}; +use objectstore_service::id::{ObjectContext, ObjectId, ObjectKey}; use objectstore_types::Metadata; use serde::Serialize; @@ -35,55 +35,11 @@ pub enum OperationResult { }, } -#[derive(Serialize, Debug, PartialEq)] -#[serde(tag = "results")] -pub struct ResponseManifest { - pub results: Vec, -} - async fn batch( service: AuthAwareService, Xt(context): Xt, request: BatchRequest, ) -> ApiResult { - let (gets, inserts, deletes): (Vec<_>, Vec<_>, Vec<_>) = - request.manifest.operations.into_iter().enumerate().fold( - (vec![], vec![], vec![]), - |mut acc, (i, op)| { - match op { - Operation::Get { key } => acc.0.push((i, key)), - Operation::Insert { key } => acc.1.push((i, key)), - Operation::Delete { key } => acc.2.push((i, key)), - } - acc - }, - ); - - let get_keys: Vec = gets.iter().map(|(_, key)| key.clone()).collect(); - let get_results = service.get_objects(&context, &get_keys).await?; - - let insert_keys: Vec> = inserts.iter().map(|(_, key)| key.clone()).collect(); - let insert_results = service - .insert_objects(&context, &insert_keys, request.inserts) - .await?; - - let delete_keys: Vec = deletes.iter().map(|(_, key)| key.clone()).collect(); - let delete_results = service.delete_objects(&context, &delete_keys).await?; - - let mut results = vec![None; request.manifest.operations.len()]; - let mut streams = vec![]; - for ((i, _), res) in gets.into_iter().zip(get_results) { - results[i] = None; - } - for ((i, _), res) in inserts.into_iter().zip(insert_results) { - results[i] = None; - } - for ((i, _), res) in deletes.into_iter().zip(delete_results) { - results[i] = None; - } - let results = results.into_iter().map(|r| r.unwrap()).collect(); - let manifest = ResponseManifest { results }; - let r = rand::random::(); let boundary = format!("os-boundary-{r:032x}"); @@ -93,13 +49,32 @@ async fn batch( HeaderValue::from_str(&format!("multipart/mixed; boundary={boundary}")).unwrap(), ); - let body_stream = async_stream::stream! { - yield Ok(serde_json::to_vec(&manifest)?); - for result in get_results { - //yield result.unwrap(); - todo!(); + while let Some(operation) = request.operations.next().await { + match operation { + Ok(operation) => match operation { + Operation::Get(get) => { + let res = service + .get_object(&ObjectId::new(context.clone(), get.key)) + .await; + res.into_part() + } + Operation::Insert(insert) => { + let res = service + .insert_object(context.clone(), insert.key, &insert.metadata, insert.stream) + .await; + res.into_part() + } + Operation::Delete(delete) => { + let res = service + .delete_object(&ObjectId::new(context.clone(), delete.key)) + .await; + res.into_part() + } + }, + Err(err) => todo!(), } - }; + } + let body_stream = async_stream::stream! 
{}; Ok((headers, Body::from_stream(body_stream)).into_response()) } diff --git a/objectstore-service/src/lib.rs b/objectstore-service/src/lib.rs index 91ace183..df9c061c 100644 --- a/objectstore-service/src/lib.rs +++ b/objectstore-service/src/lib.rs @@ -294,63 +294,6 @@ impl StorageService { Ok(()) } - - /// Batch inserts multiple objects. - pub async fn insert_objects( - &self, - context: &ObjectContext, - keys: &[Option], - mut inserts: PayloadMetadataStream, - ) -> BatchInsertResult { - let mut results = Vec::new(); - let mut key_idx = 0; - - while let Some(item) = inserts.next().await { - let result = match item { - Ok((metadata, bytes)) => { - let key = keys.get(key_idx).and_then(|k| k.clone()); - let stream = futures_util::stream::once(async { Ok(bytes) }).boxed(); - self.insert_object(context.clone(), key, &metadata, stream) - .await - } - Err(e) => Err(e), - }; - results.push(result); - key_idx += 1; - } - - Ok(results) - } - - /// Batch retrieve multiple objects by their keys. - pub async fn get_objects(&self, context: &ObjectContext, keys: &[ObjectKey]) -> BatchGetResult { - let mut results = Vec::new(); - - for key in keys { - let id = ObjectId::new(context.clone(), key.clone()); - let result = self.get_object(&id).await; - results.push(result); - } - - Ok(results) - } - - /// Batch deletes multiple objects by their keys. - pub async fn delete_objects( - &self, - context: &ObjectContext, - keys: &[ObjectKey], - ) -> BatchDeleteResult { - let mut results = Vec::new(); - - for key in keys { - let id = ObjectId::new(context.clone(), key.clone()); - let result = self.delete_object(&id).await; - results.push(result); - } - - Ok(results) - } } fn is_tombstoned(result: &Option<(Metadata, PayloadStream)>) -> bool { From 17958d2e288daa660ed65590613234961fff9be2 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Fri, 19 Dec 2025 10:43:12 +0100 Subject: [PATCH 23/30] wip --- Cargo.lock | 1 + objectstore-server/Cargo.toml | 1 + objectstore-server/src/auth/service.rs | 7 +- objectstore-server/src/endpoints/batch.rs | 100 ++++++++++++--------- objectstore-server/src/extractors/batch.rs | 2 +- objectstore-service/src/lib.rs | 11 --- 6 files changed, 62 insertions(+), 60 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0293e481..8b2f37f4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2240,6 +2240,7 @@ dependencies = [ "anyhow", "argh", "async-stream", + "async-trait", "axum", "axum-extra", "bytes", diff --git a/objectstore-server/Cargo.toml b/objectstore-server/Cargo.toml index 376e9fd3..76f1a912 100644 --- a/objectstore-server/Cargo.toml +++ b/objectstore-server/Cargo.toml @@ -14,6 +14,7 @@ publish = false anyhow = { workspace = true } argh = "0.1.13" async-stream = "0.3.6" +async-trait = { workspace = true } axum = { version = "0.8.4", features = ["multipart"] } axum-extra = { version = "0.12.2", features = ["multipart"] } bytes = { workspace = true } diff --git a/objectstore-server/src/auth/service.rs b/objectstore-server/src/auth/service.rs index d9820940..e6d354e5 100644 --- a/objectstore-server/src/auth/service.rs +++ b/objectstore-server/src/auth/service.rs @@ -1,8 +1,5 @@ -use objectstore_service::id::{ObjectContext, ObjectId, ObjectKey}; -use objectstore_service::{ - BatchDeleteResult, BatchGetResult, BatchInsertResult, DeleteResult, GetResult, InsertResult, - InsertStream, PayloadStream, StorageService, -}; +use objectstore_service::id::{ObjectContext, ObjectId}; +use objectstore_service::{DeleteResult, GetResult, InsertResult, PayloadStream, 
StorageService}; use objectstore_types::{Metadata, Permission}; use crate::auth::AuthContext; diff --git a/objectstore-server/src/endpoints/batch.rs b/objectstore-server/src/endpoints/batch.rs index df86b5b8..afc45adf 100644 --- a/objectstore-server/src/endpoints/batch.rs +++ b/objectstore-server/src/endpoints/batch.rs @@ -1,18 +1,22 @@ use std::fmt::Debug; +use std::pin::Pin; +use async_trait::async_trait; use axum::Router; use axum::body::Body; use axum::response::{IntoResponse, Response}; use axum::routing; -use futures::StreamExt; +use bytes::Bytes; +use futures::{Stream, StreamExt}; use http::header::CONTENT_TYPE; use http::{HeaderMap, HeaderValue}; use objectstore_service::id::{ObjectContext, ObjectId, ObjectKey}; +use objectstore_service::{DeleteResult, GetResult, InsertResult}; use objectstore_types::Metadata; use serde::Serialize; use crate::auth::AuthAwareService; -use crate::error::ApiResult; +use crate::error::{AnyhowResponse, ApiResult}; use crate::extractors::Operation; use crate::extractors::{BatchRequest, Xt}; use crate::state::ServiceState; @@ -21,60 +25,70 @@ pub fn router() -> Router { Router::new().route("/objects:batch/{usecase}/{scopes}/", routing::post(batch)) } -#[derive(Serialize, Debug, PartialEq, Clone)] -pub enum OperationResult { - Get { - status: u8, - metadata: Option, - }, - Insert { - status: u8, - }, - Delete { - status: u8, - }, +pub trait IntoPart { + fn into_part(&self) -> Bytes; +} + +impl IntoPart for GetResult { + fn into_part(&self) -> Bytes { + todo!() + } +} + +impl IntoPart for InsertResult { + fn into_part(&self) -> Bytes { + todo!() + } +} + +impl IntoPart for DeleteResult { + fn into_part(&self) -> Bytes { + todo!() + } } async fn batch( service: AuthAwareService, Xt(context): Xt, - request: BatchRequest, + mut request: BatchRequest, ) -> ApiResult { let r = rand::random::(); let boundary = format!("os-boundary-{r:032x}"); - - let headers = HeaderMap::new(); + let mut headers = HeaderMap::new(); headers.insert( CONTENT_TYPE, HeaderValue::from_str(&format!("multipart/mixed; boundary={boundary}")).unwrap(), ); - while let Some(operation) = request.operations.next().await { - match operation { - Ok(operation) => match operation { - Operation::Get(get) => { - let res = service - .get_object(&ObjectId::new(context.clone(), get.key)) - .await; - res.into_part() - } - Operation::Insert(insert) => { - let res = service - .insert_object(context.clone(), insert.key, &insert.metadata, insert.stream) - .await; - res.into_part() - } - Operation::Delete(delete) => { - let res = service - .delete_object(&ObjectId::new(context.clone(), delete.key)) - .await; - res.into_part() - } - }, - Err(err) => todo!(), - } - } - let body_stream = async_stream::stream! {}; + let body_stream: Pin> + Send>> = async_stream::try_stream! 
{ + while let Some(operation) = request.operations.next().await { + let res = match operation { + Ok(operation) => match operation { + Operation::Get(get) => { + let res = service + .get_object(&ObjectId::new(context.clone(), get.key)) + .await; + res.into_part() + } + Operation::Insert(insert) => { + let stream = futures_util::stream::once(async { Ok(insert.payload) }).boxed(); + let res = service + .insert_object(context.clone(), insert.key, &insert.metadata, stream) + .await; + res.into_part() + } + Operation::Delete(delete) => { + let res = service + .delete_object(&ObjectId::new(context.clone(), delete.key)) + .await; + res.into_part() + } + }, + Err(_) => todo!(), + }; + yield res; + } + }.boxed(); Ok((headers, Body::from_stream(body_stream)).into_response()) } diff --git a/objectstore-server/src/extractors/batch.rs b/objectstore-server/src/extractors/batch.rs index b7dafc74..5877b70c 100644 --- a/objectstore-server/src/extractors/batch.rs +++ b/objectstore-server/src/extractors/batch.rs @@ -1,4 +1,4 @@ -use std::{fmt::Debug, pin::Pin}; +use std::fmt::Debug; use anyhow::Context; use axum::{ diff --git a/objectstore-service/src/lib.rs b/objectstore-service/src/lib.rs index df9c061c..fb555c15 100644 --- a/objectstore-service/src/lib.rs +++ b/objectstore-service/src/lib.rs @@ -95,17 +95,6 @@ pub type InsertResult = anyhow::Result; /// Result type for delete operations. pub type DeleteResult = anyhow::Result<()>; -/// Type alias to represent a stream of insert operations. -pub type PayloadMetadataStream = - Pin> + Send>>; -/// Result type for batch insert operations. -pub type BatchInsertResult = anyhow::Result>; -/// Result type for batch get operations. -/// TODO: change this -pub type BatchGetResult = anyhow::Result>; -/// Result type for batch delete operations. -pub type BatchDeleteResult = anyhow::Result>; - impl StorageService { /// Creates a new `StorageService` with the specified configuration. 
pub async fn new( From a1b398f43d0aba053982fdfb593ee108bea4196d Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Fri, 19 Dec 2025 13:22:10 +0100 Subject: [PATCH 24/30] wip --- Cargo.lock | 1 + objectstore-server/Cargo.toml | 1 + objectstore-server/src/endpoints/batch.rs | 115 ++++++++++++++++++---- objectstore-server/src/lib.rs | 1 + objectstore-server/src/multipart.rs | 109 ++++++++++++++++++++ 5 files changed, 207 insertions(+), 20 deletions(-) create mode 100644 objectstore-server/src/multipart.rs diff --git a/Cargo.lock b/Cargo.lock index 8b2f37f4..df0b7c5e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2262,6 +2262,7 @@ dependencies = [ "num_cpus", "objectstore-service", "objectstore-types", + "pin-project", "rand 0.9.2", "reqwest", "rustls", diff --git a/objectstore-server/Cargo.toml b/objectstore-server/Cargo.toml index 76f1a912..af5a63ff 100644 --- a/objectstore-server/Cargo.toml +++ b/objectstore-server/Cargo.toml @@ -35,6 +35,7 @@ multer = "3.1.0" num_cpus = "1.17.0" objectstore-service = { workspace = true } objectstore-types = { workspace = true } +pin-project = "1.1.10" rand = { workspace = true } reqwest = { workspace = true } rustls = { version = "0.23.31", default-features = false } diff --git a/objectstore-server/src/endpoints/batch.rs b/objectstore-server/src/endpoints/batch.rs index afc45adf..1a240a5e 100644 --- a/objectstore-server/src/endpoints/batch.rs +++ b/objectstore-server/src/endpoints/batch.rs @@ -1,4 +1,3 @@ -use std::fmt::Debug; use std::pin::Pin; use async_trait::async_trait; @@ -6,44 +5,120 @@ use axum::Router; use axum::body::Body; use axum::response::{IntoResponse, Response}; use axum::routing; -use bytes::Bytes; -use futures::{Stream, StreamExt}; +use bytes::{BytesMut}; +use futures::{Stream, StreamExt, TryStreamExt}; use http::header::CONTENT_TYPE; use http::{HeaderMap, HeaderValue}; -use objectstore_service::id::{ObjectContext, ObjectId, ObjectKey}; +use objectstore_service::id::{ObjectContext, ObjectId}; use objectstore_service::{DeleteResult, GetResult, InsertResult}; -use objectstore_types::Metadata; -use serde::Serialize; use crate::auth::AuthAwareService; -use crate::error::{AnyhowResponse, ApiResult}; +use crate::error::{ApiResult}; use crate::extractors::Operation; use crate::extractors::{BatchRequest, Xt}; +use crate::multipart::{IntoBytesStream, Part}; use crate::state::ServiceState; pub fn router() -> Router { Router::new().route("/objects:batch/{usecase}/{scopes}/", routing::post(batch)) } +const HEADER_BATCH_OPERATION_STATUS: &str = "x-sn-batch-operation-status"; +const HEADER_BATCH_OPERATION_KEY: &str = "x-sn-batch-operation-key"; + +#[async_trait] pub trait IntoPart { - fn into_part(&self) -> Bytes; + async fn into_part(mut self) -> Part; } +#[async_trait] impl IntoPart for GetResult { - fn into_part(&self) -> Bytes { - todo!() + async fn into_part(mut self) -> Part { + match self { + Ok(Some((metadata, payload))) => { + let payload = payload + .try_fold(BytesMut::new(), |mut acc, chunk| async move { + acc.extend_from_slice(&chunk); + Ok(acc) + }) + .await + .unwrap() + .freeze(); + + let mut headers = metadata.to_headers("", false).unwrap(); + headers.insert(HEADER_BATCH_OPERATION_STATUS, HeaderValue::from_static("200")); + + Part::new(headers, payload) + }, + Ok(None) => { + let mut headers = HeaderMap::new(); + headers.insert( + HEADER_BATCH_OPERATION_STATUS, + HeaderValue::from_static("404"), + ); + Part::headers_only(headers) + }, + Err(_) => { + let mut headers = HeaderMap::new(); + headers.insert( + 
HEADER_BATCH_OPERATION_STATUS, + HeaderValue::from_static("500"), + ); + Part::headers_only(headers) + }, + } } } +#[async_trait] impl IntoPart for InsertResult { - fn into_part(&self) -> Bytes { - todo!() + async fn into_part(mut self) -> Part { + match self { + Ok(id) => { + let mut headers = HeaderMap::new(); + headers.insert( + HEADER_BATCH_OPERATION_KEY, + HeaderValue::from_str(id.key()).unwrap(), + ); + headers.insert( + HEADER_BATCH_OPERATION_STATUS, + HeaderValue::from_static("200"), + ); + Part::headers_only(headers) + }, + Err(_) => { + let mut headers = HeaderMap::new(); + headers.insert( + HEADER_BATCH_OPERATION_STATUS, + HeaderValue::from_static("500"), + ); + Part::headers_only(headers) + }, + } } } +#[async_trait] impl IntoPart for DeleteResult { - fn into_part(&self) -> Bytes { - todo!() + async fn into_part(mut self) -> Part { + match self { + Ok(()) => { + let mut headers = HeaderMap::new(); + headers.insert( + HEADER_BATCH_OPERATION_STATUS, + HeaderValue::from_static("200"), + ); + Part::headers_only(headers) + }, + Err(_) => { + let mut headers = HeaderMap::new(); + headers.insert( + HEADER_BATCH_OPERATION_STATUS, + HeaderValue::from_static("500"), + ); + Part::headers_only(headers) + }, + } } } @@ -60,7 +135,7 @@ async fn batch( HeaderValue::from_str(&format!("multipart/mixed; boundary={boundary}")).unwrap(), ); - let body_stream: Pin> + Send>> = async_stream::try_stream! { + let parts: Pin> + Send>> = async_stream::try_stream! { while let Some(operation) = request.operations.next().await { let res = match operation { Ok(operation) => match operation { @@ -68,27 +143,27 @@ async fn batch( let res = service .get_object(&ObjectId::new(context.clone(), get.key)) .await; - res.into_part() + res.into_part().await } Operation::Insert(insert) => { let stream = futures_util::stream::once(async { Ok(insert.payload) }).boxed(); let res = service .insert_object(context.clone(), insert.key, &insert.metadata, stream) .await; - res.into_part() + res.into_part().await } Operation::Delete(delete) => { let res = service .delete_object(&ObjectId::new(context.clone(), delete.key)) .await; - res.into_part() + res.into_part().await } }, - Err(_) => todo!(), + Err(_) => todo!() }; yield res; } }.boxed(); - Ok((headers, Body::from_stream(body_stream)).into_response()) + Ok((headers, Body::from_stream(parts.into_bytes_stream(boundary))).into_response()) } diff --git a/objectstore-server/src/lib.rs b/objectstore-server/src/lib.rs index 8d5c01c0..ea118934 100644 --- a/objectstore-server/src/lib.rs +++ b/objectstore-server/src/lib.rs @@ -13,5 +13,6 @@ pub mod extractors; pub mod healthcheck; pub mod http; pub mod killswitches; +pub mod multipart; pub mod observability; pub mod state; diff --git a/objectstore-server/src/multipart.rs b/objectstore-server/src/multipart.rs new file mode 100644 index 00000000..d3573895 --- /dev/null +++ b/objectstore-server/src/multipart.rs @@ -0,0 +1,109 @@ +use std::pin::Pin; +use std::task::{Context, Poll}; + +use bytes::{BufMut, Bytes, BytesMut}; +use futures::Stream; +use http::HeaderMap; +use pin_project::pin_project; + +/// A Multipart part. +#[derive(Debug)] +pub struct Part { + headers: HeaderMap, + body: Bytes, +} + +impl Part { + /// Creates a new Multipart part with headers and body. + pub fn new(headers: HeaderMap, body: Bytes) -> Self { + Part { headers, body } + } + + /// Creates a new Multipart part with headers only. 
+ pub fn headers_only(headers: HeaderMap) -> Self { + Part { + headers, + body: Bytes::new(), + } + } +} + +pub trait IntoBytesStream { + fn into_bytes_stream(self, boundary: String) -> impl Stream>; +} + +impl IntoBytesStream for S +where + S: Stream> + Send, +{ + fn into_bytes_stream(self, boundary: String) -> impl Stream> { + let mut b = BytesMut::with_capacity(boundary.len() + 4); + b.put(&b"--"[..]); + b.put(boundary.as_bytes()); + b.put(&b"\r\n"[..]); + PartsSerializer { + parts: self, + boundary: b.freeze(), + state: State::Waiting, + } + } +} + +#[pin_project] +struct PartsSerializer +where + S: Stream>, +{ + #[pin] + parts: S, + boundary: Bytes, + state: State, +} + +enum State { + Waiting, + SendHeaders(Part), + SendBody(Bytes), +} + +impl Stream for PartsSerializer +where + S: Stream>, +{ + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + match std::mem::replace(this.state, State::Waiting) { + State::Waiting => match this.parts.as_mut().poll_next(ctx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(None) => return Poll::Ready(None), + Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))), + Poll::Ready(Some(Ok(p))) => { + *this.state = State::SendHeaders(p); + return Poll::Ready(Some(Ok(this.boundary.clone()))); + } + }, + State::SendHeaders(part) => { + *this.state = State::SendBody(part.body); + let headers = serialize_headers(part.headers); + return Poll::Ready(Some(Ok(headers))); + } + State::SendBody(body) => { + return Poll::Ready(Some(Ok(body))); + } + } + } +} + +fn serialize_headers(headers: HeaderMap) -> Bytes { + let mut b = BytesMut::with_capacity(30 + 30 * headers.len()); + for (name, value) in &headers { + b.put(name.as_str().as_bytes()); + b.put(&b": "[..]); + b.put(value.as_bytes()); + b.put(&b"\r\n"[..]); + } + b.put(&b"\r\n"[..]); + b.freeze() +} From 0709fe3893437e47e30c7cc93b198c5ce6b51d2d Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Fri, 19 Dec 2025 13:39:52 +0100 Subject: [PATCH 25/30] wip --- objectstore-server/src/endpoints/batch.rs | 283 ++++++++++++++++++---- objectstore-server/src/multipart.rs | 31 ++- 2 files changed, 261 insertions(+), 53 deletions(-) diff --git a/objectstore-server/src/endpoints/batch.rs b/objectstore-server/src/endpoints/batch.rs index 1a240a5e..3d6d8bbd 100644 --- a/objectstore-server/src/endpoints/batch.rs +++ b/objectstore-server/src/endpoints/batch.rs @@ -5,7 +5,7 @@ use axum::Router; use axum::body::Body; use axum::response::{IntoResponse, Response}; use axum::routing; -use bytes::{BytesMut}; +use bytes::BytesMut; use futures::{Stream, StreamExt, TryStreamExt}; use http::header::CONTENT_TYPE; use http::{HeaderMap, HeaderValue}; @@ -13,7 +13,7 @@ use objectstore_service::id::{ObjectContext, ObjectId}; use objectstore_service::{DeleteResult, GetResult, InsertResult}; use crate::auth::AuthAwareService; -use crate::error::{ApiResult}; +use crate::error::ApiResult; use crate::extractors::Operation; use crate::extractors::{BatchRequest, Xt}; use crate::multipart::{IntoBytesStream, Part}; @@ -23,6 +23,56 @@ pub fn router() -> Router { Router::new().route("/objects:batch/{usecase}/{scopes}/", routing::post(batch)) } +async fn batch( + service: AuthAwareService, + Xt(context): Xt, + mut request: BatchRequest, +) -> ApiResult { + let r = rand::random::(); + let boundary = format!("os-boundary-{r:032x}"); + let mut headers = HeaderMap::new(); + headers.insert( + CONTENT_TYPE, + 
HeaderValue::from_str(&format!("multipart/mixed; boundary={boundary}")).unwrap(), + ); + + let parts: Pin> + Send>> = async_stream::try_stream! { + while let Some(operation) = request.operations.next().await { + let res = match operation { + Ok(operation) => match operation { + Operation::Get(get) => { + let res = service + .get_object(&ObjectId::new(context.clone(), get.key)) + .await; + res.into_part().await + } + Operation::Insert(insert) => { + let stream = futures_util::stream::once(async { Ok(insert.payload) }).boxed(); + let res = service + .insert_object(context.clone(), insert.key, &insert.metadata, stream) + .await; + res.into_part().await + } + Operation::Delete(delete) => { + let res = service + .delete_object(&ObjectId::new(context.clone(), delete.key)) + .await; + res.into_part().await + } + }, + Err(_) => todo!() + }; + yield res; + } + }.boxed(); + + Ok(( + headers, + Body::from_stream(parts.into_bytes_stream(boundary)), + ) + .into_response()) +} + const HEADER_BATCH_OPERATION_STATUS: &str = "x-sn-batch-operation-status"; const HEADER_BATCH_OPERATION_KEY: &str = "x-sn-batch-operation-key"; @@ -46,10 +96,13 @@ impl IntoPart for GetResult { .freeze(); let mut headers = metadata.to_headers("", false).unwrap(); - headers.insert(HEADER_BATCH_OPERATION_STATUS, HeaderValue::from_static("200")); + headers.insert( + HEADER_BATCH_OPERATION_STATUS, + HeaderValue::from_static("200"), + ); Part::new(headers, payload) - }, + } Ok(None) => { let mut headers = HeaderMap::new(); headers.insert( @@ -57,7 +110,7 @@ impl IntoPart for GetResult { HeaderValue::from_static("404"), ); Part::headers_only(headers) - }, + } Err(_) => { let mut headers = HeaderMap::new(); headers.insert( @@ -65,7 +118,7 @@ impl IntoPart for GetResult { HeaderValue::from_static("500"), ); Part::headers_only(headers) - }, + } } } } @@ -85,7 +138,7 @@ impl IntoPart for InsertResult { HeaderValue::from_static("200"), ); Part::headers_only(headers) - }, + } Err(_) => { let mut headers = HeaderMap::new(); headers.insert( @@ -93,7 +146,7 @@ impl IntoPart for InsertResult { HeaderValue::from_static("500"), ); Part::headers_only(headers) - }, + } } } } @@ -109,7 +162,7 @@ impl IntoPart for DeleteResult { HeaderValue::from_static("200"), ); Part::headers_only(headers) - }, + } Err(_) => { let mut headers = HeaderMap::new(); headers.insert( @@ -117,53 +170,181 @@ impl IntoPart for DeleteResult { HeaderValue::from_static("500"), ); Part::headers_only(headers) - }, + } } } } -async fn batch( - service: AuthAwareService, - Xt(context): Xt, - mut request: BatchRequest, -) -> ApiResult { - let r = rand::random::(); - let boundary = format!("os-boundary-{r:032x}"); - let mut headers = HeaderMap::new(); - headers.insert( - CONTENT_TYPE, - HeaderValue::from_str(&format!("multipart/mixed; boundary={boundary}")).unwrap(), - ); +#[cfg(test)] +mod tests { + use super::*; + use axum::body::Body; + use axum::http::{Request, StatusCode}; + use bytes::Bytes; + use objectstore_service::StorageConfig; + use std::sync::Arc; + use tower::ServiceExt; - let parts: Pin> + Send>> = async_stream::try_stream! 
{ - while let Some(operation) = request.operations.next().await { - let res = match operation { - Ok(operation) => match operation { - Operation::Get(get) => { - let res = service - .get_object(&ObjectId::new(context.clone(), get.key)) - .await; - res.into_part().await - } - Operation::Insert(insert) => { - let stream = futures_util::stream::once(async { Ok(insert.payload) }).boxed(); - let res = service - .insert_object(context.clone(), insert.key, &insert.metadata, stream) - .await; - res.into_part().await - } - Operation::Delete(delete) => { - let res = service - .delete_object(&ObjectId::new(context.clone(), delete.key)) - .await; - res.into_part().await - } - }, - Err(_) => todo!() - }; - yield res; + /// Tests the batch endpoint end-to-end with insert, get, and delete operations + #[tokio::test] + async fn test_batch_endpoint_basic() { + // Set up temporary filesystem storage + let tempdir = tempfile::tempdir().unwrap(); + let config = StorageConfig::FileSystem { + path: tempdir.path(), + }; + let storage_service = objectstore_service::StorageService::new(config.clone(), config) + .await + .unwrap(); + + // Create application state + let state = Arc::new(crate::state::Services { + config: crate::config::Config::default(), + service: storage_service, + }); + + // Build the router with state + let app = router().with_state(state); + + // Create a batch request with insert, get, delete, and get non-existing key + let insert_data = b"test data"; + let request_body = format!( + "--boundary\r\n\ + {HEADER_BATCH_OPERATION_KEY}: testkey\r\n\ + x-sn-batch-operation-kind: insert\r\n\ + Content-Type: application/octet-stream\r\n\ + \r\n\ + {data}\r\n\ + --boundary\r\n\ + {HEADER_BATCH_OPERATION_KEY}: testkey\r\n\ + x-sn-batch-operation-kind: get\r\n\ + \r\n\ + \r\n\ + --boundary\r\n\ + {HEADER_BATCH_OPERATION_KEY}: testkey\r\n\ + x-sn-batch-operation-kind: delete\r\n\ + \r\n\ + \r\n\ + --boundary\r\n\ + {HEADER_BATCH_OPERATION_KEY}: nonexistent\r\n\ + x-sn-batch-operation-kind: get\r\n\ + \r\n\ + \r\n\ + --boundary--\r\n", + data = String::from_utf8_lossy(insert_data), + ); + + let request = Request::builder() + .uri("/objects:batch/testing/scope=value/") + .method("POST") + .header("Content-Type", "multipart/mixed; boundary=boundary") + .body(Body::from(request_body)) + .unwrap(); + + // Call the endpoint + let response = app.oneshot(request).await.unwrap(); + + // Verify response status + let status = response.status(); + if status != StatusCode::OK { + let body = response.into_body(); + let body_bytes = axum::body::to_bytes(body, usize::MAX).await.unwrap(); + let error_msg = String::from_utf8_lossy(&body_bytes); + panic!("Expected 200 OK, got {}: {}", status, error_msg); + } + + // Get the content type and extract boundary + let content_type = response + .headers() + .get("content-type") + .unwrap() + .to_str() + .unwrap(); + assert!(content_type.starts_with("multipart/mixed")); + + // Swap content type from multipart/mixed to multipart/form-data for multer + let content_type = content_type.replace("multipart/mixed", "multipart/form-data"); + let boundary = multer::parse_boundary(&content_type).unwrap(); + + // Parse the multipart response using multer + let body = response.into_body(); + let body_bytes = axum::body::to_bytes(body, usize::MAX).await.unwrap(); + + // Create a stream for multer + use futures::stream; + let chunks: Vec<_> = body_bytes + .chunks(64) + .map(|chunk| Ok::<_, multer::Error>(Bytes::copy_from_slice(chunk))) + .collect(); + let body_stream = stream::iter(chunks); + let 
mut multipart = multer::Multipart::new(body_stream, boundary); + + // Collect all parts + let mut parts = vec![]; + loop { + match multipart.next_field().await { + Ok(Some(field)) => { + let headers = field.headers().clone(); + match field.bytes().await { + Ok(data) => parts.push((headers, data)), + Err(e) => panic!("Failed to read field bytes: {:?}", e), + } + } + Ok(None) => break, + Err(e) => panic!("Failed to get next field: {:?}", e), } - }.boxed(); + } - Ok((headers, Body::from_stream(parts.into_bytes_stream(boundary))).into_response()) + // Should have exactly 4 parts + assert_eq!(parts.len(), 4); + + // First part: insert response + let (insert_headers, insert_body) = &parts[0]; + let insert_status = insert_headers + .get(HEADER_BATCH_OPERATION_STATUS) + .unwrap() + .to_str() + .unwrap(); + assert_eq!(insert_status, "200"); + + let insert_key = insert_headers + .get(HEADER_BATCH_OPERATION_KEY) + .unwrap() + .to_str() + .unwrap(); + assert_eq!(insert_key, "testkey"); + assert!(insert_body.is_empty()); // Insert response has no body + + // Second part: get response + let (get_headers, get_body) = &parts[1]; + let get_status = get_headers + .get(HEADER_BATCH_OPERATION_STATUS) + .unwrap() + .to_str() + .unwrap(); + assert_eq!(get_status, "200"); + + // Verify the retrieved data matches what we inserted + assert_eq!(get_body.as_ref(), insert_data); + + // Third part: delete response + let (delete_headers, delete_body) = &parts[2]; + let delete_status = delete_headers + .get(HEADER_BATCH_OPERATION_STATUS) + .unwrap() + .to_str() + .unwrap(); + assert_eq!(delete_status, "200"); + assert!(delete_body.is_empty()); // Delete response has no body + + // Fourth part: get non-existing key (should be 404) + let (not_found_headers, not_found_body) = &parts[3]; + let not_found_status = not_found_headers + .get(HEADER_BATCH_OPERATION_STATUS) + .unwrap() + .to_str() + .unwrap(); + assert_eq!(not_found_status, "404"); + assert!(not_found_body.is_empty()); // Not found response has no body + } } diff --git a/objectstore-server/src/multipart.rs b/objectstore-server/src/multipart.rs index d3573895..ab0d038a 100644 --- a/objectstore-server/src/multipart.rs +++ b/objectstore-server/src/multipart.rs @@ -1,3 +1,5 @@ +//! Utilities to represent and serialize multipart parts. 
+ use std::pin::Pin; use std::task::{Context, Poll}; @@ -64,6 +66,8 @@ enum State { Waiting, SendHeaders(Part), SendBody(Bytes), + SendClosingBoundary, + Done, } impl Stream for PartsSerializer @@ -77,7 +81,11 @@ where match std::mem::replace(this.state, State::Waiting) { State::Waiting => match this.parts.as_mut().poll_next(ctx) { Poll::Pending => return Poll::Pending, - Poll::Ready(None) => return Poll::Ready(None), + Poll::Ready(None) => { + *this.state = State::SendClosingBoundary; + ctx.waker().wake_by_ref(); + return Poll::Pending; + } Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))), Poll::Ready(Some(Ok(p))) => { *this.state = State::SendHeaders(p); @@ -90,7 +98,26 @@ where return Poll::Ready(Some(Ok(headers))); } State::SendBody(body) => { - return Poll::Ready(Some(Ok(body))); + // Add \r\n after the body + let mut body_with_newline = BytesMut::with_capacity(body.len() + 2); + body_with_newline.put(body); + body_with_newline.put(&b"\r\n"[..]); + return Poll::Ready(Some(Ok(body_with_newline.freeze()))); + } + State::SendClosingBoundary => { + *this.state = State::Done; + // Create closing boundary: --boundary-- (without \r\n in between) + // The boundary already has --boundary\r\n, so we need to strip the \r\n + // and add --\r\n instead + let boundary_str = std::str::from_utf8(this.boundary).unwrap(); + let boundary_without_crlf = boundary_str.trim_end_matches("\r\n"); + let mut closing = BytesMut::with_capacity(boundary_without_crlf.len() + 4); + closing.put(boundary_without_crlf.as_bytes()); + closing.put(&b"--\r\n"[..]); + return Poll::Ready(Some(Ok(closing.freeze()))); + } + State::Done => { + return Poll::Ready(None); } } } From ece0cf6584217a057ba96bc2deafa0d91090eb83 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Fri, 19 Dec 2025 13:40:39 +0100 Subject: [PATCH 26/30] wip --- Cargo.lock | 1 - objectstore-server/Cargo.toml | 1 - 2 files changed, 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index df0b7c5e..200798d4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2252,7 +2252,6 @@ dependencies = [ "http 1.3.1", "humantime", "humantime-serde", - "itertools 0.14.0", "jsonwebtoken", "merni", "mimalloc", diff --git a/objectstore-server/Cargo.toml b/objectstore-server/Cargo.toml index af5a63ff..aeafdadf 100644 --- a/objectstore-server/Cargo.toml +++ b/objectstore-server/Cargo.toml @@ -24,7 +24,6 @@ figment = { version = "0.10.19", features = ["env", "test", "yaml"] } futures = { workspace = true } futures-util = { workspace = true } http = { workspace = true } -itertools = "0.14.0" humantime = { workspace = true } humantime-serde = { workspace = true } jsonwebtoken = { version = "10.2.0", features = ["rust_crypto"] } From b0b6f36dc001c37f4a50bbbda2ed1d50c6abd699 Mon Sep 17 00:00:00 2001 From: lcian <17258265+lcian@users.noreply.github.com> Date: Fri, 19 Dec 2025 14:30:28 +0100 Subject: [PATCH 27/30] wip --- Cargo.lock | 8 ++++--- objectstore-server/src/endpoints/batch.rs | 8 ++++++- objectstore-server/src/extractors/batch.rs | 26 ++++++++++------------ objectstore-server/src/multipart.rs | 16 ++++++------- objectstore-service/src/lib.rs | 4 +--- 5 files changed, 33 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 200798d4..08937df4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2221,6 +2221,7 @@ dependencies = [ "bytes", "futures-util", "infer", + "jsonwebtoken", "objectstore-test", "objectstore-types", "reqwest", @@ -2260,6 +2261,7 @@ dependencies = [ "nix", "num_cpus", "objectstore-service", + 
"objectstore-test", "objectstore-types", "pin-project", "rand 0.9.2", @@ -2314,9 +2316,12 @@ dependencies = [ name = "objectstore-test" version = "0.1.0" dependencies = [ + "jsonwebtoken", "objectstore-server", + "objectstore-types", "tempfile", "tokio", + "tracing-subscriber", ] [[package]] @@ -3654,7 +3659,6 @@ version = "0.1.0" dependencies = [ "anyhow", "argh", - "bytes", "bytesize", "futures", "futures-util", @@ -3664,9 +3668,7 @@ dependencies = [ "objectstore-client", "rand 0.9.2", "rand_distr", - "reqwest", "serde", - "serde_json", "serde_yaml", "sketches-ddsketch", "tokio", diff --git a/objectstore-server/src/endpoints/batch.rs b/objectstore-server/src/endpoints/batch.rs index 3d6d8bbd..0c6db385 100644 --- a/objectstore-server/src/endpoints/batch.rs +++ b/objectstore-server/src/endpoints/batch.rs @@ -13,7 +13,7 @@ use objectstore_service::id::{ObjectContext, ObjectId}; use objectstore_service::{DeleteResult, GetResult, InsertResult}; use crate::auth::AuthAwareService; -use crate::error::ApiResult; +use crate::endpoints::common::ApiResult; use crate::extractors::Operation; use crate::extractors::{BatchRequest, Xt}; use crate::multipart::{IntoBytesStream, Part}; @@ -177,11 +177,14 @@ impl IntoPart for DeleteResult { #[cfg(test)] mod tests { + use crate::auth::PublicKeyDirectory; + use super::*; use axum::body::Body; use axum::http::{Request, StatusCode}; use bytes::Bytes; use objectstore_service::StorageConfig; + use std::collections::BTreeMap; use std::sync::Arc; use tower::ServiceExt; @@ -201,6 +204,9 @@ mod tests { let state = Arc::new(crate::state::Services { config: crate::config::Config::default(), service: storage_service, + key_directory: PublicKeyDirectory { + keys: BTreeMap::new(), + }, }); // Build the router with state diff --git a/objectstore-server/src/extractors/batch.rs b/objectstore-server/src/extractors/batch.rs index 5877b70c..f8496c5f 100644 --- a/objectstore-server/src/extractors/batch.rs +++ b/objectstore-server/src/extractors/batch.rs @@ -4,7 +4,7 @@ use anyhow::Context; use axum::{ extract::{FromRequest, Request}, http::StatusCode, - response::IntoResponse, + response::{IntoResponse, Response}, }; use bytes::Bytes; use futures::{StreamExt, stream::BoxStream}; @@ -14,8 +14,6 @@ use multer::{Constraints, Multipart, SizeLimit}; use objectstore_service::id::ObjectKey; use objectstore_types::Metadata; -use crate::error::AnyhowResponse; - #[derive(Debug)] pub struct GetOperation { pub key: ObjectKey, @@ -95,7 +93,7 @@ impl FromRequest for BatchRequest where S: Send + Sync, { - type Rejection = AnyhowResponse; + type Rejection = Response; async fn from_request(request: Request, _: &S) -> Result { let Some(content_type) = request @@ -103,29 +101,29 @@ where .get(CONTENT_TYPE) .and_then(|ct| ct.to_str().ok()) else { - return Err((StatusCode::BAD_REQUEST, "expected valid Content-Type") - .into_response() - .into()); + return Err((StatusCode::BAD_REQUEST, "expected valid Content-Type").into_response()); }; let Ok(mime) = content_type.parse::() else { - return Err((StatusCode::BAD_REQUEST, "expected valid Content-Type") - .into_response() - .into()); + return Err((StatusCode::BAD_REQUEST, "expected valid Content-Type").into_response()); }; if !(mime.type_() == mime::MULTIPART && mime.subtype() == "mixed") { return Err(( StatusCode::BAD_REQUEST, "expected Content-Type: multipart/mixed", ) - .into_response() - .into()); + .into_response()); } // XXX: `multer::parse_boundary` requires the content-type to be `multipart/form-data` let content_type = 
content_type.replace("multipart/mixed", "multipart/form-data"); - let boundary = - multer::parse_boundary(content_type).context("failed to parse multipart boundary")?; + let Ok(boundary) = multer::parse_boundary(content_type).context("") else { + return Err(( + StatusCode::BAD_REQUEST, + "failed to parse multipart boundary", + ) + .into_response()); + }; let mut parts = Multipart::with_constraints( request.into_body().into_data_stream(), boundary, diff --git a/objectstore-server/src/multipart.rs b/objectstore-server/src/multipart.rs index ab0d038a..3a007df6 100644 --- a/objectstore-server/src/multipart.rs +++ b/objectstore-server/src/multipart.rs @@ -80,29 +80,29 @@ where let mut this = self.project(); match std::mem::replace(this.state, State::Waiting) { State::Waiting => match this.parts.as_mut().poll_next(ctx) { - Poll::Pending => return Poll::Pending, + Poll::Pending => Poll::Pending, Poll::Ready(None) => { *this.state = State::SendClosingBoundary; ctx.waker().wake_by_ref(); - return Poll::Pending; + Poll::Pending } - Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))), + Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))), Poll::Ready(Some(Ok(p))) => { *this.state = State::SendHeaders(p); - return Poll::Ready(Some(Ok(this.boundary.clone()))); + Poll::Ready(Some(Ok(this.boundary.clone()))) } }, State::SendHeaders(part) => { *this.state = State::SendBody(part.body); let headers = serialize_headers(part.headers); - return Poll::Ready(Some(Ok(headers))); + Poll::Ready(Some(Ok(headers))) } State::SendBody(body) => { // Add \r\n after the body let mut body_with_newline = BytesMut::with_capacity(body.len() + 2); body_with_newline.put(body); body_with_newline.put(&b"\r\n"[..]); - return Poll::Ready(Some(Ok(body_with_newline.freeze()))); + Poll::Ready(Some(Ok(body_with_newline.freeze()))) } State::SendClosingBoundary => { *this.state = State::Done; @@ -114,10 +114,10 @@ where let mut closing = BytesMut::with_capacity(boundary_without_crlf.len() + 4); closing.put(boundary_without_crlf.as_bytes()); closing.put(&b"--\r\n"[..]); - return Poll::Ready(Some(Ok(closing.freeze()))); + Poll::Ready(Some(Ok(closing.freeze()))) } State::Done => { - return Poll::Ready(None); + Poll::Ready(None) } } } diff --git a/objectstore-service/src/lib.rs b/objectstore-service/src/lib.rs index a5e40f61..98cbdb33 100644 --- a/objectstore-service/src/lib.rs +++ b/objectstore-service/src/lib.rs @@ -10,18 +10,16 @@ mod backend; pub mod id; use std::path::Path; -use std::pin::Pin; use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; use std::time::Instant; use bytes::{Bytes, BytesMut}; -use futures_util::Stream; use futures_util::{StreamExt, TryStreamExt, stream::BoxStream}; use objectstore_types::Metadata; use crate::backend::common::BoxedBackend; -use crate::id::{ObjectContext, ObjectId, ObjectKey}; +use crate::id::{ObjectContext, ObjectId}; /// The threshold up until which we will go to the "high volume" backend. 
 const BACKEND_SIZE_THRESHOLD: usize = 1024 * 1024; // 1 MiB

From d9ba02d7717a039a9b416f79828460c5e866f344 Mon Sep 17 00:00:00 2001
From: lcian <17258265+lcian@users.noreply.github.com>
Date: Fri, 19 Dec 2025 14:33:56 +0100
Subject: [PATCH 28/30] fmt

---
 objectstore-server/src/multipart.rs | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/objectstore-server/src/multipart.rs b/objectstore-server/src/multipart.rs
index 3a007df6..5944d92f 100644
--- a/objectstore-server/src/multipart.rs
+++ b/objectstore-server/src/multipart.rs
@@ -116,9 +116,7 @@ where
 closing.put(&b"--\r\n"[..]);
 Poll::Ready(Some(Ok(closing.freeze())))
 }
- State::Done => {
- Poll::Ready(None)
- }
+ State::Done => Poll::Ready(None),
 }
 }
 }

From 3ff6ef3ceaaef3c5088673959c589dc2f2d02700 Mon Sep 17 00:00:00 2001
From: lcian <17258265+lcian@users.noreply.github.com>
Date: Fri, 19 Dec 2025 14:35:12 +0100
Subject: [PATCH 29/30] improve

---
 objectstore-server/src/endpoints/batch.rs | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/objectstore-server/src/endpoints/batch.rs b/objectstore-server/src/endpoints/batch.rs
index 0c6db385..bcf55511 100644
--- a/objectstore-server/src/endpoints/batch.rs
+++ b/objectstore-server/src/endpoints/batch.rs
@@ -6,6 +6,7 @@ use axum::body::Body;
 use axum::response::{IntoResponse, Response};
 use axum::routing;
 use bytes::BytesMut;
+use futures::stream::BoxStream;
 use futures::{Stream, StreamExt, TryStreamExt};
 use http::header::CONTENT_TYPE;
 use http::{HeaderMap, HeaderValue};
@@ -36,7 +37,7 @@ async fn batch(
 HeaderValue::from_str(&format!("multipart/mixed; boundary={boundary}")).unwrap(),
 );
 
- let parts: Pin> + Send>> = async_stream::try_stream! {
+ let parts: BoxStream> = async_stream::try_stream! {
 while let Some(operation) = request.operations.next().await {
 let res = match operation {
 Ok(operation) => match operation {

From 10d92706e2e68d97892c6e4654a672bcd4de542f Mon Sep 17 00:00:00 2001
From: lcian <17258265+lcian@users.noreply.github.com>
Date: Fri, 19 Dec 2025 14:36:56 +0100
Subject: [PATCH 30/30] improve

---
 objectstore-server/src/extractors/batch.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/objectstore-server/src/extractors/batch.rs b/objectstore-server/src/extractors/batch.rs
index f8496c5f..51efcd5a 100644
--- a/objectstore-server/src/extractors/batch.rs
+++ b/objectstore-server/src/extractors/batch.rs
@@ -117,7 +117,7 @@ where
 // XXX: `multer::parse_boundary` requires the content-type to be `multipart/form-data`
 let content_type = content_type.replace("multipart/mixed", "multipart/form-data");
 
- let Ok(boundary) = multer::parse_boundary(content_type).context("") else {
+ let Ok(boundary) = multer::parse_boundary(content_type) else {
 return Err((
 StatusCode::BAD_REQUEST,
 "failed to parse multipart boundary",