Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
f20fcc3
implement linter and formatter
guglielmo-san Nov 16, 2025
0fc3b2e
remove formatting configuration from eslint
guglielmo-san Nov 17, 2025
265e924
some eslint fixes
guglielmo-san Nov 17, 2025
fe2cf55
some eslint fixes
guglielmo-san Nov 17, 2025
dc2779d
eslint fixes
guglielmo-san Nov 17, 2025
1846345
eslint fixes
guglielmo-san Nov 17, 2025
3fe1382
fix unused params for tests
guglielmo-san Nov 17, 2025
12b813a
final fixes
guglielmo-san Nov 17, 2025
f0ec991
Merge branch 'main' into implement_linter_and_prettier
guglielmo-san Nov 17, 2025
7fa6a59
fix tests
guglielmo-san Nov 17, 2025
2b1ab69
fix eslint
guglielmo-san Nov 17, 2025
a7187de
Merge pull request #3 from guglielmo-san/implement_linter_and_prettier
guglielmo-san Nov 17, 2025
e5423e5
set up git hub workflow for linter
guglielmo-san Nov 17, 2025
bfc43c6
small eslint fixes
guglielmo-san Nov 17, 2025
cd6f3ec
change workflow name
guglielmo-san Nov 17, 2025
2397a91
test failure
guglielmo-san Nov 17, 2025
77ac77b
fix typo
guglielmo-san Nov 17, 2025
14117d9
allow no yield in return
guglielmo-san Nov 17, 2025
afa4b55
disable only on specific use case
guglielmo-san Nov 17, 2025
e7f7693
move the exception to the right line
guglielmo-san Nov 17, 2025
0c39d30
Replace function with NextFunction to remove eslint exception
guglielmo-san Nov 17, 2025
dca28c9
Merge branch 'a2aproject:main' into main
guglielmo-san Nov 17, 2025
b18a9ac
enable prettier on the eslint config
guglielmo-san Nov 17, 2025
348ff18
Apply prettier within linter
guglielmo-san Nov 17, 2025
7ce04fe
modify prettier parameters
guglielmo-san Nov 17, 2025
827e932
remove unused config
guglielmo-san Nov 17, 2025
a00cc38
Handle failure in non blocking request
guglielmo-san Nov 18, 2025
4f1602c
add test for the handling of error in event loop
guglielmo-san Nov 18, 2025
b0d201a
Merge branch 'main' into test_hanging_promise_error
guglielmo-san Nov 18, 2025
b81546d
add new test
guglielmo-san Nov 18, 2025
082b40c
Merge branch 'main' into test_hanging_promise_error
guglielmo-san Nov 18, 2025
01ca334
remove unused imports
guglielmo-san Nov 18, 2025
b0a7831
add error catch
guglielmo-san Nov 18, 2025
c44e257
run lint fix
guglielmo-san Nov 18, 2025
f7f47e5
add space at the end of the file
guglielmo-san Nov 18, 2025
f7e6b1e
improve code readability
guglielmo-san Nov 18, 2025
e2dca98
add support if no task is yet available in the store
guglielmo-san Nov 18, 2025
eaf1bf5
improve the error creation
guglielmo-san Nov 19, 2025
bf55dd1
fix lint
guglielmo-san Nov 19, 2025
8dc5f9d
revert sendMessage and sendMessageStream
guglielmo-san Nov 19, 2025
be2b953
run linter
guglielmo-san Nov 19, 2025
8618842
Improve error messages
guglielmo-san Nov 19, 2025
168a6cb
Merge branch 'main' into test_hanging_promise_error
guglielmo-san Nov 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 63 additions & 7 deletions src/server/request_handler/default_request_handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,11 @@ export class DefaultRequestHandler implements A2ARequestHandler {
for await (const event of eventQueue.events()) {
await resultManager.processEvent(event);

await this._sendPushNotificationIfNeeded(event);
try {
await this._sendPushNotificationIfNeeded(event);
} catch (error) {
console.error(`Error sending push notification: ${error}`);
}

if (options?.firstResultResolver && !firstResultSent) {
let firstResult: Message | Task | undefined;
Expand All @@ -187,13 +191,15 @@ export class DefaultRequestHandler implements A2ARequestHandler {
A2AError.internalError('Execution finished before a message or task was produced.')
);
}
} catch (error) {
} catch (error: any) {
console.error(`Event processing loop failed for task ${taskId}:`, error);
if (options?.firstResultRejector && !firstResultSent) {
options.firstResultRejector(error);
}
// re-throw error for blocking case to catch
throw error;
this._handleProcessingError(
error,
resultManager,
firstResultSent,
taskId,
options?.firstResultRejector
);
} finally {
this.eventBusManager.cleanupByTaskId(taskId);
}
Expand Down Expand Up @@ -621,4 +627,54 @@ export class DefaultRequestHandler implements A2ARequestHandler {
// Send push notification in the background.
this.pushNotificationSender?.send(task);
}

private async _handleProcessingError(
error: any,
resultManager: ResultManager,
firstResultSent: boolean,
taskId: string,
firstResultRejector?: (reason: any) => void
): Promise<void> {
// Non-blocking case with with first result not sent
if (firstResultRejector && !firstResultSent) {
firstResultRejector(error);
return;
}

// re-throw error for blocking case to catch
if (!firstResultRejector) {
throw error;
}

// Non-blocking case with first result already sent
const currentTask = resultManager.getCurrentTask();
if (currentTask) {
const statusUpdateFailed: TaskStatusUpdateEvent = {
taskId: currentTask.id,
contextId: currentTask.contextId,
status: {
state: 'failed',
message: {
kind: 'message',
role: 'agent',
messageId: uuidv4(),
parts: [{ kind: 'text', text: `Event processing loop failed: ${error.message}` }],
taskId: currentTask.id,
contextId: currentTask.contextId,
},
timestamp: new Date().toISOString(),
},
kind: 'status-update',
final: true,
};

try {
await resultManager.processEvent(statusUpdateFailed);
} catch (error) {
console.error(`Event processing loop failed for task ${taskId}: ${error.message}`);
}
} else {
console.error(`Event processing loop failed for task ${taskId}: ${error.message}`);
}
}
}
83 changes: 83 additions & 0 deletions test/server/default_request_handler.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ import {
} from './mocks/agent-executor.mock.js';
import { MockPushNotificationSender } from './mocks/push_notification_sender.mock.js';
import { ServerCallContext } from '../../src/server/context.js';
import { MockTaskStore } from './mocks/task_store.mock.js';
import { TextPart } from 'genkit/model';

