-
Notifications
You must be signed in to change notification settings - Fork 702
Description
Please do a quick search on GitHub issues first, the feature you are about to request might have already been requested.
#79
#80
#425
#432
Expected Behavior
Users should be able to customize the endpoint routing functions.
We want to wrap existing HTTP APIs as MCP server tools.
- Uses Spring WebFlux's RouterFunction for endpoint handling (GET, POST, DELETE)
We want to serve the following MCP servers from a single application process:
- /mcp
- /mcp/mcp-server-app-name-A -> some MCP tools
- /mcp/mcp-server-app-name-B -> some MCP tools
- /mcp/mcp-server-app-name-C -> some MCP tools
Current Behavior
The RouterFunction is initialized privately inside WebFluxStreamableServerTransportProvider, and that class's constructor is private as well.
// Abridged excerpt quoted in the issue: the transport provider builds its routes exactly once,
// inside a private constructor, and stores them in a final field.
public class WebFluxStreamableServerTransportProvider implements McpStreamableServerTransportProvider {
private final String mcpEndpoint;
// Assigned once in the constructor; the excerpt shows no way to replace or extend it afterwards.
private final RouterFunction<?> routerFunction;
// Private constructor. In this excerpt only mcpEndpoint is read; objectMapper, contextExtractor,
// disallowDelete and keepAliveInterval are accepted but not used by the lines shown.
private WebFluxStreamableServerTransportProvider(ObjectMapper objectMapper, String mcpEndpoint,
McpTransportContextExtractor<ServerRequest> contextExtractor, boolean disallowDelete,
Duration keepAliveInterval) {
this.mcpEndpoint = mcpEndpoint;
// GET, POST and DELETE on the single endpoint path are bound to this instance's handlers.
// handleGet/handlePost/handleDelete are not part of the excerpt.
this.routerFunction = RouterFunctions.route()
.GET(this.mcpEndpoint, this::handleGet)
.POST(this.mcpEndpoint, this::handlePost)
.DELETE(this.mcpEndpoint, this::handleDelete)
.build();
}
// Returns the router function that was built in the constructor.
public RouterFunction<?> getRouterFunction() {
return this.routerFunction;
}
// The next line closes the class above and, fused onto the same line by the page extraction,
// opens the RouterFunctions excerpt.
}public abstract class RouterFunctions {
// Factory used by the constructor above: every call hands out a new RouterFunctionBuilder.
public static Builder route() {
return new RouterFunctionBuilder();
}
// The next line closes RouterFunctions and, fused onto the same line, opens the builder excerpt.
}class RouterFunctionBuilder implements RouterFunctions.Builder {
// Routes collected so far, in the order they were added.
private final List<RouterFunction<ServerResponse>> routerFunctions = new ArrayList<>();
// Appends an already-built router function; rejects null and returns this builder for chaining.
@Override
public RouterFunctions.Builder add(RouterFunction<ServerResponse> routerFunction) {
Assert.notNull(routerFunction, "RouterFunction must not be null");
this.routerFunctions.add(routerFunction);
return this;
}
// Produces the composite router. Throws IllegalStateException when nothing has been registered.
// NOTE: filterFunctions and errorHandlers are referenced below but their declarations are
// omitted from this excerpt.
@Override
public RouterFunction<ServerResponse> build() {
if (this.routerFunctions.isEmpty()) {
throw new IllegalStateException("No routes registered. Register a route with GET(), POST(), etc.");
}
RouterFunction<ServerResponse> result = new BuiltRouterFunction(this.routerFunctions);
// No filters and no error handlers: the plain composite is returned as-is.
if (this.filterFunctions.isEmpty() && this.errorHandlers.isEmpty()) {
return result;
}
else {
// Filters first, then error handlers, chained into a single filter with andThen.
HandlerFilterFunction<ServerResponse, ServerResponse> filter =
Stream.concat(this.filterFunctions.stream(), this.errorHandlers.stream())
.reduce(HandlerFilterFunction::andThen)
.orElseThrow(IllegalStateException::new);
return result.filter(filter);
}
}
/**
* Router function returned by {@link #build()} that simply iterates over the registered routes.
*/
private static class BuiltRouterFunction extends RouterFunctions.AbstractRouterFunction<ServerResponse> {
private final List<RouterFunction<ServerResponse>> routerFunctions;
// Takes a copy of the builder's list, so routes added to the builder after build() do not
// appear in an already-built router function.
public BuiltRouterFunction(List<RouterFunction<ServerResponse>> routerFunctions) {
Assert.notEmpty(routerFunctions, "RouterFunctions must not be empty");
this.routerFunctions = new ArrayList<>(routerFunctions);
}
// Asks each registered route in list order (concatMap) and emits the first handler function
// that any of them produces (next); the result is empty when none of them matches.
@Override
public Mono<HandlerFunction<ServerResponse>> route(ServerRequest request) {
return Flux.fromIterable(this.routerFunctions)
.concatMap(routerFunction -> routerFunction.route(request))
.next();
}
// Forwards the visitor to every registered route, in list order.
@Override
public void accept(RouterFunctions.Visitor visitor) {
this.routerFunctions.forEach(routerFunction -> routerFunction.accept(visitor));
}
}
// The trailing "Context" on the next line is the issue's section heading, fused onto the
// closing brace of RouterFunctionBuilder by the page extraction.
}Context
API is MCP, allowing AI to connect to the real world with lower cost, speed, and security. The existing APIs can be instantly converted into a Remote MCP Server, laying out the shortest connection path between AI and the real world.
We need to start multiple WebFluxStreamableServerTransportProvider and McpAsyncServer instances in one application process. Please see the following code in McpServerConfiguration, which is modeled on McpServerStreamableHttpWebFluxAutoConfiguration.
It can serve the following MCP servers:
- /mcp
- /mcp/mcp-server-app-name-A -> some MCP tools
- /mcp/mcp-server-app-name-B -> some MCP tools
However, the RouterFunction cannot be updated dynamically when the database gains a new app-name MCP server, for example:
/mcp/mcp-server-app-name-C
/**
 * Spring configuration that hosts several MCP servers in one WebFlux application: the default
 * streamable-HTTP transport plus one transport per MCP server described in a YAML rule file.
 *
 * <p>All routes are composed once, when {@link #webFluxStreamableServerRouterFunction} is
 * created. Transport providers added to the provider map afterwards are not picked up by the
 * router function that was already built.
 */
