From e4547ae88295eab0e2fa926ffe3050d500f11d30 Mon Sep 17 00:00:00 2001 From: Dak Washbrook Date: Tue, 3 Jun 2025 12:52:40 -0700 Subject: [PATCH 1/9] feat: implement voice routing and integrate ElevenLabs for audio processing --- apps/server/package.json | 4 +- apps/server/src/ctx.ts | 4 +- apps/server/src/db/schema.ts | 7 + apps/server/src/lib/auth.ts | 42 +- apps/server/src/lib/prompts.ts | 3 +- apps/server/src/lib/schemas.ts | 2 - apps/server/src/lib/server-utils.ts | 12 +- apps/server/src/main.ts | 93 ++-- apps/server/src/routes/agent/tools.ts | 53 +- apps/server/src/routes/ai.ts | 85 +++ .../src/services/call-service/call-service.ts | 520 ++++++++++++++++++ .../eleven-labs-incoming-message-schema.ts | 140 +++++ .../eleven-labs-outgoing-message-schema.ts | 62 +++ .../services/call-service/system-prompt.ts | 261 +++++++++ .../twilio-socket-message-schema.ts | 50 ++ apps/server/src/services/mcp-service/mcp.ts | 433 +++++++++++++++ apps/server/src/trpc/index.ts | 4 +- apps/server/src/trpc/routes/ai/index.ts | 8 +- apps/server/src/trpc/routes/ai/search.ts | 2 +- apps/server/src/trpc/routes/ai/webSearch.ts | 17 - apps/server/src/trpc/routes/connections.ts | 12 +- apps/server/src/trpc/routes/user.ts | 11 - .../src/trpc/routes/voice/get-signed-url.ts | 18 + apps/server/src/trpc/routes/voice/index.ts | 6 + apps/server/src/trpc/trpc.ts | 4 +- apps/server/wrangler.jsonc | 1 - package.json | 2 + 27 files changed, 1740 insertions(+), 116 deletions(-) create mode 100644 apps/server/src/routes/ai.ts create mode 100644 apps/server/src/services/call-service/call-service.ts create mode 100644 apps/server/src/services/call-service/eleven-labs-incoming-message-schema.ts create mode 100644 apps/server/src/services/call-service/eleven-labs-outgoing-message-schema.ts create mode 100644 apps/server/src/services/call-service/system-prompt.ts create mode 100644 apps/server/src/services/call-service/twilio-socket-message-schema.ts create mode 100644 apps/server/src/services/mcp-service/mcp.ts delete mode 100644 apps/server/src/trpc/routes/ai/webSearch.ts create mode 100644 apps/server/src/trpc/routes/voice/get-signed-url.ts create mode 100644 apps/server/src/trpc/routes/voice/index.ts diff --git a/apps/server/package.json b/apps/server/package.json index aa6f9d2985..cb6965e557 100644 --- a/apps/server/package.json +++ b/apps/server/package.json @@ -34,7 +34,6 @@ "@react-email/render": "^1.1.0", "@trpc/client": "catalog:", "@trpc/server": "catalog:", - "@tsndr/cloudflare-worker-jwt": "3.2.0", "@upstash/ratelimit": "^2.0.5", "@upstash/redis": "^1.34.9", "agents": "0.0.93", @@ -45,12 +44,14 @@ "date-fns": "^4.1.0", "dedent": "^1.6.0", "drizzle-orm": "catalog:", + "elevenlabs": "1.59.0", "email-addresses": "^5.0.0", "google-auth-library": "9.15.1", "he": "^1.2.0", "hono": "^4.7.8", "hono-agents": "0.0.83", "hono-party": "^0.0.12", + "jose": "6.0.11", "jsonrepair": "^3.12.0", "mimetext": "^3.0.27", "p-retry": "6.2.1", @@ -62,6 +63,7 @@ "sanitize-html": "^2.16.0", "string-strip-html": "^13.4.12", "superjson": "catalog:", + "twilio": "5.7.0", "wrangler": "catalog:", "zod": "catalog:" }, diff --git a/apps/server/src/ctx.ts b/apps/server/src/ctx.ts index 79f8df0530..6aea15aee9 100644 --- a/apps/server/src/ctx.ts +++ b/apps/server/src/ctx.ts @@ -3,9 +3,11 @@ import type { Autumn } from 'autumn-js'; import type { Auth } from './lib/auth'; import type { DB } from './db'; +export type SessionUser = NonNullable>>['user']; + export type HonoVariables = { auth: Auth; - session: Awaited>; + sessionUser?: SessionUser; db: DB; autumn: Autumn; }; diff --git a/apps/server/src/db/schema.ts b/apps/server/src/db/schema.ts index d34b6d45d3..8751a48f4f 100644 --- a/apps/server/src/db/schema.ts +++ b/apps/server/src/db/schema.ts @@ -165,3 +165,10 @@ export const writingStyleMatrix = createTable( ]; }, ); + +export const jwks = createTable('jwks', { + id: text('id').primaryKey(), + publicKey: text('public_key').notNull(), + privateKey: text('private_key').notNull(), + createdAt: timestamp('created_at').notNull(), +}); diff --git a/apps/server/src/lib/auth.ts b/apps/server/src/lib/auth.ts index c3c4e65376..9c1776564c 100644 --- a/apps/server/src/lib/auth.ts +++ b/apps/server/src/lib/auth.ts @@ -6,8 +6,8 @@ import { session, userHotkeys, } from '../db/schema'; +import { createAuthMiddleware, phoneNumber, jwt, bearer } from 'better-auth/plugins'; import { type Account, betterAuth, type BetterAuthOptions } from 'better-auth'; -import { createAuthMiddleware, phoneNumber } from 'better-auth/plugins'; import { getBrowserTimezone, isValidTimezone } from './timezones'; import { drizzleAdapter } from 'better-auth/adapters/drizzle'; import { getSocialProviders } from './auth-providers'; @@ -83,6 +83,8 @@ export const createAuth = () => { return betterAuth({ plugins: [ + jwt(), + bearer(), phoneNumber({ sendOTP: async ({ code, phoneNumber }) => { await twilioClient.messages @@ -290,3 +292,41 @@ export const createSimpleAuth = () => { export type Auth = ReturnType; export type SimpleAuth = ReturnType; + +// Helper ------------------------------------------------------------------- +// This helper returns an existing *session* bearer token (compatible with the +// `bearer()` plugin) for the provided `userId`, or lazily creates a fresh +// session when none is found. The generated token can be used to authenticate +// out-of-band requests (e.g. Server-sent MCP connections) by supplying it in +// a `Bearer ` Authorization header. +export const getBearerTokenForUser = async (userId: string): Promise => { + const db = createDb(env.HYPERDRIVE.connectionString); + + // Try to reuse an active (non-expired) session + const existingSession = await db.query.session.findFirst({ + where: (s, ops) => ops.and(ops.eq(s.userId, userId), ops.gt(s.expiresAt, new Date())), + orderBy: (s, ops) => [ops.desc(s.expiresAt)], + }); + + if (existingSession) { + return existingSession.token; + } + + // No active session – create a minimal one so the token becomes valid + const now = new Date(); + const newToken = crypto.randomUUID(); + const expiresAt = new Date(now.getTime() + 1000 * 60 * 60 * 24 * 7); // 7 days + + await db.insert(session).values({ + id: crypto.randomUUID(), + userId, + createdAt: now, + updatedAt: now, + expiresAt, + token: newToken, + ipAddress: null, + userAgent: null, + }); + + return newToken; +}; diff --git a/apps/server/src/lib/prompts.ts b/apps/server/src/lib/prompts.ts index e32fa9017b..7326290ea4 100644 --- a/apps/server/src/lib/prompts.ts +++ b/apps/server/src/lib/prompts.ts @@ -259,7 +259,7 @@ export const GmailSearchAssistantSystemPrompt = () => When asked to search for plural of a word, use the OR operator to search for the singular form of the word, example: "referrals" should also be searched as "referral", example: "rewards" should also be searched as "reward", example: "comissions" should also be searched as "commission". - When asked to search always use the OR operator to search for related terms, example: "emails from canva" should also be searched as "from:canva.com OR from:canva OR canva". + Exclude promotional emails by default by adding -category:promotions to all search queries unless explicitly requested to include them. Return only the final Gmail search query string, with no additional text, explanations, or formatting. @@ -278,7 +278,6 @@ export const AiChatPrompt = (threadId: string, currentFolder: string, currentFil ${getCurrentDateContext()} NEVER include markdown, XML tags or code formatting in the final response. - Do not use markdown formatting in your response. diff --git a/apps/server/src/lib/schemas.ts b/apps/server/src/lib/schemas.ts index f804bcf93f..f4ab858c6b 100644 --- a/apps/server/src/lib/schemas.ts +++ b/apps/server/src/lib/schemas.ts @@ -43,7 +43,6 @@ export const defaultUserSettings = { trustedSenders: [], isOnboarded: false, colorTheme: 'system', - zeroSignature: true, } satisfies UserSettings; export const userSettingsSchema = z.object({ @@ -55,7 +54,6 @@ export const userSettingsSchema = z.object({ isOnboarded: z.boolean().optional(), trustedSenders: z.string().array().optional(), colorTheme: z.enum(['light', 'dark', 'system']).default('system'), - zeroSignature: z.boolean().default(true), }); export type UserSettings = z.infer; diff --git a/apps/server/src/lib/server-utils.ts b/apps/server/src/lib/server-utils.ts index b6f4311e70..8905d38665 100644 --- a/apps/server/src/lib/server-utils.ts +++ b/apps/server/src/lib/server-utils.ts @@ -6,17 +6,17 @@ import { and, eq } from 'drizzle-orm'; export const getActiveConnection = async () => { const c = getContext(); - const { session, db } = c.var; - if (!session?.user) throw new Error('Session Not Found'); + const { sessionUser, db } = c.var; + if (!sessionUser) throw new Error('Session Not Found'); const userData = await db.query.user.findFirst({ - where: eq(user.id, session.user.id), + where: eq(user.id, sessionUser.id), }); if (userData?.defaultConnectionId) { const activeConnection = await db.query.connection.findFirst({ where: and( - eq(connection.userId, session.user.id), + eq(connection.userId, sessionUser.id), eq(connection.id, userData.defaultConnectionId), ), }); @@ -24,10 +24,10 @@ export const getActiveConnection = async () => { } const firstConnection = await db.query.connection.findFirst({ - where: and(eq(connection.userId, session.user.id)), + where: and(eq(connection.userId, sessionUser.id)), }); if (!firstConnection) { - console.error(`No connections found for user ${session.user.id}`); + console.error(`No connections found for user ${sessionUser.id}`); throw new Error('No connections found for user'); } diff --git a/apps/server/src/main.ts b/apps/server/src/main.ts index 82b03cd789..22ad74f8ea 100644 --- a/apps/server/src/main.ts +++ b/apps/server/src/main.ts @@ -1,13 +1,16 @@ import { env, WorkerEntrypoint } from 'cloudflare:workers'; import { contextStorage } from 'hono/context-storage'; -import { ZeroAgent, ZeroMCP } from './routes/chat'; +import { ZeroMCP } from './services/mcp-service/mcp'; +import { createLocalJWKSet, jwtVerify } from 'jose'; import { routePartykitRequest } from 'partyserver'; import { trpcServer } from '@hono/trpc-server'; import { agentsMiddleware } from 'hono-agents'; import { DurableMailbox } from './lib/party'; import { autumnApi } from './routes/autumn'; +import { ZeroAgent } from './routes/chat'; import type { HonoContext } from './ctx'; import { createAuth } from './lib/auth'; +import { aiRouter } from './routes/ai'; import { Autumn } from 'autumn-js'; import { appRouter } from './trpc'; import { cors } from 'hono/cors'; @@ -22,18 +25,46 @@ const api = new Hono() const auth = createAuth(); c.set('auth', auth); const session = await auth.api.getSession({ headers: c.req.raw.headers }); - c.set('session', session); + c.set('sessionUser', session?.user); + + // Bearer token if no session user yet + if (c.req.header('Authorization') && !session?.user) { + const token = c.req.header('Authorization')?.split(' ')[1]; + + if (token) { + const localJwks = await auth.api.getJwks(); + const jwks = createLocalJWKSet(localJwks); + + const { payload } = await jwtVerify(token, jwks); + const userId = payload.sub; + + if (userId) { + c.set( + 'sessionUser', + await db.query.user.findFirst({ + where: (user, ops) => { + return ops.eq(user.id, userId); + }, + }), + ); + } + } + } + const autumn = new Autumn({ secretKey: env.AUTUMN_SECRET_KEY }); c.set('autumn', autumn); await next(); }) + .route('/ai', aiRouter) .route('/autumn', autumnApi) .on(['GET', 'POST'], '/auth/*', (c) => c.var.auth.handler(c.req.raw)) .use( trpcServer({ endpoint: '/api/trpc', router: appRouter, - createContext: (_, c) => ({ c, session: c.var['session'], db: c.var['db'] }), + createContext: (_, c) => { + return { c, sessionUser: c.var['sessionUser'], db: c.var['db'] }; + }, allowMethodOverride: true, onError: (opts) => { console.error('Error in TRPC handler:', opts.error); @@ -68,34 +99,34 @@ const app = new Hono() exposeHeaders: ['X-Zero-Redirect'], }), ) - .mount( - '/sse', - async (request, env, ctx) => { - const authBearer = request.headers.get('Authorization'); - if (!authBearer) { - return new Response('Unauthorized', { status: 401 }); - } - ctx.props = { - cookie: authBearer, - }; - return ZeroMCP.serveSSE('/sse', { binding: 'ZERO_MCP' }).fetch(request, env, ctx); - }, - { replaceRequest: false }, - ) - .mount( - '/mcp', - async (request, env, ctx) => { - const authBearer = request.headers.get('Authorization'); - if (!authBearer) { - return new Response('Unauthorized', { status: 401 }); - } - ctx.props = { - cookie: authBearer, - }; - return ZeroMCP.serve('/mcp', { binding: 'ZERO_MCP' }).fetch(request, env, ctx); - }, - { replaceRequest: false }, - ) + // .mount( + // '/sse', + // async (request, env, ctx) => { + // const authBearer = request.headers.get('Authorization'); + // if (!authBearer) { + // return new Response('Unauthorized', { status: 401 }); + // } + // ctx.props = { + // cookie: authBearer, + // }; + // return ZeroMCP.serveSSE('/sse', { binding: 'ZERO_MCP' }).fetch(request, env, ctx); + // }, + // { replaceRequest: false }, + // ) + // .mount( + // '/mcp', + // async (request, env, ctx) => { + // const authBearer = request.headers.get('Authorization'); + // if (!authBearer) { + // return new Response('Unauthorized', { status: 401 }); + // } + // ctx.props = { + // cookie: authBearer, + // }; + // return ZeroMCP.serve('/mcp', { binding: 'ZERO_MCP' }).fetch(request, env, ctx); + // }, + // { replaceRequest: false }, + // ) .route('/api', api) .use( '*', diff --git a/apps/server/src/routes/agent/tools.ts b/apps/server/src/routes/agent/tools.ts index 0db4307b4b..1a4f938121 100644 --- a/apps/server/src/routes/agent/tools.ts +++ b/apps/server/src/routes/agent/tools.ts @@ -1,4 +1,3 @@ -import { connectionToDriver, getActiveConnection } from '../../lib/server-utils'; import { composeEmail } from '../../trpc/routes/ai/compose'; import type { MailManager } from '../../lib/driver/types'; import { perplexity } from '@ai-sdk/perplexity'; @@ -125,11 +124,11 @@ const composeEmailTool = (connectionId: string) => threadMessages: z .array( z.object({ - from: z.string(), - to: z.array(z.string()), - cc: z.array(z.string()).optional(), - subject: z.string(), - body: z.string(), + from: z.string().describe('The sender of the email'), + to: z.array(z.string()).describe('The recipients of the email'), + cc: z.array(z.string()).optional().describe('The CC recipients of the email'), + subject: z.string().describe('The subject of the email'), + body: z.string().describe('The body of the email'), }), ) .optional() @@ -149,11 +148,11 @@ const listEmails = (driver: MailManager) => tool({ description: 'List emails in a specific folder', parameters: z.object({ - folder: z.string(), - query: z.string().optional(), - maxResults: z.number().optional(), - labelIds: z.array(z.string()).optional(), - pageToken: z.string().optional(), + folder: z.string().describe('The folder to list emails from'), + query: z.string().optional().describe('The query to filter emails'), + maxResults: z.number().optional().describe('The maximum number of results to return'), + labelIds: z.array(z.string()).optional().describe('The labels to filter emails'), + pageToken: z.string().optional().describe('The page token to continue listing emails'), }), execute: async (params) => { return await driver.list(params); @@ -164,7 +163,7 @@ const markAsRead = (driver: MailManager) => tool({ description: 'Mark emails as read', parameters: z.object({ - threadIds: z.array(z.string()), + threadIds: z.array(z.string()).describe('The IDs of the threads to mark as read'), }), execute: async ({ threadIds }) => { await driver.markAsRead(threadIds); @@ -176,7 +175,7 @@ const markAsUnread = (driver: MailManager) => tool({ description: 'Mark emails as unread', parameters: z.object({ - threadIds: z.array(z.string()), + threadIds: z.array(z.string()).describe('The IDs of the threads to mark as unread'), }), execute: async ({ threadIds }) => { await driver.markAsUnread(threadIds); @@ -188,10 +187,10 @@ const modifyLabels = (driver: MailManager) => tool({ description: 'Modify labels on emails', parameters: z.object({ - threadIds: z.array(z.string()), + threadIds: z.array(z.string()).describe('The IDs of the threads to modify'), options: z.object({ - addLabels: z.array(z.string()).default([]), - removeLabels: z.array(z.string()).default([]), + addLabels: z.array(z.string()).default([]).describe('The labels to add'), + removeLabels: z.array(z.string()).default([]).describe('The labels to remove'), }), }), execute: async ({ threadIds, options }) => { @@ -215,31 +214,31 @@ const sendEmail = (driver: MailManager) => parameters: z.object({ to: z.array( z.object({ - email: z.string(), - name: z.string().optional(), + email: z.string().describe('The email address of the recipient'), + name: z.string().optional().describe('The name of the recipient'), }), ), - subject: z.string(), - message: z.string(), + subject: z.string().describe('The subject of the email'), + message: z.string().describe('The body of the email'), cc: z .array( z.object({ - email: z.string(), - name: z.string().optional(), + email: z.string().describe('The email address of the recipient'), + name: z.string().optional().describe('The name of the recipient'), }), ) .optional(), bcc: z .array( z.object({ - email: z.string(), - name: z.string().optional(), + email: z.string().describe('The email address of the recipient'), + name: z.string().optional().describe('The name of the recipient'), }), ) .optional(), - threadId: z.string().optional(), + threadId: z.string().optional().describe('The ID of the thread to send the email from'), // fromEmail: z.string().optional(), - draftId: z.string().optional(), + draftId: z.string().optional().describe('The ID of the draft to send'), }), execute: async (data) => { try { @@ -340,8 +339,6 @@ export const webSearch = tool({ model: perplexity('sonar'), messages: [ { role: 'system', content: 'Be precise and concise.' }, - { role: 'system', content: 'Do not include sources in your response.' }, - { role: 'system', content: 'Do not use markdown formatting in your response.' }, { role: 'user', content: query }, ], maxTokens: 1024, diff --git a/apps/server/src/routes/ai.ts b/apps/server/src/routes/ai.ts new file mode 100644 index 0000000000..f8561f99eb --- /dev/null +++ b/apps/server/src/routes/ai.ts @@ -0,0 +1,85 @@ +import { CallService } from '../services/call-service/call-service'; +import { ZeroMCP } from '../services/mcp-service/mcp'; +import twilio from 'twilio'; +import { Hono } from 'hono'; + +export const aiRouter = new Hono(); + +aiRouter.get('/', (c) => c.text('Twilio + ElevenLabs + AI Phone System Ready')); + +aiRouter.mount( + '/mcp', + async (request, env, ctx) => { + const phoneNumber = request.headers.get('X-Phone-Number'); + if (!phoneNumber) { + return new Response('Unauthorized', { status: 401 }); + } + + ctx.props = { + phoneNumber, + }; + + return ZeroMCP.serve('/api/ai/mcp', { binding: 'ZERO_MCP' }).fetch(request, env, ctx); + }, + { replaceRequest: false }, +); + +aiRouter.post('/voice', async (c) => { + const formData = await c.req.formData(); + const callSid = formData.get('CallSid') as string; + const from = formData.get('From') as string; + + console.log(`Incoming call from ${from} with callSid ${callSid}`); + + const hostHeader = c.req.header('host'); + const voiceResponse = new twilio.twiml.VoiceResponse(); + voiceResponse.connect().stream({ + url: `wss://${hostHeader}/api/ai/call/${callSid}`, + }); + + c.header('Content-Type', 'application/xml'); + return c.body(voiceResponse.toString()); +}); + +aiRouter.get('/call/:callSid', async (c) => { + const hostname = c.req.header('host'); + if (!hostname) { + return new Response('No hostname specified', { status: 500 }); + } + + const callSid = c.req.param('callSid'); + + console.log(`[Twilio] WebSocket connection requested`); + + // Check for WebSocket upgrade header + const upgradeHeader = c.req.header('Upgrade'); + if (upgradeHeader !== 'websocket') { + return new Response('Expected Upgrade: websocket', { status: 426 }); + } + + console.log(`[Twilio] WebSocket connection requested for call ${callSid}`); + + // Create WebSocket pair + const [client, server] = Object.values(new WebSocketPair()); + + // Accept the server WebSocket + server.accept(); + + const callService = new CallService(callSid); + console.log(`[Twilio] Call service created`); + + c.executionCtx.waitUntil(callService.startCall(server, hostname)); + + // Handle WebSocket events + server.addEventListener('open', () => { + console.log(`[Twilio] WebSocket connection opened`); + }); + + // Return response with status 101 and client WebSocket + console.log(`[Twilio] Returning response with status 101 and client WebSocket`); + + return new Response(null, { + status: 101, + webSocket: client, + }); +}); diff --git a/apps/server/src/services/call-service/call-service.ts b/apps/server/src/services/call-service/call-service.ts new file mode 100644 index 0000000000..e87abc3874 --- /dev/null +++ b/apps/server/src/services/call-service/call-service.ts @@ -0,0 +1,520 @@ +import { + twilioSocketMessageSchema, + type TwilioSocketMessage, +} from './twilio-socket-message-schema'; +import { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/streamableHttp.js'; +import { elevenLabsIncomingSocketMessageSchema } from './eleven-labs-incoming-message-schema'; +import { generateText, type Tool, experimental_createMCPClient as createMCPClient } from 'ai'; +import type { ElevenLabsOutgoingSocketMessage } from './eleven-labs-outgoing-message-schema'; +import { Client } from '@modelcontextprotocol/sdk/client/index.js'; +import type { MailManager } from '../../lib/driver/types'; +import { tools } from '../../routes/agent/tools'; +import { createDriver } from '../../lib/driver'; +import { systemPrompt } from './system-prompt'; +import { ElevenLabsClient } from 'elevenlabs'; +import { env } from 'cloudflare:workers'; +import { openai } from '@ai-sdk/openai'; +import { createDb } from '../../db'; +import z, { ZodError } from 'zod'; +import { Twilio } from 'twilio'; + +// TODO: Remove this once we have a proper phone mapping +const mapping: Record = { + '+18185176315': { + connectionId: '0f2a3874-8106-441c-86d7-ecad65d063f0', + }, +}; + +// TODO: remove this too asap +const phoneMapping = async (phoneNumber: string) => { + console.log('[DEBUG] phoneMapping', phoneNumber); + + const db = createDb(env.HYPERDRIVE.connectionString); + + const obj = mapping[phoneNumber]; + const connection = await db.query.connection.findFirst({ + where: (connection, ops) => { + return ops.eq(connection.id, obj.connectionId); + }, + }); + + if (!connection) { + throw new Error('No connection found.'); + } + + if (!connection.accessToken || !connection.refreshToken) { + throw new Error('Invalid connection'); + } + + const driver = createDriver(connection.providerId, { + auth: { + userId: connection.userId, + accessToken: connection.accessToken, + refreshToken: connection.refreshToken, + email: connection.email, + }, + }); + + return { + driver, + connectionId: connection.id, + }; +}; + +export class CallService { + private phoneNumber: string | null = null; + private streamSid: string | null = null; + private elevenLabsWebSocket: WebSocket | null = null; + private callWebSocket: WebSocket | null = null; + private twilio: Twilio; + private mailDriver: MailManager | null = null; + private tools: Record | null = null; + private conversationHistory: { + role: 'user' | 'assistant'; + content: string; + }[] = []; + // private mcpClient: Client | null = null; + private mcpClient: Awaited> | null = null; + + constructor(private callSid: string) { + this.twilio = new Twilio(env.TWILIO_ACCOUNT_SID, env.TWILIO_AUTH_TOKEN); + } + + public async startCall(callWebSocket: WebSocket, hostname: string) { + this.attachCallWebSocketEventListeners(callWebSocket); + + // Get the caller phone number from Twilio + const twilioCall = await this.twilio.calls(this.callSid).fetch(); + this.phoneNumber = twilioCall.from; + + // Initialize the mail driver and tools + this.callWebSocket = callWebSocket; + await this.initializeMailDriver(this.phoneNumber); + + // this.mcpClient = await this.connectToMCP(hostname); + this.mcpClient = await this.connectToMCP_AISDK(hostname); + + // Attach event listeners to the call WebSocket + await this.connectToElevenLabs(); + + console.log(`[Twilio] WebSocket connected for call ${this.callSid}`); + } + + private async initializeMailDriver(phoneNumber: string) { + const { driver, connectionId } = await phoneMapping(phoneNumber); + + this.mailDriver = driver; + this.tools = tools(driver, connectionId); + } + + public async stopCall() { + this.mcpClient?.close(); + this.elevenLabsWebSocket?.close(); + await this.endTwilioCall(); + } + + private async endTwilioCall() { + if (!this.callSid) { + throw new Error('[Twilio] Call SID not set'); + } + + await this.twilio.calls(this.callSid).update({ + status: 'completed', + }); + this.callWebSocket?.close(); + } + + private attachCallWebSocketEventListeners(callWebSocket: WebSocket) { + callWebSocket.addEventListener('message', async (event) => { + try { + await this.handleTwilioMessage(event.data.toString()); + } catch (error) { + console.error(`[Twilio] Error processing Twilio message for call ${this.callSid}:`, error); + } + }); + + callWebSocket.addEventListener('close', (event) => { + console.log(`[Twilio] WebSocket closed for call ${this.callSid}, code: ${event.code}`); + this.elevenLabsWebSocket?.close(); + }); + + callWebSocket.addEventListener('error', (event) => { + console.error(`[Twilio] WebSocket error for call ${this.callSid}:`, event); + this.elevenLabsWebSocket?.close(); + }); + } + + private async handleTwilioMessage(message: string) { + try { + const data = twilioSocketMessageSchema.parse(JSON.parse(message)); + + switch (data.event) { + case 'connected': + console.log('[DEBUG] handling twilio connected message', data); + console.log(`[Twilio] Connected for call ${this.callSid}`); + break; + case 'start': + console.log('[DEBUG] handling twilio start message', data); + console.log(`[Twilio] Media stream started for call ${this.callSid}`); + this.streamSid = data.streamSid; + console.log('[DEBUG] params', data.start); + break; + case 'media': + // (Twilio -> ElevenLabs) + this.sendToElevenLabs({ + user_audio_chunk: data.media.payload, + }); + break; + case 'stop': + console.log(`[Twilio] Media stream stopped for call ${this.callSid}`); + this.elevenLabsWebSocket?.close(); + await this.endTwilioCall(); + break; + default: + console.warn(`[Twilio] Unhandled event: ${data['event']}`); + break; + } + } catch (error) { + if (error instanceof ZodError) { + console.error( + `[Twilio] [Zod] Error processing Twilio message for call ${this.callSid}:`, + JSON.stringify(error.errors), + ); + console.log(`[Twilio] Errored Message: ${message}`); + } else { + console.error(`[Twilio] Error processing Twilio message for call ${this.callSid}:`, error); + console.log(`[Twilio] Errored Message: ${message}`); + } + } + } + + private async connectToElevenLabs() { + return new Promise(async (resolve, reject) => { + try { + const elevenLabs = new ElevenLabsClient({ + apiKey: env.ELEVENLABS_API_KEY, + }); + + const signedUrlResponse = await elevenLabs.conversationalAi.getSignedUrl({ + agent_id: env.ELEVENLABS_AGENT_ID, + }); + + this.elevenLabsWebSocket = new WebSocket(signedUrlResponse.signed_url); + this.elevenLabsWebSocket.addEventListener('open', () => { + console.log(`[ElevenLabs] WebSocket connected`); + + this.sendToElevenLabs({ + type: 'conversation_initiation_client_data', + }); + + resolve(); + }); + this.elevenLabsWebSocket.addEventListener('message', async (event) => { + await this.handleElevenLabsMessage(event.data.toString()); + }); + this.elevenLabsWebSocket.addEventListener('error', async (event) => { + console.error(`[ElevenLabs] WebSocket error:`, event); + await this.endTwilioCall(); + }); + this.elevenLabsWebSocket.addEventListener('close', async (event) => { + console.log(`[ElevenLabs] WebSocket closed:`, event); + await this.endTwilioCall(); + }); + } catch (error) { + console.error(`[ElevenLabs] Error connecting to ElevenLabs:`, error); + reject(error); + } + }); + } + + private sendToElevenLabs(message: ElevenLabsOutgoingSocketMessage) { + if (!this.elevenLabsWebSocket || this.elevenLabsWebSocket.readyState !== WebSocket.OPEN) { + // console.warn('[ElevenLabs] WebSocket not connected or not open, skipping message'); + + return; + } + + this.elevenLabsWebSocket.send(JSON.stringify(message)); + } + + private async handleElevenLabsMessage(message: string) { + const data = await elevenLabsIncomingSocketMessageSchema.parseAsync(JSON.parse(message)); + + switch (data.type) { + case 'conversation_initiation_metadata': + console.log( + '[ElevenLabs] Conversation initiation metadata received', + data.conversation_initiation_metadata_event, + ); + break; + case 'contextual_update': + console.log(`[ElevenLabs] Contextual update received`); + break; + case 'vad_score': + console.log(`[ElevenLabs] VAD score received`); + break; + case 'internal_tentative_agent_response': + console.log(`[ElevenLabs] Internal tentative agent response received`); + break; + case 'agent_response': + console.log( + '[ElevenLabs] Agent response received:', + `"${data.agent_response_event?.agent_response}"`, + ); + this.conversationHistory.push({ + role: 'assistant', + content: data.agent_response_event?.agent_response ?? '', + }); + break; + case 'ping': + this.sendToElevenLabs({ + type: 'pong', + event_id: data.ping_event?.event_id ?? 0, + }); + break; + case 'audio': + // (ElevenLabs -> Twilio) + if (data.audio_event?.audio_base_64) { + await this.sendAudioToTwilio(data.audio_event.audio_base_64); + } + break; + case 'client_tool_call': + console.log(`[ElevenLabs] Client tool call received`); + if ( + data.client_tool_call && + data.client_tool_call.tool_name && + data.client_tool_call.tool_call_id + ) { + const toolName = data.client_tool_call.tool_name; + const toolCallId = data.client_tool_call.tool_call_id; + const parameters = data.client_tool_call.parameters; + await this.handleToolCall(toolName, toolCallId, parameters); + } else { + console.warn('No tool call data'); + } + break; + case 'agent_response_correction': + console.log(`[ElevenLabs] Agent response correction received`); + break; + case 'interruption': + console.log(`[ElevenLabs] Interruption received`); + break; + case 'user_transcript': + console.log( + `[ElevenLabs] User transcript received:`, + `"${data.user_transcription_event?.user_transcript}"`, + ); + this.conversationHistory.push({ + role: 'user', + content: data.user_transcription_event?.user_transcript ?? '', + }); + + if (!this.streamSid) { + console.warn('[Twilio] Stream SID not set, skipping clear message'); + + return; + } + + this.sendToTwilio({ + event: 'clear', + streamSid: this.streamSid, + }); + break; + } + } + + private async handleToolCall( + toolName: string, + toolCallId: string, + parameters: Record | null, + ) { + console.log('[DEBUG - TOOL CALL] handleToolCall', toolName, toolCallId, parameters); + + switch (toolName) { + case 'manage_email': + try { + const parsedParameters = z + .object({ + query: z.string(), + }) + .parse(parameters); + const aiResponse = await this.generateAIResponse(parsedParameters.query); + + this.sendToElevenLabs({ + type: 'client_tool_result', + tool_call_id: toolCallId, + result: aiResponse, + is_error: false, + }); + } catch (error) { + console.error('[DEBUG - TOOL CALL] error', error); + + if (error instanceof ZodError) { + console.error('[DEBUG - TOOL CALL] zod error', error.errors); + + this.sendToElevenLabs({ + type: 'client_tool_result', + tool_call_id: toolCallId, + result: error.issues.map((issue) => issue.message).join(', '), + is_error: true, + }); + + return; + } + + this.sendToElevenLabs({ + type: 'client_tool_result', + tool_call_id: toolCallId, + result: 'I had trouble processing your request. Please try again.', + is_error: true, + }); + } + break; + default: + console.warn('[ElevenLabs] Unhandled tool call:', toolName); + break; + } + + // get driver and connection id + // switch (toolName) { + // case 'get_thread': + // break; + // case 'list_threads': + // break; + // default: + // console.warn('[ElevenLabs] Unhandled tool call:', toolName); + // } + } + + private async sendAudioToTwilio(audio: string) { + if ( + !this.callWebSocket || + this.callWebSocket.readyState !== WebSocket.OPEN || + !this.streamSid + ) { + console.error('[Twilio] WebSocket sendAudioToTwilio error'); + + throw new Error('[Twilio] WebSocket not connected or not open'); + } + + this.sendToTwilio({ + event: 'media', + streamSid: this.streamSid, + media: { + payload: audio, + }, + }); + } + + private sendToTwilio(message: TwilioSocketMessage) { + console.log('[DEBUG] sending message to twilio'); + + if (!this.callWebSocket || this.callWebSocket.readyState !== WebSocket.OPEN) { + throw new Error('[Twilio] WebSocket not connected or not open'); + } + + this.callWebSocket.send(JSON.stringify(message)); + console.log('[DEBUG] sent message to twilio'); + } + + private async generateAIResponse(query: string) { + if (!this.mcpClient) { + throw new Error('[Twilio] MCP client not connected'); + } + + try { + console.log('[DEBUG] query', query); + const mcpTools = await this.mcpClient.tools(); + + const { text } = await generateText({ + model: openai('gpt-4o-mini'), + system: systemPrompt, + prompt: query, + tools: mcpTools, + maxSteps: 10, + }); + + console.log('[DEBUG] llm response', text); + + return text; + } catch (error) { + console.error('AI processing error', error); + + return "I'm sorry, I had trouble processing your request. Please try again."; + } + } + + // private async getMcpToolSet(): Promise> { + // if (!this.mcpClient) { + // throw new Error('[Twilio] MCP client not connected'); + // } + + // // Retrieve tool metadata from MCP + // const { tools: mcpTools } = (await this.mcpClient.listTools()) as unknown as { + // tools: Array<{ + // name: string; + // description?: string; + // // The schema is JSON-schema – we treat it as unknown for now + // inputSchema?: unknown; + // }>; + // }; + + // // Convert to ai-sdk ToolSet + // return mcpTools.reduce>((acc, { name, description }) => { + // acc[name] = { + // description, + // // We can't infer the exact shape here – allow any parameters + // parameters: z.any(), + // execute: async (args) => { + // if (!this.mcpClient) throw new Error('[Twilio] MCP client not connected'); + // return this.mcpClient.callTool({ name, args }); + // }, + // } as Tool; + + // return acc; + // }, {}); + // } + + private async connectToMCP(hostname: string) { + const client = new Client({ + name: 'zero-agent', + version: '1.0.0', + }); + + if (!this.phoneNumber) { + throw new Error('[Twilio] Phone number not set'); + } + + const mcpUrl = new URL('/api/ai/mcp', `https://${hostname}`); + const transport = new StreamableHTTPClientTransport(mcpUrl, { + requestInit: { + headers: { + 'X-Phone-Number': this.phoneNumber, + }, + }, + }); + + await client.connect(transport); + + return client; + } + + private async connectToMCP_AISDK(hostname: string) { + if (!this.phoneNumber) { + throw new Error('[Twilio] Phone number not set'); + } + + const mcpUrl = new URL('/api/ai/mcp', `https://${hostname}`); + const transport = new StreamableHTTPClientTransport(mcpUrl, { + requestInit: { + headers: { + 'X-Phone-Number': this.phoneNumber, + }, + }, + }); + + return createMCPClient({ + transport, + }); + } +} diff --git a/apps/server/src/services/call-service/eleven-labs-incoming-message-schema.ts b/apps/server/src/services/call-service/eleven-labs-incoming-message-schema.ts new file mode 100644 index 0000000000..e72fda0aa8 --- /dev/null +++ b/apps/server/src/services/call-service/eleven-labs-incoming-message-schema.ts @@ -0,0 +1,140 @@ +import { z } from 'zod'; + +export const elevenLabsIncomingSocketMessageSchema = z.discriminatedUnion('type', [ + z.object({ + type: z.literal('conversation_initiation_metadata'), + conversation_initiation_metadata_event: z + .object({ + conversation_id: z + .string() + .nullable() + .describe('Unique identifier for the conversation session.'), + agent_output_audio_format: z + .string() + .nullable() + .describe("Audio format specification for agent's speech output."), + user_input_audio_format: z + .string() + .nullable() + .describe("Audio format specification for user's speech input."), + }) + .nullable() + .describe('Initial conversation metadata'), + }), + z.object({ + type: z.literal('user_transcript'), + user_transcription_event: z + .object({ + user_transcript: z + .string() + .nullable() + .describe("Transcribed text from user's speech input."), + }) + .nullable() + .describe('Transcription event data'), + }), + z.object({ + type: z.literal('agent_response'), + agent_response_event: z + .object({ + agent_response: z.string().describe("Text content of the agent's response."), + }) + .nullable() + .describe('Agent response event data'), + }), + z.object({ + type: z.literal('agent_response_correction'), + correction_event: z + .object({ + corrected_response: z + .string() + .describe('The corrected text content replacing the previous response'), + }) + .nullable() + .describe('Correction event data'), + }), + z.object({ + type: z.literal('audio'), + audio_event: z + .object({ + audio_base_64: z + .string() + .nullable() + .describe("Base64-encoded audio data of agent's speech."), + event_id: z + .number() + .int() + .nullable() + .describe('Sequential identifier for the audio chunk.'), + }) + .nullable() + .describe('Audio event data'), + }), + z.object({ + type: z.literal('interruption'), + interruption_event: z + .object({ + event_id: z.number().int().nullable().describe('ID of the event that was interrupted.'), + }) + .nullable() + .describe('Interruption event data'), + }), + z.object({ + type: z.literal('ping'), + ping_event: z + .object({ + event_id: z.number().int().nullable().describe('Unique identifier for the ping event.'), + ping_ms: z + .number() + .int() + .nullable() + .describe('Measured round-trip latency in milliseconds.'), + }) + .nullable() + .describe('Ping event data'), + }), + z.object({ + type: z.literal('client_tool_call'), + client_tool_call: z + .object({ + tool_name: z.string().nullable().describe('Identifier of the tool to be executed.'), + tool_call_id: z + .string() + .nullable() + .describe('Unique identifier for this tool call request.'), + parameters: z + .record(z.string(), z.union([z.string(), z.number(), z.boolean()])) + .nullable() + .describe('Tool-specific parameters for the execution request.'), + }) + .nullable() + .describe(''), + }), + z.object({ + type: z.literal('contextual_update'), + text: z.string().describe('Contextual information to be added to the conversation state.'), + }), + z.object({ + type: z.literal('vad_score'), + vad_score_event: z + .object({ + vad_score: z + .number() + .min(0) + .max(1) + .describe('Voice activity detection confidence score between 0 and 1'), + }) + .nullable() + .describe('VAD event data'), + }), + z.object({ + type: z.literal('internal_tentative_agent_response'), + tentative_agent_response_internal_event: z + .object({ + tentative_agent_response: z.string().describe('Preliminary text from the agent'), + }) + .nullable(), + }), +]); + +export type ElevenLabsIncomingSocketMessage = z.infer; diff --git a/apps/server/src/services/call-service/eleven-labs-outgoing-message-schema.ts b/apps/server/src/services/call-service/eleven-labs-outgoing-message-schema.ts new file mode 100644 index 0000000000..2218ad9f5d --- /dev/null +++ b/apps/server/src/services/call-service/eleven-labs-outgoing-message-schema.ts @@ -0,0 +1,62 @@ +import { z } from 'zod'; + +export const elevenLabsOutgoingSocketMessageSchema = z.union([ + z.object({ + user_audio_chunk: z.string(), + }), + z.object({ + type: z.literal('pong'), + event_id: z.number().int(), + }), + z.object({ + type: z.literal('conversation_initiation_client_data'), + conversation_config_override: z + .object({ + agent: z + .object({ + prompt: z + .object({ + prompt: z.string().nullable().default(null), + }) + .optional(), + first_message: z.string().optional(), + language: z.string().optional(), + }) + .optional(), + tts: z + .object({ + voice_id: z.string(), + }) + .optional(), + }) + .optional(), + custom_llm_extra_body: z + .object({ + temperature: z.number().int().optional(), + max_tokens: z.number().int().optional(), + }) + .optional(), + dynamic_variables: z + .record(z.string(), z.union([z.string(), z.number(), z.boolean()])) + .optional(), + }), + z.object({ + type: z.literal('client_tool_result'), + tool_call_id: z.string().optional(), + result: z.string().optional(), + is_error: z.boolean().optional(), + }), + z.object({ + type: z.literal('contextual_update'), + text: z.string(), + }), + z.object({ + type: z.literal('user_message'), + text: z.string().optional(), + }), + z.object({ + type: z.literal('user_activity'), + }), +]); + +export type ElevenLabsOutgoingSocketMessage = z.infer; diff --git a/apps/server/src/services/call-service/system-prompt.ts b/apps/server/src/services/call-service/system-prompt.ts new file mode 100644 index 0000000000..8e6fd47121 --- /dev/null +++ b/apps/server/src/services/call-service/system-prompt.ts @@ -0,0 +1,261 @@ +export const systemPrompt = `You are an AI email assistant whose sole purpose is to help users manage and interact with their email efficiently using the tools provided by the ZeroMCP server. Follow these guidelines: + +1. Core Role and Tone +- You are friendly, concise, and professional. +- Always write in clear, natural language. +- Avoid using hyphens for compound phrases or pauses. +- When interacting with the user, confirm your understanding and ask clarifying questions before taking any actions that alter or delete email data. Always confirm before any action besides reading data. +- Keep responses informational yet concise unless the user asks you to read an entire email. Then be more detailed and parse only the important text portions so the email makes full sense. + +2. When to Call Tools +- If the user asks you to read or summarize existing messages, you may call tools that read data without confirmation. +- When the user asks to list their emails or “latest threads,” always set \`maxResults\` to no more than the number requested, up to a maximum of 10. If they ask for “last 5,” use \`maxResults: 5\`. +- After calling \`listThreads\`, immediately call \`getThread\` for each returned thread ID (up to the same limit) to fetch full content. Use that full content to provide context or summaries. +- If the user refers to a particular thread without providing its ID, ask the user to describe which thread they mean (for example: “Which thread are you referring to? You can describe the subject or sender”). Then: + 1. Call \`buildGmailSearchQuery\` with that description (e.g. \`"project update from last week"\`) to get a Gmail search string. + 2. Call \`listThreads\` with \`folder: "INBOX"\` (or the folder they specify) and \`query\` set to the string returned by \`buildGmailSearchQuery\`. + 3. Present the candidate thread subjects and senders to the user and ask, “Is this the thread you mean?” + 4. Once the user confirms, call \`getThread\` for that thread’s ID and proceed with the requested action. +- If the user asks you to modify labels (mark as read, mark as unread, archive, trash, create, or delete labels), ask which emails or how they identify those threads. Then follow these steps: + 1. If they gave a description, generate a search query via \`buildGmailSearchQuery\` and call \`listThreads\` to locate them. Otherwise, use \`listThreads\` with an explicit \`query\` or \`labelIds\`. + 2. Call \`getThread\` on each returned thread ID to display subjects and senders, and ask, “Do you want to proceed with these?” + 3. After confirmation, call the appropriate tool among \`markThreadsRead\`, \`markThreadsUnread\`, \`modifyLabels\`, \`bulkDelete\`, or \`bulkArchive\`. +- If the user wants to see their custom labels, call \`getUserLabels\`. If they want details on a specific label by ID, call \`getLabel\`. +- If the user wants the current date context, call \`getCurrentDate\`. +- If the user asks to create a new label (for example: “Create a label called Important with color X and Y”), ask for name and optional colors, confirm, then call \`createLabel\`. +- Do not attempt to process or store email content yourself—always rely on the server tools for reading, searching, or modifying data. + +3. Tool Invocation Format +When you decide to invoke a tool, output exactly a JSON object (and nothing else) with these two keys, properly escaped: +\`\`\`json +{ + "tool": "", + "parameters": { /* matching the tool’s expected schema */ } +} +\`\`\` +- \`\` must match one of these names (case sensitive): + - buildGmailSearchQuery + - listThreads + - getThread + - markThreadsRead + - markThreadsUnread + - modifyLabels + - getCurrentDate + - getUserLabels + - getLabel + - createLabel + - bulkDelete + - bulkArchive +- The “parameters” object must include exactly the fields that tool requires—no extra fields. Use the correct types (string, array, number) as defined below. +- After you output the JSON, the system will execute the tool and return the result. +- When the tool returns its output, interpret it and use that information to answer the user’s query. Do not return raw JSON responses to the user. + +4. Available Tools and Their Descriptions +- \`buildGmailSearchQuery\` + - Purpose: Convert a natural language description into a Gmail search string. + - Parameters: + - \`query\` (string): The user’s description of what to search for (for example, “project update from Alice last week”). + - Returns: A Gmail formatted search expression (for example, \`from:alice@example.com subject:project update newer_than:7d\`). + +- \`listThreads\` + - Purpose: List email threads in a given folder with optional filtering. + - Parameters: + - \`folder\` (string): Folder name to list (for example, \`"INBOX"\`, \`"SENT"\`). + - \`query\` (string, optional): The Gmail search string (for example, \`"from:alice@example.com"\`). + - \`maxResults\` (number, optional): Maximum number of threads to return (no more than 10). + - \`labelIds\` (array of strings, optional): Restrict to specific Gmail label IDs. + - \`pageToken\` (string, optional): Token for pagination. + - Returns: Up to \`maxResults\` thread objects (each has an \`id\` and \`latest\` metadata). + +- \`getThread\` + - Purpose: Retrieve a specific email thread by its ID. + - Parameters: + - \`threadId\` (string): The ID of the thread to fetch. + - Returns: Thread details including subject, messages, and metadata. + +- \`markThreadsRead\` + - Purpose: Mark one or more threads as read. + - Parameters: + - \`threadIds\` (array of strings): List of thread IDs to mark as read. + - Returns: Confirmation text “Threads marked as read.” + +- \`markThreadsUnread\` + - Purpose: Mark one or more threads as unread. + - Parameters: + - \`threadIds\` (array of strings): List of thread IDs to mark as unread. + - Returns: Confirmation text “Threads marked as unread.” + +- \`modifyLabels\` + - Purpose: Add or remove labels on threads. + - Parameters: + - \`threadIds\` (array of strings): List of thread IDs to modify. + - \`addLabelIds\` (array of strings): Labels to add. + - \`removeLabelIds\` (array of strings): Labels to remove. + - Returns: Confirmation text “Successfully modified X thread(s).” + +- \`getCurrentDate\` + - Purpose: Retrieve the current date context (for example, “June 3, 2025”). + - Parameters: None. + - Returns: A text string with the current date context. + +- \`getUserLabels\` + - Purpose: Retrieve all labels defined by the user. + - Parameters: None. + - Returns: A newline separated list of label name, ID, and color. + +- \`getLabel\` + - Purpose: Retrieve details about a specific label. + - Parameters: + - \`id\` (string): The label ID to fetch. + - Returns: Two text entries: “Name: