Skip to content

Commit e6045f7

Browse files
committed
Basic version negotiation capabilities
- We do not support full version negotiation, but this commit allows the client to the send the correct version to match that of the server. - There is one limitation: the GET /mcp request happens too early, before we have deserialized the initialization response, so we do not know the negotiated version yet. This request will have the client latest version as Mcp-Protocol-Version value. Signed-off-by: Daniel Garnier-Moiroux <git@garnier.wf>
1 parent 7e950eb commit e6045f7

File tree

9 files changed

+53
-27
lines changed

9 files changed

+53
-27
lines changed

mcp-core/src/main/java/io/modelcontextprotocol/client/LifecycleInitializer.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,9 @@ public <T> Mono<T> withInitialization(String actionName, Function<Initialization
287287
this.initializationRef.compareAndSet(newInit, null);
288288
return Mono.error(new RuntimeException("Client failed to initialize " + actionName, ex));
289289
})
290-
.flatMap(operation);
290+
.flatMap(res -> operation.apply(res)
291+
.contextWrite(c -> c.put(McpAsyncClient.NEGOTIATED_PROTOCOL_VERSION,
292+
res.initializeResult().protocolVersion())));
291293
});
292294
}
293295

@@ -319,6 +321,8 @@ private Mono<McpSchema.InitializeResult> doInitialize(DefaultInitialization init
319321
}
320322