@Slf4j
@EnableConfigurationProperties({ McpServerStreamableHttpProperties.class })
@Configuration(proxyBeanMethods = false)
public class McpServerConfiguration {

    public McpServerConfiguration() {
        log.info("create McpServerConfiguration");
    }

    /**
     * Loads the bundled YAML rule files and indexes their tool lists by MCP server name.
     *
     * @return map from server name to the tools declared for that server
     * @throws IllegalStateException if two rule files declare the same server name
     *         ({@code Collectors.toMap} is used without a merge function)
     */
    @Bean
    public Map<String, List<McpTool>> mcpToolListMap() {
        List<String> yamlFiles = List.of(
                "mcp-server-user-apis.yml",
                "mcp-server-travel-apis.yml"
        );
        return yamlFiles.stream()
                .map(YamlUtil::load)
                .collect(Collectors.toMap(
                        mcpServerRule -> mcpServerRule.getServer().getName(),
                        McpServerRule::getTools
                ));
    }

    /**
     * Creates one transport provider per configured MCP server name.
     *
     * @param mcpToolListMap tool lists keyed by server name; only the key set is used here
     * @return a {@link ConcurrentHashMap} holding the providers returned by
     *         {@code McpServerTransportManager.transportProviderMap}
     */
    @Bean
    @ConditionalOnProperty(prefix = McpServerProperties.CONFIG_PREFIX, name = "type", havingValue = "ASYNC")
    @Conditional({ McpServerAutoConfiguration.EnabledStreamableServerCondition.class })
    public Map<String, WebFluxStreamableServerTransportProvider> transportProviderMap(
            Map<String, List<McpTool>> mcpToolListMap) {
        log.info("init transportProviderMap");
        Map<String, WebFluxStreamableServerTransportProvider> transportProviderMap =
                new ConcurrentHashMap<>(mcpToolListMap.size());
        transportProviderMap.putAll(McpServerTransportManager.transportProviderMap(mcpToolListMap.keySet()));
        return transportProviderMap;
    }

    /**
     * Capabilities builder with tool support enabled.
     *
     * @see McpServerAutoConfiguration#capabilitiesBuilder()
     */
    @Bean
    public McpSchema.ServerCapabilities.Builder capabilitiesBuilder() {
        log.info("init capabilitiesBuilder");
        return McpSchema.ServerCapabilities.builder()
                .tools(true);
    }

