From a2283782bf15ac95780e280755cf1b25fa1af165 Mon Sep 17 00:00:00 2001 From: Matthew Alvarez Date: Tue, 1 Jul 2025 15:57:18 +0000 Subject: [PATCH 1/4] add session persistance --- client/client.go | 7 +++++++ client/http.go | 7 ++++++- client/http_test.go | 13 ++++++++++++- client/transport/streamable_http.go | 19 +++++++++++++------ 4 files changed, 38 insertions(+), 8 deletions(-) diff --git a/client/client.go b/client/client.go index dd0e31a01..f12a493dc 100644 --- a/client/client.go +++ b/client/client.go @@ -33,6 +33,13 @@ func WithClientCapabilities(capabilities mcp.ClientCapabilities) ClientOption { } } +// WithSession assumes a MCP Session has already been initialized +func WithSession(sessionID string) ClientOption { + return func(c *Client) { + c.initialized = true + } +} + // NewClient creates a new MCP client with the given transport. // Usage: // diff --git a/client/http.go b/client/http.go index cb3be35d6..1577986da 100644 --- a/client/http.go +++ b/client/http.go @@ -13,5 +13,10 @@ func NewStreamableHttpClient(baseURL string, options ...transport.StreamableHTTP if err != nil { return nil, fmt.Errorf("failed to create SSE transport: %w", err) } - return NewClient(trans), nil + clientOptions := make([]ClientOption, 0) + sessionID := trans.GetSessionId() + if sessionID != "" { + clientOptions = append(clientOptions, WithSession(sessionID)) + } + return NewClient(trans, clientOptions...), nil } diff --git a/client/http_test.go b/client/http_test.go index ad4d4ba55..e0142eb2e 100644 --- a/client/http_test.go +++ b/client/http_test.go @@ -7,12 +7,12 @@ import ( "testing" "time" + "github.com/google/uuid" "github.com/mark3labs/mcp-go/client/transport" "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" ) - func TestHTTPClient(t *testing.T) { hooks := &server.Hooks{} hooks.AddAfterCallTool(func(ctx context.Context, id any, message *mcp.CallToolRequest, result *mcp.CallToolResult) { @@ -81,6 +81,17 @@ func TestHTTPClient(t *testing.T) { }, } + t.Run("Can Configure a server with a pre-existing session", func(t *testing.T) { + sessionID := uuid.NewString() + client, err := NewStreamableHttpClient(testServer.URL, transport.WithSession(sessionID)) + if err != nil { + t.Fatalf("create client failed %v", err) + } + if client.initialized != true { + t.Fatalf("Client is not initialized") + } + }) + t.Run("Can receive notification from server", func(t *testing.T) { client, err := NewStreamableHttpClient(testServer.URL) if err != nil { diff --git a/client/transport/streamable_http.go b/client/transport/streamable_http.go index 2b173ff94..e358751b3 100644 --- a/client/transport/streamable_http.go +++ b/client/transport/streamable_http.go @@ -74,6 +74,13 @@ func WithLogger(logger util.Logger) StreamableHTTPCOption { } } +// WithSession creates a client with a pre-configured session +func WithSession(sessionID string) StreamableHTTPCOption { + return func(sc *StreamableHTTP) { + sc.sessionID.Store(sessionID) + } +} + // StreamableHTTP implements Streamable HTTP transport. // // It transmits JSON-RPC messages over individual HTTP requests. One message per request. @@ -236,7 +243,7 @@ func (c *StreamableHTTP) SendRequest( resp, err := c.sendHTTP(ctx, http.MethodPost, bytes.NewReader(requestBody), "application/json, text/event-stream") if err != nil { - if errors.Is(err, errSessionTerminated) && request.Method == string(mcp.MethodInitialize) { + if errors.Is(err, ErrSessionTerminated) && request.Method == string(mcp.MethodInitialize) { // If the request is initialize, should not return a SessionTerminated error // It should be a genuine endpoint-routing issue. // ( Fall through to return StatusCode checking. ) @@ -357,7 +364,7 @@ func (c *StreamableHTTP) sendHTTP( // universal handling for session terminated if resp.StatusCode == http.StatusNotFound { c.sessionID.CompareAndSwap(sessionID, "") - return nil, errSessionTerminated + return nil, ErrSessionTerminated } return resp, nil @@ -543,7 +550,7 @@ func (c *StreamableHTTP) listenForever(ctx context.Context) { c.logger.Infof("listening to server forever") for { err := c.createGETConnectionToServer(ctx) - if errors.Is(err, errGetMethodNotAllowed) { + if errors.Is(err, ErrGetMethodNotAllowed) { // server does not support listening c.logger.Errorf("server does not support listening") return @@ -563,8 +570,8 @@ func (c *StreamableHTTP) listenForever(ctx context.Context) { } var ( - errSessionTerminated = fmt.Errorf("session terminated (404). need to re-initialize") - errGetMethodNotAllowed = fmt.Errorf("GET method not allowed") + ErrSessionTerminated = fmt.Errorf("session terminated (404). need to re-initialize") + ErrGetMethodNotAllowed = fmt.Errorf("GET method not allowed") retryInterval = 1 * time.Second // a variable is convenient for testing ) @@ -579,7 +586,7 @@ func (c *StreamableHTTP) createGETConnectionToServer(ctx context.Context) error // Check if we got an error response if resp.StatusCode == http.StatusMethodNotAllowed { - return errGetMethodNotAllowed + return ErrGetMethodNotAllowed } if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted { From 9a1616e7951f149f18d6f2f5e9215961fd17ea5d Mon Sep 17 00:00:00 2001 From: Matthew Alvarez Date: Tue, 1 Jul 2025 16:08:03 +0000 Subject: [PATCH 2/4] Address nitpick --- client/client.go | 4 ++-- client/http.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/client/client.go b/client/client.go index f12a493dc..1c3afa305 100644 --- a/client/client.go +++ b/client/client.go @@ -34,8 +34,8 @@ func WithClientCapabilities(capabilities mcp.ClientCapabilities) ClientOption { } // WithSession assumes a MCP Session has already been initialized -func WithSession(sessionID string) ClientOption { - return func(c *Client) { +func WithSession() ClientOption { + return func(c *Client) { c.initialized = true } } diff --git a/client/http.go b/client/http.go index 1577986da..d001a1e63 100644 --- a/client/http.go +++ b/client/http.go @@ -16,7 +16,7 @@ func NewStreamableHttpClient(baseURL string, options ...transport.StreamableHTTP clientOptions := make([]ClientOption, 0) sessionID := trans.GetSessionId() if sessionID != "" { - clientOptions = append(clientOptions, WithSession(sessionID)) + clientOptions = append(clientOptions, WithSession()) } return NewClient(trans, clientOptions...), nil } From b20c5c1aa7f1654f8a7b50a5b468bb17634a6028 Mon Sep 17 00:00:00 2001 From: Matthew Alvarez Date: Tue, 1 Jul 2025 20:04:00 +0000 Subject: [PATCH 3/4] Expose SessionID of the client --- client/client.go | 9 +++++++++ client/transport/inprocess.go | 4 ++++ client/transport/interface.go | 3 +++ client/transport/sse.go | 6 ++++++ client/transport/stdio.go | 6 ++++++ 5 files changed, 28 insertions(+) diff --git a/client/client.go b/client/client.go index 1c3afa305..53dc5528b 100644 --- a/client/client.go +++ b/client/client.go @@ -439,3 +439,12 @@ func (c *Client) GetServerCapabilities() mcp.ServerCapabilities { func (c *Client) GetClientCapabilities() mcp.ClientCapabilities { return c.clientCapabilities } + +// GetSessionId returns the session ID of the transport. +// If the transport does not support sessions, it returns an empty string. +func (c *Client) GetSessionId() string { + if c.transport == nil { + return "" + } + return c.transport.GetSessionId() +} diff --git a/client/transport/inprocess.go b/client/transport/inprocess.go index 90fc2fae1..0e2393f07 100644 --- a/client/transport/inprocess.go +++ b/client/transport/inprocess.go @@ -68,3 +68,7 @@ func (c *InProcessTransport) SetNotificationHandler(handler func(notification mc func (*InProcessTransport) Close() error { return nil } + +func (c *InProcessTransport) GetSessionId() string { + return "" +} diff --git a/client/transport/interface.go b/client/transport/interface.go index c83c7c65a..59ead6321 100644 --- a/client/transport/interface.go +++ b/client/transport/interface.go @@ -29,6 +29,9 @@ type Interface interface { // Close the connection. Close() error + + // GetSessionId returns the session ID of the transport. + GetSessionId() string } type JSONRPCRequest struct { diff --git a/client/transport/sse.go b/client/transport/sse.go index b22ff62d4..ffe3247f0 100644 --- a/client/transport/sse.go +++ b/client/transport/sse.go @@ -428,6 +428,12 @@ func (c *SSE) Close() error { return nil } +// GetSessionId returns the session ID of the transport. +// Since SSE does not maintain a session ID, it returns an empty string. +func (c *SSE) GetSessionId() string { + return "" +} + // SendNotification sends a JSON-RPC notification to the server without expecting a response. func (c *SSE) SendNotification(ctx context.Context, notification mcp.JSONRPCNotification) error { if c.endpoint == nil { diff --git a/client/transport/stdio.go b/client/transport/stdio.go index c300c405f..a04a71acc 100644 --- a/client/transport/stdio.go +++ b/client/transport/stdio.go @@ -148,6 +148,12 @@ func (c *Stdio) Close() error { return nil } +// GetSessionId returns the session ID of the transport. +// Since stdio does not maintain a session ID, it returns an empty string. +func (c *Stdio) GetSessionId() string { + return "" +} + // SetNotificationHandler sets the handler function to be called when a notification is received. // Only one handler can be set at a time; setting a new one replaces the previous handler. func (c *Stdio) SetNotificationHandler( From 39bc055c30fafe73cd1e38a42282f4b76a2111f4 Mon Sep 17 00:00:00 2001 From: Matthew Alvarez Date: Wed, 2 Jul 2025 15:19:23 +0000 Subject: [PATCH 4/4] Address feedback, add docs about preconfigured sessions --- client/client.go | 5 +++++ client/http_test.go | 2 +- www/docs/pages/clients/transports.mdx | 24 ++++++++++++++++++++++++ 3 files changed, 30 insertions(+), 1 deletion(-) diff --git a/client/client.go b/client/client.go index 53dc5528b..5e3f1a7d9 100644 --- a/client/client.go +++ b/client/client.go @@ -448,3 +448,8 @@ func (c *Client) GetSessionId() string { } return c.transport.GetSessionId() } + +// IsInitialized returns true if the client has been initialized. +func (c *Client) IsInitialized() bool { + return c.initialized +} diff --git a/client/http_test.go b/client/http_test.go index e0142eb2e..514004857 100644 --- a/client/http_test.go +++ b/client/http_test.go @@ -87,7 +87,7 @@ func TestHTTPClient(t *testing.T) { if err != nil { t.Fatalf("create client failed %v", err) } - if client.initialized != true { + if client.IsInitialized() != true { t.Fatalf("Client is not initialized") } }) diff --git a/www/docs/pages/clients/transports.mdx b/www/docs/pages/clients/transports.mdx index efef67cf8..af25fb65a 100644 --- a/www/docs/pages/clients/transports.mdx +++ b/www/docs/pages/clients/transports.mdx @@ -389,6 +389,30 @@ func (pool *StreamableHTTPClientPool) CallTool(ctx context.Context, req mcp.Call } ``` +### StreamableHTTP With Preconfigured Session +You can also create a StreamableHTTP client with a preconfigured session, which allows you to reuse the same session across multiple requests + +```go +func createStreamableHTTPClientWithSession() { + // Create StreamableHTTP client with options + sessionID := // fetch existing session ID + c := client.NewStreamableHttpClient("https://api.example.com/mcp", + transport.WithSession(sessionID), + ) + defer c.Close() + + ctx := context.Background() + // Use client... + _, err := c.ListTools(ctx) + // If the session is terminated, you must reinitialize the client + if errors.Is(err, transport.ErrSessionTerminated) { + c.Initialize(ctx) // Reinitialize if session is terminated + // The session ID should change after reinitialization + sessionID = c.GetSessionId() // Update session ID + } +} +``` + ## SSE Client SSE (Server-Sent Events) clients provide real-time communication with servers.