Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions reference/jsonrpc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<!-- JSON-RPC client transport for agent-to-agent communication tests -->
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>a2a-java-sdk-client-transport-jsonrpc</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-reactive-routes</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.a2a.server.apps.quarkus;

import static io.a2a.server.ServerCallContext.TRANSPORT_KEY;
import static io.a2a.transport.jsonrpc.context.JSONRPCContextKeys.HEADERS_KEY;
import static io.a2a.transport.jsonrpc.context.JSONRPCContextKeys.METHOD_NAME_KEY;
import static io.a2a.transport.jsonrpc.context.JSONRPCContextKeys.TENANT_KEY;
Expand Down Expand Up @@ -62,6 +63,7 @@
import io.a2a.spec.A2AError;
import io.a2a.spec.InternalError;
import io.a2a.spec.JSONParseError;
import io.a2a.spec.TransportProtocol;
import io.a2a.spec.UnsupportedOperationError;
import io.a2a.transport.jsonrpc.handler.JSONRPCHandler;
import io.quarkus.security.Authenticated;
Expand Down Expand Up @@ -246,6 +248,7 @@ public String getUsername() {
headerNames.forEach(name -> headers.put(name, rc.request().getHeader(name)));
state.put(HEADERS_KEY, headers);
state.put(TENANT_KEY, extractTenant(rc));
state.put(TRANSPORT_KEY, TransportProtocol.JSONRPC);

// Extract requested protocol version from X-A2A-Version header
String requestedVersion = rc.request().getHeader(A2AHeaders.X_A2A_VERSION);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.a2a.server.rest.quarkus;

import static io.a2a.server.ServerCallContext.TRANSPORT_KEY;
import static io.a2a.spec.A2AMethods.CANCEL_TASK_METHOD;
import static io.a2a.spec.A2AMethods.SEND_STREAMING_MESSAGE_METHOD;
import static io.a2a.transport.rest.context.RestContextKeys.HEADERS_KEY;
Expand Down Expand Up @@ -33,6 +34,7 @@
import io.a2a.spec.InternalError;
import io.a2a.spec.InvalidParamsError;
import io.a2a.spec.MethodNotFoundError;
import io.a2a.spec.TransportProtocol;
import io.a2a.transport.rest.handler.RestHandler;
import io.a2a.transport.rest.handler.RestHandler.HTTPRestResponse;
import io.a2a.transport.rest.handler.RestHandler.HTTPRestStreamingResponse;
Expand Down Expand Up @@ -430,6 +432,7 @@ public String getUsername() {
state.put(HEADERS_KEY, headers);
state.put(METHOD_NAME_KEY, jsonRpcMethodName);
state.put(TENANT_KEY, extractTenant(rc));
state.put(TRANSPORT_KEY, TransportProtocol.HTTP_JSON);

// Extract requested protocol version from X-A2A-Version header
String requestedVersion = rc.request().getHeader(A2AHeaders.X_A2A_VERSION);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@
import org.jspecify.annotations.Nullable;

public class ServerCallContext {
/**
* Key for transport protocol type in the state map.
* Value should be a {@link io.a2a.spec.TransportProtocol} instance.
*/
public static final String TRANSPORT_KEY = "transport";

// TODO Not totally sure yet about these field types
private final Map<Object, Object> modelConfig = new ConcurrentHashMap<>();
private final Map<String, Object> state;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
Expand All @@ -28,6 +29,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import jakarta.ws.rs.core.MediaType;
Expand Down Expand Up @@ -617,7 +619,6 @@ public void testGetExtendedAgentCard() throws A2AClientException {
.filter(i -> getTransportProtocol().equals(i.protocolBinding()))
.findFirst();
assertTrue(transportInterface.isPresent());
System.out.println("transportInterface = " + transportInterface);
assertEquals(getTransportUrl(),transportInterface.get().url());
assertEquals("1.0", agentCard.version());
assertEquals("http://example.com/docs", agentCard.documentationUrl());
Expand Down Expand Up @@ -2451,4 +2452,142 @@ public void testMainQueueClosesForFinalizedTasks() throws Exception {
}
}

/**
* Test agent-to-agent communication with delegation pattern.
* <p>
* Verifies that an AgentExecutor can use a client to delegate work to another agent
* by using the "delegate:" prefix. The delegated request is forwarded to another agent
* on the same server, and the artifacts from the delegated task are extracted and returned.
* <p>
* This test verifies:
* <ul>
* <li>Transport type is correctly passed via ServerCallContext state</li>
* <li>AgentExecutor can create a client with matching transport</li>
* <li>Delegation pattern ("delegate:" prefix) is recognized</li>
* <li>Client successfully communicates with same server</li>
* <li>Artifacts from delegated task are extracted and returned</li>
* <li>Original task ID is preserved (not replaced by delegated task ID)</li>
* </ul>
*/
@Test
public void testAgentToAgentDelegation() throws Exception {
String delegationTaskId = "agent-to-agent-test-" + UUID.randomUUID();

Message delegationMessage = Message.builder()
.taskId(delegationTaskId)
.contextId("agent-to-agent-context")
.role(Message.Role.USER)
.parts(new TextPart("delegate:What is 2+2?"))
.build();

CountDownLatch delegationLatch = new CountDownLatch(1);
AtomicReference<Task> delegationResultRef = new AtomicReference<>();
AtomicReference<Throwable> delegationErrorRef = new AtomicReference<>();

BiConsumer<ClientEvent, AgentCard> delegationConsumer =
AgentToAgentClientFactory.createTaskCaptureConsumer(delegationResultRef, delegationLatch);

getClient().sendMessage(delegationMessage, List.of(delegationConsumer), error -> {
delegationErrorRef.set(error);
delegationLatch.countDown();
});

assertTrue(delegationLatch.await(30, TimeUnit.SECONDS), "Delegation should complete within timeout");

Task delegationResult = delegationResultRef.get();

// Only fail on errors if we didn't get a successful result
// (errors can occur after completion due to stream cleanup)
if (delegationResult == null && delegationErrorRef.get() != null) {
fail("Delegation failed: " + delegationErrorRef.get().getMessage());
}

assertNotNull(delegationResult, "Delegation task should not be null");
assertEquals(TaskState.TASK_STATE_COMPLETED, delegationResult.status().state(),
"Delegation task should be completed");
assertNotNull(delegationResult.artifacts(), "Delegation should have artifacts");
assertFalse(delegationResult.artifacts().isEmpty(), "Delegation should have at least one artifact");

// Extract text from result
String delegatedText = extractTextFromTask(delegationResult);
assertTrue(delegatedText.contains("Handled locally:"),
"Delegated content should have been handled locally by target agent. Got: " + delegatedText);

// Verify the task ID is the original one (not the delegated task's ID)
assertEquals(delegationTaskId, delegationResult.id(),
"Task ID should be the original task ID, not the delegated task's ID");
}

/**
* Test agent-to-agent communication with local handling (no delegation).
* <p>
* Verifies that requests without the "delegate:" prefix are handled locally
* by the agent without creating a client connection.
* <p>
* This test verifies:
* <ul>
* <li>Requests without "delegate:" prefix are handled locally</li>
* <li>No client-to-client communication occurs for local handling</li>
* <li>Task completes successfully with expected content</li>
* </ul>
*/
@Test
public void testAgentToAgentLocalHandling() throws Exception {
String localTaskId = "agent-to-agent-test-" + UUID.randomUUID();

Message localMessage = Message.builder()
.taskId(localTaskId)
.contextId("agent-to-agent-context")
.role(Message.Role.USER)
.parts(new TextPart("Hello directly"))
.build();

CountDownLatch localLatch = new CountDownLatch(1);
AtomicReference<Task> localResultRef = new AtomicReference<>();
AtomicReference<Throwable> localErrorRef = new AtomicReference<>();

BiConsumer<ClientEvent, AgentCard> localConsumer =
AgentToAgentClientFactory.createTaskCaptureConsumer(localResultRef, localLatch);

getClient().sendMessage(localMessage, List.of(localConsumer), error -> {
localErrorRef.set(error);
localLatch.countDown();
});

assertTrue(localLatch.await(30, TimeUnit.SECONDS), "Local handling should complete within timeout");

Task localResult = localResultRef.get();

// Only fail on errors if we didn't get a successful result
// (errors can occur after completion due to stream cleanup)
if (localResult == null && localErrorRef.get() != null) {
fail("Local handling failed: " + localErrorRef.get().getMessage());
}

assertNotNull(localResult, "Local task should not be null");
assertEquals(TaskState.TASK_STATE_COMPLETED, localResult.status().state(),
"Local task should be completed");

String localText = extractTextFromTask(localResult);
assertTrue(localText.contains("Handled locally: Hello directly"),
"Should be handled locally without delegation. Got: " + localText);
}

/**
* Extracts all text from a task's artifacts.
*
* @param task the task containing artifacts
* @return concatenated text from all TextParts in all artifacts
*/
private String extractTextFromTask(Task task) {
if (task.artifacts() == null || task.artifacts().isEmpty()) {
return "";
}
return task.artifacts().stream()
.flatMap(artifact -> artifact.parts().stream())
.filter(part -> part instanceof TextPart)
.map(part -> ((TextPart) part).text())
.collect(Collectors.joining("\n"));
}

}
Loading