Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 34 additions & 38 deletions server/sse.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,15 @@ var _ ClientSession = (*sseSession)(nil)
// SSEServer implements a Server-Sent Events (SSE) based MCP server.
// It provides real-time communication capabilities over HTTP using the SSE protocol.
type SSEServer struct {
server *MCPServer
baseURL string
basePath string
useFullURLForMessageEndpoint bool
messageEndpoint string
sseEndpoint string
sessions sync.Map
srv *http.Server
contextFunc SSEContextFunc
server *MCPServer
baseURL string
basePath string
useFullURLForMessageEndpoint bool
messageEndpoint string
sseEndpoint string
sessions sync.Map
srv *http.Server
contextFunc SSEContextFunc

keepAlive bool
keepAliveInterval time.Duration
Expand Down Expand Up @@ -158,12 +158,12 @@ func WithSSEContextFunc(fn SSEContextFunc) SSEOption {
// NewSSEServer creates a new SSE server instance with the given MCP server and options.
func NewSSEServer(server *MCPServer, opts ...SSEOption) *SSEServer {
s := &SSEServer{
server: server,
sseEndpoint: "/sse",
messageEndpoint: "/message",
useFullURLForMessageEndpoint: true,
keepAlive: false,
keepAliveInterval: 10 * time.Second,
server: server,
sseEndpoint: "/sse",
messageEndpoint: "/message",
useFullURLForMessageEndpoint: true,
keepAlive: false,
keepAliveInterval: 10 * time.Second,
}

// Apply all options
Expand Down Expand Up @@ -293,7 +293,6 @@ func (s *SSEServer) handleSSE(w http.ResponseWriter, r *http.Request) {
}()
}


// Send the initial endpoint event
fmt.Fprintf(w, "event: endpoint\ndata: %s\r\n\r\n", s.GetMessageEndpointForClient(sessionID))
flusher.Flush()
Expand Down Expand Up @@ -356,31 +355,28 @@ func (s *SSEServer) handleMessage(w http.ResponseWriter, r *http.Request) {
return
}

// Process message through MCPServer
response := s.server.HandleMessage(ctx, rawMessage)
// quick return request, send 202 Accepted with no body, then deal the message and sent response via SSE
w.WriteHeader(http.StatusAccepted)

// Only send response if there is one (not for notifications)
if response != nil {
eventData, _ := json.Marshal(response)
go func() {
// Process message through MCPServer
response := s.server.HandleMessage(ctx, rawMessage)

// Queue the event for sending via SSE
select {
case session.eventQueue <- fmt.Sprintf("event: message\ndata: %s\n\n", eventData):
// Event queued successfully
case <-session.done:
// Session is closed, don't try to queue
default:
// Queue is full, could log this
}
// Only send response if there is one (not for notifications)
if response != nil {
eventData, _ := json.Marshal(response)

// Send HTTP response
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusAccepted)
json.NewEncoder(w).Encode(response)
} else {
// For notifications, just send 202 Accepted with no body
w.WriteHeader(http.StatusAccepted)
}
// Queue the event for sending via SSE
select {
case session.eventQueue <- fmt.Sprintf("event: message\ndata: %s\n\n", eventData):
// Event queued successfully
case <-session.done:
// Session is closed, don't try to queue
default:
// Queue is full, could log this
}
}
}()
}

// writeJSONRPCError writes a JSON-RPC error response with the given error details.
Expand Down