@@ -194,6 +194,7 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte

EnhancedRunnable producerRunnable = registerAndExecuteAgentAsync(taskId, mss.requestContext, queue);
ResultAggregator.EventTypeAndInterrupt etai = null;
+ EventKind kind = null; // Declare outside try block so it's in scope for return
try {
// Create callback for push notifications during background event processing
Runnable pushNotificationCallback = () -> sendPushNotification(taskId, resultAggregator);
@@ -203,7 +204,10 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
// This callback must be added before we start consuming. Otherwise,
// any errors thrown by the producerRunnable are not picked up by the consumer
producerRunnable.addDoneCallback(consumer.createAgentRunnableDoneCallback());
- etai = resultAggregator.consumeAndBreakOnInterrupt(consumer, blocking, pushNotificationCallback);
+
+ // Get agent future before consuming (for blocking calls to wait for agent completion)
+ CompletableFuture<Void> agentFuture = runningAgents.get(taskId);
+ etai = resultAggregator.consumeAndBreakOnInterrupt(consumer, blocking, pushNotificationCallback, agentFuture);

if (etai == null) {
LOGGER.debug("No result, throwing InternalError");
@@ -212,7 +216,63 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
interruptedOrNonBlocking = etai.interrupted();
LOGGER.debug("Was interrupted or non-blocking: {}", interruptedOrNonBlocking);

- EventKind kind = etai.eventType();
+ // For blocking calls that were interrupted (returned on first event),
+ // wait for agent execution and event processing BEFORE returning to client.
+ // This ensures the returned Task has all artifacts and current state.
+ // We do this HERE (not in ResultAggregator) to avoid blocking Vert.x worker threads
+ // during the consumption loop itself.
+ kind = etai.eventType();
+ if (blocking && interruptedOrNonBlocking) {
+ // For blocking calls: ensure all events are processed before returning
+ // Order of operations is critical to avoid circular dependency:
+ // 1. Wait for agent to finish enqueueing events
+ // 2. Close the queue to signal consumption can complete
+ // 3. Wait for consumption to finish processing events
+ // 4. Fetch final task state from TaskStore
+
+ try {
+ // Step 1: Wait for agent to finish (with short timeout for fast agents)
+ if (agentFuture != null) {
+ try {
+ agentFuture.get(5, java.util.concurrent.TimeUnit.SECONDS);
Contributor

medium

The timeout value 5 is hardcoded. It's a good practice to define it as a named constant (e.g., AGENT_COMPLETION_TIMEOUT_SECONDS) at the top of the class for better readability and maintainability.

+ LOGGER.debug("Agent completed for task {}", taskId);
+ } catch (java.util.concurrent.TimeoutException e) {
+ // Agent still running after 5s - that's fine, events already being processed
+ LOGGER.debug("Agent still running for task {} after 5s", taskId);
+ }
+ }
+
+ // Step 2: Close the queue to signal consumption can complete
+ // For fire-and-forget tasks, there's no final event, so we need to close the queue
+ // This allows EventConsumer.consumeAll() to exit
+ queue.close(false, false); // graceful close, don't notify parent yet
+ LOGGER.debug("Closed queue for task {} to allow consumption completion", taskId);
+
+ // Step 3: Wait for consumption to complete (now that queue is closed)
+ if (etai.consumptionFuture() != null) {
+ etai.consumptionFuture().get(2, java.util.concurrent.TimeUnit.SECONDS);
Contributor

medium

Similar to the agent completion timeout, the hardcoded timeout value 2 should be defined as a named constant (e.g., CONSUMPTION_COMPLETION_TIMEOUT_SECONDS) for improved code clarity and ease of maintenance.

+ LOGGER.debug("Consumption completed for task {}", taskId);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.warn("Interrupted waiting for task {} completion", taskId, e);
+ } catch (java.util.concurrent.ExecutionException e) {
+ LOGGER.warn("Error during task {} execution", taskId, e.getCause());
+ } catch (java.util.concurrent.TimeoutException e) {
+ LOGGER.warn("Timeout waiting for consumption to complete for task {}", taskId);
+ }
Comment on lines +233 to +263
Contributor

medium

The timeout values 5 (line 237) and 2 (line 253) are hardcoded. For better readability and maintainability, consider extracting them into private static final constants with descriptive names, for example AGENT_COMPLETION_TIMEOUT_SECONDS and CONSUMPTION_COMPLETION_TIMEOUT_SECONDS.
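
A minimal sketch of this suggestion, assuming the two waits are wrapped in small helpers. The constant names come from the review comment and the values from the literals in the diff; the class name `TimeoutConstantsSketch` and the helpers `awaitAgent` and `awaitConsumption` are hypothetical and not part of the PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutConstantsSketch {
    // Names are the ones suggested in the review; values are the literals from the diff.
    private static final long AGENT_COMPLETION_TIMEOUT_SECONDS = 5;
    private static final long CONSUMPTION_COMPLETION_TIMEOUT_SECONDS = 2;

    /**
     * Hypothetical helper (not in the PR): waits for the agent future and reports
     * whether it finished in time. A timeout is tolerated, mirroring Step 1 of the diff.
     */
    static boolean awaitAgent(CompletableFuture<Void> agentFuture)
            throws InterruptedException, ExecutionException {
        if (agentFuture == null) {
            return true; // nothing to wait for
        }
        try {
            agentFuture.get(AGENT_COMPLETION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            return true;
        } catch (TimeoutException e) {
            return false; // agent still running; events are already being processed
        }
    }

    /**
     * Hypothetical helper (not in the PR): waits for consumption and lets a
     * TimeoutException propagate to the caller, mirroring Step 3 of the diff.
     */
    static void awaitConsumption(CompletableFuture<Void> consumptionFuture)
            throws InterruptedException, ExecutionException, TimeoutException {
        if (consumptionFuture != null) {
            consumptionFuture.get(CONSUMPTION_COMPLETION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws Exception {
        // Already-completed futures return immediately, so no timeout is hit here.
        System.out.println(awaitAgent(CompletableFuture.completedFuture(null)));
        awaitConsumption(CompletableFuture.completedFuture(null));
    }
}
```

With the constants in place, the "after 5s" wording in the log message and code comment could reference the constant as well, so the text cannot drift from the actual timeout.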


+ // Step 4: Fetch the final task state from TaskStore (all events have been processed)
+ Task updatedTask = taskStore.get(taskId);
+ if (updatedTask != null) {
+ kind = updatedTask;
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Fetched final task for {} with state {} and {} artifacts",
+ taskId, updatedTask.getStatus().state(),
+ updatedTask.getArtifacts() != null ? updatedTask.getArtifacts().size() : 0);
+ }
+ }
+ }
Comment on lines +225 to +275
Contributor

medium

The logic inside the if (blocking && interruptedOrNonBlocking) block is quite complex and spans many lines. To improve the readability and maintainability of the onMessageSend method, consider extracting this logic into a private helper method, for example waitForCompletionAndFetchFinalTask. This new method could take parameters like agentFuture, queue, etai, and taskId and return the final Task.
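
One possible shape for the extracted helper, as a self-contained sketch rather than a drop-in patch. `Task`, `TaskStore`, and `ClosableQueue` below are simplified stand-ins for the real project types, and the helper takes the consumption future and task store directly instead of `etai`; those choices are assumptions made only so the example compiles on its own.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class WaitHelperSketch {
    // Simplified stand-ins (assumptions) for the project's Task, TaskStore and event queue.
    record Task(String id, String state) {}
    interface TaskStore { Task get(String taskId); }
    interface ClosableQueue { void close(); } // the diff calls queue.close(false, false)

    private static final long AGENT_COMPLETION_TIMEOUT_SECONDS = 5;
    private static final long CONSUMPTION_COMPLETION_TIMEOUT_SECONDS = 2;

    /**
     * Runs the four steps from the diff in order and returns the task found in the
     * store, or null if none is stored (the caller then keeps its existing result).
     */
    static Task waitForCompletionAndFetchFinalTask(
            CompletableFuture<Void> agentFuture,
            ClosableQueue queue,
            CompletableFuture<Void> consumptionFuture,
            TaskStore taskStore,
            String taskId) {
        try {
            // Step 1: wait for the agent; a timeout here is tolerated.
            if (agentFuture != null) {
                try {
                    agentFuture.get(AGENT_COMPLETION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                } catch (TimeoutException e) {
                    // Agent still running; events are already being processed.
                }
            }
            // Step 2: close the queue so consumption can complete.
            queue.close();
            // Step 3: wait for consumption to finish processing events.
            if (consumptionFuture != null) {
                consumptionFuture.get(CONSUMPTION_COMPLETION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        } catch (ExecutionException | TimeoutException e) {
            // The diff logs a warning in these cases and still fetches the task.
            System.err.println("Wait for task " + taskId + " did not finish cleanly: " + e);
        }
        // Step 4: fetch the final task state.
        return taskStore.get(taskId);
    }

    public static void main(String[] args) {
        TaskStore store = id -> new Task(id, "completed");
        Task task = waitForCompletionAndFetchFinalTask(
                CompletableFuture.completedFuture(null), () -> {},
                CompletableFuture.completedFuture(null), store, "task-1");
        System.out.println(task);
    }
}
```

In `onMessageSend`, the caller would then only need to assign the returned task to `kind` when it is non-null, which keeps the ordering constraints documented in one place.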

if (kind instanceof Task taskResult && !taskId.equals(taskResult.getId())) {
throw new InternalError("Task ID mismatch in agent response");
}
@@ -229,8 +289,8 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte
trackBackgroundTask(cleanupProducer(agentFuture, etai != null ? etai.consumptionFuture() : null, taskId, queue, false));
}

- LOGGER.debug("Returning: {}", etai.eventType());
- return etai.eventType();
+ LOGGER.debug("Returning: {}", kind);
+ return kind;
}

@Override
@@ -11,20 +11,20 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;

import io.a2a.server.events.EventConsumer;
import io.a2a.server.events.EventQueueItem;
import io.a2a.spec.A2AServerException;
import io.a2a.spec.Event;
import io.a2a.spec.EventKind;
import io.a2a.spec.InternalError;
import io.a2a.spec.JSONRPCError;
import io.a2a.spec.Message;
import io.a2a.spec.Task;
import io.a2a.spec.TaskState;
import io.a2a.spec.TaskStatusUpdateEvent;
import io.a2a.util.Utils;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;

public class ResultAggregator {
private static final Logger LOGGER = LoggerFactory.getLogger(ResultAggregator.class);
@@ -106,10 +106,14 @@ public EventKind consumeAll(EventConsumer consumer) throws JSONRPCError {
}

public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer, boolean blocking) throws JSONRPCError {
- return consumeAndBreakOnInterrupt(consumer, blocking, null);
+ return consumeAndBreakOnInterrupt(consumer, blocking, null, null);
}

public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer, boolean blocking, Runnable eventCallback) throws JSONRPCError {
+ return consumeAndBreakOnInterrupt(consumer, blocking, eventCallback, null);
+ }
+
+ public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer, boolean blocking, Runnable eventCallback, CompletableFuture<Void> agentFuture) throws JSONRPCError {
Contributor

medium

The agentFuture parameter is not used within this method. If it's not needed here, it should be removed to avoid confusion and simplify the method signature. This would require updating the calling methods as well.

Flow.Publisher<EventQueueItem> allItems = consumer.consumeAll();
AtomicReference<Message> message = new AtomicReference<>();
AtomicBoolean interrupted = new AtomicBoolean(false);
@@ -180,11 +184,11 @@ else if (!blocking) {
shouldInterrupt = true;
continueInBackground = true;
}
- else {
- // For ALL blocking calls (both final and non-final events), use background consumption
- // This ensures all events are processed and persisted to TaskStore in background
- // Queue lifecycle is now managed by DefaultRequestHandler.cleanupProducer()
- // which waits for BOTH agent and consumption futures before closing queues
+ else if (blocking) {
+ // For blocking calls: Interrupt to free Vert.x thread, but continue in background
+ // Python's async consumption doesn't block threads, but Java's does
+ // So we interrupt to return quickly, then rely on background consumption
+ // DefaultRequestHandler will fetch the final state from TaskStore
shouldInterrupt = true;
continueInBackground = true;
if (LOGGER.isDebugEnabled()) {
@@ -198,10 +202,17 @@ else if (!blocking) {
interrupted.set(true);
completionFuture.complete(null);

- // Signal that cleanup can proceed while consumption continues in background.
- // This prevents infinite hangs for fire-and-forget agents that never emit final events.
- // Processing continues (return true below) and all events are still persisted to TaskStore.
- consumptionCompletionFuture.complete(null);
+ // For blocking calls, DON'T complete consumptionCompletionFuture here.
+ // Let it complete naturally when subscription finishes (onComplete callback below).
+ // This ensures all events are processed and persisted to TaskStore before
+ // DefaultRequestHandler.cleanupProducer() proceeds with cleanup.
+ //
+ // For non-blocking and auth-required calls, complete immediately to allow
+ // cleanup to proceed while consumption continues in background.
+ if (!blocking) {
+ consumptionCompletionFuture.complete(null);
+ }
+ // else: blocking calls wait for actual consumption completion in onComplete

// Continue consuming in background - keep requesting events
// Note: continueInBackground is always true when shouldInterrupt is true
@@ -244,8 +255,8 @@ else if (!blocking) {
}
}

- // Background consumption continues automatically via the subscription
- // returning true in the consumer function keeps the subscription alive
+ // Note: For blocking calls that were interrupted, the wait logic has been moved
+ // to DefaultRequestHandler.onMessageSend() to avoid blocking Vert.x worker threads.
// Queue lifecycle is managed by DefaultRequestHandler.cleanupProducer()

Throwable error = errorRef.get();