Skip to content

Commit fab7e1e

Browse files
feat: add closeSSEStream callback to RequestHandlerExtra (#1166)
1 parent 6c863c8 commit fab7e1e

File tree

5 files changed

+147
-11
lines changed

5 files changed

+147
-11
lines changed

src/examples/server/ssePollingExample.ts

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
* Key features:
88
* - Configures `retryInterval` to tell clients how long to wait before reconnecting
99
* - Uses `eventStore` to persist events for replay after reconnection
10-
* - Calls `closeSSEStream()` to gracefully disconnect clients mid-operation
10+
* - Uses `extra.closeSSEStream()` callback to gracefully disconnect clients mid-operation
1111
*
1212
* Run with: npx tsx src/examples/server/ssePollingExample.ts
1313
* Test with: curl or the MCP Inspector
@@ -31,9 +31,6 @@ const server = new McpServer(
3131
}
3232
);
3333

34-
// Track active transports by session ID for closeSSEStream access
35-
const transports = new Map<string, StreamableHTTPServerTransport>();
36-
3734
// Register a long-running tool that demonstrates server-initiated disconnect
3835
server.tool(
3936
'long-task',
@@ -65,11 +62,11 @@ server.tool(
6562
await sleep(1000);
6663

6764
// Server decides to disconnect the client to free resources
68-
// Client will reconnect via GET with Last-Event-ID after retryInterval
69-
const transport = transports.get(extra.sessionId!);
70-
if (transport) {
65+
// Client will reconnect via GET with Last-Event-ID after the transport's retryInterval
66+
// Use extra.closeSSEStream callback - available when eventStore is configured
67+
if (extra.closeSSEStream) {
7168
console.log(`[${extra.sessionId}] Closing SSE stream to trigger client polling...`);
72-
transport.closeSSEStream(extra.requestId);
69+
extra.closeSSEStream();
7370
}
7471

7572
// Continue processing while client is disconnected
@@ -112,6 +109,9 @@ app.use(cors());
112109
// Create event store for resumability
113110
const eventStore = new InMemoryEventStore();
114111

112+
// Track transports by session ID for session reuse
113+
const transports = new Map<string, StreamableHTTPServerTransport>();
114+
115115
// Handle all MCP requests - use express.json() only for this route
116116
app.all('/mcp', express.json(), async (req: Request, res: Response) => {
117117
const sessionId = req.headers['mcp-session-id'] as string | undefined;
@@ -123,7 +123,7 @@ app.all('/mcp', express.json(), async (req: Request, res: Response) => {
123123
transport = new StreamableHTTPServerTransport({
124124
sessionIdGenerator: () => randomUUID(),
125125
eventStore,
126-
retryInterval: 2000, // Client should reconnect after 2 seconds
126+
retryInterval: 2000, // Default retry interval for priming events
127127
onsessioninitialized: id => {
128128
console.log(`[${id}] Session initialized`);
129129
transports.set(id, transport!);

src/server/streamableHttp.test.ts

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1802,6 +1802,120 @@ describe.each(zodTestMatrix)('$zodVersionLabel', (entry: ZodMatrixEntry) => {
18021802
// Clean up - resolve the tool promise
18031803
toolResolve!();
18041804
});
1805+
1806+
it('should provide closeSSEStream callback in extra when eventStore is configured', async () => {
1807+
const result = await createTestServer({
1808+
sessionIdGenerator: () => randomUUID(),
1809+
eventStore: createEventStore(),
1810+
retryInterval: 1000
1811+
});
1812+
server = result.server;
1813+
transport = result.transport;
1814+
baseUrl = result.baseUrl;
1815+
mcpServer = result.mcpServer;
1816+
1817+
// Track whether closeSSEStream callback was provided
1818+
let receivedCloseSSEStream: (() => void) | undefined;
1819+
1820+
// Register a tool that captures the extra.closeSSEStream callback
1821+
mcpServer.tool('test-callback-tool', 'Test tool', {}, async (_args, extra) => {
1822+
receivedCloseSSEStream = extra.closeSSEStream;
1823+
return { content: [{ type: 'text', text: 'Done' }] };
1824+
});
1825+
1826+
// Initialize to get session ID
1827+
const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize);
1828+
sessionId = initResponse.headers.get('mcp-session-id') as string;
1829+
expect(sessionId).toBeDefined();
1830+
1831+
// Call the tool
1832+
const toolCallRequest: JSONRPCMessage = {
1833+
jsonrpc: '2.0',
1834+
id: 200,
1835+
method: 'tools/call',
1836+
params: { name: 'test-callback-tool', arguments: {} }
1837+
};
1838+
1839+
const postResponse = await fetch(baseUrl, {
1840+
method: 'POST',
1841+
headers: {
1842+
'Content-Type': 'application/json',
1843+
Accept: 'text/event-stream, application/json',
1844+
'mcp-session-id': sessionId,
1845+
'mcp-protocol-version': '2025-03-26'
1846+
},
1847+
body: JSON.stringify(toolCallRequest)
1848+
});
1849+
1850+
expect(postResponse.status).toBe(200);
1851+
1852+
// Read all events to completion
1853+
const reader = postResponse.body?.getReader();
1854+
while (true) {
1855+
const { done } = await reader!.read();
1856+
if (done) break;
1857+
}
1858+
1859+
// Verify closeSSEStream callback was provided
1860+
expect(receivedCloseSSEStream).toBeDefined();
1861+
expect(typeof receivedCloseSSEStream).toBe('function');
1862+
});
1863+
1864+
it('should NOT provide closeSSEStream callback when eventStore is NOT configured', async () => {
1865+
const result = await createTestServer({
1866+
sessionIdGenerator: () => randomUUID()
1867+
// No eventStore
1868+
});
1869+
server = result.server;
1870+
transport = result.transport;
1871+
baseUrl = result.baseUrl;
1872+
mcpServer = result.mcpServer;
1873+
1874+
// Track whether closeSSEStream callback was provided
1875+
let receivedCloseSSEStream: (() => void) | undefined;
1876+
1877+
// Register a tool that captures the extra.closeSSEStream callback
1878+
mcpServer.tool('test-no-callback-tool', 'Test tool', {}, async (_args, extra) => {
1879+
receivedCloseSSEStream = extra.closeSSEStream;
1880+
return { content: [{ type: 'text', text: 'Done' }] };
1881+
});
1882+
1883+
// Initialize to get session ID
1884+
const initResponse = await sendPostRequest(baseUrl, TEST_MESSAGES.initialize);
1885+
sessionId = initResponse.headers.get('mcp-session-id') as string;
1886+
expect(sessionId).toBeDefined();
1887+
1888+
// Call the tool
1889+
const toolCallRequest: JSONRPCMessage = {
1890+
jsonrpc: '2.0',
1891+
id: 201,
1892+
method: 'tools/call',
1893+
params: { name: 'test-no-callback-tool', arguments: {} }
1894+
};
1895+
1896+
const postResponse = await fetch(baseUrl, {
1897+
method: 'POST',
1898+
headers: {
1899+
'Content-Type': 'application/json',
1900+
Accept: 'text/event-stream, application/json',
1901+
'mcp-session-id': sessionId,
1902+
'mcp-protocol-version': '2025-03-26'
1903+
},
1904+
body: JSON.stringify(toolCallRequest)
1905+
});
1906+
1907+
expect(postResponse.status).toBe(200);
1908+
1909+
// Read all events to completion
1910+
const reader = postResponse.body?.getReader();
1911+
while (true) {
1912+
const { done } = await reader!.read();
1913+
if (done) break;
1914+
}
1915+
1916+
// Verify closeSSEStream callback was NOT provided
1917+
expect(receivedCloseSSEStream).toBeUndefined();
1918+
});
18051919
});
18061920

18071921
// Test onsessionclosed callback

src/server/streamableHttp.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -649,7 +649,15 @@ export class StreamableHTTPServerTransport implements Transport {
649649

650650
// handle each message
651651
for (const message of messages) {
652-
this.onmessage?.(message, { authInfo, requestInfo });
652+
// Build closeSSEStream callback for requests when eventStore is configured
653+
let closeSSEStream: (() => void) | undefined;
654+
if (isJSONRPCRequest(message) && this._eventStore) {
655+
closeSSEStream = () => {
656+
this.closeSSEStream(message.id);
657+
};
658+
}
659+
660+
this.onmessage?.(message, { authInfo, requestInfo, closeSSEStream });
653661
}
654662
// The server SHOULD NOT close the SSE stream before sending all JSON-RPC responses
655663
// This will be handled by the send() method when responses are ready

src/shared/protocol.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,13 @@ export type RequestHandlerExtra<SendRequestT extends Request, SendNotificationT
283283
* This is used by certain transports to correctly associate related messages.
284284
*/
285285
sendRequest: <U extends AnySchema>(request: SendRequestT, resultSchema: U, options?: TaskRequestOptions) => Promise<SchemaOutput<U>>;
286+
287+
/**
288+
* Closes the SSE stream for this request, triggering client reconnection.
289+
* Only available when using StreamableHTTPServerTransport with eventStore configured.
290+
* Use this to implement polling behavior during long-running operations.
291+
*/
292+
closeSSEStream?: () => void;
286293
};
287294

288295
/**
@@ -728,7 +735,8 @@ export abstract class Protocol<SendRequestT extends Request, SendNotificationT e
728735
requestInfo: extra?.requestInfo,
729736
taskId: relatedTaskId,
730737
taskStore: taskStore,
731-
taskRequestedTtl: taskCreationParams?.ttl
738+
taskRequestedTtl: taskCreationParams?.ttl,
739+
closeSSEStream: extra?.closeSSEStream
732740
};
733741

734742
// Starting with Promise.resolve() puts any synchronous errors into the monad as well.

src/types.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2137,6 +2137,12 @@ export interface MessageExtraInfo {
21372137
* The authentication information.
21382138
*/
21392139
authInfo?: AuthInfo;
2140+
2141+
/**
2142+
* Callback to close the SSE stream for this request, triggering client reconnection.
2143+
* Only available when using StreamableHTTPServerTransport with eventStore configured.
2144+
*/
2145+
closeSSEStream?: () => void;
21402146
}
21412147

21422148
/* JSON-RPC types */

0 commit comments

Comments
 (0)