From 9b66ff6902802e2be195a82a7829e7d5841363cf Mon Sep 17 00:00:00 2001 From: Daniel Rochetti Date: Thu, 11 Jul 2024 08:33:43 -0700 Subject: [PATCH] feat(client): enable both polling and streaming for queue status --- .../demo-nextjs-app-router/app/queue/page.tsx | 2 + libs/client/package.json | 2 +- libs/client/src/function.ts | 93 +++++++++++++++---- 3 files changed, 76 insertions(+), 21 deletions(-) diff --git a/apps/demo-nextjs-app-router/app/queue/page.tsx b/apps/demo-nextjs-app-router/app/queue/page.tsx index b8edc23..e663a3b 100644 --- a/apps/demo-nextjs-app-router/app/queue/page.tsx +++ b/apps/demo-nextjs-app-router/app/queue/page.tsx @@ -52,6 +52,8 @@ export default function Home() { const result: any = await fal.subscribe(endpointId, { input: JSON.parse(input), logs: true, + mode: 'streaming', + // pollInterval: 1000, onQueueUpdate(update) { console.log('queue update'); console.log(update); diff --git a/libs/client/package.json b/libs/client/package.json index 01ac312..77e0dbd 100644 --- a/libs/client/package.json +++ b/libs/client/package.json @@ -1,7 +1,7 @@ { "name": "@fal-ai/serverless-client", "description": "The fal serverless JS/TS client", - "version": "0.12.0", + "version": "0.13.0-alpha.0", "license": "MIT", "repository": { "type": "git", diff --git a/libs/client/src/function.ts b/libs/client/src/function.ts index c311307..019ad44 100644 --- a/libs/client/src/function.ts +++ b/libs/client/src/function.ts @@ -125,6 +125,8 @@ export async function run( return send(id, options); } +const DEFAULT_POLL_INTERVAL = 500; + /** * Subscribes to updates for a specific request in the queue. * @@ -140,22 +142,59 @@ export async function subscribe( if (options.onEnqueue) { options.onEnqueue(requestId); } - const status = await queue.streamStatus(id, { - requestId, - logs: options.logs, - }); - const logs: RequestLog[] = []; - status.on('message', (data: QueueStatus) => { - if (options.onQueueUpdate) { - // accumulate logs to match previous polling behavior - if ('logs' in data && Array.isArray(data.logs) && data.logs.length > 0) { - logs.push(...data.logs); + if (options.mode === 'streaming') { + const status = await queue.streamStatus(id, { + requestId, + logs: options.logs, + }); + const logs: RequestLog[] = []; + status.on('message', (data: QueueStatus) => { + if (options.onQueueUpdate) { + // accumulate logs to match previous polling behavior + if ( + 'logs' in data && + Array.isArray(data.logs) && + data.logs.length > 0 + ) { + logs.push(...data.logs); + } + options.onQueueUpdate('logs' in data ? { ...data, logs } : data); } - options.onQueueUpdate('logs' in data ? { ...data, logs } : data); - } + }); + await status.done(); + return queue.result(id, { requestId }); + } + // default to polling until status streaming is stable and faster + return new Promise((resolve, reject) => { + let timeoutId: ReturnType; + const pollInterval = options.pollInterval ?? DEFAULT_POLL_INTERVAL; + const poll = async () => { + try { + const requestStatus = await queue.status(id, { + requestId, + logs: options.logs ?? false, + }); + if (options.onQueueUpdate) { + options.onQueueUpdate(requestStatus); + } + if (requestStatus.status === 'COMPLETED') { + clearTimeout(timeoutId); + try { + const result = await queue.result(id, { requestId }); + resolve(result); + } catch (error) { + reject(error); + } + return; + } + timeoutId = setTimeout(poll, pollInterval); + } catch (error) { + clearTimeout(timeoutId); + reject(error); + } + }; + poll().catch(reject); }); - await status.done(); - return queue.result(id, { requestId }); } /** @@ -163,13 +202,15 @@ export async function subscribe( */ type QueueSubscribeOptions = { /** - * The interval (in milliseconds) at which to poll for updates. - * If not provided, a default value of `1000` will be used. + * The mode to use for subscribing to updates. It defaults to `polling`. + * You can also use client-side streaming by setting it to `streaming`. + * + * **Note:** Streaming is currently experimental and once stable, it will + * be the default mode. * - * @deprecated starting from v0.12.0 the queue status is streamed - * using the `queue.subscribeToStatus` method. + * @see pollInterval */ - pollInterval?: number; + mode?: 'polling' | 'streaming'; /** * Callback function that is called when a request is enqueued. @@ -194,7 +235,19 @@ type QueueSubscribeOptions = { * @see WebHookResponse */ webhookUrl?: string; -}; +} & ( + | { + mode?: 'polling'; + /** + * The interval (in milliseconds) at which to poll for updates. + * If not provided, a default value of `500` will be used. + */ + pollInterval?: number; + } + | { + mode: 'streaming'; + } +); /** * Options for submitting a request to the queue.