From edb4054fc3f930e58daf506ccdb7cbc5b05e94f3 Mon Sep 17 00:00:00 2001 From: Kabir Khan Date: Fri, 13 Feb 2026 14:08:06 +0000 Subject: [PATCH 1/7] feat: Add agent-to-agent communication support (#625) Implements server-side agent-to-agent communication where an AgentExecutor can use a client to connect back to the same server, enabling agents to delegate work to other agents. --- reference/jsonrpc/pom.xml | 6 + .../server/apps/quarkus/A2AServerRoutes.java | 3 + .../server/rest/quarkus/A2AServerRoutes.java | 3 + .../java/io/a2a/server/ServerCallContext.java | 6 + .../apps/common/AbstractA2AServerTest.java | 140 ++++++++++++- .../apps/common/AgentExecutorProducer.java | 195 ++++++++++++++++++ .../common/AgentToAgentClientFactory.java | 99 +++++++++ .../transport/grpc/handler/GrpcHandler.java | 5 +- 8 files changed, 455 insertions(+), 2 deletions(-) create mode 100644 tests/server-common/src/test/java/io/a2a/server/apps/common/AgentToAgentClientFactory.java diff --git a/reference/jsonrpc/pom.xml b/reference/jsonrpc/pom.xml index e099711a3..e7172fde7 100644 --- a/reference/jsonrpc/pom.xml +++ b/reference/jsonrpc/pom.xml @@ -51,6 +51,12 @@ test-jar test + + + ${project.groupId} + a2a-java-sdk-client-transport-jsonrpc + test + io.quarkus quarkus-reactive-routes diff --git a/reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/A2AServerRoutes.java b/reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/A2AServerRoutes.java index 548f88da3..971a0cc77 100644 --- a/reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/A2AServerRoutes.java +++ b/reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/A2AServerRoutes.java @@ -1,5 +1,6 @@ package io.a2a.server.apps.quarkus; +import static io.a2a.server.ServerCallContext.TRANSPORT_KEY; import static io.a2a.transport.jsonrpc.context.JSONRPCContextKeys.HEADERS_KEY; import static io.a2a.transport.jsonrpc.context.JSONRPCContextKeys.METHOD_NAME_KEY; import static io.a2a.transport.jsonrpc.context.JSONRPCContextKeys.TENANT_KEY; @@ -62,6 +63,7 @@ import io.a2a.spec.A2AError; import io.a2a.spec.InternalError; import io.a2a.spec.JSONParseError; +import io.a2a.spec.TransportProtocol; import io.a2a.spec.UnsupportedOperationError; import io.a2a.transport.jsonrpc.handler.JSONRPCHandler; import io.quarkus.security.Authenticated; @@ -246,6 +248,7 @@ public String getUsername() { headerNames.forEach(name -> headers.put(name, rc.request().getHeader(name))); state.put(HEADERS_KEY, headers); state.put(TENANT_KEY, extractTenant(rc)); + state.put(TRANSPORT_KEY, TransportProtocol.JSONRPC); // Extract requested protocol version from X-A2A-Version header String requestedVersion = rc.request().getHeader(A2AHeaders.X_A2A_VERSION); diff --git a/reference/rest/src/main/java/io/a2a/server/rest/quarkus/A2AServerRoutes.java b/reference/rest/src/main/java/io/a2a/server/rest/quarkus/A2AServerRoutes.java index 3c664c4b2..9b6bf510a 100644 --- a/reference/rest/src/main/java/io/a2a/server/rest/quarkus/A2AServerRoutes.java +++ b/reference/rest/src/main/java/io/a2a/server/rest/quarkus/A2AServerRoutes.java @@ -1,5 +1,6 @@ package io.a2a.server.rest.quarkus; +import static io.a2a.server.ServerCallContext.TRANSPORT_KEY; import static io.a2a.spec.A2AMethods.CANCEL_TASK_METHOD; import static io.a2a.spec.A2AMethods.SEND_STREAMING_MESSAGE_METHOD; import static io.a2a.transport.rest.context.RestContextKeys.HEADERS_KEY; @@ -33,6 +34,7 @@ import io.a2a.spec.InternalError; import io.a2a.spec.InvalidParamsError; import io.a2a.spec.MethodNotFoundError; +import io.a2a.spec.TransportProtocol; import io.a2a.transport.rest.handler.RestHandler; import io.a2a.transport.rest.handler.RestHandler.HTTPRestResponse; import io.a2a.transport.rest.handler.RestHandler.HTTPRestStreamingResponse; @@ -430,6 +432,7 @@ public String getUsername() { state.put(HEADERS_KEY, headers); state.put(METHOD_NAME_KEY, jsonRpcMethodName); state.put(TENANT_KEY, extractTenant(rc)); + state.put(TRANSPORT_KEY, TransportProtocol.HTTP_JSON); // Extract requested protocol version from X-A2A-Version header String requestedVersion = rc.request().getHeader(A2AHeaders.X_A2A_VERSION); diff --git a/server-common/src/main/java/io/a2a/server/ServerCallContext.java b/server-common/src/main/java/io/a2a/server/ServerCallContext.java index c12c60c21..21a9e790b 100644 --- a/server-common/src/main/java/io/a2a/server/ServerCallContext.java +++ b/server-common/src/main/java/io/a2a/server/ServerCallContext.java @@ -9,6 +9,12 @@ import org.jspecify.annotations.Nullable; public class ServerCallContext { + /** + * Key for transport protocol type in the state map. + * Value should be a {@link io.a2a.spec.TransportProtocol} instance. + */ + public static final String TRANSPORT_KEY = "transport"; + // TODO Not totally sure yet about these field types private final Map modelConfig = new ConcurrentHashMap<>(); private final Map state; diff --git a/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java b/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java index f2da8de15..fdbba462c 100644 --- a/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java +++ b/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java @@ -19,6 +19,7 @@ import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Optional; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; @@ -28,6 +29,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.stream.Collectors; import java.util.stream.Stream; import jakarta.ws.rs.core.MediaType; @@ -617,7 +619,6 @@ public void testGetExtendedAgentCard() throws A2AClientException { .filter(i -> getTransportProtocol().equals(i.protocolBinding())) .findFirst(); assertTrue(transportInterface.isPresent()); - System.out.println("transportInterface = " + transportInterface); assertEquals(getTransportUrl(),transportInterface.get().url()); assertEquals("1.0", agentCard.version()); assertEquals("http://example.com/docs", agentCard.documentationUrl()); @@ -2451,4 +2452,141 @@ public void testMainQueueClosesForFinalizedTasks() throws Exception { } } + /** + * Test agent-to-agent communication where AgentExecutor uses a client + * to delegate requests to another agent (same server, for testing). + *

+ * This test verifies: + *

    + *
  • Transport type is correctly passed via ServerCallContext state
  • + *
  • AgentExecutor can create a client with matching transport
  • + *
  • Delegation pattern ("delegate:" prefix) is recognized
  • + *
  • Client successfully communicates with same server
  • + *
  • Artifacts from delegated task are extracted and returned
  • + *
  • Local handling works without delegation
  • + *
+ */ + @Test + public void testAgentToAgentCommunication() throws Exception { + // Test 1: Delegation pattern (with "delegate:" prefix) + String delegationTaskId = "agent-to-agent-test-" + UUID.randomUUID(); + + Message delegationMessage = Message.builder() + .taskId(delegationTaskId) + .contextId("agent-to-agent-context") + .role(Message.Role.USER) + .parts(new TextPart("delegate:What is 2+2?")) + .build(); + + CountDownLatch delegationLatch = new CountDownLatch(1); + AtomicReference delegationResultRef = new AtomicReference<>(); + AtomicReference delegationErrorRef = new AtomicReference<>(); + + BiConsumer delegationConsumer = (event, agentCard) -> { + Task task = null; + if (event instanceof TaskEvent taskEvent) { + task = taskEvent.getTask(); + } else if (event instanceof TaskUpdateEvent taskUpdateEvent) { + task = taskUpdateEvent.getTask(); + } + + if (task != null && task.status().state().isFinal()) { + delegationResultRef.set(task); + delegationLatch.countDown(); + } + }; + + getClient().sendMessage(delegationMessage, List.of(delegationConsumer), error -> { + delegationErrorRef.set(error); + delegationLatch.countDown(); + }); + + assertTrue(delegationLatch.await(30, TimeUnit.SECONDS), "Delegation should complete within timeout"); + + Task delegationResult = delegationResultRef.get(); + + // Only fail on errors if we didn't get a successful result + // (errors can occur after completion due to stream cleanup) + if (delegationResult == null && delegationErrorRef.get() != null) { + fail("Delegation failed: " + delegationErrorRef.get().getMessage()); + } + + assertNotNull(delegationResult, "Delegation task should not be null"); + assertEquals(TaskState.COMPLETED, delegationResult.status().state(), + "Delegation task should be completed"); + assertNotNull(delegationResult.artifacts(), "Delegation should have artifacts"); + assertFalse(delegationResult.artifacts().isEmpty(), "Delegation should have at least one artifact"); + + // Extract text from result + String delegatedText = extractTextFromTask(delegationResult); + assertTrue(delegatedText.contains("Handled locally:"), + "Delegated content should have been handled locally by target agent. Got: " + delegatedText); + + // Verify the task ID is the original one (not the delegated task's ID) + assertEquals(delegationTaskId, delegationResult.id(), + "Task ID should be the original task ID, not the delegated task's ID"); + + // Test 2: Local handling (without "delegate:" prefix) + String localTaskId = "agent-to-agent-test-" + UUID.randomUUID(); + + Message localMessage = Message.builder() + .taskId(localTaskId) + .contextId("agent-to-agent-context") + .role(Message.Role.USER) + .parts(new TextPart("Hello directly")) + .build(); + + CountDownLatch localLatch = new CountDownLatch(1); + AtomicReference localResultRef = new AtomicReference<>(); + AtomicReference localErrorRef = new AtomicReference<>(); + + BiConsumer localConsumer = (event, agentCard) -> { + Task task = null; + if (event instanceof TaskEvent taskEvent) { + task = taskEvent.getTask(); + } else if (event instanceof TaskUpdateEvent taskUpdateEvent) { + task = taskUpdateEvent.getTask(); + } + + if (task != null && task.status().state().isFinal()) { + localResultRef.set(task); + localLatch.countDown(); + } + }; + + getClient().sendMessage(localMessage, List.of(localConsumer), error -> { + localErrorRef.set(error); + localLatch.countDown(); + }); + + assertTrue(localLatch.await(30, TimeUnit.SECONDS), "Local handling should complete within timeout"); + assertNull(localErrorRef.get(), "Local handling should not have errors"); + + Task localResult = localResultRef.get(); + assertNotNull(localResult, "Local task should not be null"); + assertEquals(TaskState.COMPLETED, localResult.status().state(), + "Local task should be completed"); + + String localText = extractTextFromTask(localResult); + assertTrue(localText.contains("Handled locally: Hello directly"), + "Should be handled locally without delegation. Got: " + localText); + } + + /** + * Extracts all text from a task's artifacts. + * + * @param task the task containing artifacts + * @return concatenated text from all TextParts in all artifacts + */ + private String extractTextFromTask(Task task) { + if (task.artifacts() == null || task.artifacts().isEmpty()) { + return ""; + } + return task.artifacts().stream() + .flatMap(artifact -> artifact.parts().stream()) + .filter(part -> part instanceof TextPart) + .map(part -> ((TextPart) part).text()) + .collect(Collectors.joining("\n")); + } + } diff --git a/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentExecutorProducer.java b/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentExecutorProducer.java index 40839e7fb..00fd55fbd 100644 --- a/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentExecutorProducer.java +++ b/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentExecutorProducer.java @@ -1,18 +1,38 @@ package io.a2a.server.apps.common; +import static io.a2a.server.ServerCallContext.TRANSPORT_KEY; + import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.inject.Produces; +import jakarta.inject.Inject; +import io.a2a.A2A; +import io.a2a.client.Client; +import io.a2a.client.ClientEvent; +import io.a2a.client.TaskEvent; +import io.a2a.client.TaskUpdateEvent; +import io.a2a.server.PublicAgentCard; +import io.a2a.server.ServerCallContext; import io.a2a.server.agentexecution.AgentExecutor; import io.a2a.server.agentexecution.RequestContext; import io.a2a.server.tasks.AgentEmitter; +import io.a2a.spec.A2AClientException; import io.a2a.spec.A2AError; +import io.a2a.spec.AgentCard; +import io.a2a.spec.Artifact; +import io.a2a.spec.InternalError; import io.a2a.spec.InvalidParamsError; import io.a2a.spec.Message; import io.a2a.spec.Part; +import io.a2a.spec.Task; import io.a2a.spec.TextPart; +import io.a2a.spec.TransportProtocol; import io.a2a.spec.UnsupportedOperationError; import io.quarkus.arc.profile.IfBuildProfile; @@ -20,6 +40,10 @@ @IfBuildProfile("test") public class AgentExecutorProducer { + @Inject + @PublicAgentCard + AgentCard agentCard; + @Produces public AgentExecutor agentExecutor() { return new AgentExecutor() { @@ -27,6 +51,12 @@ public AgentExecutor agentExecutor() { public void execute(RequestContext context, AgentEmitter agentEmitter) throws A2AError { String taskId = context.getTaskId(); + // Agent-to-agent communication test + if (taskId != null && taskId.startsWith("agent-to-agent-test")) { + handleAgentToAgentTest(context, agentEmitter); + return; + } + // Special handling for multi-event test if (taskId != null && taskId.startsWith("multi-event-test")) { // First call: context.getTask() == null (new task) @@ -69,6 +99,22 @@ public void execute(RequestContext context, AgentEmitter agentEmitter) throws A2 if (context.getTaskId().equals("task-not-supported-123")) { throw new UnsupportedOperationError(); } + + // Check for delegated agent-to-agent messages (marked with special prefix) + if (context.getMessage() != null) { + String userInput = extractTextFromMessage(context.getMessage()); + if (userInput.startsWith("#a2a-delegated#")) { + // This is a delegated message from agent-to-agent test - complete it + String actualContent = userInput.substring("#a2a-delegated#".length()); + agentEmitter.startWork(); + String response = "Handled locally: " + actualContent; + agentEmitter.addArtifact(List.of(new TextPart(response))); + agentEmitter.complete(); + return; + } + } + + // Default handler: echo back message or task if (context.getMessage() != null) { agentEmitter.sendMessage(context.getMessage()); } else { @@ -84,6 +130,135 @@ public void cancel(RequestContext context, AgentEmitter agentEmitter) throws A2A throw new UnsupportedOperationError(); } } + + /** + * Handles agent-to-agent communication testing. + * Detects "delegate:" prefix and forwards requests to another agent via client. + */ + private void handleAgentToAgentTest(RequestContext context, AgentEmitter agentEmitter) throws A2AError { + try { + // Get transport protocol from ServerCallContext + ServerCallContext callContext = context.getCallContext(); + if (callContext == null) { + agentEmitter.fail(new InternalError("No call context available for agent-to-agent test")); + return; + } + + TransportProtocol transportProtocol = (TransportProtocol) callContext.getState().get(TRANSPORT_KEY); + if (transportProtocol == null) { + agentEmitter.fail(new InternalError("Transport type not set in call context")); + return; + } + + String serverUrl = getServerUrl(transportProtocol); + + // Extract user message + String userInput = context.getUserInput("\n"); + + // Check for delegation pattern + if (userInput.startsWith("delegate:")) { + handleDelegation(userInput, transportProtocol, serverUrl, agentEmitter); + } else { + handleLocally(userInput, agentEmitter); + } + } catch (Exception e) { + agentEmitter.fail(new InternalError("Agent-to-agent test failed: " + e.getMessage())); + } + } + + /** + * Handles delegation by forwarding to another agent via client. + */ + private void handleDelegation(String userInput, TransportProtocol transportProtocol, + String serverUrl, AgentEmitter agentEmitter) { + // Strip "delegate:" prefix + String delegatedContent = userInput.substring("delegate:".length()).trim(); + + // Create client for same transport + Client client = null; + try { + client = AgentToAgentClientFactory.createClient(agentCard, transportProtocol, serverUrl); + + agentEmitter.startWork(); + + // Set up consumer to capture task result + CountDownLatch latch = new CountDownLatch(1); + AtomicReference resultRef = new AtomicReference<>(); + AtomicReference errorRef = new AtomicReference<>(); + + BiConsumer consumer = (event, agentCard) -> { + Task task = null; + if (event instanceof TaskEvent taskEvent) { + task = taskEvent.getTask(); + } else if (event instanceof TaskUpdateEvent taskUpdateEvent) { + task = taskUpdateEvent.getTask(); + } + + if (task != null && task.status().state().isFinal()) { + resultRef.set(task); + latch.countDown(); + } + }; + + // Delegate to another agent (new task on same server) + // Add a marker so the receiving agent knows to complete the task + Message delegatedMessage = A2A.toUserMessage("#a2a-delegated#" + delegatedContent); + client.sendMessage(delegatedMessage, List.of(consumer), error -> { + errorRef.set(error); + latch.countDown(); + }); + + // Wait for response + if (!latch.await(30, TimeUnit.SECONDS)) { + agentEmitter.fail(new InternalError("Timeout waiting for delegated response")); + return; + } + + Task delegatedResult = resultRef.get(); + + // Check for error only if we didn't get a successful result + // (errors can occur after completion due to stream cleanup) + if (delegatedResult == null && errorRef.get() != null) { + agentEmitter.fail(new InternalError("Delegation failed: " + errorRef.get().getMessage())); + return; + } + + if (delegatedResult == null) { + agentEmitter.fail(new InternalError("No result received from delegation")); + return; + } + + // Extract artifacts from delegated task and add to current task + // NOTE: We cannot use emitter.addTask(delegatedResult) because it has a different taskId + if (delegatedResult.artifacts() != null && !delegatedResult.artifacts().isEmpty()) { + for (Artifact artifact : delegatedResult.artifacts()) { + agentEmitter.addArtifact(artifact.parts()); + } + } + + // Complete current task + agentEmitter.complete(); + } catch (A2AClientException e) { + agentEmitter.fail(new InternalError("Failed to create client: " + e.getMessage())); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + agentEmitter.fail(new InternalError("Interrupted while waiting for response")); + } finally { + if (client != null) { + client.close(); + } + } + } + + /** + * Handles request locally without delegation. + */ + private void handleLocally(String userInput, AgentEmitter agentEmitter) { + agentEmitter.startWork(); + String response = "Handled locally: " + userInput; + agentEmitter.addArtifact(List.of(new TextPart(response))); + agentEmitter.complete(); + } }; } @@ -103,4 +278,24 @@ private String extractTextFromMessage(final Message message) { } return textBuilder.toString(); } + + /** + * Gets the server URL for testing based on transport protocol. + * Uses the same port property as AgentCardProducer. + * + * @param transportProtocol the transport protocol + * @return server URL (e.g., "http://localhost:8081" or "localhost:9090") + */ + private static String getServerUrl(TransportProtocol transportProtocol) { + // Use same property as AgentCardProducer + String port = System.getProperty("test.agent.card.port", "8081"); + + // Construct URL using same logic as AgentCardProducer + if (transportProtocol == TransportProtocol.GRPC) { + return "localhost:" + port; + } else { + // JSONRPC and HTTP_JSON both use HTTP + return "http://localhost:" + port; + } + } } diff --git a/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentToAgentClientFactory.java b/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentToAgentClientFactory.java new file mode 100644 index 000000000..17b2c8de5 --- /dev/null +++ b/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentToAgentClientFactory.java @@ -0,0 +1,99 @@ +package io.a2a.server.apps.common; + +import io.a2a.client.Client; +import io.a2a.client.ClientBuilder; +import io.a2a.client.config.ClientConfig; +import io.a2a.client.transport.grpc.GrpcTransport; +import io.a2a.client.transport.grpc.GrpcTransportConfigBuilder; +import io.a2a.client.transport.jsonrpc.JSONRPCTransport; +import io.a2a.client.transport.jsonrpc.JSONRPCTransportConfigBuilder; +import io.a2a.client.transport.rest.RestTransport; +import io.a2a.client.transport.rest.RestTransportConfigBuilder; +import io.a2a.spec.A2AClientException; +import io.a2a.spec.AgentCard; +import io.a2a.spec.TransportProtocol; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; + +/** + * Helper class for creating A2A clients for agent-to-agent communication testing. + * Uses reflection to load transport-specific factory classes to avoid class loading + * issues when transport dependencies aren't on the classpath. + */ +public class AgentToAgentClientFactory { + + /** + * Creates a client for the specified transport protocol. + * Uses reflection to load the appropriate factory class, ensuring that + * only the factory for the active transport is loaded. + * + * @param agentCard + * @param transportProtocol the transport protocol to use + * @param serverUrl the server URL (e.g., "http://localhost:8081" or "localhost:9090") + * @return configured client + * @throws A2AClientException if client creation fails + */ + public static Client createClient(AgentCard agentCard, TransportProtocol transportProtocol, String serverUrl) + throws A2AClientException { + String factoryClassName; + + ClientConfig clientConfig = ClientConfig.builder() + .setStreaming(false) + .build(); + + ClientBuilder clientBuilder = Client.builder(agentCard) + .clientConfig(clientConfig); + + ClientTransportEnhancer enhancer; + switch (transportProtocol) { + case JSONRPC: + enhancer = new JsonRpcClientEnhancer(); + break; + case GRPC: + enhancer = new GrpcClientEnhancer(); + break; + case HTTP_JSON: + enhancer = new RestClientEnhancer(); + break; + default: + throw new IllegalArgumentException("Unsupported transport: " + transportProtocol); + } + + enhancer.enhance(clientBuilder, serverUrl); + return clientBuilder.build(); + } + + /** + * The implementations of this interface are needed to avoid ClassNotFoundErrors for client transports that are + * not on the classpath. + */ + interface ClientTransportEnhancer { + void enhance(ClientBuilder clientBuilder, String serverUrl); + } + + private static class GrpcClientEnhancer implements AgentToAgentClientFactory.ClientTransportEnhancer { + + @Override + public void enhance(ClientBuilder clientBuilder, String serverUrl) { + clientBuilder.withTransport(GrpcTransport.class, new GrpcTransportConfigBuilder().channelFactory(target -> { + ManagedChannel channel = ManagedChannelBuilder.forTarget(target).usePlaintext().build(); + return channel; + })); + } + } + + private static class JsonRpcClientEnhancer implements AgentToAgentClientFactory.ClientTransportEnhancer { + @Override + public void enhance(ClientBuilder clientBuilder, String serverUrl) { + clientBuilder.withTransport(JSONRPCTransport.class, new JSONRPCTransportConfigBuilder()); + } + } + + private static class RestClientEnhancer implements AgentToAgentClientFactory.ClientTransportEnhancer { + @Override + public void enhance(ClientBuilder clientBuilder, String serverUrl) { + clientBuilder.withTransport(RestTransport.class, new RestTransportConfigBuilder()); + } + } +} + diff --git a/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/GrpcHandler.java b/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/GrpcHandler.java index 7a312a5d0..277a763cd 100644 --- a/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/GrpcHandler.java +++ b/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/GrpcHandler.java @@ -2,6 +2,7 @@ import static io.a2a.grpc.utils.ProtoUtils.FromProto; import static io.a2a.grpc.utils.ProtoUtils.ToProto; +import static io.a2a.server.ServerCallContext.TRANSPORT_KEY; import java.util.HashMap; import java.util.HashSet; @@ -53,6 +54,7 @@ import io.a2a.spec.TaskNotFoundError; import io.a2a.spec.TaskPushNotificationConfig; import io.a2a.spec.TaskQueryParams; +import io.a2a.spec.TransportProtocol; import io.a2a.spec.UnsupportedOperationError; import io.a2a.spec.VersionNotSupportedError; import io.a2a.transport.grpc.context.GrpcContextKeys; @@ -403,7 +405,8 @@ private ServerCallContext createCallContext(StreamObserver responseObserv // This handles both CDI injection scenarios and test scenarios where callContextFactory is null User user = UnauthenticatedUser.INSTANCE; Map state = new HashMap<>(); - + state.put(TRANSPORT_KEY, TransportProtocol.GRPC); + // Enhanced gRPC context access - equivalent to Python's grpc.aio.ServicerContext // The A2AExtensionsInterceptor captures ServerCall + Metadata and stores them in gRPC Context // This provides proper equivalence to Python's ServicerContext for metadata access From 1292d986b4c7a2ed1bd256eb2fbcf0814adc83c0 Mon Sep 17 00:00:00 2001 From: Kabir Khan Date: Fri, 13 Feb 2026 15:35:47 +0000 Subject: [PATCH 2/7] Gemini review --- .../apps/common/AbstractA2AServerTest.java | 29 +++++++++++++++---- .../common/AgentToAgentClientFactory.java | 10 ++----- 2 files changed, 25 insertions(+), 14 deletions(-) diff --git a/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java b/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java index fdbba462c..e186b2fad 100644 --- a/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java +++ b/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java @@ -2453,8 +2453,11 @@ public void testMainQueueClosesForFinalizedTasks() throws Exception { } /** - * Test agent-to-agent communication where AgentExecutor uses a client - * to delegate requests to another agent (same server, for testing). + * Test agent-to-agent communication with delegation pattern. + *

+ * Verifies that an AgentExecutor can use a client to delegate work to another agent + * by using the "delegate:" prefix. The delegated request is forwarded to another agent + * on the same server, and the artifacts from the delegated task are extracted and returned. *

* This test verifies: *

    @@ -2463,12 +2466,11 @@ public void testMainQueueClosesForFinalizedTasks() throws Exception { *
  • Delegation pattern ("delegate:" prefix) is recognized
  • *
  • Client successfully communicates with same server
  • *
  • Artifacts from delegated task are extracted and returned
  • - *
  • Local handling works without delegation
  • + *
  • Original task ID is preserved (not replaced by delegated task ID)
  • *
*/ @Test - public void testAgentToAgentCommunication() throws Exception { - // Test 1: Delegation pattern (with "delegate:" prefix) + public void testAgentToAgentDelegation() throws Exception { String delegationTaskId = "agent-to-agent-test-" + UUID.randomUUID(); Message delegationMessage = Message.builder() @@ -2525,8 +2527,23 @@ public void testAgentToAgentCommunication() throws Exception { // Verify the task ID is the original one (not the delegated task's ID) assertEquals(delegationTaskId, delegationResult.id(), "Task ID should be the original task ID, not the delegated task's ID"); + } - // Test 2: Local handling (without "delegate:" prefix) + /** + * Test agent-to-agent communication with local handling (no delegation). + *

+ * Verifies that requests without the "delegate:" prefix are handled locally + * by the agent without creating a client connection. + *

+ * This test verifies: + *

    + *
  • Requests without "delegate:" prefix are handled locally
  • + *
  • No client-to-client communication occurs for local handling
  • + *
  • Task completes successfully with expected content
  • + *
+ */ + @Test + public void testAgentToAgentLocalHandling() throws Exception { String localTaskId = "agent-to-agent-test-" + UUID.randomUUID(); Message localMessage = Message.builder() diff --git a/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentToAgentClientFactory.java b/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentToAgentClientFactory.java index 17b2c8de5..7843f58a3 100644 --- a/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentToAgentClientFactory.java +++ b/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentToAgentClientFactory.java @@ -17,17 +17,14 @@ /** * Helper class for creating A2A clients for agent-to-agent communication testing. - * Uses reflection to load transport-specific factory classes to avoid class loading - * issues when transport dependencies aren't on the classpath. + * Uses inner classes to avoid class loading issues when transport dependencies aren't on the classpath. */ public class AgentToAgentClientFactory { /** * Creates a client for the specified transport protocol. - * Uses reflection to load the appropriate factory class, ensuring that - * only the factory for the active transport is loaded. * - * @param agentCard + * @param agentCard the agentcard of the remote server * @param transportProtocol the transport protocol to use * @param serverUrl the server URL (e.g., "http://localhost:8081" or "localhost:9090") * @return configured client @@ -35,8 +32,6 @@ public class AgentToAgentClientFactory { */ public static Client createClient(AgentCard agentCard, TransportProtocol transportProtocol, String serverUrl) throws A2AClientException { - String factoryClassName; - ClientConfig clientConfig = ClientConfig.builder() .setStreaming(false) .build(); @@ -72,7 +67,6 @@ interface ClientTransportEnhancer { } private static class GrpcClientEnhancer implements AgentToAgentClientFactory.ClientTransportEnhancer { - @Override public void enhance(ClientBuilder clientBuilder, String serverUrl) { clientBuilder.withTransport(GrpcTransport.class, new GrpcTransportConfigBuilder().channelFactory(target -> { From 65aeefe46293b5916e8c0e18d81a6e9bbbefb1c6 Mon Sep 17 00:00:00 2001 From: Kabir Khan Date: Mon, 16 Feb 2026 16:18:44 +0000 Subject: [PATCH 3/7] Gemini review --- .../apps/common/AbstractA2AServerTest.java | 55 ++++++++++--------- .../apps/common/AgentExecutorProducer.java | 9 +-- .../common/AgentToAgentClientFactory.java | 20 ++----- 3 files changed, 36 insertions(+), 48 deletions(-) diff --git a/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java b/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java index e186b2fad..b011a0ba6 100644 --- a/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java +++ b/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java @@ -2484,19 +2484,8 @@ public void testAgentToAgentDelegation() throws Exception { AtomicReference delegationResultRef = new AtomicReference<>(); AtomicReference delegationErrorRef = new AtomicReference<>(); - BiConsumer delegationConsumer = (event, agentCard) -> { - Task task = null; - if (event instanceof TaskEvent taskEvent) { - task = taskEvent.getTask(); - } else if (event instanceof TaskUpdateEvent taskUpdateEvent) { - task = taskUpdateEvent.getTask(); - } - - if (task != null && task.status().state().isFinal()) { - delegationResultRef.set(task); - delegationLatch.countDown(); - } - }; + BiConsumer delegationConsumer = + createTaskCaptureConsumer(delegationResultRef, delegationLatch); getClient().sendMessage(delegationMessage, List.of(delegationConsumer), error -> { delegationErrorRef.set(error); @@ -2557,19 +2546,8 @@ public void testAgentToAgentLocalHandling() throws Exception { AtomicReference localResultRef = new AtomicReference<>(); AtomicReference localErrorRef = new AtomicReference<>(); - BiConsumer localConsumer = (event, agentCard) -> { - Task task = null; - if (event instanceof TaskEvent taskEvent) { - task = taskEvent.getTask(); - } else if (event instanceof TaskUpdateEvent taskUpdateEvent) { - task = taskUpdateEvent.getTask(); - } - - if (task != null && task.status().state().isFinal()) { - localResultRef.set(task); - localLatch.countDown(); - } - }; + BiConsumer localConsumer = + createTaskCaptureConsumer(localResultRef, localLatch); getClient().sendMessage(localMessage, List.of(localConsumer), error -> { localErrorRef.set(error); @@ -2589,6 +2567,31 @@ public void testAgentToAgentLocalHandling() throws Exception { "Should be handled locally without delegation. Got: " + localText); } + /** + * Creates a BiConsumer that captures the final task state. + * This helper method reduces code duplication in agent-to-agent tests. + * + * @param taskRef the AtomicReference to store the final task + * @param latch the CountDownLatch to signal completion + * @return a BiConsumer that captures completed tasks + */ + private BiConsumer createTaskCaptureConsumer( + AtomicReference taskRef, CountDownLatch latch) { + return (event, agentCard) -> { + Task task = null; + if (event instanceof TaskEvent taskEvent) { + task = taskEvent.getTask(); + } else if (event instanceof TaskUpdateEvent taskUpdateEvent) { + task = taskUpdateEvent.getTask(); + } + + if (task != null && task.status().state().isFinal()) { + taskRef.set(task); + latch.countDown(); + } + }; + } + /** * Extracts all text from a task's artifacts. * diff --git a/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentExecutorProducer.java b/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentExecutorProducer.java index 00fd55fbd..771bf8239 100644 --- a/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentExecutorProducer.java +++ b/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentExecutorProducer.java @@ -175,10 +175,7 @@ private void handleDelegation(String userInput, TransportProtocol transportProto String delegatedContent = userInput.substring("delegate:".length()).trim(); // Create client for same transport - Client client = null; - try { - client = AgentToAgentClientFactory.createClient(agentCard, transportProtocol, serverUrl); - + try (Client client = AgentToAgentClientFactory.createClient(agentCard, transportProtocol, serverUrl)) { agentEmitter.startWork(); // Set up consumer to capture task result @@ -243,10 +240,6 @@ private void handleDelegation(String userInput, TransportProtocol transportProto } catch (InterruptedException e) { Thread.currentThread().interrupt(); agentEmitter.fail(new InternalError("Interrupted while waiting for response")); - } finally { - if (client != null) { - client.close(); - } } } diff --git a/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentToAgentClientFactory.java b/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentToAgentClientFactory.java index 7843f58a3..c545b8b30 100644 --- a/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentToAgentClientFactory.java +++ b/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentToAgentClientFactory.java @@ -39,20 +39,12 @@ public static Client createClient(AgentCard agentCard, TransportProtocol transpo ClientBuilder clientBuilder = Client.builder(agentCard) .clientConfig(clientConfig); - ClientTransportEnhancer enhancer; - switch (transportProtocol) { - case JSONRPC: - enhancer = new JsonRpcClientEnhancer(); - break; - case GRPC: - enhancer = new GrpcClientEnhancer(); - break; - case HTTP_JSON: - enhancer = new RestClientEnhancer(); - break; - default: - throw new IllegalArgumentException("Unsupported transport: " + transportProtocol); - } + ClientTransportEnhancer enhancer = switch (transportProtocol) { + case JSONRPC -> new JsonRpcClientEnhancer(); + case GRPC -> new GrpcClientEnhancer(); + case HTTP_JSON -> new RestClientEnhancer(); + default -> throw new IllegalArgumentException("Unsupported transport: " + transportProtocol); + }; enhancer.enhance(clientBuilder, serverUrl); return clientBuilder.build(); From 20d556a3c51f4494487213f6c8451bff87ac12ae Mon Sep 17 00:00:00 2001 From: Kabir Khan Date: Mon, 16 Feb 2026 16:26:03 +0000 Subject: [PATCH 4/7] Flaky test --- .../io/a2a/server/apps/common/AbstractA2AServerTest.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java b/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java index b011a0ba6..0732929be 100644 --- a/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java +++ b/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java @@ -2555,9 +2555,15 @@ public void testAgentToAgentLocalHandling() throws Exception { }); assertTrue(localLatch.await(30, TimeUnit.SECONDS), "Local handling should complete within timeout"); - assertNull(localErrorRef.get(), "Local handling should not have errors"); Task localResult = localResultRef.get(); + + // Only fail on errors if we didn't get a successful result + // (errors can occur after completion due to stream cleanup) + if (localResult == null && localErrorRef.get() != null) { + fail("Local handling failed: " + localErrorRef.get().getMessage()); + } + assertNotNull(localResult, "Local task should not be null"); assertEquals(TaskState.COMPLETED, localResult.status().state(), "Local task should be completed"); From 9c46b27c9b503d95ba7c9637eb0f0114dd6611be Mon Sep 17 00:00:00 2001 From: Kabir Khan Date: Mon, 16 Feb 2026 17:25:59 +0000 Subject: [PATCH 5/7] Get rid of serverUrl --- .../apps/common/AbstractA2AServerTest.java | 29 +---------- .../apps/common/AgentExecutorProducer.java | 44 +++------------- .../common/AgentToAgentClientFactory.java | 50 ++++++++++++++++--- 3 files changed, 50 insertions(+), 73 deletions(-) diff --git a/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java b/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java index 0732929be..eeb4357b3 100644 --- a/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java +++ b/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java @@ -2485,7 +2485,7 @@ public void testAgentToAgentDelegation() throws Exception { AtomicReference delegationErrorRef = new AtomicReference<>(); BiConsumer delegationConsumer = - createTaskCaptureConsumer(delegationResultRef, delegationLatch); + AgentToAgentClientFactory.createTaskCaptureConsumer(delegationResultRef, delegationLatch); getClient().sendMessage(delegationMessage, List.of(delegationConsumer), error -> { delegationErrorRef.set(error); @@ -2547,7 +2547,7 @@ public void testAgentToAgentLocalHandling() throws Exception { AtomicReference localErrorRef = new AtomicReference<>(); BiConsumer localConsumer = - createTaskCaptureConsumer(localResultRef, localLatch); + AgentToAgentClientFactory.createTaskCaptureConsumer(localResultRef, localLatch); getClient().sendMessage(localMessage, List.of(localConsumer), error -> { localErrorRef.set(error); @@ -2573,31 +2573,6 @@ public void testAgentToAgentLocalHandling() throws Exception { "Should be handled locally without delegation. Got: " + localText); } - /** - * Creates a BiConsumer that captures the final task state. - * This helper method reduces code duplication in agent-to-agent tests. - * - * @param taskRef the AtomicReference to store the final task - * @param latch the CountDownLatch to signal completion - * @return a BiConsumer that captures completed tasks - */ - private BiConsumer createTaskCaptureConsumer( - AtomicReference taskRef, CountDownLatch latch) { - return (event, agentCard) -> { - Task task = null; - if (event instanceof TaskEvent taskEvent) { - task = taskEvent.getTask(); - } else if (event instanceof TaskUpdateEvent taskUpdateEvent) { - task = taskUpdateEvent.getTask(); - } - - if (task != null && task.status().state().isFinal()) { - taskRef.set(task); - latch.countDown(); - } - }; - } - /** * Extracts all text from a task's artifacts. * diff --git a/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentExecutorProducer.java b/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentExecutorProducer.java index 771bf8239..3f7e3c0f1 100644 --- a/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentExecutorProducer.java +++ b/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentExecutorProducer.java @@ -40,6 +40,7 @@ @IfBuildProfile("test") public class AgentExecutorProducer { + // Inject the existing AgentCard to avoid special handling for grpc @Inject @PublicAgentCard AgentCard agentCard; @@ -150,14 +151,12 @@ private void handleAgentToAgentTest(RequestContext context, AgentEmitter agentEm return; } - String serverUrl = getServerUrl(transportProtocol); - // Extract user message String userInput = context.getUserInput("\n"); // Check for delegation pattern if (userInput.startsWith("delegate:")) { - handleDelegation(userInput, transportProtocol, serverUrl, agentEmitter); + handleDelegation(userInput, transportProtocol, agentEmitter); } else { handleLocally(userInput, agentEmitter); } @@ -170,12 +169,12 @@ private void handleAgentToAgentTest(RequestContext context, AgentEmitter agentEm * Handles delegation by forwarding to another agent via client. */ private void handleDelegation(String userInput, TransportProtocol transportProtocol, - String serverUrl, AgentEmitter agentEmitter) { + AgentEmitter agentEmitter) { // Strip "delegate:" prefix String delegatedContent = userInput.substring("delegate:".length()).trim(); // Create client for same transport - try (Client client = AgentToAgentClientFactory.createClient(agentCard, transportProtocol, serverUrl)) { + try (Client client = AgentToAgentClientFactory.createClient(agentCard, transportProtocol)) { agentEmitter.startWork(); // Set up consumer to capture task result @@ -183,19 +182,8 @@ private void handleDelegation(String userInput, TransportProtocol transportProto AtomicReference resultRef = new AtomicReference<>(); AtomicReference errorRef = new AtomicReference<>(); - BiConsumer consumer = (event, agentCard) -> { - Task task = null; - if (event instanceof TaskEvent taskEvent) { - task = taskEvent.getTask(); - } else if (event instanceof TaskUpdateEvent taskUpdateEvent) { - task = taskUpdateEvent.getTask(); - } - - if (task != null && task.status().state().isFinal()) { - resultRef.set(task); - latch.countDown(); - } - }; + BiConsumer consumer = + AgentToAgentClientFactory.createTaskCaptureConsumer(resultRef, latch); // Delegate to another agent (new task on same server) // Add a marker so the receiving agent knows to complete the task @@ -271,24 +259,4 @@ private String extractTextFromMessage(final Message message) { } return textBuilder.toString(); } - - /** - * Gets the server URL for testing based on transport protocol. - * Uses the same port property as AgentCardProducer. - * - * @param transportProtocol the transport protocol - * @return server URL (e.g., "http://localhost:8081" or "localhost:9090") - */ - private static String getServerUrl(TransportProtocol transportProtocol) { - // Use same property as AgentCardProducer - String port = System.getProperty("test.agent.card.port", "8081"); - - // Construct URL using same logic as AgentCardProducer - if (transportProtocol == TransportProtocol.GRPC) { - return "localhost:" + port; - } else { - // JSONRPC and HTTP_JSON both use HTTP - return "http://localhost:" + port; - } - } } diff --git a/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentToAgentClientFactory.java b/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentToAgentClientFactory.java index c545b8b30..f3c06eb66 100644 --- a/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentToAgentClientFactory.java +++ b/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentToAgentClientFactory.java @@ -1,7 +1,14 @@ package io.a2a.server.apps.common; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; + import io.a2a.client.Client; import io.a2a.client.ClientBuilder; +import io.a2a.client.ClientEvent; +import io.a2a.client.TaskEvent; +import io.a2a.client.TaskUpdateEvent; import io.a2a.client.config.ClientConfig; import io.a2a.client.transport.grpc.GrpcTransport; import io.a2a.client.transport.grpc.GrpcTransportConfigBuilder; @@ -11,6 +18,7 @@ import io.a2a.client.transport.rest.RestTransportConfigBuilder; import io.a2a.spec.A2AClientException; import io.a2a.spec.AgentCard; +import io.a2a.spec.Task; import io.a2a.spec.TransportProtocol; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; @@ -21,16 +29,42 @@ */ public class AgentToAgentClientFactory { + /** + * Creates a BiConsumer that captures the final task state. + * This utility method is used by both test classes and agent executors to avoid code duplication. + * + * @param taskRef the AtomicReference to store the final task + * @param latch the CountDownLatch to signal completion + * @return a BiConsumer that captures completed tasks + */ + public static BiConsumer createTaskCaptureConsumer( + AtomicReference taskRef, CountDownLatch latch) { + return (event, agentCard) -> { + Task task = null; + if (event instanceof TaskEvent taskEvent) { + task = taskEvent.getTask(); + } else if (event instanceof TaskUpdateEvent taskUpdateEvent) { + task = taskUpdateEvent.getTask(); + } + + if (task != null && task.status().state().isFinal()) { + taskRef.set(task); + latch.countDown(); + } + }; + } + /** * Creates a client for the specified transport protocol. + * The agent card parameter already contains the correct local endpoint URLs + * configured by the test's AgentCardProducer. * - * @param agentCard the agentcard of the remote server + * @param agentCard the agent card with correct local endpoints * @param transportProtocol the transport protocol to use - * @param serverUrl the server URL (e.g., "http://localhost:8081" or "localhost:9090") * @return configured client * @throws A2AClientException if client creation fails */ - public static Client createClient(AgentCard agentCard, TransportProtocol transportProtocol, String serverUrl) + public static Client createClient(AgentCard agentCard, TransportProtocol transportProtocol) throws A2AClientException { ClientConfig clientConfig = ClientConfig.builder() .setStreaming(false) @@ -46,7 +80,7 @@ public static Client createClient(AgentCard agentCard, TransportProtocol transpo default -> throw new IllegalArgumentException("Unsupported transport: " + transportProtocol); }; - enhancer.enhance(clientBuilder, serverUrl); + enhancer.enhance(clientBuilder); return clientBuilder.build(); } @@ -55,12 +89,12 @@ public static Client createClient(AgentCard agentCard, TransportProtocol transpo * not on the classpath. */ interface ClientTransportEnhancer { - void enhance(ClientBuilder clientBuilder, String serverUrl); + void enhance(ClientBuilder clientBuilder); } private static class GrpcClientEnhancer implements AgentToAgentClientFactory.ClientTransportEnhancer { @Override - public void enhance(ClientBuilder clientBuilder, String serverUrl) { + public void enhance(ClientBuilder clientBuilder) { clientBuilder.withTransport(GrpcTransport.class, new GrpcTransportConfigBuilder().channelFactory(target -> { ManagedChannel channel = ManagedChannelBuilder.forTarget(target).usePlaintext().build(); return channel; @@ -70,14 +104,14 @@ public void enhance(ClientBuilder clientBuilder, String serverUrl) { private static class JsonRpcClientEnhancer implements AgentToAgentClientFactory.ClientTransportEnhancer { @Override - public void enhance(ClientBuilder clientBuilder, String serverUrl) { + public void enhance(ClientBuilder clientBuilder) { clientBuilder.withTransport(JSONRPCTransport.class, new JSONRPCTransportConfigBuilder()); } } private static class RestClientEnhancer implements AgentToAgentClientFactory.ClientTransportEnhancer { @Override - public void enhance(ClientBuilder clientBuilder, String serverUrl) { + public void enhance(ClientBuilder clientBuilder) { clientBuilder.withTransport(RestTransport.class, new RestTransportConfigBuilder()); } } From 62356dce047056c5e170dde4111e452a2dc81d57 Mon Sep 17 00:00:00 2001 From: Kabir Khan Date: Tue, 17 Feb 2026 09:27:55 +0000 Subject: [PATCH 6/7] Fixes after rebase --- .../java/io/a2a/server/apps/common/AbstractA2AServerTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java b/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java index eeb4357b3..1977df8cd 100644 --- a/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java +++ b/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java @@ -2503,7 +2503,7 @@ public void testAgentToAgentDelegation() throws Exception { } assertNotNull(delegationResult, "Delegation task should not be null"); - assertEquals(TaskState.COMPLETED, delegationResult.status().state(), + assertEquals(TaskState.TASK_STATE_COMPLETED, delegationResult.status().state(), "Delegation task should be completed"); assertNotNull(delegationResult.artifacts(), "Delegation should have artifacts"); assertFalse(delegationResult.artifacts().isEmpty(), "Delegation should have at least one artifact"); @@ -2565,7 +2565,7 @@ public void testAgentToAgentLocalHandling() throws Exception { } assertNotNull(localResult, "Local task should not be null"); - assertEquals(TaskState.COMPLETED, localResult.status().state(), + assertEquals(TaskState.TASK_STATE_COMPLETED, localResult.status().state(), "Local task should be completed"); String localText = extractTextFromTask(localResult); From 556b34fc745a9cf59272912348d99a4e76b7169f Mon Sep 17 00:00:00 2001 From: Kabir Khan Date: Tue, 17 Feb 2026 12:15:36 +0000 Subject: [PATCH 7/7] Add diagnostic logging for the failing test --- .../apps/common/AgentExecutorProducer.java | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentExecutorProducer.java b/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentExecutorProducer.java index 3f7e3c0f1..d62391bf8 100644 --- a/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentExecutorProducer.java +++ b/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentExecutorProducer.java @@ -153,6 +153,10 @@ private void handleAgentToAgentTest(RequestContext context, AgentEmitter agentEm // Extract user message String userInput = context.getUserInput("\n"); + if (userInput == null || userInput.isEmpty()) { + agentEmitter.fail(new InternalError("No user input received")); + return; + } // Check for delegation pattern if (userInput.startsWith("delegate:")) { @@ -161,6 +165,8 @@ private void handleAgentToAgentTest(RequestContext context, AgentEmitter agentEm handleLocally(userInput, agentEmitter); } } catch (Exception e) { + // Log the full stack trace to help debug intermittent failures + e.printStackTrace(); agentEmitter.fail(new InternalError("Agent-to-agent test failed: " + e.getMessage())); } } @@ -235,10 +241,16 @@ private void handleDelegation(String userInput, TransportProtocol transportProto * Handles request locally without delegation. */ private void handleLocally(String userInput, AgentEmitter agentEmitter) { - agentEmitter.startWork(); - String response = "Handled locally: " + userInput; - agentEmitter.addArtifact(List.of(new TextPart(response))); - agentEmitter.complete(); + try { + agentEmitter.startWork(); + String response = "Handled locally: " + userInput; + agentEmitter.addArtifact(List.of(new TextPart(response))); + agentEmitter.complete(); + } catch (Exception e) { + // Defensive catch to ensure we always emit a final state + e.printStackTrace(); + agentEmitter.fail(new InternalError("Local handling failed: " + e.getMessage())); + } } }; }