Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/client/factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { AgentCard } from '../types.js';
import { AgentCardResolver } from './card-resolver.js';
import { Client, ClientConfig } from './multitransport-client.js';
import { JsonRpcTransportFactory } from './transports/json_rpc_transport.js';
import { RestTransportFactory } from './transports/rest_transport.js';
import { TransportFactory } from './transports/transport.js';

export interface ClientFactoryOptions {
Expand Down Expand Up @@ -34,7 +35,7 @@ export const ClientFactoryOptions = {
* SDK default options for {@link ClientFactory}.
*/
default: {
transports: [new JsonRpcTransportFactory()],
transports: [new JsonRpcTransportFactory(), new RestTransportFactory()],
} as Readonly<ClientFactoryOptions>,

/**
Expand Down
5 changes: 5 additions & 0 deletions src/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ export {
JsonRpcTransportFactory,
type JsonRpcTransportOptions,
} from './transports/json_rpc_transport.js';
export {
RestTransport,
RestTransportFactory,
type RestTransportOptions,
} from './transports/rest_transport.js';
export type {
CallInterceptor,
BeforeArgs,
Expand Down
61 changes: 4 additions & 57 deletions src/client/transports/json_rpc_transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import {
} from '../../types.js';
import { A2AStreamEventData, SendMessageResult } from '../client.js';
import { RequestOptions } from '../multitransport-client.js';
import { parseSseStream } from '../../sse_utils.js';
import { Transport, TransportFactory } from './transport.js';

export interface JsonRpcTransportOptions {
Expand Down Expand Up @@ -303,62 +304,8 @@ export class JsonRpcTransport implements Transport {
);
}

yield* this._parseA2ASseStream<A2AStreamEventData>(response, clientRequestId);
}

private async *_parseA2ASseStream<TStreamItem>(
response: Response,
originalRequestId: number | string | null
): AsyncGenerator<TStreamItem, void, undefined> {
if (!response.body) {
throw new Error('SSE response body is undefined. Cannot read stream.');
}
const reader = response.body.pipeThrough(new TextDecoderStream()).getReader();
let buffer = '';
let eventDataBuffer = '';

try {
while (true) {
const { done, value } = await reader.read();
if (done) {
if (eventDataBuffer.trim()) {
const result = this._processSseEventData<TStreamItem>(
eventDataBuffer,
originalRequestId
);
yield result;
}
break;
}

buffer += value;
let lineEndIndex;
while ((lineEndIndex = buffer.indexOf('\n')) >= 0) {
const line = buffer.substring(0, lineEndIndex).trim();
buffer = buffer.substring(lineEndIndex + 1);

if (line === '') {
if (eventDataBuffer) {
const result = this._processSseEventData<TStreamItem>(
eventDataBuffer,
originalRequestId
);
yield result;
eventDataBuffer = '';
}
} else if (line.startsWith('data:')) {
eventDataBuffer += line.substring(5).trimStart() + '\n';
}
}
}
} catch (error) {
console.error(
'Error reading or parsing SSE stream:',
(error instanceof Error && error.message) || 'Error unknown'
);
throw error;
} finally {
reader.releaseLock();
for await (const event of parseSseStream(response)) {
yield this._processSseEventData<A2AStreamEventData>(event.data, clientRequestId);
}
}

Expand All @@ -370,7 +317,7 @@ export class JsonRpcTransport implements Transport {
throw new Error('Attempted to process empty SSE event data.');
}
try {
const sseJsonRpcResponse = JSON.parse(jsonData.replace(/\n$/, ''));
const sseJsonRpcResponse = JSON.parse(jsonData);
const a2aStreamResponse: JSONRPCResponse = sseJsonRpcResponse as JSONRPCResponse;

if (a2aStreamResponse.id !== originalRequestId) {
Expand Down
Loading