diff --git a/mcp-test/src/main/java/io/modelcontextprotocol/MockMcpTransport.java b/mcp-test/src/main/java/io/modelcontextprotocol/MockMcpTransport.java index 060476831..d4e48ea7d 100644 --- a/mcp-test/src/main/java/io/modelcontextprotocol/MockMcpTransport.java +++ b/mcp-test/src/main/java/io/modelcontextprotocol/MockMcpTransport.java @@ -4,7 +4,9 @@ package io.modelcontextprotocol; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.ArrayList; +import java.util.List; +import java.util.function.BiConsumer; import java.util.function.Function; import com.fasterxml.jackson.core.type.TypeReference; @@ -14,47 +16,53 @@ import io.modelcontextprotocol.spec.ServerMcpTransport; import io.modelcontextprotocol.spec.McpSchema.JSONRPCNotification; import io.modelcontextprotocol.spec.McpSchema.JSONRPCRequest; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; -import reactor.core.scheduler.Schedulers; -@SuppressWarnings("unused") +/** + * A mock implementation of the {@link ClientMcpTransport} and {@link ServerMcpTransport} + * interfaces. + */ public class MockMcpTransport implements ClientMcpTransport, ServerMcpTransport { - private final AtomicInteger inboundMessageCount = new AtomicInteger(0); + private final Sinks.Many inbound = Sinks.many().unicast().onBackpressureBuffer(); + + private final List sent = new ArrayList<>(); - private final Sinks.Many outgoing = Sinks.many().multicast().onBackpressureBuffer(); + private final BiConsumer interceptor; - private final Sinks.Many inbound = Sinks.many().unicast().onBackpressureBuffer(); + public MockMcpTransport() { + this((t, msg) -> { + }); + } - private final Flux outboundView = outgoing.asFlux().cache(1); + public MockMcpTransport(BiConsumer interceptor) { + this.interceptor = interceptor; + } public void simulateIncomingMessage(McpSchema.JSONRPCMessage message) { if (inbound.tryEmitNext(message).isFailure()) { - throw new RuntimeException("Failed to emit message " + message); + throw new RuntimeException("Failed to process incoming message " + message); } - inboundMessageCount.incrementAndGet(); } @Override public Mono sendMessage(McpSchema.JSONRPCMessage message) { - if (outgoing.tryEmitNext(message).isFailure()) { - return Mono.error(new RuntimeException("Can't emit outgoing message " + message)); - } + sent.add(message); + interceptor.accept(this, message); return Mono.empty(); } public McpSchema.JSONRPCRequest getLastSentMessageAsRequest() { - return (JSONRPCRequest) outboundView.blockFirst(); + return (JSONRPCRequest) getLastSentMessage(); } - public McpSchema.JSONRPCNotification getLastSentMessageAsNotifiation() { - return (JSONRPCNotification) outboundView.blockFirst(); + public McpSchema.JSONRPCNotification getLastSentMessageAsNotification() { + return (JSONRPCNotification) getLastSentMessage(); } public McpSchema.JSONRPCMessage getLastSentMessage() { - return outboundView.blockFirst(); + return !sent.isEmpty() ? sent.get(sent.size() - 1) : null; } private volatile boolean connected = false; @@ -66,7 +74,6 @@ public Mono connect(Function, Mono Mono.just(message).transform(handler)) .doFinally(signal -> connected = false) .then(); @@ -76,8 +83,8 @@ public Mono connect(Function, Mono closeGracefully() { return Mono.defer(() -> { connected = false; - outgoing.tryEmitComplete(); inbound.tryEmitComplete(); + // Wait for all subscribers to complete return Mono.empty(); }); } @@ -87,4 +94,4 @@ public T unmarshalFrom(Object data, TypeReference typeRef) { return new ObjectMapper().convertValue(data, typeRef); } -} \ No newline at end of file +} diff --git a/mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java b/mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java index a904a87ed..092d5afe8 100644 --- a/mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java +++ b/mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java @@ -274,9 +274,6 @@ void testNotificationHandlers() { assertThatCode(() -> { client.initialize().block(); - // Trigger notifications - client.sendResourcesListChanged().block(); - client.promptListChangedNotification().block(); client.closeGracefully().block(); }).doesNotThrowAnyException(); } diff --git a/mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpSyncClientTests.java b/mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpSyncClientTests.java index b1c58a604..796dd5ace 100644 --- a/mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpSyncClientTests.java +++ b/mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpSyncClientTests.java @@ -258,9 +258,6 @@ void testNotificationHandlers() { assertThatCode(() -> { client.initialize(); - // Trigger notifications - client.sendResourcesListChanged(); - client.promptListChangedNotification(); client.close(); }).doesNotThrowAnyException(); } diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java b/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java index a49ad7fdd..e1bf574e2 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java @@ -296,6 +296,14 @@ public McpSchema.Implementation getServerInfo() { return this.serverInfo; } + /** + * Check if the client-server connection is initialized. + * @return true if the client-server connection is initialized + */ + public boolean isInitialized() { + return this.serverCapabilities != null; + } + /** * Get the client capabilities that define the supported features and functionality. * @return The client capabilities @@ -456,6 +464,12 @@ private RequestHandler samplingCreateMessageHandler() { * (false/absent) */ public Mono callTool(McpSchema.CallToolRequest callToolRequest) { + if (!this.isInitialized()) { + return Mono.error(new McpError("Client must be initialized before calling tools")); + } + if (this.serverCapabilities.tools() == null) { + return Mono.error(new McpError("Server does not provide tools capability")); + } return this.mcpSession.sendRequest(McpSchema.METHOD_TOOLS_CALL, callToolRequest, CALL_TOOL_RESULT_TYPE_REF); } @@ -477,6 +491,12 @@ public Mono listTools() { * Optional cursor for pagination if more tools are available */ public Mono listTools(String cursor) { + if (!this.isInitialized()) { + return Mono.error(new McpError("Client must be initialized before listing tools")); + } + if (this.serverCapabilities.tools() == null) { + return Mono.error(new McpError("Server does not provide tools capability")); + } return this.mcpSession.sendRequest(McpSchema.METHOD_TOOLS_LIST, new McpSchema.PaginatedRequest(cursor), LIST_TOOLS_RESULT_TYPE_REF); } @@ -532,6 +552,12 @@ public Mono listResources() { * @return A Mono that completes with the list of resources result */ public Mono listResources(String cursor) { + if (!this.isInitialized()) { + return Mono.error(new McpError("Client must be initialized before listing resources")); + } + if (this.serverCapabilities.resources() == null) { + return Mono.error(new McpError("Server does not provide the resources capability")); + } return this.mcpSession.sendRequest(McpSchema.METHOD_RESOURCES_LIST, new McpSchema.PaginatedRequest(cursor), LIST_RESOURCES_RESULT_TYPE_REF); } @@ -551,6 +577,12 @@ public Mono readResource(McpSchema.Resource resour * @return A Mono that completes with the resource content */ public Mono readResource(McpSchema.ReadResourceRequest readResourceRequest) { + if (!this.isInitialized()) { + return Mono.error(new McpError("Client must be initialized before reading resources")); + } + if (this.serverCapabilities.resources() == null) { + return Mono.error(new McpError("Server does not provide the resources capability")); + } return this.mcpSession.sendRequest(McpSchema.METHOD_RESOURCES_READ, readResourceRequest, READ_RESOURCE_RESULT_TYPE_REF); } @@ -575,19 +607,16 @@ public Mono listResourceTemplates() { * @return A Mono that completes with the list of resource templates result */ public Mono listResourceTemplates(String cursor) { + if (!this.isInitialized()) { + return Mono.error(new McpError("Client must be initialized before listing resource templates")); + } + if (this.serverCapabilities.resources() == null) { + return Mono.error(new McpError("Server does not provide the resources capability")); + } return this.mcpSession.sendRequest(McpSchema.METHOD_RESOURCES_TEMPLATES_LIST, new McpSchema.PaginatedRequest(cursor), LIST_RESOURCE_TEMPLATES_RESULT_TYPE_REF); } - /** - * List Changed Notification. When the list of available resources changes, servers - * that declared the listChanged capability SHOULD send a notification. - * @return A Mono that completes when the notification is sent - */ - public Mono sendResourcesListChanged() { - return this.mcpSession.sendNotification(McpSchema.METHOD_NOTIFICATION_RESOURCES_LIST_CHANGED); - } - /** * Subscriptions. The protocol supports optional subscriptions to resource changes. * Clients can subscribe to specific resources and receive notifications when they @@ -660,16 +689,6 @@ public Mono getPrompt(GetPromptRequest getPromptRequest) { return this.mcpSession.sendRequest(McpSchema.METHOD_PROMPT_GET, getPromptRequest, GET_PROMPT_RESULT_TYPE_REF); } - /** - * (Server) An optional notification from the server to the client, informing it that - * the list of prompts it offers has changed. This may be issued by servers without - * any previous subscription from the client. - * @return A Mono that completes when the notification is sent - */ - public Mono promptListChangedNotification() { - return this.mcpSession.sendNotification(McpSchema.METHOD_NOTIFICATION_PROMPTS_LIST_CHANGED); - } - private NotificationHandler asyncPromptsChangeNotificationHandler( List, Mono>> promptsChangeConsumers) { return params -> listPrompts().flatMap(listPromptsResult -> Flux.fromIterable(promptsChangeConsumers) diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/McpSyncClient.java b/mcp/src/main/java/io/modelcontextprotocol/client/McpSyncClient.java index 0178b4b84..e5d964b7a 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/McpSyncClient.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/McpSyncClient.java @@ -284,14 +284,6 @@ public McpSchema.ListResourceTemplatesResult listResourceTemplates() { return this.delegate.listResourceTemplates().block(); } - /** - * List Changed Notification. When the list of available resources changes, servers - * that declared the listChanged capability SHOULD send a notification: - */ - public void sendResourcesListChanged() { - this.delegate.sendResourcesListChanged().block(); - } - /** * Subscriptions. The protocol supports optional subscriptions to resource changes. * Clients can subscribe to specific resources and receive notifications when they @@ -329,15 +321,6 @@ public GetPromptResult getPrompt(GetPromptRequest getPromptRequest) { return this.delegate.getPrompt(getPromptRequest).block(); } - /** - * (Server) An optional notification from the server to the client, informing it that - * the list of prompts it offers has changed. This may be issued by servers without - * any previous subscription from the client. - */ - public void promptListChangedNotification() { - this.delegate.promptListChangedNotification().block(); - } - /** * Client can set the minimum logging level it wants to receive from the server. * @param loggingLevel the min logging level diff --git a/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java b/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java index 8f615f249..7b6916785 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java +++ b/mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java @@ -188,9 +188,13 @@ private DefaultMcpSession.RequestHandler asyncInitia initializeRequest.protocolVersion(), initializeRequest.capabilities(), initializeRequest.clientInfo()); + // The server MUST respond with the highest protocol version it supports if + // it does not support the requested (e.g. Client) version. String serverProtocolVersion = this.protocolVersions.get(this.protocolVersions.size() - 1); if (this.protocolVersions.contains(initializeRequest.protocolVersion())) { + // If the server supports the requested protocol version, it MUST respond + // with the same version. serverProtocolVersion = initializeRequest.protocolVersion(); } else { diff --git a/mcp/src/test/java/io/modelcontextprotocol/MockMcpTransport.java b/mcp/src/test/java/io/modelcontextprotocol/MockMcpTransport.java index 1679ab726..d4e48ea7d 100644 --- a/mcp/src/test/java/io/modelcontextprotocol/MockMcpTransport.java +++ b/mcp/src/test/java/io/modelcontextprotocol/MockMcpTransport.java @@ -4,7 +4,9 @@ package io.modelcontextprotocol; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.ArrayList; +import java.util.List; +import java.util.function.BiConsumer; import java.util.function.Function; import com.fasterxml.jackson.core.type.TypeReference; @@ -14,10 +16,8 @@ import io.modelcontextprotocol.spec.ServerMcpTransport; import io.modelcontextprotocol.spec.McpSchema.JSONRPCNotification; import io.modelcontextprotocol.spec.McpSchema.JSONRPCRequest; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; -import reactor.core.scheduler.Schedulers; /** * A mock implementation of the {@link ClientMcpTransport} and {@link ServerMcpTransport} @@ -25,39 +25,44 @@ */ public class MockMcpTransport implements ClientMcpTransport, ServerMcpTransport { - private final AtomicInteger inboundMessageCount = new AtomicInteger(0); + private final Sinks.Many inbound = Sinks.many().unicast().onBackpressureBuffer(); - private final Sinks.Many outgoing = Sinks.many().multicast().onBackpressureBuffer(); + private final List sent = new ArrayList<>(); - private final Sinks.Many inbound = Sinks.many().unicast().onBackpressureBuffer(); + private final BiConsumer interceptor; - private final Flux outboundView = outgoing.asFlux().cache(1); + public MockMcpTransport() { + this((t, msg) -> { + }); + } + + public MockMcpTransport(BiConsumer interceptor) { + this.interceptor = interceptor; + } public void simulateIncomingMessage(McpSchema.JSONRPCMessage message) { if (inbound.tryEmitNext(message).isFailure()) { - throw new RuntimeException("Failed to emit message " + message); + throw new RuntimeException("Failed to process incoming message " + message); } - inboundMessageCount.incrementAndGet(); } @Override public Mono sendMessage(McpSchema.JSONRPCMessage message) { - if (outgoing.tryEmitNext(message).isFailure()) { - return Mono.error(new RuntimeException("Can't emit outgoing message " + message)); - } + sent.add(message); + interceptor.accept(this, message); return Mono.empty(); } public McpSchema.JSONRPCRequest getLastSentMessageAsRequest() { - return (JSONRPCRequest) outboundView.blockFirst(); + return (JSONRPCRequest) getLastSentMessage(); } - public McpSchema.JSONRPCNotification getLastSentMessageAsNotifiation() { - return (JSONRPCNotification) outboundView.blockFirst(); + public McpSchema.JSONRPCNotification getLastSentMessageAsNotification() { + return (JSONRPCNotification) getLastSentMessage(); } public McpSchema.JSONRPCMessage getLastSentMessage() { - return outboundView.blockFirst(); + return !sent.isEmpty() ? sent.get(sent.size() - 1) : null; } private volatile boolean connected = false; @@ -69,7 +74,6 @@ public Mono connect(Function, Mono Mono.just(message).transform(handler)) .doFinally(signal -> connected = false) .then(); @@ -79,7 +83,6 @@ public Mono connect(Function, Mono closeGracefully() { return Mono.defer(() -> { connected = false; - outgoing.tryEmitComplete(); inbound.tryEmitComplete(); // Wait for all subscribers to complete return Mono.empty(); diff --git a/mcp/src/test/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java b/mcp/src/test/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java index 974f9c302..d32b1910a 100644 --- a/mcp/src/test/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java +++ b/mcp/src/test/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientTests.java @@ -275,9 +275,6 @@ void testNotificationHandlers() { assertThatCode(() -> { client.initialize().block(); - // Trigger notifications - client.sendResourcesListChanged().block(); - client.promptListChangedNotification().block(); client.closeGracefully().block(); }).doesNotThrowAnyException(); } diff --git a/mcp/src/test/java/io/modelcontextprotocol/client/AbstractMcpSyncClientTests.java b/mcp/src/test/java/io/modelcontextprotocol/client/AbstractMcpSyncClientTests.java index 0750ccd65..cbad54192 100644 --- a/mcp/src/test/java/io/modelcontextprotocol/client/AbstractMcpSyncClientTests.java +++ b/mcp/src/test/java/io/modelcontextprotocol/client/AbstractMcpSyncClientTests.java @@ -259,9 +259,6 @@ void testNotificationHandlers() { assertThatCode(() -> { client.initialize(); - // Trigger notifications - client.sendResourcesListChanged(); - client.promptListChangedNotification(); client.close(); }).doesNotThrowAnyException(); } diff --git a/mcp/src/test/java/io/modelcontextprotocol/client/McpAsyncClientResponseHandlerTests.java b/mcp/src/test/java/io/modelcontextprotocol/client/McpAsyncClientResponseHandlerTests.java index d11fa0f3d..b1e82b748 100644 --- a/mcp/src/test/java/io/modelcontextprotocol/client/McpAsyncClientResponseHandlerTests.java +++ b/mcp/src/test/java/io/modelcontextprotocol/client/McpAsyncClientResponseHandlerTests.java @@ -4,7 +4,6 @@ package io.modelcontextprotocol.client; -import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -17,19 +16,82 @@ import io.modelcontextprotocol.spec.McpError; import io.modelcontextprotocol.spec.McpSchema; import io.modelcontextprotocol.spec.McpSchema.ClientCapabilities; +import io.modelcontextprotocol.spec.McpSchema.InitializeResult; import io.modelcontextprotocol.spec.McpSchema.Root; import org.junit.jupiter.api.Test; import reactor.core.publisher.Mono; +import static io.modelcontextprotocol.spec.McpSchema.METHOD_INITIALIZE; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.awaitility.Awaitility.await; class McpAsyncClientResponseHandlerTests { + private static final McpSchema.Implementation SERVER_INFO = new McpSchema.Implementation("test-server", "1.0.0"); + + private static final McpSchema.ServerCapabilities SERVER_CAPABILITIES = McpSchema.ServerCapabilities.builder() + .tools(true) + .resources(true, true) // Enable both resources and resource templates + .build(); + + private static MockMcpTransport initializationEnabledTransport() { + return initializationEnabledTransport(SERVER_CAPABILITIES, SERVER_INFO); + } + + private static MockMcpTransport initializationEnabledTransport(McpSchema.ServerCapabilities mockServerCapabilities, + McpSchema.Implementation mockServerInfo) { + McpSchema.InitializeResult mockInitResult = new McpSchema.InitializeResult(McpSchema.LATEST_PROTOCOL_VERSION, + mockServerCapabilities, mockServerInfo, "Test instructions"); + + return new MockMcpTransport((t, message) -> { + if (message instanceof McpSchema.JSONRPCRequest r && METHOD_INITIALIZE.equals(r.method())) { + McpSchema.JSONRPCResponse initResponse = new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, + r.id(), mockInitResult, null); + t.simulateIncomingMessage(initResponse); + } + }); + } + + @Test + void testSuccessfulInitialization() { + McpSchema.Implementation serverInfo = new McpSchema.Implementation("mcp-test-server", "0.0.1"); + McpSchema.ServerCapabilities serverCapabilities = McpSchema.ServerCapabilities.builder() + .tools(false) + .resources(true, true) // Enable both resources and resource templates + .build(); + MockMcpTransport transport = initializationEnabledTransport(serverCapabilities, serverInfo); + McpAsyncClient asyncMcpClient = McpClient.async(transport).build(); + + // Verify client is not initialized initially + assertThat(asyncMcpClient.isInitialized()).isFalse(); + + // Start initialization with reactive handling + InitializeResult result = asyncMcpClient.initialize().block(); + + // Verify initialized notification was sent + McpSchema.JSONRPCMessage notificationMessage = transport.getLastSentMessage(); + assertThat(notificationMessage).isInstanceOf(McpSchema.JSONRPCNotification.class); + McpSchema.JSONRPCNotification notification = (McpSchema.JSONRPCNotification) notificationMessage; + assertThat(notification.method()).isEqualTo(McpSchema.METHOD_NOTIFICATION_INITIALIZED); + + // Verify initialization result + assertThat(result).isNotNull(); + assertThat(result.protocolVersion()).isEqualTo(McpSchema.LATEST_PROTOCOL_VERSION); + assertThat(result.capabilities()).isEqualTo(serverCapabilities); + assertThat(result.serverInfo()).isEqualTo(serverInfo); + assertThat(result.instructions()).isEqualTo("Test instructions"); + + // Verify client state after initialization + assertThat(asyncMcpClient.isInitialized()).isTrue(); + assertThat(asyncMcpClient.getServerCapabilities()).isEqualTo(serverCapabilities); + assertThat(asyncMcpClient.getServerInfo()).isEqualTo(serverInfo); + + asyncMcpClient.closeGracefully(); + } + @Test void testToolsChangeNotificationHandling() throws JsonProcessingException { - MockMcpTransport transport = new MockMcpTransport(); + MockMcpTransport transport = initializationEnabledTransport(); // Create a list to store received tools for verification List receivedTools = new ArrayList<>(); @@ -41,6 +103,8 @@ void testToolsChangeNotificationHandling() throws JsonProcessingException { // Create client with tools change consumer McpAsyncClient asyncMcpClient = McpClient.async(transport).toolsChangeConsumer(toolsChangeConsumer).build(); + assertThat(asyncMcpClient.initialize().block()).isNotNull(); + // Create a mock tools list that the server will return Map inputSchema = Map.of("type", "object", "properties", Map.of(), "required", List.of()); McpSchema.Tool mockTool = new McpSchema.Tool("test-tool", "Test Tool Description", @@ -61,23 +125,23 @@ void testToolsChangeNotificationHandling() throws JsonProcessingException { transport.simulateIncomingMessage(toolsListResponse); // Verify the consumer received the expected tools - await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { - assertThat(receivedTools).hasSize(1); - assertThat(receivedTools.get(0).name()).isEqualTo("test-tool"); - assertThat(receivedTools.get(0).description()).isEqualTo("Test Tool Description"); - }); + assertThat(receivedTools).hasSize(1); + assertThat(receivedTools.get(0).name()).isEqualTo("test-tool"); + assertThat(receivedTools.get(0).description()).isEqualTo("Test Tool Description"); asyncMcpClient.closeGracefully(); } @Test void testRootsListRequestHandling() { - MockMcpTransport transport = new MockMcpTransport(); + MockMcpTransport transport = initializationEnabledTransport(); McpAsyncClient asyncMcpClient = McpClient.async(transport) .roots(new Root("file:///test/path", "test-root")) .build(); + assertThat(asyncMcpClient.initialize().block()).isNotNull(); + // Simulate incoming request McpSchema.JSONRPCRequest request = new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, McpSchema.METHOD_ROOTS_LIST, "test-id", null); @@ -98,7 +162,7 @@ void testRootsListRequestHandling() { @Test void testResourcesChangeNotificationHandling() { - MockMcpTransport transport = new MockMcpTransport(); + MockMcpTransport transport = initializationEnabledTransport(); // Create a list to store received resources for verification List receivedResources = new ArrayList<>(); @@ -112,6 +176,8 @@ void testResourcesChangeNotificationHandling() { .resourcesChangeConsumer(resourcesChangeConsumer) .build(); + assertThat(asyncMcpClient.initialize().block()).isNotNull(); + // Create a mock resources list that the server will return McpSchema.Resource mockResource = new McpSchema.Resource("test://resource", "Test Resource", "A test resource", "text/plain", null); @@ -132,19 +198,17 @@ void testResourcesChangeNotificationHandling() { transport.simulateIncomingMessage(resourcesListResponse); // Verify the consumer received the expected resources - await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { - assertThat(receivedResources).hasSize(1); - assertThat(receivedResources.get(0).uri()).isEqualTo("test://resource"); - assertThat(receivedResources.get(0).name()).isEqualTo("Test Resource"); - assertThat(receivedResources.get(0).description()).isEqualTo("A test resource"); - }); + assertThat(receivedResources).hasSize(1); + assertThat(receivedResources.get(0).uri()).isEqualTo("test://resource"); + assertThat(receivedResources.get(0).name()).isEqualTo("Test Resource"); + assertThat(receivedResources.get(0).description()).isEqualTo("A test resource"); asyncMcpClient.closeGracefully(); } @Test void testPromptsChangeNotificationHandling() { - MockMcpTransport transport = new MockMcpTransport(); + MockMcpTransport transport = initializationEnabledTransport(); // Create a list to store received prompts for verification List receivedPrompts = new ArrayList<>(); @@ -156,6 +220,8 @@ void testPromptsChangeNotificationHandling() { // Create client with prompts change consumer McpAsyncClient asyncMcpClient = McpClient.async(transport).promptsChangeConsumer(promptsChangeConsumer).build(); + assertThat(asyncMcpClient.initialize().block()).isNotNull(); + // Create a mock prompts list that the server will return McpSchema.Prompt mockPrompt = new McpSchema.Prompt("test-prompt", "Test Prompt Description", List.of(new McpSchema.PromptArgument("arg1", "Test argument", true))); @@ -175,20 +241,18 @@ void testPromptsChangeNotificationHandling() { transport.simulateIncomingMessage(promptsListResponse); // Verify the consumer received the expected prompts - await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> { - assertThat(receivedPrompts).hasSize(1); - assertThat(receivedPrompts.get(0).name()).isEqualTo("test-prompt"); - assertThat(receivedPrompts.get(0).description()).isEqualTo("Test Prompt Description"); - assertThat(receivedPrompts.get(0).arguments()).hasSize(1); - assertThat(receivedPrompts.get(0).arguments().get(0).name()).isEqualTo("arg1"); - }); + assertThat(receivedPrompts).hasSize(1); + assertThat(receivedPrompts.get(0).name()).isEqualTo("test-prompt"); + assertThat(receivedPrompts.get(0).description()).isEqualTo("Test Prompt Description"); + assertThat(receivedPrompts.get(0).arguments()).hasSize(1); + assertThat(receivedPrompts.get(0).arguments().get(0).name()).isEqualTo("arg1"); asyncMcpClient.closeGracefully(); } @Test void testSamplingCreateMessageRequestHandling() { - MockMcpTransport transport = new MockMcpTransport(); + MockMcpTransport transport = initializationEnabledTransport(); // Create a test sampling handler that echoes back the input Function> samplingHandler = request -> { @@ -203,6 +267,8 @@ void testSamplingCreateMessageRequestHandling() { .sampling(samplingHandler) .build(); + assertThat(asyncMcpClient.initialize().block()).isNotNull(); + // Create a mock create message request var messageRequest = new McpSchema.CreateMessageRequest( List.of(new McpSchema.SamplingMessage(McpSchema.Role.USER, new McpSchema.TextContent("Test message"))), @@ -240,13 +306,15 @@ void testSamplingCreateMessageRequestHandling() { @Test void testSamplingCreateMessageRequestHandlingWithoutCapability() { - MockMcpTransport transport = new MockMcpTransport(); + MockMcpTransport transport = initializationEnabledTransport(); // Create client without sampling capability McpAsyncClient asyncMcpClient = McpClient.async(transport) .capabilities(ClientCapabilities.builder().build()) // No sampling capability .build(); + assertThat(asyncMcpClient.initialize().block()).isNotNull(); + // Create a mock create message request var messageRequest = new McpSchema.CreateMessageRequest( List.of(new McpSchema.SamplingMessage(McpSchema.Role.USER, new McpSchema.TextContent("Test message"))),