Skip to content

Commit 7bf2823

Browse files
committed
feat: add streaming HTTP transport support
- Add --transport CLI flag to select between sse and streamable-http - Add MCP_TRANSPORT environment variable support for transport selection - Create StreamableHTTPServer wrapper in pkg/mcp/server.go - Update server startup logic to use selected transport - Add documentation for transport configuration This enables MKP to work with environments like ToolHive that require HTTP-based communication. The default transport remains SSE for backward compatibility. Fixes #82
1 parent ec667e4 commit 7bf2823

File tree

3 files changed

+73
-6
lines changed

3 files changed

+73
-6
lines changed

README.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,28 @@ The resource URIs follow these formats:
367367

368368
### Configuration
369369

370+
#### Transport Protocol
371+
372+
MKP supports two transport protocols for the MCP server:
373+
374+
- **SSE (Server-Sent Events)**: The default transport protocol, suitable for most use cases
375+
- **Streamable HTTP**: A streaming HTTP transport that supports both direct HTTP responses and SSE streams, useful for environments like ToolHive that require HTTP-based communication
376+
377+
You can configure the transport protocol using either a CLI flag or an environment variable:
378+
379+
```bash
380+
# Using CLI flag
381+
./build/mkp-server --transport=streamable-http
382+
383+
# Using environment variable
384+
MCP_TRANSPORT=streamable-http ./build/mkp-server
385+
386+
# Default (SSE)
387+
./build/mkp-server
388+
```
389+
390+
The `MCP_TRANSPORT` environment variable is automatically set by ToolHive when running MKP in that environment.
391+
370392
#### Controlling Resource Discovery
371393

372394
By default, MKP serves all Kubernetes resources as MCP resources, which provides

cmd/server/main.go

Lines changed: 46 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"os"
1010
"os/signal"
1111
"strconv"
12+
"strings"
1213
"syscall"
1314
"time"
1415

@@ -28,6 +29,8 @@ func main() {
2829
"Interval to periodically re-read the kubeconfig (e.g., 5m for 5 minutes). If 0, no refresh will be performed")
2930
enableRateLimiting := flag.Bool("enable-rate-limiting", true,
3031
"Whether to enable rate limiting for tool calls. When false, no rate limiting will be applied")
32+
transport := flag.String("transport", getDefaultTransport(),
33+
"Transport protocol to use: 'sse' or 'streamable-http'. Can also be set via MCP_TRANSPORT environment variable")
3134

3235
flag.Parse()
3336

@@ -74,16 +77,30 @@ func main() {
7477
// Create MCP server using the helper function
7578
mcpServer := mcp.CreateServer(k8sClient, config)
7679

77-
// Create SSE server
78-
sseServer := mcp.CreateSSEServer(mcpServer)
80+
// Create and start the appropriate transport server
81+
var transportServer interface {
82+
Start(string) error
83+
Shutdown(context.Context) error
84+
}
85+
86+
switch strings.ToLower(*transport) {
87+
case "streamable-http":
88+
log.Println("Using streamable-http transport")
89+
transportServer = mcp.CreateStreamableHTTPServer(mcpServer)
90+
case "sse":
91+
log.Println("Using SSE transport")
92+
transportServer = mcp.CreateSSEServer(mcpServer)
93+
default:
94+
log.Fatalf("Invalid transport: %s. Must be 'sse' or 'streamable-http'", *transport)
95+
}
7996

8097
// Channel to receive server errors
8198
serverErrCh := make(chan error, 1)
8299

83100
// Start the server in a goroutine
84101
go func() {
85-
log.Printf("Starting MCP server on %s", *addr)
86-
if err := sseServer.Start(*addr); err != nil {
102+
log.Printf("Starting MCP server on %s with %s transport", *addr, *transport)
103+
if err := transportServer.Start(*addr); err != nil {
87104
log.Printf("Server error: %v", err)
88105
serverErrCh <- err
89106
}
@@ -106,8 +123,8 @@ func main() {
106123
go func() {
107124
log.Println("Initiating server shutdown...")
108125

109-
// Stop the SSE server
110-
err := sseServer.Shutdown(shutdownCtx)
126+
// Stop the transport server
127+
err := transportServer.Shutdown(shutdownCtx)
111128
if err != nil {
112129
log.Printf("Error during shutdown: %v", err)
113130
}
@@ -166,3 +183,26 @@ func getDefaultAddress() string {
166183

167184
return fmt.Sprintf(":%d", port)
168185
}
186+
187+
// getDefaultTransport returns the transport to use based on MCP_TRANSPORT environment variable.
188+
// If the environment variable is not set, returns "sse".
189+
// Valid values are "sse" and "streamable-http".
190+
func getDefaultTransport() string {
191+
defaultTransport := "sse"
192+
193+
transportEnv := os.Getenv("MCP_TRANSPORT")
194+
if transportEnv == "" {
195+
return defaultTransport
196+
}
197+
198+
// Normalize the transport value
199+
transport := strings.ToLower(strings.TrimSpace(transportEnv))
200+
201+
// Validate the transport value
202+
if transport != "sse" && transport != "streamable-http" {
203+
log.Printf("Invalid transport in MCP_TRANSPORT environment variable: %s, using default transport %s", transportEnv, defaultTransport)
204+
return defaultTransport
205+
}
206+
207+
return transport
208+
}

pkg/mcp/server.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,3 +145,8 @@ func StopServer() {
145145
func CreateSSEServer(mcpServer *server.MCPServer) *server.SSEServer {
146146
return server.NewSSEServer(mcpServer)
147147
}
148+
149+
// CreateStreamableHTTPServer creates a new StreamableHTTP server for the MCP server
150+
func CreateStreamableHTTPServer(mcpServer *server.MCPServer) *server.StreamableHTTPServer {
151+
return server.NewStreamableHTTPServer(mcpServer)
152+
}

0 commit comments

Comments
 (0)