Skip to content

Commit 0601671

Browse files
fix: call tasks/result to deliver side-channel messages
Replace polling loop in requestStream() with direct tasks/result call that delivers queued messages (elicitation, sampling) via SSE. Add integration test for callToolStream with elicitation.
1 parent a64cee7 commit 0601671

File tree

2 files changed

+100
-31
lines changed

2 files changed

+100
-31
lines changed

src/integration-tests/taskLifecycle.test.ts

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1604,4 +1604,77 @@ describe('Task Lifecycle Integration Tests', () => {
16041604
await transport.close();
16051605
});
16061606
});
1607+
1608+
describe('callToolStream with elicitation', () => {
1609+
it('should deliver elicitation via callToolStream and complete task', async () => {
1610+
const client = new Client(
1611+
{
1612+
name: 'test-client',
1613+
version: '1.0.0'
1614+
},
1615+
{
1616+
capabilities: {
1617+
elicitation: {}
1618+
}
1619+
}
1620+
);
1621+
1622+
// Track elicitation request receipt
1623+
let elicitationReceived = false;
1624+
let elicitationMessage = '';
1625+
1626+
// Set up elicitation handler on client
1627+
client.setRequestHandler(ElicitRequestSchema, async request => {
1628+
elicitationReceived = true;
1629+
elicitationMessage = request.params.message;
1630+
1631+
return {
1632+
action: 'accept' as const,
1633+
content: {
1634+
userName: 'StreamUser'
1635+
}
1636+
};
1637+
});
1638+
1639+
const transport = new StreamableHTTPClientTransport(baseUrl);
1640+
await client.connect(transport);
1641+
1642+
// Use callToolStream instead of raw request()
1643+
const stream = client.experimental.tasks.callToolStream({ name: 'input-task', arguments: {} }, CallToolResultSchema, {
1644+
task: { ttl: 60000 }
1645+
});
1646+
1647+
// Collect all stream messages
1648+
const messages: Array<{ type: string; task?: unknown; result?: unknown; error?: unknown }> = [];
1649+
for await (const message of stream) {
1650+
messages.push(message);
1651+
}
1652+
1653+
// Verify stream yielded expected message types
1654+
expect(messages.length).toBeGreaterThanOrEqual(2);
1655+
1656+
// First message should be taskCreated
1657+
expect(messages[0].type).toBe('taskCreated');
1658+
expect(messages[0].task).toBeDefined();
1659+
1660+
// Should have a taskStatus message
1661+
const statusMessages = messages.filter(m => m.type === 'taskStatus');
1662+
expect(statusMessages.length).toBeGreaterThanOrEqual(1);
1663+
1664+
// Last message should be result
1665+
const lastMessage = messages[messages.length - 1];
1666+
expect(lastMessage.type).toBe('result');
1667+
expect(lastMessage.result).toBeDefined();
1668+
1669+
// Verify elicitation was received and processed
1670+
expect(elicitationReceived).toBe(true);
1671+
expect(elicitationMessage).toContain('What is your name?');
1672+
1673+
// Verify result content
1674+
const result = lastMessage.result as { content: Array<{ type: string; text: string }> };
1675+
expect(result.content).toEqual([{ type: 'text', text: 'Hello, StreamUser!' }]);
1676+
1677+
await transport.close();
1678+
}, 15000);
1679+
});
16071680
});

src/shared/protocol.ts

Lines changed: 27 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -995,39 +995,35 @@ export abstract class Protocol<SendRequestT extends Request, SendNotificationT e
995995
throw new McpError(ErrorCode.InternalError, 'Task creation did not return a task');
996996
}
997997

998-
// Poll for task completion
999-
while (true) {
1000-
// Get current task status
1001-
const task = await this.getTask({ taskId }, options);
1002-
yield { type: 'taskStatus', task };
1003-
1004-
// Check if task is terminal
1005-
if (isTerminal(task.status)) {
1006-
if (task.status === 'completed') {
1007-
// Get the final result
1008-
const result = await this.getTaskResult({ taskId }, resultSchema, options);
1009-
yield { type: 'result', result };
1010-
} else if (task.status === 'failed') {
1011-
yield {
1012-
type: 'error',
1013-
error: new McpError(ErrorCode.InternalError, `Task ${taskId} failed`)
1014-
};
1015-
} else if (task.status === 'cancelled') {
1016-
yield {
1017-
type: 'error',
1018-
error: new McpError(ErrorCode.InternalError, `Task ${taskId} was cancelled`)
1019-
};
1020-
}
1021-
return;
998+
// Get initial task status
999+
const initialTask = await this.getTask({ taskId }, options);
1000+
yield { type: 'taskStatus', task: initialTask };
1001+
1002+
// If already terminal, handle immediately
1003+
if (isTerminal(initialTask.status)) {
1004+
if (initialTask.status === 'completed') {
1005+
const result = await this.getTaskResult({ taskId }, resultSchema, options);
1006+
yield { type: 'result', result };
1007+
} else if (initialTask.status === 'failed') {
1008+
yield {
1009+
type: 'error',
1010+
error: new McpError(ErrorCode.InternalError, `Task ${taskId} failed`)
1011+
};
1012+
} else if (initialTask.status === 'cancelled') {
1013+
yield {
1014+
type: 'error',
1015+
error: new McpError(ErrorCode.InternalError, `Task ${taskId} was cancelled`)
1016+
};
10221017
}
1023-
1024-
// Wait before polling again
1025-
const pollInterval = task.pollInterval ?? this._options?.defaultTaskPollInterval ?? 1000;
1026-
await new Promise(resolve => setTimeout(resolve, pollInterval));
1027-
1028-
// Check if cancelled
1029-
options?.signal?.throwIfAborted();
1018+
return;
10301019
}
1020+
1021+
// Call tasks/result - this will:
1022+
// 1. Deliver any queued messages (elicitation, sampling) via SSE
1023+
// 2. Block until task becomes terminal
1024+
// 3. Return final result
1025+
const result = await this.getTaskResult({ taskId }, resultSchema, options);
1026+
yield { type: 'result', result };
10311027
} catch (error) {
10321028
yield {
10331029
type: 'error',

0 commit comments

Comments
 (0)