-
Notifications
You must be signed in to change notification settings - Fork 565
[WIP] Streamable HTTP Server abstractions and WebFlux transport provider #420
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
…raction Signed-off-by: Dariusz Jędrzejczyk <dariusz.jedrzejczyk@broadcom.com>
Signed-off-by: Dariusz Jędrzejczyk <dariusz.jedrzejczyk@broadcom.com>
Signed-off-by: Dariusz Jędrzejczyk <dariusz.jedrzejczyk@broadcom.com>
Signed-off-by: Dariusz Jędrzejczyk <dariusz.jedrzejczyk@broadcom.com>
Signed-off-by: Dariusz Jędrzejczyk <dariusz.jedrzejczyk@broadcom.com>
Signed-off-by: Dariusz Jędrzejczyk <dariusz.jedrzejczyk@broadcom.com>
can we support sse and streamble at the same time ? |
We could with spec support for something like this, amongst other routes. |
Will WebMVC based implementations be provided in the future? My project is based on WebMVC and has been running for some time now. I don't want to modify it to WebFlux anymore. |
I agree with @gemo12123 . Plain Java without reactive version should be supported, especially with Project Loom. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here are some initial findings, while going through the code
@@ -70,8 +73,9 @@ public class McpServerSession implements McpSession { | |||
* @param notificationHandlers map of notification handlers to use | |||
*/ | |||
public McpServerSession(String id, Duration requestTimeout, McpServerTransport transport, | |||
InitRequestHandler initHandler, InitNotificationHandler initNotificationHandler, | |||
Map<String, RequestHandler<?>> requestHandlers, Map<String, NotificationHandler> notificationHandlers) { | |||
McpInitRequestHandler initHandler, InitNotificationHandler initNotificationHandler, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
InitNotificationHandler -> McpInitNotificationHandler
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, this notification handler can go away. I intend to deprecate this constructor in favor of a one that doesn't have it. We actually provide an empty handler in McpAsyncServer
, so this is useless.
@@ -9,6 +9,9 @@ | |||
|
|||
import com.fasterxml.jackson.core.type.TypeReference; | |||
import io.modelcontextprotocol.server.McpAsyncServerExchange; | |||
import io.modelcontextprotocol.server.McpInitRequestHandler; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe the McpInitRequestHandler, McpNotificationHandler, McpRequestHandler should be under the spec package?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
They are server-specific and work either with the exchange or with the InitializeRequest, which itself is handled by the server side.
/** | ||
* Request handler for the initialization request. | ||
*/ | ||
public interface InitRequestHandler { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't think this is used and should be removed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is used. McpAsyncServer
creates one and feeds it into DefaultMcpStreamableServerSessionFactory
that gets passed onto the McpStreamableServerTransportProvider
.
/** | ||
* Notification handler for the initialization notification from the client. | ||
*/ | ||
public interface InitNotificationHandler { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't think this is used and should be removed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct, removed.
} | ||
|
||
@FunctionalInterface | ||
interface RequestHandler<T> extends BiFunction<McpTransportContext, Object, Mono<T>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- This should be called
StatlessRequestHandler
. - Also all request and notification handlers should be grouped in common (parent) package inside the spec.
this.uriTemplateManagerFactory = uriTemplateManagerFactory; | ||
this.jsonSchemaValidator = jsonSchemaValidator; | ||
|
||
Map<String, RequestHandler<?>> requestHandlers = new HashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe we will need notification handling as well. I guess we will need stateless notification handler as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll try to come up with a shared abstraction to contain these handlers under an composite class.
}); | ||
} | ||
|
||
public Mono<Void> accept(McpSchema.JSONRPCNotification notification) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The accept(McpSchema.JSONRPCNotification notification)
and accept(McpSchema.JSONRPCResponse response)
methods appear to be a split version of McpServerSession#handle(McpSchema.JSONRPCMessage message)
. For consistency, I’d prefer we stick with a single handler method.
Additionally, I believe a handle(Message)
abstraction should be part of the McpSession
interface. Since McpSession
represents bidirectional JSON-RPC communication—already exposing sendXyz(Message)
for outgoing messages—it makes sense to also include a handle(Message)
method for incoming communication.
In fact all McpSession implement some form of handle(Msg). This applies for the clients as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The accept(McpSchema.JSONRPCNotification notification) and accept(McpSchema.JSONRPCResponse response) methods appear to be a split version of McpServerSession#handle(McpSchema.JSONRPCMessage message). For consistency, I’d prefer we stick with a single handler method.
Unfortunately, with handle(McpSchema.JSONRPCMessage message)
you are communicating to the implementor that all of possible messages can be passed in. That is not my intention. I specifically want to convey that either a response or a notification is handled via this accept
method. Handling requests requires providing a dedicated transport, hence it has a different signature.
Regarding the rest of the comment, my changes are intended to introduce minimal breakages. Without the per-request SSE streams the above would make sense potentially, but with the specific transport argument for request handling and lack of such for notifications and responses makes it a non-goal for this PR. I'm open to discussion, but I believe opening this pandora's box is exactly what I tried to avoid here. The reason is that with a generic handle you have to manage the specific transport routing inside the session, something that is not a concern of the session itself, but of the transport. This is something explored in #290 that I tried to make less error prone with concrete transport instances that are 1-1 mapped to a request.
requestHandlers.put(McpSchema.METHOD_COMPLETION_COMPLETE, completionCompleteRequestHandler()); | ||
} | ||
|
||
mcpTransport.setRequestHandler((context, request) -> requestHandlers.get(request.method()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is a good point that we don't use McpSession
as the stateless server is unidirectional and should support only the handle(Message) for request and notifications.
Related to my other comment about the McpSession not having handle(msg). Perhaps we can introduce McpHandlerSession
that has handle(JSONRPCMessage)
used by the stateless server). Then the McpSession can extend it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I intend to add a composite for the notification and request handlers. But I'd suggest avoiding the abstraction of handle(JSONRPCMessage)
as that conveys the notion that we can handle responses here - we should not, that is an error in the stateless server (at least in my view). Also, I suggest we avoid using "Session" as part of this composite's name to avoid even further confusion for stateless server authors - the intention for them is to handle "sessionless" situations, so a session (despite being a JSON-RPC layer concept from our early implementation) would be rather confusing.
public interface McpStatelessServerTransport { | ||
|
||
void setRequestHandler( | ||
BiFunction<McpTransportContext, McpSchema.JSONRPCRequest, Mono<McpSchema.JSONRPCResponse>> message); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The signature of the setRequestHandler needs to change from JSONRPCRequest to JSONRPCMessage to allow handling both requests and notifications?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As explained in the comments above, a composite will cover requests and notifications.
Mono<McpSchema.InitializeResult> initResult) { | ||
} | ||
|
||
public final class McpStreamableServerSessionStream implements McpSession { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The McpStreamableServerSessionStream
isn't bidirectional like McpSession
.
We can split this into two interfaces:
McpRpcSender
- handles outgoing messages (copies current McpSession functionality)McpRpcReceiver
- handles incoming messages withhandle(JSONRPCMessage)
(names be refined)
McpSession would implement both interfaces, McpStreamableServerSessionStream would only implement McpRpcSender, and the Stateless Server would use McpRpcReceiver.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this comment is outdated after our 1-1 sync? McpStreamableServerSessionStream
is bidirectional.
…sync server tests Signed-off-by: Dariusz Jędrzejczyk <dariusz.jedrzejczyk@broadcom.com>
@@ -44,7 +44,7 @@ public final class McpSchema { | |||
private McpSchema() { | |||
} | |||
|
|||
public static final String LATEST_PROTOCOL_VERSION = "2024-11-05"; | |||
public static final String LATEST_PROTOCOL_VERSION = "2025-03-26"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The LATEST_PROTOCOL_VERSION unfortunately is used for all client and server transports and raising it to 2025-03-26
would likely cause misbehaviors.
One temporal solution is to let the transport implementations determine the protocol version. Something like this suggestion: #404
Then the init result should pick those version. Maybe the MpcTransport can have getSupporteProtocolVersion field or a like
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not something I wanted to deal with in this PR but I guess we have to. I'm not sure how #404 addresses this? The protocol version handling in McpAsyncServer deals with the JSON-RPC MCP domain-level checks and that's what I think was failing when I stumbled upon the need to update this. I think we just have the "latest" concept so it is accurate. Please advise how we can go about this if you have more insight, I was unfortunately quite ignorant to this aspect :(
McpSchema.JSONRPCResponse jsonrpcResponse = new McpSchema.JSONRPCResponse( | ||
McpSchema.JSONRPC_VERSION, jsonrpcRequest.id(), initializeResult, null); | ||
try { | ||
return this.objectMapper.writeValueAsString(jsonrpcResponse); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems redundant as the WebFlux would do the conversion?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suppose so! Although it's a temporary mitigation, I think we should deal with the init result conversion into JSON at an earlier layer. Need to come back to this.
Signed-off-by: Dariusz Jędrzejczyk <dariusz.jedrzejczyk@broadcom.com>
Signed-off-by: Dariusz Jędrzejczyk <dariusz.jedrzejczyk@broadcom.com>
Signed-off-by: Dariusz Jędrzejczyk <dariusz.jedrzejczyk@broadcom.com>
Signed-off-by: Dariusz Jędrzejczyk <dariusz.jedrzejczyk@broadcom.com>
Signed-off-by: Christian Tzolov <christian.tzolov@broadcom.com>
Signed-off-by: Dariusz Jędrzejczyk <dariusz.jedrzejczyk@broadcom.com>
Still WIP, but this outlines the direction for the Streamable HTTP server side. It is based on the research started with cooperation with @ZachGerman in #290. Eventually, Zach's PR should be able to be rebased on top of this work and we will have two transport implementations (WebFlux and JDK Servlet-based).