diff --git a/CHANGELOG.md b/CHANGELOG.md index 9f0384b4..e264ebab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). ## [Unreleased] ### Added +- `zeph-gateway` crate: axum HTTP gateway with POST /webhook ingestion, bearer auth (blake3 + ct_eq), per-IP rate limiting, GET /health endpoint, feature-gated (`gateway`) (#379) +- `zeph-core::daemon` module: component supervisor with health monitoring, PID file management, graceful shutdown, feature-gated (`daemon`) (#380) +- `zeph-scheduler` crate: cron-based periodic task scheduler with SQLite persistence, built-in tasks (memory_cleanup, skill_refresh, health_check), TaskHandler trait, feature-gated (`scheduler`) (#381) +- New config sections: `[gateway]`, `[daemon]`, `[scheduler]` in config/default.toml (#367) +- New optional feature flags: `gateway`, `daemon`, `scheduler` - Hybrid memory search: FTS5 keyword search combined with Qdrant vector similarity (#372, #373, #374) - SQLite FTS5 virtual table with auto-sync triggers for full-text keyword search - Configurable `vector_weight`/`keyword_weight` in `[memory.semantic]` for hybrid ranking diff --git a/Cargo.lock b/Cargo.lock index 890c6c2f..f7a8f433 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1084,6 +1084,17 @@ dependencies = [ "itertools 0.13.0", ] +[[package]] +name = "cron" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5877d3fbf742507b66bc2a1945106bd30dd8504019d596901ddd012a4dd01740" +dependencies = [ + "chrono", + "once_cell", + "winnow 0.6.26", +] + [[package]] name = "crossbeam-deque" version = "0.8.6" @@ -5812,6 +5823,7 @@ dependencies = [ "memchr", "once_cell", "percent-encoding", + "rustls", "serde", "serde_json", "sha2", @@ -5821,6 +5833,7 @@ dependencies = [ "tokio-stream", "tracing", "url", + "webpki-roots 0.26.11", ] [[package]] @@ -6616,7 +6629,7 @@ dependencies = [ "toml_datetime", "toml_parser", "toml_writer", - "winnow", + "winnow 0.7.14", ] [[package]] @@ -6634,7 +6647,7 @@ version = "1.0.8+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0742ff5ff03ea7e67c8ae6c93cac239e0d9784833362da3f9a9c1da8dfefcbdc" dependencies = [ - "winnow", + "winnow 0.7.14", ] [[package]] @@ -8057,6 +8070,15 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" +[[package]] +name = "winnow" +version = "0.6.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e90edd2ac1aa278a5c4599b1d89cf03074b610800f866d4026dc199d7929a28" +dependencies = [ + "memchr", +] + [[package]] name = "winnow" version = "0.7.14" @@ -8253,10 +8275,12 @@ dependencies = [ "zeph-a2a", "zeph-channels", "zeph-core", + "zeph-gateway", "zeph-index", "zeph-llm", "zeph-mcp", "zeph-memory", + "zeph-scheduler", "zeph-skills", "zeph-tools", "zeph-tui", @@ -8325,6 +8349,23 @@ dependencies = [ "zeph-tools", ] +[[package]] +name = "zeph-gateway" +version = "0.9.8" +dependencies = [ + "axum 0.8.8", + "blake3", + "http-body-util", + "serde", + "serde_json", + "subtle", + "thiserror 2.0.18", + "tokio", + "tower 0.5.3", + "tower-http", + "tracing", +] + [[package]] name = "zeph-index" version = "0.9.8" @@ -8416,6 +8457,21 @@ dependencies = [ "zeph-llm", ] +[[package]] +name = "zeph-scheduler" +version = "0.9.8" +dependencies = [ + "chrono", + "cron", + "serde", + "serde_json", + "sqlx", + "tempfile", + "thiserror 2.0.18", 
+ "tokio", + "tracing", +] + [[package]] name = "zeph-skills" version = "0.9.8" diff --git a/Cargo.toml b/Cargo.toml index 1e7d63cb..29e07ed6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ anyhow = "1.0" candle-core = { version = "0.9", default-features = false } candle-nn = { version = "0.9", default-features = false } candle-transformers = { version = "0.9", default-features = false } +chrono = { version = "0.4", default-features = false, features = ["std"] } crossterm = "0.29" axum = "0.8" blake3 = "1.8" @@ -74,6 +75,7 @@ tree-sitter-typescript = "0.23" unicode-width = "0.2" url = "2.5" uuid = "1.21" +cron = "0.15" zeph-a2a = { path = "crates/zeph-a2a", version = "0.9.8" } zeph-channels = { path = "crates/zeph-channels", version = "0.9.8" } zeph-core = { path = "crates/zeph-core", version = "0.9.8" } @@ -83,6 +85,8 @@ zeph-mcp = { path = "crates/zeph-mcp", version = "0.9.8" } zeph-memory = { path = "crates/zeph-memory", version = "0.9.8" } zeph-skills = { path = "crates/zeph-skills", version = "0.9.8" } zeph-tools = { path = "crates/zeph-tools", version = "0.9.8" } +zeph-gateway = { path = "crates/zeph-gateway", version = "0.9.8" } +zeph-scheduler = { path = "crates/zeph-scheduler", version = "0.9.8" } zeph-tui = { path = "crates/zeph-tui", version = "0.9.8" } [workspace.lints.clippy] @@ -112,6 +116,9 @@ router = ["zeph-llm/router"] self-learning = ["zeph-core/self-learning"] tui = ["dep:zeph-tui"] index = ["dep:zeph-index", "zeph-core/index"] +gateway = ["dep:zeph-gateway"] +daemon = ["zeph-core/daemon"] +scheduler = ["dep:zeph-scheduler"] vault-age = ["zeph-core/vault-age"] otel = ["dep:opentelemetry", "dep:opentelemetry_sdk", "dep:opentelemetry-otlp", "dep:tracing-opentelemetry"] @@ -133,6 +140,8 @@ zeph-llm.workspace = true zeph-memory.workspace = true zeph-skills.workspace = true zeph-tools.workspace = true +zeph-gateway = { workspace = true, optional = true } +zeph-scheduler = { workspace = true, optional = true } zeph-tui = { workspace = true, optional = true } [dev-dependencies] diff --git a/README.md b/README.md index dc67a2a4..6c7b23c7 100644 --- a/README.md +++ b/README.md @@ -112,7 +112,7 @@ cargo build --release --features tui ``` zeph (binary) — bootstrap, AnyChannel dispatch, vault resolution (anyhow for top-level errors) ├── zeph-core — Agent split into 7 submodules (context, streaming, persistence, -│ learning, mcp, index), typed AgentError/ChannelError, config hot-reload +│ learning, mcp, index), daemon supervisor, typed AgentError/ChannelError, config hot-reload ├── zeph-llm — LlmProvider: Ollama, Claude, OpenAI, Candle, orchestrator, │ native tool_use (Claude/OpenAI), typed LlmError ├── zeph-skills — SKILL.md parser, embedding matcher, hot-reload, self-learning, typed SkillError @@ -122,6 +122,8 @@ zeph (binary) — bootstrap, AnyChannel dispatch, vault resolution (anyhow for t ├── zeph-tools — schemars-driven tool registry (shell, file ops, web scrape), composite dispatch ├── zeph-mcp — MCP client, multi-server lifecycle, unified tool matching ├── zeph-a2a — A2A client + server, agent discovery, JSON-RPC 2.0 +├── zeph-gateway — HTTP gateway for webhook ingestion with bearer auth (optional) +├── zeph-scheduler — Cron-based periodic task scheduler with SQLite persistence (optional) └── zeph-tui — ratatui TUI dashboard with live agent metrics (optional) ``` @@ -149,6 +151,9 @@ Deep dive: [Architecture overview](https://bug-ops.github.io/zeph/architecture/o | `self-learning` | On | Skill evolution system | | `vault-age` | On | Age-encrypted secret storage | | 
`index` | On | AST-based code indexing and semantic retrieval | +| `gateway` | Off | HTTP gateway for webhook ingestion | +| `daemon` | Off | Daemon supervisor for component lifecycle | +| `scheduler` | Off | Cron-based periodic task scheduler | | `metal` | Off | Metal GPU acceleration (macOS) | | `tui` | Off | ratatui TUI dashboard with real-time metrics | | `cuda` | Off | CUDA GPU acceleration (Linux) | diff --git a/config/default.toml b/config/default.toml index cf4d23bf..79cd7cec 100644 --- a/config/default.toml +++ b/config/default.toml @@ -235,6 +235,44 @@ enabled = false # Audit destination: "stdout" or file path (e.g., "./data/audit.jsonl") destination = "stdout" +[gateway] +# Enable HTTP gateway for webhook ingestion (feature-gated: --features gateway) +enabled = false +# Bind address (127.0.0.1 = localhost only, 0.0.0.0 = all interfaces) +bind = "127.0.0.1" +# HTTP port +port = 8090 +# auth_token = "secret" # optional, from vault ZEPH_GATEWAY_TOKEN +# Rate limit: max requests per minute per IP +rate_limit = 120 +# Maximum request body size in bytes (1MB) +max_body_size = 1048576 + +[daemon] +# Enable daemon supervisor (feature-gated: --features daemon) +enabled = false +# PID file location +pid_file = "~/.zeph/zeph.pid" +# Health check interval in seconds +health_interval_secs = 30 +# Maximum restart backoff in seconds +max_restart_backoff_secs = 60 + +[scheduler] +# Enable cron scheduler (feature-gated: --features scheduler) +enabled = false +# Example task definitions: +# [[scheduler.tasks]] +# name = "memory_cleanup" +# cron = "0 0 0 * * *" +# kind = "memory_cleanup" +# config = { max_age_days = 90 } +# +# [[scheduler.tasks]] +# name = "health_check" +# cron = "0 */5 * * * *" +# kind = "health_check" + [security] # Redact secrets (API keys, tokens) from LLM responses before display redact_secrets = true diff --git a/crates/zeph-core/Cargo.toml b/crates/zeph-core/Cargo.toml index 9b1b9f1f..fdc0b9ac 100644 --- a/crates/zeph-core/Cargo.toml +++ b/crates/zeph-core/Cargo.toml @@ -10,6 +10,7 @@ repository.workspace = true default = [] index = ["dep:zeph-index"] mcp = ["dep:zeph-mcp"] +daemon = [] self-learning = ["zeph-skills/self-learning"] vault-age = ["dep:age"] diff --git a/crates/zeph-core/src/config/mod.rs b/crates/zeph-core/src/config/mod.rs index e39dade5..44b611bc 100644 --- a/crates/zeph-core/src/config/mod.rs +++ b/crates/zeph-core/src/config/mod.rs @@ -67,6 +67,9 @@ impl Config { } } } + if let Some(val) = vault.get_secret("ZEPH_GATEWAY_TOKEN").await? 
{ + self.gateway.auth_token = Some(val); + } Ok(()) } } diff --git a/crates/zeph-core/src/config/types.rs b/crates/zeph-core/src/config/types.rs index 9f3fdd6c..ece48417 100644 --- a/crates/zeph-core/src/config/types.rs +++ b/crates/zeph-core/src/config/types.rs @@ -30,6 +30,12 @@ pub struct Config { pub cost: CostConfig, #[serde(default)] pub observability: ObservabilityConfig, + #[serde(default)] + pub gateway: GatewayConfig, + #[serde(default)] + pub daemon: DaemonConfig, + #[serde(default)] + pub scheduler: SchedulerConfig, #[serde(skip)] pub secrets: ResolvedSecrets, } @@ -683,6 +689,103 @@ impl Default for ObservabilityConfig { } } +#[derive(Debug, Clone, Deserialize)] +pub struct GatewayConfig { + #[serde(default)] + pub enabled: bool, + #[serde(default = "default_gateway_bind")] + pub bind: String, + #[serde(default = "default_gateway_port")] + pub port: u16, + #[serde(default)] + pub auth_token: Option<String>, + #[serde(default = "default_gateway_rate_limit")] + pub rate_limit: u32, + #[serde(default = "default_gateway_max_body")] + pub max_body_size: usize, +} + +fn default_gateway_bind() -> String { + "127.0.0.1".into() +} + +fn default_gateway_port() -> u16 { + 8090 +} + +fn default_gateway_rate_limit() -> u32 { + 120 +} + +fn default_gateway_max_body() -> usize { + 1_048_576 +} + +impl Default for GatewayConfig { + fn default() -> Self { + Self { + enabled: false, + bind: default_gateway_bind(), + port: default_gateway_port(), + auth_token: None, + rate_limit: default_gateway_rate_limit(), + max_body_size: default_gateway_max_body(), + } + } +} + +#[derive(Debug, Clone, Deserialize)] +pub struct DaemonConfig { + #[serde(default)] + pub enabled: bool, + #[serde(default = "default_pid_file")] + pub pid_file: String, + #[serde(default = "default_health_interval")] + pub health_interval_secs: u64, + #[serde(default = "default_max_restart_backoff")] + pub max_restart_backoff_secs: u64, +} + +fn default_pid_file() -> String { + "~/.zeph/zeph.pid".into() +} + +fn default_health_interval() -> u64 { + 30 +} + +fn default_max_restart_backoff() -> u64 { + 60 +} + +impl Default for DaemonConfig { + fn default() -> Self { + Self { + enabled: false, + pid_file: default_pid_file(), + health_interval_secs: default_health_interval(), + max_restart_backoff_secs: default_max_restart_backoff(), + } + } +} + +#[derive(Debug, Clone, Default, Deserialize)] +pub struct SchedulerConfig { + #[serde(default)] + pub enabled: bool, + #[serde(default)] + pub tasks: Vec<ScheduledTaskConfig>, +} + +#[derive(Debug, Clone, Deserialize)] +pub struct ScheduledTaskConfig { + pub name: String, + pub cron: String, + pub kind: String, + #[serde(default)] + pub config: serde_json::Value, +} + #[derive(Debug, Default)] pub struct ResolvedSecrets { pub claude_api_key: Option<String>, @@ -738,6 +841,9 @@ impl Config { timeouts: TimeoutConfig::default(), cost: CostConfig::default(), observability: ObservabilityConfig::default(), + gateway: GatewayConfig::default(), + daemon: DaemonConfig::default(), + scheduler: SchedulerConfig::default(), secrets: ResolvedSecrets::default(), } } diff --git a/crates/zeph-core/src/daemon.rs b/crates/zeph-core/src/daemon.rs new file mode 100644 index 00000000..cc3cf1ee --- /dev/null +++ b/crates/zeph-core/src/daemon.rs @@ -0,0 +1,250 @@ +//! Daemon supervisor for component lifecycle management.
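+//! +//! A small sketch of the PID-file helpers below (the path is illustrative): +//! +//! ```ignore +//! use zeph_core::daemon::{read_pid_file, remove_pid_file, write_pid_file}; +//! +//! fn pid_roundtrip() -> std::io::Result<()> { +//!     write_pid_file("~/.zeph/zeph.pid")?; +//!     let pid = read_pid_file("~/.zeph/zeph.pid")?; +//!     assert_eq!(pid, std::process::id()); +//!     remove_pid_file("~/.zeph/zeph.pid") +//! } +//! ```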
+ +use std::time::Duration; + +use tokio::sync::watch; +use tokio::task::JoinHandle; + +use crate::config::DaemonConfig; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum ComponentStatus { + Running, + Failed(String), + Stopped, +} + +pub struct ComponentHandle { + pub name: String, + handle: JoinHandle<anyhow::Result<()>>, + pub status: ComponentStatus, + pub restart_count: u32, +} + +impl ComponentHandle { + #[must_use] + pub fn new(name: impl Into<String>, handle: JoinHandle<anyhow::Result<()>>) -> Self { + Self { + name: name.into(), + handle, + status: ComponentStatus::Running, + restart_count: 0, + } + } + + #[must_use] + pub fn is_finished(&self) -> bool { + self.handle.is_finished() + } +} + +pub struct DaemonSupervisor { + components: Vec<ComponentHandle>, + health_interval: Duration, + _max_backoff: Duration, + shutdown_rx: watch::Receiver<bool>, +} + +impl DaemonSupervisor { + #[must_use] + pub fn new(config: &DaemonConfig, shutdown_rx: watch::Receiver<bool>) -> Self { + Self { + components: Vec::new(), + health_interval: Duration::from_secs(config.health_interval_secs), + _max_backoff: Duration::from_secs(config.max_restart_backoff_secs), + shutdown_rx, + } + } + + pub fn add_component(&mut self, handle: ComponentHandle) { + self.components.push(handle); + } + + #[must_use] + pub fn component_count(&self) -> usize { + self.components.len() + } + + /// Run the health monitoring loop until shutdown signal. + pub async fn run(&mut self) { + let mut interval = tokio::time::interval(self.health_interval); + loop { + tokio::select! { + _ = interval.tick() => { + self.check_health(); + } + _ = self.shutdown_rx.changed() => { + if *self.shutdown_rx.borrow() { + tracing::info!("daemon supervisor shutting down"); + break; + } + } + } + } + } + + fn check_health(&mut self) { + for component in &mut self.components { + if component.status == ComponentStatus::Running && component.is_finished() { + component.status = ComponentStatus::Failed("task exited".into()); + component.restart_count += 1; + tracing::warn!( + component = %component.name, + restarts = component.restart_count, + "component exited unexpectedly" + ); + } + } + } + + #[must_use] + pub fn component_statuses(&self) -> Vec<(&str, &ComponentStatus)> { + self.components + .iter() + .map(|c| (c.name.as_str(), &c.status)) + .collect() + } +} + +/// Write a PID file. Returns an error if the write fails. +/// +/// # Errors +/// +/// Returns an error if the PID file directory cannot be created or the file cannot be written. +pub fn write_pid_file(path: &str) -> std::io::Result<()> { + let expanded = expand_tilde(path); + let path = std::path::Path::new(&expanded); + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent)?; + } + std::fs::write(path, std::process::id().to_string()) +} + +/// Read the PID from a PID file. +/// +/// # Errors +/// +/// Returns an error if the file cannot be read or the content is not a valid PID. +pub fn read_pid_file(path: &str) -> std::io::Result<u32> { + let expanded = expand_tilde(path); + let content = std::fs::read_to_string(&expanded)?; + content + .trim() + .parse::<u32>() + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e)) +} + +/// Remove the PID file. +/// +/// # Errors +/// +/// Returns an error if the file cannot be removed.
+pub fn remove_pid_file(path: &str) -> std::io::Result<()> { + let expanded = expand_tilde(path); + match std::fs::remove_file(&expanded) { + Ok(()) => Ok(()), + Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()), + Err(e) => Err(e), + } +} + +fn expand_tilde(path: &str) -> String { + if let Some(rest) = path.strip_prefix("~/") + && let Some(home) = std::env::var_os("HOME") + { + return format!("{}/{rest}", home.to_string_lossy()); + } + path.to_owned() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn expand_tilde_with_home() { + let result = expand_tilde("~/test/file.pid"); + assert!(!result.starts_with("~/")); + } + + #[test] + fn expand_tilde_absolute_unchanged() { + assert_eq!(expand_tilde("/tmp/zeph.pid"), "/tmp/zeph.pid"); + } + + #[test] + fn pid_file_roundtrip() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("test.pid"); + let path_str = path.to_string_lossy().to_string(); + + write_pid_file(&path_str).unwrap(); + let pid = read_pid_file(&path_str).unwrap(); + assert_eq!(pid, std::process::id()); + remove_pid_file(&path_str).unwrap(); + assert!(!path.exists()); + } + + #[test] + fn remove_nonexistent_pid_file_ok() { + assert!(remove_pid_file("/tmp/nonexistent_zeph_test.pid").is_ok()); + } + + #[test] + fn read_invalid_pid_file() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("bad.pid"); + std::fs::write(&path, "not_a_number").unwrap(); + assert!(read_pid_file(&path.to_string_lossy()).is_err()); + } + + #[tokio::test] + async fn supervisor_tracks_components() { + let config = DaemonConfig::default(); + let (_tx, rx) = watch::channel(false); + let mut supervisor = DaemonSupervisor::new(&config, rx); + + let handle = tokio::spawn(async { Ok(()) }); + supervisor.add_component(ComponentHandle::new("test", handle)); + assert_eq!(supervisor.component_count(), 1); + } + + #[tokio::test] + async fn supervisor_detects_finished_component() { + let config = DaemonConfig::default(); + let (_tx, rx) = watch::channel(false); + let mut supervisor = DaemonSupervisor::new(&config, rx); + + let handle = tokio::spawn(async { Ok(()) }); + tokio::time::sleep(Duration::from_millis(10)).await; + supervisor.add_component(ComponentHandle::new("finished", handle)); + supervisor.check_health(); + + let statuses = supervisor.component_statuses(); + assert_eq!(statuses.len(), 1); + assert!(matches!(statuses[0].1, ComponentStatus::Failed(_))); + } + + #[tokio::test] + async fn supervisor_shutdown() { + let mut config = DaemonConfig::default(); + config.health_interval_secs = 1; + let (tx, rx) = watch::channel(false); + let mut supervisor = DaemonSupervisor::new(&config, rx); + + let run_handle = tokio::spawn(async move { supervisor.run().await }); + tokio::time::sleep(Duration::from_millis(50)).await; + let _ = tx.send(true); + tokio::time::timeout(Duration::from_secs(2), run_handle) + .await + .expect("supervisor should stop on shutdown") + .expect("task should complete"); + } + + #[test] + fn component_status_eq() { + assert_eq!(ComponentStatus::Running, ComponentStatus::Running); + assert_eq!(ComponentStatus::Stopped, ComponentStatus::Stopped); + assert_ne!(ComponentStatus::Running, ComponentStatus::Stopped); + } +} diff --git a/crates/zeph-core/src/lib.rs b/crates/zeph-core/src/lib.rs index c4ba009f..090e289b 100644 --- a/crates/zeph-core/src/lib.rs +++ b/crates/zeph-core/src/lib.rs @@ -6,6 +6,8 @@ pub mod config; pub mod config_watcher; pub mod context; pub mod cost; +#[cfg(feature = "daemon")] +pub mod daemon; pub mod metrics; 
pub mod project; pub mod redact; diff --git a/crates/zeph-gateway/Cargo.toml b/crates/zeph-gateway/Cargo.toml new file mode 100644 index 00000000..a4e7bb20 --- /dev/null +++ b/crates/zeph-gateway/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "zeph-gateway" +version.workspace = true +edition.workspace = true +authors.workspace = true +license.workspace = true +repository.workspace = true + +[dependencies] +axum.workspace = true +blake3.workspace = true +serde = { workspace = true, features = ["derive"] } +serde_json.workspace = true +subtle.workspace = true +thiserror.workspace = true +tokio = { workspace = true, features = ["net", "sync", "time"] } +tower.workspace = true +tower-http = { workspace = true, features = ["limit", "trace"] } +tracing.workspace = true + +[dev-dependencies] +http-body-util.workspace = true +tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } +tower.workspace = true + +[lints] +workspace = true diff --git a/crates/zeph-gateway/src/error.rs b/crates/zeph-gateway/src/error.rs new file mode 100644 index 00000000..76bbd316 --- /dev/null +++ b/crates/zeph-gateway/src/error.rs @@ -0,0 +1,9 @@ +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum GatewayError { + #[error("failed to bind {0}: {1}")] + Bind(String, std::io::Error), + #[error("server error: {0}")] + Server(String), +} diff --git a/crates/zeph-gateway/src/handlers.rs b/crates/zeph-gateway/src/handlers.rs new file mode 100644 index 00000000..55ccba39 --- /dev/null +++ b/crates/zeph-gateway/src/handlers.rs @@ -0,0 +1,66 @@ +use axum::Json; +use axum::extract::State; +use axum::http::StatusCode; +use axum::response::IntoResponse; + +use super::server::AppState; + +#[derive(serde::Deserialize)] +pub(crate) struct WebhookPayload { + pub channel: String, + pub sender: String, + pub body: String, +} + +#[derive(serde::Serialize)] +struct WebhookResponse { + status: &'static str, +} + +#[derive(serde::Serialize)] +struct HealthResponse { + status: &'static str, + uptime_secs: u64, +} + +pub(crate) async fn webhook_handler( + State(state): State<AppState>, + Json(payload): Json<WebhookPayload>, +) -> impl IntoResponse { + let msg = format!("[{}@{}] {}", payload.sender, payload.channel, payload.body); + match state.webhook_tx.send(msg).await { + Ok(()) => Json(WebhookResponse { status: "accepted" }).into_response(), + Err(_) => StatusCode::SERVICE_UNAVAILABLE.into_response(), + } +} + +pub(crate) async fn health_handler(State(state): State<AppState>) -> impl IntoResponse { + Json(HealthResponse { + status: "ok", + uptime_secs: state.started_at.elapsed().as_secs(), + }) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn health_response_serializes() { + let resp = HealthResponse { + status: "ok", + uptime_secs: 42, + }; + let json = serde_json::to_string(&resp).unwrap(); + assert!(json.contains("\"status\":\"ok\"")); + } + + #[test] + fn webhook_payload_deserializes() { + let json = r#"{"channel":"discord","sender":"user1","body":"hello"}"#; + let payload: WebhookPayload = serde_json::from_str(json).unwrap(); + assert_eq!(payload.channel, "discord"); + assert_eq!(payload.sender, "user1"); + assert_eq!(payload.body, "hello"); + } +} diff --git a/crates/zeph-gateway/src/lib.rs b/crates/zeph-gateway/src/lib.rs new file mode 100644 index 00000000..2a43709c --- /dev/null +++ b/crates/zeph-gateway/src/lib.rs @@ -0,0 +1,9 @@ +//! HTTP gateway for webhook ingestion with bearer auth and health endpoint.
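+//! +//! A minimal start-up sketch (token, port, and channel capacity are illustrative): +//! +//! ```ignore +//! use tokio::sync::{mpsc, watch}; +//! use zeph_gateway::GatewayServer; +//! +//! async fn start() -> Result<(), zeph_gateway::GatewayError> { +//!     let (webhook_tx, _webhook_rx) = mpsc::channel::<String>(64); +//!     let (_shutdown_tx, shutdown_rx) = watch::channel(false); +//!     GatewayServer::new("127.0.0.1", 8090, webhook_tx, shutdown_rx) +//!         .with_auth(Some("secret".into())) +//!         .with_rate_limit(120) +//!         .with_max_body_size(1_048_576) +//!         .serve() +//!         .await +//! } +//! ```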
+ +mod error; +mod handlers; +mod router; +mod server; + +pub use error::GatewayError; +pub use server::GatewayServer; diff --git a/crates/zeph-gateway/src/router.rs b/crates/zeph-gateway/src/router.rs new file mode 100644 index 00000000..c735afc1 --- /dev/null +++ b/crates/zeph-gateway/src/router.rs @@ -0,0 +1,281 @@ +use std::collections::HashMap; +use std::net::IpAddr; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use axum::Router; +use axum::body::Body; +use axum::extract::ConnectInfo; +use axum::http::{Request, StatusCode}; +use axum::middleware::{self, Next}; +use axum::response::{IntoResponse, Response}; +use axum::routing::{get, post}; +use subtle::ConstantTimeEq; +use tokio::sync::Mutex; +use tower_http::limit::RequestBodyLimitLayer; + +use super::handlers::{health_handler, webhook_handler}; +use super::server::AppState; + +#[derive(Clone)] +struct AuthConfig { + token: Option<String>, +} + +const MAX_RATE_LIMIT_ENTRIES: usize = 10_000; +const RATE_WINDOW: Duration = Duration::from_secs(60); + +#[derive(Clone)] +struct RateLimitState { + limit: u32, + counters: Arc<Mutex<HashMap<IpAddr, (u32, Instant)>>>, +} + +pub(crate) fn build_router( + state: AppState, + auth_token: Option<String>, + rate_limit: u32, + max_body_size: usize, +) -> Router { + let auth_cfg = AuthConfig { token: auth_token }; + let rate_state = RateLimitState { + limit: rate_limit, + counters: Arc::new(Mutex::new(HashMap::new())), + }; + + let protected = Router::new() + .route("/webhook", post(webhook_handler)) + .layer(middleware::from_fn_with_state( + rate_state, + rate_limit_middleware, + )) + .layer(middleware::from_fn_with_state(auth_cfg, auth_middleware)) + .layer(RequestBodyLimitLayer::new(max_body_size)); + + Router::new() + .route("/health", get(health_handler)) + .merge(protected) + .with_state(state) +} + +async fn auth_middleware( + axum::extract::State(cfg): axum::extract::State<AuthConfig>, + req: Request<Body>, + next: Next, +) -> Response { + if let Some(ref expected) = cfg.token { + let auth_header = req + .headers() + .get("authorization") + .and_then(|v| v.to_str().ok()); + + let token = auth_header + .and_then(|v| v.strip_prefix("Bearer ")) + .unwrap_or(""); + + // Hash both values to fixed-length digests to avoid leaking token length + let token_hash = blake3::hash(token.as_bytes()); + let expected_hash = blake3::hash(expected.as_bytes()); + if !bool::from(token_hash.as_bytes().ct_eq(expected_hash.as_bytes())) { + return StatusCode::UNAUTHORIZED.into_response(); + } + } + + next.run(req).await +} + +async fn rate_limit_middleware( + axum::extract::State(state): axum::extract::State<RateLimitState>, + req: Request<Body>, + next: Next, +) -> Response { + if state.limit == 0 { + return next.run(req).await; + } + + let ip = req + .extensions() + .get::<ConnectInfo<std::net::SocketAddr>>() + .map_or(IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), |ci| ci.0.ip()); + + let now = Instant::now(); + let mut counters = state.counters.lock().await; + + if counters.len() >= MAX_RATE_LIMIT_ENTRIES && !counters.contains_key(&ip) { + counters.retain(|_, (_, ts)| now.duration_since(*ts) < RATE_WINDOW); + } + + let entry = counters.entry(ip).or_insert((0, now)); + if now.duration_since(entry.1) >= RATE_WINDOW { + *entry = (1, now); + } else { + entry.0 += 1; + if entry.0 > state.limit { + return StatusCode::TOO_MANY_REQUESTS.into_response(); + } + } + drop(counters); + + next.run(req).await +} + +#[cfg(test)] +mod tests { + use axum::body::Body; + use http_body_util::BodyExt; + use tower::ServiceExt; + + use super::*; + use crate::server::AppState; + + fn test_state() -> (AppState, tokio::sync::mpsc::Receiver<String>) { + let (tx,
rx) = tokio::sync::mpsc::channel(16); + let state = AppState { + webhook_tx: tx, + started_at: Instant::now(), + }; + (state, rx) + } + + fn make_router( + auth: Option<String>, + rate_limit: u32, + ) -> (Router, tokio::sync::mpsc::Receiver<String>) { + let (state, rx) = test_state(); + (build_router(state, auth, rate_limit, 1_048_576), rx) + } + + #[tokio::test] + async fn health_returns_ok() { + let (app, _rx) = make_router(None, 0); + let req = Request::builder() + .uri("/health") + .body(Body::empty()) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), 200); + let body = resp.into_body().collect().await.unwrap().to_bytes(); + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(json["status"], "ok"); + } + + #[tokio::test] + async fn webhook_accepted() { + let (tx, mut rx) = tokio::sync::mpsc::channel(16); + let state = AppState { + webhook_tx: tx, + started_at: Instant::now(), + }; + let app = build_router(state, None, 0, 1_048_576); + + let body = serde_json::json!({ + "channel": "discord", + "sender": "user1", + "body": "hello" + }); + let req = Request::builder() + .method("POST") + .uri("/webhook") + .header("content-type", "application/json") + .body(Body::from(serde_json::to_vec(&body).unwrap())) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), 200); + + let msg = rx.try_recv().unwrap(); + assert!(msg.contains("user1")); + } + + #[tokio::test] + async fn auth_rejects_missing_token() { + let (app, _rx) = make_router(Some("secret".into()), 0); + let body = serde_json::json!({"channel":"a","sender":"b","body":"c"}); + let req = Request::builder() + .method("POST") + .uri("/webhook") + .header("content-type", "application/json") + .body(Body::from(serde_json::to_vec(&body).unwrap())) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), 401); + } + + #[tokio::test] + async fn auth_accepts_valid_token() { + let (app, _rx) = make_router(Some("secret".into()), 0); + let body = serde_json::json!({"channel":"a","sender":"b","body":"c"}); + let req = Request::builder() + .method("POST") + .uri("/webhook") + .header("content-type", "application/json") + .header("authorization", "Bearer secret") + .body(Body::from(serde_json::to_vec(&body).unwrap())) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), 200); + } + + #[tokio::test] + async fn auth_rejects_wrong_token() { + let (app, _rx) = make_router(Some("secret".into()), 0); + let body = serde_json::json!({"channel":"a","sender":"b","body":"c"}); + let req = Request::builder() + .method("POST") + .uri("/webhook") + .header("content-type", "application/json") + .header("authorization", "Bearer wrong") + .body(Body::from(serde_json::to_vec(&body).unwrap())) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), 401); + } + + #[tokio::test] + async fn health_skips_auth() { + let (app, _rx) = make_router(Some("secret".into()), 0); + let req = Request::builder() + .uri("/health") + .body(Body::empty()) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), 200); + } + + #[tokio::test] + async fn rate_limit_enforced() { + use tower::Service; + + let (mut app, _rx) = make_router(None, 2); + let make_req = || { + let body = serde_json::json!({"channel":"a","sender":"b","body":"c"}); + Request::builder() + .method("POST") + .uri("/webhook") + .header("content-type", "application/json") + 
.body(Body::from(serde_json::to_vec(&body).unwrap())) + .unwrap() + }; + + let resp = app.call(make_req()).await.unwrap(); + assert_eq!(resp.status(), 200); + let resp = app.call(make_req()).await.unwrap(); + assert_eq!(resp.status(), 200); + let resp = app.call(make_req()).await.unwrap(); + assert_eq!(resp.status(), 429); + } + + #[tokio::test] + async fn body_size_limit() { + let (state, _rx) = test_state(); + let app = build_router(state, None, 0, 64); + let oversized = vec![b'a'; 128]; + let req = Request::builder() + .method("POST") + .uri("/webhook") + .header("content-type", "application/json") + .body(Body::from(oversized)) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), 413); + } +} diff --git a/crates/zeph-gateway/src/server.rs b/crates/zeph-gateway/src/server.rs new file mode 100644 index 00000000..46515141 --- /dev/null +++ b/crates/zeph-gateway/src/server.rs @@ -0,0 +1,132 @@ +use std::net::SocketAddr; +use std::time::Instant; + +use tokio::sync::{mpsc, watch}; + +use crate::error::GatewayError; +use crate::router::build_router; + +#[derive(Clone)] +pub(crate) struct AppState { + pub webhook_tx: mpsc::Sender<String>, + pub started_at: Instant, +} + +pub struct GatewayServer { + addr: SocketAddr, + auth_token: Option<String>, + rate_limit: u32, + max_body_size: usize, + webhook_tx: mpsc::Sender<String>, + shutdown_rx: watch::Receiver<bool>, +} + +impl GatewayServer { + #[must_use] + pub fn new( + bind: &str, + port: u16, + webhook_tx: mpsc::Sender<String>, + shutdown_rx: watch::Receiver<bool>, + ) -> Self { + let addr: SocketAddr = format!("{bind}:{port}").parse().unwrap_or_else(|e| { + tracing::warn!("invalid bind '{bind}': {e}, falling back to 127.0.0.1:{port}"); + SocketAddr::from(([127, 0, 0, 1], port)) + }); + + if bind == "0.0.0.0" { + tracing::warn!("gateway binding to 0.0.0.0 — ensure this is intended for production"); + } + + Self { + addr, + auth_token: None, + rate_limit: 120, + max_body_size: 1_048_576, + webhook_tx, + shutdown_rx, + } + } + + #[must_use] + pub fn with_auth(mut self, token: Option<String>) -> Self { + self.auth_token = token; + self + } + + #[must_use] + pub fn with_rate_limit(mut self, limit: u32) -> Self { + self.rate_limit = limit; + self + } + + #[must_use] + pub fn with_max_body_size(mut self, size: usize) -> Self { + self.max_body_size = size; + self + } + + /// Start the HTTP gateway server. + /// + /// # Errors + /// + /// Returns an error if the server fails to bind or encounters a fatal I/O error.
+ pub async fn serve(self) -> Result<(), GatewayError> { + let state = AppState { + webhook_tx: self.webhook_tx, + started_at: Instant::now(), + }; + + let router = build_router(state, self.auth_token, self.rate_limit, self.max_body_size); + + let listener = tokio::net::TcpListener::bind(self.addr) + .await + .map_err(|e| GatewayError::Bind(self.addr.to_string(), e))?; + tracing::info!("gateway listening on {}", self.addr); + + let mut shutdown_rx = self.shutdown_rx; + axum::serve( + listener, + router.into_make_service_with_connect_info::<SocketAddr>(), + ) + .with_graceful_shutdown(async move { + while !*shutdown_rx.borrow_and_update() { + if shutdown_rx.changed().await.is_err() { + std::future::pending::<()>().await; + } + } + tracing::info!("gateway shutting down"); + }) + .await + .map_err(|e| GatewayError::Server(format!("{e}")))?; + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn server_builder_chain() { + let (tx, _rx) = mpsc::channel(1); + let (_stx, srx) = watch::channel(false); + let server = GatewayServer::new("127.0.0.1", 8090, tx, srx) + .with_auth(Some("token".into())) + .with_rate_limit(60) + .with_max_body_size(512); + + assert_eq!(server.rate_limit, 60); + assert_eq!(server.max_body_size, 512); + assert!(server.auth_token.is_some()); + } + + #[test] + fn server_invalid_bind_fallback() { + let (tx, _rx) = mpsc::channel(1); + let (_stx, srx) = watch::channel(false); + let server = GatewayServer::new("not_an_ip", 9999, tx, srx); + assert_eq!(server.addr.port(), 9999); + } +} diff --git a/crates/zeph-scheduler/Cargo.toml b/crates/zeph-scheduler/Cargo.toml new file mode 100644 index 00000000..a4f87618 --- /dev/null +++ b/crates/zeph-scheduler/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "zeph-scheduler" +version.workspace = true +edition.workspace = true +authors.workspace = true +license.workspace = true +repository.workspace = true + +[dependencies] +chrono.workspace = true +cron.workspace = true +serde = { workspace = true, features = ["derive"] } +serde_json.workspace = true +sqlx = { workspace = true, features = ["runtime-tokio-rustls", "sqlite"] } +thiserror.workspace = true +tokio = { workspace = true, features = ["sync", "time"] } +tracing.workspace = true + +[dev-dependencies] +tempfile.workspace = true +tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } + +[lints] +workspace = true diff --git a/crates/zeph-scheduler/src/error.rs b/crates/zeph-scheduler/src/error.rs new file mode 100644 index 00000000..ef8fec14 --- /dev/null +++ b/crates/zeph-scheduler/src/error.rs @@ -0,0 +1,11 @@ +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum SchedulerError { + #[error("invalid cron expression: {0}")] + InvalidCron(String), + #[error("database error: {0}")] + Database(#[from] sqlx::Error), + #[error("task execution failed: {0}")] + TaskFailed(String), +} diff --git a/crates/zeph-scheduler/src/lib.rs b/crates/zeph-scheduler/src/lib.rs new file mode 100644 index 00000000..69c24acd --- /dev/null +++ b/crates/zeph-scheduler/src/lib.rs @@ -0,0 +1,11 @@ +//! Cron-based periodic task scheduler with `SQLite` persistence.
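+//! +//! A minimal wiring sketch (the in-memory SQLite URL and cron expression are illustrative): +//! +//! ```ignore +//! use tokio::sync::watch; +//! use zeph_scheduler::{JobStore, ScheduledTask, Scheduler, TaskKind}; +//! +//! async fn start() -> Result<(), zeph_scheduler::SchedulerError> { +//!     let pool = sqlx::SqlitePool::connect("sqlite::memory:").await?; +//!     let (_shutdown_tx, shutdown_rx) = watch::channel(false); +//!     let mut scheduler = Scheduler::new(JobStore::new(pool), shutdown_rx); +//! +//!     scheduler.add_task(ScheduledTask::new( +//!         "health_check", +//!         "0 */5 * * * *", +//!         TaskKind::HealthCheck, +//!         serde_json::Value::Null, +//!     )?); +//! +//!     scheduler.init().await?; +//!     scheduler.run().await; // ticks every 60 seconds until shutdown +//!     Ok(()) +//! } +//! ```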
+ +mod error; +mod scheduler; +mod store; +mod task; + +pub use error::SchedulerError; +pub use scheduler::Scheduler; +pub use store::JobStore; +pub use task::{ScheduledTask, TaskHandler, TaskKind}; diff --git a/crates/zeph-scheduler/src/scheduler.rs b/crates/zeph-scheduler/src/scheduler.rs new file mode 100644 index 00000000..942e40a0 --- /dev/null +++ b/crates/zeph-scheduler/src/scheduler.rs @@ -0,0 +1,246 @@ +use std::collections::HashMap; +use std::time::Duration; + +use tokio::sync::watch; + +use crate::error::SchedulerError; +use crate::store::JobStore; +use crate::task::{ScheduledTask, TaskHandler, TaskKind}; + +pub struct Scheduler { + tasks: Vec<ScheduledTask>, + store: JobStore, + handlers: HashMap<String, Box<dyn TaskHandler>>, + shutdown_rx: watch::Receiver<bool>, +} + +impl Scheduler { + #[must_use] + pub fn new(store: JobStore, shutdown_rx: watch::Receiver<bool>) -> Self { + Self { + tasks: Vec::new(), + store, + handlers: HashMap::new(), + shutdown_rx, + } + } + + pub fn add_task(&mut self, task: ScheduledTask) { + self.tasks.push(task); + } + + pub fn register_handler(&mut self, kind: &TaskKind, handler: Box<dyn TaskHandler>) { + self.handlers.insert(kind.as_str().to_owned(), handler); + } + + /// Initialize the store and sync task definitions. + /// + /// # Errors + /// + /// Returns an error if DB init or upsert fails. + pub async fn init(&self) -> Result<(), SchedulerError> { + self.store.init().await?; + for task in &self.tasks { + self.store + .upsert_job(&task.name, &task.schedule.to_string(), task.kind.as_str()) + .await?; + } + Ok(()) + } + + /// Run the scheduler loop, checking every 60 seconds for due tasks. + pub async fn run(&mut self) { + let mut interval = tokio::time::interval(Duration::from_secs(60)); + loop { + tokio::select! { + _ = interval.tick() => { + self.tick().await; + } + _ = self.shutdown_rx.changed() => { + if *self.shutdown_rx.borrow() { + tracing::info!("scheduler shutting down"); + break; + } + } + } + } + } + + async fn tick(&self) { + let now_utc = chrono_now_utc(); + for task in &self.tasks { + let should_run = match self.store.last_run(&task.name).await { + Ok(last_run) => is_task_due(&task.schedule, last_run.as_deref()), + Err(e) => { + tracing::warn!(task = %task.name, "failed to check last_run: {e}"); + false + } + }; + + if should_run { + if let Some(handler) = self.handlers.get(task.kind.as_str()) { + tracing::info!(task = %task.name, kind = task.kind.as_str(), "executing task"); + match handler.execute(&task.config).await { + Ok(()) => { + if let Err(e) = self.store.record_run(&task.name, &now_utc).await { + tracing::warn!(task = %task.name, "failed to record run: {e}"); + } + } + Err(e) => { + tracing::warn!(task = %task.name, "task execution failed: {e}"); + } + } + } else { + tracing::debug!(task = %task.name, kind = task.kind.as_str(), "no handler registered"); + } + } + } + } +} + +/// Check if a task is due by finding the first cron occurrence after `last_run` +/// and verifying it is <= `now`.
+fn is_task_due(schedule: &cron::Schedule, last_run: Option<&str>) -> bool { + let now_chrono = chrono::Utc::now(); + let after = match last_run { + Some(s) => match s.parse::<chrono::DateTime<chrono::Utc>>() { + Ok(dt) => dt, + Err(_) => return true, + }, + None => return true, + }; + // First scheduled time after the last run + schedule.after(&after).take(1).any(|dt| dt <= now_chrono) +} + +fn chrono_now_utc() -> String { + use std::time::{SystemTime, UNIX_EPOCH}; + let secs = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + let days = secs / 86400; + let time_secs = secs % 86400; + let hours = time_secs / 3600; + let minutes = (time_secs % 3600) / 60; + let seconds = time_secs % 60; + let (year, month, day) = days_to_ymd(days); + format!("{year:04}-{month:02}-{day:02}T{hours:02}:{minutes:02}:{seconds:02}Z") +} + +fn days_to_ymd(mut days: u64) -> (u64, u64, u64) { + let mut year = 1970; + loop { + let days_in_year = if is_leap(year) { 366 } else { 365 }; + if days < days_in_year { + break; + } + days -= days_in_year; + year += 1; + } + let month_days: [u64; 12] = if is_leap(year) { + [31, 29, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31] + } else { + [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31] + }; + let mut month = 0; + for (i, &md) in month_days.iter().enumerate() { + if days < md { + month = i as u64 + 1; + break; + } + days -= md; + } + (year, month, days + 1) +} + +fn is_leap(y: u64) -> bool { + y.is_multiple_of(4) && (!y.is_multiple_of(100) || y.is_multiple_of(400)) +} + +#[cfg(test)] +mod tests { + use std::future::Future; + use std::pin::Pin; + use std::sync::Arc; + use std::sync::atomic::{AtomicU32, Ordering}; + + use super::*; + use crate::task::TaskHandler; + use sqlx::SqlitePool; + + struct CountingHandler { + count: Arc<AtomicU32>, + } + + impl TaskHandler for CountingHandler { + fn execute( + &self, + _config: &serde_json::Value, + ) -> Pin<Box<dyn Future<Output = Result<(), SchedulerError>> + Send + '_>> + { + let count = self.count.clone(); + Box::pin(async move { + count.fetch_add(1, Ordering::Relaxed); + Ok(()) + }) + } + } + + async fn test_pool() -> SqlitePool { + SqlitePool::connect("sqlite::memory:").await.unwrap() + } + + #[tokio::test] + async fn scheduler_init_and_tick() { + let pool = test_pool().await; + let store = JobStore::new(pool); + let (_tx, rx) = watch::channel(false); + let mut scheduler = Scheduler::new(store, rx); + + let task = ScheduledTask::new( + "test", + "0 * * * * *", + TaskKind::HealthCheck, + serde_json::Value::Null, + ) + .unwrap(); + scheduler.add_task(task); + + let count = Arc::new(AtomicU32::new(0)); + scheduler.register_handler( + &TaskKind::HealthCheck, + Box::new(CountingHandler { + count: count.clone(), + }), + ); + + scheduler.init().await.unwrap(); + scheduler.tick().await; + assert_eq!(count.load(Ordering::Relaxed), 1); + } + + #[tokio::test] + async fn scheduler_shutdown() { + let pool = test_pool().await; + let store = JobStore::new(pool); + let (tx, rx) = watch::channel(false); + let mut scheduler = Scheduler::new(store, rx); + scheduler.init().await.unwrap(); + + let handle = tokio::spawn(async move { scheduler.run().await }); + tokio::time::sleep(Duration::from_millis(50)).await; + let _ = tx.send(true); + tokio::time::timeout(Duration::from_secs(2), handle) + .await + .expect("scheduler should stop") + .expect("task should complete"); + } + + #[test] + fn chrono_now_format() { + let ts = chrono_now_utc(); + assert!(ts.ends_with('Z')); + assert!(ts.contains('T')); + assert_eq!(ts.len(), 20); + } +} diff --git a/crates/zeph-scheduler/src/store.rs b/crates/zeph-scheduler/src/store.rs new file mode 100644
index 00000000..a19119b8 --- /dev/null +++ b/crates/zeph-scheduler/src/store.rs @@ -0,0 +1,163 @@ +use sqlx::SqlitePool; + +use crate::error::SchedulerError; + +pub struct JobStore { + pool: SqlitePool, +} + +impl JobStore { + #[must_use] + pub fn new(pool: SqlitePool) -> Self { + Self { pool } + } + + /// Initialize the `scheduled_jobs` table. + /// + /// # Errors + /// + /// Returns an error if the SQL statement fails. + pub async fn init(&self) -> Result<(), SchedulerError> { + sqlx::query( + "CREATE TABLE IF NOT EXISTS scheduled_jobs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL UNIQUE, + cron_expr TEXT NOT NULL, + kind TEXT NOT NULL, + last_run TEXT, + next_run TEXT, + status TEXT NOT NULL DEFAULT 'pending' + )", + ) + .execute(&self.pool) + .await?; + Ok(()) + } + + /// Upsert a job definition. + /// + /// # Errors + /// + /// Returns an error if the SQL statement fails. + pub async fn upsert_job( + &self, + name: &str, + cron_expr: &str, + kind: &str, + ) -> Result<(), SchedulerError> { + sqlx::query( + "INSERT INTO scheduled_jobs (name, cron_expr, kind) + VALUES (?, ?, ?) + ON CONFLICT(name) DO UPDATE SET cron_expr = excluded.cron_expr, kind = excluded.kind", + ) + .bind(name) + .bind(cron_expr) + .bind(kind) + .execute(&self.pool) + .await?; + Ok(()) + } + + /// Record a job execution timestamp. + /// + /// # Errors + /// + /// Returns an error if the SQL statement fails. + pub async fn record_run(&self, name: &str, timestamp: &str) -> Result<(), SchedulerError> { + sqlx::query("UPDATE scheduled_jobs SET last_run = ?, status = 'completed' WHERE name = ?") + .bind(timestamp) + .bind(name) + .execute(&self.pool) + .await?; + Ok(()) + } + + /// Get the last run timestamp for a job. + /// + /// # Errors + /// + /// Returns an error if the SQL query fails. 
+ pub async fn last_run(&self, name: &str) -> Result<Option<String>, SchedulerError> { + let row: Option<(Option<String>,)> = + sqlx::query_as("SELECT last_run FROM scheduled_jobs WHERE name = ?") + .bind(name) + .fetch_optional(&self.pool) + .await?; + Ok(row.and_then(|r| r.0)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + async fn test_pool() -> SqlitePool { + SqlitePool::connect("sqlite::memory:").await.unwrap() + } + + #[tokio::test] + async fn init_creates_table() { + let pool = test_pool().await; + let store = JobStore::new(pool); + assert!(store.init().await.is_ok()); + } + + #[tokio::test] + async fn upsert_and_query() { + let pool = test_pool().await; + let store = JobStore::new(pool); + store.init().await.unwrap(); + + store + .upsert_job("test_job", "0 * * * * *", "health_check") + .await + .unwrap(); + assert!(store.last_run("test_job").await.unwrap().is_none()); + + store + .record_run("test_job", "2026-01-01T00:00:00Z") + .await + .unwrap(); + assert_eq!( + store.last_run("test_job").await.unwrap().as_deref(), + Some("2026-01-01T00:00:00Z") + ); + } + + #[tokio::test] + async fn upsert_updates_existing() { + let pool = test_pool().await; + let store = JobStore::new(pool); + store.init().await.unwrap(); + + store + .upsert_job("job1", "0 * * * * *", "health_check") + .await + .unwrap(); + store + .upsert_job("job1", "0 0 * * * *", "memory_cleanup") + .await + .unwrap(); + + let row: (String,) = sqlx::query_as("SELECT kind FROM scheduled_jobs WHERE name = 'job1'") + .fetch_one(store.pool()) + .await + .unwrap(); + assert_eq!(row.0, "memory_cleanup"); + } + + #[tokio::test] + async fn last_run_nonexistent_job() { + let pool = test_pool().await; + let store = JobStore::new(pool); + store.init().await.unwrap(); + assert!(store.last_run("no_such_job").await.unwrap().is_none()); + } +} + +impl JobStore { + #[must_use] + pub fn pool(&self) -> &SqlitePool { + &self.pool + } +} diff --git a/crates/zeph-scheduler/src/task.rs b/crates/zeph-scheduler/src/task.rs new file mode 100644 index 00000000..8f0abe20 --- /dev/null +++ b/crates/zeph-scheduler/src/task.rs @@ -0,0 +1,119 @@ +use std::future::Future; +use std::pin::Pin; +use std::str::FromStr; + +use cron::Schedule as CronSchedule; + +use crate::error::SchedulerError; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum TaskKind { + MemoryCleanup, + SkillRefresh, + HealthCheck, + Custom(String), +} + +impl TaskKind { + #[must_use] + pub fn from_str_kind(s: &str) -> Self { + match s { + "memory_cleanup" => Self::MemoryCleanup, + "skill_refresh" => Self::SkillRefresh, + "health_check" => Self::HealthCheck, + other => Self::Custom(other.to_owned()), + } + } + + #[must_use] + pub fn as_str(&self) -> &str { + match self { + Self::MemoryCleanup => "memory_cleanup", + Self::SkillRefresh => "skill_refresh", + Self::HealthCheck => "health_check", + Self::Custom(s) => s, + } + } +} + +pub struct ScheduledTask { + pub name: String, + pub schedule: CronSchedule, + pub kind: TaskKind, + pub config: serde_json::Value, +} + +impl ScheduledTask { + /// Create a new scheduled task from a cron expression string. + /// + /// # Errors + /// + /// Returns `SchedulerError::InvalidCron` if the expression is not valid.
+ pub fn new( + name: impl Into<String>, + cron_expr: &str, + kind: TaskKind, + config: serde_json::Value, + ) -> Result<Self, SchedulerError> { + let schedule = CronSchedule::from_str(cron_expr) + .map_err(|e| SchedulerError::InvalidCron(format!("{cron_expr}: {e}")))?; + Ok(Self { + name: name.into(), + schedule, + kind, + config, + }) + } +} + +pub trait TaskHandler: Send + Sync { + fn execute( + &self, + config: &serde_json::Value, + ) -> Pin<Box<dyn Future<Output = Result<(), SchedulerError>> + Send + '_>>; +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn task_kind_roundtrip() { + assert_eq!( + TaskKind::from_str_kind("memory_cleanup"), + TaskKind::MemoryCleanup + ); + assert_eq!(TaskKind::MemoryCleanup.as_str(), "memory_cleanup"); + assert_eq!( + TaskKind::from_str_kind("health_check"), + TaskKind::HealthCheck + ); + assert_eq!( + TaskKind::from_str_kind("custom_job"), + TaskKind::Custom("custom_job".into()) + ); + assert_eq!(TaskKind::Custom("x".into()).as_str(), "x"); + } + + #[test] + fn valid_cron_creates_task() { + let task = ScheduledTask::new( + "test", + "0 0 * * * *", + TaskKind::HealthCheck, + serde_json::Value::Null, + ); + assert!(task.is_ok()); + } + + #[test] + fn invalid_cron_returns_error() { + let task = ScheduledTask::new( + "test", + "not_cron", + TaskKind::HealthCheck, + serde_json::Value::Null, + ); + assert!(task.is_err()); + } +} diff --git a/docs/src/SUMMARY.md b/docs/src/SUMMARY.md index eb64de56..721074fb 100644 --- a/docs/src/SUMMARY.md +++ b/docs/src/SUMMARY.md @@ -27,6 +27,9 @@ - [TUI Dashboard](guide/tui.md) - [Observability & Cost](guide/observability.md) - [Code Indexing](guide/code-indexing.md) +- [HTTP Gateway](guide/gateway.md) +- [Daemon Supervisor](guide/daemon.md) +- [Cron Scheduler](guide/scheduler.md) # Architecture diff --git a/docs/src/architecture/crates.md b/docs/src/architecture/crates.md index 6f1d20e1..e3312b4e 100644 --- a/docs/src/architecture/crates.md +++ b/docs/src/architecture/crates.md @@ -16,6 +16,7 @@ Agent loop, configuration loading, and context builder. - `project.rs` — ZEPH.md config discovery (walk up directory tree) - `VaultProvider` trait — pluggable secret resolution - `MetricsSnapshot` / `MetricsCollector` — real-time metrics via `tokio::sync::watch` for TUI dashboard +- `DaemonSupervisor` — component lifecycle monitor with health polling, PID file management, restart tracking (feature-gated: `daemon`) ## zeph-llm @@ -87,6 +88,29 @@ AST-based code indexing, semantic retrieval, and repo map generation (optional, - `CodeRetriever` — hybrid retrieval with query classification (Semantic / Grep / Hybrid), budget-aware chunk packing
+- `generate_repo_map()` — compact structural view via tree-sitter signature extraction, budget-constrained +## zeph-gateway + +HTTP gateway for webhook ingestion (optional, feature-gated). + +- `GatewayServer` — axum-based HTTP server with fluent builder API +- `POST /webhook` — accepts JSON payloads (`channel`, `sender`, `body`), forwards to agent loop via `mpsc::Sender` +- `GET /health` — unauthenticated health endpoint returning uptime +- Bearer token auth middleware with constant-time comparison (blake3 + `subtle`) +- Per-IP rate limiting with 60s sliding window and automatic eviction at 10K entries +- Body size limit via `tower_http::limit::RequestBodyLimitLayer` +- Graceful shutdown via `watch::Receiver` + +## zeph-scheduler + +Cron-based periodic task scheduler with SQLite persistence (optional, feature-gated). + +- `Scheduler` — tick loop checking due tasks every 60 seconds +- `ScheduledTask` — task definition with 6-field cron expression (via `cron` crate) +- `TaskKind` — built-in kinds (`memory_cleanup`, `skill_refresh`, `health_check`) and `Custom(String)` +- `TaskHandler` trait — async execution interface receiving `serde_json::Value` config +- `JobStore` — SQLite-backed persistence tracking `last_run` timestamps and status +- Graceful shutdown via `watch::Receiver` + ## zeph-mcp MCP client for external tool servers (optional, feature-gated). diff --git a/docs/src/feature-flags.md b/docs/src/feature-flags.md index b1c4c735..f5b65dec 100644 --- a/docs/src/feature-flags.md +++ b/docs/src/feature-flags.md @@ -14,6 +14,9 @@ Zeph uses Cargo feature flags to control optional functionality. Default feature | `vault-age` | Enabled | Age-encrypted vault backend for file-based secret storage ([age](https://age-encryption.org/)) | | `index` | Enabled | AST-based code indexing and semantic retrieval via tree-sitter ([guide](guide/code-indexing.md)) | | `otel` | Disabled | OpenTelemetry tracing export via OTLP/gRPC ([guide](guide/observability.md)) | +| `gateway` | Disabled | HTTP gateway for webhook ingestion with bearer auth and rate limiting ([guide](guide/gateway.md)) | +| `daemon` | Disabled | Daemon supervisor with component lifecycle, PID file, and health monitoring ([guide](guide/daemon.md)) | +| `scheduler` | Disabled | Cron-based periodic task scheduler with SQLite persistence ([guide](guide/scheduler.md)) | | `tui` | Disabled | ratatui-based TUI dashboard with real-time agent metrics | | `metal` | Disabled | Metal GPU acceleration for candle on macOS (implies `candle`) | | `cuda` | Disabled | CUDA GPU acceleration for candle on Linux (implies `candle`) | @@ -25,6 +28,7 @@ cargo build --release # all default features cargo build --release --features metal # macOS with Metal GPU cargo build --release --features cuda # Linux with NVIDIA GPU cargo build --release --features tui # with TUI dashboard +cargo build --release --features gateway,daemon,scheduler # with infrastructure components cargo build --release --no-default-features # minimal binary ``` diff --git a/docs/src/guide/daemon.md b/docs/src/guide/daemon.md new file mode 100644 index 00000000..8007955e --- /dev/null +++ b/docs/src/guide/daemon.md @@ -0,0 +1,51 @@ +# Daemon Supervisor + +The daemon supervisor manages component lifecycles within a long-running Zeph process. It monitors registered components (gateway, scheduler, A2A server) for unexpected exits and tracks restart counts.
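+ +The sketch below shows how a host process might wire the supervisor. The `anyhow::Result<()>` task error type, the component name, and the ctrl-c handling are illustrative (tokio's `signal` feature is assumed): + +```rust +use tokio::sync::watch; +use zeph_core::config::DaemonConfig; +use zeph_core::daemon::{ComponentHandle, DaemonSupervisor}; + +async fn run_daemon(config: DaemonConfig) { +    let (shutdown_tx, shutdown_rx) = watch::channel(false); +    let mut supervisor = DaemonSupervisor::new(&config, shutdown_rx); + +    // Register a component task; the supervisor marks it Failed and +    // increments restart_count when the handle finishes unexpectedly. +    let gateway_task = tokio::spawn(async { Ok(()) }); +    supervisor.add_component(ComponentHandle::new("gateway", gateway_task)); + +    // Flip the watch channel to true (here from a ctrl-c handler) to stop the loop. +    tokio::spawn(async move { +        let _ = tokio::signal::ctrl_c().await; +        let _ = shutdown_tx.send(true); +    }); + +    supervisor.run().await; +} +```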
+ +## Feature Flag + +Enable with `--features daemon` at build time: + +```bash +cargo build --release --features daemon +``` + +## Configuration + +Add the `[daemon]` section to `config/default.toml`: + +```toml +[daemon] +enabled = true +pid_file = "~/.zeph/zeph.pid" +health_interval_secs = 30 +max_restart_backoff_secs = 60 +``` + +### PID File + +The daemon writes its process ID to `pid_file` on startup. This file is used to detect running instances and to send stop signals. Tilde (`~`) expands to `$HOME`. The parent directory is created automatically if it does not exist. + +## Component Lifecycle + +Each registered component is wrapped in a `ComponentHandle` that tracks: + +- **name** -- human-readable identifier (e.g., `"gateway"`, `"scheduler"`) +- **status** -- `Running`, `Failed(reason)`, or `Stopped` +- **restart_count** -- number of unexpected exits detected + +The supervisor polls all components at `health_interval_secs` intervals. When a running component's task handle reports completion (unexpected exit), the supervisor marks it as `Failed` and increments its restart counter. + +## Shutdown + +The supervisor listens on the global shutdown signal (`watch::Receiver`). When the signal fires, the health loop exits and all component handles are dropped. + +## PID File Utilities + +The `daemon` module provides three standalone functions for PID file management: + +| Function | Description | +|----------|-------------| +| `write_pid_file(path)` | Write current process ID to file | +| `read_pid_file(path)` | Read PID from file | +| `remove_pid_file(path)` | Remove PID file (no-op if missing) | diff --git a/docs/src/guide/gateway.md b/docs/src/guide/gateway.md new file mode 100644 index 00000000..dd70b9f3 --- /dev/null +++ b/docs/src/guide/gateway.md @@ -0,0 +1,77 @@ +# HTTP Gateway + +The HTTP gateway exposes a webhook endpoint for external services to send messages into Zeph. It provides bearer token authentication, per-IP rate limiting, body size limits, and a health check endpoint. + +## Feature Flag + +Enable with `--features gateway` at build time: + +```bash +cargo build --release --features gateway +``` + +## Configuration + +Add the `[gateway]` section to `config/default.toml`: + +```toml +[gateway] +enabled = true +bind = "127.0.0.1" +port = 8090 +# auth_token = "secret" # optional, from vault ZEPH_GATEWAY_TOKEN +rate_limit = 120 # max requests/minute per IP (0 = unlimited) +max_body_size = 1048576 # 1 MB +``` + +Set `bind = "0.0.0.0"` to accept connections from all interfaces. The gateway logs a warning when binding to `0.0.0.0` so accidental exposure is easy to spot. + +### Authentication + +When `auth_token` is set (or resolved from vault via `ZEPH_GATEWAY_TOKEN`), all requests to `/webhook` must include a bearer token: + +``` +Authorization: Bearer <token> +``` + +Token comparison uses constant-time hashing (blake3 + `subtle`) to prevent timing attacks. The `/health` endpoint is always unauthenticated. + +## Endpoints + +### `GET /health` + +Returns the gateway status and uptime. No authentication required. + +```json +{ + "status": "ok", + "uptime_secs": 3600 +} +``` + +### `POST /webhook` + +Accepts a JSON payload and forwards it to the agent loop. + +```json +{ + "channel": "discord", + "sender": "user1", + "body": "hello from webhook" +} +``` + +On success, returns `200` with `{"status": "accepted"}`. Returns `401` if the token is missing or invalid, `429` if rate-limited, and `413` if the body exceeds `max_body_size`.
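+ +On the host side, accepted payloads arrive on the `mpsc` channel handed to `GatewayServer::new`, already flattened to `[sender@channel] body` strings. A minimal consumer sketch (the logging call is illustrative): + +```rust +use tokio::sync::mpsc; + +async fn consume_webhooks(mut webhook_rx: mpsc::Receiver<String>) { +    while let Some(msg) = webhook_rx.recv().await { +        // e.g. "[user1@discord] hello from webhook" +        tracing::info!("webhook message: {msg}"); +    } +} +```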
+ +## Rate Limiting + +The gateway tracks requests per source IP with a 60-second sliding window. When a client exceeds the configured `rate_limit`, subsequent requests receive `429 Too Many Requests` until the window resets. The rate limiter evicts stale entries when the tracking map exceeds 10,000 IPs. + +## Architecture + +The gateway is built on [axum](https://docs.rs/axum) with `tower-http` middleware: + +- **Auth middleware** -- validates bearer tokens on protected routes +- **Rate limit middleware** -- per-IP counters with automatic eviction +- **Body limit layer** -- `tower_http::limit::RequestBodyLimitLayer` +- **Graceful shutdown** -- listens on the global `watch::Receiver` shutdown signal diff --git a/docs/src/guide/scheduler.md b/docs/src/guide/scheduler.md new file mode 100644 index 00000000..137da261 --- /dev/null +++ b/docs/src/guide/scheduler.md @@ -0,0 +1,85 @@ +# Cron Scheduler + +The scheduler runs periodic tasks on cron schedules with SQLite-backed persistence. It tracks last execution times to avoid duplicate runs and supports built-in and custom task kinds. + +## Feature Flag + +Enable with `--features scheduler` at build time: + +```bash +cargo build --release --features scheduler +``` + +## Configuration + +Define tasks in the `[scheduler]` section of `config/default.toml`: + +```toml +[scheduler] +enabled = true + +[[scheduler.tasks]] +name = "memory_cleanup" +cron = "0 0 0 * * *" # daily at midnight +kind = "memory_cleanup" +config = { max_age_days = 90 } + +[[scheduler.tasks]] +name = "health_check" +cron = "0 */5 * * * *" # every 5 minutes +kind = "health_check" +``` + +### Cron Expression Format + +The scheduler uses 6-field cron expressions (seconds included): + +``` +sec min hour day month weekday + 0 0 0 * * * +``` + +Standard cron features are supported: ranges (`1-5`), lists (`1,3,5`), steps (`*/5`), and wildcards (`*`). + +## Built-in Task Kinds + +| Kind | Description | +|------|-------------| +| `memory_cleanup` | Remove old conversation history entries | +| `skill_refresh` | Re-scan skill directories for changes | +| `health_check` | Internal health verification | + +Custom kinds are also supported. Register a handler implementing the `TaskHandler` trait for any custom `kind` string (see the sketch at the end of this guide). + +## TaskHandler Trait + +Implement `TaskHandler` to define custom task logic: + +```rust +pub trait TaskHandler: Send + Sync { + fn execute( + &self, + config: &serde_json::Value, + ) -> Pin<Box<dyn Future<Output = Result<(), SchedulerError>> + Send + '_>>; +} +``` + +The `config` parameter receives the `config` value from the task definition in TOML. + +## Persistence + +The scheduler stores job metadata in a `scheduled_jobs` SQLite table: + +| Column | Type | Description | +|--------|------|-------------| +| `name` | TEXT | Unique task identifier | +| `cron_expr` | TEXT | Cron schedule expression | +| `kind` | TEXT | Task kind string | +| `last_run` | TEXT | ISO 8601 timestamp of last execution | +| `status` | TEXT | Current status (`pending`, `completed`) | + +On startup, the scheduler upserts all configured tasks into the table. Each tick (every 60 seconds), it checks whether each task is due based on `last_run` and the cron expression. + +## Shutdown + +The scheduler listens on the global shutdown signal and exits its tick loop gracefully.
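+ +## Example: Custom Handler + +A sketch of a custom task wired through `TaskKind::Custom`; the handler body, the `"log"` kind string, and the `tracing` call are illustrative: + +```rust +use std::future::Future; +use std::pin::Pin; + +use zeph_scheduler::{SchedulerError, TaskHandler, TaskKind}; + +struct LogHandler; + +impl TaskHandler for LogHandler { +    fn execute( +        &self, +        config: &serde_json::Value, +    ) -> Pin<Box<dyn Future<Output = Result<(), SchedulerError>> + Send + '_>> { +        let config = config.clone(); +        Box::pin(async move { +            tracing::info!("custom task ran with config {config}"); +            Ok(()) +        }) +    } +} + +// Match the `kind` string used in [[scheduler.tasks]]: +// scheduler.register_handler(&TaskKind::Custom("log".into()), Box::new(LogHandler)); +```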