From 2d383f5648020485d18265349a8873a21b92f8e4 Mon Sep 17 00:00:00 2001 From: Christian Tzolov Date: Tue, 18 Feb 2025 17:52:07 +0100 Subject: [PATCH 1/7] refactor(client): improve validation and remove server methods Add client initialization and capability validation checks - New isInitialized() method to check client state - Validate server capabilities before tool/resource operations - Add clear error messages for common failure cases - Remove server-side notification methods from client: sendResourcesListChanged(), promptListChangedNotification() - Improve protocol version handling - Testing improvements and new initialization tests Resolves #13 Signed-off-by: Christian Tzolov --- .../client/AbstractMcpAsyncClientTests.java | 3 - .../client/AbstractMcpSyncClientTests.java | 3 - .../client/McpAsyncClient.java | 57 +++++++---- .../client/McpSyncClient.java | 17 ---- .../server/McpAsyncServer.java | 4 + .../MockMcpTransport.java | 7 ++ .../client/AbstractMcpAsyncClientTests.java | 3 - .../client/AbstractMcpSyncClientTests.java | 3 - .../McpAsyncClientResponseHandlerTests.java | 99 +++++++++++++++++++ 9 files changed, 148 insertions(+), 48 deletions(-) diff --git a/mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java b/mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java index a904a87ed..092d5afe8 100644 --- a/mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java +++ b/mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java @@ -274,9 +274,6 @@ void testNotificationHandlers() { assertThatCode(() -> { client.initialize().block(); - // Trigger notifications - client.sendResourcesListChanged().block(); - client.promptListChangedNotification().block(); client.closeGracefully().block(); }).doesNotThrowAnyException(); } diff --git a/mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpSyncClientTests.java b/mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpSyncClientTests.java index b1c58a604..796dd5ace 100644 --- a/mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpSyncClientTests.java +++ b/mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpSyncClientTests.java @@ -258,9 +258,6 @@ void testNotificationHandlers() { assertThatCode(() -> { client.initialize(); - // Trigger notifications - client.sendResourcesListChanged(); - client.promptListChangedNotification(); client.close(); }).doesNotThrowAnyException(); } diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java b/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java index a49ad7fdd..9dabf4c5a 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java @@ -296,6 +296,14 @@ public McpSchema.Implementation getServerInfo() { return this.serverInfo; } + /** + * Check if the client-server connection is initialized. + * @return true if the client-server connection is initialized + */ + public boolean isInitialized() { + return this.serverCapabilities != null; + } + /** * Get the client capabilities that define the supported features and functionality. * @return The client capabilities @@ -456,6 +464,12 @@ private RequestHandler samplingCreateMessageHandler() { * (false/absent) */ public Mono callTool(McpSchema.CallToolRequest callToolRequest) { + if (!this.isInitialized()) { + return Mono.error(new McpError("Client must be initialized before calling tools")); + } + if (this.serverCapabilities.tools() == null) { + return Mono.error(new McpError("Server does not provide tools capability")); + } return this.mcpSession.sendRequest(McpSchema.METHOD_TOOLS_CALL, callToolRequest, CALL_TOOL_RESULT_TYPE_REF); } @@ -477,6 +491,12 @@ public Mono listTools() { * Optional cursor for pagination if more tools are available */ public Mono listTools(String cursor) { + if (!this.isInitialized()) { + return Mono.error(new McpError("Client must be initialized before calling tools")); + } + if (this.serverCapabilities.tools() == null) { + return Mono.error(new McpError("Server does not provide tools capability")); + } return this.mcpSession.sendRequest(McpSchema.METHOD_TOOLS_LIST, new McpSchema.PaginatedRequest(cursor), LIST_TOOLS_RESULT_TYPE_REF); } @@ -532,6 +552,12 @@ public Mono listResources() { * @return A Mono that completes with the list of resources result */ public Mono listResources(String cursor) { + if (!this.isInitialized()) { + return Mono.error(new McpError("Client must be initialized before calling tools")); + } + if (this.serverCapabilities.resources() == null) { + return Mono.error(new McpError("Server does not provide the resources capability")); + } return this.mcpSession.sendRequest(McpSchema.METHOD_RESOURCES_LIST, new McpSchema.PaginatedRequest(cursor), LIST_RESOURCES_RESULT_TYPE_REF); } @@ -551,6 +577,12 @@ public Mono readResource(McpSchema.Resource resour * @return A Mono that completes with the resource content */ public Mono readResource(McpSchema.ReadResourceRequest readResourceRequest) { + if (!this.isInitialized()) { + return Mono.error(new McpError("Client must be initialized before calling tools")); + } + if (this.serverCapabilities.resources() == null) { + return Mono.error(new McpError("Server does not provide the resources capability")); + } return this.mcpSession.sendRequest(McpSchema.METHOD_RESOURCES_READ, readResourceRequest, READ_RESOURCE_RESULT_TYPE_REF); } @@ -575,19 +607,16 @@ public Mono listResourceTemplates() { * @return A Mono that completes with the list of resource templates result */ public Mono listResourceTemplates(String cursor) { + if (!this.isInitialized()) { + return Mono.error(new McpError("Client must be initialized before calling tools")); + } + if (this.serverCapabilities.resources() == null) { + return Mono.error(new McpError("Server does not provide the resources capability")); + } return this.mcpSession.sendRequest(McpSchema.METHOD_RESOURCES_TEMPLATES_LIST, new McpSchema.PaginatedRequest(cursor), LIST_RESOURCE_TEMPLATES_RESULT_TYPE_REF); } - /** - * List Changed Notification. When the list of available resources changes, servers - * that declared the listChanged capability SHOULD send a notification. - * @return A Mono that completes when the notification is sent - */ - public Mono sendResourcesListChanged() { - return this.mcpSession.sendNotification(McpSchema.METHOD_NOTIFICATION_RESOURCES_LIST_CHANGED); - } - /** * Subscriptions. The protocol supports optional subscriptions to resource changes. * Clients can subscribe to specific resources and receive notifications when they @@ -660,16 +689,6 @@ public Mono getPrompt(GetPromptRequest getPromptRequest) { return this.mcpSession.sendRequest(McpSchema.METHOD_PROMPT_GET, getPromptRequest, GET_PROMPT_RESULT_TYPE_REF); } - /** - * (Server) An optional notification from the server to the client, informing it that - * the list of prompts it offers has changed. This may be issued by servers without - * any previous subscription from the client. - * @return A Mono that completes when the notification is sent - */ - public Mono promptListChangedNotification() { - return this.mcpSession.sendNotification(McpSchema.METHOD_NOTIFICATION_PROMPTS_LIST_CHANGED); - } - private NotificationHandler asyncPromptsChangeNotificationHandler( List, Mono>> promptsChangeConsumers) { return params -> listPrompts().flatMap(listPromptsResult -> Flux.fromIterable(promptsChangeConsumers) diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/McpSyncClient.java b/mcp/src/main/java/io/modelcontextprotocol/client/McpSyncClient.java index 0178b4b84..e5d964b7a 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/McpSyncClient.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/McpSyncClient.java @@ -284,14 +284,6 @@ public McpSchema.ListResourceTemplatesResult listResourceTemplates() { return this.delegate.listResourceTemplates().block(); } - /** - * List Changed Notification. When the list of available resources changes, servers - * that declared the listChanged capability SHOULD send a notification: - */ - public void sendResourcesListChanged() { - this.delegate.sendResourcesListChanged().block(); - } - /** * Subscriptions. The protocol supports optional subscriptions to resource changes. * Clients can subscribe to specific resources and receive notifications when they @@ -329,15 +321,6 @@ public GetPromptResult getPrompt(GetPromptRequest getPromptRequest) { return this.delegate.getPrompt(getPromptRequest).block(); } - /** - * (Server) An optional notification from the server to the client, informing it that - * the list of prompts it offers has changed. This may be issued by servers without - * any previous subscription from the client. - */ - public void promptListChangedNotification() { - this.delegate.promptListChangedNotification().block(); - } - /** * Client can set the minimum logging level it wants to receive from the server. * @param loggingLevel the min logging level diff --git a/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java b/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java index 8f615f249..7b6916785 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java +++ b/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java @@ -188,9 +188,13 @@ private DefaultMcpSession.RequestHandler asyncInitia initializeRequest.protocolVersion(), initializeRequest.capabilities(), initializeRequest.clientInfo()); + // The server MUST respond with the highest protocol version it supports if + // it does not support the requested (e.g. Client) version. String serverProtocolVersion = this.protocolVersions.get(this.protocolVersions.size() - 1); if (this.protocolVersions.contains(initializeRequest.protocolVersion())) { + // If the server supports the requested protocol version, it MUST respond + // with the same version. serverProtocolVersion = initializeRequest.protocolVersion(); } else { diff --git a/mcp/src/test/java/io/modelcontextprotocol/MockMcpTransport.java b/mcp/src/test/java/io/modelcontextprotocol/MockMcpTransport.java index 1679ab726..6e0ecc38c 100644 --- a/mcp/src/test/java/io/modelcontextprotocol/MockMcpTransport.java +++ b/mcp/src/test/java/io/modelcontextprotocol/MockMcpTransport.java @@ -38,6 +38,13 @@ public void simulateIncomingMessage(McpSchema.JSONRPCMessage message) { throw new RuntimeException("Failed to emit message " + message); } inboundMessageCount.incrementAndGet(); + + try { + Thread.sleep(200); + } + catch (InterruptedException e) { + e.printStackTrace(); + } } @Override diff --git a/mcp/src/test/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java b/mcp/src/test/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java index 974f9c302..d32b1910a 100644 --- a/mcp/src/test/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java +++ b/mcp/src/test/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java @@ -275,9 +275,6 @@ void testNotificationHandlers() { assertThatCode(() -> { client.initialize().block(); - // Trigger notifications - client.sendResourcesListChanged().block(); - client.promptListChangedNotification().block(); client.closeGracefully().block(); }).doesNotThrowAnyException(); } diff --git a/mcp/src/test/java/io/modelcontextprotocol/client/AbstractMcpSyncClientTests.java b/mcp/src/test/java/io/modelcontextprotocol/client/AbstractMcpSyncClientTests.java index 0750ccd65..cbad54192 100644 --- a/mcp/src/test/java/io/modelcontextprotocol/client/AbstractMcpSyncClientTests.java +++ b/mcp/src/test/java/io/modelcontextprotocol/client/AbstractMcpSyncClientTests.java @@ -259,9 +259,6 @@ void testNotificationHandlers() { assertThatCode(() -> { client.initialize(); - // Trigger notifications - client.sendResourcesListChanged(); - client.promptListChangedNotification(); client.close(); }).doesNotThrowAnyException(); } diff --git a/mcp/src/test/java/io/modelcontextprotocol/client/McpAsyncClientResponseHandlerTests.java b/mcp/src/test/java/io/modelcontextprotocol/client/McpAsyncClientResponseHandlerTests.java index d11fa0f3d..732412fbd 100644 --- a/mcp/src/test/java/io/modelcontextprotocol/client/McpAsyncClientResponseHandlerTests.java +++ b/mcp/src/test/java/io/modelcontextprotocol/client/McpAsyncClientResponseHandlerTests.java @@ -17,7 +17,9 @@ import io.modelcontextprotocol.spec.McpError; import io.modelcontextprotocol.spec.McpSchema; import io.modelcontextprotocol.spec.McpSchema.ClientCapabilities; +import io.modelcontextprotocol.spec.McpSchema.InitializeResult; import io.modelcontextprotocol.spec.McpSchema.Root; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import reactor.core.publisher.Mono; @@ -27,6 +29,91 @@ class McpAsyncClientResponseHandlerTests { + private InitializeResult initialization(McpAsyncClient asyncMcpClient, MockMcpTransport transport) { + + // Create mock server response + McpSchema.ServerCapabilities mockServerCapabilities = McpSchema.ServerCapabilities.builder() + .tools(true) + .resources(true, true) // Enable both resources and resource templates + .build(); + McpSchema.Implementation mockServerInfo = new McpSchema.Implementation("test-server", "1.0.0"); + McpSchema.InitializeResult mockInitResult = new McpSchema.InitializeResult(McpSchema.LATEST_PROTOCOL_VERSION, + mockServerCapabilities, mockServerInfo, "Test instructions"); + + Mono initMono = asyncMcpClient.initialize(); + + new Thread(new Runnable() { + @Override + public void run() { + McpSchema.JSONRPCRequest initRequest = transport.getLastSentMessageAsRequest(); + assertThat(initRequest.method()).isEqualTo(McpSchema.METHOD_INITIALIZE); + + // Send mock server response + McpSchema.JSONRPCResponse initResponse = new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, + initRequest.id(), mockInitResult, null); + transport.simulateIncomingMessage(initResponse); + } + }).start(); + + return initMono.block(); + } + + @Test + void testSuccessfulInitialization() { + MockMcpTransport transport = new MockMcpTransport(); + McpAsyncClient asyncMcpClient = McpClient.async(transport).build(); + + // Verify client is not initialized initially + assertThat(asyncMcpClient.isInitialized()).isFalse(); + + // Create mock server response + McpSchema.ServerCapabilities mockServerCapabilities = McpSchema.ServerCapabilities.builder() + .tools(true) + .resources(true, true) // Enable both resources and resource templates + .build(); + McpSchema.Implementation mockServerInfo = new McpSchema.Implementation("test-server", "1.0.0"); + McpSchema.InitializeResult mockInitResult = new McpSchema.InitializeResult(McpSchema.LATEST_PROTOCOL_VERSION, + mockServerCapabilities, mockServerInfo, "Test instructions"); + + // Start initialization + Mono initMono = asyncMcpClient.initialize(); + + new Thread(new Runnable() { + @Override + public void run() { + McpSchema.JSONRPCRequest initRequest = transport.getLastSentMessageAsRequest(); + assertThat(initRequest.method()).isEqualTo(McpSchema.METHOD_INITIALIZE); + + // Send mock server response + McpSchema.JSONRPCResponse initResponse = new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, + initRequest.id(), mockInitResult, null); + transport.simulateIncomingMessage(initResponse); + } + }).start(); + + InitializeResult result = initMono.block(); + + // Verify initialized notification was sent + McpSchema.JSONRPCMessage notificationMessage = transport.getLastSentMessage(); + assertThat(notificationMessage).isInstanceOf(McpSchema.JSONRPCNotification.class); + McpSchema.JSONRPCNotification notification = (McpSchema.JSONRPCNotification) notificationMessage; + assertThat(notification.method()).isEqualTo(McpSchema.METHOD_NOTIFICATION_INITIALIZED); + + // Verify initialization result + assertThat(result).isNotNull(); + assertThat(result.protocolVersion()).isEqualTo(McpSchema.LATEST_PROTOCOL_VERSION); + assertThat(result.capabilities()).isEqualTo(mockServerCapabilities); + assertThat(result.serverInfo()).isEqualTo(mockServerInfo); + assertThat(result.instructions()).isEqualTo("Test instructions"); + + // Verify client state after initialization + assertThat(asyncMcpClient.isInitialized()).isTrue(); + assertThat(asyncMcpClient.getServerCapabilities()).isEqualTo(mockServerCapabilities); + assertThat(asyncMcpClient.getServerInfo()).isEqualTo(mockServerInfo); + + asyncMcpClient.closeGracefully(); + } + @Test void testToolsChangeNotificationHandling() throws JsonProcessingException { MockMcpTransport transport = new MockMcpTransport(); @@ -41,6 +128,8 @@ void testToolsChangeNotificationHandling() throws JsonProcessingException { // Create client with tools change consumer McpAsyncClient asyncMcpClient = McpClient.async(transport).toolsChangeConsumer(toolsChangeConsumer).build(); + assertThat(initialization(asyncMcpClient, transport)).isNotNull(); + // Create a mock tools list that the server will return Map inputSchema = Map.of("type", "object", "properties", Map.of(), "required", List.of()); McpSchema.Tool mockTool = new McpSchema.Tool("test-tool", "Test Tool Description", @@ -78,6 +167,8 @@ void testRootsListRequestHandling() { .roots(new Root("file:///test/path", "test-root")) .build(); + assertThat(initialization(asyncMcpClient, transport)).isNotNull(); + // Simulate incoming request McpSchema.JSONRPCRequest request = new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, McpSchema.METHOD_ROOTS_LIST, "test-id", null); @@ -112,6 +203,8 @@ void testResourcesChangeNotificationHandling() { .resourcesChangeConsumer(resourcesChangeConsumer) .build(); + assertThat(initialization(asyncMcpClient, transport)).isNotNull(); + // Create a mock resources list that the server will return McpSchema.Resource mockResource = new McpSchema.Resource("test://resource", "Test Resource", "A test resource", "text/plain", null); @@ -156,6 +249,8 @@ void testPromptsChangeNotificationHandling() { // Create client with prompts change consumer McpAsyncClient asyncMcpClient = McpClient.async(transport).promptsChangeConsumer(promptsChangeConsumer).build(); + assertThat(initialization(asyncMcpClient, transport)).isNotNull(); + // Create a mock prompts list that the server will return McpSchema.Prompt mockPrompt = new McpSchema.Prompt("test-prompt", "Test Prompt Description", List.of(new McpSchema.PromptArgument("arg1", "Test argument", true))); @@ -203,6 +298,8 @@ void testSamplingCreateMessageRequestHandling() { .sampling(samplingHandler) .build(); + assertThat(initialization(asyncMcpClient, transport)).isNotNull(); + // Create a mock create message request var messageRequest = new McpSchema.CreateMessageRequest( List.of(new McpSchema.SamplingMessage(McpSchema.Role.USER, new McpSchema.TextContent("Test message"))), @@ -247,6 +344,8 @@ void testSamplingCreateMessageRequestHandlingWithoutCapability() { .capabilities(ClientCapabilities.builder().build()) // No sampling capability .build(); + assertThat(initialization(asyncMcpClient, transport)).isNotNull(); + // Create a mock create message request var messageRequest = new McpSchema.CreateMessageRequest( List.of(new McpSchema.SamplingMessage(McpSchema.Role.USER, new McpSchema.TextContent("Test message"))), From 139a65cbf13f5ac659fea7e2b12105f3eba0a8f5 Mon Sep 17 00:00:00 2001 From: Christian Tzolov Date: Tue, 25 Feb 2025 13:11:51 +0100 Subject: [PATCH 2/7] Fix test fixtures Signed-off-by: Christian Tzolov --- .../McpAsyncClientResponseHandlerTests.java | 66 ++++++++++++------- 1 file changed, 41 insertions(+), 25 deletions(-) diff --git a/mcp/src/test/java/io/modelcontextprotocol/client/McpAsyncClientResponseHandlerTests.java b/mcp/src/test/java/io/modelcontextprotocol/client/McpAsyncClientResponseHandlerTests.java index 732412fbd..00e766e2a 100644 --- a/mcp/src/test/java/io/modelcontextprotocol/client/McpAsyncClientResponseHandlerTests.java +++ b/mcp/src/test/java/io/modelcontextprotocol/client/McpAsyncClientResponseHandlerTests.java @@ -19,7 +19,6 @@ import io.modelcontextprotocol.spec.McpSchema.ClientCapabilities; import io.modelcontextprotocol.spec.McpSchema.InitializeResult; import io.modelcontextprotocol.spec.McpSchema.Root; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import reactor.core.publisher.Mono; @@ -30,21 +29,21 @@ class McpAsyncClientResponseHandlerTests { private InitializeResult initialization(McpAsyncClient asyncMcpClient, MockMcpTransport transport) { - // Create mock server response - McpSchema.ServerCapabilities mockServerCapabilities = McpSchema.ServerCapabilities.builder() - .tools(true) - .resources(true, true) // Enable both resources and resource templates - .build(); - McpSchema.Implementation mockServerInfo = new McpSchema.Implementation("test-server", "1.0.0"); McpSchema.InitializeResult mockInitResult = new McpSchema.InitializeResult(McpSchema.LATEST_PROTOCOL_VERSION, - mockServerCapabilities, mockServerInfo, "Test instructions"); - - Mono initMono = asyncMcpClient.initialize(); - - new Thread(new Runnable() { - @Override - public void run() { + McpSchema.ServerCapabilities.builder() + .tools(true) + .resources(true, true) // Enable both resources and resource templates + .build(), + new McpSchema.Implementation("test-server", "1.0.0"), "Test instructions"); + + // Use CountDownLatch to coordinate between threads + java.util.concurrent.CountDownLatch latch = new java.util.concurrent.CountDownLatch(1); + + // Create a Mono that will handle the initialization and response simulation + return asyncMcpClient.initialize().doOnSubscribe(subscription -> { + // Run in a separate reactive context to avoid blocking the main subscription + Mono.fromRunnable(() -> { McpSchema.JSONRPCRequest initRequest = transport.getLastSentMessageAsRequest(); assertThat(initRequest.method()).isEqualTo(McpSchema.METHOD_INITIALIZE); @@ -52,10 +51,18 @@ public void run() { McpSchema.JSONRPCResponse initResponse = new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, initRequest.id(), mockInitResult, null); transport.simulateIncomingMessage(initResponse); + latch.countDown(); + }).subscribeOn(reactor.core.scheduler.Schedulers.boundedElastic()).subscribe(); + }).doOnTerminate(() -> { + try { + // Wait for the response simulation to complete + latch.await(5, java.util.concurrent.TimeUnit.SECONDS); } - }).start(); - - return initMono.block(); + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while waiting for initialization", e); + } + }).block(); } @Test @@ -75,12 +82,13 @@ void testSuccessfulInitialization() { McpSchema.InitializeResult mockInitResult = new McpSchema.InitializeResult(McpSchema.LATEST_PROTOCOL_VERSION, mockServerCapabilities, mockServerInfo, "Test instructions"); - // Start initialization - Mono initMono = asyncMcpClient.initialize(); + // Use CountDownLatch to coordinate between threads + java.util.concurrent.CountDownLatch latch = new java.util.concurrent.CountDownLatch(1); - new Thread(new Runnable() { - @Override - public void run() { + // Start initialization with reactive handling + InitializeResult result = asyncMcpClient.initialize().doOnSubscribe(subscription -> { + // Run in a separate reactive context to avoid blocking the main subscription + Mono.fromRunnable(() -> { McpSchema.JSONRPCRequest initRequest = transport.getLastSentMessageAsRequest(); assertThat(initRequest.method()).isEqualTo(McpSchema.METHOD_INITIALIZE); @@ -88,10 +96,18 @@ public void run() { McpSchema.JSONRPCResponse initResponse = new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, initRequest.id(), mockInitResult, null); transport.simulateIncomingMessage(initResponse); + latch.countDown(); + }).subscribeOn(reactor.core.scheduler.Schedulers.boundedElastic()).subscribe(); + }).doOnTerminate(() -> { + try { + // Wait for the response simulation to complete + latch.await(5, java.util.concurrent.TimeUnit.SECONDS); } - }).start(); - - InitializeResult result = initMono.block(); + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while waiting for initialization", e); + } + }).block(); // Verify initialized notification was sent McpSchema.JSONRPCMessage notificationMessage = transport.getLastSentMessage(); From 3ef98ef1473bb7eb68a78b231342153becf4a88d Mon Sep 17 00:00:00 2001 From: Christian Tzolov Date: Tue, 25 Feb 2025 14:42:23 +0100 Subject: [PATCH 3/7] Fix the simulateIncomingMessage coordination Signed-off-by: Christian Tzolov --- .../MockMcpTransport.java | 22 ++++++++------- .../McpAsyncClientResponseHandlerTests.java | 27 +------------------ 2 files changed, 14 insertions(+), 35 deletions(-) diff --git a/mcp/src/test/java/io/modelcontextprotocol/MockMcpTransport.java b/mcp/src/test/java/io/modelcontextprotocol/MockMcpTransport.java index 6e0ecc38c..b4a2d738e 100644 --- a/mcp/src/test/java/io/modelcontextprotocol/MockMcpTransport.java +++ b/mcp/src/test/java/io/modelcontextprotocol/MockMcpTransport.java @@ -4,6 +4,7 @@ package io.modelcontextprotocol; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; @@ -33,18 +34,14 @@ public class MockMcpTransport implements ClientMcpTransport, ServerMcpTransport private final Flux outboundView = outgoing.asFlux().cache(1); + java.util.concurrent.CountDownLatch latch = new java.util.concurrent.CountDownLatch(1); + public void simulateIncomingMessage(McpSchema.JSONRPCMessage message) { if (inbound.tryEmitNext(message).isFailure()) { throw new RuntimeException("Failed to emit message " + message); } inboundMessageCount.incrementAndGet(); - - try { - Thread.sleep(200); - } - catch (InterruptedException e) { - e.printStackTrace(); - } + latch = new java.util.concurrent.CountDownLatch(1); } @Override @@ -52,18 +49,25 @@ public Mono sendMessage(McpSchema.JSONRPCMessage message) { if (outgoing.tryEmitNext(message).isFailure()) { return Mono.error(new RuntimeException("Can't emit outgoing message " + message)); } + latch.countDown(); return Mono.empty(); } public McpSchema.JSONRPCRequest getLastSentMessageAsRequest() { - return (JSONRPCRequest) outboundView.blockFirst(); + return (JSONRPCRequest) getLastSentMessage(); } public McpSchema.JSONRPCNotification getLastSentMessageAsNotifiation() { - return (JSONRPCNotification) outboundView.blockFirst(); + return (JSONRPCNotification) getLastSentMessage(); } public McpSchema.JSONRPCMessage getLastSentMessage() { + try { + latch.await(200, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) { + e.printStackTrace(); + } return outboundView.blockFirst(); } diff --git a/mcp/src/test/java/io/modelcontextprotocol/client/McpAsyncClientResponseHandlerTests.java b/mcp/src/test/java/io/modelcontextprotocol/client/McpAsyncClientResponseHandlerTests.java index 00e766e2a..7f2b0e112 100644 --- a/mcp/src/test/java/io/modelcontextprotocol/client/McpAsyncClientResponseHandlerTests.java +++ b/mcp/src/test/java/io/modelcontextprotocol/client/McpAsyncClientResponseHandlerTests.java @@ -37,9 +37,6 @@ private InitializeResult initialization(McpAsyncClient asyncMcpClient, MockMcpTr .build(), new McpSchema.Implementation("test-server", "1.0.0"), "Test instructions"); - // Use CountDownLatch to coordinate between threads - java.util.concurrent.CountDownLatch latch = new java.util.concurrent.CountDownLatch(1); - // Create a Mono that will handle the initialization and response simulation return asyncMcpClient.initialize().doOnSubscribe(subscription -> { // Run in a separate reactive context to avoid blocking the main subscription @@ -51,17 +48,7 @@ private InitializeResult initialization(McpAsyncClient asyncMcpClient, MockMcpTr McpSchema.JSONRPCResponse initResponse = new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, initRequest.id(), mockInitResult, null); transport.simulateIncomingMessage(initResponse); - latch.countDown(); }).subscribeOn(reactor.core.scheduler.Schedulers.boundedElastic()).subscribe(); - }).doOnTerminate(() -> { - try { - // Wait for the response simulation to complete - latch.await(5, java.util.concurrent.TimeUnit.SECONDS); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Interrupted while waiting for initialization", e); - } }).block(); } @@ -82,9 +69,6 @@ void testSuccessfulInitialization() { McpSchema.InitializeResult mockInitResult = new McpSchema.InitializeResult(McpSchema.LATEST_PROTOCOL_VERSION, mockServerCapabilities, mockServerInfo, "Test instructions"); - // Use CountDownLatch to coordinate between threads - java.util.concurrent.CountDownLatch latch = new java.util.concurrent.CountDownLatch(1); - // Start initialization with reactive handling InitializeResult result = asyncMcpClient.initialize().doOnSubscribe(subscription -> { // Run in a separate reactive context to avoid blocking the main subscription @@ -96,17 +80,8 @@ void testSuccessfulInitialization() { McpSchema.JSONRPCResponse initResponse = new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, initRequest.id(), mockInitResult, null); transport.simulateIncomingMessage(initResponse); - latch.countDown(); + // latch.countDown(); }).subscribeOn(reactor.core.scheduler.Schedulers.boundedElastic()).subscribe(); - }).doOnTerminate(() -> { - try { - // Wait for the response simulation to complete - latch.await(5, java.util.concurrent.TimeUnit.SECONDS); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Interrupted while waiting for initialization", e); - } }).block(); // Verify initialized notification was sent From c7ccbca6a198c3285bd18b10c764f24c7da9fd40 Mon Sep 17 00:00:00 2001 From: Christian Tzolov Date: Wed, 26 Feb 2025 11:20:58 +0100 Subject: [PATCH 4/7] Address review comments Signed-off-by: Christian Tzolov --- .../MockMcpTransport.java | 13 +++++--- .../McpAsyncClientResponseHandlerTests.java | 33 +++---------------- .../spec/DefaultMcpSessionTests.java | 2 +- 3 files changed, 15 insertions(+), 33 deletions(-) diff --git a/mcp/src/test/java/io/modelcontextprotocol/MockMcpTransport.java b/mcp/src/test/java/io/modelcontextprotocol/MockMcpTransport.java index b4a2d738e..a1f5a349e 100644 --- a/mcp/src/test/java/io/modelcontextprotocol/MockMcpTransport.java +++ b/mcp/src/test/java/io/modelcontextprotocol/MockMcpTransport.java @@ -4,7 +4,6 @@ package io.modelcontextprotocol; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; @@ -12,9 +11,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.modelcontextprotocol.spec.ClientMcpTransport; import io.modelcontextprotocol.spec.McpSchema; -import io.modelcontextprotocol.spec.ServerMcpTransport; import io.modelcontextprotocol.spec.McpSchema.JSONRPCNotification; import io.modelcontextprotocol.spec.McpSchema.JSONRPCRequest; +import io.modelcontextprotocol.spec.ServerMcpTransport; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; @@ -34,14 +33,20 @@ public class MockMcpTransport implements ClientMcpTransport, ServerMcpTransport private final Flux outboundView = outgoing.asFlux().cache(1); + // Latch to wait for the next message(s) to be sent in response of simulated incoming + // message java.util.concurrent.CountDownLatch latch = new java.util.concurrent.CountDownLatch(1); public void simulateIncomingMessage(McpSchema.JSONRPCMessage message) { + simulateIncomingMessage(message, 1); + } + + public void simulateIncomingMessage(McpSchema.JSONRPCMessage message, int expectedResponseMessagesCount) { if (inbound.tryEmitNext(message).isFailure()) { throw new RuntimeException("Failed to emit message " + message); } inboundMessageCount.incrementAndGet(); - latch = new java.util.concurrent.CountDownLatch(1); + latch = new java.util.concurrent.CountDownLatch(expectedResponseMessagesCount); } @Override @@ -63,7 +68,7 @@ public McpSchema.JSONRPCNotification getLastSentMessageAsNotifiation() { public McpSchema.JSONRPCMessage getLastSentMessage() { try { - latch.await(200, TimeUnit.MILLISECONDS); + latch.await(); } catch (InterruptedException e) { e.printStackTrace(); diff --git a/mcp/src/test/java/io/modelcontextprotocol/client/McpAsyncClientResponseHandlerTests.java b/mcp/src/test/java/io/modelcontextprotocol/client/McpAsyncClientResponseHandlerTests.java index 7f2b0e112..5af000092 100644 --- a/mcp/src/test/java/io/modelcontextprotocol/client/McpAsyncClientResponseHandlerTests.java +++ b/mcp/src/test/java/io/modelcontextprotocol/client/McpAsyncClientResponseHandlerTests.java @@ -60,29 +60,11 @@ void testSuccessfulInitialization() { // Verify client is not initialized initially assertThat(asyncMcpClient.isInitialized()).isFalse(); - // Create mock server response - McpSchema.ServerCapabilities mockServerCapabilities = McpSchema.ServerCapabilities.builder() - .tools(true) - .resources(true, true) // Enable both resources and resource templates - .build(); - McpSchema.Implementation mockServerInfo = new McpSchema.Implementation("test-server", "1.0.0"); - McpSchema.InitializeResult mockInitResult = new McpSchema.InitializeResult(McpSchema.LATEST_PROTOCOL_VERSION, - mockServerCapabilities, mockServerInfo, "Test instructions"); - // Start initialization with reactive handling - InitializeResult result = asyncMcpClient.initialize().doOnSubscribe(subscription -> { - // Run in a separate reactive context to avoid blocking the main subscription - Mono.fromRunnable(() -> { - McpSchema.JSONRPCRequest initRequest = transport.getLastSentMessageAsRequest(); - assertThat(initRequest.method()).isEqualTo(McpSchema.METHOD_INITIALIZE); + InitializeResult result = initialization(asyncMcpClient, transport); - // Send mock server response - McpSchema.JSONRPCResponse initResponse = new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, - initRequest.id(), mockInitResult, null); - transport.simulateIncomingMessage(initResponse); - // latch.countDown(); - }).subscribeOn(reactor.core.scheduler.Schedulers.boundedElastic()).subscribe(); - }).block(); + // Verify client state after initialization + assertThat(asyncMcpClient.isInitialized()).isTrue(); // Verify initialized notification was sent McpSchema.JSONRPCMessage notificationMessage = transport.getLastSentMessage(); @@ -93,15 +75,10 @@ void testSuccessfulInitialization() { // Verify initialization result assertThat(result).isNotNull(); assertThat(result.protocolVersion()).isEqualTo(McpSchema.LATEST_PROTOCOL_VERSION); - assertThat(result.capabilities()).isEqualTo(mockServerCapabilities); - assertThat(result.serverInfo()).isEqualTo(mockServerInfo); + assertThat(result.capabilities()).isEqualTo(asyncMcpClient.getServerCapabilities()); + assertThat(result.serverInfo()).isEqualTo(asyncMcpClient.getServerInfo()); assertThat(result.instructions()).isEqualTo("Test instructions"); - // Verify client state after initialization - assertThat(asyncMcpClient.isInitialized()).isTrue(); - assertThat(asyncMcpClient.getServerCapabilities()).isEqualTo(mockServerCapabilities); - assertThat(asyncMcpClient.getServerInfo()).isEqualTo(mockServerInfo); - asyncMcpClient.closeGracefully(); } diff --git a/mcp/src/test/java/io/modelcontextprotocol/spec/DefaultMcpSessionTests.java b/mcp/src/test/java/io/modelcontextprotocol/spec/DefaultMcpSessionTests.java index 9d011afff..6ce5bded3 100644 --- a/mcp/src/test/java/io/modelcontextprotocol/spec/DefaultMcpSessionTests.java +++ b/mcp/src/test/java/io/modelcontextprotocol/spec/DefaultMcpSessionTests.java @@ -82,7 +82,7 @@ void testSendRequest() { StepVerifier.create(responseMono).then(() -> { McpSchema.JSONRPCRequest request = transport.getLastSentMessageAsRequest(); transport.simulateIncomingMessage( - new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, request.id(), responseData, null)); + new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, request.id(), responseData, null), 0); }).consumeNextWith(response -> { // Verify the request was sent McpSchema.JSONRPCMessage sentMessage = transport.getLastSentMessageAsRequest(); From fd76268fde47effbd4d7a2f53de6685ff72f0cb5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dariusz=20J=C4=99drzejczyk?= Date: Wed, 26 Feb 2025 15:36:07 +0100 Subject: [PATCH 5/7] Redesign MockMcpTransport internals and adapt tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Dariusz Jędrzejczyk --- .../MockMcpTransport.java | 46 ++++--- .../MockMcpTransport.java | 51 +++----- .../McpAsyncClientResponseHandlerTests.java | 117 +++++++++--------- .../spec/DefaultMcpSessionTests.java | 2 +- 4 files changed, 107 insertions(+), 109 deletions(-) diff --git a/mcp-test/src/main/java/io/modelcontextprotocol/MockMcpTransport.java b/mcp-test/src/main/java/io/modelcontextprotocol/MockMcpTransport.java index 060476831..4e9364cde 100644 --- a/mcp-test/src/main/java/io/modelcontextprotocol/MockMcpTransport.java +++ b/mcp-test/src/main/java/io/modelcontextprotocol/MockMcpTransport.java @@ -4,7 +4,9 @@ package io.modelcontextprotocol; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.ArrayList; +import java.util.List; +import java.util.function.BiConsumer; import java.util.function.Function; import com.fasterxml.jackson.core.type.TypeReference; @@ -14,47 +16,54 @@ import io.modelcontextprotocol.spec.ServerMcpTransport; import io.modelcontextprotocol.spec.McpSchema.JSONRPCNotification; import io.modelcontextprotocol.spec.McpSchema.JSONRPCRequest; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; -import reactor.core.scheduler.Schedulers; @SuppressWarnings("unused") +/** + * A mock implementation of the {@link ClientMcpTransport} and {@link ServerMcpTransport} + * interfaces. + */ public class MockMcpTransport implements ClientMcpTransport, ServerMcpTransport { - private final AtomicInteger inboundMessageCount = new AtomicInteger(0); + private final Sinks.Many inbound = Sinks.many().unicast().onBackpressureBuffer(); + + private final List sent = new ArrayList<>(); - private final Sinks.Many outgoing = Sinks.many().multicast().onBackpressureBuffer(); + private final BiConsumer interceptor; - private final Sinks.Many inbound = Sinks.many().unicast().onBackpressureBuffer(); + public MockMcpTransport() { + this((t, msg) -> { + }); + } - private final Flux outboundView = outgoing.asFlux().cache(1); + public MockMcpTransport(BiConsumer interceptor) { + this.interceptor = interceptor; + } public void simulateIncomingMessage(McpSchema.JSONRPCMessage message) { if (inbound.tryEmitNext(message).isFailure()) { - throw new RuntimeException("Failed to emit message " + message); + throw new RuntimeException("Failed to process incoming message " + message); } - inboundMessageCount.incrementAndGet(); } @Override public Mono sendMessage(McpSchema.JSONRPCMessage message) { - if (outgoing.tryEmitNext(message).isFailure()) { - return Mono.error(new RuntimeException("Can't emit outgoing message " + message)); - } + sent.add(message); + interceptor.accept(this, message); return Mono.empty(); } public McpSchema.JSONRPCRequest getLastSentMessageAsRequest() { - return (JSONRPCRequest) outboundView.blockFirst(); + return (JSONRPCRequest) getLastSentMessage(); } - public McpSchema.JSONRPCNotification getLastSentMessageAsNotifiation() { - return (JSONRPCNotification) outboundView.blockFirst(); + public McpSchema.JSONRPCNotification getLastSentMessageAsNotification() { + return (JSONRPCNotification) getLastSentMessage(); } public McpSchema.JSONRPCMessage getLastSentMessage() { - return outboundView.blockFirst(); + return !sent.isEmpty() ? sent.get(sent.size() - 1) : null; } private volatile boolean connected = false; @@ -66,7 +75,6 @@ public Mono connect(Function, Mono Mono.just(message).transform(handler)) .doFinally(signal -> connected = false) .then(); @@ -76,8 +84,8 @@ public Mono connect(Function, Mono closeGracefully() { return Mono.defer(() -> { connected = false; - outgoing.tryEmitComplete(); inbound.tryEmitComplete(); + // Wait for all subscribers to complete return Mono.empty(); }); } @@ -87,4 +95,4 @@ public T unmarshalFrom(Object data, TypeReference typeRef) { return new ObjectMapper().convertValue(data, typeRef); } -} \ No newline at end of file +} diff --git a/mcp/src/test/java/io/modelcontextprotocol/MockMcpTransport.java b/mcp/src/test/java/io/modelcontextprotocol/MockMcpTransport.java index a1f5a349e..d4e48ea7d 100644 --- a/mcp/src/test/java/io/modelcontextprotocol/MockMcpTransport.java +++ b/mcp/src/test/java/io/modelcontextprotocol/MockMcpTransport.java @@ -4,20 +4,20 @@ package io.modelcontextprotocol; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.ArrayList; +import java.util.List; +import java.util.function.BiConsumer; import java.util.function.Function; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import io.modelcontextprotocol.spec.ClientMcpTransport; import io.modelcontextprotocol.spec.McpSchema; +import io.modelcontextprotocol.spec.ServerMcpTransport; import io.modelcontextprotocol.spec.McpSchema.JSONRPCNotification; import io.modelcontextprotocol.spec.McpSchema.JSONRPCRequest; -import io.modelcontextprotocol.spec.ServerMcpTransport; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; -import reactor.core.scheduler.Schedulers; /** * A mock implementation of the {@link ClientMcpTransport} and {@link ServerMcpTransport} @@ -25,36 +25,31 @@ */ public class MockMcpTransport implements ClientMcpTransport, ServerMcpTransport { - private final AtomicInteger inboundMessageCount = new AtomicInteger(0); - - private final Sinks.Many outgoing = Sinks.many().multicast().onBackpressureBuffer(); - private final Sinks.Many inbound = Sinks.many().unicast().onBackpressureBuffer(); - private final Flux outboundView = outgoing.asFlux().cache(1); + private final List sent = new ArrayList<>(); - // Latch to wait for the next message(s) to be sent in response of simulated incoming - // message - java.util.concurrent.CountDownLatch latch = new java.util.concurrent.CountDownLatch(1); + private final BiConsumer interceptor; - public void simulateIncomingMessage(McpSchema.JSONRPCMessage message) { - simulateIncomingMessage(message, 1); + public MockMcpTransport() { + this((t, msg) -> { + }); } - public void simulateIncomingMessage(McpSchema.JSONRPCMessage message, int expectedResponseMessagesCount) { + public MockMcpTransport(BiConsumer interceptor) { + this.interceptor = interceptor; + } + + public void simulateIncomingMessage(McpSchema.JSONRPCMessage message) { if (inbound.tryEmitNext(message).isFailure()) { - throw new RuntimeException("Failed to emit message " + message); + throw new RuntimeException("Failed to process incoming message " + message); } - inboundMessageCount.incrementAndGet(); - latch = new java.util.concurrent.CountDownLatch(expectedResponseMessagesCount); } @Override public Mono sendMessage(McpSchema.JSONRPCMessage message) { - if (outgoing.tryEmitNext(message).isFailure()) { - return Mono.error(new RuntimeException("Can't emit outgoing message " + message)); - } - latch.countDown(); + sent.add(message); + interceptor.accept(this, message); return Mono.empty(); } @@ -62,18 +57,12 @@ public McpSchema.JSONRPCRequest getLastSentMessageAsRequest() { return (JSONRPCRequest) getLastSentMessage(); } - public McpSchema.JSONRPCNotification getLastSentMessageAsNotifiation() { + public McpSchema.JSONRPCNotification getLastSentMessageAsNotification() { return (JSONRPCNotification) getLastSentMessage(); } public McpSchema.JSONRPCMessage getLastSentMessage() { - try { - latch.await(); - } - catch (InterruptedException e) { - e.printStackTrace(); - } - return outboundView.blockFirst(); + return !sent.isEmpty() ? sent.get(sent.size() - 1) : null; } private volatile boolean connected = false; @@ -85,7 +74,6 @@ public Mono connect(Function, Mono Mono.just(message).transform(handler)) .doFinally(signal -> connected = false) .then(); @@ -95,7 +83,6 @@ public Mono connect(Function, Mono closeGracefully() { return Mono.defer(() -> { connected = false; - outgoing.tryEmitComplete(); inbound.tryEmitComplete(); // Wait for all subscribers to complete return Mono.empty(); diff --git a/mcp/src/test/java/io/modelcontextprotocol/client/McpAsyncClientResponseHandlerTests.java b/mcp/src/test/java/io/modelcontextprotocol/client/McpAsyncClientResponseHandlerTests.java index 5af000092..2f7776c32 100644 --- a/mcp/src/test/java/io/modelcontextprotocol/client/McpAsyncClientResponseHandlerTests.java +++ b/mcp/src/test/java/io/modelcontextprotocol/client/McpAsyncClientResponseHandlerTests.java @@ -22,49 +22,53 @@ import org.junit.jupiter.api.Test; import reactor.core.publisher.Mono; +import static io.modelcontextprotocol.spec.McpSchema.METHOD_INITIALIZE; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.awaitility.Awaitility.await; class McpAsyncClientResponseHandlerTests { - private InitializeResult initialization(McpAsyncClient asyncMcpClient, MockMcpTransport transport) { - // Create mock server response + private static final McpSchema.Implementation SERVER_INFO = new McpSchema.Implementation("test-server", "1.0.0"); + + private static final McpSchema.ServerCapabilities SERVER_CAPABILITIES = McpSchema.ServerCapabilities.builder() + .tools(true) + .resources(true, true) // Enable both resources and resource templates + .build(); + + private static MockMcpTransport initializationEnabledTransport() { + return initializationEnabledTransport(SERVER_CAPABILITIES, SERVER_INFO); + } + + private static MockMcpTransport initializationEnabledTransport(McpSchema.ServerCapabilities mockServerCapabilities, + McpSchema.Implementation mockServerInfo) { McpSchema.InitializeResult mockInitResult = new McpSchema.InitializeResult(McpSchema.LATEST_PROTOCOL_VERSION, - McpSchema.ServerCapabilities.builder() - .tools(true) - .resources(true, true) // Enable both resources and resource templates - .build(), - new McpSchema.Implementation("test-server", "1.0.0"), "Test instructions"); - - // Create a Mono that will handle the initialization and response simulation - return asyncMcpClient.initialize().doOnSubscribe(subscription -> { - // Run in a separate reactive context to avoid blocking the main subscription - Mono.fromRunnable(() -> { - McpSchema.JSONRPCRequest initRequest = transport.getLastSentMessageAsRequest(); - assertThat(initRequest.method()).isEqualTo(McpSchema.METHOD_INITIALIZE); - - // Send mock server response + mockServerCapabilities, mockServerInfo, "Test instructions"); + + return new MockMcpTransport((t, message) -> { + if (message instanceof McpSchema.JSONRPCRequest r && METHOD_INITIALIZE.equals(r.method())) { McpSchema.JSONRPCResponse initResponse = new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, - initRequest.id(), mockInitResult, null); - transport.simulateIncomingMessage(initResponse); - }).subscribeOn(reactor.core.scheduler.Schedulers.boundedElastic()).subscribe(); - }).block(); + r.id(), mockInitResult, null); + t.simulateIncomingMessage(initResponse); + } + }); } @Test void testSuccessfulInitialization() { - MockMcpTransport transport = new MockMcpTransport(); + McpSchema.Implementation serverInfo = new McpSchema.Implementation("mcp-test-server", "0.0.1"); + McpSchema.ServerCapabilities serverCapabilities = McpSchema.ServerCapabilities.builder() + .tools(false) + .resources(true, true) // Enable both resources and resource templates + .build(); + MockMcpTransport transport = initializationEnabledTransport(serverCapabilities, serverInfo); McpAsyncClient asyncMcpClient = McpClient.async(transport).build(); // Verify client is not initialized initially assertThat(asyncMcpClient.isInitialized()).isFalse(); // Start initialization with reactive handling - InitializeResult result = initialization(asyncMcpClient, transport); - - // Verify client state after initialization - assertThat(asyncMcpClient.isInitialized()).isTrue(); + InitializeResult result = asyncMcpClient.initialize().block(); // Verify initialized notification was sent McpSchema.JSONRPCMessage notificationMessage = transport.getLastSentMessage(); @@ -75,16 +79,21 @@ void testSuccessfulInitialization() { // Verify initialization result assertThat(result).isNotNull(); assertThat(result.protocolVersion()).isEqualTo(McpSchema.LATEST_PROTOCOL_VERSION); - assertThat(result.capabilities()).isEqualTo(asyncMcpClient.getServerCapabilities()); - assertThat(result.serverInfo()).isEqualTo(asyncMcpClient.getServerInfo()); + assertThat(result.capabilities()).isEqualTo(serverCapabilities); + assertThat(result.serverInfo()).isEqualTo(serverInfo); assertThat(result.instructions()).isEqualTo("Test instructions"); + // Verify client state after initialization + assertThat(asyncMcpClient.isInitialized()).isTrue(); + assertThat(asyncMcpClient.getServerCapabilities()).isEqualTo(serverCapabilities); + assertThat(asyncMcpClient.getServerInfo()).isEqualTo(serverInfo); + asyncMcpClient.closeGracefully(); } @Test void testToolsChangeNotificationHandling() throws JsonProcessingException { - MockMcpTransport transport = new MockMcpTransport(); + MockMcpTransport transport = initializationEnabledTransport(); // Create a list to store received tools for verification List receivedTools = new ArrayList<>(); @@ -96,7 +105,7 @@ void testToolsChangeNotificationHandling() throws JsonProcessingException { // Create client with tools change consumer McpAsyncClient asyncMcpClient = McpClient.async(transport).toolsChangeConsumer(toolsChangeConsumer).build(); - assertThat(initialization(asyncMcpClient, transport)).isNotNull(); + assertThat(asyncMcpClient.initialize().block()).isNotNull(); // Create a mock tools list that the server will return Map inputSchema = Map.of("type", "object", "properties", Map.of(), "required", List.of()); @@ -118,24 +127,22 @@ void testToolsChangeNotificationHandling() throws JsonProcessingException { transport.simulateIncomingMessage(toolsListResponse); // Verify the consumer received the expected tools - await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { - assertThat(receivedTools).hasSize(1); - assertThat(receivedTools.get(0).name()).isEqualTo("test-tool"); - assertThat(receivedTools.get(0).description()).isEqualTo("Test Tool Description"); - }); + assertThat(receivedTools).hasSize(1); + assertThat(receivedTools.get(0).name()).isEqualTo("test-tool"); + assertThat(receivedTools.get(0).description()).isEqualTo("Test Tool Description"); asyncMcpClient.closeGracefully(); } @Test void testRootsListRequestHandling() { - MockMcpTransport transport = new MockMcpTransport(); + MockMcpTransport transport = initializationEnabledTransport(); McpAsyncClient asyncMcpClient = McpClient.async(transport) .roots(new Root("file:///test/path", "test-root")) .build(); - assertThat(initialization(asyncMcpClient, transport)).isNotNull(); + assertThat(asyncMcpClient.initialize().block()).isNotNull(); // Simulate incoming request McpSchema.JSONRPCRequest request = new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, @@ -157,7 +164,7 @@ void testRootsListRequestHandling() { @Test void testResourcesChangeNotificationHandling() { - MockMcpTransport transport = new MockMcpTransport(); + MockMcpTransport transport = initializationEnabledTransport(); // Create a list to store received resources for verification List receivedResources = new ArrayList<>(); @@ -171,7 +178,7 @@ void testResourcesChangeNotificationHandling() { .resourcesChangeConsumer(resourcesChangeConsumer) .build(); - assertThat(initialization(asyncMcpClient, transport)).isNotNull(); + assertThat(asyncMcpClient.initialize().block()).isNotNull(); // Create a mock resources list that the server will return McpSchema.Resource mockResource = new McpSchema.Resource("test://resource", "Test Resource", "A test resource", @@ -193,19 +200,17 @@ void testResourcesChangeNotificationHandling() { transport.simulateIncomingMessage(resourcesListResponse); // Verify the consumer received the expected resources - await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { - assertThat(receivedResources).hasSize(1); - assertThat(receivedResources.get(0).uri()).isEqualTo("test://resource"); - assertThat(receivedResources.get(0).name()).isEqualTo("Test Resource"); - assertThat(receivedResources.get(0).description()).isEqualTo("A test resource"); - }); + assertThat(receivedResources).hasSize(1); + assertThat(receivedResources.get(0).uri()).isEqualTo("test://resource"); + assertThat(receivedResources.get(0).name()).isEqualTo("Test Resource"); + assertThat(receivedResources.get(0).description()).isEqualTo("A test resource"); asyncMcpClient.closeGracefully(); } @Test void testPromptsChangeNotificationHandling() { - MockMcpTransport transport = new MockMcpTransport(); + MockMcpTransport transport = initializationEnabledTransport(); // Create a list to store received prompts for verification List receivedPrompts = new ArrayList<>(); @@ -217,7 +222,7 @@ void testPromptsChangeNotificationHandling() { // Create client with prompts change consumer McpAsyncClient asyncMcpClient = McpClient.async(transport).promptsChangeConsumer(promptsChangeConsumer).build(); - assertThat(initialization(asyncMcpClient, transport)).isNotNull(); + assertThat(asyncMcpClient.initialize().block()).isNotNull(); // Create a mock prompts list that the server will return McpSchema.Prompt mockPrompt = new McpSchema.Prompt("test-prompt", "Test Prompt Description", @@ -238,20 +243,18 @@ void testPromptsChangeNotificationHandling() { transport.simulateIncomingMessage(promptsListResponse); // Verify the consumer received the expected prompts - await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { - assertThat(receivedPrompts).hasSize(1); - assertThat(receivedPrompts.get(0).name()).isEqualTo("test-prompt"); - assertThat(receivedPrompts.get(0).description()).isEqualTo("Test Prompt Description"); - assertThat(receivedPrompts.get(0).arguments()).hasSize(1); - assertThat(receivedPrompts.get(0).arguments().get(0).name()).isEqualTo("arg1"); - }); + assertThat(receivedPrompts).hasSize(1); + assertThat(receivedPrompts.get(0).name()).isEqualTo("test-prompt"); + assertThat(receivedPrompts.get(0).description()).isEqualTo("Test Prompt Description"); + assertThat(receivedPrompts.get(0).arguments()).hasSize(1); + assertThat(receivedPrompts.get(0).arguments().get(0).name()).isEqualTo("arg1"); asyncMcpClient.closeGracefully(); } @Test void testSamplingCreateMessageRequestHandling() { - MockMcpTransport transport = new MockMcpTransport(); + MockMcpTransport transport = initializationEnabledTransport(); // Create a test sampling handler that echoes back the input Function> samplingHandler = request -> { @@ -266,7 +269,7 @@ void testSamplingCreateMessageRequestHandling() { .sampling(samplingHandler) .build(); - assertThat(initialization(asyncMcpClient, transport)).isNotNull(); + assertThat(asyncMcpClient.initialize().block()).isNotNull(); // Create a mock create message request var messageRequest = new McpSchema.CreateMessageRequest( @@ -305,14 +308,14 @@ void testSamplingCreateMessageRequestHandling() { @Test void testSamplingCreateMessageRequestHandlingWithoutCapability() { - MockMcpTransport transport = new MockMcpTransport(); + MockMcpTransport transport = initializationEnabledTransport(); // Create client without sampling capability McpAsyncClient asyncMcpClient = McpClient.async(transport) .capabilities(ClientCapabilities.builder().build()) // No sampling capability .build(); - assertThat(initialization(asyncMcpClient, transport)).isNotNull(); + assertThat(asyncMcpClient.initialize().block()).isNotNull(); // Create a mock create message request var messageRequest = new McpSchema.CreateMessageRequest( diff --git a/mcp/src/test/java/io/modelcontextprotocol/spec/DefaultMcpSessionTests.java b/mcp/src/test/java/io/modelcontextprotocol/spec/DefaultMcpSessionTests.java index 6ce5bded3..9d011afff 100644 --- a/mcp/src/test/java/io/modelcontextprotocol/spec/DefaultMcpSessionTests.java +++ b/mcp/src/test/java/io/modelcontextprotocol/spec/DefaultMcpSessionTests.java @@ -82,7 +82,7 @@ void testSendRequest() { StepVerifier.create(responseMono).then(() -> { McpSchema.JSONRPCRequest request = transport.getLastSentMessageAsRequest(); transport.simulateIncomingMessage( - new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, request.id(), responseData, null), 0); + new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, request.id(), responseData, null)); }).consumeNextWith(response -> { // Verify the request was sent McpSchema.JSONRPCMessage sentMessage = transport.getLastSentMessageAsRequest(); From 92db0a0fbca3983bd3ee9c71b5ac2bddeebad7b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dariusz=20J=C4=99drzejczyk?= Date: Wed, 26 Feb 2025 15:38:28 +0100 Subject: [PATCH 6/7] Correct McpError messages --- .../io/modelcontextprotocol/client/McpAsyncClient.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java b/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java index 9dabf4c5a..e1bf574e2 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java @@ -492,7 +492,7 @@ public Mono listTools() { */ public Mono listTools(String cursor) { if (!this.isInitialized()) { - return Mono.error(new McpError("Client must be initialized before calling tools")); + return Mono.error(new McpError("Client must be initialized before listing tools")); } if (this.serverCapabilities.tools() == null) { return Mono.error(new McpError("Server does not provide tools capability")); @@ -553,7 +553,7 @@ public Mono listResources() { */ public Mono listResources(String cursor) { if (!this.isInitialized()) { - return Mono.error(new McpError("Client must be initialized before calling tools")); + return Mono.error(new McpError("Client must be initialized before listing resources")); } if (this.serverCapabilities.resources() == null) { return Mono.error(new McpError("Server does not provide the resources capability")); @@ -578,7 +578,7 @@ public Mono readResource(McpSchema.Resource resour */ public Mono readResource(McpSchema.ReadResourceRequest readResourceRequest) { if (!this.isInitialized()) { - return Mono.error(new McpError("Client must be initialized before calling tools")); + return Mono.error(new McpError("Client must be initialized before reading resources")); } if (this.serverCapabilities.resources() == null) { return Mono.error(new McpError("Server does not provide the resources capability")); @@ -608,7 +608,7 @@ public Mono listResourceTemplates() { */ public Mono listResourceTemplates(String cursor) { if (!this.isInitialized()) { - return Mono.error(new McpError("Client must be initialized before calling tools")); + return Mono.error(new McpError("Client must be initialized before listing resource templates")); } if (this.serverCapabilities.resources() == null) { return Mono.error(new McpError("Server does not provide the resources capability")); From 4bdd7349993693ea98dc66c72efbd91fe95408ec Mon Sep 17 00:00:00 2001 From: Christian Tzolov Date: Wed, 26 Feb 2025 18:39:37 +0100 Subject: [PATCH 7/7] clean unused imports Signed-off-by: Christian Tzolov --- .../src/main/java/io/modelcontextprotocol/MockMcpTransport.java | 1 - .../client/McpAsyncClientResponseHandlerTests.java | 2 -- 2 files changed, 3 deletions(-) diff --git a/mcp-test/src/main/java/io/modelcontextprotocol/MockMcpTransport.java b/mcp-test/src/main/java/io/modelcontextprotocol/MockMcpTransport.java index 4e9364cde..d4e48ea7d 100644 --- a/mcp-test/src/main/java/io/modelcontextprotocol/MockMcpTransport.java +++ b/mcp-test/src/main/java/io/modelcontextprotocol/MockMcpTransport.java @@ -19,7 +19,6 @@ import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; -@SuppressWarnings("unused") /** * A mock implementation of the {@link ClientMcpTransport} and {@link ServerMcpTransport} * interfaces. diff --git a/mcp/src/test/java/io/modelcontextprotocol/client/McpAsyncClientResponseHandlerTests.java b/mcp/src/test/java/io/modelcontextprotocol/client/McpAsyncClientResponseHandlerTests.java index 2f7776c32..b1e82b748 100644 --- a/mcp/src/test/java/io/modelcontextprotocol/client/McpAsyncClientResponseHandlerTests.java +++ b/mcp/src/test/java/io/modelcontextprotocol/client/McpAsyncClientResponseHandlerTests.java @@ -4,7 +4,6 @@ package io.modelcontextprotocol.client; -import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -25,7 +24,6 @@ import static io.modelcontextprotocol.spec.McpSchema.METHOD_INITIALIZE; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.awaitility.Awaitility.await; class McpAsyncClientResponseHandlerTests {