Skip to content

Commit

Permalink
feat: add frame import endpoint for backup/restore operations
Browse files Browse the repository at this point in the history
  • Loading branch information
cablehead committed Dec 5, 2024
1 parent 6b5b51f commit a190163
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 7 deletions.
22 changes: 22 additions & 0 deletions src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ enum Routes {
CasPost,
ProcessPost(Scru128Id),
HeadGet(String),
Import,
NotFound,
}

Expand Down Expand Up @@ -85,6 +86,8 @@ fn match_route(method: &Method, path: &str, headers: &hyper::HeaderMap) -> Route

(&Method::POST, "/cas") => Routes::CasPost,

(&Method::POST, "/import") => Routes::Import,

(&Method::GET, p) => {
if let Ok(id) = Scru128Id::from_str(p.trim_start_matches('/')) {
Routes::StreamItemGet(id)
Expand Down Expand Up @@ -158,6 +161,8 @@ async fn handle(

Routes::HeadGet(topic) => response_frame_or_404(store.head(&topic)),

Routes::Import => handle_import(&mut store, req.into_body()).await,

Routes::NotFound => response_404(),
};

Expand Down Expand Up @@ -425,6 +430,23 @@ async fn handle_stream_item_remove(store: &mut Store, id: Scru128Id) -> HTTPResu
}
}

async fn handle_import(store: &mut Store, body: hyper::body::Incoming) -> HTTPResult {
let bytes = body.collect().await?.to_bytes();
let frame: Frame = match serde_json::from_slice(&bytes) {
Ok(frame) => frame,
Err(e) => return response_400(format!("Invalid frame JSON: {}", e)),
};

store
.insert_frame(&frame)
.map_err(|e| Box::new(e) as BoxError)?;

Ok(Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/json")
.body(full(serde_json::to_string(&frame).unwrap()))?)
}

fn response_404() -> HTTPResult {
Ok(Response::builder()
.status(StatusCode::NOT_FOUND)
Expand Down
20 changes: 20 additions & 0 deletions src/client/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,26 @@ pub async fn head(
Ok(body)
}

pub async fn import<R>(
addr: &str,
data: R,
) -> Result<Bytes, Box<dyn std::error::Error + Send + Sync>>
where
R: AsyncRead + Unpin + Send + 'static,
{
let reader_stream = ReaderStream::new(data);
let mapped_stream = reader_stream.map(|result| {
result
.map(hyper::body::Frame::data)
.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
});
let body = StreamBody::new(mapped_stream);

let res = request::request(addr, Method::POST, "import", None, body, None).await?;
let body = res.collect().await?.to_bytes();
Ok(body)
}

fn empty() -> BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>> {
Empty::<Bytes>::new()
.map_err(|never| match never {})
Expand Down
2 changes: 1 addition & 1 deletion src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ mod connect;
mod request;
mod types;

pub use self::commands::{append, cas_get, cas_post, cat, get, head, process, remove};
pub use self::commands::{append, cas_get, cas_post, cat, get, head, import, process, remove};
22 changes: 22 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ enum Command {
Get(CommandGet),
/// Process content through a handler
Process(CommandProcess),
/// Import a frame directly into the store
Import(CommandImport),
}

#[derive(Parser, Debug)]
Expand Down Expand Up @@ -170,6 +172,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
Command::Head(args) => head(args).await,
Command::Get(args) => get(args).await,
Command::Process(args) => process(args).await,
Command::Import(args) => import(args).await,
};
if let Err(err) = res {
eprintln!("{}", err);
Expand Down Expand Up @@ -331,3 +334,22 @@ async fn process(args: CommandProcess) -> Result<(), Box<dyn std::error::Error +
tokio::io::stdout().write_all(&response).await?;
Ok(())
}

#[derive(Parser, Debug)]
struct CommandImport {
/// Address to connect to [HOST]:PORT or <PATH> for Unix domain socket
#[clap(value_parser)]
addr: String,
}

async fn import(args: CommandImport) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let input: Box<dyn AsyncRead + Unpin + Send> = if !std::io::stdin().is_terminal() {
Box::new(stdin())
} else {
Box::new(tokio::io::empty())
};

let response = xs::client::import(&args.addr, input).await?;
tokio::io::stdout().write_all(&response).await?;
Ok(())
}
16 changes: 10 additions & 6 deletions src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,18 +390,22 @@ impl Store {
cacache::read_hash(&self.path.join("cacache"), hash).await
}

pub fn insert_frame(&self, frame: &Frame) -> Result<(), fjall::Error> {
let encoded: Vec<u8> = serde_json::to_vec(&frame).unwrap();
let mut batch = self.keyspace.batch();
batch.insert(&self.frame_partition, frame.id.as_bytes(), encoded);
batch.insert(&self.topic_index, Self::topic_index_key(frame), b"");
batch.commit()?;
self.keyspace.persist(fjall::PersistMode::SyncAll)
}

pub async fn append(&self, frame: Frame) -> Frame {
let mut frame = frame;
frame.id = scru128::new();

// only store the frame if it's not ephemeral
if frame.ttl != Some(TTL::Ephemeral) {
let encoded: Vec<u8> = serde_json::to_vec(&frame).unwrap();
let mut batch = self.keyspace.batch();
batch.insert(&self.frame_partition, frame.id.as_bytes(), encoded);
batch.insert(&self.topic_index, Self::topic_index_key(&frame), b"");
batch.commit().unwrap();
self.keyspace.persist(fjall::PersistMode::SyncAll).unwrap();
self.insert_frame(&frame).unwrap();

// If this is a Head TTL, cleanup old frames AFTER insert
if let Some(TTL::Head(n)) = frame.ttl {
Expand Down

0 comments on commit a190163

Please sign in to comment.