From a1901636dcb81e03b78ba4e368e70444c5dad256 Mon Sep 17 00:00:00 2001 From: Andy Gayton Date: Thu, 5 Dec 2024 17:48:29 -0500 Subject: [PATCH] feat: add frame import endpoint for backup/restore operations --- src/api.rs | 22 ++++++++++++++++++++++ src/client/commands.rs | 20 ++++++++++++++++++++ src/client/mod.rs | 2 +- src/main.rs | 22 ++++++++++++++++++++++ src/store.rs | 16 ++++++++++------ 5 files changed, 75 insertions(+), 7 deletions(-) diff --git a/src/api.rs b/src/api.rs index 1f37d88..0ee0c8f 100644 --- a/src/api.rs +++ b/src/api.rs @@ -45,6 +45,7 @@ enum Routes { CasPost, ProcessPost(Scru128Id), HeadGet(String), + Import, NotFound, } @@ -85,6 +86,8 @@ fn match_route(method: &Method, path: &str, headers: &hyper::HeaderMap) -> Route (&Method::POST, "/cas") => Routes::CasPost, + (&Method::POST, "/import") => Routes::Import, + (&Method::GET, p) => { if let Ok(id) = Scru128Id::from_str(p.trim_start_matches('/')) { Routes::StreamItemGet(id) @@ -158,6 +161,8 @@ async fn handle( Routes::HeadGet(topic) => response_frame_or_404(store.head(&topic)), + Routes::Import => handle_import(&mut store, req.into_body()).await, + Routes::NotFound => response_404(), }; @@ -425,6 +430,23 @@ async fn handle_stream_item_remove(store: &mut Store, id: Scru128Id) -> HTTPResu } } +async fn handle_import(store: &mut Store, body: hyper::body::Incoming) -> HTTPResult { + let bytes = body.collect().await?.to_bytes(); + let frame: Frame = match serde_json::from_slice(&bytes) { + Ok(frame) => frame, + Err(e) => return response_400(format!("Invalid frame JSON: {}", e)), + }; + + store + .insert_frame(&frame) + .map_err(|e| Box::new(e) as BoxError)?; + + Ok(Response::builder() + .status(StatusCode::OK) + .header("Content-Type", "application/json") + .body(full(serde_json::to_string(&frame).unwrap()))?) +} + fn response_404() -> HTTPResult { Ok(Response::builder() .status(StatusCode::NOT_FOUND) diff --git a/src/client/commands.rs b/src/client/commands.rs index d14311a..5155334 100644 --- a/src/client/commands.rs +++ b/src/client/commands.rs @@ -228,6 +228,26 @@ pub async fn head( Ok(body) } +pub async fn import( + addr: &str, + data: R, +) -> Result> +where + R: AsyncRead + Unpin + Send + 'static, +{ + let reader_stream = ReaderStream::new(data); + let mapped_stream = reader_stream.map(|result| { + result + .map(hyper::body::Frame::data) + .map_err(|e| Box::new(e) as Box) + }); + let body = StreamBody::new(mapped_stream); + + let res = request::request(addr, Method::POST, "import", None, body, None).await?; + let body = res.collect().await?.to_bytes(); + Ok(body) +} + fn empty() -> BoxBody> { Empty::::new() .map_err(|never| match never {}) diff --git a/src/client/mod.rs b/src/client/mod.rs index 0b3bb3f..87251d5 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -3,4 +3,4 @@ mod connect; mod request; mod types; -pub use self::commands::{append, cas_get, cas_post, cat, get, head, process, remove}; +pub use self::commands::{append, cas_get, cas_post, cat, get, head, import, process, remove}; diff --git a/src/main.rs b/src/main.rs index af60300..5e6d947 100644 --- a/src/main.rs +++ b/src/main.rs @@ -37,6 +37,8 @@ enum Command { Get(CommandGet), /// Process content through a handler Process(CommandProcess), + /// Import a frame directly into the store + Import(CommandImport), } #[derive(Parser, Debug)] @@ -170,6 +172,7 @@ async fn main() -> Result<(), Box> { Command::Head(args) => head(args).await, Command::Get(args) => get(args).await, Command::Process(args) => process(args).await, + Command::Import(args) => import(args).await, }; if let Err(err) = res { eprintln!("{}", err); @@ -331,3 +334,22 @@ async fn process(args: CommandProcess) -> Result<(), Box for Unix domain socket + #[clap(value_parser)] + addr: String, +} + +async fn import(args: CommandImport) -> Result<(), Box> { + let input: Box = if !std::io::stdin().is_terminal() { + Box::new(stdin()) + } else { + Box::new(tokio::io::empty()) + }; + + let response = xs::client::import(&args.addr, input).await?; + tokio::io::stdout().write_all(&response).await?; + Ok(()) +} diff --git a/src/store.rs b/src/store.rs index f24df82..2948714 100644 --- a/src/store.rs +++ b/src/store.rs @@ -390,18 +390,22 @@ impl Store { cacache::read_hash(&self.path.join("cacache"), hash).await } + pub fn insert_frame(&self, frame: &Frame) -> Result<(), fjall::Error> { + let encoded: Vec = serde_json::to_vec(&frame).unwrap(); + let mut batch = self.keyspace.batch(); + batch.insert(&self.frame_partition, frame.id.as_bytes(), encoded); + batch.insert(&self.topic_index, Self::topic_index_key(frame), b""); + batch.commit()?; + self.keyspace.persist(fjall::PersistMode::SyncAll) + } + pub async fn append(&self, frame: Frame) -> Frame { let mut frame = frame; frame.id = scru128::new(); // only store the frame if it's not ephemeral if frame.ttl != Some(TTL::Ephemeral) { - let encoded: Vec = serde_json::to_vec(&frame).unwrap(); - let mut batch = self.keyspace.batch(); - batch.insert(&self.frame_partition, frame.id.as_bytes(), encoded); - batch.insert(&self.topic_index, Self::topic_index_key(&frame), b""); - batch.commit().unwrap(); - self.keyspace.persist(fjall::PersistMode::SyncAll).unwrap(); + self.insert_frame(&frame).unwrap(); // If this is a Head TTL, cleanup old frames AFTER insert if let Some(TTL::Head(n)) = frame.ttl {