    /**
     * Builds the async MCP servers by delegating to {@code McpServerManager.mcpAsyncServerMap}.
     *
     * <p>NOTE(review): this bean is only guarded by the {@code type=ASYNC} property, while the
     * {@code transportProviderMap} bean it consumes additionally requires
     * {@code EnabledStreamableServerCondition} - confirm the two conditions cannot diverge.
     *
     * <p>NOTE(review): depending on the Spring Framework version, a
     * {@code Map<String, WebFluxStreamableServerTransportProvider>} parameter may be resolved as
     * "all beans of that type keyed by bean name" rather than as the {@code transportProviderMap}
     * bean - confirm, and add an explicit qualifier if needed.
     *
     * @param mcpToolListMap tool lists keyed by server name
     * @param transportProviderMap transport providers keyed by server name
     * @param capabilitiesBuilder capabilities builder handed to the server manager
     * @return the map produced by {@code McpServerManager.mcpAsyncServerMap}
     */
    @Bean
    @ConditionalOnProperty(prefix = McpServerProperties.CONFIG_PREFIX, name = "type", havingValue = "ASYNC")
    public Map<String, McpAsyncServer> mcpAsyncServerMap(
            Map<String, List<McpTool>> mcpToolListMap,
            Map<String, WebFluxStreamableServerTransportProvider> transportProviderMap,
            McpSchema.ServerCapabilities.Builder capabilitiesBuilder) {
        log.info("init mcpAsyncServerMap");
        return McpServerManager.mcpAsyncServerMap(mcpToolListMap, transportProviderMap, capabilitiesBuilder);
    }

    /**
     * Default transport provider, configured from {@link McpServerStreamableHttpProperties}.
     * Falls back to a new {@link ObjectMapper} when the context does not provide one.
     *
     * @see McpServerStreamableHttpWebFluxAutoConfiguration#webFluxStreamableServerTransportProvider
     */
    @Bean
    @ConditionalOnProperty(prefix = McpServerProperties.CONFIG_PREFIX, name = "type", havingValue = "ASYNC")
    @Conditional({ McpServerAutoConfiguration.EnabledStreamableServerCondition.class })
    public WebFluxStreamableServerTransportProvider webFluxStreamableServerTransportProvider(
            ObjectProvider<ObjectMapper> objectMapperProvider, McpServerStreamableHttpProperties serverProperties) {
        log.info("init webFluxStreamableServerTransportProvider");
        ObjectMapper objectMapper = objectMapperProvider.getIfAvailable(ObjectMapper::new);
        return WebFluxStreamableServerTransportProvider.builder()
                .objectMapper(objectMapper)
                .messageEndpoint(serverProperties.getMcpEndpoint())
                .keepAliveInterval(serverProperties.getKeepAliveInterval())
                .disallowDelete(serverProperties.isDisallowDelete())
                .build();
    }

    /**
     * Router function for the streamable HTTP transport, used by Spring WebFlux to start an
     * HTTP server. The default provider's routes are registered first, followed by the routes
     * of every provider in {@code transportProviderMap}.
     *
     * <p>The composition is a snapshot taken when this bean is created; providers put into the
     * map later are not routed.
     *
     * @param webFluxProvider the default transport provider
     * @param transportProviderMap per-server transport providers
     * @return the combined router function
     * @see McpServerStreamableHttpWebFluxAutoConfiguration#webFluxStreamableServerRouterFunction
     */
    @Bean
    @ConditionalOnProperty(prefix = McpServerProperties.CONFIG_PREFIX, name = "type", havingValue = "ASYNC")
    @Conditional({ McpServerAutoConfiguration.EnabledStreamableServerCondition.class })
    public RouterFunction<?> webFluxStreamableServerRouterFunction(
            WebFluxStreamableServerTransportProvider webFluxProvider,
            Map<String, WebFluxStreamableServerTransportProvider> transportProviderMap) {
        log.info("init webFluxStreamableServerRouterFunction");
        RouterFunctions.Builder routerFunctionBuilder = RouterFunctions.route();
        routerFunctionBuilder.add(serverResponseRoutes(webFluxProvider));
        for (WebFluxStreamableServerTransportProvider transportProvider : transportProviderMap.values()) {
            routerFunctionBuilder.add(serverResponseRoutes(transportProvider));
        }
        return routerFunctionBuilder.build();
    }

    /**
     * Narrows a provider's {@code RouterFunction<?>} to {@code RouterFunction<ServerResponse>},
     * the type {@code RouterFunctions.Builder#add} requires.
     */
    // Safe as long as the provider builds its routes with RouterFunctions.route()...build(),
    // whose build() is declared to return RouterFunction<ServerResponse>. The suppression is
    // confined to this one-line helper instead of being repeated at every call site.
    @SuppressWarnings("unchecked")
    private static RouterFunction<ServerResponse> serverResponseRoutes(
            WebFluxStreamableServerTransportProvider provider) {
        return (RouterFunction<ServerResponse>) provider.getRouterFunction();
    }
}