describe('DefaultRequestHandler as A2ARequestHandler', () => {
let handler: A2ARequestHandler;
Expand Down Expand Up @@ -277,6 +279,87 @@ describe('DefaultRequestHandler as A2ARequestHandler', () => {
assert.equal(saveSpy.secondCall.args[0].status.state, 'completed');
});

it('sendMessage: (non-blocking) should handle failure in event loop after successfull task event', async () => {
clock = sinon.useFakeTimers();

const mockTaskStore = new MockTaskStore();
const handler = new DefaultRequestHandler(
testAgentCard,
mockTaskStore,
mockAgentExecutor,
executionEventBusManager
);

const params: MessageSendParams = {
message: createTestMessage('msg-nonblock', 'Do a long task'),
configuration: {
blocking: false,
acceptedOutputModes: [],
},
};

const taskId = 'task-nonblock-123';
const contextId = 'ctx-nonblock-abc';
(mockAgentExecutor as MockAgentExecutor).execute.callsFake(async (ctx, bus) => {
// First event is the task creation, which should be returned immediately
bus.publish({
id: taskId,
contextId,
status: { state: 'submitted' },
kind: 'task',
});

// Simulate work before publishing more events
await clock.tickAsync(500);

bus.publish({
taskId,
contextId,
kind: 'status-update',
status: { state: 'completed' },
final: true,
});
bus.finished();
});

let finalTaskSaved: Task | undefined;
const errorMessage = 'Error thrown on saving completed task notification';
(mockTaskStore as MockTaskStore).save.callsFake(async (task) => {
if (task.status.state == 'completed') {
throw new Error(errorMessage);
}

if (task.status.state == 'failed') {
finalTaskSaved = task;
}
});

// This call should return as soon as the first 'task' event is published
const immediateResult = await handler.sendMessage(params);

// Assert that we got the initial task object back right away
const taskResult = immediateResult as Task;
assert.equal(taskResult.kind, 'task');
assert.equal(taskResult.id, taskId);
assert.equal(
taskResult.status.state,
'submitted',
"Should return immediately with 'submitted' state"
);

// Allow the background processing to complete
await clock.runAllAsync();

assert.equal(finalTaskSaved!.status.state, 'failed');
assert.equal(finalTaskSaved!.id, taskId);
assert.equal(finalTaskSaved!.contextId, contextId);
assert.equal(finalTaskSaved!.status.message!.role, 'agent');
assert.equal(
(finalTaskSaved!.status.message!.parts[0] as TextPart).text,
`Event processing loop failed: ${errorMessage}`
);
});

it('sendMessage: should handle agent execution failure for non-blocking calls', async () => {
const errorMessage = 'Agent failed!';
(mockAgentExecutor as MockAgentExecutor).execute.rejects(new Error(errorMessage));
Expand Down
8 changes: 8 additions & 0 deletions test/server/mocks/task_store.mock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import sinon, { SinonStub } from 'sinon';
import { Task } from '../../../src/index.js';
import { TaskStore } from '../../../src/server/store.js';

export class MockTaskStore implements TaskStore {
public save: SinonStub<[Task], Promise<void>> = sinon.stub();
public load: SinonStub<[string], Promise<Task | undefined>> = sinon.stub();
}