From f6cb399a219ecec388f7c62d2aa54d30511ba78f Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Fri, 10 Jan 2025 19:38:16 +0000 Subject: [PATCH] Handle errors thrown by requests in Realtime react hooks --- .changeset/many-panthers-relax.md | 5 +++ .changeset/silent-dragons-chew.md | 6 +++ packages/core/src/v3/apiClient/index.ts | 13 ++++-- packages/core/src/v3/apiClient/runStream.ts | 4 ++ packages/core/src/v3/apiClient/stream.ts | 4 ++ packages/react-hooks/src/hooks/useRealtime.ts | 40 ++++++++++++++++++- references/nextjs-realtime/src/app/actions.ts | 15 +++++-- .../src/app/runs/[id]/ClientRunDetails.tsx | 2 +- 8 files changed, 80 insertions(+), 9 deletions(-) create mode 100644 .changeset/many-panthers-relax.md create mode 100644 .changeset/silent-dragons-chew.md diff --git a/.changeset/many-panthers-relax.md b/.changeset/many-panthers-relax.md new file mode 100644 index 0000000000..7ec1450aff --- /dev/null +++ b/.changeset/many-panthers-relax.md @@ -0,0 +1,5 @@ +--- +"@trigger.dev/react-hooks": patch +--- + +Make sure useRealtimeRun onComplete hook fires at the correct time diff --git a/.changeset/silent-dragons-chew.md b/.changeset/silent-dragons-chew.md new file mode 100644 index 0000000000..2593c11cb3 --- /dev/null +++ b/.changeset/silent-dragons-chew.md @@ -0,0 +1,6 @@ +--- +"@trigger.dev/react-hooks": patch +"@trigger.dev/core": patch +--- + +Handle errors thrown by requests in Realtime react hooks diff --git a/packages/core/src/v3/apiClient/index.ts b/packages/core/src/v3/apiClient/index.ts index 53892c36fd..4dfd236235 100644 --- a/packages/core/src/v3/apiClient/index.ts +++ b/packages/core/src/v3/apiClient/index.ts @@ -640,7 +640,11 @@ export class ApiClient { subscribeToRun( runId: string, - options?: { signal?: AbortSignal; closeOnComplete?: boolean } + options?: { + signal?: AbortSignal; + closeOnComplete?: boolean; + onFetchError?: (error: Error) => void; + } ) { return runShapeStream(`${this.baseUrl}/realtime/v1/runs/${runId}`, { closeOnComplete: @@ -648,12 +652,13 @@ export class ApiClient { headers: this.#getRealtimeHeaders(), client: this, signal: options?.signal, + onFetchError: options?.onFetchError, }); } subscribeToRunsWithTag( tag: string | string[], - options?: { signal?: AbortSignal } + options?: { signal?: AbortSignal; onFetchError?: (error: Error) => void } ) { const searchParams = createSearchQueryForSubscribeToRuns({ tags: tag, @@ -666,19 +671,21 @@ export class ApiClient { headers: this.#getRealtimeHeaders(), client: this, signal: options?.signal, + onFetchError: options?.onFetchError, } ); } subscribeToBatch( batchId: string, - options?: { signal?: AbortSignal } + options?: { signal?: AbortSignal; onFetchError?: (error: Error) => void } ) { return runShapeStream(`${this.baseUrl}/realtime/v1/batches/${batchId}`, { closeOnComplete: false, headers: this.#getRealtimeHeaders(), client: this, signal: options?.signal, + onFetchError: options?.onFetchError, }); } diff --git a/packages/core/src/v3/apiClient/runStream.ts b/packages/core/src/v3/apiClient/runStream.ts index f8c56b746d..15de844759 100644 --- a/packages/core/src/v3/apiClient/runStream.ts +++ b/packages/core/src/v3/apiClient/runStream.ts @@ -66,6 +66,7 @@ export type RunShapeStreamOptions = { closeOnComplete?: boolean; signal?: AbortSignal; client?: ApiClient; + onFetchError?: (e: Error) => void; }; export type StreamPartResult> = { @@ -111,6 +112,9 @@ export function runShapeStream( const runStreamInstance = zodShapeStream(SubscribeRunRawShape, url, { ...options, signal: abortController.signal, + onError: (e) => { + options?.onFetchError?.(e); + }, }); const $options: RunSubscriptionOptions = { diff --git a/packages/core/src/v3/apiClient/stream.ts b/packages/core/src/v3/apiClient/stream.ts index d9124cb380..fd027ab665 100644 --- a/packages/core/src/v3/apiClient/stream.ts +++ b/packages/core/src/v3/apiClient/stream.ts @@ -14,6 +14,7 @@ export type ZodShapeStreamOptions = { headers?: Record; fetchClient?: typeof fetch; signal?: AbortSignal; + onError?: (e: Error) => void; }; export type ZodShapeStreamInstance = { @@ -44,6 +45,9 @@ export function zodShapeStream( }, fetchClient: options?.fetchClient, signal: abortController.signal, + onError: (e) => { + options?.onError?.(e); + }, }); const readableShape = new ReadableShapeStream(shapeStream); diff --git a/packages/react-hooks/src/hooks/useRealtime.ts b/packages/react-hooks/src/hooks/useRealtime.ts index f018df6ecc..670285faca 100644 --- a/packages/react-hooks/src/hooks/useRealtime.ts +++ b/packages/react-hooks/src/hooks/useRealtime.ts @@ -103,6 +103,7 @@ export function useRealtimeRun( runId, apiClient, mutateRun, + setError, abortControllerRef, typeof options?.stopOnCompletion === "boolean" ? options.stopOnCompletion : true ); @@ -150,6 +151,12 @@ export function useRealtimeRun( }; }, [runId, stop, options?.enabled]); + useEffect(() => { + if (run?.finishedAt) { + setIsComplete(true); + } + }, [run]); + return { run, error, stop }; } @@ -258,6 +265,7 @@ export function useRealtimeRunWithStreams< mutateRun, mutateStreams, streamsRef, + setError, abortControllerRef, typeof options?.stopOnCompletion === "boolean" ? options.stopOnCompletion : true, options?.experimental_throttleInMs @@ -306,6 +314,12 @@ export function useRealtimeRunWithStreams< }; }, [runId, stop, options?.enabled]); + useEffect(() => { + if (run?.finishedAt) { + setIsComplete(true); + } + }, [run]); + return { run, streams: streams ?? initialStreamsFallback, error, stop }; } @@ -380,7 +394,14 @@ export function useRealtimeRunsWithTag( const abortController = new AbortController(); abortControllerRef.current = abortController; - await processRealtimeRunsWithTag(tag, apiClient, mutateRuns, runsRef, abortControllerRef); + await processRealtimeRunsWithTag( + tag, + apiClient, + mutateRuns, + runsRef, + setError, + abortControllerRef + ); } catch (err) { // Ignore abort errors as they are expected. if ((err as any).name === "AbortError") { @@ -470,7 +491,14 @@ export function useRealtimeBatch( const abortController = new AbortController(); abortControllerRef.current = abortController; - await processRealtimeBatch(batchId, apiClient, mutateRuns, runsRef, abortControllerRef); + await processRealtimeBatch( + batchId, + apiClient, + mutateRuns, + runsRef, + setError, + abortControllerRef + ); } catch (err) { // Ignore abort errors as they are expected. if ((err as any).name === "AbortError") { @@ -506,10 +534,12 @@ async function processRealtimeBatch( apiClient: ApiClient, mutateRunsData: KeyedMutator[]>, existingRunsRef: React.MutableRefObject[]>, + onError: (e: Error) => void, abortControllerRef: React.MutableRefObject ) { const subscription = apiClient.subscribeToBatch>(batchId, { signal: abortControllerRef.current?.signal, + onFetchError: onError, }); for await (const part of subscription) { @@ -541,10 +571,12 @@ async function processRealtimeRunsWithTag( apiClient: ApiClient, mutateRunsData: KeyedMutator[]>, existingRunsRef: React.MutableRefObject[]>, + onError: (e: Error) => void, abortControllerRef: React.MutableRefObject ) { const subscription = apiClient.subscribeToRunsWithTag>(tag, { signal: abortControllerRef.current?.signal, + onFetchError: onError, }); for await (const part of subscription) { @@ -582,6 +614,7 @@ async function processRealtimeRunWithStreams< mutateRunData: KeyedMutator>, mutateStreamData: KeyedMutator>, existingDataRef: React.MutableRefObject>, + onError: (e: Error) => void, abortControllerRef: React.MutableRefObject, stopOnCompletion: boolean = true, throttleInMs?: number @@ -589,6 +622,7 @@ async function processRealtimeRunWithStreams< const subscription = apiClient.subscribeToRun>(runId, { signal: abortControllerRef.current?.signal, closeOnComplete: stopOnCompletion, + onFetchError: onError, }); type StreamUpdate = { @@ -637,12 +671,14 @@ async function processRealtimeRun( runId: string, apiClient: ApiClient, mutateRunData: KeyedMutator>, + onError: (e: Error) => void, abortControllerRef: React.MutableRefObject, stopOnCompletion: boolean = true ) { const subscription = apiClient.subscribeToRun>(runId, { signal: abortControllerRef.current?.signal, closeOnComplete: stopOnCompletion, + onFetchError: onError, }); for await (const part of subscription) { diff --git a/references/nextjs-realtime/src/app/actions.ts b/references/nextjs-realtime/src/app/actions.ts index 1411ae8e30..d129fdf6ea 100644 --- a/references/nextjs-realtime/src/app/actions.ts +++ b/references/nextjs-realtime/src/app/actions.ts @@ -1,7 +1,7 @@ "use server"; import type { exampleTask } from "@/trigger/example"; -import { tasks } from "@trigger.dev/sdk/v3"; +import { auth, tasks } from "@trigger.dev/sdk/v3"; import { cookies } from "next/headers"; import { redirect } from "next/navigation"; import { randomUUID } from "node:crypto"; @@ -11,10 +11,19 @@ export async function triggerExampleTask() { id: randomUUID(), }); - console.log("Setting the run JWT in a cookie", handle.publicAccessToken); + const publicToken = await auth.createPublicToken({ + scopes: { + read: { + runs: [handle.id], + }, + }, + expirationTime: "2s", + }); + + console.log("Setting the run JWT in a cookie", publicToken); // Set JWT in a secure, HTTP-only cookie - cookies().set("run_token", handle.publicAccessToken); + cookies().set("run_token", publicToken); // Redirect to the details page redirect(`/runs/${handle.id}`); diff --git a/references/nextjs-realtime/src/app/runs/[id]/ClientRunDetails.tsx b/references/nextjs-realtime/src/app/runs/[id]/ClientRunDetails.tsx index cb729679b2..83edc5a6f9 100644 --- a/references/nextjs-realtime/src/app/runs/[id]/ClientRunDetails.tsx +++ b/references/nextjs-realtime/src/app/runs/[id]/ClientRunDetails.tsx @@ -32,7 +32,7 @@ function RunDetailsWrapper({ }, }); - if (error) { + if (error && !run) { return (