Skip to content

Handle errors thrown by requests in Realtime react hooks #1599

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/many-panthers-relax.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@trigger.dev/react-hooks": patch
---

Make sure useRealtimeRun onComplete hook fires at the correct time
6 changes: 6 additions & 0 deletions .changeset/silent-dragons-chew.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@trigger.dev/react-hooks": patch
"@trigger.dev/core": patch
---

Handle errors thrown by requests in Realtime react hooks
13 changes: 10 additions & 3 deletions packages/core/src/v3/apiClient/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -640,20 +640,25 @@ export class ApiClient {

subscribeToRun<TRunTypes extends AnyRunTypes>(
runId: string,
options?: { signal?: AbortSignal; closeOnComplete?: boolean }
options?: {
signal?: AbortSignal;
closeOnComplete?: boolean;
onFetchError?: (error: Error) => void;
}
) {
return runShapeStream<TRunTypes>(`${this.baseUrl}/realtime/v1/runs/${runId}`, {
closeOnComplete:
typeof options?.closeOnComplete === "boolean" ? options.closeOnComplete : true,
headers: this.#getRealtimeHeaders(),
client: this,
signal: options?.signal,
onFetchError: options?.onFetchError,
});
}

subscribeToRunsWithTag<TRunTypes extends AnyRunTypes>(
tag: string | string[],
options?: { signal?: AbortSignal }
options?: { signal?: AbortSignal; onFetchError?: (error: Error) => void }
) {
const searchParams = createSearchQueryForSubscribeToRuns({
tags: tag,
Expand All @@ -666,19 +671,21 @@ export class ApiClient {
headers: this.#getRealtimeHeaders(),
client: this,
signal: options?.signal,
onFetchError: options?.onFetchError,
}
);
}

subscribeToBatch<TRunTypes extends AnyRunTypes>(
batchId: string,
options?: { signal?: AbortSignal }
options?: { signal?: AbortSignal; onFetchError?: (error: Error) => void }
) {
return runShapeStream<TRunTypes>(`${this.baseUrl}/realtime/v1/batches/${batchId}`, {
closeOnComplete: false,
headers: this.#getRealtimeHeaders(),
client: this,
signal: options?.signal,
onFetchError: options?.onFetchError,
});
}

Expand Down
4 changes: 4 additions & 0 deletions packages/core/src/v3/apiClient/runStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ export type RunShapeStreamOptions = {
closeOnComplete?: boolean;
signal?: AbortSignal;
client?: ApiClient;
onFetchError?: (e: Error) => void;
};

export type StreamPartResult<TRun, TStreams extends Record<string, any>> = {
Expand Down Expand Up @@ -111,6 +112,9 @@ export function runShapeStream<TRunTypes extends AnyRunTypes>(
const runStreamInstance = zodShapeStream(SubscribeRunRawShape, url, {
...options,
signal: abortController.signal,
onError: (e) => {
options?.onFetchError?.(e);
},
});

const $options: RunSubscriptionOptions = {
Expand Down
4 changes: 4 additions & 0 deletions packages/core/src/v3/apiClient/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export type ZodShapeStreamOptions = {
headers?: Record<string, string>;
fetchClient?: typeof fetch;
signal?: AbortSignal;
onError?: (e: Error) => void;
};

export type ZodShapeStreamInstance<TShapeSchema extends z.ZodTypeAny> = {
Expand Down Expand Up @@ -44,6 +45,9 @@ export function zodShapeStream<TShapeSchema extends z.ZodTypeAny>(
},
fetchClient: options?.fetchClient,
signal: abortController.signal,
onError: (e) => {
options?.onError?.(e);
},
});

const readableShape = new ReadableShapeStream(shapeStream);
Expand Down
40 changes: 38 additions & 2 deletions packages/react-hooks/src/hooks/useRealtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ export function useRealtimeRun<TTask extends AnyTask>(
runId,
apiClient,
mutateRun,
setError,
abortControllerRef,
typeof options?.stopOnCompletion === "boolean" ? options.stopOnCompletion : true
);
Expand Down Expand Up @@ -150,6 +151,12 @@ export function useRealtimeRun<TTask extends AnyTask>(
};
}, [runId, stop, options?.enabled]);

useEffect(() => {
if (run?.finishedAt) {
setIsComplete(true);
}
}, [run]);

return { run, error, stop };
}

Expand Down Expand Up @@ -258,6 +265,7 @@ export function useRealtimeRunWithStreams<
mutateRun,
mutateStreams,
streamsRef,
setError,
abortControllerRef,
typeof options?.stopOnCompletion === "boolean" ? options.stopOnCompletion : true,
options?.experimental_throttleInMs
Expand Down Expand Up @@ -306,6 +314,12 @@ export function useRealtimeRunWithStreams<
};
}, [runId, stop, options?.enabled]);

useEffect(() => {
if (run?.finishedAt) {
setIsComplete(true);
}
}, [run]);

return { run, streams: streams ?? initialStreamsFallback, error, stop };
}

