Skip to content

Commit c1f0a93

Browse files
committed
makes possible to use StreamableHttpClientTransport in McpClient
1 parent 2cc08d0 commit c1f0a93

File tree

6 files changed

+85
-47
lines changed

6 files changed

+85
-47
lines changed

Diff for: mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java

+16-2
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ public class McpAsyncClient {
160160
* @param features the MCP Client supported features.
161161
*/
162162
McpAsyncClient(McpClientTransport transport, Duration requestTimeout, Duration initializationTimeout,
163-
McpClientFeatures.Async features) {
163+
McpClientFeatures.Async features, boolean connectOnInit) {
164164

165165
Assert.notNull(transport, "Transport must not be null");
166166
Assert.notNull(requestTimeout, "Request timeout must not be null");
@@ -235,7 +235,9 @@ public class McpAsyncClient {
235235
asyncLoggingNotificationHandler(loggingConsumersFinal));
236236

237237
this.mcpSession = new McpClientSession(requestTimeout, transport, requestHandlers, notificationHandlers);
238-
238+
if (connectOnInit) {
239+
this.mcpSession.openSSE();
240+
}
239241
}
240242

241243
/**
@@ -302,6 +304,18 @@ public Mono<Void> closeGracefully() {
302304
return this.mcpSession.closeGracefully();
303305
}
304306

307+
// ---------------------------
308+
// open an SSE stream
309+
// ---------------------------
310+
/**
311+
* The client may issue an HTTP GET to the MCP endpoint. This can be used to open an
312+
* SSE stream, allowing the server to communicate to the client, without the client
313+
* first sending data via HTTP POST.
314+
*/
315+
public void openSSE() {
316+
this.mcpSession.openSSE();
317+
}
318+
305319
// --------------------------
306320
// Initialization
307321
// --------------------------

Diff for: mcp/src/main/java/io/modelcontextprotocol/client/McpClient.java

+32-5
Original file line numberDiff line numberDiff line change
@@ -157,11 +157,13 @@ class SyncSpec {
157157

158158
private Duration requestTimeout = Duration.ofSeconds(20); // Default timeout
159159

160+
private boolean connectOnInit = true; // Default true, for backward compatibility
161+
160162
private Duration initializationTimeout = Duration.ofSeconds(20);
161163

162164
private ClientCapabilities capabilities;
163165

164-
private Implementation clientInfo = new Implementation("Java SDK MCP Client", "1.0.0");
166+
private Implementation clientInfo = new Implementation("Java SDK MCP Sync Client", "0.10.0");
165167

166168
private final Map<String, Root> roots = new HashMap<>();
167169

@@ -195,6 +197,17 @@ public SyncSpec requestTimeout(Duration requestTimeout) {
195197
return this;
196198
}
197199

200+
/**
201+
* Sets whether to connect to the server during the initialization phase (open an
202+
* SSE stream).
203+
* @param connectOnInit true to open an SSE stream during the initialization
204+
* @return This builder instance for method chaining
205+
*/
206+
public SyncSpec withConnectOnInit(final boolean connectOnInit) {
207+
this.connectOnInit = connectOnInit;
208+
return this;
209+
}
210+
198211
/**
199212
* @param initializationTimeout The duration to wait for the initialization
200213
* lifecycle step to complete.
@@ -368,8 +381,8 @@ public McpSyncClient build() {
368381

369382
McpClientFeatures.Async asyncFeatures = McpClientFeatures.Async.fromSync(syncFeatures);
370383

371-
return new McpSyncClient(
372-
new McpAsyncClient(transport, this.requestTimeout, this.initializationTimeout, asyncFeatures));
384+
return new McpSyncClient(new McpAsyncClient(transport, this.requestTimeout, this.initializationTimeout,
385+
asyncFeatures, this.connectOnInit));
373386
}
374387

375388
}
@@ -396,11 +409,13 @@ class AsyncSpec {
396409

397410
private Duration requestTimeout = Duration.ofSeconds(20); // Default timeout
398411

412+
private boolean connectOnInit = true; // Default true, for backward compatibility
413+
399414
private Duration initializationTimeout = Duration.ofSeconds(20);
400415

401416
private ClientCapabilities capabilities;
402417

403-
private Implementation clientInfo = new Implementation("Spring AI MCP Client", "0.3.1");
418+
private Implementation clientInfo = new Implementation("Java SDK MCP Async Client", "0.10.0");
404419

405420
private final Map<String, Root> roots = new HashMap<>();
406421

@@ -434,6 +449,17 @@ public AsyncSpec requestTimeout(Duration requestTimeout) {
434449
return this;
435450
}
436451

452+
/**
453+
* Sets whether to connect to the server during the initialization phase (open an
454+
* SSE stream).
455+
* @param connectOnInit true to open an SSE stream during the initialization
456+
* @return This builder instance for method chaining
457+
*/
458+
public AsyncSpec withConnectOnInit(final boolean connectOnInit) {
459+
this.connectOnInit = connectOnInit;
460+
return this;
461+
}
462+
437463
/**
438464
* @param initializationTimeout The duration to wait for the initialization
439465
* lifecycle step to complete.
@@ -606,7 +632,8 @@ public McpAsyncClient build() {
606632
return new McpAsyncClient(this.transport, this.requestTimeout, this.initializationTimeout,
607633
new McpClientFeatures.Async(this.clientInfo, this.capabilities, this.roots,
608634
this.toolsChangeConsumers, this.resourcesChangeConsumers, this.promptsChangeConsumers,
609-
this.loggingConsumers, this.samplingHandler));
635+
this.loggingConsumers, this.samplingHandler),
636+
this.connectOnInit);
610637
}
611638

612639
}

Diff for: mcp/src/main/java/io/modelcontextprotocol/client/McpSyncClient.java

+16-9
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,18 @@ public boolean closeGracefully() {
129129
return true;
130130
}
131131

132+
// ---------------------------
133+
// open an SSE stream
134+
// ---------------------------
135+
/**
136+
* The client may issue an HTTP GET to the MCP endpoint. This can be used to open an
137+
* SSE stream, allowing the server to communicate to the client, without the client
138+
* first sending data via HTTP POST.
139+
*/
140+
public void openSSE() {
141+
this.delegate.openSSE();
142+
}
143+
132144
/**
133145
* The initialization phase MUST be the first interaction between client and server.
134146
* During this phase, the client and server:
@@ -148,9 +160,7 @@ public boolean closeGracefully() {
148160
* The server MUST respond with its own capabilities and information:
149161
* {@link McpSchema.ServerCapabilities}. <br/>
150162
* After successful initialization, the client MUST send an initialized notification
151-
* to indicate it is ready to begin normal operations.
152-
*
153-
* <br/>
163+
* to indicate it is ready to begin normal operations. <br/>
154164
*
155165
* <a href=
156166
* "https://github.com/modelcontextprotocol/specification/blob/main/docs/specification/basic/lifecycle.md#initialization">Initialization
@@ -272,9 +282,8 @@ public McpSchema.ReadResourceResult readResource(McpSchema.ReadResourceRequest r
272282

273283
/**
274284
* Resource templates allow servers to expose parameterized resources using URI
275-
* templates. Arguments may be auto-completed through the completion API.
276-
*
277-
* Request a list of resource templates the server has.
285+
* templates. Arguments may be auto-completed through the completion API. Request a
286+
* list of resource templates the server has.
278287
* @param cursor the cursor
279288
* @return the list of resource templates result.
280289
*/
@@ -293,9 +302,7 @@ public McpSchema.ListResourceTemplatesResult listResourceTemplates() {
293302
/**
294303
* Subscriptions. The protocol supports optional subscriptions to resource changes.
295304
* Clients can subscribe to specific resources and receive notifications when they
296-
* change.
297-
*
298-
* Send a resources/subscribe request.
305+
* change. Send a resources/subscribe request.
299306
* @param subscribeRequest the subscribe request contains the uri of the resource to
300307
* subscribe to.
301308
*/

Diff for: mcp/src/main/java/io/modelcontextprotocol/client/transport/StreamableHttpClientTransport.java

+3-28
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,6 @@ public class StreamableHttpClientTransport implements McpClientTransport {
7272

7373
private final URI uri;
7474

75-
private final AtomicReference<TransportState> state = new AtomicReference<>(TransportState.DISCONNECTED);
76-
7775
private final AtomicReference<String> lastEventId = new AtomicReference<>();
7876

7977
private final AtomicReference<String> mcpSessionId = new AtomicReference<>();
@@ -99,15 +97,6 @@ public static Builder builder(final String uri) {
9997
return new Builder().withBaseUri(uri);
10098
}
10199

102-
/**
103-
* The state of the Transport connection.
104-
*/
105-
public enum TransportState {
106-
107-
DISCONNECTED, CONNECTING, CONNECTED, CLOSED
108-
109-
}
110-
111100
/**
112101
* A builder for creating instances of WebSocketClientTransport.
113102
*/
@@ -188,10 +177,6 @@ public Mono<Void> connect(final Function<Mono<McpSchema.JSONRPCMessage>, Mono<Mc
188177
return sseClientTransport.connect(handler);
189178
}
190179

191-
if (!state.compareAndSet(TransportState.DISCONNECTED, TransportState.CONNECTING)) {
192-
return Mono.error(new IllegalStateException("Already connected or connecting"));
193-
}
194-
195180
return Mono.defer(() -> Mono.fromFuture(() -> {
196181
final HttpRequest.Builder request = requestBuilder.copy().GET().header(ACCEPT, TEXT_EVENT_STREAM).uri(uri);
197182
final String lastId = lastEventId.get();
@@ -219,13 +204,10 @@ public Mono<Void> connect(final Function<Mono<McpSchema.JSONRPCMessage>, Mono<Mc
219204
return handleStreamingResponse(response, handler);
220205
})
221206
.retryWhen(Retry.backoff(3, Duration.ofSeconds(3)).filter(err -> err instanceof IllegalStateException))
222-
.doOnSuccess(v -> state.set(TransportState.CONNECTED))
223-
.doOnTerminate(() -> state.set(TransportState.CLOSED))
224207
.onErrorResume(e -> {
225208
LOGGER.error("Streamable transport connection error", e);
226-
state.set(TransportState.DISCONNECTED);
227209
return Mono.error(e);
228-
}));
210+
})).doOnTerminate(this::closeGracefully);
229211
}
230212

231213
@Override
@@ -239,10 +221,6 @@ public Mono<Void> sendMessage(final McpSchema.JSONRPCMessage message,
239221
return fallbackToSse(message);
240222
}
241223

242-
if (state.get() == TransportState.CLOSED) {
243-
return Mono.empty();
244-
}
245-
246224
return serializeJson(message).flatMap(json -> {
247225
final HttpRequest.Builder request = requestBuilder.copy()
248226
.POST(HttpRequest.BodyPublishers.ofString(json))
@@ -427,7 +405,8 @@ else if (node.isObject()) {
427405

428406
@Override
429407
public Mono<Void> closeGracefully() {
430-
state.set(TransportState.CLOSED);
408+
mcpSessionId.set(null);
409+
lastEventId.set(null);
431410
if (fallbackToSse.get()) {
432411
return sseClientTransport.closeGracefully();
433412
}
@@ -439,8 +418,4 @@ public <T> T unmarshalFrom(final Object data, final TypeReference<T> typeRef) {
439418
return objectMapper.convertValue(data, typeRef);
440419
}
441420

442-
public TransportState getState() {
443-
return state.get();
444-
}
445-
446421
}

Diff for: mcp/src/main/java/io/modelcontextprotocol/spec/McpClientSession.java

+15-2
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public class McpClientSession implements McpSession {
6161
/** Atomic counter for generating unique request IDs */
6262
private final AtomicLong requestCounter = new AtomicLong(0);
6363

64-
private final Disposable connection;
64+
private Disposable connection;
6565

6666
/**
6767
* Functional interface for handling incoming JSON-RPC requests. Implementations
@@ -116,6 +116,17 @@ public McpClientSession(Duration requestTimeout, McpClientTransport transport,
116116
this.transport = transport;
117117
this.requestHandlers.putAll(requestHandlers);
118118
this.notificationHandlers.putAll(notificationHandlers);
119+
}
120+
121+
/**
122+
* The client may issue an HTTP GET to the MCP endpoint. This can be used to open an
123+
* SSE stream, allowing the server to communicate to the client, without the client
124+
* first sending data via HTTP POST.
125+
*/
126+
public void openSSE() {
127+
if (this.connection != null && !this.connection.isDisposed()) {
128+
return; // already connected and still active
129+
}
119130

120131
// TODO: consider mono.transformDeferredContextual where the Context contains
121132
// the
@@ -282,7 +293,9 @@ public Mono<Void> closeGracefully() {
282293
*/
283294
@Override
284295
public void close() {
285-
this.connection.dispose();
296+
if (this.connection != null) {
297+
this.connection.dispose();
298+
}
286299
transport.close();
287300
}
288301

Diff for: mcp/src/test/java/io/modelcontextprotocol/spec/McpClientSessionTests.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ void setUp() {
4848
transport = new MockMcpClientTransport();
4949
session = new McpClientSession(TIMEOUT, transport, Map.of(),
5050
Map.of(TEST_NOTIFICATION, params -> Mono.fromRunnable(() -> logger.info("Status update: " + params))));
51+
session.openSSE();
5152
}
5253

5354
@AfterEach
@@ -141,6 +142,7 @@ void testRequestHandling() {
141142
params -> Mono.just(params));
142143
transport = new MockMcpClientTransport();
143144
session = new McpClientSession(TIMEOUT, transport, requestHandlers, Map.of());
145+
session.openSSE();
144146

145147
// Simulate incoming request
146148
McpSchema.JSONRPCRequest request = new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, ECHO_METHOD,
@@ -162,7 +164,7 @@ void testNotificationHandling() {
162164
transport = new MockMcpClientTransport();
163165
session = new McpClientSession(TIMEOUT, transport, Map.of(),
164166
Map.of(TEST_NOTIFICATION, params -> Mono.fromRunnable(() -> receivedParams.tryEmitValue(params))));
165-
167+
session.openSSE();
166168
// Simulate incoming notification from the server
167169
Map<String, Object> notificationParams = Map.of("status", "ready");
168170

0 commit comments

Comments
 (0)