diff --git a/CHANGELOG.md b/CHANGELOG.md index 16492c87..74df7d3c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,16 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). ## [Unreleased] ### Added +- `autosave_assistant` and `autosave_min_length` config fields in `MemoryConfig` — assistant responses skip embedding when disabled (#748) +- `SemanticMemory::save_only()` — persist message to SQLite without generating a vector embedding (#748) +- `ResponseCache` in `zeph-memory` — SQLite-backed LLM response cache with blake3 key hashing and TTL expiry (#750) +- `response_cache_enabled` and `response_cache_ttl_secs` config fields in `LlmConfig` (#750) +- Background `cleanup_expired()` task for response cache (runs every 10 minutes) (#750) +- `ZEPH_MEMORY_AUTOSAVE_ASSISTANT`, `ZEPH_MEMORY_AUTOSAVE_MIN_LENGTH` env overrides (#748) +- `ZEPH_LLM_RESPONSE_CACHE_ENABLED`, `ZEPH_LLM_RESPONSE_CACHE_TTL_SECS` env overrides (#750) +- `MemorySnapshot`, `export_snapshot()`, `import_snapshot()` in `zeph-memory/src/snapshot.rs` (#749) +- `zeph memory export ` and `zeph memory import ` CLI subcommands (#749) +- SQLite migration `012_response_cache.sql` for the response cache table (#750) - Temporal decay scoring in `SemanticMemory::recall()` — time-based score attenuation with configurable half-life (#745) - MMR (Maximal Marginal Relevance) re-ranking in `SemanticMemory::recall()` — post-processing for result diversity (#744) - Compact XML skills prompt format (`format_skills_prompt_compact`) for low-budget contexts (#747) diff --git a/Cargo.toml b/Cargo.toml index 558a00d3..6e94bb21 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -138,7 +138,7 @@ slack = ["zeph-channels/slack"] index = ["dep:zeph-index", "zeph-core/index"] gateway = ["dep:zeph-gateway"] daemon = ["zeph-core/daemon"] -scheduler = ["dep:zeph-scheduler", "dep:serde_json"] +scheduler = ["dep:zeph-scheduler"] otel = ["dep:opentelemetry", "dep:opentelemetry_sdk", "dep:opentelemetry-otlp", 
"dep:tracing-opentelemetry"] pdf = ["zeph-memory/pdf"] mock = ["zeph-llm/mock", "zeph-memory/mock"] @@ -170,7 +170,7 @@ zeph-gateway = { workspace = true, optional = true } zeph-scheduler = { workspace = true, optional = true } zeph-tui = { workspace = true, optional = true } reqwest = { workspace = true, optional = true, features = ["rustls"] } -serde_json = { workspace = true, optional = true } +serde_json.workspace = true [dev-dependencies] serial_test.workspace = true diff --git a/README.md b/README.md index b73d1185..7c3eebb7 100644 --- a/README.md +++ b/README.md @@ -60,9 +60,9 @@ zeph --tui # run with TUI dashboard | | | |---|---| -| **Hybrid inference** | Ollama, Claude, OpenAI, Candle (GGUF), any OpenAI-compatible API. Multi-model orchestrator with fallback chains | +| **Hybrid inference** | Ollama, Claude, OpenAI, Candle (GGUF), any OpenAI-compatible API. Multi-model orchestrator with fallback chains. Response cache with blake3 hashing and TTL | | **Skills-first architecture** | YAML+Markdown skill files with semantic matching, self-learning evolution, 4-tier trust model, and compact prompt mode for small-context models | -| **Semantic memory** | SQLite + Qdrant (or embedded SQLite vector search) with MMR re-ranking, temporal decay scoring, adaptive chunked compaction, credential scrubbing, cross-session recall, and vector retrieval | +| **Semantic memory** | SQLite + Qdrant (or embedded SQLite vector search) with MMR re-ranking, temporal decay scoring, adaptive chunked compaction, credential scrubbing, cross-session recall, vector retrieval, autosave assistant responses, and snapshot export/import | | **Multi-channel I/O** | CLI, Telegram, Discord, Slack, TUI — all with streaming. 
Vision and speech-to-text input | | **Protocols** | MCP client (stdio + HTTP), A2A agent-to-agent communication, sub-agent orchestration | | **Defense-in-depth** | Shell sandbox, tool permissions, secret redaction, SSRF protection, skill trust quarantine, audit logging | diff --git a/config/default.toml b/config/default.toml index a9a63769..b341b2df 100644 --- a/config/default.toml +++ b/config/default.toml @@ -33,6 +33,10 @@ max_tokens = 4096 # embedding_model = "text-embedding-3-small" # reasoning_effort = "medium" # low, medium, high (for reasoning models) +# LLM response cache (SQLite-backed, blake3 key hashing) +# response_cache_enabled = false +# response_cache_ttl_secs = 3600 + # Speech-to-text provider (Whisper API) # [llm.stt] # provider = "whisper" @@ -140,6 +144,8 @@ max_tokens = 4096 paths = ["./skills"] # Maximum number of skills to inject into context per query (embedding-based selection) max_active_skills = 5 +# Prompt mode: "full" (inject full SKILL.md), "compact" (name+description only), "auto" (compact if budget < 8192) +# prompt_mode = "auto" # Minimum score delta for skill disambiguation (0.0-1.0) # disambiguation_threshold = 0.05 @@ -190,6 +196,16 @@ compaction_preserve_tail = 6 prune_protect_tokens = 40000 # Minimum relevance score for cross-session memory results (0.0-1.0) cross_session_score_threshold = 0.35 +# Vector backend: "qdrant" (external) or "sqlite" (embedded, zero-dependency) +# vector_backend = "qdrant" +# Token safety margin multiplier for compaction budget (must be > 0) +# token_safety_margin = 1.0 +# Redact credentials from LLM context before sending +# redact_credentials = true +# Auto-save assistant responses to semantic memory +# autosave_assistant = false +# Minimum character length for autosave (shorter responses skip embedding) +# autosave_min_length = 20 [memory.semantic] # Enable semantic memory with vector search @@ -199,6 +215,12 @@ recall_limit = 5 # Hybrid search weights (vector + FTS5 keyword). Must sum to 1.0. 
vector_weight = 0.7 keyword_weight = 0.3 +# Temporal decay: penalize older memories by age +# temporal_decay_enabled = false +# temporal_decay_half_life_days = 30 +# MMR re-ranking: diversify recall results +# mmr_enabled = false +# mmr_lambda = 0.7 # Code RAG: AST-based code indexing and hybrid retrieval # Requires Qdrant and tree-sitter grammars (feature "index", not enabled by default) diff --git a/crates/zeph-core/README.md b/crates/zeph-core/README.md index 32b3e500..659de82d 100644 --- a/crates/zeph-core/README.md +++ b/crates/zeph-core/README.md @@ -55,6 +55,8 @@ Key `MemoryConfig` fields (TOML section `[memory]`): | `vector_backend` | `"qdrant"` / `"sqlite"` | `"qdrant"` | Vector search backend | | `token_safety_margin` | f32 | `1.0` | Safety multiplier for token budget estimation (validated: must be >= 1.0) | | `redact_credentials` | bool | `true` | Scrub secrets and paths before LLM context injection | +| `autosave_assistant` | bool | `false` | Persist assistant responses to semantic memory automatically | +| `autosave_min_length` | usize | `20` | Minimum response length (chars) to trigger autosave | ```toml [agent] diff --git a/crates/zeph-core/src/agent/builder.rs b/crates/zeph-core/src/agent/builder.rs index 0abeb9b4..224aec20 100644 --- a/crates/zeph-core/src/agent/builder.rs +++ b/crates/zeph-core/src/agent/builder.rs @@ -16,6 +16,22 @@ use zeph_memory::semantic::SemanticMemory; use zeph_skills::watcher::SkillEvent; impl Agent { + #[must_use] + pub fn with_autosave_config(mut self, autosave_assistant: bool, min_length: usize) -> Self { + self.memory_state.autosave_assistant = autosave_assistant; + self.memory_state.autosave_min_length = min_length; + self + } + + #[must_use] + pub fn with_response_cache( + mut self, + cache: std::sync::Arc, + ) -> Self { + self.response_cache = Some(cache); + self + } + #[must_use] pub fn with_stt(mut self, stt: Box) -> Self { self.stt = Some(stt); diff --git a/crates/zeph-core/src/agent/mod.rs 
b/crates/zeph-core/src/agent/mod.rs index e3c40547..6f240d14 100644 --- a/crates/zeph-core/src/agent/mod.rs +++ b/crates/zeph-core/src/agent/mod.rs @@ -75,6 +75,8 @@ pub(super) struct MemoryState { pub(super) recall_limit: usize, pub(super) summarization_threshold: usize, pub(super) cross_session_score_threshold: f32, + pub(super) autosave_assistant: bool, + pub(super) autosave_min_length: usize, } pub(super) struct SkillState { @@ -158,6 +160,7 @@ pub struct Agent { update_notify_rx: Option>, #[allow(dead_code)] pub(crate) subagent_manager: Option, + pub(super) response_cache: Option>, } impl Agent { @@ -199,6 +202,8 @@ impl Agent { recall_limit: 5, summarization_threshold: 50, cross_session_score_threshold: 0.35, + autosave_assistant: false, + autosave_min_length: 20, }, skill_state: SkillState { registry, @@ -262,6 +267,7 @@ impl Agent { stt: None, update_notify_rx: None, subagent_manager: None, + response_cache: None, } } @@ -1050,6 +1056,10 @@ pub(super) mod agent_tests { pub(crate) fn sent_messages(&self) -> Vec { self.sent.lock().unwrap().clone() } + + pub(crate) fn sent_chunks(&self) -> Vec { + self.chunks.lock().unwrap().clone() + } } impl Channel for MockChannel { diff --git a/crates/zeph-core/src/agent/persistence.rs b/crates/zeph-core/src/agent/persistence.rs index 08a84373..f1c1d556 100644 --- a/crates/zeph-core/src/agent/persistence.rs +++ b/crates/zeph-core/src/agent/persistence.rs @@ -66,16 +66,38 @@ impl Agent { .and_then(|m| serde_json::to_string(&m.parts).ok()) .unwrap_or_else(|| "[]".to_string()); + let should_embed = match role { + Role::Assistant => { + self.memory_state.autosave_assistant + && content.len() >= self.memory_state.autosave_min_length + } + _ => true, + }; + let _ = self.channel.send_status("saving...").await; - let (_message_id, embedding_stored) = match memory - .remember_with_parts(cid, role_str(role), content, &parts_json) - .await - { - Ok(result) => result, - Err(e) => { - let _ = self.channel.send_status("").await; - 
tracing::error!("failed to persist message: {e:#}"); - return; + let embedding_stored = if should_embed { + match memory + .remember_with_parts(cid, role_str(role), content, &parts_json) + .await + { + Ok((_message_id, stored)) => stored, + Err(e) => { + let _ = self.channel.send_status("").await; + tracing::error!("failed to persist message: {e:#}"); + return; + } + } + } else { + match memory + .save_only(cid, role_str(role), content, &parts_json) + .await + { + Ok(_) => false, + Err(e) => { + let _ = self.channel.send_status("").await; + tracing::error!("failed to persist message: {e:#}"); + return; + } } }; let _ = self.channel.send_status("").await; @@ -138,7 +160,7 @@ impl Agent { #[cfg(test)] mod tests { use super::super::agent_tests::{ - MockChannel, MockToolExecutor, create_test_registry, mock_provider, + MetricsSnapshot, MockChannel, MockToolExecutor, create_test_registry, mock_provider, }; use super::*; use zeph_llm::any::AnyProvider; @@ -261,4 +283,175 @@ mod tests { // Must not panic and must complete agent.persist_message(Role::User, "hello").await; } + + #[tokio::test] + async fn persist_message_assistant_autosave_false_uses_save_only() { + let provider = mock_provider(vec![]); + let channel = MockChannel::new(vec![]); + let registry = create_test_registry(); + let executor = MockToolExecutor::no_tools(); + + let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default()); + let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await; + let cid = memory.sqlite().create_conversation().await.unwrap(); + + let mut agent = Agent::new(provider, channel, registry, None, 5, executor) + .with_metrics(tx) + .with_memory(memory, cid, 50, 5, 100) + .with_autosave_config(false, 20); + + agent + .persist_message(Role::Assistant, "short assistant reply") + .await; + + let history = agent + .memory_state + .memory + .as_ref() + .unwrap() + .sqlite() + .load_history(cid, 50) + .await + .unwrap(); + assert_eq!(history.len(), 1, 
"message must be saved"); + assert_eq!(history[0].content, "short assistant reply"); + // embeddings_generated must remain 0 — save_only path does not embed + assert_eq!(rx.borrow().embeddings_generated, 0); + } + + #[tokio::test] + async fn persist_message_assistant_below_min_length_uses_save_only() { + let provider = mock_provider(vec![]); + let channel = MockChannel::new(vec![]); + let registry = create_test_registry(); + let executor = MockToolExecutor::no_tools(); + + let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default()); + let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await; + let cid = memory.sqlite().create_conversation().await.unwrap(); + + // autosave_assistant=true but min_length=1000 — short content falls back to save_only + let mut agent = Agent::new(provider, channel, registry, None, 5, executor) + .with_metrics(tx) + .with_memory(memory, cid, 50, 5, 100) + .with_autosave_config(true, 1000); + + agent.persist_message(Role::Assistant, "too short").await; + + let history = agent + .memory_state + .memory + .as_ref() + .unwrap() + .sqlite() + .load_history(cid, 50) + .await + .unwrap(); + assert_eq!(history.len(), 1, "message must be saved"); + assert_eq!(history[0].content, "too short"); + assert_eq!(rx.borrow().embeddings_generated, 0); + } + + #[tokio::test] + async fn persist_message_assistant_at_min_length_boundary_uses_embed() { + // content.len() == autosave_min_length → should_embed = true (>= boundary). 
+ let provider = mock_provider(vec![]); + let channel = MockChannel::new(vec![]); + let registry = create_test_registry(); + let executor = MockToolExecutor::no_tools(); + + let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default()); + let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await; + let cid = memory.sqlite().create_conversation().await.unwrap(); + + let min_length = 10usize; + let mut agent = Agent::new(provider, channel, registry, None, 5, executor) + .with_metrics(tx) + .with_memory(memory, cid, 50, 5, 100) + .with_autosave_config(true, min_length); + + // Exact boundary: len == min_length → embed path. + let content_at_boundary = "A".repeat(min_length); + assert_eq!(content_at_boundary.len(), min_length); + agent + .persist_message(Role::Assistant, &content_at_boundary) + .await; + + // sqlite_message_count must be incremented regardless of embedding success. + assert_eq!(rx.borrow().sqlite_message_count, 1); + } + + #[tokio::test] + async fn persist_message_assistant_one_below_min_length_uses_save_only() { + // content.len() == autosave_min_length - 1 → should_embed = false (below boundary). + let provider = mock_provider(vec![]); + let channel = MockChannel::new(vec![]); + let registry = create_test_registry(); + let executor = MockToolExecutor::no_tools(); + + let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default()); + let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await; + let cid = memory.sqlite().create_conversation().await.unwrap(); + + let min_length = 10usize; + let mut agent = Agent::new(provider, channel, registry, None, 5, executor) + .with_metrics(tx) + .with_memory(memory, cid, 50, 5, 100) + .with_autosave_config(true, min_length); + + // One below boundary: len == min_length - 1 → save_only path, no embedding. 
+ let content_below_boundary = "A".repeat(min_length - 1); + assert_eq!(content_below_boundary.len(), min_length - 1); + agent + .persist_message(Role::Assistant, &content_below_boundary) + .await; + + let history = agent + .memory_state + .memory + .as_ref() + .unwrap() + .sqlite() + .load_history(cid, 50) + .await + .unwrap(); + assert_eq!(history.len(), 1, "message must still be saved"); + // save_only path does not embed. + assert_eq!(rx.borrow().embeddings_generated, 0); + } + + #[tokio::test] + async fn persist_message_user_always_embeds_regardless_of_autosave_flag() { + let provider = mock_provider(vec![]); + let channel = MockChannel::new(vec![]); + let registry = create_test_registry(); + let executor = MockToolExecutor::no_tools(); + + let (tx, rx) = tokio::sync::watch::channel(MetricsSnapshot::default()); + let memory = test_memory(&AnyProvider::Mock(zeph_llm::mock::MockProvider::default())).await; + let cid = memory.sqlite().create_conversation().await.unwrap(); + + // autosave_assistant=false — but User role always takes embedding path + let mut agent = Agent::new(provider, channel, registry, None, 5, executor) + .with_metrics(tx) + .with_memory(memory, cid, 50, 5, 100) + .with_autosave_config(false, 20); + + let long_user_msg = "A".repeat(100); + agent.persist_message(Role::User, &long_user_msg).await; + + let history = agent + .memory_state + .memory + .as_ref() + .unwrap() + .sqlite() + .load_history(cid, 50) + .await + .unwrap(); + assert_eq!(history.len(), 1, "user message must be saved"); + // User messages go through remember_with_parts (embedding path). + // sqlite_message_count must increment regardless of Qdrant availability. 
+ assert_eq!(rx.borrow().sqlite_message_count, 1); + } } diff --git a/crates/zeph-core/src/agent/tool_execution.rs b/crates/zeph-core/src/agent/tool_execution.rs index 6e0ce568..6805c06e 100644 --- a/crates/zeph-core/src/agent/tool_execution.rs +++ b/crates/zeph-core/src/agent/tool_execution.rs @@ -198,6 +198,10 @@ impl Agent { return Ok(None); } + if let Some(resp) = self.check_response_cache().await? { + return Ok(Some(resp)); + } + let llm_timeout = std::time::Duration::from_secs(self.runtime.timeouts.llm_seconds); let start = std::time::Instant::now(); let prompt_estimate = self.cached_prompt_tokens; @@ -230,9 +234,8 @@ impl Agent { self.record_cache_usage(); self.record_cost(prompt_estimate, completion_estimate_for_cost); let raw = r?; - // Redact secrets from the full response before it is persisted to history. - // Streaming chunks were already sent to the channel without per-chunk redaction - // (acceptable trade-off: ephemeral display vs allocation per chunk). + // Redact secrets from the full accumulated response before it is persisted to + // history. Per-chunk redaction is applied during streaming (see send_chunk above). 
let redacted = self.maybe_redact(&raw).into_owned(); Ok(Some(redacted)) } else { @@ -269,6 +272,7 @@ impl Agent { self.record_cost(prompt_estimate, completion_estimate); let display = self.maybe_redact(&resp); self.channel.send(&display).await?; + self.store_response_in_cache(&resp).await; Ok(Some(resp)) } Ok(Err(e)) => Err(e.into()), @@ -510,7 +514,8 @@ impl Agent { }; let chunk: String = chunk_result?; response.push_str(&chunk); - self.channel.send_chunk(&chunk).await?; + let display_chunk = self.maybe_redact(&chunk); + self.channel.send_chunk(&display_chunk).await?; } self.channel.flush_chunks().await?; @@ -537,6 +542,32 @@ impl Agent { } } + async fn check_response_cache(&mut self) -> Result, super::error::AgentError> { + if let Some(ref cache) = self.response_cache + && !self.provider.supports_streaming() + { + let key = + zeph_memory::ResponseCache::compute_key(&self.messages, &self.runtime.model_name); + if let Ok(Some(cached)) = cache.get(&key).await { + tracing::debug!("response cache hit"); + let display = self.maybe_redact(&cached); + self.channel.send(&display).await?; + return Ok(Some(cached)); + } + } + Ok(None) + } + + async fn store_response_in_cache(&self, response: &str) { + if let Some(ref cache) = self.response_cache { + let key = + zeph_memory::ResponseCache::compute_key(&self.messages, &self.runtime.model_name); + if let Err(e) = cache.put(&key, response, &self.runtime.model_name).await { + tracing::warn!("failed to store response in cache: {e:#}"); + } + } + } + async fn process_response_native_tools(&mut self) -> Result<(), super::error::AgentError> { self.doom_loop_history.clear(); @@ -1629,4 +1660,159 @@ mod tests { assert!(calls[0].is_some(), "first call must set env"); assert!(calls[1].is_none(), "second call must clear env"); } + + #[tokio::test] + async fn streaming_chunk_with_secret_is_redacted_before_channel_send() { + use super::super::agent_tests::*; + use zeph_llm::provider::{Message, Role}; + + // Streaming provider returns a 
chunk containing an AWS-style access key. + let secret_chunk = "AKIA1234567890ABCDEF".to_string(); + let provider = mock_provider_streaming(vec![secret_chunk.clone()]); + let channel = MockChannel::new(vec![]); + let registry = create_test_registry(); + let executor = MockToolExecutor::no_tools(); + let mut agent = super::super::Agent::new(provider, channel, registry, None, 5, executor); + agent.runtime.security.redact_secrets = true; + + agent.messages.push(Message { + role: Role::User, + content: "tell me a secret".into(), + parts: vec![], + }); + + let _ = agent.process_response_streaming().await.unwrap(); + + // The raw secret must not appear in any chunk sent to the channel. + let chunks = agent.channel.sent_chunks(); + assert!(!chunks.is_empty(), "at least one chunk must have been sent"); + for chunk in &chunks { + assert!( + !chunk.contains(&secret_chunk), + "raw secret must not appear in sent chunk: {chunk:?}" + ); + } + } + + #[test] + fn check_response_cache_bypassed_when_streaming() { + // Verifies that the streaming provider flag correctly identifies the bypass condition. + // The cache check guard is `!self.provider.supports_streaming()`, so a streaming + // provider must return true from supports_streaming() and a non-streaming one must not. 
+ use super::super::agent_tests::*; + use zeph_llm::LlmProvider; + + let streaming_provider = mock_provider_streaming(vec!["hello".into()]); + let non_streaming_provider = mock_provider(vec!["hello".into()]); + + assert!( + streaming_provider.supports_streaming(), + "streaming mock must report supports_streaming=true" + ); + assert!( + !non_streaming_provider.supports_streaming(), + "non-streaming mock must report supports_streaming=false" + ); + } + + #[tokio::test] + async fn call_llm_returns_cached_response_without_provider_call() { + use super::super::agent_tests::*; + use std::sync::Arc; + use zeph_llm::provider::{Message, Role}; + use zeph_memory::{ResponseCache, sqlite::SqliteStore}; + + let channel = MockChannel::new(vec![]); + let registry = create_test_registry(); + let executor = MockToolExecutor::no_tools(); + // Non-streaming provider — cache path is active for non-streaming. + let provider = mock_provider(vec!["uncached response".into()]); + let mut agent = super::super::Agent::new(provider, channel, registry, None, 5, executor); + + // Set up a response cache with a pre-populated entry. + let store = SqliteStore::new(":memory:").await.unwrap(); + let cache = Arc::new(ResponseCache::new(store.pool().clone(), 3600)); + + // Build the key for the current agent messages. 
+ let key = ResponseCache::compute_key(&agent.messages, &agent.runtime.model_name); + cache + .put(&key, "cached response", "test-model") + .await + .unwrap(); + + agent.response_cache = Some(cache); + + // push a user message so the conversation is non-empty + agent.messages.push(Message { + role: Role::User, + content: "what is 2+2?".into(), + parts: vec![], + }); + + // Recompute key after adding user message + let key2 = ResponseCache::compute_key(&agent.messages, &agent.runtime.model_name); + if let Some(ref c) = agent.response_cache { + c.put(&key2, "cached response", "test-model").await.unwrap(); + } + + let result = agent.call_llm_with_timeout().await.unwrap(); + assert_eq!(result.as_deref(), Some("cached response")); + // Channel should have received the cached response + assert!( + agent + .channel + .sent_messages() + .iter() + .any(|s| s == "cached response") + ); + } + + #[tokio::test] + async fn store_response_in_cache_enables_second_call_to_return_cached() { + use super::super::agent_tests::*; + use std::sync::Arc; + use zeph_llm::provider::{Message, Role}; + use zeph_memory::{ResponseCache, sqlite::SqliteStore}; + + // Provider has one response; the second call must come from cache. + let provider = mock_provider(vec!["provider response".into()]); + let channel = MockChannel::new(vec![]); + let registry = create_test_registry(); + let executor = MockToolExecutor::no_tools(); + let mut agent = super::super::Agent::new(provider, channel, registry, None, 5, executor); + + let store = SqliteStore::new(":memory:").await.unwrap(); + let cache = Arc::new(ResponseCache::new(store.pool().clone(), 3600)); + agent.response_cache = Some(cache); + + agent.messages.push(Message { + role: Role::User, + content: "what is 3+3?".into(), + parts: vec![], + }); + + // First call — hits provider, stores response in cache. 
+ let first = agent.call_llm_with_timeout().await.unwrap(); + assert_eq!(first.as_deref(), Some("provider response")); + + // Second call with the same messages — must return cached value. + let second = agent.call_llm_with_timeout().await.unwrap(); + assert_eq!( + second.as_deref(), + Some("provider response"), + "second call must return cached response" + ); + + // Channel must have received both responses. + let sent = agent.channel.sent_messages(); + let matching: Vec<_> = sent + .iter() + .filter(|s| s.as_str() == "provider response") + .collect(); + assert_eq!( + matching.len(), + 2, + "both calls must have sent the response to the channel" + ); + } } diff --git a/crates/zeph-core/src/config/env.rs b/crates/zeph-core/src/config/env.rs index 2565acd9..dc9e77b6 100644 --- a/crates/zeph-core/src/config/env.rs +++ b/crates/zeph-core/src/config/env.rs @@ -243,6 +243,26 @@ impl Config { { self.a2a.rate_limit = rate; } + if let Ok(v) = std::env::var("ZEPH_MEMORY_AUTOSAVE_ASSISTANT") + && let Ok(enabled) = v.parse::<bool>() + { + self.memory.autosave_assistant = enabled; + } + if let Ok(v) = std::env::var("ZEPH_MEMORY_AUTOSAVE_MIN_LENGTH") + && let Ok(len) = v.parse::<usize>() + { + self.memory.autosave_min_length = len; + } + if let Ok(v) = std::env::var("ZEPH_LLM_RESPONSE_CACHE_ENABLED") + && let Ok(enabled) = v.parse::<bool>() + { + self.llm.response_cache_enabled = enabled; + } + if let Ok(v) = std::env::var("ZEPH_LLM_RESPONSE_CACHE_TTL_SECS") + && let Ok(secs) = v.parse::<u64>() + { + self.llm.response_cache_ttl_secs = secs; + } } fn apply_env_overrides_security(&mut self) { diff --git a/crates/zeph-core/src/config/snapshots/zeph_core__config__types__tests__config_default_snapshot.snap b/crates/zeph-core/src/config/snapshots/zeph_core__config__types__tests__config_default_snapshot.snap index 84a15c39..0f324987 100644 --- a/crates/zeph-core/src/config/snapshots/zeph_core__config__types__tests__config_default_snapshot.snap +++ 
b/crates/zeph-core/src/config/snapshots/zeph_core__config__types__tests__config_default_snapshot.snap @@ -1,6 +1,5 @@ --- source: crates/zeph-core/src/config/types.rs -assertion_line: 1156 expression: toml_str --- [agent] @@ -13,6 +12,8 @@ provider = "ollama" base_url = "http://localhost:11434" model = "mistral:7b" embedding_model = "qwen3-embedding" +response_cache_enabled = false +response_cache_ttl_secs = 3600 [skills] paths = ["./skills"] @@ -49,6 +50,8 @@ cross_session_score_threshold = 0.35 vector_backend = "qdrant" token_safety_margin = 1.0 redact_credentials = true +autosave_assistant = false +autosave_min_length = 20 [memory.semantic] enabled = true diff --git a/crates/zeph-core/src/config/types.rs b/crates/zeph-core/src/config/types.rs index 44ae0f48..8fd4ea53 100644 --- a/crates/zeph-core/src/config/types.rs +++ b/crates/zeph-core/src/config/types.rs @@ -123,6 +123,14 @@ pub struct LlmConfig { pub stt: Option, #[serde(default, skip_serializing_if = "Option::is_none")] pub vision_model: Option, + #[serde(default)] + pub response_cache_enabled: bool, + #[serde(default = "default_response_cache_ttl_secs")] + pub response_cache_ttl_secs: u64, +} + +fn default_response_cache_ttl_secs() -> u64 { + 3600 } fn default_embedding_model() -> String { @@ -468,6 +476,14 @@ pub struct MemoryConfig { pub token_safety_margin: f32, #[serde(default = "default_redact_credentials")] pub redact_credentials: bool, + #[serde(default)] + pub autosave_assistant: bool, + #[serde(default = "default_autosave_min_length")] + pub autosave_min_length: usize, +} + +fn default_autosave_min_length() -> usize { + 20 } fn default_token_safety_margin() -> f32 { @@ -1085,6 +1101,8 @@ impl Default for Config { router: None, stt: None, vision_model: None, + response_cache_enabled: false, + response_cache_ttl_secs: default_response_cache_ttl_secs(), }, skills: SkillsConfig { paths: vec!["./skills".into()], @@ -1109,6 +1127,8 @@ impl Default for Config { vector_backend: VectorBackend::default(), 
token_safety_margin: default_token_safety_margin(), redact_credentials: default_redact_credentials(), + autosave_assistant: false, + autosave_min_length: default_autosave_min_length(), }, telegram: None, discord: None, diff --git a/crates/zeph-llm/README.md b/crates/zeph-llm/README.md index 031cd242..c514cb66 100644 --- a/crates/zeph-llm/README.md +++ b/crates/zeph-llm/README.md @@ -9,7 +9,7 @@ LLM provider abstraction with Ollama, Claude, OpenAI, and Candle backends. ## Overview -Defines the `LlmProvider` trait and ships concrete backends for Ollama, Claude, OpenAI, and OpenAI-compatible endpoints. Includes an orchestrator for multi-model coordination, a router for model selection, and an optional Candle backend for local inference. +Defines the `LlmProvider` trait and ships concrete backends for Ollama, Claude, OpenAI, and OpenAI-compatible endpoints. Includes an orchestrator for multi-model coordination, a router for model selection, an optional Candle backend for local inference, and an SQLite-backed response cache with blake3 key hashing and TTL expiry. 
## Key modules diff --git a/crates/zeph-memory/README.md b/crates/zeph-memory/README.md index 002bb344..a7e06bf1 100644 --- a/crates/zeph-memory/README.md +++ b/crates/zeph-memory/README.md @@ -30,12 +30,32 @@ Includes a document ingestion subsystem for loading, chunking, and storing user | `document::pipeline` | `IngestionPipeline` — load, split, embed, store via Qdrant | | `vector_store` | `VectorStore` trait and `VectorPoint` types | | `sqlite_vector` | `SqliteVectorStore` — embedded SQLite-backed vector search as zero-dependency Qdrant alternative | +| `snapshot` | `MemorySnapshot`, `export_snapshot()`, `import_snapshot()` — portable memory export/import | +| `response_cache` | `ResponseCache` — SQLite-backed LLM response cache with blake3 key hashing and TTL expiry | | `embedding_store` | `EmbeddingStore` — high-level embedding CRUD | | `embeddable` | `Embeddable` trait and `EmbeddingRegistry` — generic Qdrant sync/search for any embeddable type | | `types` | `ConversationId`, `MessageId`, shared types | | `error` | `MemoryError` — unified error type | -**Re-exports:** `MemoryError`, `QdrantOps`, `ConversationId`, `MessageId`, `Document`, `DocumentLoader`, `TextLoader`, `TextSplitter`, `IngestionPipeline`, `Chunk`, `SplitterConfig`, `DocumentError`, `DocumentMetadata`, `PdfLoader` (behind `pdf` feature), `Embeddable`, `EmbeddingRegistry` +**Re-exports:** `MemoryError`, `QdrantOps`, `ConversationId`, `MessageId`, `Document`, `DocumentLoader`, `TextLoader`, `TextSplitter`, `IngestionPipeline`, `Chunk`, `SplitterConfig`, `DocumentError`, `DocumentMetadata`, `PdfLoader` (behind `pdf` feature), `Embeddable`, `EmbeddingRegistry`, `ResponseCache`, `MemorySnapshot` + +## Snapshot export/import + +Memory snapshots allow exporting all conversations and messages to a portable JSON file and importing them back into another instance. 
+ +```bash +zeph memory export backup.json +zeph memory import backup.json +``` + +## Response cache + +`ResponseCache` deduplicates LLM calls by caching responses in SQLite. Cache keys are computed via blake3 hashing of the prompt content. Entries expire after a configurable TTL (default: 1 hour). A background task runs every 10 minutes to clean up expired entries. + +| Config field | Type | Default | Env override | +|-------------|------|---------|--------------| +| `response_cache_enabled` | bool | `false` | `ZEPH_LLM_RESPONSE_CACHE_ENABLED` | +| `response_cache_ttl_secs` | u64 | `3600` | `ZEPH_LLM_RESPONSE_CACHE_TTL_SECS` | ## Ranking options diff --git a/crates/zeph-memory/migrations/012_response_cache.sql b/crates/zeph-memory/migrations/012_response_cache.sql new file mode 100644 index 00000000..44e1fc26 --- /dev/null +++ b/crates/zeph-memory/migrations/012_response_cache.sql @@ -0,0 +1,9 @@ +CREATE TABLE IF NOT EXISTS response_cache ( + cache_key TEXT PRIMARY KEY, + response TEXT NOT NULL, + model TEXT NOT NULL, + created_at INTEGER NOT NULL DEFAULT (unixepoch()), + expires_at INTEGER NOT NULL +); + +CREATE INDEX IF NOT EXISTS idx_response_cache_expires ON response_cache(expires_at); diff --git a/crates/zeph-memory/src/error.rs b/crates/zeph-memory/src/error.rs index 62495dc7..2d93c629 100644 --- a/crates/zeph-memory/src/error.rs +++ b/crates/zeph-memory/src/error.rs @@ -21,6 +21,9 @@ pub enum MemoryError { #[error("integer conversion: {0}")] IntConversion(#[from] std::num::TryFromIntError), + #[error("snapshot error: {0}")] + Snapshot(String), + #[error("{0}")] Other(String), } diff --git a/crates/zeph-memory/src/lib.rs b/crates/zeph-memory/src/lib.rs index 9bca80bb..1e5196fb 100644 --- a/crates/zeph-memory/src/lib.rs +++ b/crates/zeph-memory/src/lib.rs @@ -7,7 +7,9 @@ pub mod error; #[cfg(feature = "mock")] pub mod in_memory_store; pub mod qdrant_ops; +pub mod response_cache; pub mod semantic; +pub mod snapshot; pub mod sqlite; pub mod sqlite_vector_store; 
pub mod types; @@ -25,7 +27,9 @@ pub use embedding_registry::{ pub use embedding_store::ensure_qdrant_collection; pub use error::MemoryError; pub use qdrant_ops::QdrantOps; +pub use response_cache::ResponseCache; pub use semantic::estimate_tokens; +pub use snapshot::{ImportStats, MemorySnapshot, export_snapshot, import_snapshot}; pub use types::{ConversationId, MessageId}; pub use vector_store::{ FieldCondition, FieldValue, ScoredVectorPoint, VectorFilter, VectorPoint, VectorStore, diff --git a/crates/zeph-memory/src/response_cache.rs b/crates/zeph-memory/src/response_cache.rs new file mode 100644 index 00000000..054dfa34 --- /dev/null +++ b/crates/zeph-memory/src/response_cache.rs @@ -0,0 +1,264 @@ +use sqlx::SqlitePool; +use zeph_llm::provider::{Message, Role}; + +use crate::error::MemoryError; + +fn role_to_str(role: Role) -> &'static str { + match role { + Role::System => "system", + Role::User => "user", + Role::Assistant => "assistant", + } +} + +pub struct ResponseCache { + pool: SqlitePool, + ttl_secs: u64, +} + +impl ResponseCache { + #[must_use] + pub fn new(pool: SqlitePool, ttl_secs: u64) -> Self { + Self { pool, ttl_secs } + } + + /// Look up a cached response by key. Returns `None` if not found or expired. + /// + /// # Errors + /// + /// Returns an error if the database query fails. + pub async fn get(&self, key: &str) -> Result<Option<String>, MemoryError> { + let now = unix_now(); + let row: Option<(String,)> = sqlx::query_as( + "SELECT response FROM response_cache WHERE cache_key = ? AND expires_at > ?", + ) + .bind(key) + .bind(now) + .fetch_optional(&self.pool) + .await?; + Ok(row.map(|(r,)| r)) + } + + /// Store a response in the cache with TTL. + /// + /// # Errors + /// + /// Returns an error if the database insert fails. + pub async fn put(&self, key: &str, response: &str, model: &str) -> Result<(), MemoryError> { + let now = unix_now(); + // Cap TTL at 1 year (31_536_000 s) to prevent i64 overflow for extreme values.
+ let expires_at = now.saturating_add(self.ttl_secs.min(31_536_000).cast_signed()); + sqlx::query( + "INSERT OR REPLACE INTO response_cache (cache_key, response, model, created_at, expires_at) \ + VALUES (?, ?, ?, ?, ?)", + ) + .bind(key) + .bind(response) + .bind(model) + .bind(now) + .bind(expires_at) + .execute(&self.pool) + .await?; + Ok(()) + } + + /// Delete expired cache entries. Returns the number of rows deleted. + /// + /// # Errors + /// + /// Returns an error if the database delete fails. + pub async fn cleanup_expired(&self) -> Result<u64, MemoryError> { + let now = unix_now(); + let result = sqlx::query("DELETE FROM response_cache WHERE expires_at <= ?") + .bind(now) + .execute(&self.pool) + .await?; + Ok(result.rows_affected()) + } + + /// Compute a deterministic cache key from messages and model name using blake3. + #[must_use] + pub fn compute_key(messages: &[Message], model: &str) -> String { + let mut hasher = blake3::Hasher::new(); + for msg in messages { + let role = role_to_str(msg.role).as_bytes(); + hasher.update(&(role.len() as u64).to_le_bytes()); + hasher.update(role); + let content = msg.content.as_bytes(); + hasher.update(&(content.len() as u64).to_le_bytes()); + hasher.update(content); + } + let model_bytes = model.as_bytes(); + hasher.update(&(model_bytes.len() as u64).to_le_bytes()); + hasher.update(model_bytes); + hasher.finalize().to_hex().to_string() + } +} + +fn unix_now() -> i64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() + .cast_signed() +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::sqlite::SqliteStore; + + async fn test_cache() -> ResponseCache { + let store = SqliteStore::new(":memory:").await.unwrap(); + ResponseCache::new(store.pool().clone(), 3600) + } + + #[tokio::test] + async fn cache_miss_returns_none() { + let cache = test_cache().await; + let result = cache.get("nonexistent").await.unwrap(); + assert!(result.is_none()); + } + + #[tokio::test] + async fn 
cache_put_and_get_roundtrip() { + let cache = test_cache().await; + cache.put("key1", "response text", "gpt-4").await.unwrap(); + let result = cache.get("key1").await.unwrap(); + assert_eq!(result.as_deref(), Some("response text")); + } + + #[tokio::test] + async fn cache_expired_entry_returns_none() { + let store = SqliteStore::new(":memory:").await.unwrap(); + let cache = ResponseCache::new(store.pool().clone(), 0); + // ttl=0 means expires_at == now, which fails the > check + cache.put("key1", "response", "model").await.unwrap(); + // Immediately expired (expires_at = now + 0 = now, query checks > now) + let result = cache.get("key1").await.unwrap(); + assert!(result.is_none()); + } + + #[tokio::test] + async fn cleanup_expired_removes_entries() { + let store = SqliteStore::new(":memory:").await.unwrap(); + let cache = ResponseCache::new(store.pool().clone(), 0); + cache.put("key1", "response", "model").await.unwrap(); + let deleted = cache.cleanup_expired().await.unwrap(); + assert!(deleted > 0); + } + + #[tokio::test] + async fn cleanup_does_not_remove_valid_entries() { + let cache = test_cache().await; + cache.put("key1", "response", "model").await.unwrap(); + let deleted = cache.cleanup_expired().await.unwrap(); + assert_eq!(deleted, 0); + let result = cache.get("key1").await.unwrap(); + assert!(result.is_some()); + } + + #[test] + fn compute_key_deterministic() { + let msgs = vec![Message { + role: Role::User, + content: "hello".into(), + parts: vec![], + }]; + let k1 = ResponseCache::compute_key(&msgs, "gpt-4"); + let k2 = ResponseCache::compute_key(&msgs, "gpt-4"); + assert_eq!(k1, k2); + } + + #[test] + fn compute_key_different_for_different_content() { + let msgs1 = vec![Message { + role: Role::User, + content: "hello".into(), + parts: vec![], + }]; + let msgs2 = vec![Message { + role: Role::User, + content: "world".into(), + parts: vec![], + }]; + assert_ne!( + ResponseCache::compute_key(&msgs1, "gpt-4"), + ResponseCache::compute_key(&msgs2, "gpt-4") + 
); + } + + #[test] + fn compute_key_different_for_different_model() { + let msgs = vec![Message { + role: Role::User, + content: "hello".into(), + parts: vec![], + }]; + assert_ne!( + ResponseCache::compute_key(&msgs, "gpt-4"), + ResponseCache::compute_key(&msgs, "gpt-3.5") + ); + } + + #[test] + fn compute_key_empty_messages() { + let k = ResponseCache::compute_key(&[], "model"); + assert!(!k.is_empty()); + } + + #[test] + fn compute_key_no_length_prefix_ambiguity() { + // Without length-prefix, "ab"+"c" and "a"+"bc" would hash identically. + // With proper length-prefixing they must differ. + let msgs1 = vec![ + Message { + role: Role::User, + content: "ab".into(), + parts: vec![], + }, + Message { + role: Role::User, + content: "c".into(), + parts: vec![], + }, + ]; + let msgs2 = vec![ + Message { + role: Role::User, + content: "a".into(), + parts: vec![], + }, + Message { + role: Role::User, + content: "bc".into(), + parts: vec![], + }, + ]; + assert_ne!( + ResponseCache::compute_key(&msgs1, "model"), + ResponseCache::compute_key(&msgs2, "model") + ); + } + + #[tokio::test] + async fn ttl_extreme_value_does_not_overflow() { + let store = SqliteStore::new(":memory:").await.unwrap(); + // Use u64::MAX - 1 as TTL; without capping this would overflow i64. + let cache = ResponseCache::new(store.pool().clone(), u64::MAX - 1); + // Should not panic or produce a negative expires_at. + cache.put("key1", "response", "model").await.unwrap(); + // Entry should be retrievable (far-future expiry). 
+ let result = cache.get("key1").await.unwrap(); + assert_eq!(result.as_deref(), Some("response")); + } + + #[tokio::test] + async fn insert_or_replace_updates_existing_entry() { + let cache = test_cache().await; + cache.put("key1", "first response", "gpt-4").await.unwrap(); + cache.put("key1", "second response", "gpt-4").await.unwrap(); + let result = cache.get("key1").await.unwrap(); + assert_eq!(result.as_deref(), Some("second response")); + } +} diff --git a/crates/zeph-memory/src/semantic.rs b/crates/zeph-memory/src/semantic.rs index a310297e..4c291b6d 100644 --- a/crates/zeph-memory/src/semantic.rs +++ b/crates/zeph-memory/src/semantic.rs @@ -376,6 +376,25 @@ impl SemanticMemory { Ok((message_id, embedding_stored)) } + /// Save a message to `SQLite` without generating an embedding. + /// + /// Use this when embedding is intentionally skipped (e.g. autosave disabled for assistant). + /// + /// # Errors + /// + /// Returns an error if the `SQLite` save fails. + pub async fn save_only( + &self, + conversation_id: ConversationId, + role: &str, + content: &str, + parts_json: &str, + ) -> Result<MessageId, MemoryError> { + self.sqlite + .save_message_with_parts(conversation_id, role, content, parts_json) + .await + } + /// Recall relevant messages using hybrid search (vector + FTS5 keyword). 
/// /// When Qdrant is available, runs both vector and keyword searches, then merges diff --git a/crates/zeph-memory/src/snapshot.rs b/crates/zeph-memory/src/snapshot.rs new file mode 100644 index 00000000..0397d724 --- /dev/null +++ b/crates/zeph-memory/src/snapshot.rs @@ -0,0 +1,397 @@ +use serde::{Deserialize, Serialize}; + +use crate::error::MemoryError; +use crate::sqlite::SqliteStore; +use crate::types::ConversationId; + +#[derive(Debug, Serialize, Deserialize)] +pub struct MemorySnapshot { + pub version: u32, + pub exported_at: String, + pub conversations: Vec<ConversationSnapshot>, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct ConversationSnapshot { + pub id: i64, + pub messages: Vec<MessageSnapshot>, + pub summaries: Vec<SummarySnapshot>, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct MessageSnapshot { + pub id: i64, + pub conversation_id: i64, + pub role: String, + pub content: String, + pub parts_json: String, + pub created_at: i64, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct SummarySnapshot { + pub id: i64, + pub conversation_id: i64, + pub content: String, + pub first_message_id: i64, + pub last_message_id: i64, + pub token_estimate: i64, +} + +#[derive(Debug, Default)] +pub struct ImportStats { + pub conversations_imported: usize, + pub messages_imported: usize, + pub summaries_imported: usize, + pub skipped: usize, +} + +/// Export all conversations, messages and summaries from `SQLite` into a snapshot. +/// +/// # Errors +/// +/// Returns an error if any database query fails. 
+pub async fn export_snapshot(sqlite: &SqliteStore) -> Result<MemorySnapshot, MemoryError> { + let conv_ids: Vec<(i64,)> = sqlx::query_as("SELECT id FROM conversations ORDER BY id ASC") + .fetch_all(sqlite.pool()) + .await?; + + let exported_at = chrono_now(); + let mut conversations = Vec::with_capacity(conv_ids.len()); + + for (cid_raw,) in conv_ids { + let cid = ConversationId(cid_raw); + + let msg_rows: Vec<(i64, String, String, String, i64)> = sqlx::query_as( + "SELECT id, role, content, parts, \ + COALESCE(CAST(strftime('%s', created_at) AS INTEGER), 0) \ + FROM messages WHERE conversation_id = ? ORDER BY id ASC", + ) + .bind(cid) + .fetch_all(sqlite.pool()) + .await?; + + let messages = msg_rows + .into_iter() + .map( + |(id, role, content, parts_json, created_at)| MessageSnapshot { + id, + conversation_id: cid_raw, + role, + content, + parts_json, + created_at, + }, + ) + .collect(); + + let sum_rows = sqlite.load_summaries(cid).await?; + let summaries = sum_rows + .into_iter() + .map( + |( + id, + conversation_id, + content, + first_message_id, + last_message_id, + token_estimate, + )| { + SummarySnapshot { + id, + conversation_id: conversation_id.0, + content, + first_message_id: first_message_id.0, + last_message_id: last_message_id.0, + token_estimate, + } + }, + ) + .collect(); + + conversations.push(ConversationSnapshot { + id: cid_raw, + messages, + summaries, + }); + } + + Ok(MemorySnapshot { + version: 1, + exported_at, + conversations, + }) +} + +/// Import a snapshot into `SQLite`, skipping duplicate entries. +/// +/// Returns stats about what was imported. +/// +/// # Errors +/// +/// Returns an error if any database operation fails. 
+pub async fn import_snapshot( + sqlite: &SqliteStore, + snapshot: MemorySnapshot, +) -> Result<ImportStats, MemoryError> { + if snapshot.version != 1 { + return Err(MemoryError::Snapshot(format!( + "unsupported snapshot version {}: only version 1 is supported", + snapshot.version + ))); + } + let mut stats = ImportStats::default(); + + for conv in snapshot.conversations { + let exists: Option<(i64,)> = sqlx::query_as("SELECT id FROM conversations WHERE id = ?") + .bind(conv.id) + .fetch_optional(sqlite.pool()) + .await?; + + if exists.is_none() { + sqlx::query("INSERT INTO conversations (id) VALUES (?)") + .bind(conv.id) + .execute(sqlite.pool()) + .await?; + stats.conversations_imported += 1; + } else { + stats.skipped += 1; + } + + for msg in conv.messages { + let result = sqlx::query( + "INSERT OR IGNORE INTO messages (id, conversation_id, role, content, parts) \ + VALUES (?, ?, ?, ?, ?)", + ) + .bind(msg.id) + .bind(msg.conversation_id) + .bind(&msg.role) + .bind(&msg.content) + .bind(&msg.parts_json) + .execute(sqlite.pool()) + .await?; + + if result.rows_affected() > 0 { + stats.messages_imported += 1; + } else { + stats.skipped += 1; + } + } + + for sum in conv.summaries { + let result = sqlx::query( + "INSERT OR IGNORE INTO summaries \ + (id, conversation_id, content, first_message_id, last_message_id, token_estimate) \ + VALUES (?, ?, ?, ?, ?, ?)", + ) + .bind(sum.id) + .bind(sum.conversation_id) + .bind(&sum.content) + .bind(sum.first_message_id) + .bind(sum.last_message_id) + .bind(sum.token_estimate) + .execute(sqlite.pool()) + .await?; + + if result.rows_affected() > 0 { + stats.summaries_imported += 1; + } else { + stats.skipped += 1; + } + } + } + + Ok(stats) +} + +fn chrono_now() -> String { + use std::time::{SystemTime, UNIX_EPOCH}; + let secs = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + // Format as ISO-8601 approximation without chrono dependency + let (year, month, day, hour, min, sec) = unix_to_parts(secs); + 
format!("{year:04}-{month:02}-{day:02}T{hour:02}:{min:02}:{sec:02}Z") +} + +fn unix_to_parts(secs: u64) -> (u64, u64, u64, u64, u64, u64) { + let sec = secs % 60; + let total_mins = secs / 60; + let min = total_mins % 60; + let total_hours = total_mins / 60; + let hour = total_hours % 24; + let total_days = total_hours / 24; + + // Gregorian calendar calculation (civil date from days since Unix epoch) + let adjusted = total_days + 719_468; + let era = adjusted / 146_097; + let doe = adjusted - era * 146_097; + let yoe = (doe - doe / 1460 + doe / 36524 - doe / 146_096) / 365; + let year = yoe + era * 400; + let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); + let mp = (5 * doy + 2) / 153; + let day = doy - (153 * mp + 2) / 5 + 1; + let month = if mp < 10 { mp + 3 } else { mp - 9 }; + let year = if month <= 2 { year + 1 } else { year }; + (year, month, day, hour, min, sec) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn export_empty_database() { + let store = SqliteStore::new(":memory:").await.unwrap(); + let snapshot = export_snapshot(&store).await.unwrap(); + assert_eq!(snapshot.version, 1); + assert!(snapshot.conversations.is_empty()); + assert!(!snapshot.exported_at.is_empty()); + } + + #[tokio::test] + async fn export_import_roundtrip() { + let src = SqliteStore::new(":memory:").await.unwrap(); + let cid = src.create_conversation().await.unwrap(); + src.save_message(cid, "user", "hello export").await.unwrap(); + src.save_message(cid, "assistant", "hi import") + .await + .unwrap(); + + let snapshot = export_snapshot(&src).await.unwrap(); + assert_eq!(snapshot.conversations.len(), 1); + assert_eq!(snapshot.conversations[0].messages.len(), 2); + + let dst = SqliteStore::new(":memory:").await.unwrap(); + let stats = import_snapshot(&dst, snapshot).await.unwrap(); + assert_eq!(stats.conversations_imported, 1); + assert_eq!(stats.messages_imported, 2); + assert_eq!(stats.skipped, 0); + + let history = dst.load_history(cid, 
50).await.unwrap(); + assert_eq!(history.len(), 2); + assert_eq!(history[0].content, "hello export"); + assert_eq!(history[1].content, "hi import"); + } + + #[tokio::test] + async fn import_duplicate_skips() { + let src = SqliteStore::new(":memory:").await.unwrap(); + let cid = src.create_conversation().await.unwrap(); + src.save_message(cid, "user", "msg").await.unwrap(); + + let snapshot = export_snapshot(&src).await.unwrap(); + + let dst = SqliteStore::new(":memory:").await.unwrap(); + let stats1 = import_snapshot(&dst, snapshot).await.unwrap(); + assert_eq!(stats1.messages_imported, 1); + + let snapshot2 = export_snapshot(&src).await.unwrap(); + let stats2 = import_snapshot(&dst, snapshot2).await.unwrap(); + assert_eq!(stats2.messages_imported, 0); + assert!(stats2.skipped > 0); + } + + #[tokio::test] + async fn import_existing_conversation_increments_skipped_not_imported() { + let src = SqliteStore::new(":memory:").await.unwrap(); + let cid = src.create_conversation().await.unwrap(); + src.save_message(cid, "user", "only message").await.unwrap(); + + let snapshot = export_snapshot(&src).await.unwrap(); + + // Import once — conversation is new. + let dst = SqliteStore::new(":memory:").await.unwrap(); + let stats1 = import_snapshot(&dst, snapshot).await.unwrap(); + assert_eq!(stats1.conversations_imported, 1); + assert_eq!(stats1.messages_imported, 1); + + // Import again with no new messages — conversation already exists, must be counted as skipped. + let snapshot2 = export_snapshot(&src).await.unwrap(); + let stats2 = import_snapshot(&dst, snapshot2).await.unwrap(); + assert_eq!( + stats2.conversations_imported, 0, + "existing conversation must not be counted as imported" + ); + // The conversation itself contributes one skipped, plus the duplicate message. 
+ assert!( + stats2.skipped >= 1, + "re-importing an existing conversation must increment skipped" + ); + } + + #[tokio::test] + async fn export_includes_summaries() { + let store = SqliteStore::new(":memory:").await.unwrap(); + let cid = store.create_conversation().await.unwrap(); + let m1 = store.save_message(cid, "user", "a").await.unwrap(); + let m2 = store.save_message(cid, "assistant", "b").await.unwrap(); + store.save_summary(cid, "summary", m1, m2, 5).await.unwrap(); + + let snapshot = export_snapshot(&store).await.unwrap(); + assert_eq!(snapshot.conversations[0].summaries.len(), 1); + assert_eq!(snapshot.conversations[0].summaries[0].content, "summary"); + } + + #[test] + fn chrono_now_not_empty() { + let ts = chrono_now(); + assert!(ts.contains('T')); + assert!(ts.ends_with('Z')); + } + + #[test] + fn import_corrupt_json_returns_error() { + let result = serde_json::from_str::<MemorySnapshot>("not valid json at all {{{"); + assert!(result.is_err()); + } + + #[tokio::test] + async fn import_unsupported_version_returns_error() { + let store = SqliteStore::new(":memory:").await.unwrap(); + let snapshot = MemorySnapshot { + version: 99, + exported_at: "2026-01-01T00:00:00Z".into(), + conversations: vec![], + }; + let err = import_snapshot(&store, snapshot).await.unwrap_err(); + let msg = err.to_string(); + assert!(msg.contains("unsupported snapshot version 99")); + } + + #[tokio::test] + async fn import_partial_overlap_adds_new_messages() { + let src = SqliteStore::new(":memory:").await.unwrap(); + let cid = src.create_conversation().await.unwrap(); + src.save_message(cid, "user", "existing message") + .await + .unwrap(); + + let snapshot1 = export_snapshot(&src).await.unwrap(); + + let dst = SqliteStore::new(":memory:").await.unwrap(); + let stats1 = import_snapshot(&dst, snapshot1).await.unwrap(); + assert_eq!(stats1.messages_imported, 1); + + src.save_message(cid, "assistant", "new reply") + .await + .unwrap(); + let snapshot2 = export_snapshot(&src).await.unwrap(); + let 
stats2 = import_snapshot(&dst, snapshot2).await.unwrap(); + + assert_eq!( + stats2.messages_imported, 1, + "only the new message should be imported" + ); + // skipped includes the existing conversation (1) plus the duplicate message (1). + assert_eq!( + stats2.skipped, 2, + "existing conversation and duplicate message should be skipped" + ); + + let history = dst.load_history(cid, 50).await.unwrap(); + assert_eq!(history.len(), 2); + assert_eq!(history[1].content, "new reply"); + } +} diff --git a/docs/src/concepts/memory.md b/docs/src/concepts/memory.md index 666c8045..b1100742 100644 --- a/docs/src/concepts/memory.md +++ b/docs/src/concepts/memory.md @@ -95,6 +95,44 @@ The `Embeddable` trait provides a generic interface for any type that can be emb When `memory.redact_credentials` is enabled (default: `true`), Zeph scrubs credential patterns from message content before sending it to the LLM context pipeline. This prevents accidental leakage of API keys, tokens, and passwords stored in conversation history. The scrubbing runs via `scrub_content()` in the context builder and covers the same patterns as the output redaction system (see [Security — Secret Redaction](../reference/security.md#secret-redaction)). +## Autosave Assistant Responses + +By default, only user messages generate vector embeddings. Enable `autosave_assistant` to persist assistant responses to SQLite and optionally embed them for semantic recall: + +```toml +[memory] +autosave_assistant = true # Save assistant messages (default: false) +autosave_min_length = 20 # Minimum content length for embedding (default: 20) +``` + +When enabled, assistant responses shorter than `autosave_min_length` are saved to SQLite without generating an embedding (via `save_only()`). Responses meeting the threshold go through the full embedding pipeline. User messages always generate embeddings regardless of this setting. 
+ +## Memory Snapshots + +Export and import conversation history as portable JSON files for backup, migration, or sharing between instances. + +```bash +# Export all conversations, messages, and summaries +zeph memory export backup.json + +# Import into another instance (duplicates are skipped) +zeph memory import backup.json +``` + +The snapshot format (version 1) includes conversations, messages with multipart content, and summaries. Import uses `INSERT OR IGNORE` semantics — existing messages with matching IDs are skipped, so importing the same file twice is safe. + +## LLM Response Cache + +Cache identical LLM requests to avoid redundant API calls. The cache is SQLite-backed, keyed by a blake3 hash of the message history and model name. + +```toml +[llm] +response_cache_enabled = true # Enable response caching (default: false) +response_cache_ttl_secs = 3600 # Cache entry lifetime in seconds (default: 3600) +``` + +A background task runs every 10 minutes to clean up expired entries. Streaming responses bypass the cache entirely — only non-streaming completions are cached. + ## Deep Dives - [Set Up Semantic Memory](../guides/semantic-memory.md) — Qdrant setup guide diff --git a/docs/src/concepts/providers.md b/docs/src/concepts/providers.md index 80ad9f51..6d817980 100644 --- a/docs/src/concepts/providers.md +++ b/docs/src/concepts/providers.md @@ -45,6 +45,18 @@ provider = "claude" # ollama, claude, openai, candle, compatible, orchestrator Or via environment variable: `ZEPH_LLM_PROVIDER`. +## Response Caching + +Enable SQLite-backed response caching to avoid redundant LLM calls for identical requests. The cache key is a blake3 hash of the full message history and model name. Streaming responses bypass the cache. + +```toml +[llm] +response_cache_enabled = true +response_cache_ttl_secs = 3600 # 1 hour (default) +``` + +See [Memory and Context — LLM Response Cache](memory.md#llm-response-cache) for details. 
+ ## Deep Dives - [Use a Cloud Provider](../guides/cloud-provider.md) — Claude, OpenAI, and compatible API setup diff --git a/docs/src/guides/semantic-memory.md b/docs/src/guides/semantic-memory.md index 8050dd7d..aea0e845 100644 --- a/docs/src/guides/semantic-memory.md +++ b/docs/src/guides/semantic-memory.md @@ -97,6 +97,29 @@ mmr_lambda = 0.7 # 0.0 = max diversity, 1.0 = pure relevance MMR iteratively selects results that are both relevant to the query and dissimilar to already-selected items. The default `mmr_lambda = 0.7` works well for most use cases. Lower it if you see too many semantically similar results in recall. +## Autosave Assistant Responses + +By default, only user messages are embedded. Enable `autosave_assistant` to also embed assistant responses for richer semantic recall: + +```toml +[memory] +autosave_assistant = true +autosave_min_length = 20 # Skip embedding for very short replies +``` + +Short responses (below `autosave_min_length` bytes) are still saved to SQLite but skip the embedding step. User messages always generate embeddings regardless of this setting. + +## Memory Export and Import + +Back up or migrate conversation data with portable JSON snapshots: + +```bash +zeph memory export conversations.json +zeph memory import conversations.json +``` + +See [CLI Reference — `zeph memory`](../reference/cli.md#zeph-memory) for details. 
+ ## Storage Architecture | Store | Purpose | diff --git a/docs/src/reference/cli.md b/docs/src/reference/cli.md index b2253cd4..1885caca 100644 --- a/docs/src/reference/cli.md +++ b/docs/src/reference/cli.md @@ -14,6 +14,7 @@ zeph [OPTIONS] [COMMAND] |---------|-------------| | `init` | Interactive configuration wizard (see [Configuration Wizard](../getting-started/wizard.md)) | | `skill` | Manage external skills — install, remove, verify, trust (see [Skill Trust Levels](../advanced/skill-trust.md)) | +| `memory` | Export and import conversation history snapshots | | `vault` | Manage the age-encrypted secrets vault (see [Secrets Management](security.md#age-vault)) | When no subcommand is given, Zeph starts the agent loop. @@ -65,6 +66,25 @@ zeph skill trust my-skill trusted zeph skill remove my-skill ``` +### `zeph memory` + +Export and import conversation history as portable JSON snapshots. + +| Subcommand | Description | +|------------|-------------| +| `memory export ` | Export all conversations, messages, and summaries to a JSON file | +| `memory import ` | Import a snapshot file into the local database (duplicates are skipped) | + +```bash +# Back up all conversation data +zeph memory export backup.json + +# Restore on another machine +zeph memory import backup.json +``` + +The snapshot format is versioned (currently v1). Import uses `INSERT OR IGNORE` — re-importing the same file is safe and skips existing records. + ### `zeph vault` Manage age-encrypted secrets without manual `age` CLI invocations. 
diff --git a/docs/src/reference/configuration.md b/docs/src/reference/configuration.md index e48dcd9e..ce6463bf 100644 --- a/docs/src/reference/configuration.md +++ b/docs/src/reference/configuration.md @@ -64,6 +64,8 @@ base_url = "http://localhost:11434" model = "mistral:7b" embedding_model = "qwen3-embedding" # Model for text embeddings # vision_model = "llava:13b" # Ollama only: dedicated model for image requests +# response_cache_enabled = false # SQLite-backed LLM response cache (default: false) +# response_cache_ttl_secs = 3600 # Cache TTL in seconds (default: 3600) [llm.cloud] model = "claude-sonnet-4-5-20250929" @@ -102,6 +104,8 @@ cross_session_score_threshold = 0.35 # Minimum relevance for cross-session resu vector_backend = "qdrant" # Vector store: "qdrant" (default) or "sqlite" (embedded) token_safety_margin = 1.0 # Multiplier for token budget safety margin (default: 1.0) redact_credentials = true # Scrub credential patterns from LLM context (default: true) +autosave_assistant = false # Persist assistant responses to SQLite and embed (default: false) +autosave_min_length = 20 # Min content length for assistant embedding (default: 20) [memory.semantic] enabled = false # Enable semantic search via Qdrant @@ -253,6 +257,10 @@ Field resolution: per-provider value → parent section (`[llm]`, `[llm.cloud]`) | `ZEPH_MEMORY_VECTOR_BACKEND` | Vector backend: `qdrant` or `sqlite` (default: `qdrant`) | | `ZEPH_MEMORY_TOKEN_SAFETY_MARGIN` | Token budget safety margin multiplier (default: 1.0) | | `ZEPH_MEMORY_REDACT_CREDENTIALS` | Scrub credentials from LLM context (default: true) | +| `ZEPH_MEMORY_AUTOSAVE_ASSISTANT` | Persist assistant responses to SQLite (default: false) | +| `ZEPH_MEMORY_AUTOSAVE_MIN_LENGTH` | Min content length for assistant embedding (default: 20) | +| `ZEPH_LLM_RESPONSE_CACHE_ENABLED` | Enable SQLite-backed LLM response cache (default: false) | +| `ZEPH_LLM_RESPONSE_CACHE_TTL_SECS` | Response cache TTL in seconds (default: 3600) | | 
`ZEPH_MEMORY_SEMANTIC_ENABLED` | Enable semantic memory (default: false) | | `ZEPH_MEMORY_RECALL_LIMIT` | Max semantically relevant messages to recall (default: 5) | | `ZEPH_MEMORY_SEMANTIC_TEMPORAL_DECAY_ENABLED` | Enable temporal decay scoring (default: false) | diff --git a/src/init.rs b/src/init.rs index d4f17a37..a7237a00 100644 --- a/src/init.rs +++ b/src/init.rs @@ -453,6 +453,8 @@ pub(crate) fn build_config(state: &WizardState) -> Config { router: None, stt: None, vision_model: state.vision_model.clone().filter(|s| !s.is_empty()), + response_cache_enabled: false, + response_cache_ttl_secs: 3600, }; config.memory = MemoryConfig { diff --git a/src/main.rs b/src/main.rs index 3fb4d6a6..02c34d6c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -171,6 +171,25 @@ enum Command { #[command(subcommand)] command: SkillCommand, }, + /// Manage memory snapshots + Memory { + #[command(subcommand)] + command: MemoryCommand, + }, +} + +#[derive(Subcommand)] +enum MemoryCommand { + /// Export memory to a JSON snapshot file + Export { + /// Output file path + path: PathBuf, + }, + /// Import memory from a JSON snapshot file + Import { + /// Input file path + path: PathBuf, + }, } #[derive(Subcommand)] @@ -256,6 +275,10 @@ async fn main() -> anyhow::Result<()> { tracing_subscriber::fmt::init(); return handle_skill_command(skill_cmd, cli.config.as_deref()).await; } + Some(Command::Memory { command: mem_cmd }) => { + tracing_subscriber::fmt::init(); + return handle_memory_command(mem_cmd, cli.config.as_deref()).await; + } None => {} } @@ -461,6 +484,7 @@ async fn main() -> anyhow::Result<()> { let summary_provider = app.build_summary_provider(); let config = app.config(); let config_path = app.config_path().to_owned(); + let cache_pool = memory.sqlite().pool().clone(); let agent = Agent::new( provider, @@ -502,8 +526,33 @@ async fn main() -> anyhow::Result<()> { .custom .iter() .map(|(k, v)| (k.clone(), v.clone())), + ) + .with_autosave_config( + config.memory.autosave_assistant, 
+ config.memory.autosave_min_length, ); + let agent = if config.llm.response_cache_enabled { + let pool = cache_pool; + let cache = std::sync::Arc::new(zeph_memory::ResponseCache::new( + pool, + config.llm.response_cache_ttl_secs, + )); + let cache_clone = std::sync::Arc::clone(&cache); + tokio::spawn(async move { + let mut interval = tokio::time::interval(std::time::Duration::from_secs(600)); + loop { + interval.tick().await; + if let Err(e) = cache_clone.cleanup_expired().await { + tracing::warn!("response cache cleanup failed: {e:#}"); + } + } + }); + agent.with_response_cache(cache) + } else { + agent + }; + let agent = if config.cost.enabled { let tracker = CostTracker::new(true, f64::from(config.cost.max_daily_cents)); agent.with_cost_tracker(tracker) @@ -1067,6 +1116,66 @@ async fn handle_skill_command( Ok(()) } +async fn handle_memory_command( + cmd: MemoryCommand, + config_path: Option<&std::path::Path>, +) -> anyhow::Result<()> { + use zeph_core::bootstrap::resolve_config_path; + use zeph_memory::sqlite::SqliteStore; + + let config_file = resolve_config_path(config_path); + let config = zeph_core::config::Config::load(&config_file).unwrap_or_default(); + let sqlite = SqliteStore::new(&config.memory.sqlite_path) + .await + .map_err(|e| anyhow::anyhow!("failed to open SQLite: {e}"))?; + + match cmd { + MemoryCommand::Export { path } => { + let snapshot = zeph_memory::export_snapshot(&sqlite) + .await + .map_err(|e| anyhow::anyhow!("export failed: {e}"))?; + let json = serde_json::to_string_pretty(&snapshot) + .map_err(|e| anyhow::anyhow!("serialization failed: {e}"))?; + std::fs::write(&path, json) + .map_err(|e| anyhow::anyhow!("failed to write {}: {e}", path.display()))?; + let convs = snapshot.conversations.len(); + let msgs: usize = snapshot + .conversations + .iter() + .map(|c| c.messages.len()) + .sum(); + println!( + "Exported {convs} conversation(s) with {msgs} message(s) to {}", + path.display() + ); + if config.memory.redact_credentials { + 
eprintln!( + "Warning: snapshot may contain sensitive conversation data predating \ + redaction. Store the file securely and restrict access." + ); + } + } + MemoryCommand::Import { path } => { + let json = std::fs::read_to_string(&path) + .map_err(|e| anyhow::anyhow!("failed to read {}: {e}", path.display()))?; + let snapshot: zeph_memory::MemorySnapshot = serde_json::from_str(&json) + .map_err(|e| anyhow::anyhow!("invalid snapshot format: {e}"))?; + let stats = zeph_memory::import_snapshot(&sqlite, snapshot) + .await + .map_err(|e| anyhow::anyhow!("import failed: {e}"))?; + println!( + "Imported: {} conversation(s), {} message(s), {} summary(ies), {} skipped", + stats.conversations_imported, + stats.messages_imported, + stats.summaries_imported, + stats.skipped, + ); + } + } + + Ok(()) +} + fn handle_vault_command( cmd: VaultCommand, key_path: Option<&std::path::Path>,