Expand Down Expand Up @@ -380,7 +394,14 @@ export function useRealtimeRunsWithTag<TTask extends AnyTask>(
const abortController = new AbortController();
abortControllerRef.current = abortController;

await processRealtimeRunsWithTag(tag, apiClient, mutateRuns, runsRef, abortControllerRef);
await processRealtimeRunsWithTag(
tag,
apiClient,
mutateRuns,
runsRef,
setError,
abortControllerRef
);
} catch (err) {
// Ignore abort errors as they are expected.
if ((err as any).name === "AbortError") {
Expand Down Expand Up @@ -470,7 +491,14 @@ export function useRealtimeBatch<TTask extends AnyTask>(
const abortController = new AbortController();
abortControllerRef.current = abortController;

await processRealtimeBatch(batchId, apiClient, mutateRuns, runsRef, abortControllerRef);
await processRealtimeBatch(
batchId,
apiClient,
mutateRuns,
runsRef,
setError,
abortControllerRef
);
} catch (err) {
// Ignore abort errors as they are expected.
if ((err as any).name === "AbortError") {
Expand Down Expand Up @@ -506,10 +534,12 @@ async function processRealtimeBatch<TTask extends AnyTask = AnyTask>(
apiClient: ApiClient,
mutateRunsData: KeyedMutator<RealtimeRun<TTask>[]>,
existingRunsRef: React.MutableRefObject<RealtimeRun<TTask>[]>,
onError: (e: Error) => void,
abortControllerRef: React.MutableRefObject<AbortController | null>
) {
const subscription = apiClient.subscribeToBatch<InferRunTypes<TTask>>(batchId, {
signal: abortControllerRef.current?.signal,
onFetchError: onError,
});

for await (const part of subscription) {
Expand Down Expand Up @@ -541,10 +571,12 @@ async function processRealtimeRunsWithTag<TTask extends AnyTask = AnyTask>(
apiClient: ApiClient,
mutateRunsData: KeyedMutator<RealtimeRun<TTask>[]>,
existingRunsRef: React.MutableRefObject<RealtimeRun<TTask>[]>,
onError: (e: Error) => void,
abortControllerRef: React.MutableRefObject<AbortController | null>
) {
const subscription = apiClient.subscribeToRunsWithTag<InferRunTypes<TTask>>(tag, {
signal: abortControllerRef.current?.signal,
onFetchError: onError,
});

for await (const part of subscription) {
Expand Down Expand Up @@ -582,13 +614,15 @@ async function processRealtimeRunWithStreams<
mutateRunData: KeyedMutator<RealtimeRun<TTask>>,
mutateStreamData: KeyedMutator<StreamResults<TStreams>>,
existingDataRef: React.MutableRefObject<StreamResults<TStreams>>,
onError: (e: Error) => void,
abortControllerRef: React.MutableRefObject<AbortController | null>,
stopOnCompletion: boolean = true,
throttleInMs?: number
) {
const subscription = apiClient.subscribeToRun<InferRunTypes<TTask>>(runId, {
signal: abortControllerRef.current?.signal,
closeOnComplete: stopOnCompletion,
onFetchError: onError,
});

type StreamUpdate = {
Expand Down Expand Up @@ -637,12 +671,14 @@ async function processRealtimeRun<TTask extends AnyTask = AnyTask>(
runId: string,
apiClient: ApiClient,
mutateRunData: KeyedMutator<RealtimeRun<TTask>>,
onError: (e: Error) => void,
abortControllerRef: React.MutableRefObject<AbortController | null>,
stopOnCompletion: boolean = true
) {
const subscription = apiClient.subscribeToRun<InferRunTypes<TTask>>(runId, {
signal: abortControllerRef.current?.signal,
closeOnComplete: stopOnCompletion,
onFetchError: onError,
});

for await (const part of subscription) {
Expand Down
15 changes: 12 additions & 3 deletions references/nextjs-realtime/src/app/actions.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"use server";

import type { exampleTask } from "@/trigger/example";
import { tasks } from "@trigger.dev/sdk/v3";
import { auth, tasks } from "@trigger.dev/sdk/v3";
import { cookies } from "next/headers";
import { redirect } from "next/navigation";
import { randomUUID } from "node:crypto";
Expand All @@ -11,10 +11,19 @@ export async function triggerExampleTask() {
id: randomUUID(),
});

console.log("Setting the run JWT in a cookie", handle.publicAccessToken);
const publicToken = await auth.createPublicToken({
scopes: {
read: {
runs: [handle.id],
},
},
expirationTime: "2s",
});

console.log("Setting the run JWT in a cookie", publicToken);

// Set JWT in a secure, HTTP-only cookie
cookies().set("run_token", handle.publicAccessToken);
cookies().set("run_token", publicToken);

// Redirect to the details page
redirect(`/runs/${handle.id}`);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ function RunDetailsWrapper({
},
});

if (error) {
if (error && !run) {
return (
<div className="w-full min-h-screen bg-gray-900 p-4">
<Card className="w-full bg-gray-800 shadow-md">
Expand Down
Loading