Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import io.a2a.jsonrpc.common.json.JsonProcessingException;
import io.a2a.spec.A2AError;
import io.a2a.spec.StreamingEventKind;
import io.a2a.spec.Task;
import io.a2a.spec.TaskState;
import io.a2a.spec.TaskStatusUpdateEvent;
import org.jspecify.annotations.Nullable;

Expand Down Expand Up @@ -64,11 +66,23 @@ private void handleMessage(String message, @Nullable Future<Void> future) {

StreamingEventKind event = ProtoUtils.FromProto.streamingEventKind(response);
eventHandler.accept(event);
if (event instanceof TaskStatusUpdateEvent && ((TaskStatusUpdateEvent) event).isFinal()) {
if (future != null) {
future.cancel(true); // close SSE channel

// Client-side auto-close on final events to prevent connection leaks
// Handles both TaskStatusUpdateEvent and Task objects with final states
// This covers late subscriptions to completed tasks and ensures no connection leaks
boolean shouldClose = false;
if (event instanceof TaskStatusUpdateEvent tue && tue.isFinal()) {
shouldClose = true;
} else if (event instanceof Task task) {
TaskState state = task.status().state();
if (state.isFinal()) {
shouldClose = true;
}
}

if (shouldClose && future != null) {
future.cancel(true); // close SSE channel
}
} catch (A2AError error) {
if (errorHandler != null) {
errorHandler.accept(error);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
import io.a2a.grpc.StreamResponse;
import io.a2a.grpc.utils.ProtoUtils;
import io.a2a.spec.StreamingEventKind;
import io.a2a.spec.Task;
import io.a2a.spec.TaskState;
import io.a2a.spec.TaskStatusUpdateEvent;
import org.jspecify.annotations.Nullable;

public class RestSSEEventListener {
Expand All @@ -29,7 +32,7 @@ public void onMessage(String message, @Nullable Future<Void> completableFuture)
log.fine("Streaming message received: " + message);
io.a2a.grpc.StreamResponse.Builder builder = io.a2a.grpc.StreamResponse.newBuilder();
JsonFormat.parser().merge(message, builder);
handleMessage(builder.build());
handleMessage(builder.build(), completableFuture);
} catch (InvalidProtocolBufferException e) {
errorHandler.accept(RestErrorMapper.mapRestError(message, 500));
}
Expand All @@ -44,7 +47,7 @@ public void onError(Throwable throwable, @Nullable Future<Void> future) {
}
}

private void handleMessage(StreamResponse response) {
private void handleMessage(StreamResponse response, @Nullable Future<Void> future) {
StreamingEventKind event;
switch (response.getPayloadCase()) {
case MESSAGE ->
Expand All @@ -62,6 +65,23 @@ private void handleMessage(StreamResponse response) {
}
}
eventHandler.accept(event);

// Client-side auto-close on final events to prevent connection leaks
// Handles both TaskStatusUpdateEvent and Task objects with final states
// This covers late subscriptions to completed tasks and ensures no connection leaks
boolean shouldClose = false;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be a duplicate of JSONRPC's own update

Copy link
Collaborator Author

@kabir kabir Feb 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ehsavoie Not sure what you mean here? Won't the client be using either this or JSONRPC (or gRPC)?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tracked in #623

if (event instanceof TaskStatusUpdateEvent tue && tue.isFinal()) {
shouldClose = true;
} else if (event instanceof Task task) {
TaskState state = task.status().state();
if (state.isFinal()) {
shouldClose = true;
}
}

if (shouldClose && future != null) {
future.cancel(true); // close SSE channel
}
}

}
16 changes: 16 additions & 0 deletions examples/cloud-deployment/scripts/deploy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,22 @@ echo ""
echo "Deploying PostgreSQL..."
kubectl apply -f ../k8s/01-postgres.yaml
echo "Waiting for PostgreSQL to be ready..."

# Wait for pod to be created (StatefulSet takes time to create pod).
# Polls once per second for up to 30 attempts. Existence is detected from the
# label selector's `-o name` output, so the check does not depend on the pod's
# name containing "postgres" or on the layout of the human-readable table, and
# no pipeline is involved (safe if the script runs with `pipefail`).
for i in {1..30}; do
    # `|| true` keeps a transient kubectl failure from aborting the script
    # should it run under `set -e`; an empty result simply means "not yet".
    postgres_pods=$(kubectl get pod -l app=postgres -n a2a-demo -o name 2>/dev/null || true)
    if [ -n "$postgres_pods" ]; then
        echo "PostgreSQL pod found, waiting for ready state..."
        break
    fi
    if [ "$i" -eq 30 ]; then
        echo -e "${RED}ERROR: PostgreSQL pod not created after 30 seconds${NC}"
        # Show the StatefulSet status to help diagnose why no pod appeared.
        kubectl get statefulset -n a2a-demo
        exit 1
    fi
    sleep 1
done

# Now wait for pod to be ready
kubectl wait --for=condition=Ready pod -l app=postgres -n a2a-demo --timeout=120s
echo -e "${GREEN}✓ PostgreSQL deployed${NC}"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,28 @@
* This event is fired AFTER the database transaction commits, making it safe for downstream
* components to assume the task is durably stored.
*
* <p>Used by the replicated queue manager to send poison pill events after ensuring
* the final task state is committed to the database, eliminating race conditions.
* <p>Used by the replicated queue manager to send the final task state before the poison pill,
* ensuring correct event ordering across instances and eliminating race conditions.
*/
public class TaskFinalizedEvent {
    /** Identifier of the finalized task. */
    private final String taskId;

    /**
     * Final task state. Expected to be an {@code io.a2a.spec.Task}, but typed as
     * {@code Object} to avoid a dependency on the spec module.
     */
    private final Object task;

    /**
     * Creates a finalization event for a task.
     *
     * @param taskId the ID of the finalized task
     * @param task the final task object (an {@code io.a2a.spec.Task}, passed as {@code Object})
     */
    public TaskFinalizedEvent(String taskId, Object task) {
        this.taskId = taskId;
        this.task = task;
    }

    /**
     * Returns the ID of the finalized task.
     *
     * @return the task ID
     */
    public String getTaskId() {
        return taskId;
    }

    /**
     * Returns the final task object supplied at construction time.
     * Callers must cast it to the concrete task type themselves.
     *
     * @return the final task, as passed to the constructor
     */
    public Object getTask() {
        return task;
    }

    @Override
    public String toString() {
        return "TaskFinalizedEvent{taskId='" + taskId + "', task=" + task + "}";
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -164,4 +164,5 @@ public void deleteInfo(String taskId, String configId) {
taskId, configId);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,16 @@ public void setClosedEvent(boolean closedEvent) {
}
}

/**
 * Tells whether the wrapped event is a full {@code io.a2a.spec.Task}.
 * Task events carry the final task state, so they should be processed
 * even when the task is no longer active.
 *
 * @return {@code true} when the wrapped event is a Task, {@code false} otherwise
 *         (including when there is no event)
 */
public boolean isTaskEvent() {
    final boolean carriesTask = event instanceof io.a2a.spec.Task;
    return carriesTask;
}

@Override
public String toString() {
return "ReplicatedEventQueueItem{" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
import io.a2a.server.events.EventQueueFactory;
import io.a2a.server.events.EventQueueItem;
import io.a2a.server.events.InMemoryQueueManager;
import io.a2a.server.events.MainEventBus;
import io.a2a.server.events.QueueManager;
import io.a2a.server.tasks.TaskStateProvider;
import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -45,10 +47,12 @@ protected ReplicatedQueueManager() {
}

/**
 * Creates the replicated queue manager.
 *
 * <p>Queue handling is delegated to an {@link InMemoryQueueManager} that is built with a
 * {@code ReplicatingEventQueueFactory}, the task state provider and the main event bus.
 *
 * @param replicationStrategy strategy used to send events to other instances
 * @param taskStateProvider provider used to check whether a task is still active
 * @param mainEventBus event bus handed to the delegate in-memory queue manager
 */
@Inject
public ReplicatedQueueManager(ReplicationStrategy replicationStrategy,
        TaskStateProvider taskStateProvider,
        MainEventBus mainEventBus) {
    this.replicationStrategy = replicationStrategy;
    this.taskStateProvider = taskStateProvider;
    this.delegate = new InMemoryQueueManager(new ReplicatingEventQueueFactory(), taskStateProvider, mainEventBus);
}


Expand Down Expand Up @@ -77,8 +81,7 @@ public void close(String taskId) {

/**
 * Delegates to the in-memory queue manager's {@code createOrTap} for the given task
 * and returns its result unchanged.
 *
 * @param taskId the ID of the task whose queue is requested
 * @return the queue returned by the delegate
 */
@Override
public EventQueue createOrTap(String taskId) {
    return delegate.createOrTap(taskId);
}

@Override
Expand All @@ -87,48 +90,93 @@ public void awaitQueuePollerStart(EventQueue eventQueue) throws InterruptedExcep
}

/**
 * Handles an event replicated from another instance by enqueuing it on the task's MainQueue.
 *
 * <p>QueueClosedEvent and Task events are always processed (they carry final state). Any other
 * event type is skipped when the task is no longer active, which prevents queues from being
 * created for expired tasks.
 *
 * @param replicatedEvent the replicated queue item to enqueue locally
 */
public void onReplicatedEvent(@Observes ReplicatedEventQueueItem replicatedEvent) {
    String taskId = replicatedEvent.getTaskId();

    // Check if task is still active before processing replicated event.
    // Always allow QueueClosedEvent and Task events (they carry final state).
    if (!replicatedEvent.isClosedEvent()
            && !replicatedEvent.isTaskEvent()
            && !taskStateProvider.isTaskActive(taskId)) {
        // This prevents creating queues for tasks that have been finalized beyond the grace period
        LOGGER.debug("Skipping replicated event for inactive task {}", taskId);
        return;
    }

    // Get the MainQueue to enqueue the replicated event item.
    // We must use enqueueItem (not enqueueEvent) to preserve the isReplicated() flag
    // and avoid triggering the replication hook again (which would cause a replication loop).
    //
    // When a MainQueue already exists we deliberately do NOT tap a new ChildQueue: existing
    // ChildQueues (from active client subscriptions) receive the event when it is distributed
    // to all children. A temporary ChildQueue is only obtained as a by-product of creating a
    // missing MainQueue (late-arriving replicated events for tasks created on another
    // instance), and it is closed again in the finally block.
    EventQueue childQueue = null; // Only set when we had to create the MainQueue
    EventQueue mainQueue = delegate.get(taskId);
    try {
        if (mainQueue == null) {
            LOGGER.debug("Creating MainQueue for replicated event on task {}", taskId);
            childQueue = delegate.createOrTap(taskId); // Creates MainQueue + returns ChildQueue
            mainQueue = delegate.get(taskId); // Get MainQueue from map
        }

        if (mainQueue != null) {
            mainQueue.enqueueItem(replicatedEvent);
        } else {
            LOGGER.warn(
                    "MainQueue not found for task {}, cannot enqueue replicated event. This may happen if the queue was already cleaned up.",
                    taskId);
        }
    } finally {
        if (childQueue != null) {
            try {
                childQueue.close(); // Close the ChildQueue we created (not MainQueue!)
            } catch (Exception e) {
                // Best-effort cleanup: a failure here must not mask the outcome of the enqueue,
                // but it is reported through the logger instead of printStackTrace().
                LOGGER.debug("Failed to close temporary ChildQueue for task {}", taskId, e);
            }
        }
    }
}

/**
* Observes task finalization events fired AFTER database transaction commits.
* This guarantees the task's final state is durably stored before sending the poison pill.
* This guarantees the task's final state is durably stored before replication.
*
* @param event the task finalized event containing the task ID
* Sends TaskStatusUpdateEvent (not full Task) FIRST, then the poison pill (QueueClosedEvent),
* ensuring correct event ordering across instances and eliminating race conditions where
* the poison pill arrives before the final task state.
*
* IMPORTANT: We send TaskStatusUpdateEvent instead of full Task to maintain consistency
* with local event distribution. Clients expect TaskStatusUpdateEvent for status changes,
* and sending the full Task causes issues in remote instances where clients don't handle
* bare Task objects the same way they handle TaskStatusUpdateEvent.
*
* @param event the task finalized event containing the task ID and final Task
*/
public void onTaskFinalized(@Observes(during = TransactionPhase.AFTER_SUCCESS) TaskFinalizedEvent event) {
String taskId = event.getTaskId();
LOGGER.debug("Task {} finalized - sending poison pill (QueueClosedEvent) after transaction commit", taskId);
io.a2a.spec.Task finalTask = (io.a2a.spec.Task) event.getTask(); // Cast from Object

LOGGER.debug("Task {} finalized - sending TaskStatusUpdateEvent then poison pill (QueueClosedEvent) after transaction commit", taskId);

// Convert final Task to TaskStatusUpdateEvent to match local event distribution
// This ensures remote instances receive the same event type as local instances
io.a2a.spec.TaskStatusUpdateEvent finalStatusEvent = io.a2a.spec.TaskStatusUpdateEvent.builder()
.taskId(taskId)
.contextId(finalTask.contextId())
.status(finalTask.status())
.isFinal(true)
.build();

// Send TaskStatusUpdateEvent FIRST to ensure it arrives before poison pill
replicationStrategy.send(taskId, finalStatusEvent);

// Send poison pill directly via replication strategy
// Then send poison pill
// The transaction has committed, so the final state is guaranteed to be in the database
io.a2a.server.events.QueueClosedEvent closedEvent = new io.a2a.server.events.QueueClosedEvent(taskId);
replicationStrategy.send(taskId, closedEvent);
Expand All @@ -152,12 +200,11 @@ public EventQueue.EventQueueBuilder builder(String taskId) {
// which sends the QueueClosedEvent after the database transaction commits.
// This ensures proper ordering and transactional guarantees.

// Return the builder with callbacks
return delegate.getEventQueueBuilder(taskId)
.taskId(taskId)
.hook(new ReplicationHook(taskId))
.addOnCloseCallback(delegate.getCleanupCallback(taskId))
.taskStateProvider(taskStateProvider);
// Call createBaseEventQueueBuilder() directly to avoid infinite recursion
// (getEventQueueBuilder() would delegate back to this factory, creating a loop)
// The base builder already includes: taskId, cleanup callback, taskStateProvider, mainEventBus
return delegate.createBaseEventQueueBuilder(taskId)
.hook(new ReplicationHook(taskId));
}
}

Expand Down
Loading