321323
return mcpClientSession.sendNotification(McpSchema.METHOD_NOTIFICATION_INITIALIZED, null)
324+
.contextWrite(
325+
c -> c.put(McpAsyncClient.NEGOTIATED_PROTOCOL_VERSION, initializeResult.protocolVersion()))
322326
.thenReturn(initializeResult);
323327
}).flatMap(initializeResult -> {
324328
initialization.cacheResult(initializeResult);

mcp-core/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,8 @@ public class McpAsyncClient {
106106
public static final TypeRef<McpSchema.ProgressNotification> PROGRESS_NOTIFICATION_TYPE_REF = new TypeRef<>() {
107107
};
108108

109+
public static final String NEGOTIATED_PROTOCOL_VERSION = "io.modelcontextprotocol.client.negotiated-protocol-version";
110+
109111
/**
110112
* Client capabilities.
111113
*/

mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.function.Consumer;
2121
import java.util.function.Function;
2222

23+
import io.modelcontextprotocol.client.McpAsyncClient;
2324
import io.modelcontextprotocol.client.transport.ResponseSubscribers.ResponseEvent;
2425
import io.modelcontextprotocol.client.transport.customizer.McpAsyncHttpClientRequestCustomizer;
2526
import io.modelcontextprotocol.client.transport.customizer.McpSyncHttpClientRequestCustomizer;
@@ -193,7 +194,9 @@ private Publisher<Void> createDelete(String sessionId) {
193194
.uri(uri)
194195
.header("Cache-Control", "no-cache")
195196
.header(HttpHeaders.MCP_SESSION_ID, sessionId)
196-
.header(HttpHeaders.PROTOCOL_VERSION, this.latestSupportedProtocolVersion)
197+
.header(HttpHeaders.PROTOCOL_VERSION,
198+
ctx.getOrDefault(McpAsyncClient.NEGOTIATED_PROTOCOL_VERSION,
199+
this.latestSupportedProtocolVersion))
197200
.DELETE();
198201
var transportContext = ctx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY);
199202
return Mono.from(this.httpRequestCustomizer.customize(builder, "DELETE", uri, null, transportContext));
@@ -264,7 +267,9 @@ private Mono<Disposable> reconnect(McpTransportStream<Disposable> stream) {
264267
var builder = requestBuilder.uri(uri)
265268
.header(HttpHeaders.ACCEPT, TEXT_EVENT_STREAM)
266269
.header("Cache-Control", "no-cache")
267-
.header(HttpHeaders.PROTOCOL_VERSION, this.latestSupportedProtocolVersion)
270+
.header(HttpHeaders.PROTOCOL_VERSION,
271+
connectionCtx.getOrDefault(McpAsyncClient.NEGOTIATED_PROTOCOL_VERSION,
272+
this.latestSupportedProtocolVersion))
268273
.GET();
269274
var transportContext = connectionCtx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY);
270275
return Mono.from(this.httpRequestCustomizer.customize(builder, "GET", uri, null, transportContext));
@@ -439,7 +444,9 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage sentMessage) {
439444
.header(HttpHeaders.ACCEPT, APPLICATION_JSON + ", " + TEXT_EVENT_STREAM)
440445
.header(HttpHeaders.CONTENT_TYPE, APPLICATION_JSON)
441446
.header(HttpHeaders.CACHE_CONTROL, "no-cache")
442-
.header(HttpHeaders.PROTOCOL_VERSION, this.latestSupportedProtocolVersion)
447+
.header(HttpHeaders.PROTOCOL_VERSION,
448+
ctx.getOrDefault(McpAsyncClient.NEGOTIATED_PROTOCOL_VERSION,
449+
this.latestSupportedProtocolVersion))
443450
.POST(HttpRequest.BodyPublishers.ofString(jsonBody));
444451
var transportContext = ctx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY);
445452
return Mono

mcp-core/src/main/java/io/modelcontextprotocol/spec/McpTransportSession.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@
44

55
package io.modelcontextprotocol.spec;
66

7-
import org.reactivestreams.Publisher;
8-
97
import java.util.Optional;
108

9+
import org.reactivestreams.Publisher;
10+
1111
/**
1212
* An abstraction of the session as perceived from the MCP transport layer. Not to be
1313
* confused with the {@link McpSession} type that operates at the level of the JSON-RPC

mcp-core/src/test/java/io/modelcontextprotocol/common/HttpClientStreamableHttpVersionNegotiationIntegrationTests.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ void usesLatestVersion() {
8989
}
9090

9191
@Test
92-
void usesCustomLatestVersion() {
92+
void usesServerSupportedVersion() {
9393
startTomcat();
9494

9595
var transport = HttpClientStreamableHttpTransport.builder("http://localhost:" + PORT)
@@ -101,19 +101,21 @@ void usesCustomLatestVersion() {
101101
McpSchema.CallToolResult response = client.callTool(new McpSchema.CallToolRequest("test-tool", Map.of()));
102102

103103
var calls = requestRecordingFilter.getCalls();
104-
105-
assertThat(calls).filteredOn(c -> !c.body().contains("\"method\":\"initialize\""))
106-
// GET /mcp ; POST notification/initialized ; POST tools/call
107-
.hasSize(3)
104+
// Initialize tells the server the Client's latest supported version
105+
// FIXME: Set the correct protocol version on GET /mcp
106+
assertThat(calls).filteredOn(c -> c.method().equals("POST") && !c.body().contains("\"method\":\"initialize\""))
107+
// POST notification/initialized ; POST tools/call
108+
.hasSize(2)
108109
.map(McpTestRequestRecordingServletFilter.Call::headers)
109-
.allSatisfy(headers -> assertThat(headers).containsEntry("mcp-protocol-version", "2263-03-18"));
110+
.allSatisfy(headers -> assertThat(headers).containsEntry("mcp-protocol-version",
111+
ProtocolVersions.MCP_2025_06_18));
110112

111113
assertThat(response).isNotNull();
112114
assertThat(response.content()).hasSize(1)
113115
.first()
114116
.extracting(McpSchema.TextContent.class::cast)
115117
.extracting(McpSchema.TextContent::text)
116-
.isEqualTo("2263-03-18");
118+
.isEqualTo(ProtocolVersions.MCP_2025_06_18);
117119
mcpServer.close();
118120
}
119121

mcp-core/src/test/java/io/modelcontextprotocol/server/transport/McpTestRequestRecordingServletFilter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public void doFilter(ServletRequest servletRequest, ServletResponse servletRespo
4646
.collect(Collectors.toUnmodifiableMap(Function.identity(),
4747
name -> String.join(",", Collections.list(req.getHeaders(name)))));
4848
var request = new CachedBodyHttpServletRequest(req);
49-
calls.add(new Call(headers, request.getBodyAsString()));
49+
calls.add(new Call(req.getMethod(), headers, request.getBodyAsString()));
5050
filterChain.doFilter(request, servletResponse);
5151
}
5252
else {
@@ -60,7 +60,7 @@ public List<Call> getCalls() {
6060
return List.copyOf(calls);
6161
}
6262

63-
public record Call(Map<String, String> headers, String body) {
63+
public record Call(String method, Map<String, String> headers, String body) {
6464

6565
}
6666

mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.springframework.web.reactive.function.client.WebClient;
2525
import org.springframework.web.reactive.function.client.WebClientResponseException;
2626

27+
import io.modelcontextprotocol.client.McpAsyncClient;
2728
import io.modelcontextprotocol.json.McpJsonMapper;
2829
import io.modelcontextprotocol.json.TypeRef;
2930
import io.modelcontextprotocol.spec.ClosedMcpTransportSession;
@@ -225,7 +226,9 @@ private Mono<Disposable> reconnect(McpTransportStream<Disposable> stream) {
225226
Disposable connection = webClient.get()
226227
.uri(this.endpoint)
227228
.accept(MediaType.TEXT_EVENT_STREAM)
228-
.header(HttpHeaders.PROTOCOL_VERSION, this.latestSupportedProtocolVersion)
229+
.header(HttpHeaders.PROTOCOL_VERSION,
230+
ctx.getOrDefault(McpAsyncClient.NEGOTIATED_PROTOCOL_VERSION,
231+
this.latestSupportedProtocolVersion))
229232
.headers(httpHeaders -> {
230233
transportSession.sessionId().ifPresent(id -> httpHeaders.add(HttpHeaders.MCP_SESSION_ID, id));
231234
if (stream != null) {
@@ -288,10 +291,12 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
288291
final AtomicReference<Disposable> disposableRef = new AtomicReference<>();
289292
final McpTransportSession<Disposable> transportSession = this.activeSession.get();
290293

291-
Disposable connection = webClient.post()
294+
Disposable connection = Flux.deferContextual(ctx -> webClient.post()
292295
.uri(this.endpoint)
293296
.accept(MediaType.APPLICATION_JSON, MediaType.TEXT_EVENT_STREAM)
294-
.header(HttpHeaders.PROTOCOL_VERSION, this.latestSupportedProtocolVersion)
297+
.header(HttpHeaders.PROTOCOL_VERSION,
298+
ctx.getOrDefault(McpAsyncClient.NEGOTIATED_PROTOCOL_VERSION,
299+
this.latestSupportedProtocolVersion))
295300
.headers(httpHeaders -> {
296301
transportSession.sessionId().ifPresent(id -> httpHeaders.add(HttpHeaders.MCP_SESSION_ID, id));
297302
})
@@ -350,7 +355,7 @@ else if (mediaType.isCompatibleWith(MediaType.APPLICATION_JSON)) {
350355
}
351356
return this.extractError(response, sessionRepresentation);
352357
}
353-
})
358+
}))
354359
.flatMap(jsonRpcMessage -> this.handler.get().apply(Mono.just(jsonRpcMessage)))
355360
.onErrorComplete(t -> {
356361
// handle the error first

mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/WebFluxStreamableHttpVersionNegotiationIntegrationTests.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import reactor.netty.DisposableServer;
2828
import reactor.netty.http.server.HttpServer;
2929

30+
import org.springframework.http.HttpMethod;
3031
import org.springframework.http.server.reactive.HttpHandler;
3132
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;
3233
import org.springframework.web.reactive.function.client.WebClient;
@@ -116,7 +117,7 @@ void usesLatestVersion() {
116117
}
117118

118119
@Test
119-
void usesCustomLatestVersion() {
120+
void usesServerSupportedVersion() {
120121
var transport = WebClientStreamableHttpTransport
121122
.builder(WebClient.builder().baseUrl("http://localhost:" + PORT))
122123
.supportedProtocolVersions(List.of(ProtocolVersions.MCP_2025_06_18, "2263-03-18"))
@@ -128,18 +129,22 @@ void usesCustomLatestVersion() {
128129
McpSchema.CallToolResult response = client.callTool(new McpSchema.CallToolRequest("test-tool", Map.of()));
129130

130131
var calls = recordingFilterFunction.getCalls();
131-
assertThat(calls).filteredOn(c -> !c.body().contains("\"method\":\"initialize\""))
132-
// GET /mcp ; POST notification/initialized ; POST tools/call
133-
.hasSize(3)
132+
// Initialize tells the server the Client's latest supported version
133+
// FIXME: Set the correct protocol version on GET /mcp
134+
assertThat(calls)
135+
.filteredOn(c -> !c.body().contains("\"method\":\"initialize\"") && c.method().equals(HttpMethod.POST))
136+
// POST notification/initialized ; POST tools/call
137+
.hasSize(2)
134138
.map(McpTestRequestRecordingExchangeFilterFunction.Call::headers)
135-
.allSatisfy(headers -> assertThat(headers).containsEntry("mcp-protocol-version", "2263-03-18"));
139+
.allSatisfy(headers -> assertThat(headers).containsEntry("mcp-protocol-version",
140+
ProtocolVersions.MCP_2025_06_18));
136141

137142
assertThat(response).isNotNull();
138143
assertThat(response.content()).hasSize(1)
139144
.first()
140145
.extracting(McpSchema.TextContent.class::cast)
141146
.extracting(McpSchema.TextContent::text)
142-
.isEqualTo("2263-03-18");
147+
.isEqualTo(ProtocolVersions.MCP_2025_06_18);
143148
mcpServer.close();
144149
}
145150

mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/utils/McpTestRequestRecordingExchangeFilterFunction.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import reactor.core.publisher.Mono;
1313

14+
import org.springframework.http.HttpMethod;
1415
import org.springframework.web.reactive.function.server.HandlerFilterFunction;
1516
import org.springframework.web.reactive.function.server.HandlerFunction;
1617
import org.springframework.web.reactive.function.server.ServerRequest;
@@ -34,7 +35,7 @@ public Mono<ServerResponse> filter(ServerRequest request, HandlerFunction next)
3435
.collect(Collectors.toMap(String::toLowerCase, k -> String.join(",", request.headers().header(k))));
3536

3637
var cr = request.bodyToMono(String.class).defaultIfEmpty("").map(body -> {
37-
this.calls.add(new Call(headers, body));
38+
this.calls.add(new Call(request.method(), headers, body));
3839
return ServerRequest.from(request).body(body).build();
3940
});
4041

@@ -46,7 +47,7 @@ public List<Call> getCalls() {
4647
return List.copyOf(calls);
4748
}
4849

49-
public record Call(Map<String, String> headers, String body) {
50+
public record Call(HttpMethod method, Map<String, String> headers, String body) {
5051

5152
}
5253

0 commit comments

Comments
 (0)