diff --git a/src/server/request_handler/default_request_handler.ts b/src/server/request_handler/default_request_handler.ts index ea79f2dc..f551620b 100644 --- a/src/server/request_handler/default_request_handler.ts +++ b/src/server/request_handler/default_request_handler.ts @@ -167,7 +167,11 @@ export class DefaultRequestHandler implements A2ARequestHandler { for await (const event of eventQueue.events()) { await resultManager.processEvent(event); - await this._sendPushNotificationIfNeeded(event); + try { + await this._sendPushNotificationIfNeeded(event); + } catch (error) { + console.error(`Error sending push notification: ${error}`); + } if (options?.firstResultResolver && !firstResultSent) { let firstResult: Message | Task | undefined; @@ -187,13 +191,15 @@ export class DefaultRequestHandler implements A2ARequestHandler { A2AError.internalError('Execution finished before a message or task was produced.') ); } - } catch (error) { + } catch (error: any) { console.error(`Event processing loop failed for task ${taskId}:`, error); - if (options?.firstResultRejector && !firstResultSent) { - options.firstResultRejector(error); - } - // re-throw error for blocking case to catch - throw error; + this._handleProcessingError( + error, + resultManager, + firstResultSent, + taskId, + options?.firstResultRejector + ); } finally { this.eventBusManager.cleanupByTaskId(taskId); } @@ -621,4 +627,54 @@ export class DefaultRequestHandler implements A2ARequestHandler { // Send push notification in the background. this.pushNotificationSender?.send(task); } + + private async _handleProcessingError( + error: any, + resultManager: ResultManager, + firstResultSent: boolean, + taskId: string, + firstResultRejector?: (reason: any) => void + ): Promise { + // Non-blocking case with with first result not sent + if (firstResultRejector && !firstResultSent) { + firstResultRejector(error); + return; + } + + // re-throw error for blocking case to catch + if (!firstResultRejector) { + throw error; + } + + // Non-blocking case with first result already sent + const currentTask = resultManager.getCurrentTask(); + if (currentTask) { + const statusUpdateFailed: TaskStatusUpdateEvent = { + taskId: currentTask.id, + contextId: currentTask.contextId, + status: { + state: 'failed', + message: { + kind: 'message', + role: 'agent', + messageId: uuidv4(), + parts: [{ kind: 'text', text: `Event processing loop failed: ${error.message}` }], + taskId: currentTask.id, + contextId: currentTask.contextId, + }, + timestamp: new Date().toISOString(), + }, + kind: 'status-update', + final: true, + }; + + try { + await resultManager.processEvent(statusUpdateFailed); + } catch (error) { + console.error(`Event processing loop failed for task ${taskId}: ${error.message}`); + } + } else { + console.error(`Event processing loop failed for task ${taskId}: ${error.message}`); + } + } } diff --git a/test/server/default_request_handler.spec.ts b/test/server/default_request_handler.spec.ts index ceaad465..bd1f8499 100644 --- a/test/server/default_request_handler.spec.ts +++ b/test/server/default_request_handler.spec.ts @@ -40,6 +40,8 @@ import { } from './mocks/agent-executor.mock.js'; import { MockPushNotificationSender } from './mocks/push_notification_sender.mock.js'; import { ServerCallContext } from '../../src/server/context.js'; +import { MockTaskStore } from './mocks/task_store.mock.js'; +import { TextPart } from 'genkit/model'; describe('DefaultRequestHandler as A2ARequestHandler', () => { let handler: A2ARequestHandler; @@ -277,6 +279,87 @@ describe('DefaultRequestHandler as A2ARequestHandler', () => { assert.equal(saveSpy.secondCall.args[0].status.state, 'completed'); }); + it('sendMessage: (non-blocking) should handle failure in event loop after successfull task event', async () => { + clock = sinon.useFakeTimers(); + + const mockTaskStore = new MockTaskStore(); + const handler = new DefaultRequestHandler( + testAgentCard, + mockTaskStore, + mockAgentExecutor, + executionEventBusManager + ); + + const params: MessageSendParams = { + message: createTestMessage('msg-nonblock', 'Do a long task'), + configuration: { + blocking: false, + acceptedOutputModes: [], + }, + }; + + const taskId = 'task-nonblock-123'; + const contextId = 'ctx-nonblock-abc'; + (mockAgentExecutor as MockAgentExecutor).execute.callsFake(async (ctx, bus) => { + // First event is the task creation, which should be returned immediately + bus.publish({ + id: taskId, + contextId, + status: { state: 'submitted' }, + kind: 'task', + }); + + // Simulate work before publishing more events + await clock.tickAsync(500); + + bus.publish({ + taskId, + contextId, + kind: 'status-update', + status: { state: 'completed' }, + final: true, + }); + bus.finished(); + }); + + let finalTaskSaved: Task | undefined; + const errorMessage = 'Error thrown on saving completed task notification'; + (mockTaskStore as MockTaskStore).save.callsFake(async (task) => { + if (task.status.state == 'completed') { + throw new Error(errorMessage); + } + + if (task.status.state == 'failed') { + finalTaskSaved = task; + } + }); + + // This call should return as soon as the first 'task' event is published + const immediateResult = await handler.sendMessage(params); + + // Assert that we got the initial task object back right away + const taskResult = immediateResult as Task; + assert.equal(taskResult.kind, 'task'); + assert.equal(taskResult.id, taskId); + assert.equal( + taskResult.status.state, + 'submitted', + "Should return immediately with 'submitted' state" + ); + + // Allow the background processing to complete + await clock.runAllAsync(); + + assert.equal(finalTaskSaved!.status.state, 'failed'); + assert.equal(finalTaskSaved!.id, taskId); + assert.equal(finalTaskSaved!.contextId, contextId); + assert.equal(finalTaskSaved!.status.message!.role, 'agent'); + assert.equal( + (finalTaskSaved!.status.message!.parts[0] as TextPart).text, + `Event processing loop failed: ${errorMessage}` + ); + }); + it('sendMessage: should handle agent execution failure for non-blocking calls', async () => { const errorMessage = 'Agent failed!'; (mockAgentExecutor as MockAgentExecutor).execute.rejects(new Error(errorMessage)); diff --git a/test/server/mocks/task_store.mock.ts b/test/server/mocks/task_store.mock.ts new file mode 100644 index 00000000..e0c49579 --- /dev/null +++ b/test/server/mocks/task_store.mock.ts @@ -0,0 +1,8 @@ +import sinon, { SinonStub } from 'sinon'; +import { Task } from '../../../src/index.js'; +import { TaskStore } from '../../../src/server/store.js'; + +export class MockTaskStore implements TaskStore { + public save: SinonStub<[Task], Promise> = sinon.stub(); + public load: SinonStub<[string], Promise> = sinon.stub(); +}