Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions apps/restate/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
"private": true,
"type": "module",
"scripts": {
"build": "tsc --noEmitOnError",
"dev": "wrangler dev --port 9080",
"start": "wrangler dev --port 9080",
"deploy": "wrangler deploy",
"typecheck": "tsc --noEmit"
},
Expand Down
2 changes: 1 addition & 1 deletion apps/restate/src/audioPipeline.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { CallbackUrl, createClient } from "@deepgram/sdk";
import * as restate from "@restatedev/restate-sdk-cloudflare-workers";
import * as restate from "@restatedev/restate-sdk-cloudflare-workers/fetch";
import { serde } from "@restatedev/restate-sdk-zod";
import { z } from "zod";

Expand Down
2 changes: 1 addition & 1 deletion apps/restate/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import * as restate from "@restatedev/restate-sdk-cloudflare-workers";
import * as restate from "@restatedev/restate-sdk-cloudflare-workers/fetch";

import { audioPipeline } from "./audioPipeline.js";
import { userRateLimiter } from "./userRateLimiter.js";
Expand Down
2 changes: 1 addition & 1 deletion apps/restate/src/userRateLimiter.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import * as restate from "@restatedev/restate-sdk-cloudflare-workers";
import * as restate from "@restatedev/restate-sdk-cloudflare-workers/fetch";
import { z } from "zod";

const RateLimitState = z.object({
Expand Down
1 change: 1 addition & 0 deletions apps/web/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export const env = createEnv({
VITE_SUPABASE_ANON_KEY: z.string().min(1),
VITE_POSTHOG_API_KEY: isDev ? z.string().optional() : z.string().min(1),
VITE_POSTHOG_HOST: z.string().default("https://us.i.posthog.com"),
VITE_RESTATE_INGRESS_URL: z.string().default("http://localhost:8080"),
},

runtimeEnv: { ...process.env, ...import.meta.env },
Expand Down
44 changes: 31 additions & 13 deletions apps/web/src/functions/transcription.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { createClient } from "@deepgram/sdk";
import type { IngressWorkflowClient } from "@restatedev/restate-sdk-clients";
import * as clients from "@restatedev/restate-sdk-clients";
import { createServerFn } from "@tanstack/react-start";
import { z } from "zod";
Expand Down Expand Up @@ -26,14 +27,24 @@ const StatusState = z.object({

export type StatusStateType = z.infer<typeof StatusState>;

type AudioPipeline = {
type StartAudioPipelineInput = {
userId: string;
fileId: string;
};

// Workflow definition type matching the server-side handler signatures.
// The first parameter (ctx) is the Restate context, which is stripped by IngressWorkflowClient.
type AudioPipelineDefinition = {
run: (
ctx: unknown,
input: { userId: string; fileId: string },
input: StartAudioPipelineInput,
) => Promise<StatusStateType>;
getStatus: (ctx: unknown) => Promise<StatusStateType>;
};

// Client type with workflowSubmit, workflowAttach, and other client methods
type AudioPipelineClient = IngressWorkflowClient<AudioPipelineDefinition>;

function getRestateClient() {
return clients.connect({ url: env.RESTATE_INGRESS_URL });
}
Expand Down Expand Up @@ -74,9 +85,15 @@ export const startAudioPipeline = createServerFn({ method: "POST" })

try {
const restateClient = getRestateClient();
const handle = await restateClient
.workflowClient<AudioPipeline>({ name: "AudioPipeline" }, pipelineId)
.workflowSubmit({ userId, fileId: safeFileId });
const workflowClient: AudioPipelineClient =
restateClient.workflowClient<AudioPipelineDefinition>(
{ name: "AudioPipeline" },
pipelineId,
);
const handle = await workflowClient.workflowSubmit({
userId,
fileId: safeFileId,
});

return {
success: true,
Expand Down Expand Up @@ -105,12 +122,12 @@ export const getAudioPipelineStatus = createServerFn({ method: "GET" })

try {
const restateClient = getRestateClient();
const status = await restateClient
.workflowClient<AudioPipeline>(
const workflowClient: AudioPipelineClient =
restateClient.workflowClient<AudioPipelineDefinition>(
{ name: "AudioPipeline" },
data.pipelineId,
)
.getStatus();
);
const status = await workflowClient.getStatus();

return {
success: true,
Expand Down Expand Up @@ -138,10 +155,11 @@ export const getAudioPipelineResult = createServerFn({ method: "GET" })

try {
const restateClient = getRestateClient();
const workflowClient = restateClient.workflowClient<AudioPipeline>(
{ name: "AudioPipeline" },
data.pipelineId,
);
const workflowClient: AudioPipelineClient =
restateClient.workflowClient<AudioPipelineDefinition>(
{ name: "AudioPipeline" },
data.pipelineId,
);

const result = await workflowClient.workflowAttach();

Expand Down
5 changes: 3 additions & 2 deletions apps/web/src/utils/restate.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
const RESTATE_INGRESS_URL =
process.env.RESTATE_INGRESS_URL ?? "http://localhost:8080";
import { env } from "@/env";

const RESTATE_INGRESS_URL = env.VITE_RESTATE_INGRESS_URL;

export interface StatusState {
status:
Expand Down