Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions client/base/src/main/java/io/a2a/client/AbstractClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.a2a.spec.ListTaskPushNotificationConfigResult;
import io.a2a.spec.ListTasksParams;
import io.a2a.spec.Message;
import io.a2a.spec.MessageSendParams;
import io.a2a.spec.PushNotificationConfig;
import io.a2a.spec.Task;
import io.a2a.spec.TaskIdParams;
Expand Down Expand Up @@ -160,6 +161,26 @@ public abstract void sendMessage(@NonNull Message request,
@Nullable Map<String, Object> metadata,
@Nullable ClientCallContext context) throws A2AClientException;

/**
* Send a message to the remote agent. This method will automatically use
* the streaming or non-streaming approach as determined by the server's
* agent card and the client configuration. The specified client consumers
* will be used to handle messages, tasks, and update events received
* from the remote agent. The specified streaming error handler will be used
* if an error occurs during streaming. The configured client push notification
* configuration will get used for streaming.
*
* @param params the request parameters
* @param consumers a list of consumers to pass responses from the remote agent to
* @param streamingErrorHandler an error handler that should be used for the streaming case if an error occurs
* @param context optional client call context for the request
* @throws A2AClientException if sending the message fails for any reason
*/
public abstract void sendMessage(@NonNull MessageSendParams params,
@NonNull List<BiConsumer<ClientEvent, AgentCard>> consumers,
@Nullable Consumer<Throwable> streamingErrorHandler,
@Nullable ClientCallContext context) throws A2AClientException;

/**
* Retrieve the current state and history of a specific task.
*
Expand Down
77 changes: 38 additions & 39 deletions client/base/src/main/java/io/a2a/client/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,13 @@ public void sendMessage(@NonNull Message request,
@NonNull List<BiConsumer<ClientEvent, AgentCard>> consumers,
@Nullable Consumer<Throwable> streamingErrorHandler,
@Nullable ClientCallContext context) throws A2AClientException {
MessageSendParams messageSendParams = getMessageSendParams(request, clientConfig);
MessageSendConfiguration messageSendConfiguration = createMessageSendConfiguration(clientConfig.getPushNotificationConfig());

MessageSendParams messageSendParams = MessageSendParams.builder()
.message(request)
.configuration(messageSendConfiguration)
.metadata(clientConfig.getMetadata())
.build();
sendMessage(messageSendParams, consumers, streamingErrorHandler, context);
}

Expand Down Expand Up @@ -289,7 +295,6 @@ public void sendMessage(@NonNull Message request,
@Nullable Map<String, Object> metadata,
@Nullable ClientCallContext context) throws A2AClientException {
MessageSendConfiguration messageSendConfiguration = createMessageSendConfiguration(pushNotificationConfiguration);

MessageSendParams messageSendParams = MessageSendParams.builder()
.message(request)
.configuration(messageSendConfiguration)
Expand All @@ -299,6 +304,37 @@ public void sendMessage(@NonNull Message request,
sendMessage(messageSendParams, consumers, streamingErrorHandler, context);
}

@Override
public void sendMessage(@NonNull MessageSendParams messageSendParams,
@NonNull List<BiConsumer<ClientEvent, AgentCard>> consumers,
@Nullable Consumer<Throwable> streamingErrorHandler,
@Nullable ClientCallContext context) throws A2AClientException {
if (! clientConfig.isStreaming() || ! agentCard.capabilities().streaming()) {
EventKind eventKind = clientTransport.sendMessage(messageSendParams, context);
ClientEvent clientEvent;
if (eventKind instanceof Task task) {
clientEvent = new TaskEvent(task);
} else {
// must be a message
clientEvent = new MessageEvent((Message) eventKind);
}
consume(clientEvent, agentCard, consumers);
} else {
ClientTaskManager tracker = new ClientTaskManager();
Consumer<Throwable> overriddenErrorHandler = getOverriddenErrorHandler(streamingErrorHandler);
Consumer<StreamingEventKind> eventHandler = event -> {
try {
ClientEvent clientEvent = getClientEvent(event, tracker);
consume(clientEvent, agentCard, consumers);
} catch (A2AClientError e) {
overriddenErrorHandler.accept(e);
}
};
clientTransport.sendMessageStreaming(messageSendParams, eventHandler, overriddenErrorHandler, context);
}
}


/**
* Retrieve a specific task by ID.
* <p>
Expand Down Expand Up @@ -666,33 +702,6 @@ private MessageSendConfiguration createMessageSendConfiguration(@Nullable PushNo
.build();
}

private void sendMessage(@NonNull MessageSendParams messageSendParams, @NonNull List<BiConsumer<ClientEvent, AgentCard>> consumers,
@Nullable Consumer<Throwable> errorHandler, @Nullable ClientCallContext context) throws A2AClientException {
if (! clientConfig.isStreaming() || ! agentCard.capabilities().streaming()) {
EventKind eventKind = clientTransport.sendMessage(messageSendParams, context);
ClientEvent clientEvent;
if (eventKind instanceof Task task) {
clientEvent = new TaskEvent(task);
} else {
// must be a message
clientEvent = new MessageEvent((Message) eventKind);
}
consume(clientEvent, agentCard, consumers);
} else {
ClientTaskManager tracker = new ClientTaskManager();
Consumer<Throwable> overriddenErrorHandler = getOverriddenErrorHandler(errorHandler);
Consumer<StreamingEventKind> eventHandler = event -> {
try {
ClientEvent clientEvent = getClientEvent(event, tracker);
consume(clientEvent, agentCard, consumers);
} catch (A2AClientError e) {
overriddenErrorHandler.accept(e);
}
};
clientTransport.sendMessageStreaming(messageSendParams, eventHandler, overriddenErrorHandler, context);
}
}

private @NonNull Consumer<Throwable> getOverriddenErrorHandler(@Nullable Consumer<Throwable> errorHandler) {
return e -> {
if (errorHandler != null) {
Expand All @@ -710,14 +719,4 @@ private void consume(ClientEvent clientEvent, AgentCard agentCard, @NonNull List
consumer.accept(clientEvent, agentCard);
}
}

private MessageSendParams getMessageSendParams(Message request, ClientConfig clientConfig) {
MessageSendConfiguration messageSendConfiguration = createMessageSendConfiguration(clientConfig.getPushNotificationConfig());

return MessageSendParams.builder()
.message(request)
.configuration(messageSendConfiguration)
.metadata(clientConfig.getMetadata())
.build();
}
}