From a4eaa1c611828de6a9e050f8a1e58310faf69c28 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dariusz=20J=C4=99drzejczyk?= Date: Tue, 28 Jan 2025 15:53:56 +0100 Subject: [PATCH] Reorder StdioServerTransport shutdown procedure to avoid errors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Dariusz Jędrzejczyk --- .../transport/StdioServerTransport.java | 31 ++++++------------- 1 file changed, 10 insertions(+), 21 deletions(-) diff --git a/mcp/src/main/java/org/modelcontextprotocol/server/transport/StdioServerTransport.java b/mcp/src/main/java/org/modelcontextprotocol/server/transport/StdioServerTransport.java index e0dd8653..52845a98 100644 --- a/mcp/src/main/java/org/modelcontextprotocol/server/transport/StdioServerTransport.java +++ b/mcp/src/main/java/org/modelcontextprotocol/server/transport/StdioServerTransport.java @@ -105,10 +105,10 @@ private void handleIncomingMessages(Function, Mono Mono.just(message) .transform(inboundMessageHandler) .contextWrite(ctx -> ctx.put("observation", "myObservation"))) - .doOnComplete(() -> { + .doOnTerminate(() -> { + // The outbound processing will dispose its scheduler upon completion this.outboundSink.tryEmitComplete(); this.inboundScheduler.dispose(); - this.outboundScheduler.dispose(); }) .subscribe(); } @@ -208,13 +208,13 @@ else if (isClosing) { }) .doOnComplete(() -> { isClosing = true; - outboundSink.tryEmitComplete(); + outboundScheduler.dispose(); }) .doOnError(e -> { if (!isClosing) { logger.error("Error in outbound processing", e); isClosing = true; - outboundSink.tryEmitComplete(); + outboundScheduler.dispose(); } }) .map(msg -> (JSONRPCMessage) msg); @@ -224,26 +224,15 @@ else if (isClosing) { @Override public Mono closeGracefully() { - - return Mono.fromRunnable(() -> { + return Mono.defer(() -> { isClosing = true; logger.debug("Initiating graceful shutdown"); - }).then(Mono.defer(() -> { - // First complete the sinks to stop processing + // Completing the inbound causes the outbound to be completed as well, so + // we only close the inbound. inboundSink.tryEmitComplete(); - outboundSink.tryEmitComplete(); - return Mono.delay(Duration.ofMillis(100)); - })).then(Mono.fromRunnable(() -> { - try { - // Dispose schedulers with longer timeout - inboundScheduler.dispose(); - outboundScheduler.dispose(); - logger.info("Graceful shutdown completed"); - } - catch (Exception e) { - logger.error("Error during graceful shutdown", e); - } - })).then().subscribeOn(Schedulers.boundedElastic()); + logger.info("Graceful shutdown complete"); + return Mono.empty(); + }).subscribeOn(Schedulers.boundedElastic()); } @Override