Review notes (this text sits before the first "diff --git" header, outside every hunk):

- apps/restate/src/supabase.ts: object keys are now percent-encoded per path
  segment (new helper encodeObjectPath) instead of with a single
  encodeURIComponent call, which also encoded the "/" separators of keys shaped
  like "{userId}/{timestamp}-{fileName}". The return types of createSignedUrl and
  deleteFile are spelled out and TSDoc was added. The hunk header was updated to
  the new line count; the blob id on that file's "index" line predates this edit.
- NOTE(review): the copy of the patch this was rebuilt from had its line breaks
  collapsed and angle-bracket type arguments stripped (for example "z.infer;" and
  "Promise =>"). Context lines are reproduced with the tokens as found; their
  indentation and the position of blank context lines were re-derived from the
  hunk line counts. Confirm them against the target files before applying.

diff --git a/apps/restate/src/audioPipeline.ts b/apps/restate/src/audioPipeline.ts
index 9dd0c347ec..8a773cb7d2 100644
--- a/apps/restate/src/audioPipeline.ts
+++ b/apps/restate/src/audioPipeline.ts
@@ -3,11 +3,12 @@
 import * as restate from "@restatedev/restate-sdk-cloudflare-workers";
 import { serde } from "@restatedev/restate-sdk-zod";
 import { z } from "zod";
+import { createSignedUrl, deleteFile } from "./supabase.js";
 import { limiterForUser } from "./userRateLimiter.js";
 
 const StartAudioPipeline = z.object({
   userId: z.string(),
-  audioUrl: z.string(),
+  fileId: z.string(),
 });
 
 export type StartAudioPipelineInput = z.infer;
