state;
diff --git a/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java b/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java
index f2da8de15..1977df8cd 100644
--- a/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java
+++ b/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java
@@ -19,6 +19,7 @@
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
@@ -28,6 +29,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
import jakarta.ws.rs.core.MediaType;
@@ -617,7 +619,6 @@ public void testGetExtendedAgentCard() throws A2AClientException {
.filter(i -> getTransportProtocol().equals(i.protocolBinding()))
.findFirst();
assertTrue(transportInterface.isPresent());
- System.out.println("transportInterface = " + transportInterface);
assertEquals(getTransportUrl(),transportInterface.get().url());
assertEquals("1.0", agentCard.version());
assertEquals("http://example.com/docs", agentCard.documentationUrl());
@@ -2451,4 +2452,142 @@ public void testMainQueueClosesForFinalizedTasks() throws Exception {
}
}
+ /**
+ * Test agent-to-agent communication with delegation pattern.
+ *
+ * Verifies that an AgentExecutor can use a client to delegate work to another agent
+ * by using the "delegate:" prefix. The delegated request is forwarded to another agent
+ * on the same server, and the artifacts from the delegated task are extracted and returned.
+ *
+ * This test verifies:
+ *
+ * - Transport type is correctly passed via ServerCallContext state
+ * - AgentExecutor can create a client with matching transport
+ * - Delegation pattern ("delegate:" prefix) is recognized
+ * - Client successfully communicates with same server
+ * - Artifacts from delegated task are extracted and returned
+ * - Original task ID is preserved (not replaced by delegated task ID)
+ *
+ */
+ @Test
+ public void testAgentToAgentDelegation() throws Exception {
+ String delegationTaskId = "agent-to-agent-test-" + UUID.randomUUID();
+
+ Message delegationMessage = Message.builder()
+ .taskId(delegationTaskId)
+ .contextId("agent-to-agent-context")
+ .role(Message.Role.USER)
+ .parts(new TextPart("delegate:What is 2+2?"))
+ .build();
+
+ CountDownLatch delegationLatch = new CountDownLatch(1);
+ AtomicReference delegationResultRef = new AtomicReference<>();
+ AtomicReference delegationErrorRef = new AtomicReference<>();
+
+ BiConsumer delegationConsumer =
+ AgentToAgentClientFactory.createTaskCaptureConsumer(delegationResultRef, delegationLatch);
+
+ getClient().sendMessage(delegationMessage, List.of(delegationConsumer), error -> {
+ delegationErrorRef.set(error);
+ delegationLatch.countDown();
+ });
+
+ assertTrue(delegationLatch.await(30, TimeUnit.SECONDS), "Delegation should complete within timeout");
+
+ Task delegationResult = delegationResultRef.get();
+
+ // Only fail on errors if we didn't get a successful result
+ // (errors can occur after completion due to stream cleanup)
+ if (delegationResult == null && delegationErrorRef.get() != null) {
+ fail("Delegation failed: " + delegationErrorRef.get().getMessage());
+ }
+
+ assertNotNull(delegationResult, "Delegation task should not be null");
+ assertEquals(TaskState.TASK_STATE_COMPLETED, delegationResult.status().state(),
+ "Delegation task should be completed");
+ assertNotNull(delegationResult.artifacts(), "Delegation should have artifacts");
+ assertFalse(delegationResult.artifacts().isEmpty(), "Delegation should have at least one artifact");
+
+ // Extract text from result
+ String delegatedText = extractTextFromTask(delegationResult);
+ assertTrue(delegatedText.contains("Handled locally:"),
+ "Delegated content should have been handled locally by target agent. Got: " + delegatedText);
+
+ // Verify the task ID is the original one (not the delegated task's ID)
+ assertEquals(delegationTaskId, delegationResult.id(),
+ "Task ID should be the original task ID, not the delegated task's ID");
+ }
+
+ /**
+ * Test agent-to-agent communication with local handling (no delegation).
+ *
+ * Verifies that requests without the "delegate:" prefix are handled locally
+ * by the agent without creating a client connection.
+ *
+ * This test verifies:
+ *
+ * - Requests without "delegate:" prefix are handled locally
+ * - No client-to-client communication occurs for local handling
+ * - Task completes successfully with expected content
+ *
+ */
+ @Test
+ public void testAgentToAgentLocalHandling() throws Exception {
+ String localTaskId = "agent-to-agent-test-" + UUID.randomUUID();
+
+ Message localMessage = Message.builder()
+ .taskId(localTaskId)
+ .contextId("agent-to-agent-context")
+ .role(Message.Role.USER)
+ .parts(new TextPart("Hello directly"))
+ .build();
+
+ CountDownLatch localLatch = new CountDownLatch(1);
+ AtomicReference localResultRef = new AtomicReference<>();
+ AtomicReference localErrorRef = new AtomicReference<>();
+
+ BiConsumer localConsumer =
+ AgentToAgentClientFactory.createTaskCaptureConsumer(localResultRef, localLatch);
+
+ getClient().sendMessage(localMessage, List.of(localConsumer), error -> {
+ localErrorRef.set(error);
+ localLatch.countDown();
+ });
+
+ assertTrue(localLatch.await(30, TimeUnit.SECONDS), "Local handling should complete within timeout");
+
+ Task localResult = localResultRef.get();
+
+ // Only fail on errors if we didn't get a successful result
+ // (errors can occur after completion due to stream cleanup)
+ if (localResult == null && localErrorRef.get() != null) {
+ fail("Local handling failed: " + localErrorRef.get().getMessage());
+ }
+
+ assertNotNull(localResult, "Local task should not be null");
+ assertEquals(TaskState.TASK_STATE_COMPLETED, localResult.status().state(),
+ "Local task should be completed");
+
+ String localText = extractTextFromTask(localResult);
+ assertTrue(localText.contains("Handled locally: Hello directly"),
+ "Should be handled locally without delegation. Got: " + localText);
+ }
+
+ /**
+ * Extracts all text from a task's artifacts.
+ *
+ * @param task the task containing artifacts
+ * @return concatenated text from all TextParts in all artifacts
+ */
+ private String extractTextFromTask(Task task) {
+ if (task.artifacts() == null || task.artifacts().isEmpty()) {
+ return "";
+ }
+ return task.artifacts().stream()
+ .flatMap(artifact -> artifact.parts().stream())
+ .filter(part -> part instanceof TextPart)
+ .map(part -> ((TextPart) part).text())
+ .collect(Collectors.joining("\n"));
+ }
+
}
diff --git a/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentExecutorProducer.java b/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentExecutorProducer.java
index 40839e7fb..d62391bf8 100644
--- a/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentExecutorProducer.java
+++ b/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentExecutorProducer.java
@@ -1,18 +1,38 @@
package io.a2a.server.apps.common;
+import static io.a2a.server.ServerCallContext.TRANSPORT_KEY;
+
import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
+import jakarta.inject.Inject;
+import io.a2a.A2A;
+import io.a2a.client.Client;
+import io.a2a.client.ClientEvent;
+import io.a2a.client.TaskEvent;
+import io.a2a.client.TaskUpdateEvent;
+import io.a2a.server.PublicAgentCard;
+import io.a2a.server.ServerCallContext;
import io.a2a.server.agentexecution.AgentExecutor;
import io.a2a.server.agentexecution.RequestContext;
import io.a2a.server.tasks.AgentEmitter;
+import io.a2a.spec.A2AClientException;
import io.a2a.spec.A2AError;
+import io.a2a.spec.AgentCard;
+import io.a2a.spec.Artifact;
+import io.a2a.spec.InternalError;
import io.a2a.spec.InvalidParamsError;
import io.a2a.spec.Message;
import io.a2a.spec.Part;
+import io.a2a.spec.Task;
import io.a2a.spec.TextPart;
+import io.a2a.spec.TransportProtocol;
import io.a2a.spec.UnsupportedOperationError;
import io.quarkus.arc.profile.IfBuildProfile;
@@ -20,6 +40,11 @@
@IfBuildProfile("test")
public class AgentExecutorProducer {
+ // Inject the existing AgentCard to avoid special handling for grpc
+ @Inject
+ @PublicAgentCard
+ AgentCard agentCard;
+
@Produces
public AgentExecutor agentExecutor() {
return new AgentExecutor() {
@@ -27,6 +52,12 @@ public AgentExecutor agentExecutor() {
public void execute(RequestContext context, AgentEmitter agentEmitter) throws A2AError {
String taskId = context.getTaskId();
+ // Agent-to-agent communication test
+ if (taskId != null && taskId.startsWith("agent-to-agent-test")) {
+ handleAgentToAgentTest(context, agentEmitter);
+ return;
+ }
+
// Special handling for multi-event test
if (taskId != null && taskId.startsWith("multi-event-test")) {
// First call: context.getTask() == null (new task)
@@ -69,6 +100,22 @@ public void execute(RequestContext context, AgentEmitter agentEmitter) throws A2
if (context.getTaskId().equals("task-not-supported-123")) {
throw new UnsupportedOperationError();
}
+
+ // Check for delegated agent-to-agent messages (marked with special prefix)
+ if (context.getMessage() != null) {
+ String userInput = extractTextFromMessage(context.getMessage());
+ if (userInput.startsWith("#a2a-delegated#")) {
+ // This is a delegated message from agent-to-agent test - complete it
+ String actualContent = userInput.substring("#a2a-delegated#".length());
+ agentEmitter.startWork();
+ String response = "Handled locally: " + actualContent;
+ agentEmitter.addArtifact(List.of(new TextPart(response)));
+ agentEmitter.complete();
+ return;
+ }
+ }
+
+ // Default handler: echo back message or task
if (context.getMessage() != null) {
agentEmitter.sendMessage(context.getMessage());
} else {
@@ -84,6 +131,127 @@ public void cancel(RequestContext context, AgentEmitter agentEmitter) throws A2A
throw new UnsupportedOperationError();
}
}
+
+ /**
+ * Handles agent-to-agent communication testing.
+ * Detects "delegate:" prefix and forwards requests to another agent via client.
+ */
+ private void handleAgentToAgentTest(RequestContext context, AgentEmitter agentEmitter) throws A2AError {
+ try {
+ // Get transport protocol from ServerCallContext
+ ServerCallContext callContext = context.getCallContext();
+ if (callContext == null) {
+ agentEmitter.fail(new InternalError("No call context available for agent-to-agent test"));
+ return;
+ }
+
+ TransportProtocol transportProtocol = (TransportProtocol) callContext.getState().get(TRANSPORT_KEY);
+ if (transportProtocol == null) {
+ agentEmitter.fail(new InternalError("Transport type not set in call context"));
+ return;
+ }
+
+ // Extract user message
+ String userInput = context.getUserInput("\n");
+ if (userInput == null || userInput.isEmpty()) {
+ agentEmitter.fail(new InternalError("No user input received"));
+ return;
+ }
+
+ // Check for delegation pattern
+ if (userInput.startsWith("delegate:")) {
+ handleDelegation(userInput, transportProtocol, agentEmitter);
+ } else {
+ handleLocally(userInput, agentEmitter);
+ }
+ } catch (Exception e) {
+ // Log the full stack trace to help debug intermittent failures
+ e.printStackTrace();
+ agentEmitter.fail(new InternalError("Agent-to-agent test failed: " + e.getMessage()));
+ }
+ }
+
+ /**
+ * Handles delegation by forwarding to another agent via client.
+ */
+ private void handleDelegation(String userInput, TransportProtocol transportProtocol,
+ AgentEmitter agentEmitter) {
+ // Strip "delegate:" prefix
+ String delegatedContent = userInput.substring("delegate:".length()).trim();
+
+ // Create client for same transport
+ try (Client client = AgentToAgentClientFactory.createClient(agentCard, transportProtocol)) {
+ agentEmitter.startWork();
+
+ // Set up consumer to capture task result
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicReference resultRef = new AtomicReference<>();
+ AtomicReference errorRef = new AtomicReference<>();
+
+ BiConsumer consumer =
+ AgentToAgentClientFactory.createTaskCaptureConsumer(resultRef, latch);
+
+ // Delegate to another agent (new task on same server)
+ // Add a marker so the receiving agent knows to complete the task
+ Message delegatedMessage = A2A.toUserMessage("#a2a-delegated#" + delegatedContent);
+ client.sendMessage(delegatedMessage, List.of(consumer), error -> {
+ errorRef.set(error);
+ latch.countDown();
+ });
+
+ // Wait for response
+ if (!latch.await(30, TimeUnit.SECONDS)) {
+ agentEmitter.fail(new InternalError("Timeout waiting for delegated response"));
+ return;
+ }
+
+ Task delegatedResult = resultRef.get();
+
+ // Check for error only if we didn't get a successful result
+ // (errors can occur after completion due to stream cleanup)
+ if (delegatedResult == null && errorRef.get() != null) {
+ agentEmitter.fail(new InternalError("Delegation failed: " + errorRef.get().getMessage()));
+ return;
+ }
+
+ if (delegatedResult == null) {
+ agentEmitter.fail(new InternalError("No result received from delegation"));
+ return;
+ }
+
+ // Extract artifacts from delegated task and add to current task
+ // NOTE: We cannot use emitter.addTask(delegatedResult) because it has a different taskId
+ if (delegatedResult.artifacts() != null && !delegatedResult.artifacts().isEmpty()) {
+ for (Artifact artifact : delegatedResult.artifacts()) {
+ agentEmitter.addArtifact(artifact.parts());
+ }
+ }
+
+ // Complete current task
+ agentEmitter.complete();
+ } catch (A2AClientException e) {
+ agentEmitter.fail(new InternalError("Failed to create client: " + e.getMessage()));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ agentEmitter.fail(new InternalError("Interrupted while waiting for response"));
+ }
+ }
+
+ /**
+ * Handles request locally without delegation.
+ */
+ private void handleLocally(String userInput, AgentEmitter agentEmitter) {
+ try {
+ agentEmitter.startWork();
+ String response = "Handled locally: " + userInput;
+ agentEmitter.addArtifact(List.of(new TextPart(response)));
+ agentEmitter.complete();
+ } catch (Exception e) {
+ // Defensive catch to ensure we always emit a final state
+ e.printStackTrace();
+ agentEmitter.fail(new InternalError("Local handling failed: " + e.getMessage()));
+ }
+ }
};
}
diff --git a/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentToAgentClientFactory.java b/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentToAgentClientFactory.java
new file mode 100644
index 000000000..f3c06eb66
--- /dev/null
+++ b/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentToAgentClientFactory.java
@@ -0,0 +1,119 @@
+package io.a2a.server.apps.common;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+
+import io.a2a.client.Client;
+import io.a2a.client.ClientBuilder;
+import io.a2a.client.ClientEvent;
+import io.a2a.client.TaskEvent;
+import io.a2a.client.TaskUpdateEvent;
+import io.a2a.client.config.ClientConfig;
+import io.a2a.client.transport.grpc.GrpcTransport;
+import io.a2a.client.transport.grpc.GrpcTransportConfigBuilder;
+import io.a2a.client.transport.jsonrpc.JSONRPCTransport;
+import io.a2a.client.transport.jsonrpc.JSONRPCTransportConfigBuilder;
+import io.a2a.client.transport.rest.RestTransport;
+import io.a2a.client.transport.rest.RestTransportConfigBuilder;
+import io.a2a.spec.A2AClientException;
+import io.a2a.spec.AgentCard;
+import io.a2a.spec.Task;
+import io.a2a.spec.TransportProtocol;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+
+/**
+ * Helper class for creating A2A clients for agent-to-agent communication testing.
+ * Uses inner classes to avoid class loading issues when transport dependencies aren't on the classpath.
+ */
+public class AgentToAgentClientFactory {
+
+ /**
+ * Creates a BiConsumer that captures the final task state.
+ * This utility method is used by both test classes and agent executors to avoid code duplication.
+ *
+ * @param taskRef the AtomicReference to store the final task
+ * @param latch the CountDownLatch to signal completion
+ * @return a BiConsumer that captures completed tasks
+ */
+ public static BiConsumer createTaskCaptureConsumer(
+ AtomicReference taskRef, CountDownLatch latch) {
+ return (event, agentCard) -> {
+ Task task = null;
+ if (event instanceof TaskEvent taskEvent) {
+ task = taskEvent.getTask();
+ } else if (event instanceof TaskUpdateEvent taskUpdateEvent) {
+ task = taskUpdateEvent.getTask();
+ }
+
+ if (task != null && task.status().state().isFinal()) {
+ taskRef.set(task);
+ latch.countDown();
+ }
+ };
+ }
+
+ /**
+ * Creates a client for the specified transport protocol.
+ * The agent card parameter already contains the correct local endpoint URLs
+ * configured by the test's AgentCardProducer.
+ *
+ * @param agentCard the agent card with correct local endpoints
+ * @param transportProtocol the transport protocol to use
+ * @return configured client
+ * @throws A2AClientException if client creation fails
+ */
+ public static Client createClient(AgentCard agentCard, TransportProtocol transportProtocol)
+ throws A2AClientException {
+ ClientConfig clientConfig = ClientConfig.builder()
+ .setStreaming(false)
+ .build();
+
+ ClientBuilder clientBuilder = Client.builder(agentCard)
+ .clientConfig(clientConfig);
+
+ ClientTransportEnhancer enhancer = switch (transportProtocol) {
+ case JSONRPC -> new JsonRpcClientEnhancer();
+ case GRPC -> new GrpcClientEnhancer();
+ case HTTP_JSON -> new RestClientEnhancer();
+ default -> throw new IllegalArgumentException("Unsupported transport: " + transportProtocol);
+ };
+
+ enhancer.enhance(clientBuilder);
+ return clientBuilder.build();
+ }
+
+ /**
+ * The implementations of this interface are needed to avoid ClassNotFoundErrors for client transports that are
+ * not on the classpath.
+ */
+ interface ClientTransportEnhancer {
+ void enhance(ClientBuilder clientBuilder);
+ }
+
+ private static class GrpcClientEnhancer implements AgentToAgentClientFactory.ClientTransportEnhancer {
+ @Override
+ public void enhance(ClientBuilder clientBuilder) {
+ clientBuilder.withTransport(GrpcTransport.class, new GrpcTransportConfigBuilder().channelFactory(target -> {
+ ManagedChannel channel = ManagedChannelBuilder.forTarget(target).usePlaintext().build();
+ return channel;
+ }));
+ }
+ }
+
+ private static class JsonRpcClientEnhancer implements AgentToAgentClientFactory.ClientTransportEnhancer {
+ @Override
+ public void enhance(ClientBuilder clientBuilder) {
+ clientBuilder.withTransport(JSONRPCTransport.class, new JSONRPCTransportConfigBuilder());
+ }
+ }
+
+ private static class RestClientEnhancer implements AgentToAgentClientFactory.ClientTransportEnhancer {
+ @Override
+ public void enhance(ClientBuilder clientBuilder) {
+ clientBuilder.withTransport(RestTransport.class, new RestTransportConfigBuilder());
+ }
+ }
+}
+
diff --git a/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/GrpcHandler.java b/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/GrpcHandler.java
index 7a312a5d0..277a763cd 100644
--- a/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/GrpcHandler.java
+++ b/transport/grpc/src/main/java/io/a2a/transport/grpc/handler/GrpcHandler.java
@@ -2,6 +2,7 @@
import static io.a2a.grpc.utils.ProtoUtils.FromProto;
import static io.a2a.grpc.utils.ProtoUtils.ToProto;
+import static io.a2a.server.ServerCallContext.TRANSPORT_KEY;
import java.util.HashMap;
import java.util.HashSet;
@@ -53,6 +54,7 @@
import io.a2a.spec.TaskNotFoundError;
import io.a2a.spec.TaskPushNotificationConfig;
import io.a2a.spec.TaskQueryParams;
+import io.a2a.spec.TransportProtocol;
import io.a2a.spec.UnsupportedOperationError;
import io.a2a.spec.VersionNotSupportedError;
import io.a2a.transport.grpc.context.GrpcContextKeys;
@@ -403,7 +405,8 @@ private ServerCallContext createCallContext(StreamObserver responseObserv
// This handles both CDI injection scenarios and test scenarios where callContextFactory is null
User user = UnauthenticatedUser.INSTANCE;
Map state = new HashMap<>();
-
+ state.put(TRANSPORT_KEY, TransportProtocol.GRPC);
+
// Enhanced gRPC context access - equivalent to Python's grpc.aio.ServicerContext
// The A2AExtensionsInterceptor captures ServerCall + Metadata and stores them in gRPC Context
// This provides proper equivalence to Python's ServicerContext for metadata access