diff --git a/crates/rust-mcp-sdk/src/mcp_http/mcp_http_utils.rs b/crates/rust-mcp-sdk/src/mcp_http/mcp_http_utils.rs index 608207a..6d003b9 100644 --- a/crates/rust-mcp-sdk/src/mcp_http/mcp_http_utils.rs +++ b/crates/rust-mcp-sdk/src/mcp_http/mcp_http_utils.rs @@ -187,7 +187,15 @@ async fn create_sse_stream( tokio::spawn(async move { if let Some(last_event_id) = last_event_id { if let Some(event_store) = state.event_store.as_ref() { - if let Some(events) = event_store.events_after(last_event_id).await { + let events = event_store + .events_after(last_event_id) + .await + .unwrap_or_else(|err| { + tracing::error!("{err}"); + None + }); + + if let Some(events) = events { for message_payload in events.messages { // skip storing replay messages let error = transport.write_str(&message_payload, true).await; diff --git a/crates/rust-mcp-sdk/tests/test_streamable_http_server.rs b/crates/rust-mcp-sdk/tests/test_streamable_http_server.rs index 79c9f00..3592c97 100644 --- a/crates/rust-mcp-sdk/tests/test_streamable_http_server.rs +++ b/crates/rust-mcp-sdk/tests/test_streamable_http_server.rs @@ -1428,6 +1428,7 @@ async fn should_store_and_include_event_ids_in_server_sse_messages() { .unwrap() .events_after(first_id) .await + .unwrap() .unwrap(); assert_eq!(events.messages.len(), 1); diff --git a/crates/rust-mcp-transport/src/event_store.rs b/crates/rust-mcp-transport/src/event_store.rs index fdc0734..35eaf40 100644 --- a/crates/rust-mcp-transport/src/event_store.rs +++ b/crates/rust-mcp-transport/src/event_store.rs @@ -1,27 +1,118 @@ mod in_memory_event_store; -use async_trait::async_trait; -pub use in_memory_event_store::*; use crate::{EventId, SessionId, StreamId}; +use async_trait::async_trait; +pub use in_memory_event_store::*; +use thiserror::Error; #[derive(Debug, Clone)] -pub struct EventStoreMessages { +pub struct EventStoreEntry { pub session_id: SessionId, pub stream_id: StreamId, pub messages: Vec, } +#[derive(Debug, Error)] +#[error("{message}")] +pub struct EventStoreError { + pub message: String, +} + +impl From<&str> for EventStoreError { + fn from(s: &str) -> Self { + EventStoreError { + message: s.to_string(), + } + } +} + +impl From for EventStoreError { + fn from(s: String) -> Self { + EventStoreError { message: s } + } +} + +type EventStoreResult = Result; + +/// Trait defining the interface for event storage and retrieval, used by the MCP server +/// to store and replay events for state reconstruction after client reconnection #[async_trait] pub trait EventStore: Send + Sync { + /// Stores a new event in the store and returns the generated event ID. + /// For MCP, this stores protocol messages, timestamp is the number of microseconds since UNIX_EPOCH. + /// The timestamp helps determine the order in which messages arrived. + /// + /// # Parameters + /// - `session_id`: The session identifier for the event. + /// - `stream_id`: The stream identifier within the session. + /// - `timestamp`: The u128 timestamp of the event. + /// - `message`: The event payload as json string. + /// + /// # Returns + /// - `Ok(EventId)`: The generated ID (format: session_id:stream_id:timestamp) on success. + /// - `Err(Self::Error)`: If input is invalid or storage fails. async fn store_event( &self, session_id: SessionId, stream_id: StreamId, - time_stamp: u128, + timestamp: u128, message: String, - ) -> EventId; - async fn remove_by_session_id(&self, session_id: SessionId); - async fn remove_stream_in_session(&self, session_id: SessionId, stream_id: StreamId); - async fn clear(&self); - async fn events_after(&self, last_event_id: EventId) -> Option; + ) -> EventStoreResult; + + /// Removes all events associated with a given session ID. + /// Used to clean up all events for a session when it is no longer needed (e.g., session ended). + /// + /// # Parameters + /// - `session_id`: The session ID whose events should be removed. + /// + async fn remove_by_session_id(&self, session_id: SessionId) -> EventStoreResult<()>; + /// Removes all events for a specific stream within a session. + /// Useful for cleaning up a specific stream without affecting others. + /// + /// # Parameters + /// - `session_id`: The session ID containing the stream. + /// - `stream_id`: The stream ID whose events should be removed. + /// + /// # Returns + /// - `Ok(())`: On successful deletion. + /// - `Err(Self::Error)`: If deletion fails. + async fn remove_stream_in_session( + &self, + session_id: SessionId, + stream_id: StreamId, + ) -> EventStoreResult<()>; + /// Clears all events from the store. + /// Used for resetting the store. + /// + async fn clear(&self) -> EventStoreResult<()>; + /// Retrieves events after a given event ID for a session and stream. + /// Critical for MCP server to replay events after a client reconnects, starting from the last known event. + /// Events are returned in chronological order (ascending timestamp) to reconstruct state. + /// + /// # Parameters + /// - `last_event_id`: The event ID to fetch events after. + /// + /// # Returns + /// - `Some(Some(EventStoreEntry))`: Events after the specified ID, if any. + /// - `None`: If no events exist after it OR the event ID is invalid. + async fn events_after( + &self, + last_event_id: EventId, + ) -> EventStoreResult>; + /// Prunes excess events to control storage usage. + /// Implementations may apply custom logic, such as limiting + /// the number of events per session or removing events older than a certain timestamp. + /// Default implementation logs a warning if not overridden by the store. + /// + /// # Parameters + /// - `session_id`: Optional session ID to prune a specific session; if None, prunes all sessions. + async fn prune_excess_events(&self, _session_id: Option) -> EventStoreResult<()> { + tracing::warn!("prune_excess_events() is not implemented for the event store."); + Ok(()) + } + /// Counts the total number of events in the store. + /// + /// # Returns + /// - The number of events across all sessions and streams. + async fn count(&self) -> EventStoreResult; } diff --git a/crates/rust-mcp-transport/src/event_store/in_memory_event_store.rs b/crates/rust-mcp-transport/src/event_store/in_memory_event_store.rs index 66e738c..095a014 100644 --- a/crates/rust-mcp-transport/src/event_store/in_memory_event_store.rs +++ b/crates/rust-mcp-transport/src/event_store/in_memory_event_store.rs @@ -1,13 +1,13 @@ +use crate::event_store::EventStoreResult; +use crate::{ + event_store::{EventStore, EventStoreEntry}, + EventId, SessionId, StreamId, +}; use async_trait::async_trait; use std::collections::HashMap; use std::collections::VecDeque; use tokio::sync::RwLock; -use crate::{ - event_store::{EventStore, EventStoreMessages}, - EventId, SessionId, StreamId, -}; - const MAX_EVENTS_PER_SESSION: usize = 64; const ID_SEPARATOR: &str = "-.-"; @@ -101,16 +101,19 @@ impl InMemoryEventStore { /// ); /// assert_eq!(store.parse_event_id("invalid"), None); /// ``` - pub fn parse_event_id<'a>(&self, event_id: &'a str) -> Option<(&'a str, &'a str, &'a str)> { + pub fn parse_event_id<'a>( + &self, + event_id: &'a str, + ) -> EventStoreResult<(&'a str, &'a str, u128)> { // Check for empty input or invalid characters (e.g., NULL) if event_id.is_empty() || event_id.contains('\0') { - return None; + return Err("Event ID is empty!".into()); } // Split into exactly three parts let parts: Vec<&'a str> = event_id.split(ID_SEPARATOR).collect(); if parts.len() != 3 { - return None; + return Err("Invalid Event ID format.".into()); } let session_id = parts[0]; @@ -119,10 +122,14 @@ impl InMemoryEventStore { // Ensure no part is empty if session_id.is_empty() || stream_id.is_empty() || time_stamp.is_empty() { - return None; + return Err("Invalid Event ID format.".into()); } - Some((session_id, stream_id, time_stamp)) + let time_stamp: u128 = time_stamp + .parse() + .map_err(|err| format!("Error parsing timestamp: {err}"))?; + + Ok((session_id, stream_id, time_stamp)) } } @@ -147,7 +154,7 @@ impl EventStore for InMemoryEventStore { stream_id: StreamId, time_stamp: u128, message: String, - ) -> EventId { + ) -> EventStoreResult { let event_id = self.generate_event_id(&session_id, &stream_id, time_stamp); let mut storage_map = self.storage_map.write().await; @@ -172,7 +179,7 @@ impl EventStore for InMemoryEventStore { session_map.push_back(entry); - event_id + Ok(event_id) } /// Removes all events associated with a given stream ID within a specific session. @@ -184,7 +191,11 @@ impl EventStore for InMemoryEventStore { /// # Arguments /// - `session_id`: The session identifier to target. /// - `stream_id`: The stream identifier to remove. - async fn remove_stream_in_session(&self, session_id: SessionId, stream_id: StreamId) { + async fn remove_stream_in_session( + &self, + session_id: SessionId, + stream_id: StreamId, + ) -> EventStoreResult<()> { let mut storage_map = self.storage_map.write().await; // Check if session exists @@ -194,9 +205,10 @@ impl EventStore for InMemoryEventStore { // Remove session if empty if events.is_empty() { storage_map.remove(&session_id); - } + }; } // No action if session_id doesn’t exist (idempotent) + Ok(()) } /// Removes all events associated with a given session ID. @@ -205,9 +217,10 @@ impl EventStore for InMemoryEventStore { /// /// # Arguments /// - `session_id`: The session identifier to remove. - async fn remove_by_session_id(&self, session_id: SessionId) { + async fn remove_by_session_id(&self, session_id: SessionId) -> EventStoreResult<()> { let mut storage_map = self.storage_map.write().await; storage_map.remove(&session_id); + Ok(()) } /// Retrieves events after a given `event_id` for a specific session and stream. @@ -221,23 +234,20 @@ impl EventStore for InMemoryEventStore { /// - `last_event_id`: The event ID (format: `session-.-stream-.-timestamp`) to start after. /// /// # Returns - /// An `Option` containing `EventStoreMessages` with the session ID, stream ID, and sorted messages, + /// An `Option` containing `EventStoreEntry` with the session ID, stream ID, and sorted messages, /// or `None` if no events are found or the input is invalid. - async fn events_after(&self, last_event_id: EventId) -> Option { - let Some((session_id, stream_id, time_stamp)) = self.parse_event_id(&last_event_id) else { - tracing::warn!("error parsing last event id: '{last_event_id}'"); - return None; - }; + async fn events_after( + &self, + last_event_id: EventId, + ) -> EventStoreResult> { + let (session_id, stream_id, time_stamp) = self.parse_event_id(&last_event_id)?; let storage_map = self.storage_map.read().await; + + // fail silently if session id does not exists let Some(events) = storage_map.get(session_id) else { tracing::warn!("could not find the session_id in the store : '{session_id}'"); - return None; - }; - - let Ok(time_stamp) = time_stamp.parse::() else { - tracing::warn!("could not parse the timestamp: '{time_stamp}'"); - return None; + return Ok(None); }; let events = match events @@ -260,15 +270,21 @@ impl EventStore for InMemoryEventStore { tracing::trace!("{} messages after '{last_event_id}'", events.len()); - Some(EventStoreMessages { + Ok(Some(EventStoreEntry { session_id: session_id.to_string(), stream_id: stream_id.to_string(), messages: events, - }) + })) } - async fn clear(&self) { + async fn clear(&self) -> EventStoreResult<()> { let mut storage_map = self.storage_map.write().await; storage_map.clear(); + Ok(()) + } + + async fn count(&self) -> EventStoreResult { + let storage_map = self.storage_map.read().await; + Ok(storage_map.len()) } } diff --git a/crates/rust-mcp-transport/src/message_dispatcher.rs b/crates/rust-mcp-transport/src/message_dispatcher.rs index cd9727c..62c591f 100644 --- a/crates/rust-mcp-transport/src/message_dispatcher.rs +++ b/crates/rust-mcp-transport/src/message_dispatcher.rs @@ -412,16 +412,19 @@ impl McpDispatch self.stream_id.as_ref(), self.event_store.as_ref(), ) { - event_id = Some( - event_store - .store_event( - session_id.clone(), - stream_id.clone(), - current_timestamp(), - payload.to_owned(), - ) - .await, - ) + event_id = event_store + .store_event( + session_id.clone(), + stream_id.clone(), + current_timestamp(), + payload.to_owned(), + ) + .await + .map(Some) + .unwrap_or_else(|err| { + tracing::error!("{err}"); + None + }); }; }