Skip to content

JDK HTTP Client Streamable HTTP support #317

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,8 @@ public <T> T unmarshalFrom(Object data, TypeReference<T> typeRef) {
private Tuple2<Optional<String>, Iterable<McpSchema.JSONRPCMessage>> parse(ServerSentEvent<String> event) {
if (MESSAGE_EVENT_TYPE.equals(event.event())) {
try {
System.out.println(event.id());
System.out.println(event.data());
// We don't support batching ATM and probably won't since the next version
// considers removing it.
McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(this.objectMapper, event.data());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ protected void onClose() {
}

protected Duration getRequestTimeout() {
return Duration.ofSeconds(14);
return Duration.ofSeconds(2100);
}

protected Duration getInitializationTimeout() {
return Duration.ofSeconds(2);
return Duration.ofSeconds(2000);
}

McpAsyncClient client(McpClientTransport transport) {
Expand Down Expand Up @@ -129,6 +129,14 @@ <T> void verifyCallSucceedsWithImplicitInitialization(Function<McpAsyncClient, M
});
}

@Test
void testInitialize() {
withClient(createMcpTransport(), mcpAsyncClient -> {
StepVerifier.create(mcpAsyncClient.initialize().then()).verifyComplete();
});

}

@Test
void testConstructorWithInvalidArguments() {
assertThatThrownBy(() -> McpClient.async(null).build()).isInstanceOf(IllegalArgumentException.class)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* Copyright 2024 - 2024 the original author or authors.
*/
* Copyright 2024 - 2024 the original author or authors.
*/
package io.modelcontextprotocol.client.transport;

import java.net.URI;
Expand Down Expand Up @@ -131,64 +131,7 @@ public void subscribe(String url, SseEventHandler eventHandler) {
AtomicReference<String> currentEventId = new AtomicReference<>();
AtomicReference<String> currentEventType = new AtomicReference<>("message");

Flow.Subscriber<String> lineSubscriber = new Flow.Subscriber<>() {
private Flow.Subscription subscription;

@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(Long.MAX_VALUE);
}

@Override
public void onNext(String line) {
if (line.isEmpty()) {
// Empty line means end of event
if (eventBuilder.length() > 0) {
String eventData = eventBuilder.toString();
SseEvent event = new SseEvent(currentEventId.get(), currentEventType.get(), eventData.trim());
eventHandler.onEvent(event);
eventBuilder.setLength(0);
}
}
else {
if (line.startsWith("data:")) {
var matcher = EVENT_DATA_PATTERN.matcher(line);
if (matcher.find()) {
eventBuilder.append(matcher.group(1).trim()).append("\n");
}
}
else if (line.startsWith("id:")) {
var matcher = EVENT_ID_PATTERN.matcher(line);
if (matcher.find()) {
currentEventId.set(matcher.group(1).trim());
}
}
else if (line.startsWith("event:")) {
var matcher = EVENT_TYPE_PATTERN.matcher(line);
if (matcher.find()) {
currentEventType.set(matcher.group(1).trim());
}
}
}
subscription.request(1);
}

@Override
public void onError(Throwable throwable) {
eventHandler.onError(throwable);
}

@Override
public void onComplete() {
// Handle any remaining event data
if (eventBuilder.length() > 0) {
String eventData = eventBuilder.toString();
SseEvent event = new SseEvent(currentEventId.get(), currentEventType.get(), eventData.trim());
eventHandler.onEvent(event);
}
}
};
Flow.Subscriber<String> lineSubscriber = new SseEventSubscriber(eventHandler);

Function<Flow.Subscriber<String>, HttpResponse.BodySubscriber<Void>> subscriberFactory = subscriber -> HttpResponse.BodySubscribers
.fromLineSubscriber(subscriber);
Expand All @@ -207,4 +150,81 @@ public void onComplete() {
});
}

/**
* Subscriber implementation for processing SSE event lines. This subscriber parses
* incoming lines and accumulates them into complete events.
*/
public static class SseEventSubscriber implements Flow.Subscriber<String> {

private Flow.Subscription subscription;

private final SseEventHandler eventHandler;

private final StringBuilder eventBuilder = new StringBuilder();

private final AtomicReference<String> currentEventId = new AtomicReference<>();

private final AtomicReference<String> currentEventType = new AtomicReference<>("message");

public SseEventSubscriber(SseEventHandler eventHandler) {
this.eventHandler = eventHandler;
}

@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(Long.MAX_VALUE);
}

@Override
public void onNext(String line) {
if (line.isEmpty()) {
// Empty line means end of event
if (eventBuilder.length() > 0) {
String eventData = eventBuilder.toString();
SseEvent event = new SseEvent(currentEventId.get(), currentEventType.get(), eventData.trim());
eventHandler.onEvent(event);
eventBuilder.setLength(0);
}
}
else {
if (line.startsWith("data:")) {
var matcher = EVENT_DATA_PATTERN.matcher(line);
if (matcher.find()) {
eventBuilder.append(matcher.group(1).trim()).append("\n");
}
}
else if (line.startsWith("id:")) {
var matcher = EVENT_ID_PATTERN.matcher(line);
if (matcher.find()) {
currentEventId.set(matcher.group(1).trim());
}
}
else if (line.startsWith("event:")) {
var matcher = EVENT_TYPE_PATTERN.matcher(line);
if (matcher.find()) {
currentEventType.set(matcher.group(1).trim());
}
}
}
subscription.request(1);
}

@Override
public void onError(Throwable throwable) {
eventHandler.onError(throwable);
}

@Override
public void onComplete() {
// Handle any remaining event data
if (!eventBuilder.isEmpty()) {
String eventData = eventBuilder.toString();
SseEvent event = new SseEvent(currentEventId.get(), currentEventType.get(), eventData.trim());
eventHandler.onEvent(event);
}
}

}

}
Loading