Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 42 additions & 15 deletions apps/restate/src/audioPipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ import * as restate from "@restatedev/restate-sdk-cloudflare-workers";
import { serde } from "@restatedev/restate-sdk-zod";
import { z } from "zod";

import { createSignedUrl, deleteFile } from "./supabase.js";
import { limiterForUser } from "./userRateLimiter.js";

const StartAudioPipeline = z.object({
userId: z.string(),
audioUrl: z.string(),
fileId: z.string(),
});

export type StartAudioPipelineInput = z.infer<typeof StartAudioPipeline>;
Expand Down Expand Up @@ -135,6 +136,20 @@ export const audioPipeline = restate.workflow({
req: StartAudioPipelineInput,
): Promise<StatusStateType> => {
ctx.set("status", "QUEUED" as PipelineStatusType);
ctx.set("fileId", req.fileId);

const env = (
ctx as unknown as {
env?: {
RESTATE_INGRESS_URL?: string;
DEEPGRAM_API_KEY?: string;
LLM_API_URL?: string;
LLM_API_KEY?: string;
SUPABASE_URL?: string;
SUPABASE_SERVICE_ROLE_KEY?: string;
};
}
).env;

try {
const limiter = limiterForUser(ctx, req.userId);
Expand All @@ -145,31 +160,36 @@ export const audioPipeline = restate.workflow({

ctx.set("status", "TRANSCRIBING" as PipelineStatusType);
ctx.set("userId", req.userId);
ctx.set("audioUrl", req.audioUrl);

const ingressBase = (
ctx as unknown as { env?: { RESTATE_INGRESS_URL?: string } }
).env?.RESTATE_INGRESS_URL;
const ingressBase = env?.RESTATE_INGRESS_URL;
if (!ingressBase) {
throw new restate.TerminalError(
"RESTATE_INGRESS_URL env var is required for callback URL",
);
}

const deepgramApiKey = (
ctx as unknown as { env?: { DEEPGRAM_API_KEY?: string } }
).env?.DEEPGRAM_API_KEY;
const deepgramApiKey = env?.DEEPGRAM_API_KEY;
if (!deepgramApiKey) {
throw new restate.TerminalError(
"DEEPGRAM_API_KEY env var is required",
);
}

if (!env?.SUPABASE_URL || !env?.SUPABASE_SERVICE_ROLE_KEY) {
throw new restate.TerminalError(
"SUPABASE_URL and SUPABASE_SERVICE_ROLE_KEY env vars are required",
);
}

const audioUrl = await ctx.run("create-signed-url", () =>
createSignedUrl(env, req.fileId, 3600),
);

const pipelineId = ctx.key;
const callbackUrl = `${ingressBase.replace(/\/+$/, "")}/AudioPipeline/${encodeURIComponent(pipelineId)}/onDeepgramResult`;

const requestId = await ctx.run("deepgram", () =>
callDeepgram(req.audioUrl, callbackUrl, deepgramApiKey),
callDeepgram(audioUrl, callbackUrl, deepgramApiKey),
);
ctx.set("deepgramRequestId", requestId);

Expand All @@ -179,16 +199,12 @@ export const audioPipeline = restate.workflow({

ctx.set("status", "LLM_RUNNING" as PipelineStatusType);

const llmApiUrl = (
ctx as unknown as { env?: { LLM_API_URL?: string } }
).env?.LLM_API_URL;
const llmApiUrl = env?.LLM_API_URL;
if (!llmApiUrl) {
throw new restate.TerminalError("LLM_API_URL env var is required");
}

const llmApiKey = (
ctx as unknown as { env?: { LLM_API_KEY?: string } }
).env?.LLM_API_KEY;
const llmApiKey = env?.LLM_API_KEY;

const llmResult = await ctx.run("llm", () =>
callLLM(transcript, req.userId, llmApiUrl, llmApiKey),
Expand All @@ -212,6 +228,17 @@ export const audioPipeline = restate.workflow({
ctx.set("error", message);

throw err;
} finally {
if (env?.SUPABASE_URL && env?.SUPABASE_SERVICE_ROLE_KEY) {
try {
await ctx.run("cleanup-file", () => deleteFile(env, req.fileId));
} catch (cleanupErr) {
console.error(
"Failed to delete file after processing:",
cleanupErr,
);
}
}
}
},
),
Expand Down
81 changes: 81 additions & 0 deletions apps/restate/src/supabase.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/**
 * Environment bindings needed to talk to Supabase Storage.
 *
 * Both fields are optional at the type level because they come from the
 * runtime environment; `getSupabaseConfig` throws when either is missing.
 */
interface SupabaseEnv {
/** Base URL of the Supabase project; trailing slashes are stripped before use. */
SUPABASE_URL?: string;
/** Service-role key, sent as both the `Authorization` bearer token and the `apikey` header. */
SUPABASE_SERVICE_ROLE_KEY?: string;
}

/**
 * Extracts and validates the Supabase settings from the environment.
 *
 * @param env - Environment bindings that may contain the Supabase settings.
 * @returns The project URL with any trailing slashes removed, plus the
 *   service-role key.
 * @throws Error naming the missing variable when `SUPABASE_URL` or
 *   `SUPABASE_SERVICE_ROLE_KEY` is absent or empty (the URL is checked first).
 */
function getSupabaseConfig(env: SupabaseEnv): {
  url: string;
  serviceRoleKey: string;
} {
  const { SUPABASE_URL: baseUrl, SUPABASE_SERVICE_ROLE_KEY: serviceRoleKey } =
    env;

  if (!baseUrl) throw new Error("SUPABASE_URL env var is required");
  if (!serviceRoleKey)
    throw new Error("SUPABASE_SERVICE_ROLE_KEY env var is required");

  // Normalise the base so callers can append "/storage/..." without doubling slashes.
  const url = baseUrl.replace(/\/+$/, "");
  return { url, serviceRoleKey };
}

/**
 * Requests a time-limited signed URL for an object in the `audio-files` bucket.
 *
 * @param env - Environment bindings holding the Supabase URL and service-role key.
 * @param fileId - Object path inside the bucket. It is passed through
 *   `encodeURIComponent`, so any `/` it contains is sent percent-encoded.
 * @param expiresInSeconds - Sent to the API as `expiresIn`; defaults to 3600.
 * @returns An absolute URL. An absolute `signedURL` from the API is returned
 *   unchanged; a relative one is appended to `{SUPABASE_URL}/storage/v1`.
 * @throws Error when the configuration is incomplete, when the API responds
 *   with a non-2xx status (status and body are included in the message), or
 *   when the response carries no usable `signedURL` string.
 */
export async function createSignedUrl(
  env: SupabaseEnv,
  fileId: string,
  expiresInSeconds: number = 3600,
): Promise<string> {
  const { url, serviceRoleKey } = getSupabaseConfig(env);

  const response = await fetch(
    `${url}/storage/v1/object/sign/audio-files/${encodeURIComponent(fileId)}`,
    {
      method: "POST",
      headers: {
        Authorization: `Bearer ${serviceRoleKey}`,
        apikey: serviceRoleKey,
        "Content-Type": "application/json",
      },
      body: JSON.stringify({ expiresIn: expiresInSeconds }),
    },
  );

  if (!response.ok) {
    const body = await response.text();
    throw new Error(`Failed to create signed URL: ${response.status} ${body}`);
  }

  // The payload comes from the network: narrow it at runtime rather than
  // asserting its shape, so a `null` body or a non-string field produces the
  // descriptive error below instead of a TypeError.
  const data: unknown = await response.json();
  const signedUrl =
    typeof data === "object" && data !== null
      ? (data as { signedURL?: unknown }).signedURL
      : undefined;

  if (typeof signedUrl !== "string" || signedUrl === "") {
    throw new Error("Signed URL not returned from Supabase");
  }

  // Only a real http(s) scheme counts as absolute; a relative path that merely
  // begins with the letters "http" must still be joined to the storage base.
  if (/^https?:\/\//i.test(signedUrl)) {
    return signedUrl;
  }

  // Join with exactly one slash whether or not the relative path has a leading one.
  const separator = signedUrl.startsWith("/") ? "" : "/";
  return `${url}/storage/v1${separator}${signedUrl}`;
}

/**
 * Deletes an object from the `audio-files` storage bucket.
 *
 * @param env - Environment bindings holding the Supabase URL and service-role key.
 * @param fileId - Object path inside the bucket; sent through
 *   `encodeURIComponent`, so any `/` it contains is percent-encoded.
 * @throws Error when the configuration is incomplete or when the API answers
 *   with a non-2xx status (status and response body are included in the message).
 */
export async function deleteFile(
  env: SupabaseEnv,
  fileId: string,
): Promise<void> {
  const config = getSupabaseConfig(env);

  const objectUrl = `${config.url}/storage/v1/object/audio-files/${encodeURIComponent(fileId)}`;
  const authHeaders = {
    Authorization: `Bearer ${config.serviceRoleKey}`,
    apikey: config.serviceRoleKey,
  };

  const res = await fetch(objectUrl, { method: "DELETE", headers: authHeaders });
  if (res.ok) {
    return;
  }

  // Surface the server's explanation alongside the status code.
  const detail = await res.text();
  throw new Error(`Failed to delete file: ${res.status} ${detail}`);
}
33 changes: 25 additions & 8 deletions apps/web/src/functions/transcription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,18 @@ export type PipelineStatusType = z.infer<typeof PipelineStatus>;
const StatusState = z.object({
status: PipelineStatus,
transcript: z.string().optional(),
llmResult: z.unknown().optional(),
llmResult: z.string().optional(),
error: z.string().optional(),
});

export type StatusStateType = z.infer<typeof StatusState>;

type AudioPipeline = {
run: (input: {
userId: string;
audioUrl: string;
}) => Promise<StatusStateType>;
getStatus: () => Promise<StatusStateType>;
run: (
ctx: unknown,
input: { userId: string; fileId: string },
) => Promise<StatusStateType>;
getStatus: (ctx: unknown) => Promise<StatusStateType>;
};

function getRestateClient() {
Expand All @@ -41,7 +41,7 @@ function getRestateClient() {
export const startAudioPipeline = createServerFn({ method: "POST" })
.inputValidator(
z.object({
audioUrl: z.string(),
fileId: z.string(),
pipelineId: z.string().optional(),
}),
)
Expand All @@ -53,13 +53,30 @@ export const startAudioPipeline = createServerFn({ method: "POST" })
return { error: true, message: "Unauthorized" };
}

const userId = userData.user.id;

// Validate fileId belongs to the authenticated user
// fileId format: {userId}/{timestamp}-{fileName}
const segments = data.fileId.split("/").filter(Boolean);
const [ownerId, ...rest] = segments;

if (
!ownerId ||
ownerId !== userId ||
rest.length === 0 ||
rest.some((s) => s === "." || s === "..")
) {
return { error: true, message: "Invalid fileId" };
}

const safeFileId = `${userId}/${rest.join("/")}`;
const pipelineId = data.pipelineId ?? crypto.randomUUID();

try {
const restateClient = getRestateClient();
const handle = await restateClient
.workflowClient<AudioPipeline>({ name: "AudioPipeline" }, pipelineId)
.workflowSubmit({ userId: userData.user.id, audioUrl: data.audioUrl });
.workflowSubmit({ userId, fileId: safeFileId });

return {
success: true,
Expand Down
6 changes: 1 addition & 5 deletions apps/web/src/functions/upload.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,5 @@ export const uploadAudioFile = createServerFn({ method: "POST" })
return { error: true, message: error.message };
}

const { data: urlData } = supabase.storage
.from("audio-files")
.getPublicUrl(uploadData.path);

return { success: true, url: urlData.publicUrl };
return { success: true, fileId: uploadData.path };
});
10 changes: 6 additions & 4 deletions apps/web/src/routes/_view/app/file-transcription.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ function Component() {
if (!pipelineId) {
throw new Error("Missing pipelineId");
}
const res = await getAudioPipelineStatus({ data: { pipelineId } });
const res = (await getAudioPipelineStatus({ data: { pipelineId } })) as
| { success: true; status: StatusStateType }
| { error: true; message?: string };
if ("error" in res && res.error) {
throw new Error(res.message ?? "Failed to get pipeline status");
}
Expand Down Expand Up @@ -105,14 +107,14 @@ function Component() {
return;
}

if (!("url" in uploadResult)) {
setUploadError("Failed to get upload URL");
if (!("fileId" in uploadResult)) {
setUploadError("Failed to get file ID");
return;
}

const pipelineResult = await startAudioPipeline({
data: {
audioUrl: uploadResult.url,
fileId: uploadResult.fileId,
},
});

Expand Down
6 changes: 3 additions & 3 deletions apps/web/src/utils/restate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ export interface StatusState {
| "DONE"
| "ERROR";
transcript?: string;
llmResult?: unknown;
llmResult?: string;
error?: string;
}

Expand All @@ -31,13 +31,13 @@ export interface DeepgramCallbackPayload {
export async function startAudioPipeline(params: {
pipelineId: string;
userId: string;
audioUrl: string;
fileId: string;
}): Promise<void> {
const url = `${RESTATE_INGRESS_URL}/AudioPipeline/${encodeURIComponent(params.pipelineId)}/run/send`;
const res = await fetch(url, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ userId: params.userId, audioUrl: params.audioUrl }),
body: JSON.stringify({ userId: params.userId, fileId: params.fileId }),
});
if (!res.ok) {
throw new Error(`Failed to start pipeline: ${res.status}`);
Expand Down
12 changes: 12 additions & 0 deletions supabase/migrations/20251128131538_make_audio_files_private.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
-- Switch the audio-files bucket from public to private for security.
-- Files are then accessed via signed URLs generated by the backend service.
update storage.buckets
set public = false
where id = 'audio-files';

-- The public SELECT policy is dropped because the bucket is now private.
drop policy if exists "audio_files_select_all" on storage.objects;

-- Authenticated users may SELECT only objects in this bucket where the first
-- element of storage.foldername(name) equals their own auth.uid().
create policy "audio_files_select_owner"
on storage.objects
for select
to authenticated
using (
  bucket_id = 'audio-files'
  and auth.uid()::text = (storage.foldername(name))[1]
);
7 changes: 4 additions & 3 deletions supabase/tests/004-storage-audio-files-policies.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ select lives_ok(
select results_eq(
$$select count(*) from storage.objects where bucket_id = 'audio-files'$$,
array[1::bigint],
'Owner can view files in bucket'
'Owner can view own files in bucket'
);

select tests.clear_authentication();
Expand All @@ -29,10 +29,11 @@ select throws_ok(
'Cannot upload to another user folder'
);

-- Bucket is now private, so other users cannot view files they do not own
select results_eq(
$$select count(*) from storage.objects where bucket_id = 'audio-files'$$,
array[1::bigint],
'Public can view files in bucket'
array[0::bigint],
'Other users cannot view files in private bucket'
);

select * from finish();
Expand Down