@@ -44,8 +44,24 @@ public class StreamableHttpClientTransport implements McpClientTransport {
44
44
45
45
private static final Logger LOGGER = LoggerFactory .getLogger (StreamableHttpClientTransport .class );
46
46
47
+ private static final String DEFAULT_MCP_ENDPOINT = "/mcp" ;
48
+
47
49
private static final String MCP_SESSION_ID = "Mcp-Session-Id" ;
48
50
51
+ private static final String LAST_EVENT_ID = "Last-Event-ID" ;
52
+
53
+ private static final String ACCEPT = "Accept" ;
54
+
55
+ private static final String CONTENT_TYPE = "Content-Type" ;
56
+
57
+ private static final String APPLICATION_JSON = "application/json" ;
58
+
59
+ private static final String TEXT_EVENT_STREAM = "text/event-stream" ;
60
+
61
+ private static final String APPLICATION_JSON_SEQ = "application/json-seq" ;
62
+
63
+ private static final String DEFAULT_ACCEPT_VALUES = "%s, %s" .formatted (APPLICATION_JSON , TEXT_EVENT_STREAM );
64
+
49
65
private final HttpClientSseClientTransport sseClientTransport ;
50
66
51
67
private final HttpClient httpClient ;
@@ -107,7 +123,7 @@ public static class Builder {
107
123
108
124
private String baseUri ;
109
125
110
- private String endpoint = "/mcp" ;
126
+ private String endpoint = DEFAULT_MCP_ENDPOINT ;
111
127
112
128
private Consumer <HttpClient .Builder > clientCustomizer ;
113
129
@@ -156,7 +172,7 @@ public StreamableHttpClientTransport build() {
156
172
builder .customizeRequest (requestCustomizer );
157
173
}
158
174
159
- if (!endpoint .equals ("/mcp" )) {
175
+ if (!endpoint .equals (DEFAULT_MCP_ENDPOINT )) {
160
176
builder .sseEndpoint (endpoint );
161
177
}
162
178
@@ -177,20 +193,24 @@ public Mono<Void> connect(final Function<Mono<McpSchema.JSONRPCMessage>, Mono<Mc
177
193
}
178
194
179
195
return Mono .defer (() -> Mono .fromFuture (() -> {
180
- final HttpRequest .Builder request = requestBuilder .copy ()
181
- .GET ()
182
- .header ("Accept" , "text/event-stream" )
183
- .uri (uri );
196
+ final HttpRequest .Builder request = requestBuilder .copy ().GET ().header (ACCEPT , TEXT_EVENT_STREAM ).uri (uri );
184
197
final String lastId = lastEventId .get ();
185
198
if (lastId != null ) {
186
- request .header ("Last-Event-ID" , lastId );
199
+ request .header (LAST_EVENT_ID , lastId );
187
200
}
188
201
if (mcpSessionId .get () != null ) {
189
202
request .header (MCP_SESSION_ID , mcpSessionId .get ());
190
203
}
191
204
192
205
return httpClient .sendAsync (request .build (), HttpResponse .BodyHandlers .ofInputStream ());
193
206
}).flatMap (response -> {
207
+ // must like server terminate session and the client need to start a
208
+ // new session by sending a new `InitializeRequest` without a session
209
+ // ID attached.
210
+ if (mcpSessionId .get () != null && response .statusCode () == 404 ) {
211
+ mcpSessionId .set (null );
212
+ }
213
+
194
214
if (response .statusCode () == 405 || response .statusCode () == 404 ) {
195
215
LOGGER .warn ("Operation not allowed, falling back to SSE" );
196
216
fallbackToSse .set (true );
@@ -226,8 +246,8 @@ public Mono<Void> sendMessage(final McpSchema.JSONRPCMessage message,
226
246
return serializeJson (message ).flatMap (json -> {
227
247
final HttpRequest .Builder request = requestBuilder .copy ()
228
248
.POST (HttpRequest .BodyPublishers .ofString (json ))
229
- .header ("Accept" , "application/json, text/event-stream" )
230
- .header ("Content-Type" , "application/json" )
249
+ .header (ACCEPT , DEFAULT_ACCEPT_VALUES )
250
+ .header (CONTENT_TYPE , APPLICATION_JSON )
231
251
.uri (uri );
232
252
if (mcpSessionId .get () != null ) {
233
253
request .header (MCP_SESSION_ID , mcpSessionId .get ());
@@ -238,7 +258,13 @@ public Mono<Void> sendMessage(final McpSchema.JSONRPCMessage message,
238
258
239
259
// server may assign a session ID at initialization time, if yes we
240
260
// have to use it for any subsequent requests
241
- response .headers ().firstValue (MCP_SESSION_ID ).map (String ::trim ).ifPresent (this .mcpSessionId ::set );
261
+ if (message instanceof McpSchema .JSONRPCRequest
262
+ && ((McpSchema .JSONRPCRequest ) message ).method ().equals (McpSchema .METHOD_INITIALIZE )) {
263
+ response .headers ()
264
+ .firstValue (MCP_SESSION_ID )
265
+ .map (String ::trim )
266
+ .ifPresent (this .mcpSessionId ::set );
267
+ }
242
268
243
269
// If the response is 202 Accepted, there's no body to process
244
270
if (response .statusCode () == 202 ) {
@@ -296,19 +322,17 @@ private Mono<String> serializeJson(final McpSchema.JSONRPCMessage msg) {
296
322
297
323
private Mono <Void > handleStreamingResponse (final HttpResponse <InputStream > response ,
298
324
final Function <Mono <McpSchema .JSONRPCMessage >, Mono <McpSchema .JSONRPCMessage >> handler ) {
299
- final String contentType = response .headers ().firstValue ("Content-Type" ).orElse ("" );
300
- if (contentType .contains ("application/json-seq" )) {
325
+ final String contentType = response .headers ().firstValue (CONTENT_TYPE ).orElse ("" );
326
+ if (contentType .contains (APPLICATION_JSON_SEQ )) {
301
327
return handleJsonStream (response , handler );
302
328
}
303
- else if (contentType .contains ("text/event-stream" )) {
329
+ else if (contentType .contains (TEXT_EVENT_STREAM )) {
304
330
return handleSseStream (response , handler );
305
331
}
306
- else if (contentType .contains ("application/json" )) {
332
+ else if (contentType .contains (APPLICATION_JSON )) {
307
333
return handleSingleJson (response , handler );
308
334
}
309
- else {
310
- return Mono .error (new UnsupportedOperationException ("Unsupported Content-Type: " + contentType ));
311
- }
335
+ return Mono .error (new UnsupportedOperationException ("Unsupported Content-Type: " + contentType ));
312
336
}
313
337
314
338
private Mono <Void > handleSingleJson (final HttpResponse <InputStream > response ,
@@ -381,7 +405,7 @@ else if (node.isObject()) {
381
405
}
382
406
else {
383
407
String warning = "Unexpected JSON in SSE data: " + rawData ;
384
- LOGGER .warn ("Unexpected JSON in SSE data: {}" , rawData );
408
+ LOGGER .warn (warning );
385
409
return Mono .error (new IllegalArgumentException (warning ));
386
410
}
387
411
0 commit comments