@@ -135,6 +136,20 @@ export const audioPipeline = restate.workflow({
         req: StartAudioPipelineInput,
       ): Promise => {
         ctx.set("status", "QUEUED" as PipelineStatusType);
+        ctx.set("fileId", req.fileId);
+
+        const env = (
+          ctx as unknown as {
+            env?: {
+              RESTATE_INGRESS_URL?: string;
+              DEEPGRAM_API_KEY?: string;
+              LLM_API_URL?: string;
+              LLM_API_KEY?: string;
+              SUPABASE_URL?: string;
+              SUPABASE_SERVICE_ROLE_KEY?: string;
+            };
+          }
+        ).env;
 
         try {
           const limiter = limiterForUser(ctx, req.userId);
@@ -145,31 +160,36 @@ export const audioPipeline = restate.workflow({
 
           ctx.set("status", "TRANSCRIBING" as PipelineStatusType);
           ctx.set("userId", req.userId);
-          ctx.set("audioUrl", req.audioUrl);
 
-          const ingressBase = (
-            ctx as unknown as { env?: { RESTATE_INGRESS_URL?: string } }
-          ).env?.RESTATE_INGRESS_URL;
+          const ingressBase = env?.RESTATE_INGRESS_URL;
           if (!ingressBase) {
             throw new restate.TerminalError(
               "RESTATE_INGRESS_URL env var is required for callback URL",
             );
           }
 
-          const deepgramApiKey = (
-            ctx as unknown as { env?: { DEEPGRAM_API_KEY?: string } }
-          ).env?.DEEPGRAM_API_KEY;
+          const deepgramApiKey = env?.DEEPGRAM_API_KEY;
           if (!deepgramApiKey) {
             throw new restate.TerminalError(
               "DEEPGRAM_API_KEY env var is required",
             );
           }
 
+          if (!env?.SUPABASE_URL || !env?.SUPABASE_SERVICE_ROLE_KEY) {
+            throw new restate.TerminalError(
+              "SUPABASE_URL and SUPABASE_SERVICE_ROLE_KEY env vars are required",
+            );
+          }
+
+          const audioUrl = await ctx.run("create-signed-url", () =>
+            createSignedUrl(env, req.fileId, 3600),
+          );
+
           const pipelineId = ctx.key;
           const callbackUrl = `${ingressBase.replace(/\/+$/, "")}/AudioPipeline/${encodeURIComponent(pipelineId)}/onDeepgramResult`;
 
           const requestId = await ctx.run("deepgram", () =>
-            callDeepgram(req.audioUrl, callbackUrl, deepgramApiKey),
+            callDeepgram(audioUrl, callbackUrl, deepgramApiKey),
           );
 
           ctx.set("deepgramRequestId", requestId);
@@ -179,16 +199,12 @@ export const audioPipeline = restate.workflow({
 
           ctx.set("status", "LLM_RUNNING" as PipelineStatusType);
 
-          const llmApiUrl = (
-            ctx as unknown as { env?: { LLM_API_URL?: string } }
-          ).env?.LLM_API_URL;
+          const llmApiUrl = env?.LLM_API_URL;
           if (!llmApiUrl) {
             throw new restate.TerminalError("LLM_API_URL env var is required");
           }
 
-          const llmApiKey = (
-            ctx as unknown as { env?: { LLM_API_KEY?: string } }
-          ).env?.LLM_API_KEY;
+          const llmApiKey = env?.LLM_API_KEY;
 
           const llmResult = await ctx.run("llm", () =>
             callLLM(transcript, req.userId, llmApiUrl, llmApiKey),
@@ -212,6 +228,17 @@ export const audioPipeline = restate.workflow({
           ctx.set("error", message);
 
           throw err;
+        } finally {
+          if (env?.SUPABASE_URL && env?.SUPABASE_SERVICE_ROLE_KEY) {
+            try {
+              await ctx.run("cleanup-file", () => deleteFile(env, req.fileId));
+            } catch (cleanupErr) {
+              console.error(
+                "Failed to delete file after processing:",
+                cleanupErr,
+              );
+            }
+          }
         }
       },
     ),
diff --git a/apps/restate/src/supabase.ts b/apps/restate/src/supabase.ts
new file mode 100644
index 0000000000..a01caead3f
--- /dev/null
+++ b/apps/restate/src/supabase.ts
@@ -0,0 +1,116 @@
+interface SupabaseEnv {
+  SUPABASE_URL?: string;
+  SUPABASE_SERVICE_ROLE_KEY?: string;
+}
+
+/**
+ * Reads the Supabase URL and service-role key from `env`, throwing when either
+ * is missing. Trailing slashes are stripped from the returned URL.
+ */
+function getSupabaseConfig(env: SupabaseEnv): {
+  url: string;
+  serviceRoleKey: string;
+} {
+  const url = env.SUPABASE_URL;
+  const serviceRoleKey = env.SUPABASE_SERVICE_ROLE_KEY;
+
+  if (!url) {
+    throw new Error("SUPABASE_URL env var is required");
+  }
+  if (!serviceRoleKey) {
+    throw new Error("SUPABASE_SERVICE_ROLE_KEY env var is required");
+  }
+
+  return { url: url.replace(/\/+$/, ""), serviceRoleKey };
+}
+
+/**
+ * Percent-encodes each segment of an object key while keeping the "/"
+ * separators literal. `encodeURIComponent` on the whole key would turn
+ * "{userId}/{file}" into "{userId}%2F{file}", which no longer reads as a
+ * nested object path in the request URL.
+ */
+function encodeObjectPath(fileId: string): string {
+  return fileId
+    .split("/")
+    .map((segment) => encodeURIComponent(segment))
+    .join("/");
+}
+
+/**
+ * Requests a time-limited signed URL for an object in the `audio-files` bucket.
+ *
+ * @param env - Source of `SUPABASE_URL` and `SUPABASE_SERVICE_ROLE_KEY`.
+ * @param fileId - Object key inside the bucket.
+ * @param expiresInSeconds - Sent to Supabase as `expiresIn`; defaults to 3600.
+ * @returns An absolute URL: the value returned by Supabase when it already
+ *   starts with "http", otherwise that value prefixed with `{url}/storage/v1`.
+ * @throws Error when configuration is missing, the response is not OK, or the
+ *   response carries no `signedURL`.
+ */
+export async function createSignedUrl(
+  env: SupabaseEnv,
+  fileId: string,
+  expiresInSeconds: number = 3600,
+): Promise<string> {
+  const { url, serviceRoleKey } = getSupabaseConfig(env);
+
+  const response = await fetch(
+    `${url}/storage/v1/object/sign/audio-files/${encodeObjectPath(fileId)}`,
+    {
+      method: "POST",
+      headers: {
+        Authorization: `Bearer ${serviceRoleKey}`,
+        apikey: serviceRoleKey,
+        "Content-Type": "application/json",
+      },
+      body: JSON.stringify({ expiresIn: expiresInSeconds }),
+    },
+  );
+
+  if (!response.ok) {
+    const body = await response.text();
+    throw new Error(`Failed to create signed URL: ${response.status} ${body}`);
+  }
+
+  const data = (await response.json()) as { signedURL: string };
+
+  if (!data.signedURL) {
+    throw new Error("Signed URL not returned from Supabase");
+  }
+
+  if (data.signedURL.startsWith("http")) {
+    return data.signedURL;
+  }
+  return `${url}/storage/v1${data.signedURL}`;
+}
+
+/**
+ * Deletes an object from the `audio-files` bucket.
+ *
+ * @param env - Source of `SUPABASE_URL` and `SUPABASE_SERVICE_ROLE_KEY`.
+ * @param fileId - Object key inside the bucket.
+ * @throws Error when configuration is missing or the response is not OK.
+ */
+export async function deleteFile(
+  env: SupabaseEnv,
+  fileId: string,
+): Promise<void> {
+  const { url, serviceRoleKey } = getSupabaseConfig(env);
+
+  const response = await fetch(
+    `${url}/storage/v1/object/audio-files/${encodeObjectPath(fileId)}`,
+    {
+      method: "DELETE",
+      headers: {
+        Authorization: `Bearer ${serviceRoleKey}`,
+        apikey: serviceRoleKey,
+      },
+    },
+  );
+
+  if (!response.ok) {
+    const body = await response.text();
+    throw new Error(`Failed to delete file: ${response.status} ${body}`);
+  }
+}
diff --git a/apps/web/src/functions/transcription.ts b/apps/web/src/functions/transcription.ts
index 24667f8c82..5c0a1887e7 100644
--- a/apps/web/src/functions/transcription.ts
+++ b/apps/web/src/functions/transcription.ts
@@ -20,18 +20,18 @@ export type PipelineStatusType = z.infer;
 const StatusState = z.object({
   status: PipelineStatus,
   transcript: z.string().optional(),
-  llmResult: z.unknown().optional(),
+  llmResult: z.string().optional(),
   error: z.string().optional(),
 });
 
 export type StatusStateType = z.infer;
 
 type AudioPipeline = {
-  run: (input: {
-    userId: string;
-    audioUrl: string;
-  }) => Promise;
-  getStatus: () => Promise;
+  run: (
+    ctx: unknown,
+    input: { userId: string; fileId: string },
+  ) => Promise;
+  getStatus: (ctx: unknown) => Promise;
 };
 
 function getRestateClient() {
@@ -41,7 +41,7 @@ function getRestateClient() {
 export const startAudioPipeline = createServerFn({ method: "POST" })
   .inputValidator(
     z.object({
-      audioUrl: z.string(),
+      fileId: z.string(),
       pipelineId: z.string().optional(),
     }),
   )
@@ -53,13 +53,30 @@ export const startAudioPipeline = createServerFn({ method: "POST" })
       return { error: true, message: "Unauthorized" };
     }
 
+    const userId = userData.user.id;
+
+    // Validate fileId belongs to the authenticated user
+    // fileId format: {userId}/{timestamp}-{fileName}
+    const segments = data.fileId.split("/").filter(Boolean);
+    const [ownerId, ...rest] = segments;
+
+    if (
+      !ownerId ||
+      ownerId !== userId ||
+      rest.length === 0 ||
+      rest.some((s) => s === "." || s === "..")
+    ) {
+      return { error: true, message: "Invalid fileId" };
+    }
+
+    const safeFileId = `${userId}/${rest.join("/")}`;
     const pipelineId = data.pipelineId ?? crypto.randomUUID();
 
     try {
       const restateClient = getRestateClient();
       const handle = await restateClient
         .workflowClient({ name: "AudioPipeline" }, pipelineId)
-        .workflowSubmit({ userId: userData.user.id, audioUrl: data.audioUrl });
+        .workflowSubmit({ userId, fileId: safeFileId });
 
       return {
         success: true,
diff --git a/apps/web/src/functions/upload.ts b/apps/web/src/functions/upload.ts
index a5f6a99d5d..c6ab2c2a24 100644
--- a/apps/web/src/functions/upload.ts
+++ b/apps/web/src/functions/upload.ts
@@ -33,9 +33,5 @@ export const uploadAudioFile = createServerFn({ method: "POST" })
       return { error: true, message: error.message };
     }
 
-    const { data: urlData } = supabase.storage
-      .from("audio-files")
-      .getPublicUrl(uploadData.path);
-
-    return { success: true, url: urlData.publicUrl };
+    return { success: true, fileId: uploadData.path };
   });
diff --git a/apps/web/src/routes/_view/app/file-transcription.tsx b/apps/web/src/routes/_view/app/file-transcription.tsx
index b4b3603b39..5ead442854 100644
--- a/apps/web/src/routes/_view/app/file-transcription.tsx
+++ b/apps/web/src/routes/_view/app/file-transcription.tsx
@@ -38,7 +38,9 @@ function Component() {
       if (!pipelineId) {
         throw new Error("Missing pipelineId");
       }
-      const res = await getAudioPipelineStatus({ data: { pipelineId } });
+      const res = (await getAudioPipelineStatus({ data: { pipelineId } })) as
+        | { success: true; status: StatusStateType }
+        | { error: true; message?: string };
       if ("error" in res && res.error) {
         throw new Error(res.message ?? "Failed to get pipeline status");
       }
@@ -105,14 +107,14 @@ function Component() {
         return;
       }
 
-      if (!("url" in uploadResult)) {
-        setUploadError("Failed to get upload URL");
+      if (!("fileId" in uploadResult)) {
+        setUploadError("Failed to get file ID");
         return;
       }
 
       const pipelineResult = await startAudioPipeline({
         data: {
-          audioUrl: uploadResult.url,
+          fileId: uploadResult.fileId,
         },
       });
 
diff --git a/apps/web/src/utils/restate.ts b/apps/web/src/utils/restate.ts
index 31f22acebf..863fe68f31 100644
--- a/apps/web/src/utils/restate.ts
+++ b/apps/web/src/utils/restate.ts
@@ -10,7 +10,7 @@ export interface StatusState {
     | "DONE"
     | "ERROR";
   transcript?: string;
-  llmResult?: unknown;
+  llmResult?: string;
   error?: string;
 }
 
@@ -31,13 +31,13 @@ export interface DeepgramCallbackPayload {
 export async function startAudioPipeline(params: {
   pipelineId: string;
   userId: string;
-  audioUrl: string;
+  fileId: string;
 }): Promise {
   const url = `${RESTATE_INGRESS_URL}/AudioPipeline/${encodeURIComponent(params.pipelineId)}/run/send`;
   const res = await fetch(url, {
     method: "POST",
     headers: { "Content-Type": "application/json" },
-    body: JSON.stringify({ userId: params.userId, audioUrl: params.audioUrl }),
+    body: JSON.stringify({ userId: params.userId, fileId: params.fileId }),
   });
   if (!res.ok) {
     throw new Error(`Failed to start pipeline: ${res.status}`);
diff --git a/supabase/migrations/20251128131538_make_audio_files_private.sql b/supabase/migrations/20251128131538_make_audio_files_private.sql
new file mode 100644
index 0000000000..d5a86354e7
--- /dev/null
+++ b/supabase/migrations/20251128131538_make_audio_files_private.sql
@@ -0,0 +1,12 @@
+-- Make audio-files bucket private for security
+-- Files will be accessed via signed URLs generated by the backend service
+
+UPDATE storage.buckets
+SET public = false
+WHERE id = 'audio-files';
+
+-- Drop the public SELECT policy since the bucket is now private
+DROP POLICY IF EXISTS "audio_files_select_all" ON storage.objects;
+
+-- Add a policy for authenticated users to SELECT their own files
+CREATE POLICY "audio_files_select_owner" ON storage.objects FOR SELECT TO authenticated USING (bucket_id = 'audio-files' AND auth.uid()::text = (storage.foldername(name))[1]);
diff --git a/supabase/tests/004-storage-audio-files-policies.sql b/supabase/tests/004-storage-audio-files-policies.sql
index 29de714de3..500a0dddd6 100644
--- a/supabase/tests/004-storage-audio-files-policies.sql
+++ b/supabase/tests/004-storage-audio-files-policies.sql
@@ -15,7 +15,7 @@ select lives_ok(
 select results_eq(
   $$select count(*) from storage.objects where bucket_id = 'audio-files'$$,
   array[1::bigint],
-  'Owner can view files in bucket'
+  'Owner can view own files in bucket'
 );
 
 select tests.clear_authentication();
@@ -29,10 +29,11 @@ select throws_ok(
   'Cannot upload to another user folder'
 );
 
+-- Bucket is now private, so other users cannot view files they do not own
 select results_eq(
   $$select count(*) from storage.objects where bucket_id = 'audio-files'$$,
-  array[1::bigint],
-  'Public can view files in bucket'
+  array[0::bigint],
+  'Other users cannot view files in private bucket'
 );
 
 select * from finish();