From b42befaa417b8017f619a83cea09b02d98ecdf12 Mon Sep 17 00:00:00 2001 From: Tomas Dvorak Date: Tue, 15 Oct 2024 13:42:24 +0200 Subject: [PATCH] feat(internals): use different client for sse Signed-off-by: Tomas Dvorak --- package.json | 2 +- src/internals/fetcher.ts | 76 +++++++++++++++++++--------------------- yarn.lock | 20 +++++++++-- 3 files changed, 55 insertions(+), 43 deletions(-) diff --git a/package.json b/package.json index ce1b767f..43a28c28 100644 --- a/package.json +++ b/package.json @@ -133,7 +133,6 @@ "_ensure_env": "cp -n .env.template .env || true" }, "dependencies": { - "@ai-zen/node-fetch-event-source": "^2.1.4", "@connectrpc/connect": "^1.4.0", "@connectrpc/connect-node": "^1.4.0", "@streamparser/json": "^0.0.21", @@ -142,6 +141,7 @@ "bee-proto": "0.0.2", "dirty-json": "0.9.2", "duck-duck-scrape": "^2.2.5", + "eventsource-client": "^1.1.2", "fast-xml-parser": "^4.4.1", "header-generator": "^2.1.54", "joplin-turndown-plugin-gfm": "^1.0.12", diff --git a/src/internals/fetcher.ts b/src/internals/fetcher.ts index 5fc15f0e..52404fe5 100644 --- a/src/internals/fetcher.ts +++ b/src/internals/fetcher.ts @@ -16,17 +16,17 @@ import { FrameworkError } from "@/errors.js"; import { Serializable } from "@/internals/serializable.js"; -import { - EventSourceMessage, - EventStreamContentType, - fetchEventSource, -} from "@ai-zen/node-fetch-event-source"; -import { FetchEventSourceInit } from "@ai-zen/node-fetch-event-source/lib/cjs/fetch.js"; -import { emitterToGenerator } from "@/internals/helpers/promise.js"; import { isPlainObject } from "remeda"; +import { createEventSource, FetchLikeInit } from "eventsource-client"; export class RestfulClientError extends FrameworkError {} +export interface EventSourceMessage { + id: string; + event: string; + data: string; +} + type URLParamType = string | number | boolean | null | undefined; export function createURLParams( data: Record>, @@ -65,44 +65,40 @@ export class RestfulClient> extends Serializabl async *stream( path: keyof K, - init: FetchEventSourceInit, + init: FetchLikeInit = {}, ): AsyncGenerator { const { paths, baseUrl, headers } = this.input; const target = new URL(paths[path] ?? path, baseUrl); - return yield* emitterToGenerator(async ({ emit }) => - fetchEventSource(target.toString(), { - method: "POST", - headers: await headers().then((raw) => Object.fromEntries(raw.entries())), - async onopen(response) { - if (response.ok && response.headers.get("content-type") === EventStreamContentType) { - return; - } - throw new RestfulClientError("Failed to stream!", [], { - context: { - url: response.url, - err: await response.text(), - response, - }, - isRetryable: response.status >= 400 && response.status < 500 && response.status !== 429, + const client = createEventSource({ + ...init, + url: target.toString(), + method: "POST", + headers: await headers().then((raw) => Object.fromEntries(raw.entries())), + onDisconnect: () => { + client.close(); + }, + }); + + try { + client.connect(); + + for await (const message of client) { + if (message.event === "error") { + throw new RestfulClientError(`Error during streaming has occurred.`, [], { + context: message, }); - }, - onmessage(msg) { - if (msg?.event === "error") { - throw new RestfulClientError(`Error during streaming has occurred.`, [], { - context: msg, - }); - } - emit(msg); - }, - onclose() {}, - onerror(err) { - throw new RestfulClientError(`Error during streaming has occurred.`, [err]); - }, - ...init, - fetch, - }), - ); + } + + yield { + id: message.id!, + event: message.event!, + data: message.data, + }; + } + } finally { + client.close(); + } } async fetch(path: keyof K, init?: RequestInit & { searchParams?: URLSearchParams }) { diff --git a/yarn.lock b/yarn.lock index ddb46d2d..00bf5b5a 100644 --- a/yarn.lock +++ b/yarn.lock @@ -5,7 +5,7 @@ __metadata: version: 8 cacheKey: 10c0 -"@ai-zen/node-fetch-event-source@npm:^2.1.2, @ai-zen/node-fetch-event-source@npm:^2.1.4": +"@ai-zen/node-fetch-event-source@npm:^2.1.2": version: 2.1.4 resolution: "@ai-zen/node-fetch-event-source@npm:2.1.4" dependencies: @@ -2741,7 +2741,6 @@ __metadata: version: 0.0.0-use.local resolution: "bee-agent-framework@workspace:." dependencies: - "@ai-zen/node-fetch-event-source": "npm:^2.1.4" "@commitlint/cli": "npm:^19.4.1" "@commitlint/config-conventional": "npm:^19.4.1" "@connectrpc/connect": "npm:^1.4.0" @@ -2778,6 +2777,7 @@ __metadata: eslint: "npm:^9.9.0" eslint-config-prettier: "npm:^9.1.0" eslint-plugin-unused-imports: "npm:^4.1.3" + eventsource-client: "npm:^1.1.2" fast-xml-parser: "npm:^4.4.1" glob: "npm:^11.0.0" groq-sdk: "npm:^0.7.0" @@ -4513,6 +4513,22 @@ __metadata: languageName: node linkType: hard +"eventsource-client@npm:^1.1.2": + version: 1.1.2 + resolution: "eventsource-client@npm:1.1.2" + dependencies: + eventsource-parser: "npm:^1.1.2" + checksum: 10c0/390567748f089e4717ae522d443af07dccdabaff8baacd7aa721d1798c464c3a7b4d5042c97e6dab0ccfbe2cd7c8cde2cad4fda087bfd952a9c5373f794472d4 + languageName: node + linkType: hard + +"eventsource-parser@npm:^1.1.2": + version: 1.1.2 + resolution: "eventsource-parser@npm:1.1.2" + checksum: 10c0/b38948bc81ae6c2a8b9c88383d4f8c2bfbaf23955827a9af68d39bc0550ae83cc400b197e814bea9aef6e0cdc9bae5afd95787418ee3d9ad01ffc4774cf1b84a + languageName: node + linkType: hard + "execa@npm:8.0.1, execa@npm:^8.0.1, execa@npm:~8.0.1": version: 8.0.1 resolution: "execa@npm:8.0.1"