Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,10 @@ public class DefaultMcpClient implements McpClient {
private volatile ServerSchema serverSchema;
private volatile boolean initialized = false;
private volatile boolean closed = false;
private final List<Tool> tools = new ArrayList<>();
private final Object initializedLock = LockUtils.newSynchronizedLock();
private final Object toolsLock = LockUtils.newSynchronizedLock();
private final Map<Long, Consumer<JsonRpc.Response<Long>>> responseConsumers = new ConcurrentHashMap<>();
private final Map<Long, Boolean> pendingRequests = new ConcurrentHashMap<>();
private final Map<Long, Object> pendingResults = new ConcurrentHashMap<>();
private final Map<Long, Result> pendingResults = new ConcurrentHashMap<>();

private volatile Subscription subscription;
private volatile ThreadPoolScheduler pingScheduler;
Expand Down Expand Up @@ -197,10 +195,6 @@ private void initializedMcpServer(JsonRpc.Response<Long> response) {
response.error());
throw new IllegalStateException(response.error().toString());
}
synchronized (this.initializedLock) {
this.initialized = true;
this.initializedLock.notifyAll();
}
this.recordServerSchema(response);
HttpClassicClientRequest request =
this.client.createRequest(HttpRequestMethod.POST, this.baseUri + this.messageEndpoint);
Expand All @@ -225,6 +219,10 @@ private void initializedMcpServer(JsonRpc.Response<Long> response) {
} catch (IOException e) {
throw new IllegalStateException(e);
}
synchronized (this.initializedLock) {
this.initialized = true;
this.initializedLock.notifyAll();
}
this.pingScheduler = ThreadPoolScheduler.custom()
.threadPoolName("mcp-client-ping-" + this.name)
.awaitTermination(3, TimeUnit.SECONDS)
Expand Down Expand Up @@ -262,27 +260,30 @@ public List<Tool> getTools() {
while (this.pendingRequests.get(requestId)) {
ThreadUtils.sleep(100);
}
synchronized (this.toolsLock) {
return this.tools;
Result result = this.pendingResults.remove(requestId);
this.pendingRequests.remove(requestId);
if (result.isSuccess()) {
return ObjectUtils.cast(result.getContent());
} else {
throw new IllegalStateException(result.getError());
}
}

private void getTools0(JsonRpc.Response<Long> response) {
if (response.error() != null) {
log.error("Failed to get tools list from MCP server. [sessionId={}, response={}]",
String error = StringUtils.format("Failed to get tools list from MCP server. [sessionId={0}, response={1}]",
this.sessionId,
response);
this.pendingResults.put(response.id(), Result.error(error));
this.pendingRequests.put(response.id(), false);
return;
}
Map<String, Object> result = cast(response.result());
List<Map<String, Object>> rawTools = cast(result.get("tools"));
synchronized (this.toolsLock) {
this.tools.clear();
this.tools.addAll(rawTools.stream()
.map(rawTool -> ObjectUtils.<Tool>toCustomObject(rawTool, Tool.class))
.toList());
}
List<Tool> tools = new ArrayList<>(rawTools.stream()
.map(rawTool -> ObjectUtils.<Tool>toCustomObject(rawTool, Tool.class))
.toList());
this.pendingResults.put(response.id(), Result.success(tools));
this.pendingRequests.put(response.id(), false);
}

Expand All @@ -303,32 +304,46 @@ public Object callTool(String name, Map<String, Object> arguments) {
while (this.pendingRequests.get(requestId)) {
ThreadUtils.sleep(100);
}
return this.pendingResults.get(requestId);
Result result = this.pendingResults.remove(requestId);
this.pendingRequests.remove(requestId);
if (result.isSuccess()) {
return result.getContent();
} else {
throw new IllegalStateException(result.getError());
}
}

private void callTools0(JsonRpc.Response<Long> response) {
if (response.error() != null) {
log.error("Failed to call tool from MCP server. [sessionId={}, response={}]", this.sessionId, response);
String error = StringUtils.format("Failed to call tool from MCP server. [sessionId={0}, response={1}]",
this.sessionId,
response);
this.pendingResults.put(response.id(), Result.error(error));
this.pendingRequests.put(response.id(), false);
return;
}
Map<String, Object> result = cast(response.result());
boolean isError = cast(result.get("isError"));
if (isError) {
log.error("Failed to call tool from MCP server. [sessionId={}, result={}]", this.sessionId, result);
String error = StringUtils.format("Failed to call tool from MCP server. [sessionId={0}, result={1}]",
this.sessionId,
result);
this.pendingResults.put(response.id(), Result.error(error));
this.pendingRequests.put(response.id(), false);
return;
}
List<Map<String, Object>> rawContents = cast(result.get("content"));
if (CollectionUtils.isEmpty(rawContents)) {
log.error("Failed to call tool from MCP server: no result returned. [sessionId={}, result={}]",
String error = StringUtils.format(
"Failed to call tool from MCP server: no result returned. [sessionId={0}, result={1}]",
this.sessionId,
result);
this.pendingResults.put(response.id(), Result.error(error));
this.pendingRequests.put(response.id(), false);
return;
}
Map<String, Object> rawContent = rawContents.get(0);
this.pendingResults.put(response.id(), rawContent.get("text"));
this.pendingResults.put(response.id(), Result.success(rawContent.get("text")));
this.pendingRequests.put(response.id(), false);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*---------------------------------------------------------------------------------------------
* Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
* This file is a part of the ModelEngine Project.
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/

package modelengine.fel.tool.mcp.client.support;

/**
* 表示调用 MCP 的结果。
*
* @author 季聿阶
* @since 2025-08-04
*/
public class Result {
private final boolean success;
private final Object content;
private final String error;

private Result(boolean success, Object content, String error) {
this.success = success;
this.content = content;
this.error = error;
}

/**
* 创建一个成功的结果。
*
* @param content 表示成功结果的内容的 {@link Object}。
* @return 表示成功结果的对象的 {@link Result}。
*/
public static Result success(Object content) {
return new Result(true, content, null);
}

/**
* 创建一个失败的结果。
*
* @param error 表示错误结果的信息的 {@link String}。
* @return 表示错误结果的对象的 {@link Result}。
*/
public static Result error(String error) {
return new Result(false, null, error);
}

/**
* 获取结果是否成功。
*
* @return 如果结果成功,则返回 {@code true};否则返回 {@code false}。
*/
public boolean isSuccess() {
return this.success;
}

/**
* 获取结果内容。
*
* @return 表示结果内容的 {@link Object}。
*/
public Object getContent() {
return this.content;
}

/**
* 获取结果错误信息。
*
* @return 表示错误信息的 {@link String}。
*/
public String getError() {
return this.error;
}
}