diff --git a/Cargo.lock b/Cargo.lock index 5e15f55b..2154e0ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3039,6 +3039,7 @@ name = "rustapi-core" version = "0.1.275" dependencies = [ "async-stream", + "async-trait", "base64 0.22.1", "brotli 6.0.0", "bytes", @@ -3123,6 +3124,7 @@ dependencies = [ "tracing", "tracing-opentelemetry", "urlencoding", + "uuid", ] [[package]] diff --git a/crates/cargo-rustapi/Cargo.toml b/crates/cargo-rustapi/Cargo.toml index e6366152..0c8f8933 100644 --- a/crates/cargo-rustapi/Cargo.toml +++ b/crates/cargo-rustapi/Cargo.toml @@ -53,3 +53,4 @@ predicates = "3.1" [features] default = ["remote-spec"] remote-spec = ["dep:reqwest"] +replay = ["dep:reqwest"] diff --git a/crates/cargo-rustapi/src/cli.rs b/crates/cargo-rustapi/src/cli.rs index 52c778c8..77eb7240 100644 --- a/crates/cargo-rustapi/src/cli.rs +++ b/crates/cargo-rustapi/src/cli.rs @@ -54,6 +54,11 @@ enum Commands { /// Deploy to various platforms #[command(subcommand)] Deploy(DeployArgs), + + /// Replay debugging commands (time-travel debugging) + #[cfg(feature = "replay")] + #[command(subcommand)] + Replay(commands::ReplayArgs), } impl Cli { @@ -70,6 +75,8 @@ impl Cli { Commands::Docs { port } => commands::open_docs(port).await, Commands::Client(args) => commands::client(args).await, Commands::Deploy(args) => commands::deploy(args).await, + #[cfg(feature = "replay")] + Commands::Replay(args) => commands::replay(args).await, } } } diff --git a/crates/cargo-rustapi/src/commands/mod.rs b/crates/cargo-rustapi/src/commands/mod.rs index f246e803..524fc907 100644 --- a/crates/cargo-rustapi/src/commands/mod.rs +++ b/crates/cargo-rustapi/src/commands/mod.rs @@ -21,3 +21,8 @@ pub use migrate::{migrate, MigrateArgs}; pub use new::{new_project, NewArgs}; pub use run::{run_dev, RunArgs}; pub use watch::{watch, WatchArgs}; + +#[cfg(feature = "replay")] +mod replay; +#[cfg(feature = "replay")] +pub use replay::{replay, ReplayArgs}; diff --git a/crates/cargo-rustapi/src/commands/replay.rs b/crates/cargo-rustapi/src/commands/replay.rs new file mode 100644 index 00000000..83cb2f09 --- /dev/null +++ b/crates/cargo-rustapi/src/commands/replay.rs @@ -0,0 +1,470 @@ +//! CLI commands for replay management. +//! +//! Communicates with a running RustAPI server's `/__rustapi/replays` admin endpoints +//! via HTTP. Does not import `rustapi-extras` directly. + +use anyhow::{Context, Result}; +use clap::{Args, Subcommand}; +use console::style; + +/// Replay debugging commands. +/// +/// Manage recorded HTTP request/response pairs for time-travel debugging. +#[derive(Subcommand, Debug)] +pub enum ReplayArgs { + /// List recorded replay entries + List(ReplayListArgs), + + /// Show a single replay entry + Show(ReplayShowArgs), + + /// Replay a recorded request against a target URL + Run(ReplayRunArgs), + + /// Replay and compute diff against original response + Diff(ReplayDiffArgs), +} + +/// Arguments for `replay list` +#[derive(Args, Debug)] +pub struct ReplayListArgs { + /// Server URL + #[arg(short, long, default_value = "http://localhost:8080")] + pub server: String, + + /// Admin bearer token + #[arg(short, long)] + pub token: String, + + /// Maximum number of entries to return + #[arg(short, long)] + pub limit: Option, + + /// Filter by HTTP method + #[arg(short, long)] + pub method: Option, + + /// Filter by path substring + #[arg(short, long)] + pub path: Option, +} + +/// Arguments for `replay show` +#[derive(Args, Debug)] +pub struct ReplayShowArgs { + /// Replay entry ID + pub id: String, + + /// Server URL + #[arg(short, long, default_value = "http://localhost:8080")] + pub server: String, + + /// Admin bearer token + #[arg(short, long)] + pub token: String, +} + +/// Arguments for `replay run` +#[derive(Args, Debug)] +pub struct ReplayRunArgs { + /// Replay entry ID + pub id: String, + + /// Target URL to replay the request against + #[arg(short = 'T', long)] + pub target: String, + + /// Server URL + #[arg(short, long, default_value = "http://localhost:8080")] + pub server: String, + + /// Admin bearer token + #[arg(short, long)] + pub token: String, +} + +/// Arguments for `replay diff` +#[derive(Args, Debug)] +pub struct ReplayDiffArgs { + /// Replay entry ID + pub id: String, + + /// Target URL to replay and diff against + #[arg(short = 'T', long)] + pub target: String, + + /// Server URL + #[arg(short, long, default_value = "http://localhost:8080")] + pub server: String, + + /// Admin bearer token + #[arg(short, long)] + pub token: String, +} + +/// Execute a replay subcommand. +pub async fn replay(args: ReplayArgs) -> Result<()> { + match args { + ReplayArgs::List(a) => cmd_list(a).await, + ReplayArgs::Show(a) => cmd_show(a).await, + ReplayArgs::Run(a) => cmd_run(a).await, + ReplayArgs::Diff(a) => cmd_diff(a).await, + } +} + +/// Build a reqwest client with the admin bearer token. +fn build_client(token: &str) -> Result<(reqwest::Client, reqwest::header::HeaderMap)> { + let client = reqwest::Client::new(); + let mut headers = reqwest::header::HeaderMap::new(); + let auth_value = format!("Bearer {}", token); + headers.insert( + reqwest::header::AUTHORIZATION, + auth_value.parse().context("Invalid token format")?, + ); + Ok((client, headers)) +} + +async fn cmd_list(args: ReplayListArgs) -> Result<()> { + let (client, headers) = build_client(&args.token)?; + + let mut url = format!("{}/__rustapi/replays", args.server.trim_end_matches('/')); + let mut params = Vec::new(); + if let Some(limit) = args.limit { + params.push(format!("limit={}", limit)); + } + if let Some(ref method) = args.method { + params.push(format!("method={}", method)); + } + if let Some(ref path) = args.path { + params.push(format!("path={}", path)); + } + if !params.is_empty() { + url.push('?'); + url.push_str(¶ms.join("&")); + } + + let resp = client + .get(&url) + .headers(headers) + .send() + .await + .context("Failed to connect to server")?; + + let status = resp.status(); + let body: serde_json::Value = resp.json().await.context("Failed to parse response")?; + + if !status.is_success() { + let msg = body["message"].as_str().unwrap_or("Unknown error"); + anyhow::bail!("Server returned {}: {}", status, msg); + } + + let entries = body["entries"].as_array(); + let count = body["count"].as_u64().unwrap_or(0); + let total = body["total"].as_u64().unwrap_or(0); + + println!( + "{} Showing {} of {} replay entries\n", + style("Replay Entries").bold().cyan(), + count, + total + ); + + if let Some(entries) = entries { + if entries.is_empty() { + println!(" No entries found."); + } else { + println!( + " {:<38} {:<7} {:<30} {:<6} {:<8}", + style("ID").underlined(), + style("Method").underlined(), + style("Path").underlined(), + style("Status").underlined(), + style("Duration").underlined(), + ); + + for entry in entries { + let id = entry["id"].as_str().unwrap_or("-"); + let method = entry["request"]["method"].as_str().unwrap_or("-"); + let path = entry["request"]["path"].as_str().unwrap_or("-"); + let status_code = entry["response"]["status"].as_u64().unwrap_or(0); + let duration = entry["meta"]["duration_ms"].as_u64().unwrap_or(0); + + let status_styled = if status_code >= 500 { + style(status_code.to_string()).red() + } else if status_code >= 400 { + style(status_code.to_string()).yellow() + } else { + style(status_code.to_string()).green() + }; + + println!( + " {:<38} {:<7} {:<30} {:<6} {:>5}ms", + style(id).dim(), + method, + path, + status_styled, + duration, + ); + } + } + } + + println!(); + Ok(()) +} + +async fn cmd_show(args: ReplayShowArgs) -> Result<()> { + let (client, headers) = build_client(&args.token)?; + + let url = format!( + "{}/__rustapi/replays/{}", + args.server.trim_end_matches('/'), + args.id + ); + + let resp = client + .get(&url) + .headers(headers) + .send() + .await + .context("Failed to connect to server")?; + + let status = resp.status(); + let body: serde_json::Value = resp.json().await.context("Failed to parse response")?; + + if !status.is_success() { + let msg = body["message"].as_str().unwrap_or("Unknown error"); + anyhow::bail!("Server returned {}: {}", status, msg); + } + + println!("{}", style("Replay Entry").bold().cyan()); + println!(); + + // Request section + let req = &body["request"]; + println!( + " {} {} {}", + style("Request:").bold(), + req["method"].as_str().unwrap_or("-"), + req["uri"].as_str().unwrap_or("-") + ); + if let Some(headers_obj) = req["headers"].as_object() { + for (k, v) in headers_obj { + println!(" {}: {}", style(k).dim(), v.as_str().unwrap_or("-")); + } + } + if let Some(body_str) = req["body"].as_str() { + println!(" {}", style("Body:").bold()); + print_json_indented(body_str, 6); + } + + println!(); + + // Response section + let resp_data = &body["response"]; + let status_code = resp_data["status"].as_u64().unwrap_or(0); + let status_styled = if status_code >= 500 { + style(status_code.to_string()).red().bold() + } else if status_code >= 400 { + style(status_code.to_string()).yellow().bold() + } else { + style(status_code.to_string()).green().bold() + }; + println!(" {} {}", style("Response:").bold(), status_styled); + if let Some(headers_obj) = resp_data["headers"].as_object() { + for (k, v) in headers_obj { + println!(" {}: {}", style(k).dim(), v.as_str().unwrap_or("-")); + } + } + if let Some(body_str) = resp_data["body"].as_str() { + println!(" {}", style("Body:").bold()); + print_json_indented(body_str, 6); + } + + println!(); + + // Meta section + let meta = &body["meta"]; + println!(" {}", style("Meta:").bold()); + println!( + " Duration: {}ms", + meta["duration_ms"].as_u64().unwrap_or(0) + ); + if let Some(ip) = meta["client_ip"].as_str() { + println!(" Client IP: {}", ip); + } + if let Some(req_id) = meta["request_id"].as_str() { + println!(" Request ID: {}", req_id); + } + + println!(); + Ok(()) +} + +async fn cmd_run(args: ReplayRunArgs) -> Result<()> { + let (client, headers) = build_client(&args.token)?; + + let url = format!( + "{}/__rustapi/replays/{}/run?target={}", + args.server.trim_end_matches('/'), + args.id, + args.target + ); + + println!( + "{} Replaying {} against {}...", + style("Replay").bold().cyan(), + style(&args.id).dim(), + style(&args.target).yellow() + ); + + let resp = client + .post(&url) + .headers(headers) + .send() + .await + .context("Failed to connect to server")?; + + let status = resp.status(); + let body: serde_json::Value = resp.json().await.context("Failed to parse response")?; + + if !status.is_success() { + let msg = body["message"].as_str().unwrap_or("Unknown error"); + anyhow::bail!("Server returned {}: {}", status, msg); + } + + println!(); + println!( + " {} Original status: {}", + style("Original:").bold(), + body["original_response"]["status"] + ); + println!( + " {} Replayed status: {}", + style("Replayed:").bold(), + body["replayed_response"]["status"] + ); + + if let Some(body_str) = body["replayed_response"]["body"].as_str() { + println!(); + println!(" {}", style("Replayed Body:").bold()); + print_json_indented(body_str, 4); + } + + println!(); + Ok(()) +} + +async fn cmd_diff(args: ReplayDiffArgs) -> Result<()> { + let (client, headers) = build_client(&args.token)?; + + let url = format!( + "{}/__rustapi/replays/{}/diff?target={}", + args.server.trim_end_matches('/'), + args.id, + args.target + ); + + println!( + "{} Replaying {} against {} and computing diff...", + style("Diff").bold().cyan(), + style(&args.id).dim(), + style(&args.target).yellow() + ); + + let resp = client + .post(&url) + .headers(headers) + .send() + .await + .context("Failed to connect to server")?; + + let status = resp.status(); + let body: serde_json::Value = resp.json().await.context("Failed to parse response")?; + + if !status.is_success() { + let msg = body["message"].as_str().unwrap_or("Unknown error"); + anyhow::bail!("Server returned {}: {}", status, msg); + } + + let diff = &body["diff"]; + let has_diff = diff["has_diff"].as_bool().unwrap_or(false); + + println!(); + + if !has_diff { + println!(" {} No differences found!", style("MATCH").green().bold()); + } else { + println!(" {} Differences detected:", style("DIFF").red().bold()); + println!(); + + // Status diff + if let Some(status_diff) = diff["status_diff"].as_array() { + if status_diff.len() == 2 { + println!( + " Status: {} -> {}", + style(status_diff[0].to_string()).red(), + style(status_diff[1].to_string()).green(), + ); + } + } + + // Header diffs + if let Some(header_diffs) = diff["header_diffs"].as_array() { + if !header_diffs.is_empty() { + println!(" {}", style("Header differences:").bold()); + for hd in header_diffs { + let field = &hd["field"]; + let original = hd["original"].as_str().unwrap_or(""); + let replayed = hd["replayed"].as_str().unwrap_or(""); + println!( + " {}: {} -> {}", + style(format!("{}", field)).dim(), + style(original).red(), + style(replayed).green(), + ); + } + } + } + + // Body diff + if let Some(body_diff) = diff["body_diff"].as_object() { + if let Some(field_diffs) = body_diff["field_diffs"].as_array() { + if !field_diffs.is_empty() { + println!(" {}", style("Body field differences:").bold()); + for fd in field_diffs { + let field = &fd["field"]; + let original = fd["original"].as_str().unwrap_or(""); + let replayed = fd["replayed"].as_str().unwrap_or(""); + println!( + " {}: {} -> {}", + style(format!("{}", field)).dim(), + style(original).red(), + style(replayed).green(), + ); + } + } + } + } + } + + println!(); + Ok(()) +} + +/// Pretty-print a JSON string with indentation. +fn print_json_indented(json_str: &str, indent: usize) { + let prefix = " ".repeat(indent); + if let Ok(value) = serde_json::from_str::(json_str) { + if let Ok(pretty) = serde_json::to_string_pretty(&value) { + for line in pretty.lines() { + println!("{}{}", prefix, line); + } + return; + } + } + // Fallback: print raw + for line in json_str.lines() { + println!("{}{}", prefix, line); + } +} diff --git a/crates/rustapi-core/Cargo.toml b/crates/rustapi-core/Cargo.toml index 494f94ef..d60cd9d6 100644 --- a/crates/rustapi-core/Cargo.toml +++ b/crates/rustapi-core/Cargo.toml @@ -81,6 +81,9 @@ rustls-pemfile = { workspace = true, optional = true } rcgen = { workspace = true, optional = true } chrono = "0.4.43" +# Replay (feature-gated) +async-trait = { workspace = true, optional = true } + [dev-dependencies] tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } proptest = "1.4" @@ -100,6 +103,7 @@ simd-json = ["dep:simd-json"] tracing = [] http3 = ["dep:quinn", "dep:h3", "dep:h3-quinn", "dep:rustls", "dep:rustls-pemfile"] http3-dev = ["http3", "dep:rcgen"] +replay = ["dep:async-trait"] diff --git a/crates/rustapi-core/src/lib.rs b/crates/rustapi-core/src/lib.rs index 93bbe33b..c5bf16c0 100644 --- a/crates/rustapi-core/src/lib.rs +++ b/crates/rustapi-core/src/lib.rs @@ -68,6 +68,8 @@ pub mod middleware; pub mod multipart; pub mod path_params; pub mod path_validation; +#[cfg(feature = "replay")] +pub mod replay; mod request; mod response; mod router; diff --git a/crates/rustapi-core/src/replay/config.rs b/crates/rustapi-core/src/replay/config.rs new file mode 100644 index 00000000..5ee1c43e --- /dev/null +++ b/crates/rustapi-core/src/replay/config.rs @@ -0,0 +1,370 @@ +//! Configuration for the replay recording system. +//! +//! Provides [`ReplayConfig`] with a builder pattern for customizing +//! replay behavior. Secure defaults: disabled, admin token required, +//! sensitive headers redacted, TTL enforced. + +use std::collections::HashSet; + +/// Configuration for the replay recording middleware. +/// +/// Uses builder pattern with secure defaults: +/// - Recording disabled (`enabled: false`) +/// - Admin token required for replay endpoints +/// - Sensitive headers redacted by default +/// - TTL enforced (1 hour default) +/// +/// # Example +/// +/// ```ignore +/// use rustapi_core::replay::ReplayConfig; +/// +/// let config = ReplayConfig::new() +/// .enabled(true) +/// .admin_token("my-secret-token") +/// .ttl_secs(3600) +/// .redact_header("x-custom-secret"); +/// ``` +#[derive(Clone)] +pub struct ReplayConfig { + /// Whether replay recording is enabled. Default: false (off in production). + pub enabled: bool, + + /// Admin bearer token required for replay endpoints. Must be set. + pub admin_token: Option, + + /// Paths to record (empty = all paths). Mutually exclusive with skip_paths. + pub record_paths: HashSet, + + /// Paths to skip from recording. + pub skip_paths: HashSet, + + /// Path prefix for admin routes. Default: `"/__rustapi/replays"`. + pub admin_route_prefix: String, + + /// Maximum request body size to capture (bytes). Default: 64KB. + pub max_request_body: usize, + + /// Maximum response body size to capture (bytes). Default: 256KB. + pub max_response_body: usize, + + /// Store capacity (in-memory ring buffer size). Default: 500. + pub store_capacity: usize, + + /// Time-to-live for entries in seconds. Default: 3600 (1 hour). + pub ttl_secs: u64, + + /// Sampling rate (0.0-1.0). Default: 1.0 (all requests). + pub sample_rate: f64, + + /// Headers to redact (values replaced with `[REDACTED]`). + pub redact_headers: HashSet, + + /// JSON body field paths to redact. + pub redact_body_fields: HashSet, + + /// Content types eligible for body capture. + pub capturable_content_types: HashSet, +} + +impl Default for ReplayConfig { + fn default() -> Self { + Self::new() + } +} + +impl ReplayConfig { + /// Create a new configuration with secure defaults. + /// + /// Defaults: + /// - Replay disabled + /// - No admin token (must be set before use) + /// - Max request body: 64KB + /// - Max response body: 256KB + /// - Store capacity: 500 entries + /// - TTL: 3600 seconds (1 hour) + /// - Sample rate: 1.0 (all requests) + /// - Redacted headers: authorization, cookie, x-api-key, x-auth-token + pub fn new() -> Self { + let mut redact_headers = HashSet::new(); + redact_headers.insert("authorization".to_string()); + redact_headers.insert("cookie".to_string()); + redact_headers.insert("x-api-key".to_string()); + redact_headers.insert("x-auth-token".to_string()); + + let mut capturable = HashSet::new(); + capturable.insert("application/json".to_string()); + capturable.insert("text/plain".to_string()); + capturable.insert("text/html".to_string()); + capturable.insert("application/xml".to_string()); + capturable.insert("text/xml".to_string()); + + Self { + enabled: false, + admin_token: None, + record_paths: HashSet::new(), + skip_paths: HashSet::new(), + admin_route_prefix: "/__rustapi/replays".to_string(), + max_request_body: 65_536, // 64KB + max_response_body: 262_144, // 256KB + store_capacity: 500, + ttl_secs: 3600, + sample_rate: 1.0, + redact_headers, + redact_body_fields: HashSet::new(), + capturable_content_types: capturable, + } + } + + /// Enable or disable replay recording. + pub fn enabled(mut self, enabled: bool) -> Self { + self.enabled = enabled; + self + } + + /// Set the admin bearer token for replay endpoints. + /// + /// All `/__rustapi/replays` endpoints require this token + /// in the `Authorization: Bearer ` header. + pub fn admin_token(mut self, token: impl Into) -> Self { + self.admin_token = Some(token.into()); + self + } + + /// Add a path to record. If any record paths are set, + /// only those paths will be recorded. + pub fn record_path(mut self, path: impl Into) -> Self { + self.record_paths.insert(path.into()); + self + } + + /// Add a path to skip from recording. + pub fn skip_path(mut self, path: impl Into) -> Self { + self.skip_paths.insert(path.into()); + self + } + + /// Set the admin route prefix. Default: `"/__rustapi/replays"`. + pub fn admin_route_prefix(mut self, prefix: impl Into) -> Self { + self.admin_route_prefix = prefix.into(); + self + } + + /// Set the maximum request body size to capture (bytes). + pub fn max_request_body(mut self, size: usize) -> Self { + self.max_request_body = size; + self + } + + /// Set the maximum response body size to capture (bytes). + pub fn max_response_body(mut self, size: usize) -> Self { + self.max_response_body = size; + self + } + + /// Set the store capacity (max number of entries). + pub fn store_capacity(mut self, capacity: usize) -> Self { + self.store_capacity = capacity; + self + } + + /// Set the TTL for replay entries (seconds). + pub fn ttl_secs(mut self, secs: u64) -> Self { + self.ttl_secs = secs; + self + } + + /// Set the sampling rate (0.0 to 1.0). + pub fn sample_rate(mut self, rate: f64) -> Self { + self.sample_rate = rate.clamp(0.0, 1.0); + self + } + + /// Add a header name to redact (case-insensitive). + pub fn redact_header(mut self, header: impl Into) -> Self { + self.redact_headers.insert(header.into().to_lowercase()); + self + } + + /// Add a JSON body field path to redact (e.g., `"password"`, `"ssn"`). + pub fn redact_body_field(mut self, field: impl Into) -> Self { + self.redact_body_fields.insert(field.into()); + self + } + + /// Add a capturable content type. + pub fn capturable_content_type(mut self, content_type: impl Into) -> Self { + self.capturable_content_types + .insert(content_type.into().to_lowercase()); + self + } + + /// Check if a path should be recorded. + pub fn should_record_path(&self, path: &str) -> bool { + // Skip admin routes + if path.starts_with(&self.admin_route_prefix) { + return false; + } + + // Skip explicitly skipped paths + if self.skip_paths.contains(path) { + return false; + } + + // If record_paths is set, only record those + if !self.record_paths.is_empty() { + return self.record_paths.contains(path); + } + + true + } + + /// Check if this request should be sampled. + pub fn should_sample(&self) -> bool { + if self.sample_rate >= 1.0 { + return true; + } + if self.sample_rate <= 0.0 { + return false; + } + rand_sample(self.sample_rate) + } + + /// Check if a content type is capturable. + pub fn is_capturable_content_type(&self, content_type: &str) -> bool { + let ct_lower = content_type.to_lowercase(); + for allowed in &self.capturable_content_types { + if ct_lower.starts_with(allowed) { + return true; + } + } + ct_lower.starts_with("text/") || ct_lower.starts_with("application/json") + } +} + +/// Simple random sampling based on rate. +fn rand_sample(rate: f64) -> bool { + use std::time::{SystemTime, UNIX_EPOCH}; + + let nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .subsec_nanos(); + + let threshold = (rate * u32::MAX as f64) as u32; + nanos < threshold +} + +impl std::fmt::Debug for ReplayConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ReplayConfig") + .field("enabled", &self.enabled) + .field("admin_token", &self.admin_token.as_ref().map(|_| "[SET]")) + .field("record_paths", &self.record_paths) + .field("skip_paths", &self.skip_paths) + .field("admin_route_prefix", &self.admin_route_prefix) + .field("max_request_body", &self.max_request_body) + .field("max_response_body", &self.max_response_body) + .field("store_capacity", &self.store_capacity) + .field("ttl_secs", &self.ttl_secs) + .field("sample_rate", &self.sample_rate) + .field("redact_headers", &self.redact_headers) + .field("redact_body_fields", &self.redact_body_fields) + .finish() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_default_config() { + let config = ReplayConfig::new(); + assert!(!config.enabled); + assert!(config.admin_token.is_none()); + assert_eq!(config.max_request_body, 65_536); + assert_eq!(config.max_response_body, 262_144); + assert_eq!(config.store_capacity, 500); + assert_eq!(config.ttl_secs, 3600); + assert_eq!(config.sample_rate, 1.0); + assert_eq!(config.admin_route_prefix, "/__rustapi/replays"); + } + + #[test] + fn test_default_redacted_headers() { + let config = ReplayConfig::new(); + assert!(config.redact_headers.contains("authorization")); + assert!(config.redact_headers.contains("cookie")); + assert!(config.redact_headers.contains("x-api-key")); + assert!(config.redact_headers.contains("x-auth-token")); + } + + #[test] + fn test_builder_methods() { + let config = ReplayConfig::new() + .enabled(true) + .admin_token("test-token") + .max_request_body(1024) + .max_response_body(2048) + .store_capacity(100) + .ttl_secs(7200) + .sample_rate(0.5) + .redact_header("x-custom") + .redact_body_field("password") + .record_path("/api/users") + .skip_path("/health"); + + assert!(config.enabled); + assert_eq!(config.admin_token.as_deref(), Some("test-token")); + assert_eq!(config.max_request_body, 1024); + assert_eq!(config.max_response_body, 2048); + assert_eq!(config.store_capacity, 100); + assert_eq!(config.ttl_secs, 7200); + assert_eq!(config.sample_rate, 0.5); + assert!(config.redact_headers.contains("x-custom")); + assert!(config.redact_body_fields.contains("password")); + assert!(config.record_paths.contains("/api/users")); + assert!(config.skip_paths.contains("/health")); + } + + #[test] + fn test_sample_rate_clamping() { + let config = ReplayConfig::new().sample_rate(1.5); + assert_eq!(config.sample_rate, 1.0); + + let config = ReplayConfig::new().sample_rate(-0.5); + assert_eq!(config.sample_rate, 0.0); + } + + #[test] + fn test_should_record_path() { + let config = ReplayConfig::new() + .skip_path("/health") + .record_path("/api/users"); + + assert!(!config.should_record_path("/__rustapi/replays")); + assert!(!config.should_record_path("/health")); + assert!(config.should_record_path("/api/users")); + assert!(!config.should_record_path("/api/items")); + } + + #[test] + fn test_should_record_path_no_record_filter() { + let config = ReplayConfig::new().skip_path("/health"); + + assert!(config.should_record_path("/api/users")); + assert!(config.should_record_path("/api/items")); + assert!(!config.should_record_path("/health")); + } + + #[test] + fn test_capturable_content_type() { + let config = ReplayConfig::new(); + + assert!(config.is_capturable_content_type("application/json")); + assert!(config.is_capturable_content_type("application/json; charset=utf-8")); + assert!(config.is_capturable_content_type("text/plain")); + assert!(config.is_capturable_content_type("text/html")); + } +} diff --git a/crates/rustapi-core/src/replay/diff.rs b/crates/rustapi-core/src/replay/diff.rs new file mode 100644 index 00000000..95cc47d4 --- /dev/null +++ b/crates/rustapi-core/src/replay/diff.rs @@ -0,0 +1,465 @@ +//! Diff utilities for comparing original and replayed HTTP responses. +//! +//! Pure functions for computing structural diffs between [`RecordedResponse`] +//! instances. Supports JSON field-level and raw text diff. + +use serde::{Deserialize, Serialize}; + +use super::entry::RecordedResponse; +use super::truncation::try_parse_json; + +/// Result of diffing an original vs. replayed response. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DiffResult { + /// Whether there are any differences. + pub has_diff: bool, + + /// Status code diff `(original, replayed)` if different. + #[serde(skip_serializing_if = "Option::is_none")] + pub status_diff: Option<(u16, u16)>, + + /// Header differences. + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub header_diffs: Vec, + + /// Body difference summary. + #[serde(skip_serializing_if = "Option::is_none")] + pub body_diff: Option, +} + +/// Which field differs. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum DiffField { + /// HTTP status code. + Status, + /// A specific header. + Header(String), + /// A JSON body field path (dot-separated). + BodyField(String), + /// The raw body (non-JSON). + BodyRaw, +} + +/// A single field difference. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct FieldDiff { + /// The field that differs. + pub field: DiffField, + /// Original value (if present). + #[serde(skip_serializing_if = "Option::is_none")] + pub original: Option, + /// Replayed value (if present). + #[serde(skip_serializing_if = "Option::is_none")] + pub replayed: Option, +} + +/// Body difference details. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct BodyDiff { + /// If both bodies are JSON, field-level diffs. + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub field_diffs: Vec, + + /// If bodies are not JSON-diffable, a raw text diff summary. + #[serde(skip_serializing_if = "Option::is_none")] + pub raw_diff_summary: Option, +} + +/// Compare two recorded responses and produce a diff. +/// +/// # Arguments +/// +/// * `original` - The originally recorded response. +/// * `replayed` - The response from replaying the request. +/// * `ignore_headers` - Header names to exclude from comparison (e.g., `date`, `x-request-id`). +pub fn compute_diff( + original: &RecordedResponse, + replayed: &RecordedResponse, + ignore_headers: &[String], +) -> DiffResult { + let mut result = DiffResult { + has_diff: false, + status_diff: None, + header_diffs: Vec::new(), + body_diff: None, + }; + + // Compare status codes + if original.status != replayed.status { + result.has_diff = true; + result.status_diff = Some((original.status, replayed.status)); + } + + // Compare headers + let ignore_set: std::collections::HashSet = + ignore_headers.iter().map(|h| h.to_lowercase()).collect(); + + // Check headers in original + for (key, orig_val) in &original.headers { + let key_lower = key.to_lowercase(); + if ignore_set.contains(&key_lower) { + continue; + } + match replayed.headers.get(key) { + Some(replay_val) if replay_val != orig_val => { + result.has_diff = true; + result.header_diffs.push(FieldDiff { + field: DiffField::Header(key.clone()), + original: Some(orig_val.clone()), + replayed: Some(replay_val.clone()), + }); + } + None => { + result.has_diff = true; + result.header_diffs.push(FieldDiff { + field: DiffField::Header(key.clone()), + original: Some(orig_val.clone()), + replayed: None, + }); + } + _ => {} // same value + } + } + + // Check headers only in replayed + for (key, replay_val) in &replayed.headers { + let key_lower = key.to_lowercase(); + if ignore_set.contains(&key_lower) { + continue; + } + if !original.headers.contains_key(key) { + result.has_diff = true; + result.header_diffs.push(FieldDiff { + field: DiffField::Header(key.clone()), + original: None, + replayed: Some(replay_val.clone()), + }); + } + } + + // Compare bodies + match (&original.body, &replayed.body) { + (Some(orig_body), Some(replay_body)) => { + if orig_body != replay_body { + result.has_diff = true; + + // Try JSON diff + let orig_json = try_parse_json(orig_body); + let replay_json = try_parse_json(replay_body); + + match (orig_json, replay_json) { + (Some(orig_val), Some(replay_val)) => { + let field_diffs = diff_json(&orig_val, &replay_val, ""); + if !field_diffs.is_empty() { + result.body_diff = Some(BodyDiff { + field_diffs, + raw_diff_summary: None, + }); + } + } + _ => { + // Fall back to raw diff summary + let summary = format!( + "Bodies differ: original {} bytes, replayed {} bytes", + orig_body.len(), + replay_body.len() + ); + result.body_diff = Some(BodyDiff { + field_diffs: Vec::new(), + raw_diff_summary: Some(summary), + }); + } + } + } + } + (Some(orig_body), None) => { + result.has_diff = true; + result.body_diff = Some(BodyDiff { + field_diffs: Vec::new(), + raw_diff_summary: Some(format!( + "Original has body ({} bytes), replayed has no body", + orig_body.len() + )), + }); + } + (None, Some(replay_body)) => { + result.has_diff = true; + result.body_diff = Some(BodyDiff { + field_diffs: Vec::new(), + raw_diff_summary: Some(format!( + "Original has no body, replayed has body ({} bytes)", + replay_body.len() + )), + }); + } + (None, None) => {} // both empty, no diff + } + + result +} + +/// Deep-diff two JSON values, producing field-level diffs. +/// +/// Recursively compares JSON structures. The `prefix` argument tracks the +/// current path (e.g., `"user.address.city"`). +pub fn diff_json( + original: &serde_json::Value, + replayed: &serde_json::Value, + prefix: &str, +) -> Vec { + let mut diffs = Vec::new(); + + if original == replayed { + return diffs; + } + + match (original, replayed) { + (serde_json::Value::Object(orig_map), serde_json::Value::Object(replay_map)) => { + // Check keys in original + for (key, orig_val) in orig_map { + let path = if prefix.is_empty() { + key.clone() + } else { + format!("{}.{}", prefix, key) + }; + + match replay_map.get(key) { + Some(replay_val) => { + diffs.extend(diff_json(orig_val, replay_val, &path)); + } + None => { + diffs.push(FieldDiff { + field: DiffField::BodyField(path), + original: Some(value_to_string(orig_val)), + replayed: None, + }); + } + } + } + + // Check keys only in replayed + for (key, replay_val) in replay_map { + if !orig_map.contains_key(key) { + let path = if prefix.is_empty() { + key.clone() + } else { + format!("{}.{}", prefix, key) + }; + diffs.push(FieldDiff { + field: DiffField::BodyField(path), + original: None, + replayed: Some(value_to_string(replay_val)), + }); + } + } + } + (serde_json::Value::Array(orig_arr), serde_json::Value::Array(replay_arr)) => { + let max_len = orig_arr.len().max(replay_arr.len()); + for i in 0..max_len { + let path = if prefix.is_empty() { + format!("[{}]", i) + } else { + format!("{}[{}]", prefix, i) + }; + + match (orig_arr.get(i), replay_arr.get(i)) { + (Some(orig_val), Some(replay_val)) => { + diffs.extend(diff_json(orig_val, replay_val, &path)); + } + (Some(orig_val), None) => { + diffs.push(FieldDiff { + field: DiffField::BodyField(path), + original: Some(value_to_string(orig_val)), + replayed: None, + }); + } + (None, Some(replay_val)) => { + diffs.push(FieldDiff { + field: DiffField::BodyField(path), + original: None, + replayed: Some(value_to_string(replay_val)), + }); + } + (None, None) => unreachable!(), + } + } + } + _ => { + // Leaf values differ + let path = if prefix.is_empty() { + "(root)".to_string() + } else { + prefix.to_string() + }; + diffs.push(FieldDiff { + field: DiffField::BodyField(path), + original: Some(value_to_string(original)), + replayed: Some(value_to_string(replayed)), + }); + } + } + + diffs +} + +/// Convert a JSON value to a compact string representation. +fn value_to_string(value: &serde_json::Value) -> String { + match value { + serde_json::Value::String(s) => s.clone(), + other => other.to_string(), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::collections::HashMap; + + fn make_response(status: u16, body: Option<&str>) -> RecordedResponse { + RecordedResponse { + status, + headers: HashMap::new(), + body: body.map(|s| s.to_string()), + body_size: body.map(|s| s.len()).unwrap_or(0), + body_truncated: false, + } + } + + #[test] + fn test_identical_responses_no_diff() { + let resp = make_response(200, Some(r#"{"ok":true}"#)); + let diff = compute_diff(&resp, &resp, &[]); + assert!(!diff.has_diff); + assert!(diff.status_diff.is_none()); + assert!(diff.header_diffs.is_empty()); + assert!(diff.body_diff.is_none()); + } + + #[test] + fn test_different_status() { + let orig = make_response(200, None); + let replay = make_response(500, None); + let diff = compute_diff(&orig, &replay, &[]); + + assert!(diff.has_diff); + assert_eq!(diff.status_diff, Some((200, 500))); + } + + #[test] + fn test_different_headers() { + let mut orig = make_response(200, None); + orig.headers + .insert("x-custom".to_string(), "value1".to_string()); + + let mut replay = make_response(200, None); + replay + .headers + .insert("x-custom".to_string(), "value2".to_string()); + + let diff = compute_diff(&orig, &replay, &[]); + assert!(diff.has_diff); + assert_eq!(diff.header_diffs.len(), 1); + } + + #[test] + fn test_ignore_headers() { + let mut orig = make_response(200, None); + orig.headers + .insert("date".to_string(), "Mon, 01 Jan".to_string()); + + let mut replay = make_response(200, None); + replay + .headers + .insert("date".to_string(), "Tue, 02 Jan".to_string()); + + let diff = compute_diff(&orig, &replay, &["date".to_string()]); + assert!(!diff.has_diff); + } + + #[test] + fn test_missing_header_in_replay() { + let mut orig = make_response(200, None); + orig.headers + .insert("x-custom".to_string(), "value".to_string()); + + let replay = make_response(200, None); + let diff = compute_diff(&orig, &replay, &[]); + + assert!(diff.has_diff); + assert_eq!(diff.header_diffs.len(), 1); + assert!(diff.header_diffs[0].replayed.is_none()); + } + + #[test] + fn test_json_body_diff() { + let orig = make_response(200, Some(r#"{"name":"alice","age":30}"#)); + let replay = make_response(200, Some(r#"{"name":"bob","age":30}"#)); + + let diff = compute_diff(&orig, &replay, &[]); + assert!(diff.has_diff); + + let body_diff = diff.body_diff.unwrap(); + assert_eq!(body_diff.field_diffs.len(), 1); + assert!(matches!(&body_diff.field_diffs[0].field, DiffField::BodyField(f) if f == "name")); + assert_eq!(body_diff.field_diffs[0].original.as_deref(), Some("alice")); + assert_eq!(body_diff.field_diffs[0].replayed.as_deref(), Some("bob")); + } + + #[test] + fn test_json_nested_diff() { + let orig = make_response(200, Some(r#"{"user":{"name":"alice","age":30}}"#)); + let replay = make_response(200, Some(r#"{"user":{"name":"alice","age":31}}"#)); + + let diff = compute_diff(&orig, &replay, &[]); + assert!(diff.has_diff); + + let body_diff = diff.body_diff.unwrap(); + assert_eq!(body_diff.field_diffs.len(), 1); + assert!( + matches!(&body_diff.field_diffs[0].field, DiffField::BodyField(f) if f == "user.age") + ); + } + + #[test] + fn test_non_json_body_diff() { + let orig = make_response(200, Some("hello world")); + let replay = make_response(200, Some("hello changed")); + + let diff = compute_diff(&orig, &replay, &[]); + assert!(diff.has_diff); + + let body_diff = diff.body_diff.unwrap(); + assert!(body_diff.raw_diff_summary.is_some()); + } + + #[test] + fn test_body_presence_diff() { + let orig = make_response(200, Some("body here")); + let replay = make_response(200, None); + + let diff = compute_diff(&orig, &replay, &[]); + assert!(diff.has_diff); + assert!(diff.body_diff.is_some()); + } + + #[test] + fn test_diff_json_array() { + let orig: serde_json::Value = serde_json::from_str("[1, 2, 3]").unwrap(); + let replay: serde_json::Value = serde_json::from_str("[1, 2, 4]").unwrap(); + + let diffs = diff_json(&orig, &replay, ""); + assert_eq!(diffs.len(), 1); + assert!(matches!(&diffs[0].field, DiffField::BodyField(f) if f == "[2]")); + } + + #[test] + fn test_diff_json_extra_key() { + let orig: serde_json::Value = serde_json::from_str(r#"{"a":1}"#).unwrap(); + let replay: serde_json::Value = serde_json::from_str(r#"{"a":1,"b":2}"#).unwrap(); + + let diffs = diff_json(&orig, &replay, ""); + assert_eq!(diffs.len(), 1); + assert!(matches!(&diffs[0].field, DiffField::BodyField(f) if f == "b")); + assert!(diffs[0].original.is_none()); + } +} diff --git a/crates/rustapi-core/src/replay/entry.rs b/crates/rustapi-core/src/replay/entry.rs new file mode 100644 index 00000000..3cda5509 --- /dev/null +++ b/crates/rustapi-core/src/replay/entry.rs @@ -0,0 +1,211 @@ +//! Core data structures for replay entries. +//! +//! A [`ReplayEntry`] captures a complete HTTP request/response pair +//! for later replay and diff analysis. + +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::time::{SystemTime, UNIX_EPOCH}; + +use super::meta::ReplayMeta; + +/// Unique identifier for a replay entry. +pub type ReplayId = String; + +/// A recorded HTTP request/response pair for replay debugging. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ReplayEntry { + /// Unique replay entry identifier (UUID v4). + pub id: ReplayId, + + /// When this entry was recorded (Unix timestamp in milliseconds). + pub recorded_at: u64, + + /// The recorded HTTP request. + pub request: RecordedRequest, + + /// The recorded HTTP response. + pub response: RecordedResponse, + + /// Additional metadata (route pattern, duration, tags). + pub meta: ReplayMeta, +} + +/// A recorded HTTP request. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RecordedRequest { + /// HTTP method (GET, POST, etc.). + pub method: String, + + /// Full URI including query string. + pub uri: String, + + /// Request path (without query string). + pub path: String, + + /// Query string (without leading `?`). + #[serde(skip_serializing_if = "Option::is_none")] + pub query: Option, + + /// Request headers (after redaction). + #[serde(default, skip_serializing_if = "HashMap::is_empty")] + pub headers: HashMap, + + /// Request body (after redaction and truncation). + #[serde(skip_serializing_if = "Option::is_none")] + pub body: Option, + + /// Original request body size in bytes. + pub body_size: usize, + + /// Whether the body was truncated. + #[serde(default)] + pub body_truncated: bool, +} + +/// A recorded HTTP response. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RecordedResponse { + /// HTTP status code. + pub status: u16, + + /// Response headers (after redaction). + #[serde(default, skip_serializing_if = "HashMap::is_empty")] + pub headers: HashMap, + + /// Response body (after redaction and truncation). + #[serde(skip_serializing_if = "Option::is_none")] + pub body: Option, + + /// Original response body size in bytes. + pub body_size: usize, + + /// Whether the body was truncated. + #[serde(default)] + pub body_truncated: bool, +} + +impl ReplayEntry { + /// Create a new replay entry with a generated UUID and current timestamp. + pub fn new(request: RecordedRequest, response: RecordedResponse, meta: ReplayMeta) -> Self { + Self { + id: uuid::Uuid::new_v4().to_string(), + recorded_at: SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64, + request, + response, + meta, + } + } +} + +impl RecordedRequest { + /// Create a new recorded request. + pub fn new(method: impl Into, uri: impl Into, path: impl Into) -> Self { + Self { + method: method.into(), + uri: uri.into(), + path: path.into(), + query: None, + headers: HashMap::new(), + body: None, + body_size: 0, + body_truncated: false, + } + } +} + +impl RecordedResponse { + /// Create a new recorded response. + pub fn new(status: u16) -> Self { + Self { + status, + headers: HashMap::new(), + body: None, + body_size: 0, + body_truncated: false, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_replay_entry_creation() { + let req = RecordedRequest::new("GET", "/users?page=1", "/users"); + let resp = RecordedResponse::new(200); + let meta = ReplayMeta::new().with_duration_ms(42); + + let entry = ReplayEntry::new(req, resp, meta); + + assert!(!entry.id.is_empty()); + assert!(entry.recorded_at > 0); + assert_eq!(entry.request.method, "GET"); + assert_eq!(entry.request.uri, "/users?page=1"); + assert_eq!(entry.request.path, "/users"); + assert_eq!(entry.response.status, 200); + assert_eq!(entry.meta.duration_ms, 42); + } + + #[test] + fn test_recorded_request_defaults() { + let req = RecordedRequest::new("POST", "/items", "/items"); + assert_eq!(req.method, "POST"); + assert!(req.query.is_none()); + assert!(req.headers.is_empty()); + assert!(req.body.is_none()); + assert_eq!(req.body_size, 0); + assert!(!req.body_truncated); + } + + #[test] + fn test_recorded_response_defaults() { + let resp = RecordedResponse::new(404); + assert_eq!(resp.status, 404); + assert!(resp.headers.is_empty()); + assert!(resp.body.is_none()); + assert_eq!(resp.body_size, 0); + assert!(!resp.body_truncated); + } + + #[test] + fn test_serialization_roundtrip() { + let req = RecordedRequest { + method: "POST".to_string(), + uri: "/api/users".to_string(), + path: "/api/users".to_string(), + query: None, + headers: { + let mut h = HashMap::new(); + h.insert("content-type".to_string(), "application/json".to_string()); + h + }, + body: Some(r#"{"name":"test"}"#.to_string()), + body_size: 15, + body_truncated: false, + }; + let resp = RecordedResponse { + status: 201, + headers: HashMap::new(), + body: Some(r#"{"id":1}"#.to_string()), + body_size: 8, + body_truncated: false, + }; + let entry = ReplayEntry::new(req, resp, ReplayMeta::default()); + + let json = serde_json::to_string(&entry).unwrap(); + let deserialized: ReplayEntry = serde_json::from_str(&json).unwrap(); + + assert_eq!(deserialized.id, entry.id); + assert_eq!(deserialized.request.method, "POST"); + assert_eq!(deserialized.response.status, 201); + assert_eq!( + deserialized.request.body.as_deref(), + Some(r#"{"name":"test"}"#) + ); + } +} diff --git a/crates/rustapi-core/src/replay/meta.rs b/crates/rustapi-core/src/replay/meta.rs new file mode 100644 index 00000000..9b331b42 --- /dev/null +++ b/crates/rustapi-core/src/replay/meta.rs @@ -0,0 +1,124 @@ +//! Metadata associated with a replay entry. + +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +/// Metadata associated with a replay entry. +/// +/// Contains contextual information about the recorded request such as +/// route pattern, processing duration, client IP, and custom tags. +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct ReplayMeta { + /// Route pattern that matched (e.g., `"/users/{id}"`). + #[serde(skip_serializing_if = "Option::is_none")] + pub route_pattern: Option, + + /// Request processing duration in milliseconds. + pub duration_ms: u64, + + /// Client IP address. + #[serde(skip_serializing_if = "Option::is_none")] + pub client_ip: Option, + + /// Request ID for correlation. + #[serde(skip_serializing_if = "Option::is_none")] + pub request_id: Option, + + /// Custom tags for categorization and filtering. + #[serde(default, skip_serializing_if = "HashMap::is_empty")] + pub tags: HashMap, + + /// Time-to-live in seconds (for retention). + #[serde(skip_serializing_if = "Option::is_none")] + pub ttl_secs: Option, +} + +impl ReplayMeta { + /// Create a new empty metadata instance. + pub fn new() -> Self { + Self::default() + } + + /// Set the route pattern. + pub fn with_route_pattern(mut self, pattern: impl Into) -> Self { + self.route_pattern = Some(pattern.into()); + self + } + + /// Set the processing duration in milliseconds. + pub fn with_duration_ms(mut self, ms: u64) -> Self { + self.duration_ms = ms; + self + } + + /// Set the client IP address. + pub fn with_client_ip(mut self, ip: impl Into) -> Self { + self.client_ip = Some(ip.into()); + self + } + + /// Set the request ID. + pub fn with_request_id(mut self, id: impl Into) -> Self { + self.request_id = Some(id.into()); + self + } + + /// Add a custom tag. + pub fn with_tag(mut self, key: impl Into, value: impl Into) -> Self { + self.tags.insert(key.into(), value.into()); + self + } + + /// Set the TTL in seconds. + pub fn with_ttl_secs(mut self, secs: u64) -> Self { + self.ttl_secs = Some(secs); + self + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_default() { + let meta = ReplayMeta::new(); + assert!(meta.route_pattern.is_none()); + assert_eq!(meta.duration_ms, 0); + assert!(meta.client_ip.is_none()); + assert!(meta.request_id.is_none()); + assert!(meta.tags.is_empty()); + assert!(meta.ttl_secs.is_none()); + } + + #[test] + fn test_builder() { + let meta = ReplayMeta::new() + .with_route_pattern("/users/{id}") + .with_duration_ms(42) + .with_client_ip("192.168.1.1") + .with_request_id("req-123") + .with_tag("env", "staging") + .with_ttl_secs(3600); + + assert_eq!(meta.route_pattern.as_deref(), Some("/users/{id}")); + assert_eq!(meta.duration_ms, 42); + assert_eq!(meta.client_ip.as_deref(), Some("192.168.1.1")); + assert_eq!(meta.request_id.as_deref(), Some("req-123")); + assert_eq!(meta.tags.get("env").map(|s| s.as_str()), Some("staging")); + assert_eq!(meta.ttl_secs, Some(3600)); + } + + #[test] + fn test_serialization_roundtrip() { + let meta = ReplayMeta::new() + .with_route_pattern("/test") + .with_duration_ms(100); + + let json = serde_json::to_string(&meta).unwrap(); + let deserialized: ReplayMeta = serde_json::from_str(&json).unwrap(); + + assert_eq!(deserialized.route_pattern, meta.route_pattern); + assert_eq!(deserialized.duration_ms, meta.duration_ms); + } +} diff --git a/crates/rustapi-core/src/replay/mod.rs b/crates/rustapi-core/src/replay/mod.rs new file mode 100644 index 00000000..30fdd3e1 --- /dev/null +++ b/crates/rustapi-core/src/replay/mod.rs @@ -0,0 +1,35 @@ +//! Replay - Time-travel debugging types and traits. +//! +//! This module provides the core data structures for recording HTTP +//! request/response pairs and computing diffs between replayed and +//! original responses. All types are framework-agnostic with no IO. +//! +//! # Security +//! +//! - Recording disabled by default +//! - Admin token required for all replay endpoints +//! - Sensitive headers (authorization, cookie, etc.) redacted by default +//! - JSON body field redaction supported +//! - Configurable TTL with automatic retention cleanup +//! +//! # Crate Organization +//! +//! - **rustapi-core** (this module): Pure types, traits, and utilities +//! - **rustapi-extras**: Middleware (`ReplayLayer`), stores, HTTP routes +//! - **cargo-rustapi**: CLI commands for replay management + +mod config; +mod diff; +mod entry; +mod meta; +mod redaction; +mod store; +mod truncation; + +pub use config::ReplayConfig; +pub use diff::{compute_diff, diff_json, BodyDiff, DiffField, DiffResult, FieldDiff}; +pub use entry::{RecordedRequest, RecordedResponse, ReplayEntry, ReplayId}; +pub use meta::ReplayMeta; +pub use redaction::{redact_body, redact_headers, RedactionConfig}; +pub use store::{ReplayQuery, ReplayStore, ReplayStoreError, ReplayStoreResult}; +pub use truncation::{content_sniff, truncate_body, try_parse_json, ContentKind, TruncationConfig}; diff --git a/crates/rustapi-core/src/replay/redaction.rs b/crates/rustapi-core/src/replay/redaction.rs new file mode 100644 index 00000000..7e6cfded --- /dev/null +++ b/crates/rustapi-core/src/replay/redaction.rs @@ -0,0 +1,186 @@ +//! Header and body redaction utilities. +//! +//! Pure functions for redacting sensitive data from headers and JSON bodies. +//! No IO operations. + +use std::collections::{HashMap, HashSet}; + +/// Configuration for redaction of sensitive data. +#[derive(Debug, Clone)] +pub struct RedactionConfig { + /// Header names to redact (lowercase). + pub header_names: HashSet, + /// JSON body field paths to redact. + pub body_field_paths: HashSet, + /// Replacement string. Default: `"[REDACTED]"`. + pub replacement: String, +} + +impl Default for RedactionConfig { + fn default() -> Self { + Self { + header_names: HashSet::new(), + body_field_paths: HashSet::new(), + replacement: "[REDACTED]".to_string(), + } + } +} + +/// Redact sensitive values from a header map. +/// +/// Returns a new map with sensitive header values replaced by `"[REDACTED]"`. +/// Header name comparison is case-insensitive. +pub fn redact_headers( + headers: &HashMap, + sensitive: &HashSet, +) -> HashMap { + headers + .iter() + .map(|(k, v)| { + let key_lower = k.to_lowercase(); + if sensitive.contains(&key_lower) { + (k.clone(), "[REDACTED]".to_string()) + } else { + (k.clone(), v.clone()) + } + }) + .collect() +} + +/// Redact JSON body fields by path. +/// +/// Parses the body as JSON, replaces matching field values with the replacement +/// string, and returns the modified JSON string. Returns `None` if the input +/// is not valid JSON. +/// +/// Field paths are top-level keys only (e.g., `"password"`, `"ssn"`). +pub fn redact_body(body: &str, field_paths: &HashSet, replacement: &str) -> Option { + if field_paths.is_empty() { + return Some(body.to_string()); + } + + let mut value: serde_json::Value = serde_json::from_str(body).ok()?; + redact_value(&mut value, field_paths, replacement); + serde_json::to_string(&value).ok() +} + +/// Recursively redact fields in a JSON value. +fn redact_value(value: &mut serde_json::Value, fields: &HashSet, replacement: &str) { + match value { + serde_json::Value::Object(map) => { + for (key, val) in map.iter_mut() { + if fields.contains(key) { + *val = serde_json::Value::String(replacement.to_string()); + } else { + redact_value(val, fields, replacement); + } + } + } + serde_json::Value::Array(arr) => { + for item in arr.iter_mut() { + redact_value(item, fields, replacement); + } + } + _ => {} + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_redact_headers_basic() { + let mut headers = HashMap::new(); + headers.insert("authorization".to_string(), "Bearer secret".to_string()); + headers.insert("content-type".to_string(), "application/json".to_string()); + headers.insert("x-api-key".to_string(), "key123".to_string()); + + let mut sensitive = HashSet::new(); + sensitive.insert("authorization".to_string()); + sensitive.insert("x-api-key".to_string()); + + let redacted = redact_headers(&headers, &sensitive); + + assert_eq!(redacted.get("authorization").unwrap(), "[REDACTED]"); + assert_eq!(redacted.get("content-type").unwrap(), "application/json"); + assert_eq!(redacted.get("x-api-key").unwrap(), "[REDACTED]"); + } + + #[test] + fn test_redact_headers_case_insensitive() { + let mut headers = HashMap::new(); + headers.insert("Authorization".to_string(), "Bearer secret".to_string()); + + let mut sensitive = HashSet::new(); + sensitive.insert("authorization".to_string()); + + let redacted = redact_headers(&headers, &sensitive); + assert_eq!(redacted.get("Authorization").unwrap(), "[REDACTED]"); + } + + #[test] + fn test_redact_headers_empty_sensitive() { + let mut headers = HashMap::new(); + headers.insert("authorization".to_string(), "Bearer secret".to_string()); + + let redacted = redact_headers(&headers, &HashSet::new()); + assert_eq!(redacted.get("authorization").unwrap(), "Bearer secret"); + } + + #[test] + fn test_redact_body_basic() { + let body = r#"{"username":"john","password":"secret123","email":"john@example.com"}"#; + let mut fields = HashSet::new(); + fields.insert("password".to_string()); + + let result = redact_body(body, &fields, "[REDACTED]").unwrap(); + let parsed: serde_json::Value = serde_json::from_str(&result).unwrap(); + + assert_eq!(parsed["username"], "john"); + assert_eq!(parsed["password"], "[REDACTED]"); + assert_eq!(parsed["email"], "john@example.com"); + } + + #[test] + fn test_redact_body_nested() { + let body = r#"{"user":{"name":"john","ssn":"123-45-6789"}}"#; + let mut fields = HashSet::new(); + fields.insert("ssn".to_string()); + + let result = redact_body(body, &fields, "[REDACTED]").unwrap(); + let parsed: serde_json::Value = serde_json::from_str(&result).unwrap(); + + assert_eq!(parsed["user"]["name"], "john"); + assert_eq!(parsed["user"]["ssn"], "[REDACTED]"); + } + + #[test] + fn test_redact_body_array() { + let body = r#"[{"password":"a"},{"password":"b"}]"#; + let mut fields = HashSet::new(); + fields.insert("password".to_string()); + + let result = redact_body(body, &fields, "[REDACTED]").unwrap(); + let parsed: serde_json::Value = serde_json::from_str(&result).unwrap(); + + assert_eq!(parsed[0]["password"], "[REDACTED]"); + assert_eq!(parsed[1]["password"], "[REDACTED]"); + } + + #[test] + fn test_redact_body_not_json() { + let body = "this is not json"; + let mut fields = HashSet::new(); + fields.insert("password".to_string()); + + assert!(redact_body(body, &fields, "[REDACTED]").is_none()); + } + + #[test] + fn test_redact_body_empty_fields() { + let body = r#"{"password":"secret"}"#; + let result = redact_body(body, &HashSet::new(), "[REDACTED]").unwrap(); + assert_eq!(result, body); + } +} diff --git a/crates/rustapi-core/src/replay/store.rs b/crates/rustapi-core/src/replay/store.rs new file mode 100644 index 00000000..478888c0 --- /dev/null +++ b/crates/rustapi-core/src/replay/store.rs @@ -0,0 +1,231 @@ +//! Storage trait and query types for replay entries. +//! +//! Defines the [`ReplayStore`] trait for pluggable storage backends. + +use async_trait::async_trait; + +use super::entry::ReplayEntry; + +/// Errors from replay store operations. +#[derive(Debug, thiserror::Error)] +pub enum ReplayStoreError { + /// IO error (file, network, etc.). + #[error("IO error: {0}")] + Io(String), + + /// Serialization/deserialization error. + #[error("Serialization error: {0}")] + Serialization(String), + + /// Entry not found. + #[error("Entry not found: {0}")] + NotFound(String), + + /// Store is full. + #[error("Store full")] + StoreFull, + + /// Other error. + #[error("Store error: {0}")] + Other(String), +} + +/// Convenience result type for replay store operations. +pub type ReplayStoreResult = Result; + +/// Query parameters for filtering replay entries. +#[derive(Debug, Clone, Default)] +pub struct ReplayQuery { + /// Filter by HTTP method. + pub method: Option, + + /// Filter by path substring. + pub path_contains: Option, + + /// Filter by minimum status code. + pub status_min: Option, + + /// Filter by maximum status code. + pub status_max: Option, + + /// Filter entries recorded after this timestamp (Unix ms). + pub from_timestamp: Option, + + /// Filter entries recorded before this timestamp (Unix ms). + pub to_timestamp: Option, + + /// Filter by tag key-value pair. + pub tag: Option<(String, String)>, + + /// Maximum number of entries to return. + pub limit: Option, + + /// Number of entries to skip. + pub offset: Option, + + /// Return newest entries first. Default: true. + pub newest_first: bool, +} + +impl ReplayQuery { + /// Create a new empty query (matches all entries). + pub fn new() -> Self { + Self { + newest_first: true, + ..Default::default() + } + } + + /// Filter by HTTP method. + pub fn method(mut self, method: impl Into) -> Self { + self.method = Some(method.into()); + self + } + + /// Filter by path substring. + pub fn path_contains(mut self, path: impl Into) -> Self { + self.path_contains = Some(path.into()); + self + } + + /// Filter by minimum status code. + pub fn status_min(mut self, min: u16) -> Self { + self.status_min = Some(min); + self + } + + /// Filter by maximum status code. + pub fn status_max(mut self, max: u16) -> Self { + self.status_max = Some(max); + self + } + + /// Set the maximum number of entries to return. + pub fn limit(mut self, limit: usize) -> Self { + self.limit = Some(limit); + self + } + + /// Check if an entry matches this query. + pub fn matches(&self, entry: &ReplayEntry) -> bool { + if let Some(ref method) = self.method { + if entry.request.method != *method { + return false; + } + } + if let Some(ref path) = self.path_contains { + if !entry.request.path.contains(path.as_str()) { + return false; + } + } + if let Some(min) = self.status_min { + if entry.response.status < min { + return false; + } + } + if let Some(max) = self.status_max { + if entry.response.status > max { + return false; + } + } + if let Some(from) = self.from_timestamp { + if entry.recorded_at < from { + return false; + } + } + if let Some(to) = self.to_timestamp { + if entry.recorded_at > to { + return false; + } + } + if let Some((ref key, ref value)) = self.tag { + match entry.meta.tags.get(key) { + Some(v) if v == value => {} + _ => return false, + } + } + true + } +} + +/// Trait for storing and retrieving replay entries. +/// +/// Implement this trait to create custom storage backends +/// (e.g., database, Redis, cloud storage). +#[async_trait] +pub trait ReplayStore: Send + Sync + 'static { + /// Store a new replay entry. + async fn store(&self, entry: ReplayEntry) -> ReplayStoreResult<()>; + + /// Get a single replay entry by ID. + async fn get(&self, id: &str) -> ReplayStoreResult>; + + /// List replay entries matching the given query. + async fn list(&self, query: &ReplayQuery) -> ReplayStoreResult>; + + /// Delete a replay entry by ID. Returns true if deleted. + async fn delete(&self, id: &str) -> ReplayStoreResult; + + /// Get the total count of stored entries. + async fn count(&self) -> ReplayStoreResult; + + /// Clear all stored entries. + async fn clear(&self) -> ReplayStoreResult<()>; + + /// Delete entries recorded before the given timestamp (Unix ms). + /// Returns the number of deleted entries. + async fn delete_before(&self, timestamp_ms: u64) -> ReplayStoreResult; + + /// Clone this store into a boxed trait object. + fn clone_store(&self) -> Box; +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::replay::entry::{RecordedRequest, RecordedResponse}; + use crate::replay::meta::ReplayMeta; + + fn make_entry(method: &str, path: &str, status: u16) -> ReplayEntry { + ReplayEntry::new( + RecordedRequest::new(method, path, path), + RecordedResponse::new(status), + ReplayMeta::new(), + ) + } + + #[test] + fn test_query_matches_all() { + let query = ReplayQuery::new(); + let entry = make_entry("GET", "/users", 200); + assert!(query.matches(&entry)); + } + + #[test] + fn test_query_method_filter() { + let query = ReplayQuery::new().method("POST"); + assert!(!query.matches(&make_entry("GET", "/users", 200))); + assert!(query.matches(&make_entry("POST", "/users", 201))); + } + + #[test] + fn test_query_path_filter() { + let query = ReplayQuery::new().path_contains("/users"); + assert!(query.matches(&make_entry("GET", "/users/123", 200))); + assert!(!query.matches(&make_entry("GET", "/items", 200))); + } + + #[test] + fn test_query_status_filter() { + let query = ReplayQuery::new().status_min(400).status_max(499); + assert!(!query.matches(&make_entry("GET", "/a", 200))); + assert!(query.matches(&make_entry("GET", "/a", 404))); + assert!(!query.matches(&make_entry("GET", "/a", 500))); + } + + #[test] + fn test_query_limit() { + let query = ReplayQuery::new().limit(10); + assert_eq!(query.limit, Some(10)); + } +} diff --git a/crates/rustapi-core/src/replay/truncation.rs b/crates/rustapi-core/src/replay/truncation.rs new file mode 100644 index 00000000..2fa5af22 --- /dev/null +++ b/crates/rustapi-core/src/replay/truncation.rs @@ -0,0 +1,211 @@ +//! Body truncation and content sniffing utilities. +//! +//! Pure functions for truncating large bodies and detecting content types. +//! No IO operations. + +/// The kind of content detected by [`content_sniff`]. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum ContentKind { + /// JSON content. + Json, + /// XML content. + Xml, + /// HTML content. + Html, + /// Plain text content. + PlainText, + /// Binary content. + Binary, + /// Unknown content type. + Unknown, +} + +/// Configuration for body truncation. +#[derive(Debug, Clone)] +pub struct TruncationConfig { + /// Maximum body size in bytes. + pub max_size: usize, + /// Suffix appended to truncated bodies. + pub suffix: String, +} + +impl Default for TruncationConfig { + fn default() -> Self { + Self { + max_size: 262_144, // 256KB + suffix: "... [truncated]".to_string(), + } + } +} + +/// Truncate a body string to the given maximum byte size. +/// +/// Returns a tuple of `(body, was_truncated)`. If the body is within the +/// limit, it is returned unchanged with `false`. Otherwise, it is truncated +/// at a valid UTF-8 boundary and `true` is returned. +pub fn truncate_body(body: &str, max_size: usize) -> (String, bool) { + if body.len() <= max_size { + return (body.to_string(), false); + } + + // Find valid UTF-8 boundary + let mut end = max_size; + while end > 0 && !body.is_char_boundary(end) { + end -= 1; + } + + let mut truncated = body[..end].to_string(); + truncated.push_str("... [truncated]"); + (truncated, true) +} + +/// Sniff content type from raw bytes (no IO). +/// +/// Inspects the first few bytes to guess the content kind. +pub fn content_sniff(bytes: &[u8]) -> ContentKind { + if bytes.is_empty() { + return ContentKind::Unknown; + } + + // Skip leading whitespace + let trimmed = bytes + .iter() + .position(|&b| !b.is_ascii_whitespace()) + .map(|i| &bytes[i..]) + .unwrap_or(bytes); + + if trimmed.is_empty() { + return ContentKind::PlainText; + } + + match trimmed[0] { + b'{' | b'[' => ContentKind::Json, + b'<' => { + // Check for HTML vs XML + let prefix = std::str::from_utf8(&trimmed[..trimmed.len().min(100)]) + .unwrap_or("") + .to_lowercase(); + if prefix.contains(" { + // Check if it looks like text + let is_text = trimmed.iter().take(512).all(|&b| b.is_ascii() || b > 0x7F); + if is_text { + ContentKind::PlainText + } else { + ContentKind::Binary + } + } + } +} + +/// Attempt to parse a body string as JSON. +/// +/// Returns `Some(Value)` on success, `None` if not valid JSON. +pub fn try_parse_json(body: &str) -> Option { + serde_json::from_str(body).ok() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_truncate_body_within_limit() { + let body = "hello world"; + let (result, truncated) = truncate_body(body, 100); + assert_eq!(result, "hello world"); + assert!(!truncated); + } + + #[test] + fn test_truncate_body_exceeds_limit() { + let body = "hello world, this is a long string"; + let (result, truncated) = truncate_body(body, 11); + assert!(result.starts_with("hello world")); + assert!(result.ends_with("... [truncated]")); + assert!(truncated); + } + + #[test] + fn test_truncate_body_exact_limit() { + let body = "hello"; + let (result, truncated) = truncate_body(body, 5); + assert_eq!(result, "hello"); + assert!(!truncated); + } + + #[test] + fn test_truncate_body_utf8_boundary() { + let body = "hëllo wörld"; // multi-byte characters + let (_, truncated) = truncate_body(body, 3); + assert!(truncated); + // Should not panic or produce invalid UTF-8 + } + + #[test] + fn test_content_sniff_json_object() { + assert_eq!(content_sniff(b"{\"key\":\"value\"}"), ContentKind::Json); + } + + #[test] + fn test_content_sniff_json_array() { + assert_eq!(content_sniff(b"[1, 2, 3]"), ContentKind::Json); + } + + #[test] + fn test_content_sniff_json_with_whitespace() { + assert_eq!( + content_sniff(b" \n {\"key\":\"value\"}"), + ContentKind::Json + ); + } + + #[test] + fn test_content_sniff_xml() { + assert_eq!( + content_sniff(b""), + ContentKind::Xml + ); + } + + #[test] + fn test_content_sniff_html() { + assert_eq!(content_sniff(b""), ContentKind::Html); + } + + #[test] + fn test_content_sniff_plain_text() { + assert_eq!( + content_sniff(b"Hello, this is plain text"), + ContentKind::PlainText + ); + } + + #[test] + fn test_content_sniff_empty() { + assert_eq!(content_sniff(b""), ContentKind::Unknown); + } + + #[test] + fn test_try_parse_json_valid() { + let result = try_parse_json(r#"{"key":"value"}"#); + assert!(result.is_some()); + assert_eq!(result.unwrap()["key"], "value"); + } + + #[test] + fn test_try_parse_json_invalid() { + assert!(try_parse_json("not json").is_none()); + } + + #[test] + fn test_try_parse_json_array() { + let result = try_parse_json("[1, 2, 3]"); + assert!(result.is_some()); + } +} diff --git a/crates/rustapi-extras/Cargo.toml b/crates/rustapi-extras/Cargo.toml index 13745645..c1e431a7 100644 --- a/crates/rustapi-extras/Cargo.toml +++ b/crates/rustapi-extras/Cargo.toml @@ -72,6 +72,9 @@ base64 = { version = "0.22", optional = true } # OAuth2 (feature-gated) sha2 = { version = "0.10", optional = true } +# Replay (feature-gated) +uuid = { workspace = true, optional = true } + [dev-dependencies] tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } proptest = "1.4" @@ -121,6 +124,9 @@ csrf = ["dep:cookie", "dep:rand", "dep:base64"] oauth2-client = ["dep:sha2", "dep:rand", "dep:base64", "dep:reqwest", "dep:urlencoding"] audit = ["dep:rand"] +# Replay (time-travel debugging) +replay = ["dep:reqwest", "dep:dashmap", "dep:uuid", "rustapi-core/replay"] + # Meta feature that enables all security features extras = ["jwt", "cors", "rate-limit"] @@ -128,4 +134,4 @@ extras = ["jwt", "cors", "rate-limit"] observability = ["otel", "structured-logging"] # Full feature set (retry temporarily disabled) -full = ["extras", "config", "cookies", "sqlx", "insight", "webhook", "timeout", "guard", "logging", "circuit-breaker", "security-headers", "api-key", "cache", "dedup", "sanitization", "retry", "otel", "structured-logging", "csrf", "oauth2-client", "audit"] +full = ["extras", "config", "cookies", "sqlx", "insight", "webhook", "timeout", "guard", "logging", "circuit-breaker", "security-headers", "api-key", "cache", "dedup", "sanitization", "retry", "otel", "structured-logging", "csrf", "oauth2-client", "audit", "replay"] diff --git a/crates/rustapi-extras/src/lib.rs b/crates/rustapi-extras/src/lib.rs index 2b2d2c81..3039a12e 100644 --- a/crates/rustapi-extras/src/lib.rs +++ b/crates/rustapi-extras/src/lib.rs @@ -214,3 +214,13 @@ pub use audit::{ AuditAction, AuditEvent, AuditQuery, AuditQueryBuilder, AuditSeverity, AuditStore, ComplianceInfo, FileAuditStore, InMemoryAuditStore, }; + +// Replay middleware (time-travel debugging) +#[cfg(feature = "replay")] +pub mod replay; + +#[cfg(feature = "replay")] +pub use replay::{ + FsReplayStore, FsReplayStoreConfig, InMemoryReplayStore, ReplayAdminAuth, ReplayClient, + ReplayLayer, RetentionJob, +}; diff --git a/crates/rustapi-extras/src/replay/auth.rs b/crates/rustapi-extras/src/replay/auth.rs new file mode 100644 index 00000000..19ae5c99 --- /dev/null +++ b/crates/rustapi-extras/src/replay/auth.rs @@ -0,0 +1,73 @@ +//! Admin authentication for replay endpoints. + +use bytes::Bytes; +use http_body_util::Full; +use rustapi_core::Response; +use rustapi_core::ResponseBody; +use serde_json::json; + +/// Admin authentication check for replay endpoints. +pub struct ReplayAdminAuth; + +impl ReplayAdminAuth { + /// Check the admin bearer token from the Authorization header. + /// + /// Returns `Ok(())` if the token is valid, or an `Err(Response)` with + /// a 401 JSON error if the token is missing or invalid. + #[allow(clippy::result_large_err)] + pub fn check(headers: &http::HeaderMap, expected_token: &str) -> Result<(), Response> { + let auth = headers.get("authorization").and_then(|v| v.to_str().ok()); + + let expected = format!("Bearer {}", expected_token); + + match auth { + Some(value) if value == expected => Ok(()), + _ => { + let body = json!({ + "error": "unauthorized", + "message": "Missing or invalid admin token. Use Authorization: Bearer " + }); + let body_bytes = serde_json::to_vec(&body).unwrap_or_default(); + let response = http::Response::builder() + .status(http::StatusCode::UNAUTHORIZED) + .header("content-type", "application/json") + .body(ResponseBody::Full(Full::new(Bytes::from(body_bytes)))) + .unwrap(); + Err(response) + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use http::HeaderMap; + + #[test] + fn test_valid_token() { + let mut headers = HeaderMap::new(); + headers.insert("authorization", "Bearer my-token".parse().unwrap()); + assert!(ReplayAdminAuth::check(&headers, "my-token").is_ok()); + } + + #[test] + fn test_missing_token() { + let headers = HeaderMap::new(); + assert!(ReplayAdminAuth::check(&headers, "my-token").is_err()); + } + + #[test] + fn test_wrong_token() { + let mut headers = HeaderMap::new(); + headers.insert("authorization", "Bearer wrong-token".parse().unwrap()); + assert!(ReplayAdminAuth::check(&headers, "my-token").is_err()); + } + + #[test] + fn test_no_bearer_prefix() { + let mut headers = HeaderMap::new(); + headers.insert("authorization", "my-token".parse().unwrap()); + assert!(ReplayAdminAuth::check(&headers, "my-token").is_err()); + } +} diff --git a/crates/rustapi-extras/src/replay/client.rs b/crates/rustapi-extras/src/replay/client.rs new file mode 100644 index 00000000..0284d5dd --- /dev/null +++ b/crates/rustapi-extras/src/replay/client.rs @@ -0,0 +1,95 @@ +//! HTTP client for replaying recorded requests against a target server. + +use rustapi_core::replay::{RecordedResponse, ReplayEntry}; +use std::collections::HashMap; + +/// Error from replay HTTP client operations. +#[derive(Debug, thiserror::Error)] +pub enum ReplayClientError { + /// HTTP request error. + #[error("HTTP error: {0}")] + Http(#[from] reqwest::Error), + + /// Invalid URL. + #[error("Invalid URL: {0}")] + InvalidUrl(String), +} + +/// HTTP client for replaying recorded requests against a target server. +/// +/// Takes a [`ReplayEntry`] and sends the recorded request to a target URL, +/// capturing the response as a [`RecordedResponse`]. +pub struct ReplayClient { + http: reqwest::Client, +} + +impl ReplayClient { + /// Create a new replay client. + pub fn new() -> Self { + Self { + http: reqwest::Client::new(), + } + } + + /// Replay the recorded request against the given target base URL. + /// + /// The recorded request path is appended to `target_base_url`. + /// Returns the target server's response as a [`RecordedResponse`]. + pub async fn replay( + &self, + entry: &ReplayEntry, + target_base_url: &str, + ) -> Result { + let base = target_base_url.trim_end_matches('/'); + let path = &entry.request.uri; + let url = format!("{}{}", base, path); + + let method: reqwest::Method = entry.request.method.parse().map_err(|_| { + ReplayClientError::InvalidUrl(format!("Invalid method: {}", entry.request.method)) + })?; + + let mut builder = self.http.request(method, &url); + + // Add recorded headers (skip host, content-length as reqwest manages these) + for (key, value) in &entry.request.headers { + let key_lower = key.to_lowercase(); + if key_lower == "host" || key_lower == "content-length" { + continue; + } + builder = builder.header(key, value); + } + + // Add recorded body + if let Some(ref body) = entry.request.body { + builder = builder.body(body.clone()); + } + + let response = builder.send().await?; + + let status = response.status().as_u16(); + let mut headers = HashMap::new(); + for (key, value) in response.headers() { + if let Ok(v) = value.to_str() { + headers.insert(key.as_str().to_string(), v.to_string()); + } + } + + let body_bytes = response.bytes().await?; + let body_size = body_bytes.len(); + let body = String::from_utf8(body_bytes.to_vec()).ok(); + + Ok(RecordedResponse { + status, + headers, + body, + body_size, + body_truncated: false, + }) + } +} + +impl Default for ReplayClient { + fn default() -> Self { + Self::new() + } +} diff --git a/crates/rustapi-extras/src/replay/fs_store.rs b/crates/rustapi-extras/src/replay/fs_store.rs new file mode 100644 index 00000000..ef2884d9 --- /dev/null +++ b/crates/rustapi-extras/src/replay/fs_store.rs @@ -0,0 +1,337 @@ +//! Filesystem-based replay store using JSON Lines format. +//! +//! Follows the [`FileAuditStore`](crate::audit::FileAuditStore) pattern. + +use async_trait::async_trait; +use rustapi_core::replay::{ + ReplayEntry, ReplayQuery, ReplayStore, ReplayStoreError, ReplayStoreResult, +}; +use std::fs::{self, File, OpenOptions}; +use std::io::{BufRead, BufReader, Write}; +use std::path::PathBuf; +use std::sync::Mutex; + +/// Configuration for the filesystem replay store. +#[derive(Debug, Clone)] +pub struct FsReplayStoreConfig { + /// Directory to store replay files. + pub directory: PathBuf, + /// Maximum file size before rotation (bytes). None = no rotation. + pub max_file_size: Option, + /// Create directory if it doesn't exist. + pub create_if_missing: bool, +} + +impl FsReplayStoreConfig { + /// Create a new config with the given directory. + pub fn new(dir: impl Into) -> Self { + Self { + directory: dir.into(), + max_file_size: None, + create_if_missing: true, + } + } + + /// Set the maximum file size before rotation. + pub fn max_file_size(mut self, bytes: u64) -> Self { + self.max_file_size = Some(bytes); + self + } +} + +/// Filesystem-based replay store. +/// +/// Stores entries in JSON Lines format (one entry per line). +/// Supports file rotation by size. +pub struct FsReplayStore { + config: FsReplayStoreConfig, + writer: Mutex>, +} + +impl FsReplayStore { + /// Create a new filesystem store. + pub fn new(config: FsReplayStoreConfig) -> ReplayStoreResult { + if config.create_if_missing { + fs::create_dir_all(&config.directory).map_err(|e| { + ReplayStoreError::Io(format!( + "Failed to create directory {:?}: {}", + config.directory, e + )) + })?; + } + Ok(Self { + config, + writer: Mutex::new(None), + }) + } + + /// Open a store at the given directory with defaults. + pub fn open(dir: impl Into) -> ReplayStoreResult { + Self::new(FsReplayStoreConfig::new(dir)) + } + + fn data_file(&self) -> PathBuf { + self.config.directory.join("replays.jsonl") + } + + fn ensure_writer(&self) -> ReplayStoreResult<()> { + let mut writer = self + .writer + .lock() + .map_err(|e| ReplayStoreError::Other(format!("Lock poisoned: {}", e)))?; + + if writer.is_none() { + let file = OpenOptions::new() + .create(true) + .append(true) + .open(self.data_file()) + .map_err(|e| ReplayStoreError::Io(e.to_string()))?; + *writer = Some(file); + } + + Ok(()) + } + + fn read_all_entries(&self) -> ReplayStoreResult> { + let path = self.data_file(); + if !path.exists() { + return Ok(Vec::new()); + } + + let file = File::open(&path).map_err(|e| ReplayStoreError::Io(e.to_string()))?; + let reader = BufReader::new(file); + let mut entries = Vec::new(); + + for line in reader.lines() { + let line = line.map_err(|e| ReplayStoreError::Io(e.to_string()))?; + if line.trim().is_empty() { + continue; + } + match serde_json::from_str::(&line) { + Ok(entry) => entries.push(entry), + Err(_) => continue, // skip malformed lines + } + } + + Ok(entries) + } + + fn write_all_entries(&self, entries: &[ReplayEntry]) -> ReplayStoreResult<()> { + let path = self.data_file(); + let mut file = File::create(&path).map_err(|e| ReplayStoreError::Io(e.to_string()))?; + + for entry in entries { + let line = serde_json::to_string(entry) + .map_err(|e| ReplayStoreError::Serialization(e.to_string()))?; + writeln!(file, "{}", line).map_err(|e| ReplayStoreError::Io(e.to_string()))?; + } + + // Reset writer since we overwrote the file + if let Ok(mut writer) = self.writer.lock() { + *writer = None; + } + + Ok(()) + } + + fn check_rotation(&self) -> ReplayStoreResult<()> { + if let Some(max_size) = self.config.max_file_size { + let path = self.data_file(); + if path.exists() { + let metadata = + fs::metadata(&path).map_err(|e| ReplayStoreError::Io(e.to_string()))?; + if metadata.len() >= max_size { + // Rotate: rename current file + let rotated = self.config.directory.join(format!( + "replays.{}.jsonl", + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() + )); + fs::rename(&path, &rotated).map_err(|e| ReplayStoreError::Io(e.to_string()))?; + + // Reset writer + if let Ok(mut writer) = self.writer.lock() { + *writer = None; + } + } + } + } + Ok(()) + } +} + +#[async_trait] +impl ReplayStore for FsReplayStore { + async fn store(&self, entry: ReplayEntry) -> ReplayStoreResult<()> { + self.check_rotation()?; + self.ensure_writer()?; + + let line = serde_json::to_string(&entry) + .map_err(|e| ReplayStoreError::Serialization(e.to_string()))?; + + let mut writer = self + .writer + .lock() + .map_err(|e| ReplayStoreError::Other(format!("Lock poisoned: {}", e)))?; + + if let Some(ref mut file) = *writer { + writeln!(file, "{}", line).map_err(|e| ReplayStoreError::Io(e.to_string()))?; + file.flush() + .map_err(|e| ReplayStoreError::Io(e.to_string()))?; + } + + Ok(()) + } + + async fn get(&self, id: &str) -> ReplayStoreResult> { + let entries = self.read_all_entries()?; + Ok(entries.into_iter().find(|e| e.id == id)) + } + + async fn list(&self, query: &ReplayQuery) -> ReplayStoreResult> { + let entries = self.read_all_entries()?; + let mut filtered: Vec = + entries.into_iter().filter(|e| query.matches(e)).collect(); + + if query.newest_first { + filtered.sort_by(|a, b| b.recorded_at.cmp(&a.recorded_at)); + } else { + filtered.sort_by(|a, b| a.recorded_at.cmp(&b.recorded_at)); + } + + let offset = query.offset.unwrap_or(0); + let limit = query.limit.unwrap_or(usize::MAX); + + Ok(filtered.into_iter().skip(offset).take(limit).collect()) + } + + async fn delete(&self, id: &str) -> ReplayStoreResult { + let entries = self.read_all_entries()?; + let before = entries.len(); + let filtered: Vec = entries.into_iter().filter(|e| e.id != id).collect(); + let deleted = filtered.len() < before; + + if deleted { + self.write_all_entries(&filtered)?; + } + + Ok(deleted) + } + + async fn count(&self) -> ReplayStoreResult { + Ok(self.read_all_entries()?.len()) + } + + async fn clear(&self) -> ReplayStoreResult<()> { + self.write_all_entries(&[])?; + Ok(()) + } + + async fn delete_before(&self, timestamp_ms: u64) -> ReplayStoreResult { + let entries = self.read_all_entries()?; + let before = entries.len(); + let filtered: Vec = entries + .into_iter() + .filter(|e| e.recorded_at >= timestamp_ms) + .collect(); + let deleted = before - filtered.len(); + + if deleted > 0 { + self.write_all_entries(&filtered)?; + } + + Ok(deleted) + } + + fn clone_store(&self) -> Box { + Box::new(Self { + config: self.config.clone(), + writer: Mutex::new(None), + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use rustapi_core::replay::{RecordedRequest, RecordedResponse, ReplayMeta}; + use tempfile::TempDir; + + fn make_entry(method: &str, path: &str, status: u16) -> ReplayEntry { + ReplayEntry::new( + RecordedRequest::new(method, path, path), + RecordedResponse::new(status), + ReplayMeta::new(), + ) + } + + #[tokio::test] + async fn test_fs_store_basic() { + let tmp = TempDir::new().unwrap(); + let store = FsReplayStore::open(tmp.path()).unwrap(); + + let entry = make_entry("GET", "/users", 200); + let id = entry.id.clone(); + + store.store(entry).await.unwrap(); + + let retrieved = store.get(&id).await.unwrap(); + assert!(retrieved.is_some()); + assert_eq!(retrieved.unwrap().request.method, "GET"); + } + + #[tokio::test] + async fn test_fs_store_list() { + let tmp = TempDir::new().unwrap(); + let store = FsReplayStore::open(tmp.path()).unwrap(); + + store.store(make_entry("GET", "/a", 200)).await.unwrap(); + store.store(make_entry("POST", "/b", 201)).await.unwrap(); + + let all = store.list(&ReplayQuery::new()).await.unwrap(); + assert_eq!(all.len(), 2); + } + + #[tokio::test] + async fn test_fs_store_delete() { + let tmp = TempDir::new().unwrap(); + let store = FsReplayStore::open(tmp.path()).unwrap(); + + let entry = make_entry("GET", "/users", 200); + let id = entry.id.clone(); + store.store(entry).await.unwrap(); + + assert!(store.delete(&id).await.unwrap()); + assert_eq!(store.count().await.unwrap(), 0); + } + + #[tokio::test] + async fn test_fs_store_clear() { + let tmp = TempDir::new().unwrap(); + let store = FsReplayStore::open(tmp.path()).unwrap(); + + store.store(make_entry("GET", "/a", 200)).await.unwrap(); + store.clear().await.unwrap(); + assert_eq!(store.count().await.unwrap(), 0); + } + + #[tokio::test] + async fn test_fs_store_delete_before() { + let tmp = TempDir::new().unwrap(); + let store = FsReplayStore::open(tmp.path()).unwrap(); + + let mut e1 = make_entry("GET", "/a", 200); + e1.recorded_at = 1000; + let mut e2 = make_entry("GET", "/b", 200); + e2.recorded_at = 3000; + + store.store(e1).await.unwrap(); + store.store(e2).await.unwrap(); + + let deleted = store.delete_before(2000).await.unwrap(); + assert_eq!(deleted, 1); + assert_eq!(store.count().await.unwrap(), 1); + } +} diff --git a/crates/rustapi-extras/src/replay/layer.rs b/crates/rustapi-extras/src/replay/layer.rs new file mode 100644 index 00000000..e7dfc98a --- /dev/null +++ b/crates/rustapi-extras/src/replay/layer.rs @@ -0,0 +1,389 @@ +//! ReplayLayer middleware for time-travel debugging. +//! +//! Records HTTP request/response pairs for later replay and diff analysis. +//! Follows the InsightLayer pattern for body capture and response buffering. + +use super::memory_store::InMemoryReplayStore; +use super::retention::RetentionJob; +use super::routes; +use bytes::Bytes; +use http_body_util::{BodyExt, Full}; +use rustapi_core::middleware::{BoxedNext, MiddlewareLayer}; +use rustapi_core::replay::{ + redact_body, redact_headers, truncate_body, RecordedRequest, RecordedResponse, ReplayConfig, + ReplayEntry, ReplayMeta, ReplayStore, +}; +use rustapi_core::{Request, Response, ResponseBody}; +use std::collections::HashMap; +use std::future::Future; +use std::net::IpAddr; +use std::pin::Pin; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +/// Replay recording middleware layer. +/// +/// Captures HTTP request/response pairs and stores them for later replay +/// and diff analysis via the `/__rustapi/replays` admin API. +/// +/// # Security +/// +/// - Recording disabled by default (`enabled: false`) +/// - Admin token required for all replay endpoints +/// - Sensitive headers redacted automatically +/// - Configurable body field redaction +/// - TTL-based automatic cleanup +/// +/// # Example +/// +/// ```ignore +/// use rustapi_extras::replay::{ReplayLayer, InMemoryReplayStore}; +/// use rustapi_core::replay::ReplayConfig; +/// +/// let layer = ReplayLayer::new( +/// ReplayConfig::new() +/// .enabled(true) +/// .admin_token("my-secret-token") +/// .ttl_secs(3600) +/// ); +/// +/// let app = RustApi::new() +/// .layer(layer) +/// .route("/api/users", get(handler)); +/// ``` +#[derive(Clone)] +pub struct ReplayLayer { + config: Arc, + store: Arc, + retention_started: Arc, +} + +impl ReplayLayer { + /// Create a new ReplayLayer with the given configuration. + /// + /// Uses an in-memory store with capacity from the config. + pub fn new(config: ReplayConfig) -> Self { + let store = InMemoryReplayStore::new(config.store_capacity); + Self { + config: Arc::new(config), + store: Arc::new(store), + retention_started: Arc::new(AtomicBool::new(false)), + } + } + + /// Use a custom store implementation. + pub fn with_store(mut self, store: S) -> Self { + self.store = Arc::new(store); + self + } + + /// Get a reference to the replay store. + pub fn store(&self) -> &Arc { + &self.store + } + + /// Get a reference to the configuration. + pub fn config(&self) -> &ReplayConfig { + &self.config + } + + /// Extract client IP from request headers. + fn extract_client_ip(req: &Request) -> String { + if let Some(forwarded) = req.headers().get("x-forwarded-for") { + if let Ok(forwarded_str) = forwarded.to_str() { + if let Some(first_ip) = forwarded_str.split(',').next() { + let ip_str = first_ip.trim(); + if ip_str.parse::().is_ok() { + return ip_str.to_string(); + } + } + } + } + + if let Some(real_ip) = req.headers().get("x-real-ip") { + if let Ok(ip_str) = real_ip.to_str() { + let ip_str = ip_str.trim(); + if ip_str.parse::().is_ok() { + return ip_str.to_string(); + } + } + } + + "127.0.0.1".to_string() + } + + /// Extract request ID from headers. + fn extract_request_id(req: &Request) -> Option { + for header_name in &["x-request-id", "x-correlation-id", "x-trace-id"] { + if let Some(value) = req.headers().get(*header_name) { + if let Ok(id) = value.to_str() { + return Some(id.to_string()); + } + } + } + None + } + + /// Capture all request headers into a HashMap. + fn capture_headers(headers: &http::HeaderMap) -> HashMap { + let mut captured = HashMap::new(); + for (name, value) in headers.iter() { + if let Ok(value_str) = value.to_str() { + captured.insert(name.as_str().to_string(), value_str.to_string()); + } + } + captured + } + + /// Check if body should be captured based on content type. + fn should_capture_body(headers: &http::HeaderMap, config: &ReplayConfig) -> bool { + if let Some(content_type) = headers.get(http::header::CONTENT_TYPE) { + if let Ok(ct) = content_type.to_str() { + return config.is_capturable_content_type(ct); + } + } + false + } + + /// Ensure the retention background job is started once. + fn ensure_retention_started(&self) { + if !self.retention_started.swap(true, Ordering::SeqCst) { + let store = self.store.clone(); + let ttl_secs = self.config.ttl_secs; + let interval = Duration::from_secs(ttl_secs.max(60) / 2); + RetentionJob::spawn(store, ttl_secs, interval); + } + } +} + +impl MiddlewareLayer for ReplayLayer { + fn call( + &self, + mut req: Request, + next: BoxedNext, + ) -> Pin + Send + 'static>> { + let config = self.config.clone(); + let store = self.store.clone(); + + // Start retention job on first request + self.ensure_retention_started(); + + Box::pin(async move { + let path = req.uri().path().to_string(); + let method = req.method().to_string(); + + // Handle admin routes: /__rustapi/replays/* + if path.starts_with(&config.admin_route_prefix) { + let suffix = &path[config.admin_route_prefix.len()..]; + if let Some(response) = routes::dispatch( + req.headers(), + &method, + req.uri(), + store.as_ref(), + &config, + suffix, + ) + .await + { + return response; + } + } + + // If recording is disabled, pass through + if !config.enabled { + return next(req).await; + } + + // Check path filter + if !config.should_record_path(&path) { + return next(req).await; + } + + // Check sampling + if !config.should_sample() { + return next(req).await; + } + + // Start timing + let start = Instant::now(); + + // Extract request info + let uri_string = req.uri().to_string(); + let query = req.uri().query().map(|q| q.to_string()); + let client_ip = ReplayLayer::extract_client_ip(&req); + let request_id = ReplayLayer::extract_request_id(&req); + + // Capture and redact request headers + let raw_headers = ReplayLayer::capture_headers(req.headers()); + let req_headers = redact_headers(&raw_headers, &config.redact_headers); + + // Capture request body if eligible + let capture_req_body = ReplayLayer::should_capture_body(req.headers(), &config); + + let (req_body_size, req_body_str, req_body_truncated) = if capture_req_body { + if let Some(body_bytes) = req.take_body() { + let size = body_bytes.len(); + if size <= config.max_request_body { + let body_str = String::from_utf8(body_bytes.to_vec()).ok(); + // Apply body field redaction + let redacted = body_str.and_then(|s| { + if config.redact_body_fields.is_empty() { + Some(s) + } else { + redact_body(&s, &config.redact_body_fields, "[REDACTED]") + } + }); + (size, redacted, false) + } else { + // Body too large - truncate + let body_str = String::from_utf8(body_bytes.to_vec()).ok(); + let truncated = body_str.map(|s| { + let (t, _) = truncate_body(&s, config.max_request_body); + t + }); + (size, truncated, true) + } + } else { + (0, None, false) + } + } else { + let size = req + .headers() + .get(http::header::CONTENT_LENGTH) + .and_then(|v| v.to_str().ok()) + .and_then(|s| s.parse::().ok()) + .unwrap_or(0); + (size, None, false) + }; + + // Call the next handler + let response = next(req).await; + + // Calculate duration + let duration = start.elapsed(); + let status = response.status().as_u16(); + + // Capture and redact response headers + let raw_resp_headers = ReplayLayer::capture_headers(response.headers()); + let resp_headers = redact_headers(&raw_resp_headers, &config.redact_headers); + + let capture_resp_body = ReplayLayer::should_capture_body(response.headers(), &config); + + // Buffer response body (must consume and reconstruct) + let (resp_parts, resp_body) = response.into_parts(); + let resp_body_bytes = match resp_body.collect().await { + Ok(collected) => collected.to_bytes(), + Err(_) => Bytes::new(), + }; + + let resp_body_size = resp_body_bytes.len(); + let (resp_body_str, resp_body_truncated) = if capture_resp_body && resp_body_size > 0 { + if resp_body_size <= config.max_response_body { + let body_str = String::from_utf8(resp_body_bytes.to_vec()).ok(); + let redacted = body_str.and_then(|s| { + if config.redact_body_fields.is_empty() { + Some(s) + } else { + redact_body(&s, &config.redact_body_fields, "[REDACTED]") + } + }); + (redacted, false) + } else { + let body_str = String::from_utf8(resp_body_bytes.to_vec()).ok(); + let truncated = body_str.map(|s| { + let (t, _) = truncate_body(&s, config.max_response_body); + t + }); + (truncated, true) + } + } else { + (None, false) + }; + + // Build RecordedRequest + let recorded_request = RecordedRequest { + method: method.clone(), + uri: uri_string, + path: path.clone(), + query, + headers: req_headers, + body: req_body_str, + body_size: req_body_size, + body_truncated: req_body_truncated, + }; + + // Build RecordedResponse + let recorded_response = RecordedResponse { + status, + headers: resp_headers, + body: resp_body_str, + body_size: resp_body_size, + body_truncated: resp_body_truncated, + }; + + // Build ReplayMeta + let mut meta = ReplayMeta::new() + .with_duration_ms(duration.as_millis() as u64) + .with_client_ip(client_ip) + .with_ttl_secs(config.ttl_secs); + + if let Some(req_id) = request_id { + meta = meta.with_request_id(req_id); + } + + // Create and store the entry + let entry = ReplayEntry::new(recorded_request, recorded_response, meta); + + // Store asynchronously (fire and forget, don't block the response) + let store_clone = store.clone(); + tokio::spawn(async move { + if let Err(e) = store_clone.store(entry).await { + tracing::warn!(error = %e, "Failed to store replay entry"); + } + }); + + // Reconstruct response with the buffered body + http::Response::from_parts(resp_parts, ResponseBody::Full(Full::new(resp_body_bytes))) + }) + } + + fn clone_box(&self) -> Box { + Box::new(self.clone()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_default_layer() { + let layer = ReplayLayer::new(ReplayConfig::new()); + assert!(!layer.config().enabled); + assert_eq!(layer.config().store_capacity, 500); + } + + #[test] + fn test_custom_config() { + let config = ReplayConfig::new() + .enabled(true) + .admin_token("test-token") + .store_capacity(100) + .ttl_secs(7200); + + let layer = ReplayLayer::new(config); + assert!(layer.config().enabled); + assert_eq!(layer.config().store_capacity, 100); + assert_eq!(layer.config().ttl_secs, 7200); + } + + #[test] + fn test_with_custom_store() { + let config = ReplayConfig::new().enabled(true); + let store = InMemoryReplayStore::new(42); + + let layer = ReplayLayer::new(config).with_store(store); + assert!(layer.config().enabled); + } +} diff --git a/crates/rustapi-extras/src/replay/memory_store.rs b/crates/rustapi-extras/src/replay/memory_store.rs new file mode 100644 index 00000000..e5c5b0d3 --- /dev/null +++ b/crates/rustapi-extras/src/replay/memory_store.rs @@ -0,0 +1,239 @@ +//! In-memory replay store using a ring buffer. +//! +//! Thread-safe, bounded storage with FIFO eviction. + +use async_trait::async_trait; +use dashmap::DashMap; +use rustapi_core::replay::{ReplayEntry, ReplayQuery, ReplayStore, ReplayStoreResult}; +use std::collections::VecDeque; +use std::sync::Arc; +use tokio::sync::RwLock; + +/// In-memory replay store with a bounded ring buffer. +/// +/// When capacity is reached, the oldest entries are evicted. +/// Thread-safe via `RwLock` and `DashMap`. +/// +/// # Example +/// +/// ```ignore +/// use rustapi_extras::replay::InMemoryReplayStore; +/// +/// let store = InMemoryReplayStore::new(500); +/// ``` +#[derive(Clone)] +pub struct InMemoryReplayStore { + buffer: Arc>>>, + index: Arc>>, + capacity: usize, +} + +impl InMemoryReplayStore { + /// Create a new in-memory store with the given capacity. + pub fn new(capacity: usize) -> Self { + Self { + buffer: Arc::new(RwLock::new(VecDeque::with_capacity(capacity))), + index: Arc::new(DashMap::new()), + capacity, + } + } +} + +impl Default for InMemoryReplayStore { + fn default() -> Self { + Self::new(500) + } +} + +#[async_trait] +impl ReplayStore for InMemoryReplayStore { + async fn store(&self, entry: ReplayEntry) -> ReplayStoreResult<()> { + let entry_arc = Arc::new(entry); + let id = entry_arc.id.clone(); + + let mut buffer = self.buffer.write().await; + + // Evict oldest if at capacity + if buffer.len() >= self.capacity { + if let Some(old) = buffer.pop_front() { + self.index.remove(&old.id); + } + } + + buffer.push_back(entry_arc.clone()); + self.index.insert(id, entry_arc); + Ok(()) + } + + async fn get(&self, id: &str) -> ReplayStoreResult> { + Ok(self.index.get(id).map(|r| r.as_ref().clone())) + } + + async fn list(&self, query: &ReplayQuery) -> ReplayStoreResult> { + let buffer = self.buffer.read().await; + + let iter: Box> + '_> = if query.newest_first { + Box::new(buffer.iter().rev()) + } else { + Box::new(buffer.iter()) + }; + + let mut results: Vec = iter + .filter(|e| query.matches(e)) + .skip(query.offset.unwrap_or(0)) + .take(query.limit.unwrap_or(usize::MAX)) + .map(|e| e.as_ref().clone()) + .collect(); + + // Ensure consistent ordering + if !query.newest_first { + results.reverse(); + results.reverse(); // already correct order + } + + Ok(results) + } + + async fn delete(&self, id: &str) -> ReplayStoreResult { + let removed = self.index.remove(id).is_some(); + if removed { + let mut buffer = self.buffer.write().await; + buffer.retain(|e| e.id != id); + } + Ok(removed) + } + + async fn count(&self) -> ReplayStoreResult { + Ok(self.buffer.read().await.len()) + } + + async fn clear(&self) -> ReplayStoreResult<()> { + let mut buffer = self.buffer.write().await; + buffer.clear(); + self.index.clear(); + Ok(()) + } + + async fn delete_before(&self, timestamp_ms: u64) -> ReplayStoreResult { + let mut buffer = self.buffer.write().await; + let before_len = buffer.len(); + + // Collect IDs to remove + let to_remove: Vec = buffer + .iter() + .filter(|e| e.recorded_at < timestamp_ms) + .map(|e| e.id.clone()) + .collect(); + + for id in &to_remove { + self.index.remove(id); + } + + buffer.retain(|e| e.recorded_at >= timestamp_ms); + Ok(before_len - buffer.len()) + } + + fn clone_store(&self) -> Box { + Box::new(self.clone()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use rustapi_core::replay::{RecordedRequest, RecordedResponse, ReplayMeta}; + + fn make_entry(method: &str, path: &str, status: u16) -> ReplayEntry { + ReplayEntry::new( + RecordedRequest::new(method, path, path), + RecordedResponse::new(status), + ReplayMeta::new(), + ) + } + + #[tokio::test] + async fn test_store_and_get() { + let store = InMemoryReplayStore::new(10); + let entry = make_entry("GET", "/users", 200); + let id = entry.id.clone(); + + store.store(entry).await.unwrap(); + + let retrieved = store.get(&id).await.unwrap(); + assert!(retrieved.is_some()); + assert_eq!(retrieved.unwrap().id, id); + } + + #[tokio::test] + async fn test_list() { + let store = InMemoryReplayStore::new(10); + store.store(make_entry("GET", "/users", 200)).await.unwrap(); + store + .store(make_entry("POST", "/users", 201)) + .await + .unwrap(); + store.store(make_entry("GET", "/items", 404)).await.unwrap(); + + let all = store.list(&ReplayQuery::new()).await.unwrap(); + assert_eq!(all.len(), 3); + + let filtered = store.list(&ReplayQuery::new().method("GET")).await.unwrap(); + assert_eq!(filtered.len(), 2); + } + + #[tokio::test] + async fn test_ring_buffer_eviction() { + let store = InMemoryReplayStore::new(2); + + let e1 = make_entry("GET", "/a", 200); + let id1 = e1.id.clone(); + store.store(e1).await.unwrap(); + store.store(make_entry("GET", "/b", 200)).await.unwrap(); + store.store(make_entry("GET", "/c", 200)).await.unwrap(); // evicts e1 + + assert_eq!(store.count().await.unwrap(), 2); + assert!(store.get(&id1).await.unwrap().is_none()); + } + + #[tokio::test] + async fn test_delete() { + let store = InMemoryReplayStore::new(10); + let entry = make_entry("GET", "/users", 200); + let id = entry.id.clone(); + + store.store(entry).await.unwrap(); + assert!(store.delete(&id).await.unwrap()); + assert!(store.get(&id).await.unwrap().is_none()); + assert_eq!(store.count().await.unwrap(), 0); + } + + #[tokio::test] + async fn test_clear() { + let store = InMemoryReplayStore::new(10); + store.store(make_entry("GET", "/a", 200)).await.unwrap(); + store.store(make_entry("GET", "/b", 200)).await.unwrap(); + + store.clear().await.unwrap(); + assert_eq!(store.count().await.unwrap(), 0); + } + + #[tokio::test] + async fn test_delete_before() { + let store = InMemoryReplayStore::new(10); + + let mut e1 = make_entry("GET", "/a", 200); + e1.recorded_at = 1000; + let mut e2 = make_entry("GET", "/b", 200); + e2.recorded_at = 2000; + let mut e3 = make_entry("GET", "/c", 200); + e3.recorded_at = 3000; + + store.store(e1).await.unwrap(); + store.store(e2).await.unwrap(); + store.store(e3).await.unwrap(); + + let deleted = store.delete_before(2500).await.unwrap(); + assert_eq!(deleted, 2); + assert_eq!(store.count().await.unwrap(), 1); + } +} diff --git a/crates/rustapi-extras/src/replay/mod.rs b/crates/rustapi-extras/src/replay/mod.rs new file mode 100644 index 00000000..0427904a --- /dev/null +++ b/crates/rustapi-extras/src/replay/mod.rs @@ -0,0 +1,44 @@ +//! Replay middleware for time-travel debugging. +//! +//! This module provides the runtime integration for the replay system: +//! +//! - [`ReplayLayer`] - Middleware that records request/response pairs +//! - [`InMemoryReplayStore`] - In-memory bounded ring buffer store +//! - [`FsReplayStore`] - Filesystem-backed store (JSON Lines) +//! - [`ReplayClient`] - HTTP client for replaying recorded requests +//! - [`RetentionJob`] - Background TTL cleanup task +//! - [`ReplayAdminAuth`] - Bearer token authentication for admin endpoints +//! +//! # Quick Start +//! +//! ```ignore +//! use rustapi_extras::replay::ReplayLayer; +//! use rustapi_core::replay::ReplayConfig; +//! +//! let layer = ReplayLayer::new( +//! ReplayConfig::new() +//! .enabled(true) +//! .admin_token("my-secret-token") +//! ); +//! +//! RustApi::new() +//! .layer(layer) +//! .route("/api/users", get(handler)) +//! .run("127.0.0.1:8080") +//! .await?; +//! ``` + +mod auth; +mod client; +mod fs_store; +mod layer; +mod memory_store; +mod retention; +mod routes; + +pub use auth::ReplayAdminAuth; +pub use client::{ReplayClient, ReplayClientError}; +pub use fs_store::{FsReplayStore, FsReplayStoreConfig}; +pub use layer::ReplayLayer; +pub use memory_store::InMemoryReplayStore; +pub use retention::RetentionJob; diff --git a/crates/rustapi-extras/src/replay/retention.rs b/crates/rustapi-extras/src/replay/retention.rs new file mode 100644 index 00000000..363e1882 --- /dev/null +++ b/crates/rustapi-extras/src/replay/retention.rs @@ -0,0 +1,46 @@ +//! Background retention cleanup job for replay entries. + +use rustapi_core::replay::ReplayStore; +use std::sync::Arc; +use std::time::Duration; + +/// Background task that periodically deletes expired replay entries. +pub struct RetentionJob; + +impl RetentionJob { + /// Spawn a background task that periodically deletes entries older than TTL. + /// + /// # Arguments + /// + /// * `store` - The replay store to clean up. + /// * `ttl_secs` - Time-to-live in seconds. Entries older than this are deleted. + /// * `check_interval` - How often to check for expired entries. + pub fn spawn( + store: Arc, + ttl_secs: u64, + check_interval: Duration, + ) -> tokio::task::JoinHandle<()> { + tokio::spawn(async move { + loop { + tokio::time::sleep(check_interval).await; + + let now_ms = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64; + + let cutoff = now_ms.saturating_sub(ttl_secs * 1000); + + match store.delete_before(cutoff).await { + Ok(count) if count > 0 => { + tracing::info!(deleted = count, "Replay retention cleanup"); + } + Err(e) => { + tracing::warn!(error = %e, "Replay retention cleanup failed"); + } + _ => {} + } + } + }) + } +} diff --git a/crates/rustapi-extras/src/replay/routes.rs b/crates/rustapi-extras/src/replay/routes.rs new file mode 100644 index 00000000..1f392325 --- /dev/null +++ b/crates/rustapi-extras/src/replay/routes.rs @@ -0,0 +1,258 @@ +//! HTTP admin route handlers for replay endpoints. +//! +//! Handles `/__rustapi/replays` admin API routes. + +use super::auth::ReplayAdminAuth; +use super::client::ReplayClient; +use bytes::Bytes; +use http::StatusCode; +use http_body_util::Full; +use rustapi_core::replay::{compute_diff, ReplayConfig, ReplayQuery, ReplayStore}; +use rustapi_core::Response; +use rustapi_core::ResponseBody; +use serde_json::json; + +/// Dispatch a replay admin request based on path and method. +/// +/// Returns `Some(Response)` if the path matches a replay admin route, +/// or `None` if it should be handled by the regular middleware chain. +pub async fn dispatch( + headers: &http::HeaderMap, + method: &str, + uri: &http::Uri, + store: &dyn ReplayStore, + config: &ReplayConfig, + path_suffix: &str, +) -> Option { + // Check admin token + if let Some(ref token) = config.admin_token { + if let Err(resp) = ReplayAdminAuth::check(headers, token) { + return Some(resp); + } + } else { + // No token configured, refuse all admin requests + return Some(json_response( + StatusCode::FORBIDDEN, + json!({"error": "forbidden", "message": "Admin token not configured"}), + )); + } + + // Trim leading slash + let suffix = path_suffix.trim_start_matches('/'); + + match (method, suffix) { + // GET /__rustapi/replays - list entries + ("GET", "") => Some(handle_list(uri, store).await), + + // GET /__rustapi/replays/{id} - show entry + ("GET", id) if !id.contains('/') => Some(handle_show(id, store).await), + + // POST /__rustapi/replays/{id}/run?target=URL - replay + ("POST", path) if path.ends_with("/run") => { + let id = path.trim_end_matches("/run"); + let target = extract_query_param(uri, "target"); + match target { + Some(target_url) => Some(handle_run(id, &target_url, store).await), + None => Some(json_response( + StatusCode::BAD_REQUEST, + json!({"error": "bad_request", "message": "Missing 'target' query parameter"}), + )), + } + } + + // POST /__rustapi/replays/{id}/diff?target=URL - replay & diff + ("POST", path) if path.ends_with("/diff") => { + let id = path.trim_end_matches("/diff"); + let target = extract_query_param(uri, "target"); + match target { + Some(target_url) => Some(handle_diff(id, &target_url, store).await), + None => Some(json_response( + StatusCode::BAD_REQUEST, + json!({"error": "bad_request", "message": "Missing 'target' query parameter"}), + )), + } + } + + // DELETE /__rustapi/replays/{id} - delete entry + ("DELETE", id) if !id.contains('/') => Some(handle_delete(id, store).await), + + _ => Some(json_response( + StatusCode::NOT_FOUND, + json!({"error": "not_found", "message": "Unknown replay endpoint"}), + )), + } +} + +async fn handle_list(uri: &http::Uri, store: &dyn ReplayStore) -> Response { + let mut query = ReplayQuery::new(); + + if let Some(limit) = extract_query_param(uri, "limit") { + if let Ok(n) = limit.parse::() { + query = query.limit(n); + } + } + if let Some(method) = extract_query_param(uri, "method") { + query = query.method(method); + } + if let Some(path) = extract_query_param(uri, "path") { + query = query.path_contains(path); + } + if let Some(status_min) = extract_query_param(uri, "status_min") { + if let Ok(s) = status_min.parse::() { + query = query.status_min(s); + } + } + + match store.list(&query).await { + Ok(entries) => { + let count = entries.len(); + let total = store.count().await.unwrap_or(0); + json_response( + StatusCode::OK, + json!({ + "entries": entries, + "count": count, + "total": total, + }), + ) + } + Err(e) => json_response( + StatusCode::INTERNAL_SERVER_ERROR, + json!({"error": "store_error", "message": e.to_string()}), + ), + } +} + +async fn handle_show(id: &str, store: &dyn ReplayStore) -> Response { + match store.get(id).await { + Ok(Some(entry)) => json_response(StatusCode::OK, serde_json::to_value(&entry).unwrap()), + Ok(None) => json_response( + StatusCode::NOT_FOUND, + json!({"error": "not_found", "message": format!("Entry {} not found", id)}), + ), + Err(e) => json_response( + StatusCode::INTERNAL_SERVER_ERROR, + json!({"error": "store_error", "message": e.to_string()}), + ), + } +} + +async fn handle_run(id: &str, target_url: &str, store: &dyn ReplayStore) -> Response { + let entry = match store.get(id).await { + Ok(Some(entry)) => entry, + Ok(None) => { + return json_response( + StatusCode::NOT_FOUND, + json!({"error": "not_found", "message": format!("Entry {} not found", id)}), + ); + } + Err(e) => { + return json_response( + StatusCode::INTERNAL_SERVER_ERROR, + json!({"error": "store_error", "message": e.to_string()}), + ); + } + }; + + let client = ReplayClient::new(); + match client.replay(&entry, target_url).await { + Ok(replayed) => json_response( + StatusCode::OK, + json!({ + "original_response": entry.response, + "replayed_response": replayed, + "target": target_url, + }), + ), + Err(e) => json_response( + StatusCode::BAD_GATEWAY, + json!({"error": "replay_failed", "message": e.to_string()}), + ), + } +} + +async fn handle_diff(id: &str, target_url: &str, store: &dyn ReplayStore) -> Response { + let entry = match store.get(id).await { + Ok(Some(entry)) => entry, + Ok(None) => { + return json_response( + StatusCode::NOT_FOUND, + json!({"error": "not_found", "message": format!("Entry {} not found", id)}), + ); + } + Err(e) => { + return json_response( + StatusCode::INTERNAL_SERVER_ERROR, + json!({"error": "store_error", "message": e.to_string()}), + ); + } + }; + + let client = ReplayClient::new(); + match client.replay(&entry, target_url).await { + Ok(replayed) => { + let ignore_headers = vec![ + "date".to_string(), + "x-request-id".to_string(), + "x-correlation-id".to_string(), + "server".to_string(), + ]; + let diff = compute_diff(&entry.response, &replayed, &ignore_headers); + + json_response( + StatusCode::OK, + json!({ + "diff": diff, + "original_response": entry.response, + "replayed_response": replayed, + "target": target_url, + }), + ) + } + Err(e) => json_response( + StatusCode::BAD_GATEWAY, + json!({"error": "replay_failed", "message": e.to_string()}), + ), + } +} + +async fn handle_delete(id: &str, store: &dyn ReplayStore) -> Response { + match store.delete(id).await { + Ok(true) => json_response( + StatusCode::OK, + json!({"message": format!("Entry {} deleted", id)}), + ), + Ok(false) => json_response( + StatusCode::NOT_FOUND, + json!({"error": "not_found", "message": format!("Entry {} not found", id)}), + ), + Err(e) => json_response( + StatusCode::INTERNAL_SERVER_ERROR, + json!({"error": "store_error", "message": e.to_string()}), + ), + } +} + +/// Helper to extract a query parameter value from a URI. +fn extract_query_param(uri: &http::Uri, key: &str) -> Option { + uri.query().and_then(|q| { + q.split('&').find_map(|pair| { + let (k, v) = pair.split_once('=')?; + if k == key { + Some(v.to_string()) + } else { + None + } + }) + }) +} + +/// Helper to create a JSON response. +fn json_response(status: StatusCode, body: serde_json::Value) -> Response { + let body_bytes = serde_json::to_vec(&body).unwrap_or_default(); + http::Response::builder() + .status(status) + .header(http::header::CONTENT_TYPE, "application/json") + .body(ResponseBody::Full(Full::new(Bytes::from(body_bytes)))) + .unwrap() +} diff --git a/crates/rustapi-rs/Cargo.toml b/crates/rustapi-rs/Cargo.toml index d7783e92..a52113b4 100644 --- a/crates/rustapi-rs/Cargo.toml +++ b/crates/rustapi-rs/Cargo.toml @@ -82,6 +82,9 @@ sanitization = ["dep:rustapi-extras", "rustapi-extras/sanitization"] otel = ["dep:rustapi-extras", "rustapi-extras/otel"] structured-logging = ["dep:rustapi-extras", "rustapi-extras/structured-logging"] +# Replay (time-travel debugging) +replay = ["dep:rustapi-extras", "rustapi-extras/replay"] + # Meta features extras = ["jwt", "cors", "rate-limit"] -full = ["extras", "config", "cookies", "sqlx", "toon", "insight", "compression", "ws", "view", "timeout", "guard", "logging", "circuit-breaker", "security-headers", "api-key", "cache", "dedup", "sanitization", "otel", "structured-logging"] +full = ["extras", "config", "cookies", "sqlx", "toon", "insight", "compression", "ws", "view", "timeout", "guard", "logging", "circuit-breaker", "security-headers", "api-key", "cache", "dedup", "sanitization", "otel", "structured-logging", "replay"] diff --git a/crates/rustapi-rs/src/lib.rs b/crates/rustapi-rs/src/lib.rs index d117d4d7..4f5452d5 100644 --- a/crates/rustapi-rs/src/lib.rs +++ b/crates/rustapi-rs/src/lib.rs @@ -128,6 +128,10 @@ pub use rustapi_extras::otel; #[cfg(feature = "structured-logging")] pub use rustapi_extras::structured_logging; +// Replay (time-travel debugging) +#[cfg(feature = "replay")] +pub use rustapi_extras::replay; + // Re-export TOON (feature-gated) #[cfg(feature = "toon")] pub mod toon { diff --git a/docs/cookbook/src/recipes/replay.md b/docs/cookbook/src/recipes/replay.md new file mode 100644 index 00000000..6cbe3995 --- /dev/null +++ b/docs/cookbook/src/recipes/replay.md @@ -0,0 +1,283 @@ +# Replay: Time-Travel Debugging + +Record HTTP request/response pairs and replay them against different environments for debugging and regression testing. + +> **Security Notice**: The replay system is designed for **development and staging environments only**. See [Security](#security) for details. + +## Quick Start + +Add the `replay` feature to your `Cargo.toml`: + +```toml +[dependencies] +rustapi-rs = { version = "0.1", features = ["replay"] } +``` + +Add the `ReplayLayer` middleware to your application: + +```rust,ignore +use rustapi_rs::prelude::*; +use rustapi_rs::replay::{ReplayLayer, InMemoryReplayStore}; +use rustapi_core::replay::ReplayConfig; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let replay = ReplayLayer::new( + ReplayConfig::new() + .enabled(true) + .admin_token("my-secret-token") + .ttl_secs(3600) + ); + + RustApi::new() + .layer(replay) + .route("/api/users", get(list_users)) + .run("127.0.0.1:8080") + .await +} + +async fn list_users() -> Json> { + Json(vec!["Alice".into(), "Bob".into()]) +} +``` + +## How It Works + +1. **Record**: The `ReplayLayer` middleware captures HTTP request/response pairs as they flow through your application +2. **List**: Query recorded entries via the admin API or CLI +3. **Replay**: Re-send a recorded request against any target URL +4. **Diff**: Compare the replayed response against the original to detect regressions + +## Admin API + +All admin endpoints require a bearer token in the `Authorization` header: + +``` +Authorization: Bearer +``` + +| Method | Path | Description | +|--------|------|-------------| +| GET | `/__rustapi/replays` | List recorded entries | +| GET | `/__rustapi/replays/{id}` | Show a single entry | +| POST | `/__rustapi/replays/{id}/run?target=URL` | Replay against target | +| POST | `/__rustapi/replays/{id}/diff?target=URL` | Replay and compute diff | +| DELETE | `/__rustapi/replays/{id}` | Delete an entry | + +### Query Parameters for List + +- `limit` - Maximum number of entries to return +- `method` - Filter by HTTP method (GET, POST, etc.) +- `path` - Filter by path substring +- `status_min` - Minimum status code filter + +### Example: cURL + +```bash +# List entries +curl -H "Authorization: Bearer my-secret-token" \ + http://localhost:8080/__rustapi/replays?limit=10 + +# Show a specific entry +curl -H "Authorization: Bearer my-secret-token" \ + http://localhost:8080/__rustapi/replays/ + +# Replay against staging +curl -X POST -H "Authorization: Bearer my-secret-token" \ + "http://localhost:8080/__rustapi/replays//run?target=http://staging:8080" + +# Replay and diff +curl -X POST -H "Authorization: Bearer my-secret-token" \ + "http://localhost:8080/__rustapi/replays//diff?target=http://staging:8080" +``` + +## CLI Usage + +Install with the `replay` feature: + +```bash +cargo install cargo-rustapi --features replay +``` + +### Commands + +```bash +# List recorded entries +cargo rustapi replay list -s http://localhost:8080 -t my-secret-token + +# List with filters +cargo rustapi replay list -t my-secret-token --method GET --limit 20 + +# Show entry details +cargo rustapi replay show -t my-secret-token + +# Replay against a target URL +cargo rustapi replay run -T http://staging:8080 -t my-secret-token + +# Replay and diff +cargo rustapi replay diff -T http://staging:8080 -t my-secret-token +``` + +The `--token` (`-t`) parameter can also be set via the `RUSTAPI_REPLAY_TOKEN` environment variable: + +```bash +export RUSTAPI_REPLAY_TOKEN=my-secret-token +cargo rustapi replay list +``` + +## Configuration + +### ReplayConfig + +```rust,ignore +use rustapi_core::replay::ReplayConfig; + +let config = ReplayConfig::new() + // Enable recording (default: false) + .enabled(true) + // Required: admin bearer token + .admin_token("my-secret-token") + // Max entries in store (default: 500) + .store_capacity(1000) + // Entry TTL in seconds (default: 3600 = 1 hour) + .ttl_secs(7200) + // Sampling rate 0.0-1.0 (default: 1.0 = all requests) + .sample_rate(0.5) + // Max request body capture size (default: 64KB) + .max_request_body(131_072) + // Max response body capture size (default: 256KB) + .max_response_body(524_288) + // Only record specific paths + .record_path("/api/users") + .record_path("/api/orders") + // Or skip specific paths + .skip_path("/health") + .skip_path("/metrics") + // Add headers to redact + .redact_header("x-custom-secret") + // Add body fields to redact + .redact_body_field("password") + .redact_body_field("ssn") + .redact_body_field("credit_card") + // Custom admin route prefix (default: "/__rustapi/replays") + .admin_route_prefix("/__admin/replays"); +``` + +### Default Redacted Headers + +The following headers are redacted by default (values replaced with `[REDACTED]`): + +- `authorization` +- `cookie` +- `x-api-key` +- `x-auth-token` + +### Body Field Redaction + +JSON body fields are recursively redacted. For example, with `.redact_body_field("password")`: + +```json +// Before redaction +{"user": {"name": "alice", "password": "secret123"}} + +// After redaction +{"user": {"name": "alice", "password": "[REDACTED]"}} +``` + +## Custom Store + +### File-System Store + +For persistent storage across restarts: + +```rust,ignore +use rustapi_rs::replay::{ReplayLayer, FsReplayStore, FsReplayStoreConfig}; +use rustapi_core::replay::ReplayConfig; + +let config = ReplayConfig::new() + .enabled(true) + .admin_token("my-secret-token"); + +let fs_store = FsReplayStore::new(FsReplayStoreConfig { + directory: "./replay-data".into(), + max_file_size: Some(10 * 1024 * 1024), // 10MB per file + create_if_missing: true, +}); + +let layer = ReplayLayer::new(config).with_store(fs_store); +``` + +### Implementing a Custom Store + +Implement the `ReplayStore` trait for custom backends (Redis, database, etc.): + +```rust,ignore +use async_trait::async_trait; +use rustapi_core::replay::{ + ReplayEntry, ReplayQuery, ReplayStore, ReplayStoreResult, +}; + +struct MyCustomStore { + // your fields +} + +#[async_trait] +impl ReplayStore for MyCustomStore { + async fn store(&self, entry: ReplayEntry) -> ReplayStoreResult<()> { + // Store the entry + Ok(()) + } + + async fn get(&self, id: &str) -> ReplayStoreResult> { + // Retrieve by ID + Ok(None) + } + + async fn list(&self, query: &ReplayQuery) -> ReplayStoreResult> { + // List with filtering + Ok(vec![]) + } + + async fn delete(&self, id: &str) -> ReplayStoreResult { + // Delete by ID + Ok(false) + } + + async fn count(&self) -> ReplayStoreResult { + Ok(0) + } + + async fn clear(&self) -> ReplayStoreResult<()> { + Ok(()) + } + + async fn delete_before(&self, timestamp_ms: u64) -> ReplayStoreResult { + // Delete entries older than timestamp + Ok(0) + } + + fn clone_store(&self) -> Box { + Box::new(self.clone()) + } +} +``` + +## Security + +The replay system has multiple security layers built in: + +1. **Disabled by default**: Recording is off (`enabled: false`) until explicitly enabled +2. **Admin token required**: All `/__rustapi/replays` endpoints require a valid bearer token. Requests without the token get a `401 Unauthorized` response +3. **Header redaction**: `authorization`, `cookie`, `x-api-key`, and `x-auth-token` values are replaced with `[REDACTED]` before storage +4. **Body field redaction**: Sensitive JSON fields (e.g., `password`, `ssn`) can be configured for redaction +5. **TTL enforcement**: Entries are automatically deleted after the configured TTL (default: 1 hour) +6. **Body size limits**: Request (64KB) and response (256KB) bodies are truncated to prevent memory issues +7. **Bounded storage**: The in-memory store uses a ring buffer with FIFO eviction + +**Recommendations**: + +- Use only in development/staging environments +- Use a strong, unique admin token +- Keep TTL short +- Add application-specific sensitive fields to the redaction list +- Monitor memory usage when using the in-memory store with large capacity values