diff --git a/mcp/src/main/java/org/modelcontextprotocol/server/transport/StdioServerTransport.java b/mcp/src/main/java/org/modelcontextprotocol/server/transport/StdioServerTransport.java index e0dd8653..52845a98 100644 --- a/mcp/src/main/java/org/modelcontextprotocol/server/transport/StdioServerTransport.java +++ b/mcp/src/main/java/org/modelcontextprotocol/server/transport/StdioServerTransport.java @@ -105,10 +105,10 @@ private void handleIncomingMessages(Function, Mono Mono.just(message) .transform(inboundMessageHandler) .contextWrite(ctx -> ctx.put("observation", "myObservation"))) - .doOnComplete(() -> { + .doOnTerminate(() -> { + // The outbound processing will dispose its scheduler upon completion this.outboundSink.tryEmitComplete(); this.inboundScheduler.dispose(); - this.outboundScheduler.dispose(); }) .subscribe(); } @@ -208,13 +208,13 @@ else if (isClosing) { }) .doOnComplete(() -> { isClosing = true; - outboundSink.tryEmitComplete(); + outboundScheduler.dispose(); }) .doOnError(e -> { if (!isClosing) { logger.error("Error in outbound processing", e); isClosing = true; - outboundSink.tryEmitComplete(); + outboundScheduler.dispose(); } }) .map(msg -> (JSONRPCMessage) msg); @@ -224,26 +224,15 @@ else if (isClosing) { @Override public Mono closeGracefully() { - - return Mono.fromRunnable(() -> { + return Mono.defer(() -> { isClosing = true; logger.debug("Initiating graceful shutdown"); - }).then(Mono.defer(() -> { - // First complete the sinks to stop processing + // Completing the inbound causes the outbound to be completed as well, so + // we only close the inbound. inboundSink.tryEmitComplete(); - outboundSink.tryEmitComplete(); - return Mono.delay(Duration.ofMillis(100)); - })).then(Mono.fromRunnable(() -> { - try { - // Dispose schedulers with longer timeout - inboundScheduler.dispose(); - outboundScheduler.dispose(); - logger.info("Graceful shutdown completed"); - } - catch (Exception e) { - logger.error("Error during graceful shutdown", e); - } - })).then().subscribeOn(Schedulers.boundedElastic()); + logger.info("Graceful shutdown complete"); + return Mono.empty(); + }).subscribeOn(Schedulers.boundedElastic()); } @Override