diff --git a/a2a/src/main/java/com/google/adk/a2a/A2AClient.java b/a2a/src/main/java/com/google/adk/a2a/A2AClient.java
deleted file mode 100644
index 8bd87d4d1..000000000
--- a/a2a/src/main/java/com/google/adk/a2a/A2AClient.java
+++ /dev/null
@@ -1,136 +0,0 @@
-package com.google.adk.a2a;
-
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.PropertyAccessor;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.json.JsonMapper;
-import com.google.common.base.Preconditions;
-import io.a2a.client.http.A2AHttpClient;
-import io.a2a.client.http.A2AHttpResponse;
-import io.a2a.client.http.JdkA2AHttpClient;
-import io.a2a.spec.AgentCard;
-import io.a2a.spec.SendMessageRequest;
-import io.a2a.spec.SendMessageResponse;
-import io.reactivex.rxjava3.core.Flowable;
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URI;
-import java.net.URL;
-import java.util.Map;
-import org.jspecify.annotations.Nullable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A thin HTTP client for interacting with an A2A-compliant agent endpoint.
- *
- *
**EXPERIMENTAL:** Subject to change, rename, or removal in any future patch release. Do not
- * use in production code.
- */
-public final class A2AClient {
-
- private static final Logger logger = LoggerFactory.getLogger(A2AClient.class);
- private static final String JSON_CONTENT_TYPE = "application/json";
- private static final String DEFAULT_SEND_PATH = "/v1/message:send";
-
- private final AgentCard agentCard;
- private final A2AHttpClient httpClient;
- private final ObjectMapper objectMapper;
- private final Map defaultHeaders;
-
- public A2AClient(AgentCard agentCard) {
- this(agentCard, new JdkA2AHttpClient(), Map.of());
- }
-
- public A2AClient(
- AgentCard agentCard, A2AHttpClient httpClient, Map defaultHeaders) {
- this.agentCard = Preconditions.checkNotNull(agentCard, "agentCard");
- this.httpClient = Preconditions.checkNotNull(httpClient, "httpClient");
- this.objectMapper =
- JsonMapper.builder()
- .findAndAddModules()
- .visibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY)
- .build();
- this.defaultHeaders = defaultHeaders == null ? Map.of() : Map.copyOf(defaultHeaders);
- }
-
- public AgentCard getAgentCard() {
- return agentCard;
- }
-
- public String getUrl() {
- return agentCard.url();
- }
-
- /**
- * Sends a JSON-RPC message to the remote A2A agent and converts the response into the canonical
- * {@link SendMessageResponse} model.
- */
- public Flowable sendMessage(SendMessageRequest request) {
- return Flowable.fromCallable(() -> executeSendMessage(request));
- }
-
- private SendMessageResponse executeSendMessage(SendMessageRequest request) throws IOException {
- Preconditions.checkNotNull(request, "request");
- String payload = serializeRequest(request);
- String endpoint = resolveSendMessageEndpoint(agentCard.url());
-
- A2AHttpClient.PostBuilder builder =
- httpClient.createPost().url(endpoint).addHeader("Content-Type", JSON_CONTENT_TYPE);
- defaultHeaders.forEach(builder::addHeader);
- builder.body(payload);
-
- A2AHttpResponse response;
- try {
- response = builder.post();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IOException("Interrupted while sending A2A sendMessage request", e);
- }
-
- if (!response.success()) {
- String responseBody = response.body();
- logger.warn(
- "A2A sendMessage request failed with status {} and body {}",
- response.status(),
- responseBody);
- throw new IOException("A2A sendMessage request failed with HTTP status " + response.status());
- }
-
- return deserializeResponse(response.body());
- }
-
- private String serializeRequest(SendMessageRequest request) throws JsonProcessingException {
- return objectMapper.writeValueAsString(request);
- }
-
- private SendMessageResponse deserializeResponse(String body) throws JsonProcessingException {
- return objectMapper.readValue(body, SendMessageResponse.class);
- }
-
- private static String resolveSendMessageEndpoint(String baseUrl) {
- if (baseUrl == null || baseUrl.isEmpty()) {
- throw new IllegalArgumentException("Agent card URL cannot be null or empty");
- }
- if (baseUrl.endsWith("/")) {
- return baseUrl.substring(0, baseUrl.length() - 1) + DEFAULT_SEND_PATH;
- }
- return baseUrl + DEFAULT_SEND_PATH;
- }
-
- public static @Nullable String extractHostAndPort(String urlString) {
- try {
- URL url = URI.create(urlString).toURL();
- String host = url.getHost();
- int port = url.getPort();
- if (port != -1) {
- return host + ":" + port;
- }
- return host;
- } catch (MalformedURLException | IllegalArgumentException e) {
- logger.warn("Invalid URL when extracting host and port", e);
- return null;
- }
- }
-}
diff --git a/a2a/src/main/java/com/google/adk/a2a/RemoteA2AAgent.java b/a2a/src/main/java/com/google/adk/a2a/RemoteA2AAgent.java
index 0701d7422..5e6e341d7 100644
--- a/a2a/src/main/java/com/google/adk/a2a/RemoteA2AAgent.java
+++ b/a2a/src/main/java/com/google/adk/a2a/RemoteA2AAgent.java
@@ -1,28 +1,31 @@
package com.google.adk.a2a;
-import static com.google.common.base.Strings.isNullOrEmpty;
+import static com.google.common.base.Strings.nullToEmpty;
-import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.adk.a2a.common.A2AClientError;
import com.google.adk.a2a.converters.EventConverter;
import com.google.adk.a2a.converters.ResponseConverter;
import com.google.adk.agents.BaseAgent;
import com.google.adk.agents.Callbacks;
import com.google.adk.agents.InvocationContext;
import com.google.adk.events.Event;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.io.Resources;
+import com.google.common.collect.ImmutableList;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
+import io.a2a.client.Client;
+import io.a2a.client.ClientEvent;
+import io.a2a.client.TaskEvent;
+import io.a2a.client.TaskUpdateEvent;
+import io.a2a.spec.A2AClientException;
import io.a2a.spec.AgentCard;
import io.a2a.spec.Message;
-import io.a2a.spec.MessageSendParams;
-import io.a2a.spec.SendMessageRequest;
+import io.a2a.spec.TaskState;
+import io.reactivex.rxjava3.core.BackpressureStrategy;
import io.reactivex.rxjava3.core.Flowable;
-import java.io.IOException;
-import java.net.URL;
+import io.reactivex.rxjava3.core.FlowableEmitter;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
-import org.jspecify.annotations.Nullable;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,21 +55,10 @@ public class RemoteA2AAgent extends BaseAgent {
private static final Logger logger = LoggerFactory.getLogger(RemoteA2AAgent.class);
- private Optional agentCard;
- private final Optional agentCardSource;
- private Optional a2aClient;
- private Optional rpcUrl = Optional.empty();
+ private final AgentCard agentCard;
+ private final Client a2aClient;
private String description;
- private boolean isResolved = false;
-
- public RemoteA2AAgent() {
- // Initialize with empty values - will be configured later
- super("", "", null, null, null);
- this.agentCard = Optional.empty();
- this.agentCardSource = Optional.empty();
- this.a2aClient = Optional.empty();
- this.description = "";
- }
+ private final boolean streaming;
// Internal constructor used by builder
private RemoteA2AAgent(Builder builder) {
@@ -77,32 +69,29 @@ private RemoteA2AAgent(Builder builder) {
builder.beforeAgentCallback,
builder.afterAgentCallback);
- if (builder.agentCardOrSource == null) {
- throw new IllegalArgumentException("agentCardOrSource cannot be null");
+ if (builder.a2aClient == null) {
+ throw new IllegalArgumentException("a2aClient cannot be null");
}
- if (builder.agentCardOrSource instanceof AgentCard) {
- this.agentCard = Optional.of((AgentCard) builder.agentCardOrSource);
- this.agentCardSource = Optional.empty();
- this.description = builder.description;
- // If builder description is empty, use the one from AgentCard
- if (this.description.isEmpty() && this.agentCard.get().description() != null) {
- this.description = this.agentCard.get().description();
- }
- } else if (builder.agentCardOrSource instanceof String) {
- this.agentCard = Optional.empty();
- String source = (String) builder.agentCardOrSource;
- if (source.trim().isEmpty()) {
- throw new IllegalArgumentException("agentCard string cannot be empty");
- }
- this.agentCardSource = Optional.of(source.trim());
+ this.a2aClient = builder.a2aClient;
+ if (builder.agentCard != null) {
+ this.agentCard = builder.agentCard;
} else {
- throw new TypeError(
- "agentCard must be AgentCard, URL string, or file path string, got "
- + builder.agentCardOrSource.getClass());
+ try {
+ this.agentCard = this.a2aClient.getAgentCard();
+ } catch (A2AClientException e) {
+ throw new AgentCardResolutionError("Failed to resolve agent card", e);
+ }
}
-
- this.a2aClient = builder.a2aClient;
+ if (this.agentCard == null) {
+ throw new IllegalArgumentException("agentCard cannot be null");
+ }
+ this.description = nullToEmpty(builder.description);
+ // If builder description is empty, use the one from AgentCard
+ if (this.description.isEmpty() && this.agentCard.description() != null) {
+ this.description = this.agentCard.description();
+ }
+ this.streaming = this.agentCard.capabilities().streaming();
}
public static Builder builder() {
@@ -112,9 +101,9 @@ public static Builder builder() {
/** Builder for {@link RemoteA2AAgent}. */
public static class Builder {
private String name;
- private Object agentCardOrSource;
+ private AgentCard agentCard;
+ private Client a2aClient;
private String description = "";
- private Optional a2aClient = Optional.empty();
private List extends BaseAgent> subAgents;
private List beforeAgentCallback;
private List afterAgentCallback;
@@ -126,8 +115,8 @@ public Builder name(String name) {
}
@CanIgnoreReturnValue
- public Builder agentCardOrSource(Object agentCardOrSource) {
- this.agentCardOrSource = agentCardOrSource;
+ public Builder agentCard(AgentCard agentCard) {
+ this.agentCard = agentCard;
return this;
}
@@ -137,12 +126,6 @@ public Builder description(String description) {
return this;
}
- @CanIgnoreReturnValue
- public Builder a2aClient(@Nullable A2AClient a2aClient) {
- this.a2aClient = Optional.ofNullable(a2aClient);
- return this;
- }
-
@CanIgnoreReturnValue
public Builder subAgents(List extends BaseAgent> subAgents) {
this.subAgents = subAgents;
@@ -161,73 +144,19 @@ public Builder afterAgentCallback(List afterAgentC
return this;
}
- public RemoteA2AAgent build() {
- return new RemoteA2AAgent(this);
- }
- }
-
- public Optional rpcUrl() {
- return rpcUrl;
- }
-
- private void ensureResolved() {
- // This method is similar to getClientFromAgentCardUrl in the A2A Java SDK. It is called at
- // runtime not constructor time.
- if (isResolved) {
- return;
- }
-
- try {
- // Resolve agent card if needed
- if (agentCard.isEmpty()) {
- if (agentCardSource.isPresent()) {
- String source = agentCardSource.get();
- this.agentCard = Optional.of(resolveAgentCard(source));
- } else {
- // This case should not happen based on constructor logic
- }
- }
-
- // Set RPC URL
- this.rpcUrl = Optional.of(this.agentCard.get().url());
-
- // Update description if empty
- if (this.description == null && this.agentCard.get().description() != null) {
- this.description = this.agentCard.get().description();
- }
-
- if (this.a2aClient.isEmpty() && this.agentCard.isPresent()) {
- this.a2aClient = Optional.of(new A2AClient(this.agentCard.get()));
- }
- this.isResolved = true;
-
- } catch (Exception e) {
- throw new AgentCardResolutionError(
- "Failed to initialize remote A2A agent " + name() + ": " + e, e);
+ @CanIgnoreReturnValue
+ public Builder a2aClient(Client a2aClient) {
+ this.a2aClient = a2aClient;
+ return this;
}
- }
- private AgentCard resolveAgentCard(String source) throws IOException {
- ObjectMapper objectMapper = new ObjectMapper();
- try {
- URL resourceUrl = Resources.getResource(source);
- agentCard = Optional.of(objectMapper.readValue(resourceUrl, AgentCard.class));
- return agentCard.get();
- } catch (IllegalArgumentException e) {
- throw new IOException(
- "Failed to find AgentCard resource: "
- + source
- + ". Check if the resource exists and is included in the build.",
- e);
- } catch (Exception e) {
- throw new IOException("Failed to load AgentCard from resource: " + source, e);
+ public RemoteA2AAgent build() {
+ return new RemoteA2AAgent(this);
}
}
@Override
protected Flowable runAsyncImpl(InvocationContext invocationContext) {
- ensureResolved();
-
// Construct A2A Message from the last ADK event
List sessionEvents = invocationContext.session().events();
@@ -244,51 +173,77 @@ protected Flowable runAsyncImpl(InvocationContext invocationContext) {
}
Message originalMessage = a2aMessageOpt.get();
- String sessionId = invocationContext.session().id();
- String inboundContextId = originalMessage.getContextId();
- if (!isNullOrEmpty(inboundContextId) && !sessionId.equals(inboundContextId)) {
- logger.warn("Inbound context id differs from active session; using session id instead.");
+ return Flowable.create(
+ emitter -> {
+ FlowableEmitter flowableEmitter = emitter.serialize();
+ AtomicBoolean done = new AtomicBoolean(false);
+ ImmutableList> consumers =
+ ImmutableList.of(
+ (event, unused) ->
+ handleClientEvent(event, flowableEmitter, invocationContext, done));
+ a2aClient.sendMessage(
+ originalMessage, consumers, e -> handleClientError(e, flowableEmitter, done), null);
+ },
+ BackpressureStrategy.BUFFER);
+ }
+
+ private void handleClientError(Throwable e, FlowableEmitter emitter, AtomicBoolean done) {
+ // Mark the flow as done if it is already cancelled.
+ done.compareAndSet(false, emitter.isCancelled());
+
+ // If the flow is already done, stop processing and exit the consumer.
+ if (done.get()) {
+ return;
+ }
+ // If the error is raised, complete the flow with an error.
+ if (!done.getAndSet(true)) {
+ emitter.tryOnError(new A2AClientError("Failed to communicate with the remote agent", e));
+ }
+ }
+
+ private void handleClientEvent(
+ ClientEvent clientEvent,
+ FlowableEmitter emitter,
+ InvocationContext invocationContext,
+ AtomicBoolean done) {
+ // Mark the flow as done if it is already cancelled.
+ done.compareAndSet(false, emitter.isCancelled());
+
+ // If the flow is already done, stop processing and exit the consumer.
+ if (done.get()) {
+ return;
}
- Message a2aMessage = new Message.Builder(originalMessage).contextId(sessionId).build();
-
- Map metadata =
- originalMessage.getMetadata() == null ? ImmutableMap.of() : originalMessage.getMetadata();
-
- MessageSendParams params = new MessageSendParams(a2aMessage, null, metadata);
- SendMessageRequest request = new SendMessageRequest(invocationContext.invocationId(), params);
-
- return a2aClient
- .get()
- .sendMessage(request)
- .flatMap(
- response -> {
- List events =
- ResponseConverter.sendMessageResponseToEvents(
- response,
- invocationContext.invocationId(),
- invocationContext.branch().orElse(null));
-
- if (events.isEmpty()) {
- logger.warn("No events converted from A2A response");
- // Return a default event to indicate the agent executed
- return Flowable.just(
- Event.builder()
- .author(name())
- .invocationId(invocationContext.invocationId())
- .branch(invocationContext.branch().orElse(null))
- .build());
- }
-
- return Flowable.fromIterable(events);
- });
+ Optional event = ResponseConverter.clientEventToEvent(clientEvent, invocationContext);
+ if (event.isPresent()) {
+ emitter.onNext(event.get());
+ }
+
+ // For non-streaming communication, complete the flow; for streaming, wait until the client
+ // marks the completion.
+ if (isCompleted(clientEvent) || !streaming) {
+ // Only complete the flow once.
+ if (!done.getAndSet(true)) {
+ emitter.onComplete();
+ }
+ }
+ }
+
+ private static boolean isCompleted(ClientEvent event) {
+ TaskState executionState = TaskState.UNKNOWN;
+ if (event instanceof TaskEvent taskEvent) {
+ executionState = taskEvent.getTask().getStatus().state();
+ } else if (event instanceof TaskUpdateEvent updateEvent) {
+ executionState = updateEvent.getTask().getStatus().state();
+ }
+ return executionState.equals(TaskState.COMPLETED);
}
@Override
protected Flowable runLiveImpl(InvocationContext invocationContext) {
throw new UnsupportedOperationException(
- "_run_live_impl for " + getClass() + " via A2A is not implemented.");
+ "runLiveImpl for " + getClass() + " via A2A is not implemented.");
}
/** Exception thrown when the agent card cannot be resolved. */
diff --git a/a2a/src/main/java/com/google/adk/a2a/converters/EventConverter.java b/a2a/src/main/java/com/google/adk/a2a/converters/EventConverter.java
index d112fccce..cd8bcefb0 100644
--- a/a2a/src/main/java/com/google/adk/a2a/converters/EventConverter.java
+++ b/a2a/src/main/java/com/google/adk/a2a/converters/EventConverter.java
@@ -37,39 +37,6 @@ public enum AggregationMode {
EXTERNAL_HANDOFF
}
- public static Optional convertEventToA2AMessage(Event event) {
- if (event == null) {
- logger.warn("Cannot convert null event to A2A message.");
- return Optional.empty();
- }
-
- List> a2aParts = new ArrayList<>();
- Optional contentOpt = event.content();
-
- if (contentOpt.isPresent() && contentOpt.get().parts().isPresent()) {
- for (Part part : contentOpt.get().parts().get()) {
- PartConverter.fromGenaiPart(part).ifPresent(a2aParts::add);
- }
- }
-
- if (a2aParts.isEmpty()) {
- logger.warn("No convertible content found in event.");
- return Optional.empty();
- }
-
- Message.Builder builder =
- new Message.Builder()
- .messageId(event.id() != null ? event.id() : UUID.randomUUID().toString())
- .parts(a2aParts)
- .role(event.author().equals("user") ? Message.Role.USER : Message.Role.AGENT);
- event
- .content()
- .flatMap(Content::role)
- .ifPresent(
- role -> builder.role(role.equals("user") ? Message.Role.USER : Message.Role.AGENT));
- return Optional.of(builder.build());
- }
-
public static Optional convertEventsToA2AMessage(InvocationContext context) {
return convertEventsToA2AMessage(context, AggregationMode.AS_IS);
}
diff --git a/contrib/samples/a2a_basic/A2AAgent.java b/contrib/samples/a2a_basic/A2AAgent.java
index 1c684e188..fa4932cf9 100644
--- a/contrib/samples/a2a_basic/A2AAgent.java
+++ b/contrib/samples/a2a_basic/A2AAgent.java
@@ -1,6 +1,8 @@
package com.example.a2a_basic;
-import com.google.adk.a2a.A2AClient;
+import java.util.ArrayList;
+import java.util.Random;
+
import com.google.adk.a2a.RemoteA2AAgent;
import com.google.adk.agents.BaseAgent;
import com.google.adk.agents.LlmAgent;
@@ -8,12 +10,14 @@
import com.google.adk.tools.ToolContext;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+
+import io.a2a.client.Client;
+import io.a2a.client.config.ClientConfig;
+import io.a2a.client.http.A2ACardResolver;
import io.a2a.client.http.JdkA2AHttpClient;
-import io.a2a.spec.AgentCapabilities;
+import io.a2a.client.transport.jsonrpc.JSONRPCTransport;
+import io.a2a.client.transport.jsonrpc.JSONRPCTransportConfig;
import io.a2a.spec.AgentCard;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
/** Provides local roll logic plus a remote A2A agent for the demo. */
public final class A2AAgent {
@@ -60,24 +64,23 @@ public static LlmAgent createRootAgent(String primeAgentBaseUrl) {
}
private static BaseAgent createRemoteAgent(String primeAgentBaseUrl) {
- AgentCapabilities capabilities = new AgentCapabilities.Builder().build();
- AgentCard agentCard =
- new AgentCard.Builder()
- .name("prime_agent")
- .description("Stub agent metadata used for third-party A2A demo")
- .url(primeAgentBaseUrl)
- .version("1.0.0")
- .capabilities(capabilities)
- .defaultInputModes(List.of("text"))
- .defaultOutputModes(List.of("text"))
- .skills(List.of())
- .security(List.of())
+ String agentCardUrl = primeAgentBaseUrl + "/.well-known/agent-card.json";
+ AgentCard publicAgentCard =
+ new A2ACardResolver(new JdkA2AHttpClient(), primeAgentBaseUrl, agentCardUrl).getAgentCard();
+
+ Client a2aClient =
+ Client.builder(publicAgentCard)
+ .withTransport(JSONRPCTransport.class, new JSONRPCTransportConfig())
+ .clientConfig(
+ new ClientConfig.Builder()
+ .setStreaming(publicAgentCard.capabilities().streaming())
+ .build())
.build();
- A2AClient client = new A2AClient(agentCard, new JdkA2AHttpClient(), /* defaultHeaders= */ null);
+
return RemoteA2AAgent.builder()
- .name(agentCard.name())
- .agentCardOrSource(agentCard)
- .a2aClient(client)
+ .name(publicAgentCard.name())
+ .a2aClient(a2aClient)
+ .agentCard(publicAgentCard)
.build();
}
diff --git a/contrib/samples/a2a_basic/A2AAgentRun.java b/contrib/samples/a2a_basic/A2AAgentRun.java
index 406a8dcbc..12d515f62 100644
--- a/contrib/samples/a2a_basic/A2AAgentRun.java
+++ b/contrib/samples/a2a_basic/A2AAgentRun.java
@@ -1,21 +1,22 @@
package com.example.a2a_basic;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
import com.google.adk.agents.BaseAgent;
-import com.google.adk.agents.LlmAgent;
import com.google.adk.agents.RunConfig;
import com.google.adk.artifacts.InMemoryArtifactService;
import com.google.adk.events.Event;
import com.google.adk.runner.Runner;
import com.google.adk.sessions.InMemorySessionService;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
import com.google.genai.types.Content;
import com.google.genai.types.Part;
+
import io.reactivex.rxjava3.core.Flowable;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
/** Main class to demonstrate running the A2A agent with sequential inputs. */
public final class A2AAgentRun {
@@ -38,7 +39,7 @@ public A2AAgentRun(BaseAgent agent) {
sessionService.createSession(appName, userId, initialState, sessionId).blockingGet();
}
- private List run(String prompt) {
+ private Flowable run(String prompt) {
System.out.println("\n--------------------------------------------------");
System.out.println("You> " + prompt);
Content userMessage =
@@ -49,45 +50,44 @@ private List run(String prompt) {
return processRunRequest(userMessage);
}
- private List processRunRequest(Content inputContent) {
+ private Flowable processRunRequest(Content inputContent) {
RunConfig runConfig = RunConfig.builder().build();
- Flowable eventStream =
- this.runner.runAsync(this.userId, this.sessionId, inputContent, runConfig);
- List agentEvents = Lists.newArrayList(eventStream.blockingIterable());
- System.out.println("Agent>");
- for (Event event : agentEvents) {
- if (event.content().isPresent() && event.content().get().parts().isPresent()) {
- event
- .content()
- .get()
- .parts()
- .get()
- .forEach(
- part -> {
- if (part.text().isPresent()) {
- System.out.println(" Text: " + part.text().get().stripTrailing());
- }
- });
- }
- if (event.actions() != null && event.actions().transferToAgent().isPresent()) {
- System.out.println(" Actions: transferTo=" + event.actions().transferToAgent().get());
- }
- System.out.println(" Raw Event: " + event);
+ return this.runner.runAsync(this.userId, this.sessionId, inputContent, runConfig);
+ }
+
+ private static void printOutEvent(Event event) {
+ if (event.content().isPresent() && event.content().get().parts().isPresent()) {
+ event
+ .content()
+ .get()
+ .parts()
+ .get()
+ .forEach(
+ part -> {
+ if (part.text().isPresent()) {
+ System.out.println(" Text: " + part.text().get().stripTrailing());
+ }
+ });
}
- return agentEvents;
+ if (event.actions() != null && event.actions().transferToAgent().isPresent()) {
+ System.out.println(" Actions: transferTo=" + event.actions().transferToAgent().get());
+ }
+ System.out.println(" Raw Event: " + event);
}
public static void main(String[] args) {
- String primeAgentUrl = args.length > 0 ? args[0] : "http://localhost:9876/a2a/prime_agent";
- LlmAgent agent = A2AAgent.createRootAgent(primeAgentUrl);
+ String primeAgentUrl = args.length > 0 ? args[0] : "http://localhost:8081/a2a/remote/v1";
+ BaseAgent agent = A2AAgent.createRootAgent(primeAgentUrl);
A2AAgentRun a2aRun = new A2AAgentRun(agent);
- // First user input
- System.out.println("Running turn 1");
- a2aRun.run("Roll a dice of 6 sides.");
+ List events =
+ a2aRun.run("Roll a dice of 6 sides.").toList().timeout(90, TimeUnit.SECONDS).blockingGet();
+
+ events.forEach(A2AAgentRun::printOutEvent);
+
+ events =
+ a2aRun.run("Is this a prime number?").toList().timeout(90, TimeUnit.SECONDS).blockingGet();
- // Follow-up input triggers the remote prime agent so the A2A request is logged.
- System.out.println("Running turn 2");
- a2aRun.run("Is this number a prime number?");
+ events.forEach(A2AAgentRun::printOutEvent);
}
}