Skip to content

Commit

Permalink
Merge pull request #2167 from langchain-ai/dqbd/sdk-js-timeout
Browse files Browse the repository at this point in the history
feat(sdk-js): implement abort signal timeout, make default timeout for runs 5 minutes
  • Loading branch information
dqbd authored Oct 25, 2024
2 parents 582fb11 + 7a348ac commit d266ddb
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 2 deletions.
31 changes: 29 additions & 2 deletions libs/sdk-js/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import {
CronsCreatePayload,
OnConflictBehavior,
} from "./types.js";
import { mergeSignals } from "./utils/signals.js";

interface ClientConfig {
apiUrl?: string;
Expand All @@ -56,6 +57,9 @@ class BaseClient {
});

this.timeoutMs = config?.timeoutMs || 12_000;

// default limit being capped by Chrome
// https://github.com/nodejs/undici/issues/1373
this.apiUrl = config?.apiUrl || "http://localhost:8123";
this.defaultHeaders = config?.defaultHeaders || {};
if (config?.apiKey != null) {
Expand All @@ -68,6 +72,7 @@ class BaseClient {
options?: RequestInit & {
json?: unknown;
params?: Record<string, unknown>;
timeoutMs?: number | null;
},
): [url: URL, init: RequestInit] {
const mutatedOptions = {
Expand All @@ -84,6 +89,16 @@ class BaseClient {
delete mutatedOptions.json;
}

let timeoutSignal: AbortSignal | null = null;
if (typeof options?.timeoutMs !== "undefined") {
if (options.timeoutMs != null) {
timeoutSignal = AbortSignal.timeout(options.timeoutMs);
}
} else {
timeoutSignal = AbortSignal.timeout(this.timeoutMs);
}

mutatedOptions.signal = mergeSignals(timeoutSignal, mutatedOptions.signal);
const targetUrl = new URL(`${this.apiUrl}${path}`);

if (mutatedOptions.params) {
Expand All @@ -108,6 +123,8 @@ class BaseClient {
options?: RequestInit & {
json?: unknown;
params?: Record<string, unknown>;
timeoutMs?: number | null;
signal?: AbortSignal;
},
): Promise<T> {
const response = await this.asyncCaller.fetch(
Expand Down Expand Up @@ -689,6 +706,7 @@ export class RunsClient extends BaseClient {
...this.prepareFetchOptions(endpoint, {
method: "POST",
json,
timeoutMs: null,
signal: payload?.signal,
}),
);
Expand Down Expand Up @@ -837,6 +855,7 @@ export class RunsClient extends BaseClient {
return this.fetch<ThreadState["values"]>(endpoint, {
method: "POST",
json,
timeoutMs: null,
signal: payload?.signal,
});
}
Expand Down Expand Up @@ -911,8 +930,15 @@ export class RunsClient extends BaseClient {
* @param runId The ID of the run.
* @returns
*/
async join(threadId: string, runId: string): Promise<void> {
return this.fetch<void>(`/threads/${threadId}/runs/${runId}/join`);
async join(
threadId: string,
runId: string,
options?: { signal?: AbortSignal },
): Promise<void> {
return this.fetch<void>(`/threads/${threadId}/runs/${runId}/join`, {
timeoutMs: null,
signal: options?.signal,
});
}

/**
Expand All @@ -933,6 +959,7 @@ export class RunsClient extends BaseClient {
const response = await this.asyncCaller.fetch(
...this.prepareFetchOptions(`/threads/${threadId}/runs/${runId}/stream`, {
method: "GET",
timeoutMs: null,
signal,
}),
);
Expand Down
22 changes: 22 additions & 0 deletions libs/sdk-js/src/utils/signals.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
export function mergeSignals(...signals: (AbortSignal | null | undefined)[]) {
const nonZeroSignals = signals.filter(
(signal): signal is AbortSignal => signal != null,
);

if (nonZeroSignals.length === 0) return undefined;
if (nonZeroSignals.length === 1) return nonZeroSignals[0];

const controller = new AbortController();
for (const signal of signals) {
if (signal?.aborted) {
controller.abort(signal.reason);
return controller.signal;
}

signal?.addEventListener("abort", () => controller.abort(signal.reason), {
once: true,
});
}

return controller.signal;
}

0 comments on commit d266ddb

Please sign in to comment.