From ccfeafa975a7adf687e4657489791683c4b08254 Mon Sep 17 00:00:00 2001 From: Tat Dat Duong Date: Thu, 24 Oct 2024 09:38:42 +0200 Subject: [PATCH 1/2] feat(sdk-js): implement abort signal timeout, make default timeout for runs 5 minutes --- libs/sdk-js/src/client.ts | 29 +++++++++++++++++++++++++++-- libs/sdk-js/src/utils/signals.ts | 22 ++++++++++++++++++++++ 2 files changed, 49 insertions(+), 2 deletions(-) create mode 100644 libs/sdk-js/src/utils/signals.ts diff --git a/libs/sdk-js/src/client.ts b/libs/sdk-js/src/client.ts index c17e15ed5..9922720d5 100644 --- a/libs/sdk-js/src/client.ts +++ b/libs/sdk-js/src/client.ts @@ -30,6 +30,7 @@ import { CronsCreatePayload, OnConflictBehavior, } from "./types.js"; +import { mergeSignals } from "./utils/signals.js"; interface ClientConfig { apiUrl?: string; @@ -44,6 +45,8 @@ class BaseClient { protected timeoutMs: number; + protected runTimeoutMs: number; + protected apiUrl: string; protected defaultHeaders: Record; @@ -56,6 +59,10 @@ class BaseClient { }); this.timeoutMs = config?.timeoutMs || 12_000; + + // default limit being capped by Chrome + // https://github.com/nodejs/undici/issues/1373 + this.runTimeoutMs = config?.timeoutMs || 300_000; this.apiUrl = config?.apiUrl || "http://localhost:8123"; this.defaultHeaders = config?.defaultHeaders || {}; if (config?.apiKey != null) { @@ -68,6 +75,7 @@ class BaseClient { options?: RequestInit & { json?: unknown; params?: Record; + timeoutMs?: number; }, ): [url: URL, init: RequestInit] { const mutatedOptions = { @@ -84,6 +92,10 @@ class BaseClient { delete mutatedOptions.json; } + mutatedOptions.signal = mergeSignals( + AbortSignal.timeout(options?.timeoutMs ?? this.timeoutMs), + mutatedOptions.signal, + ); const targetUrl = new URL(`${this.apiUrl}${path}`); if (mutatedOptions.params) { @@ -108,6 +120,8 @@ class BaseClient { options?: RequestInit & { json?: unknown; params?: Record; + timeoutMs?: number; + signal?: AbortSignal; }, ): Promise { const response = await this.asyncCaller.fetch( @@ -689,6 +703,7 @@ export class RunsClient extends BaseClient { ...this.prepareFetchOptions(endpoint, { method: "POST", json, + timeoutMs: this.runTimeoutMs, signal: payload?.signal, }), ); @@ -765,6 +780,7 @@ export class RunsClient extends BaseClient { return this.fetch(`/threads/${threadId}/runs`, { method: "POST", json, + timeoutMs: this.runTimeoutMs, signal: payload?.signal, }); } @@ -837,6 +853,7 @@ export class RunsClient extends BaseClient { return this.fetch(endpoint, { method: "POST", json, + timeoutMs: this.runTimeoutMs, signal: payload?.signal, }); } @@ -911,8 +928,15 @@ export class RunsClient extends BaseClient { * @param runId The ID of the run. * @returns */ - async join(threadId: string, runId: string): Promise { - return this.fetch(`/threads/${threadId}/runs/${runId}/join`); + async join( + threadId: string, + runId: string, + options?: { signal?: AbortSignal }, + ): Promise { + return this.fetch(`/threads/${threadId}/runs/${runId}/join`, { + timeoutMs: this.runTimeoutMs, + signal: options?.signal, + }); } /** @@ -933,6 +957,7 @@ export class RunsClient extends BaseClient { const response = await this.asyncCaller.fetch( ...this.prepareFetchOptions(`/threads/${threadId}/runs/${runId}/stream`, { method: "GET", + timeoutMs: this.runTimeoutMs, signal, }), ); diff --git a/libs/sdk-js/src/utils/signals.ts b/libs/sdk-js/src/utils/signals.ts new file mode 100644 index 000000000..915753325 --- /dev/null +++ b/libs/sdk-js/src/utils/signals.ts @@ -0,0 +1,22 @@ +export function mergeSignals(...signals: (AbortSignal | null | undefined)[]) { + const nonZeroSignals = signals.filter( + (signal): signal is AbortSignal => signal != null, + ); + + if (nonZeroSignals.length === 0) return undefined; + if (nonZeroSignals.length === 1) return nonZeroSignals[0]; + + const controller = new AbortController(); + for (const signal of signals) { + if (signal?.aborted) { + controller.abort(signal.reason); + return controller.signal; + } + + signal?.addEventListener("abort", () => controller.abort(signal.reason), { + once: true, + }); + } + + return controller.signal; +} From 7a348ac19cd57a4d2c79351d8208148c00847a34 Mon Sep 17 00:00:00 2001 From: Tat Dat Duong Date: Fri, 25 Oct 2024 15:34:36 +0200 Subject: [PATCH 2/2] Don't enforce timeouts for run stream / block endpoints --- libs/sdk-js/src/client.ts | 30 ++++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/libs/sdk-js/src/client.ts b/libs/sdk-js/src/client.ts index 9922720d5..5ae764be3 100644 --- a/libs/sdk-js/src/client.ts +++ b/libs/sdk-js/src/client.ts @@ -45,8 +45,6 @@ class BaseClient { protected timeoutMs: number; - protected runTimeoutMs: number; - protected apiUrl: string; protected defaultHeaders: Record; @@ -62,7 +60,6 @@ class BaseClient { // default limit being capped by Chrome // https://github.com/nodejs/undici/issues/1373 - this.runTimeoutMs = config?.timeoutMs || 300_000; this.apiUrl = config?.apiUrl || "http://localhost:8123"; this.defaultHeaders = config?.defaultHeaders || {}; if (config?.apiKey != null) { @@ -75,7 +72,7 @@ class BaseClient { options?: RequestInit & { json?: unknown; params?: Record; - timeoutMs?: number; + timeoutMs?: number | null; }, ): [url: URL, init: RequestInit] { const mutatedOptions = { @@ -92,10 +89,16 @@ class BaseClient { delete mutatedOptions.json; } - mutatedOptions.signal = mergeSignals( - AbortSignal.timeout(options?.timeoutMs ?? this.timeoutMs), - mutatedOptions.signal, - ); + let timeoutSignal: AbortSignal | null = null; + if (typeof options?.timeoutMs !== "undefined") { + if (options.timeoutMs != null) { + timeoutSignal = AbortSignal.timeout(options.timeoutMs); + } + } else { + timeoutSignal = AbortSignal.timeout(this.timeoutMs); + } + + mutatedOptions.signal = mergeSignals(timeoutSignal, mutatedOptions.signal); const targetUrl = new URL(`${this.apiUrl}${path}`); if (mutatedOptions.params) { @@ -120,7 +123,7 @@ class BaseClient { options?: RequestInit & { json?: unknown; params?: Record; - timeoutMs?: number; + timeoutMs?: number | null; signal?: AbortSignal; }, ): Promise { @@ -703,7 +706,7 @@ export class RunsClient extends BaseClient { ...this.prepareFetchOptions(endpoint, { method: "POST", json, - timeoutMs: this.runTimeoutMs, + timeoutMs: null, signal: payload?.signal, }), ); @@ -780,7 +783,6 @@ export class RunsClient extends BaseClient { return this.fetch(`/threads/${threadId}/runs`, { method: "POST", json, - timeoutMs: this.runTimeoutMs, signal: payload?.signal, }); } @@ -853,7 +855,7 @@ export class RunsClient extends BaseClient { return this.fetch(endpoint, { method: "POST", json, - timeoutMs: this.runTimeoutMs, + timeoutMs: null, signal: payload?.signal, }); } @@ -934,7 +936,7 @@ export class RunsClient extends BaseClient { options?: { signal?: AbortSignal }, ): Promise { return this.fetch(`/threads/${threadId}/runs/${runId}/join`, { - timeoutMs: this.runTimeoutMs, + timeoutMs: null, signal: options?.signal, }); } @@ -957,7 +959,7 @@ export class RunsClient extends BaseClient { const response = await this.asyncCaller.fetch( ...this.prepareFetchOptions(`/threads/${threadId}/runs/${runId}/stream`, { method: "GET", - timeoutMs: this.runTimeoutMs, + timeoutMs: null, signal, }), );