feat(client): subscribe to status #76

Merged · 2 commits · Jul 23, 2024
2 changes: 1 addition & 1 deletion apps/demo-nextjs-app-router/app/api/fal/proxy/route.ts
@@ -1,3 +1,3 @@
import { route } from '@fal-ai/serverless-proxy/nextjs';

export const { GET, POST } = route;
export const { GET, POST, PUT } = route;
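As a brief aside (not part of this diff), the proxy route above only takes effect when the browser client is configured to send its requests through it. A minimal sketch, assuming the client's documented `config`/`proxyUrl` options:

```ts
import * as fal from '@fal-ai/serverless-client';

// Point the browser client at the Next.js App Router proxy shown above, so
// credentials stay server-side. The route path comes from this diff; the
// `config`/`proxyUrl` call is assumed from the client's existing API.
fal.config({
  proxyUrl: '/api/fal/proxy',
});
```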
2 changes: 1 addition & 1 deletion libs/client/package.json
@@ -1,7 +1,7 @@
{
"name": "@fal-ai/serverless-client",
"description": "The fal serverless JS/TS client",
"version": "0.14.0-alpha.2",
"version": "0.14.0-alpha.3",
"license": "MIT",
"repository": {
"type": "git",
259 changes: 158 additions & 101 deletions libs/client/src/function.ts
@@ -2,7 +2,12 @@ import { getTemporaryAuthToken } from './auth';
import { dispatchRequest } from './request';
import { storageImpl } from './storage';
import { FalStream } from './streaming';
import { EnqueueResult, QueueStatus, RequestLog } from './types';
import {
CompletedQueueStatus,
EnqueueResult,
QueueStatus,
RequestLog,
} from './types';
import { ensureAppIdFormat, isUUIDv4, isValidUrl, parseAppId } from './utils';

/**
@@ -110,6 +115,9 @@ export async function send<Input, Output>(
);
}

export type QueueStatusSubscriptionOptions = QueueStatusOptions &
Omit<QueueSubscribeOptions, 'onEnqueue' | 'webhookUrl'>;

/**
* Runs a fal serverless function identified by its `id`.
*
@@ -123,93 +131,10 @@ export async function run<Input, Output>(
return send(id, options);
}

type TimeoutId = ReturnType<typeof setTimeout>;
type TimeoutId = ReturnType<typeof setTimeout> | undefined;

const DEFAULT_POLL_INTERVAL = 500;

/**
* Subscribes to updates for a specific request in the queue.
*
* @param id - The ID or URL of the function web endpoint.
* @param options - Options to configure how the request is run and how updates are received.
* @returns A promise that resolves to the result of the request once it's completed.
*/
export async function subscribe<Input, Output>(
id: string,
options: RunOptions<Input> & QueueSubscribeOptions = {}
): Promise<Output> {
const { request_id: requestId } = await queue.submit(id, options);
if (options.onEnqueue) {
options.onEnqueue(requestId);
}
const timeout = options.timeout;
let timeoutId: TimeoutId = undefined;
if (timeout) {
timeoutId = setTimeout(() => {
queue.cancel(id, { requestId }).catch(console.warn);
throw new Error(
`Client timed out waiting for the request to complete after ${timeout}ms`
);
}, timeout);
}
if (options.mode === 'streaming') {
const status = await queue.streamStatus(id, {
requestId,
logs: options.logs,
});
const logs: RequestLog[] = [];
status.on('message', (data: QueueStatus) => {
if (options.onQueueUpdate) {
// accumulate logs to match previous polling behavior
if (
'logs' in data &&
Array.isArray(data.logs) &&
data.logs.length > 0
) {
logs.push(...data.logs);
}
options.onQueueUpdate('logs' in data ? { ...data, logs } : data);
}
});
await status.done();
if (timeoutId) {
clearTimeout(timeoutId);
}
return queue.result<Output>(id, { requestId });
}
// default to polling until status streaming is stable and faster
return new Promise<Output>((resolve, reject) => {
let timeoutId: ReturnType<typeof setTimeout>;
const pollInterval = options.pollInterval ?? DEFAULT_POLL_INTERVAL;
const poll = async () => {
try {
const requestStatus = await queue.status(id, {
requestId,
logs: options.logs ?? false,
});
if (options.onQueueUpdate) {
options.onQueueUpdate(requestStatus);
}
if (requestStatus.status === 'COMPLETED') {
clearTimeout(timeoutId);
try {
const result = await queue.result<Output>(id, { requestId });
resolve(result);
} catch (error) {
reject(error);
}
return;
}
timeoutId = setTimeout(poll, pollInterval);
} catch (error) {
clearTimeout(timeoutId);
reject(error);
}
};
poll().catch(reject);
});
}

/**
* Options for subscribing to the request queue.
*/
@@ -247,6 +172,10 @@ type QueueSubscribeOptions = {
* The timeout (in milliseconds) for the request. If the request is not
* completed within this time, the subscription will be cancelled.
*
* Keep in mind that although the client resolves the function on a timeout,
* and will try to cancel the request on the server, the server might not be
* able to cancel the request if it's already running.
*
* Note: currently, the timeout is not enforced and the default is `undefined`.
* This behavior might change in the future.
*/
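A usage sketch (not part of the diff) of the `timeout` option documented above, with a placeholder endpoint id and input:

```ts
import * as fal from '@fal-ai/serverless-client';

// Give up client-side after 30s. Per the docs above, the client also asks the
// server to cancel, but a request that is already running may still finish.
const result = await fal.subscribe('fal-ai/some-endpoint', {
  input: { prompt: 'a friendly robot' },
  timeout: 30_000,
  logs: true,
  onQueueUpdate: (status) => console.log('queue status:', status.status),
});
```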
@@ -326,35 +255,41 @@ interface Queue {
status(endpointId: string, options: QueueStatusOptions): Promise<QueueStatus>;

/**
* Retrieves the result of a specific request from the queue.
* Subscribes to updates for a specific request in the queue using HTTP streaming events.
*
* @param endpointId - The ID of the function web endpoint.
* @param options - Options to configure how the request is run.
* @returns A promise that resolves to the result of the request.
* @param options - Options to configure how the request is run and how updates are received.
* @returns The streaming object that can be used to listen for updates.
*/
result<Output>(
streamStatus(
endpointId: string,
options: BaseQueueOptions
): Promise<Output>;
options: QueueStatusOptions
): Promise<FalStream<unknown, QueueStatus>>;

/**
* @deprecated Use `fal.subscribe` instead.
* Subscribes to updates for a specific request in the queue using polling or streaming.
* See `options.mode` for more details.
*
* @param endpointId - The ID of the function web endpoint.
* @param options - Options to configure how the request is run and how updates are received.
* @returns A promise that resolves to the final status of the request.
*/
subscribe<Input, Output>(
subscribeToStatus(
endpointId: string,
options: RunOptions<Input> & QueueSubscribeOptions
): Promise<Output>;
options: QueueStatusSubscriptionOptions
): Promise<CompletedQueueStatus>;

/**
* Subscribes to updates for a specific request in the queue.
* Retrieves the result of a specific request from the queue.
*
* @param endpointId - The ID of the function web endpoint.
* @param options - Options to configure how the request is run and how updates are received.
* @param options - Options to configure how the request is run.
* @returns A promise that resolves to the result of the request.
*/
streamStatus(
result<Output>(
endpointId: string,
options: QueueStatusOptions
): Promise<FalStream<unknown, QueueStatus>>;
options: BaseQueueOptions
): Promise<Output>;

/**
* Cancels a request in the queue.
@@ -402,6 +337,7 @@ export const queue: Queue = {
},
});
},

async streamStatus(
endpointId: string,
{ requestId, logs = false }: QueueStatusOptions
@@ -424,6 +360,108 @@ export const queue: Queue = {
method: 'get',
});
},

async subscribeToStatus(endpointId, options): Promise<CompletedQueueStatus> {
const requestId = options.requestId;
const timeout = options.timeout;
let timeoutId: TimeoutId = undefined;

const handleCancelError = () => {
// Ignore errors as the client will follow through with the timeout
// regardless of the server response. In case cancelation fails, we
// still want to reject the promise and consider the client call canceled.
};
if (options.mode === 'streaming') {
const status = await queue.streamStatus(endpointId, {
requestId,
logs: options.logs,
});
const logs: RequestLog[] = [];
if (timeout) {
timeoutId = setTimeout(() => {
status.abort();
queue.cancel(endpointId, { requestId }).catch(handleCancelError);
// TODO this error cannot bubble up to the user since it's thrown in
// a closure in the global scope due to setTimeout behavior.
// User will get a platform error instead. We should find a way to
// make this behavior aligned with polling.
throw new Error(
`Client timed out waiting for the request to complete after ${timeout}ms`
);
}, timeout);
}
status.on('message', (data: QueueStatus) => {
if (options.onQueueUpdate) {
// accumulate logs to match previous polling behavior
if (
'logs' in data &&
Array.isArray(data.logs) &&
data.logs.length > 0
) {
logs.push(...data.logs);
}
options.onQueueUpdate('logs' in data ? { ...data, logs } : data);
}
});
const doneStatus = await status.done();
if (timeoutId) {
clearTimeout(timeoutId);
}
return doneStatus as CompletedQueueStatus;
}
// default to polling until status streaming is stable and faster
return new Promise<CompletedQueueStatus>((resolve, reject) => {
let pollingTimeoutId: TimeoutId;
// type resolution isn't great in this case, so check for its presence
// and its type so the typechecker behaves as expected
const pollInterval =
'pollInterval' in options && typeof options.pollInterval === 'number'
? options.pollInterval ?? DEFAULT_POLL_INTERVAL
: DEFAULT_POLL_INTERVAL;

const clearScheduledTasks = () => {
if (timeoutId) {
clearTimeout(timeoutId);
}
if (pollingTimeoutId) {
clearTimeout(pollingTimeoutId);
}
};
if (timeout) {
timeoutId = setTimeout(() => {
clearScheduledTasks();
queue.cancel(endpointId, { requestId }).catch(handleCancelError);
reject(
new Error(
`Client timed out waiting for the request to complete after ${timeout}ms`
)
);
}, timeout);
}
const poll = async () => {
try {
const requestStatus = await queue.status(endpointId, {
requestId,
logs: options.logs ?? false,
});
if (options.onQueueUpdate) {
options.onQueueUpdate(requestStatus);
}
if (requestStatus.status === 'COMPLETED') {
clearScheduledTasks();
resolve(requestStatus);
return;
}
pollingTimeoutId = setTimeout(poll, pollInterval);
} catch (error) {
clearScheduledTasks();
reject(error);
}
};
poll().catch(reject);
});
},

async result<Output>(
endpointId: string,
{ requestId }: BaseQueueOptions
@@ -436,6 +474,7 @@ export const queue: Queue = {
path: `/requests/${requestId}`,
});
},

async cancel(
endpointId: string,
{ requestId }: BaseQueueOptions
@@ -448,5 +487,23 @@ export const queue: Queue = {
path: `/requests/${requestId}/cancel`,
});
},
subscribe,
};
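A hedged sketch of driving the queue API directly with the new `subscribeToStatus` method; the endpoint id and input are placeholders, and the `fal.queue` export is assumed to surface the `queue` object above:

```ts
import * as fal from '@fal-ai/serverless-client';

// Submit, wait for completion via polling (the default mode), then fetch the
// result only once the status reported by the queue is COMPLETED.
const { request_id: requestId } = await fal.queue.submit('fal-ai/some-endpoint', {
  input: { prompt: 'a friendly robot' },
});

const completed = await fal.queue.subscribeToStatus('fal-ai/some-endpoint', {
  requestId,
  logs: true,
  onQueueUpdate: (status) => console.log('queue status:', status.status),
});
console.log('finished with status', completed.status);

const result = await fal.queue.result('fal-ai/some-endpoint', { requestId });
```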

/**
* Subscribes to updates for a specific request in the queue.
*
* @param endpointId - The ID of the function web endpoint.
* @param options - Options to configure how the request is run and how updates are received.
* @returns A promise that resolves to the result of the request once it's completed.
*/
export async function subscribe<Input, Output>(
endpointId: string,
options: RunOptions<Input> & QueueSubscribeOptions = {}
): Promise<Output> {
const { request_id: requestId } = await queue.submit(endpointId, options);
if (options.onEnqueue) {
options.onEnqueue(requestId);
}
await queue.subscribeToStatus(endpointId, { requestId, ...options });
return queue.result(endpointId, { requestId });
}
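And a sketch of the refactored top-level `subscribe`, opting into the new status streaming mode (endpoint id and input are placeholders; polling remains the default):

```ts
import * as fal from '@fal-ai/serverless-client';

// 'streaming' receives status updates over HTTP streaming instead of polling.
const output = await fal.subscribe('fal-ai/some-endpoint', {
  input: { prompt: 'a friendly robot' },
  mode: 'streaming',
  logs: true,
  onEnqueue: (requestId) => console.log('enqueued as', requestId),
  onQueueUpdate: (status) => console.log('queue status:', status.status),
});
```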
10 changes: 10 additions & 0 deletions libs/client/src/streaming.ts
@@ -36,7 +36,7 @@

type FalStreamEventType = 'message' | 'error' | 'done';

type EventHandler = (event: any) => void;

/**
* The class representing a streaming response. With t
@@ -56,6 +56,8 @@
private streamClosed = false;
private donePromise: Promise<Output>;

private abortController = new AbortController();

constructor(url: string, options: StreamOptions<Input>) {
this.url = url;
this.options = options;
@@ -93,6 +95,7 @@
'content-type': 'application/json',
},
body: input && method !== 'get' ? JSON.stringify(input) : undefined,
signal: this.abortController.signal,
});
this.handleResponse(response);
} catch (error) {
@@ -171,7 +174,7 @@
return;
};

private handleError = (error: any) => {
const apiError =
error instanceof ApiError
? error
@@ -190,7 +193,7 @@
this.listeners.get(type)?.push(listener);
};

private emit = (type: FalStreamEventType, event: any) => {
const listeners = this.listeners.get(type) || [];
for (const listener of listeners) {
listener(event);
@@ -225,6 +228,13 @@
* @returns the promise that resolves when the request is done.
*/
public done = async () => this.donePromise;

/**
* Aborts the streaming request.
*/
public abort = () => {
this.abortController.abort();
};
}
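A small caller-side sketch of the new `abort` method (placeholder endpoint; how `done()` settles after an abort depends on how the cancelled fetch surfaces through the stream's error handling):

```ts
import * as fal from '@fal-ai/serverless-client';

const stream = await fal.stream('fal-ai/some-endpoint', {
  input: { prompt: 'a friendly robot' },
});

stream.on('message', (event) => console.log(event));

// Cancel the underlying fetch via the new AbortController wiring if the
// caller loses interest, e.g. after 10 seconds.
setTimeout(() => stream.abort(), 10_000);
```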

/**
@@ -236,7 +246,7 @@
* @param options the request options, including the input payload.
* @returns the `FalStream` instance.
*/
export async function stream<Input = Record<string, any>, Output = any>(
appId: string,
options: StreamOptions<Input>
): Promise<FalStream<Input, Output>> {