Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
- Replace double `AnyProvider::clone()` in `embed_fn()` with single `Arc` clone (#636)
- Add `with_client()` builder to ClaudeProvider and OpenAiProvider for shared `reqwest::Client` (#637)
- Cache `JsonSchema` per `TypeId` in `chat_typed` to avoid per-call schema generation (#638)
- Scrape executor performs post-DNS resolution validation against private/loopback IPs with pinned address client to prevent SSRF via DNS rebinding
- Private host detection expanded to block `*.localhost`, `*.internal`, `*.local` domains
- A2A error responses sanitized: serde details and method names no longer exposed to clients
- Rate limiter rejects new clients with 429 when entry map is at capacity after stale eviction
- Secret redaction now uses regex-based pattern matching instead of a whitespace tokenizer, so secrets embedded in URLs, JSON, and quoted strings are detected
- Added `hf_`, `npm_`, `dckr_pat_` to secret redaction prefixes
- A2A client stream errors truncate upstream body to 256 bytes

### Fixed
- False positive: "sudoku" no longer matched by "sudo" blocked pattern (word-boundary matching)
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 38 additions & 3 deletions crates/zeph-a2a/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,13 @@ impl A2aClient {
if !resp.status().is_success() {
let status = resp.status();
let body = resp.text().await.unwrap_or_default();
return Err(A2aError::Stream(format!("HTTP {status}: {body}")));
// Truncate body to avoid leaking large upstream error responses.
// The limit is expressed in bytes, but `body` is UTF-8: slicing at a fixed
// byte offset panics when that offset lands inside a multi-byte character.
// Back off to the closest char boundary at or below the limit instead.
let truncated = if body.len() > 256 {
    let mut end = 256;
    // Offset 0 is always a char boundary, so this loop terminates.
    while !body.is_char_boundary(end) {
        end -= 1;
    }
    format!("{}…", &body[..end])
} else {
    body
};
return Err(A2aError::Stream(format!("HTTP {status}: {truncated}")));
}

let event_stream = resp.bytes_stream().eventsource();
Expand Down Expand Up @@ -182,9 +188,38 @@ impl A2aClient {
/// Reports whether `ip` lies in an address range that is checked here as
/// non-public.
///
/// IPv4: loopback, private, link-local, unspecified, or broadcast.
/// IPv6: loopback, unspecified, link-local (`fe80::/10`), unique local
/// (`fc00::/7`), or an IPv4-mapped address (`::ffff:a.b.c.d`) whose embedded
/// IPv4 address matches any of the IPv4 checks. Any other address yields `false`.
fn is_private_ip(ip: IpAddr) -> bool {
match ip {
IpAddr::V4(v4) => {
v4.is_loopback() || v4.is_private() || v4.is_link_local() || v4.is_unspecified()
v4.is_loopback()
|| v4.is_private()
|| v4.is_link_local()
|| v4.is_unspecified()
|| v4.is_broadcast()
}
IpAddr::V6(v6) => {
if v6.is_loopback() || v6.is_unspecified() {
return true;
}
// The remaining ranges are matched by hand on the 16-bit segments.
let seg = v6.segments();
// fe80::/10 — link-local
if seg[0] & 0xffc0 == 0xfe80 {
return true;
}
// fc00::/7 — unique local
if seg[0] & 0xfe00 == 0xfc00 {
return true;
}
// ::ffff:x.x.x.x — IPv4-mapped, check inner IPv4
if seg[0..6] == [0, 0, 0, 0, 0, 0xffff] {
// Fail closed: if the conversion yields `None`, the UNSPECIFIED
// fallback satisfies `is_unspecified()` below and is reported as private.
let v4 = v6
.to_ipv4_mapped()
.unwrap_or(std::net::Ipv4Addr::UNSPECIFIED);
return v4.is_loopback()
|| v4.is_private()
|| v4.is_link_local()
|| v4.is_unspecified()
|| v4.is_broadcast();
}
// No check above matched.
false
}
IpAddr::V6(v6) => v6.is_loopback() || v6.is_unspecified(),
}
}

Expand Down
113 changes: 105 additions & 8 deletions crates/zeph-a2a/src/server/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,10 @@ pub async fn jsonrpc_handler(
),
METHOD_GET_TASK => handle_get_task(state, id.clone(), raw.params).await,
METHOD_CANCEL_TASK => handle_cancel_task(state, id.clone(), raw.params).await,
_ => error_response(
id.clone(),
ERR_METHOD_NOT_FOUND,
format!("unknown method: {}", raw.method),
),
_ => {
tracing::warn!(method = %raw.method, "unknown JSON-RPC method");
error_response(id.clone(), ERR_METHOD_NOT_FOUND, "method not found")
}
};

Json(response)
Expand All @@ -92,7 +91,10 @@ async fn handle_send_message(
) -> JsonRpcResponse<serde_json::Value> {
let params: SendMessageParams = match serde_json::from_value(params) {
Ok(p) => p,
Err(e) => return error_response(id, ERR_INVALID_PARAMS, format!("invalid params: {e}")),
Err(e) => {
tracing::warn!("invalid params in send_message: {e}");
return error_response(id, ERR_INVALID_PARAMS, "invalid parameters");
}
};

let task = state.task_manager.create_task(params.message.clone()).await;
Expand Down Expand Up @@ -142,7 +144,10 @@ async fn handle_get_task(
) -> JsonRpcResponse<serde_json::Value> {
let params: TaskIdParams = match serde_json::from_value(params) {
Ok(p) => p,
Err(e) => return error_response(id, ERR_INVALID_PARAMS, format!("invalid params: {e}")),
Err(e) => {
tracing::warn!("invalid params in get_task: {e}");
return error_response(id, ERR_INVALID_PARAMS, "invalid parameters");
}
};

match state
Expand All @@ -162,7 +167,10 @@ async fn handle_cancel_task(
) -> JsonRpcResponse<serde_json::Value> {
let params: TaskIdParams = match serde_json::from_value(params) {
Ok(p) => p,
Err(e) => return error_response(id, ERR_INVALID_PARAMS, format!("invalid params: {e}")),
Err(e) => {
tracing::warn!("invalid params in cancel_task: {e}");
return error_response(id, ERR_INVALID_PARAMS, "invalid parameters");
}
};

match state.task_manager.cancel_task(&params.id).await {
Expand Down Expand Up @@ -314,3 +322,92 @@ async fn stream_task(state: AppState, message: crate::types::Message, tx: mpsc::
}
}
}

#[cfg(test)]
mod tests {
    use axum::body::Body;
    use http_body_util::BodyExt;
    use tower::ServiceExt;

    use super::super::router::build_router_with_config;
    use super::super::testing::test_state;

    /// Wraps `method` and `params` in a JSON-RPC 2.0 envelope (id `"1"`) and
    /// returns it as a `POST /a2a` request with a JSON content type.
    fn make_rpc_request(method: &str, params: serde_json::Value) -> axum::http::Request<Body> {
        let envelope = serde_json::json!({
            "jsonrpc": "2.0",
            "id": "1",
            "method": method,
            "params": params,
        });
        let payload = serde_json::to_vec(&envelope).unwrap();
        axum::http::Request::builder()
            .method("POST")
            .uri("/a2a")
            .header("content-type", "application/json")
            .body(Body::from(payload))
            .unwrap()
    }

    /// Collects the whole response body and parses it as JSON.
    async fn get_rpc_body(resp: axum::http::Response<Body>) -> serde_json::Value {
        let collected = resp.into_body().collect().await.unwrap();
        serde_json::from_slice(&collected.to_bytes()).unwrap()
    }

    /// Sends one JSON-RPC call through a freshly built router, asserts the HTTP
    /// status is 200, and returns `error.message` from the reply (empty string
    /// when that field is absent or not a string).
    async fn rpc_error_message(method: &str, params: serde_json::Value) -> String {
        let app = build_router_with_config(test_state(), None, 0);
        let resp = app.oneshot(make_rpc_request(method, params)).await.unwrap();
        assert_eq!(resp.status(), 200);
        let reply = get_rpc_body(resp).await;
        reply["error"]["message"].as_str().unwrap_or("").to_owned()
    }

    #[tokio::test]
    async fn unknown_method_does_not_echo_method_name() {
        let msg = rpc_error_message("tasks/evil_probe", serde_json::json!({})).await;
        assert_eq!(msg, "method not found", "must not echo method name");
        assert!(
            !msg.contains("evil_probe"),
            "method name must not appear in error"
        );
        assert!(
            !msg.contains("unknown"),
            "must not leak 'unknown method' phrasing"
        );
    }

    #[tokio::test]
    async fn invalid_params_send_message_no_serde_details() {
        // A number where a message object is expected makes serde deserialization fail.
        let msg = rpc_error_message("message/send", serde_json::json!({"message": 42})).await;
        assert_eq!(msg, "invalid parameters");
        // Serde wording such as "invalid type" or "expected" must stay server-side.
        assert!(!msg.contains("invalid type"), "serde details must not leak");
        assert!(!msg.contains("expected"), "serde details must not leak");
    }

    #[tokio::test]
    async fn invalid_params_get_task_no_serde_details() {
        // An array is the wrong type for the `id` field.
        let msg = rpc_error_message("tasks/get", serde_json::json!({"id": [1, 2, 3]})).await;
        assert_eq!(msg, "invalid parameters");
        assert!(!msg.contains("invalid type"), "serde details must not leak");
    }

    #[tokio::test]
    async fn invalid_params_cancel_task_no_serde_details() {
        // A boolean is the wrong type for the `id` field.
        let msg = rpc_error_message("tasks/cancel", serde_json::json!({"id": false})).await;
        assert_eq!(msg, "invalid parameters");
        assert!(!msg.contains("invalid type"), "serde details must not leak");
    }
}
60 changes: 46 additions & 14 deletions crates/zeph-a2a/src/server/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,9 @@ async fn rate_limit_middleware(
before = before_eviction,
after = after_eviction,
limit = MAX_RATE_LIMIT_ENTRIES,
"rate limiter still at capacity after stale entry eviction"
"rate limiter at capacity after stale entry eviction, rejecting new IP"
);
return StatusCode::TOO_MANY_REQUESTS.into_response();
}
}

Expand Down Expand Up @@ -363,36 +364,67 @@ mod tests {
}

#[tokio::test]
async fn max_entries_cap_clears_map() {
async fn max_entries_cap_rejects_when_all_entries_fresh() {
// Fill map with fresh entries (within RATE_WINDOW) so retain() keeps them all.
// After retain() the map is still at capacity, so the middleware returns 429.
let counters = Arc::new(Mutex::new(HashMap::new()));
{
let mut map = counters.lock().await;
let fresh = Instant::now();
for i in 0..MAX_RATE_LIMIT_ENTRIES {
let ip = IpAddr::V4(std::net::Ipv4Addr::new(
((i >> 16) & 0xFF) as u8,
((i >> 8) & 0xFF) as u8,
(i & 0xFF) as u8,
1,
));
map.insert(ip, (1, Instant::now()));
map.insert(ip, (1, fresh));
}
assert_eq!(map.len(), MAX_RATE_LIMIT_ENTRIES);
}

let state = RateLimitState {
limit: 10,
counters,
};

let new_ip = IpAddr::V4(std::net::Ipv4Addr::new(255, 255, 255, 255));
assert!(!state.counters.lock().await.contains_key(&new_ip));

// Simulate what the middleware does when cap is exceeded
let mut map = state.counters.lock().await;
if map.len() >= MAX_RATE_LIMIT_ENTRIES && !map.contains_key(&new_ip) {
map.clear();
// Simulate middleware logic: cap exceeded, run retain(), still full → 429
let now = Instant::now();
let mut map = counters.lock().await;
let before = map.len();
map.retain(|_, (_, ts)| now.duration_since(*ts) < RATE_WINDOW);
let after = map.len();

// All entries are fresh so retain() must not remove any
assert_eq!(after, before, "retain must preserve fresh entries");
// Map still at capacity: a new IP would be rejected
assert!(
after >= MAX_RATE_LIMIT_ENTRIES && !map.contains_key(&new_ip),
"new IP should be rejected when map is still at capacity after eviction"
);
}

#[tokio::test]
async fn max_entries_cap_allows_after_stale_eviction() {
// Fill map with stale entries. After retain() the map is empty, new IP is accepted.
let counters = Arc::new(Mutex::new(HashMap::new()));
{
let mut map = counters.lock().await;
let stale = Instant::now() - Duration::from_secs(120);
for i in 0..MAX_RATE_LIMIT_ENTRIES {
let ip = IpAddr::V4(std::net::Ipv4Addr::new(
((i >> 16) & 0xFF) as u8,
((i >> 8) & 0xFF) as u8,
(i & 0xFF) as u8,
1,
));
map.insert(ip, (1, stale));
}
}
assert_eq!(map.len(), 0);

let now = Instant::now();
let mut map = counters.lock().await;
map.retain(|_, (_, ts)| now.duration_since(*ts) < RATE_WINDOW);

// All entries were stale; map should now be empty
assert_eq!(map.len(), 0, "stale entries must be evicted by retain");
}

#[tokio::test]
Expand Down
1 change: 1 addition & 0 deletions crates/zeph-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ anyhow.workspace = true
futures.workspace = true
notify.workspace = true
notify-debouncer-mini.workspace = true
regex.workspace = true
serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true
thiserror.workspace = true
Expand Down
Loading
Loading