diff --git a/apps/mail/components/mail/mail.tsx b/apps/mail/components/mail/mail.tsx index 8f5673b9d8..3b867dc2e0 100644 --- a/apps/mail/components/mail/mail.tsx +++ b/apps/mail/components/mail/mail.tsx @@ -999,7 +999,7 @@ function CategoryDropdown({ isMultiSelectMode }: CategoryDropdownProps) {
- Labels
+ Categories
diff --git a/apps/mail/components/party.tsx b/apps/mail/components/party.tsx
index bfa0866390..b60aab5757 100644
--- a/apps/mail/components/party.tsx
+++ b/apps/mail/components/party.tsx
@@ -54,13 +54,8 @@ export const NotificationProvider = () => {
         const { threadId } = JSON.parse(message.data);
         queryClient.invalidateQueries({
           queryKey: trpc.mail.get.queryKey({ id: threadId }),
-          refetchType: 'active',
-          exact: true,
-          predicate: (query) => {
-            const queryAge = Date.now() - (query.state.dataUpdatedAt || 0);
-            return queryAge > 60000; // 1 minute in milliseconds
-          },
         });
+        console.log('invalidated mail get', threadId);
       } else if (type === IncomingMessageType.Mail_List) {
         const { folder } = JSON.parse(message.data);
         queryClient.invalidateQueries({
diff --git a/apps/mail/providers/query-provider.tsx b/apps/mail/providers/query-provider.tsx
index 3d661e7da5..7146811d87 100644
--- a/apps/mail/providers/query-provider.tsx
+++ b/apps/mail/providers/query-provider.tsx
@@ -120,7 +120,20 @@ export function QueryProvider({
       persistOptions={{
         persister,
         buster: CACHE_BURST_KEY,
-        maxAge: 1000 * 60 * 1, // 1 minute, we're storing in DOs
+        maxAge: 1000 * 60 * 1, // 1 minute, we're storing in DOs,
+        dehydrateOptions: {
+          // Persist only thread-list queries.
+          // - The first key element is inspected with Array.isArray instead of a cast, because
+          //   a key that does not start with a path array would otherwise throw in `.some`.
+          // - Supplying this callback replaces the library default, so the "successful queries
+          //   only" rule is re-applied here to avoid persisting pending or failed lists.
+          shouldDehydrateQuery(query) {
+            const [path] = query.queryKey;
+            return (
+              query.state.status === 'success' && Array.isArray(path) && path.includes('listThreads')
+            );
+          },
+        },
       }}
       onSuccess={() => {
         const threadQueryKey = [['mail', 'listThreads'], { type: 'infinite' }];
diff --git a/apps/server/package.json b/apps/server/package.json
index ee6834eda4..00c8be132d 100644
--- a/apps/server/package.json
+++ b/apps/server/package.json
@@ -48,6 +48,7 @@
     "date-fns": "^4.1.0",
     "dedent": "^1.6.0",
     "drizzle-orm": "catalog:",
+    "effect": "3.16.12",
     "elevenlabs": "1.59.0",
     "email-addresses": "^5.0.0",
     "google-auth-library": "9.15.1",
diff --git a/apps/server/src/lib/brain.ts b/apps/server/src/lib/brain.ts
index 678ea18e6a..0dc11883a9 100644
--- a/apps/server/src/lib/brain.ts
+++ b/apps/server/src/lib/brain.ts
@@
-1,7 +1,8 @@
 import { ReSummarizeThread, SummarizeMessage, SummarizeThread } from './brain.fallback.prompts';
-import { AiChatPrompt, StyledEmailAssistantSystemPrompt } from './prompts';
 import { getSubscriptionFactory } from './factories/subscription-factory.registry';
+import { AiChatPrompt, StyledEmailAssistantSystemPrompt } from './prompts';
 import { EPrompts, EProviders } from '../types';
+import { getPromptName } from '../pipelines';
 import { env } from 'cloudflare:workers';
 
 export const enableBrainFunction = async (connection: { id: string; providerId: EProviders }) => {
@@ -24,10 +25,6 @@ export const disableBrainFunction = async (connection: { id: string; providerId:
   }
 };
 
-const getPromptName = (connectionId: string, prompt: EPrompts) => {
-  return `${connectionId}-${prompt}`;
-};
-
 export const getPrompt = async (promptName: string, fallback: string) => {
   const existingPrompt = await env.prompts_storage.get(promptName);
   if (!existingPrompt || existingPrompt === 'undefined') {
diff --git a/apps/server/src/lib/factories/google-subscription.factory.ts b/apps/server/src/lib/factories/google-subscription.factory.ts
index 967bf96814..ca989d2dae 100644
--- a/apps/server/src/lib/factories/google-subscription.factory.ts
+++ b/apps/server/src/lib/factories/google-subscription.factory.ts
@@ -31,13 +31,16 @@ class GoogleSubscriptionFactory extends BaseSubscriptionFactory {
   private getServiceAccount(): GoogleServiceAccount {
     if (!this.serviceAccount) {
       const serviceAccountJson = env.GOOGLE_S_ACCOUNT;
-      if (!serviceAccountJson) {
+      if (!serviceAccountJson || serviceAccountJson === '{}') {
         throw new Error('GOOGLE_S_ACCOUNT environment variable is required');
       }
 
       try {
         this.serviceAccount = JSON.parse(serviceAccountJson);
       } catch (error) {
+        // Never log the raw value: it is the service-account credential JSON (which normally
+        // carries a private key). The parse error is not logged either, since its message may
+        // quote part of the input. The length is enough to spot an empty or truncated secret.
+        console.error('Invalid GOOGLE_S_ACCOUNT JSON format, length:', serviceAccountJson.length);
         throw new Error('Invalid GOOGLE_S_ACCOUNT JSON format');
       }
       return this.serviceAccount as GoogleServiceAccount;
diff --git a/apps/server/src/main.ts
b/apps/server/src/main.ts index 9e6110b186..64ba0cc48f 100644 --- a/apps/server/src/main.ts +++ b/apps/server/src/main.ts @@ -15,11 +15,11 @@ import { writingStyleMatrix, } from './db/schema'; import { env, WorkerEntrypoint, DurableObject, RpcTarget } from 'cloudflare:workers'; +import { EProviders, type ISubscribeBatch, type IThreadBatch } from './types'; import { getZeroAgent, getZeroDB, verifyToken } from './lib/server-utils'; -import { MainWorkflow, ThreadWorkflow, ZeroWorkflow } from './pipelines'; import { oAuthDiscoveryMetadata } from 'better-auth/plugins'; -import { EProviders, type ISubscribeBatch } from './types'; import { eq, and, desc, asc, inArray } from 'drizzle-orm'; +import { EWorkflowType, runWorkflow } from './pipelines'; import { contextStorage } from 'hono/context-storage'; import { defaultUserSettings } from './lib/schemas'; import { createLocalJWKSet, jwtVerify } from 'jose'; @@ -40,6 +40,7 @@ import { aiRouter } from './routes/ai'; import { Autumn } from 'autumn-js'; import { appRouter } from './trpc'; import { cors } from 'hono/cors'; +import { Effect } from 'effect'; import { Hono } from 'hono'; export class DbRpcDO extends RpcTarget { @@ -567,12 +568,20 @@ export default class extends WorkerEntrypoint { .use( '*', cors({ - origin: (c) => { - if (c.includes(env.COOKIE_DOMAIN)) { - return c; - } else { + origin: (origin) => { + if (!origin) return null; + let hostname: string; + try { + hostname = new URL(origin).hostname; + } catch { return null; } + const cookieDomain = env.COOKIE_DOMAIN; + if (!cookieDomain) return null; + if (hostname === cookieDomain || hostname.endsWith('.' 
+ cookieDomain)) { + return origin; + } + return null; }, credentials: true, allowHeaders: ['Content-Type', 'Authorization'], @@ -635,27 +644,27 @@ export default class extends WorkerEntrypoint { .get('/', (c) => c.redirect(`${env.VITE_PUBLIC_APP_URL}`)) .post('/a8n/notify/:providerId', async (c) => { if (!c.req.header('Authorization')) return c.json({ error: 'Unauthorized' }, { status: 401 }); - return c.json({ message: 'OK' }, { status: 200 }); const providerId = c.req.param('providerId'); if (providerId === EProviders.google) { const body = await c.req.json<{ historyId: string }>(); const subHeader = c.req.header('x-goog-pubsub-subscription-name'); + if (!subHeader) { + console.log('[GOOGLE] no subscription header', body); + return c.json({}, { status: 200 }); + } const isValid = await verifyToken(c.req.header('Authorization')!.split(' ')[1]); if (!isValid) { console.log('[GOOGLE] invalid request', body); return c.json({}, { status: 200 }); } try { - const instance = await env.MAIN_WORKFLOW.create({ - params: { - providerId, - historyId: body.historyId, - subscriptionName: subHeader, - }, + await env.thread_queue.send({ + providerId, + historyId: body.historyId, + subscriptionName: subHeader!, }); - console.log('[GOOGLE] created instance', instance.id, instance.status); } catch (error) { - console.error('Error creating instance', error, { + console.error('Error sending to thread queue', error, { providerId, historyId: body.historyId, subscriptionName: subHeader, @@ -675,13 +684,13 @@ export default class extends WorkerEntrypoint { return this.app.fetch(request, this.env, this.ctx); } - async queue(batch: MessageBatch) { + async queue(batch: MessageBatch) { switch (true) { case batch.queue.startsWith('subscribe-queue'): { console.log('batch', batch); try { await Promise.all( - batch.messages.map(async (msg) => { + batch.messages.map(async (msg: Message) => { const connectionId = msg.body.connectionId; const providerId = msg.body.providerId; 
console.log('connectionId', connectionId); @@ -696,12 +705,39 @@ export default class extends WorkerEntrypoint { } }), ); - console.log('batch done'); + console.log('[SUBSCRIBE_QUEUE] batch done'); } finally { batch.ackAll(); } return; } + case batch.queue.startsWith('thread-queue'): { + console.log('batch', batch); + try { + await Promise.all( + batch.messages.map(async (msg: Message) => { + const providerId = msg.body.providerId; + const historyId = msg.body.historyId; + const subscriptionName = msg.body.subscriptionName; + const workflow = runWorkflow(EWorkflowType.MAIN, { + providerId, + historyId, + subscriptionName, + }); + + try { + const result = await Effect.runPromise(workflow); + console.log('[THREAD_QUEUE] result', result); + } catch (error) { + console.error('Error running workflow', error); + } + }), + ); + } finally { + batch.ackAll(); + } + break; + } } } @@ -749,4 +785,4 @@ export default class extends WorkerEntrypoint { } } -export { DurableMailbox, ZeroAgent, ZeroMCP, MainWorkflow, ZeroWorkflow, ThreadWorkflow, ZeroDB }; +export { DurableMailbox, ZeroAgent, ZeroMCP, ZeroDB }; diff --git a/apps/server/src/pipelines.effect.ts b/apps/server/src/pipelines.effect.ts new file mode 100644 index 0000000000..6da0367462 --- /dev/null +++ b/apps/server/src/pipelines.effect.ts @@ -0,0 +1,1100 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +import { + ReSummarizeThread, + SummarizeMessage, + SummarizeThread, + ThreadLabels, +} from './lib/brain.fallback.prompts'; +import { defaultLabels, EPrompts, EProviders, type ParsedMessage, type Sender } from './types'; +import { connectionToDriver, getZeroAgent, notifyUser } from './lib/server-utils'; +import { Effect, Console, pipe, Match, Option } from 'effect'; +import { type gmail_v1 } from '@googleapis/gmail'; +import { getPromptName } from './pipelines'; +import { env } from 'cloudflare:workers'; +import { connection } from './db/schema'; +import * as cheerio from 'cheerio'; +import { eq } from 'drizzle-orm'; +import { createDb } from './db'; +import { z } from 'zod'; + +const showLogs = true; + +const log = (message: string, ...args: any[]) => { + if (showLogs) { + console.log(message, ...args); + return message; + } + return 'no message'; +}; + +const isValidUUID = (str: string): boolean => { + const regex = /^[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i; + return regex.test(str); +}; + +// Define the workflow parameters type +type MainWorkflowParams = { + providerId: string; + historyId: string; + subscriptionName: string; +}; + +// Define error types +type MainWorkflowError = + | { _tag: 'MissingEnvironmentVariable'; variable: string } + | { _tag: 'InvalidSubscriptionName'; subscriptionName: string } + | { _tag: 'InvalidConnectionId'; connectionId: string } + | { _tag: 'UnsupportedProvider'; providerId: string } + | { _tag: 'WorkflowCreationFailed'; error: unknown }; + +const validateArguments = ( + params: MainWorkflowParams, + serviceAccount: any, +): Effect.Effect => + Effect.gen(function* () { + yield* Console.log('[MAIN_WORKFLOW] Validating arguments'); + const regex = new RegExp( + `projects/${serviceAccount.project_id}/subscriptions/notifications__([a-z0-9-]+)`, + ); + const match = params.subscriptionName.toString().match(regex); + if (!match) { + yield* Console.log('[MAIN_WORKFLOW] Invalid subscription 
name:', params.subscriptionName);
+      return yield* Effect.fail({
+        _tag: 'InvalidSubscriptionName' as const,
+        subscriptionName: params.subscriptionName,
+      });
+    }
+    const [, connectionId] = match;
+    yield* Console.log('[MAIN_WORKFLOW] Extracted connectionId:', connectionId);
+    return connectionId;
+  });
+
+const override = false;
+
+/**
+ * Entry point for a provider push notification.
+ *
+ * Parses the service-account secret, derives the connection id from the subscription name,
+ * checks that it is a UUID, looks up the previously stored history id and, for the Google
+ * provider, hands off to `runZeroWorkflow`.
+ *
+ * @param params - Provider id, the history id carried by the notification, and the
+ *   subscription name the notification arrived on.
+ * @returns An effect that succeeds with a status string or fails with a `MainWorkflowError`.
+ */
+export const runMainWorkflow = (
+  params: MainWorkflowParams,
+): Effect.Effect<string, MainWorkflowError> =>
+  Effect.gen(function* () {
+    yield* Console.log('[MAIN_WORKFLOW] Starting workflow with payload:', params);
+
+    const { providerId, historyId, subscriptionName } = params;
+
+    let serviceAccount = null;
+    if (override) {
+      serviceAccount = override;
+    } else {
+      if (!env.GOOGLE_S_ACCOUNT || env.GOOGLE_S_ACCOUNT === '{}') {
+        return yield* Effect.fail({
+          _tag: 'MissingEnvironmentVariable' as const,
+          variable: 'GOOGLE_S_ACCOUNT',
+        });
+      }
+
+      // A malformed secret must surface as a typed failure: a bare JSON.parse throw inside
+      // Effect.gen becomes an untyped defect that bypasses the workflow's error channel.
+      // The parse error itself is dropped because its message may quote part of the secret.
+      serviceAccount = yield* Effect.try({
+        try: () => JSON.parse(env.GOOGLE_S_ACCOUNT),
+        catch: (): MainWorkflowError => ({
+          _tag: 'WorkflowCreationFailed' as const,
+          error: 'GOOGLE_S_ACCOUNT is not valid JSON',
+        }),
+      });
+    }
+
+    const connectionId = yield* validateArguments(params, serviceAccount);
+
+    if (!isValidUUID(connectionId)) {
+      yield* Console.log('[MAIN_WORKFLOW] Invalid connection id format:', connectionId);
+      return yield* Effect.fail({
+        _tag: 'InvalidConnectionId' as const,
+        connectionId,
+      });
+    }
+
+    // A failed lookup is treated the same as "no previous history id".
+    const previousHistoryId = yield* Effect.tryPromise({
+      try: () => env.gmail_history_id.get(connectionId),
+      catch: () => ({ _tag: 'WorkflowCreationFailed' as const, error: 'Failed to get history ID' }),
+    }).pipe(Effect.orElse(() => Effect.succeed(null)));
+
+    if (providerId === EProviders.google) {
+      yield* Console.log('[MAIN_WORKFLOW] Processing Google provider workflow');
+      yield* Console.log('[MAIN_WORKFLOW] Previous history ID:', previousHistoryId);
+
+      // Resume from the last stored history id when there is one; otherwise start from the
+      // id carried by this notification.
+      const zeroWorkflowParams = {
+        connectionId,
+        historyId: previousHistoryId || historyId,
+        nextHistoryId: historyId,
+      };
+
+      const result = yield* runZeroWorkflow(zeroWorkflowParams).pipe(
+        Effect.mapError(
+          (error): MainWorkflowError => ({ _tag: 'WorkflowCreationFailed' as const, error }),
+        ),
+      );
+
+
yield* Console.log('[MAIN_WORKFLOW] Zero workflow result:', result); + } else { + yield* Console.log('[MAIN_WORKFLOW] Unsupported provider:', providerId); + return yield* Effect.fail({ + _tag: 'UnsupportedProvider' as const, + providerId, + }); + } + + yield* Console.log('[MAIN_WORKFLOW] Workflow completed successfully'); + return 'Workflow completed successfully'; + }).pipe(Effect.tapError((error) => Console.log('[MAIN_WORKFLOW] Error in workflow:', error))); + +// Define the ZeroWorkflow parameters type +type ZeroWorkflowParams = { + connectionId: string; + historyId: string; + nextHistoryId: string; +}; + +// Define error types for ZeroWorkflow +type ZeroWorkflowError = + | { _tag: 'HistoryAlreadyProcessing'; connectionId: string; historyId: string } + | { _tag: 'ConnectionNotFound'; connectionId: string } + | { _tag: 'ConnectionNotAuthorized'; connectionId: string } + | { _tag: 'HistoryNotFound'; historyId: string; connectionId: string } + | { _tag: 'UnsupportedProvider'; providerId: string } + | { _tag: 'DatabaseError'; error: unknown } + | { _tag: 'GmailApiError'; error: unknown } + | { _tag: 'WorkflowCreationFailed'; error: unknown }; + +export const runZeroWorkflow = ( + params: ZeroWorkflowParams, +): Effect.Effect => + Effect.gen(function* () { + yield* Console.log('[ZERO_WORKFLOW] Starting workflow with payload:', params); + const { connectionId, historyId, nextHistoryId } = params; + + const historyProcessingKey = `history_${connectionId}__${historyId}`; + const isProcessing = yield* Effect.tryPromise({ + try: () => env.gmail_processing_threads.get(historyProcessingKey), + catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }), + }); + + if (isProcessing === 'true') { + yield* Console.log('[ZERO_WORKFLOW] History already being processed:', { + connectionId, + historyId, + processingStatus: isProcessing, + }); + return yield* Effect.fail({ + _tag: 'HistoryAlreadyProcessing' as const, + connectionId, + historyId, + }); + } + + yield* 
Effect.tryPromise({
+      try: () =>
+        env.gmail_processing_threads.put(historyProcessingKey, 'true', { expirationTtl: 3600 }),
+      catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }),
+    });
+    yield* Console.log('[ZERO_WORKFLOW] Set processing flag for history:', historyProcessingKey);
+
+    const { db, conn } = createDb(env.HYPERDRIVE.connectionString);
+
+    const foundConnection = yield* Effect.tryPromise({
+      try: async () => {
+        console.log('[ZERO_WORKFLOW] Finding connection:', connectionId);
+        try {
+          const [foundConnection] = await db
+            .select()
+            .from(connection)
+            .where(eq(connection.id, connectionId.toString()));
+          if (!foundConnection) {
+            throw new Error(`Connection not found ${connectionId}`);
+          }
+          if (!foundConnection.accessToken || !foundConnection.refreshToken) {
+            throw new Error(`Connection is not authorized ${connectionId}`);
+          }
+          console.log('[ZERO_WORKFLOW] Found connection:', foundConnection.id);
+          return foundConnection;
+        } finally {
+          // Release the database connection on every path, including a rejected query.
+          await conn.end();
+        }
+      },
+      catch: (error) => ({ _tag: 'DatabaseError' as const, error }),
+    });
+
+    const agent = yield* Effect.tryPromise({
+      try: async () => await getZeroAgent(foundConnection.id),
+      catch: (error) => ({ _tag: 'DatabaseError' as const, error }),
+    });
+
+    if (foundConnection.providerId === EProviders.google) {
+      yield* Console.log('[ZERO_WORKFLOW] Processing Google provider workflow');
+
+      const history = yield* Effect.tryPromise({
+        try: async () => {
+          console.log('[ZERO_WORKFLOW] Getting Gmail history with ID:', historyId);
+          const { history } = (await agent.listHistory(historyId.toString())) as {
+            history?: gmail_v1.Schema$History[];
+          };
+          console.log('[ZERO_WORKFLOW] Found history entries:', history);
+          // The Gmail history.list response omits `history` when there are no records.
+          // NOTE(review): presumably listHistory passes that through unchanged - normalise to
+          // an empty list so the length check below cannot throw.
+          return history ?? [];
+        },
+        catch: (error) => ({ _tag: 'GmailApiError' as const, error }),
+      });
+
+      if (!history.length) {
+        yield* Console.log('[ZERO_WORKFLOW] No history found, skipping');
+        return 'No history found';
+      }
+
+      yield* Effect.tryPromise({
+        try: () => {
+          console.log('[ZERO_WORKFLOW] Updating next history ID:', nextHistoryId);
+          return env.gmail_history_id.put(connectionId.toString(), nextHistoryId.toString());
+        },
+        catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }),
+      });
+
+      // Extract thread IDs from history
+      const threadIds = new Set<string>();
+      history.forEach((historyItem) => {
+        if (historyItem.messagesAdded) {
+          historyItem.messagesAdded.forEach((messageAdded) => {
+            if (messageAdded.message?.threadId) {
+              threadIds.add(messageAdded.message.threadId);
+            }
+          });
+        }
+        if (historyItem.labelsAdded) {
+          historyItem.labelsAdded.forEach((labelAdded) => {
+            if (labelAdded.message?.threadId) {
+              threadIds.add(labelAdded.message.threadId);
+            }
+          });
+        }
+        if (historyItem.labelsRemoved) {
+          historyItem.labelsRemoved.forEach((labelRemoved) => {
+            if (labelRemoved.message?.threadId) {
+              threadIds.add(labelRemoved.message.threadId);
+            }
+          });
+        }
+      });
+
+      yield* Console.log('[ZERO_WORKFLOW] Found unique thread IDs:', Array.from(threadIds));
+
+      // Process all threads concurrently using Effect.all
+      if (threadIds.size > 0) {
+        const threadWorkflowParams = Array.from(threadIds).map((threadId) => ({
+          connectionId,
+          threadId,
+          providerId: foundConnection.providerId,
+        }));
+
+        const threadResults = yield* Effect.all(
+          threadWorkflowParams.map((threadParams) =>
+            Effect.gen(function* () {
+              const threadKey = threadParams.threadId.toString();
+
+              // Look for an existing claim BEFORE writing our own. Checking after the write
+              // always reads back 'true', which skipped every thread.
+              const isProcessing = yield* Effect.tryPromise({
+                try: () => env.gmail_processing_threads.get(threadKey),
+                catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }),
+              });
+
+              if (isProcessing === 'true') {
+                yield* Console.log(
+                  '[ZERO_WORKFLOW] Thread already processing:',
+                  threadParams.threadId,
+                );
+                return 'Thread already processing';
+              }
+
+              // Claim the thread.
+              // NOTE(review): get-then-put is not atomic, so two concurrent runs can still both
+              // claim the same thread; the flag is best-effort de-duplication only.
+              yield* Effect.tryPromise({
+                try: () => {
+                  console.log(
+                    '[ZERO_WORKFLOW] Setting processing flag for thread:',
+                    threadParams.threadId,
+                  );
+                  return env.gmail_processing_threads.put(threadKey, 'true', {
+                    expirationTtl: 1800,
+                  });
+                },
+                catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }),
+              });
+
+              // Run the thread workflow. The claim is released here on every outcome because
+              // runThreadWorkflow returns early on some paths without clearing the flag.
+              return yield* runThreadWorkflow(threadParams).pipe(
+                Effect.mapError(
+                  (error): ZeroWorkflowError => ({
+                    _tag: 'WorkflowCreationFailed' as const,
+                    error,
+                  }),
+                ),
+                Effect.ensuring(
+                  Effect.tryPromise(() => env.gmail_processing_threads.delete(threadKey)).pipe(
+                    Effect.ignore,
+                  ),
+                ),
+              );
+            }),
+          ),
+          { concurrency: 5 }, // Process up to 5 threads concurrently
+        );
+
+        yield* Console.log('[ZERO_WORKFLOW] All thread workflows completed:', threadResults.length);
+      } else {
+        yield* Console.log('[ZERO_WORKFLOW] No threads to process');
+      }
+
+      // // Clean up processing flag
+      // yield* Effect.tryPromise({
+      //   try: () => {
+      //     console.log('[ZERO_WORKFLOW] Clearing processing flag for history:', historyProcessingKey);
+      //     return env.gmail_processing_threads.delete(historyProcessingKey);
+      //   },
+      //   catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }),
+      // }).pipe(Effect.orElse(() => Effect.succeed(null)));
+
+      yield* Console.log('[ZERO_WORKFLOW] Processing complete');
+      return 'Zero workflow completed successfully';
+    } else {
+      yield* Console.log('[ZERO_WORKFLOW] Unsupported provider:', foundConnection.providerId);
+      return yield* Effect.fail({
+        _tag: 'UnsupportedProvider' as const,
+        providerId: foundConnection.providerId,
+      });
+    }
+  }).pipe(
+    Effect.tapError((error) => Console.log('[ZERO_WORKFLOW] Error in workflow:', error)),
+    Effect.catchAll((error) => {
+      // Clean up processing flag on error
+      return Effect.tryPromise({
+        try: () => {
+          console.log(
+            '[ZERO_WORKFLOW] Clearing processing flag for history after error:',
+            `history_${params.connectionId}__${params.historyId}`,
+          );
+          return env.gmail_processing_threads.delete(
+            `history_${params.connectionId}__${params.historyId}`,
+          );
+        },
+        catch: () => ({
+          _tag: 'WorkflowCreationFailed' as const,
+          error: 'Failed to cleanup processing flag',
+        }),
+      }).pipe(
+        Effect.orElse(() => Effect.succeed(null)),
+        Effect.flatMap(() => Effect.fail(error)),
+      );
+
}), + ); + +// Define the ThreadWorkflow parameters type +type ThreadWorkflowParams = { + connectionId: string; + threadId: string; + providerId: string; +}; + +// Define error types for ThreadWorkflow +type ThreadWorkflowError = + | { _tag: 'ConnectionNotFound'; connectionId: string } + | { _tag: 'ConnectionNotAuthorized'; connectionId: string } + | { _tag: 'ThreadNotFound'; threadId: string } + | { _tag: 'UnsupportedProvider'; providerId: string } + | { _tag: 'DatabaseError'; error: unknown } + | { _tag: 'GmailApiError'; error: unknown } + | { _tag: 'VectorizationError'; error: unknown }; + +export const runThreadWorkflow = ( + params: ThreadWorkflowParams, +): Effect.Effect => + Effect.gen(function* () { + yield* Console.log('[THREAD_WORKFLOW] Starting workflow with payload:', params); + const { connectionId, threadId, providerId } = params; + + if (providerId === EProviders.google) { + yield* Console.log('[THREAD_WORKFLOW] Processing Google provider workflow'); + const { db, conn } = createDb(env.HYPERDRIVE.connectionString); + + const foundConnection = yield* Effect.tryPromise({ + try: async () => { + console.log('[THREAD_WORKFLOW] Finding connection:', connectionId); + const [foundConnection] = await db + .select() + .from(connection) + .where(eq(connection.id, connectionId.toString())); + await conn.end(); + if (!foundConnection) { + throw new Error(`Connection not found ${connectionId}`); + } + if (!foundConnection.accessToken || !foundConnection.refreshToken) { + throw new Error(`Connection is not authorized ${connectionId}`); + } + console.log('[THREAD_WORKFLOW] Found connection:', foundConnection.id); + return foundConnection; + }, + catch: (error) => ({ _tag: 'DatabaseError' as const, error }), + }); + + const agent = yield* Effect.tryPromise({ + try: async () => await getZeroAgent(foundConnection.id), + catch: (error) => ({ _tag: 'DatabaseError' as const, error }), + }); + + const thread = yield* Effect.tryPromise({ + try: async () => { + 
console.log('[THREAD_WORKFLOW] Getting thread:', threadId); + const thread = await agent.getThread(threadId.toString()); + console.log('[THREAD_WORKFLOW] Found thread with messages:', thread.messages.length); + return thread; + }, + catch: (error) => ({ _tag: 'GmailApiError' as const, error }), + }); + + if (!thread.messages || thread.messages.length === 0) { + yield* Console.log('[THREAD_WORKFLOW] Thread has no messages, skipping processing'); + return 'Thread has no messages'; + } + + yield* Console.log('[THREAD_WORKFLOW] Processing thread messages and vectorization'); + + const messagesToVectorize = yield* Effect.tryPromise({ + try: async () => { + console.log('[THREAD_WORKFLOW] Finding messages to vectorize'); + console.log('[THREAD_WORKFLOW] Getting message IDs from thread'); + const messageIds = thread.messages.map((message) => message.id); + console.log('[THREAD_WORKFLOW] Found message IDs:', messageIds); + + console.log('[THREAD_WORKFLOW] Fetching existing vectorized messages'); + const existingMessages = await env.VECTORIZE_MESSAGE.getByIds(messageIds); + console.log('[THREAD_WORKFLOW] Found existing messages:', existingMessages.length); + + const existingMessageIds = new Set(existingMessages.map((message) => message.id)); + console.log('[THREAD_WORKFLOW] Existing message IDs:', Array.from(existingMessageIds)); + + const messagesToVectorize = thread.messages.filter( + (message) => !existingMessageIds.has(message.id), + ); + console.log('[THREAD_WORKFLOW] Messages to vectorize:', messagesToVectorize.length); + + return messagesToVectorize; + }, + catch: (error) => ({ _tag: 'DatabaseError' as const, error }), + }); + + let finalEmbeddings: VectorizeVector[] = []; + + if (messagesToVectorize.length === 0) { + yield* Console.log('[THREAD_WORKFLOW] No messages to vectorize, skipping vectorization'); + } else { + finalEmbeddings = yield* Effect.tryPromise({ + try: async () => { + console.log( + '[THREAD_WORKFLOW] Starting message vectorization for', + 
messagesToVectorize.length, + 'messages', + ); + + const maxConcurrentMessages = 3; + const results: VectorizeVector[] = []; + + for (let i = 0; i < messagesToVectorize.length; i += maxConcurrentMessages) { + const batch = messagesToVectorize.slice(i, i + maxConcurrentMessages); + const batchResults = await Promise.all( + batch.map(async (message) => { + try { + console.log('[THREAD_WORKFLOW] Converting message to XML:', message.id); + const prompt = await messageToXML(message); + if (!prompt) { + console.log('[THREAD_WORKFLOW] Message has no prompt, skipping:', message.id); + return null; + } + console.log('[THREAD_WORKFLOW] Got XML prompt for message:', message.id); + + console.log( + '[THREAD_WORKFLOW] Getting summarize prompt for connection:', + message.connectionId ?? '', + ); + const SummarizeMessagePrompt = await getPrompt( + getPromptName(message.connectionId ?? '', EPrompts.SummarizeMessage), + SummarizeMessage, + ); + console.log('[THREAD_WORKFLOW] Got summarize prompt for message:', message.id); + + console.log('[THREAD_WORKFLOW] Generating summary for message:', message.id); + const messages = [ + { role: 'system', content: SummarizeMessagePrompt }, + { role: 'user', content: prompt }, + ]; + const response: any = await env.AI.run( + '@cf/meta/llama-4-scout-17b-16e-instruct', + { + messages, + }, + ); + console.log( + `[THREAD_WORKFLOW] Summary generated for message ${message.id}:`, + response, + ); + const summary = 'response' in response ? 
response.response : response; + if (!summary || typeof summary !== 'string') { + throw new Error(`Invalid summary response for message ${message.id}`); + } + + console.log( + '[THREAD_WORKFLOW] Getting embedding vector for message:', + message.id, + ); + const embeddingVector = await getEmbeddingVector(summary); + console.log('[THREAD_WORKFLOW] Got embedding vector for message:', message.id); + + if (!embeddingVector) + throw new Error(`Message Embedding vector is null ${message.id}`); + + return { + id: message.id, + metadata: { + connection: message.connectionId ?? '', + thread: message.threadId ?? '', + summary, + }, + values: embeddingVector, + } satisfies VectorizeVector; + } catch (error) { + console.log('[THREAD_WORKFLOW] Failed to vectorize message:', { + messageId: message.id, + error: error instanceof Error ? error.message : String(error), + }); + return null; + } + }), + ); + + const validResults = batchResults.filter( + (result): result is NonNullable => result !== null, + ); + results.push(...validResults); + + if (i + maxConcurrentMessages < messagesToVectorize.length) { + console.log('[THREAD_WORKFLOW] Sleeping between message batches'); + await new Promise((resolve) => setTimeout(resolve, 1000)); + } + } + + return results; + }, + catch: (error) => ({ _tag: 'VectorizationError' as const, error }), + }); + + yield* Console.log('[THREAD_WORKFLOW] Generated embeddings for all messages'); + + if (finalEmbeddings.length > 0) { + yield* Effect.tryPromise({ + try: async () => { + console.log('[THREAD_WORKFLOW] Upserting message vectors:', finalEmbeddings.length); + await env.VECTORIZE_MESSAGE.upsert(finalEmbeddings); + console.log('[THREAD_WORKFLOW] Successfully upserted message vectors'); + }, + catch: (error) => ({ _tag: 'VectorizationError' as const, error }), + }); + } + } + + const existingThreadSummary = yield* Effect.tryPromise({ + try: async () => { + console.log('[THREAD_WORKFLOW] Getting existing thread summary for:', threadId); + const 
threadSummary = await env.VECTORIZE.getByIds([threadId.toString()]); + if (!threadSummary.length) { + console.log('[THREAD_WORKFLOW] No existing thread summary found'); + return null; + } + console.log('[THREAD_WORKFLOW] Found existing thread summary'); + return threadSummary[0].metadata as any; + }, + catch: (error) => ({ _tag: 'VectorizationError' as const, error }), + }); + + // Early exit if no new messages (prevents infinite loop from label changes) + const newestMessage = thread.messages[thread.messages.length - 1]; + if (existingThreadSummary && existingThreadSummary.lastMsg === newestMessage?.id) { + yield* Console.log( + '[THREAD_WORKFLOW] No new messages since last processing, skipping AI processing', + ); + return 'Thread workflow completed - no new messages'; + } + + const finalSummary = yield* Effect.tryPromise({ + try: async () => { + console.log('[THREAD_WORKFLOW] Generating final thread summary'); + if (existingThreadSummary) { + console.log('[THREAD_WORKFLOW] Using existing summary as context'); + return await summarizeThread( + connectionId.toString(), + thread.messages, + existingThreadSummary.summary, + ); + } else { + console.log('[THREAD_WORKFLOW] Generating new summary without context'); + return await summarizeThread(connectionId.toString(), thread.messages); + } + }, + catch: (error) => ({ _tag: 'VectorizationError' as const, error }), + }); + + const userAccountLabels = yield* Effect.tryPromise({ + try: async () => { + const userAccountLabels = await agent.getUserLabels(); + return userAccountLabels; + }, + catch: (error) => ({ _tag: 'GmailApiError' as const, error }), + }); + + if (finalSummary) { + yield* Console.log('[THREAD_WORKFLOW] Got final summary, processing labels'); + + const userLabels = yield* Effect.tryPromise({ + try: async () => { + console.log('[THREAD_WORKFLOW] Getting user labels for connection:', connectionId); + let userLabels: { name: string; usecase: string }[] = []; + const connectionLabels = await 
env.connection_labels.get(connectionId.toString()); + if (connectionLabels) { + try { + console.log('[THREAD_WORKFLOW] Parsing existing connection labels'); + const parsed = JSON.parse(connectionLabels); + if ( + Array.isArray(parsed) && + parsed.every((label) => typeof label === 'object' && label.name && label.usecase) + ) { + userLabels = parsed; + } else { + throw new Error('Invalid label format'); + } + } catch { + console.log('[THREAD_WORKFLOW] Failed to parse labels, using defaults'); + await env.connection_labels.put( + connectionId.toString(), + JSON.stringify(defaultLabels), + ); + userLabels = defaultLabels; + } + } else { + console.log('[THREAD_WORKFLOW] No labels found, using defaults'); + await env.connection_labels.put( + connectionId.toString(), + JSON.stringify(defaultLabels), + ); + userLabels = defaultLabels; + } + return userLabels.length ? userLabels : defaultLabels; + }, + catch: (error) => ({ _tag: 'DatabaseError' as const, error }), + }); + + const generatedLabels = yield* Effect.tryPromise({ + try: async () => { + console.log('[THREAD_WORKFLOW] Generating labels for thread:', threadId); + const labelsResponse: any = await env.AI.run( + '@cf/meta/llama-3.3-70b-instruct-fp8-fast', + { + messages: [ + { role: 'system', content: ThreadLabels(userLabels, thread.labels) }, + { role: 'user', content: finalSummary }, + ], + }, + ); + if (labelsResponse?.response?.replaceAll('!', '').trim()?.length) { + console.log('[THREAD_WORKFLOW] Labels generated:', labelsResponse.response); + const labels: string[] = labelsResponse?.response + ?.split(',') + .map((e: string) => e.trim()) + .filter((e: string) => e.length > 0) + .filter((e: string) => + userLabels.find((label) => label.name.toLowerCase() === e.toLowerCase()), + ); + return labels; + } else { + console.log('[THREAD_WORKFLOW] No labels generated'); + return []; + } + }, + catch: (error) => ({ _tag: 'VectorizationError' as const, error }), + }).pipe(Effect.orElse(() => Effect.succeed([]))); + + if 
(generatedLabels && generatedLabels.length > 0) { + yield* Effect.tryPromise({ + try: async () => { + console.log('[THREAD_WORKFLOW] Modifying thread labels:', generatedLabels); + const validLabelIds = generatedLabels + .map((name) => userAccountLabels.find((e) => e.name === name)?.id) + .filter((id): id is string => id !== undefined && id !== ''); + + if (validLabelIds.length > 0) { + // Check delta - only modify if there are actual changes + const currentLabelIds = thread.labels?.map((l) => l.id) || []; + const labelsToAdd = validLabelIds.filter((id) => !currentLabelIds.includes(id)); + const aiLabelIds = userAccountLabels + .filter((l) => userLabels.some((ul) => ul.name === l.name)) + .map((l) => l.id); + const labelsToRemove = currentLabelIds.filter( + (id) => aiLabelIds.includes(id) && !validLabelIds.includes(id), + ); + + if (labelsToAdd.length > 0 || labelsToRemove.length > 0) { + console.log('[THREAD_WORKFLOW] Applying label changes:', { + add: labelsToAdd, + remove: labelsToRemove, + }); + await agent.modifyLabels([threadId.toString()], labelsToAdd, labelsToRemove); + await agent.syncThread(threadId.toString()); + console.log('[THREAD_WORKFLOW] Successfully modified thread labels'); + } else { + console.log('[THREAD_WORKFLOW] No label changes needed - labels already match'); + } + } + }, + catch: (error) => ({ _tag: 'GmailApiError' as const, error }), + }); + } + + const embeddingVector = yield* Effect.tryPromise({ + try: async () => { + console.log('[THREAD_WORKFLOW] Getting thread embedding vector'); + const embeddingVector = await getEmbeddingVector(finalSummary); + console.log('[THREAD_WORKFLOW] Got thread embedding vector'); + return embeddingVector; + }, + catch: (error) => ({ _tag: 'VectorizationError' as const, error }), + }); + + if (!embeddingVector) { + yield* Console.log( + '[THREAD_WORKFLOW] Thread Embedding vector is null, skipping vector upsert', + ); + return 'Thread workflow completed successfully'; + } + + yield* Effect.tryPromise({ + 
try: async () => { + console.log('[THREAD_WORKFLOW] Upserting thread vector'); + const newestMessage = thread.messages[thread.messages.length - 1]; + await env.VECTORIZE.upsert([ + { + id: threadId.toString(), + metadata: { + connection: connectionId.toString(), + thread: threadId.toString(), + summary: finalSummary, + lastMsg: newestMessage?.id, // Store last message ID to prevent reprocessing + }, + values: embeddingVector, + }, + ]); + console.log('[THREAD_WORKFLOW] Successfully upserted thread vector'); + }, + catch: (error) => ({ _tag: 'VectorizationError' as const, error }), + }); + } else { + yield* Console.log( + '[THREAD_WORKFLOW] No summary generated for thread', + threadId, + 'messages count:', + thread.messages.length, + ); + } + + // Clean up thread processing flag + yield* Effect.tryPromise({ + try: () => { + console.log('[THREAD_WORKFLOW] Clearing processing flag for thread:', threadId); + return env.gmail_processing_threads.delete(threadId.toString()); + }, + catch: (error) => ({ _tag: 'DatabaseError' as const, error }), + }).pipe(Effect.orElse(() => Effect.succeed(null))); + + yield* Console.log('[THREAD_WORKFLOW] Thread processing complete'); + return 'Thread workflow completed successfully'; + } else { + yield* Console.log('[THREAD_WORKFLOW] Unsupported provider:', providerId); + return yield* Effect.fail({ + _tag: 'UnsupportedProvider' as const, + providerId, + }); + } + }).pipe( + Effect.tapError((error) => Console.log('[THREAD_WORKFLOW] Error in workflow:', error)), + Effect.catchAll((error) => { + // Clean up thread processing flag on error + return Effect.tryPromise({ + try: () => { + console.log( + '[THREAD_WORKFLOW] Clearing processing flag for thread after error:', + params.threadId, + ); + return env.gmail_processing_threads.delete(params.threadId.toString()); + }, + catch: () => ({ + _tag: 'DatabaseError' as const, + error: 'Failed to cleanup thread processing flag', + }), + }).pipe( + Effect.orElse(() => Effect.succeed(null)), + 
Effect.flatMap(() => Effect.fail(error)), + ); + }), + ); + +// Helper functions for vectorization and AI processing +type VectorizeVectorMetadata = 'connection' | 'thread' | 'summary'; +type IThreadSummaryMetadata = Record; + +export async function htmlToText(decodedBody: string): Promise { + try { + if (!decodedBody || typeof decodedBody !== 'string') { + return ''; + } + const $ = cheerio.load(decodedBody); + $('script').remove(); + $('style').remove(); + return $('body') + .text() + .replace(/\r?\n|\r/g, ' ') + .replace(/\s+/g, ' ') + .trim(); + } catch (error) { + log('Error extracting text from HTML:', error); + return ''; + } +} + +const escapeXml = (text: string): string => { + if (!text) return ''; + return text + .replace(/&/g, '&') + .replace(//g, '>') + .replace(/"/g, '"') + .replace(/'/g, '''); +}; + +const messageToXML = async (message: ParsedMessage) => { + try { + if (!message.decodedBody) return null; + const body = await htmlToText(message.decodedBody || ''); + log('[MESSAGE_TO_XML] Body', body); + if (!body || body.length < 10) { + log('Skipping message with body length < 10', body); + return null; + } + + const safeSenderName = escapeXml(message.sender?.name || 'Unknown'); + const safeSubject = escapeXml(message.subject || ''); + const safeDate = escapeXml(message.receivedOn || ''); + + const toElements = (message.to || []) + .map((r) => `${escapeXml(r?.email || '')}`) + .join(''); + const ccElements = (message.cc || []) + .map((r) => `${escapeXml(r?.email || '')}`) + .join(''); + + return ` + + ${safeSenderName} + ${toElements} + ${ccElements} + ${safeDate} + ${safeSubject} + ${escapeXml(body)} + + `; + } catch (error) { + log('[MESSAGE_TO_XML] Failed to convert message to XML:', { + messageId: message.id, + error: error instanceof Error ? 
error.message : String(error), + }); + return null; + } +}; + +export const getPrompt = async (promptName: string, fallback: string) => { + try { + if (!promptName || typeof promptName !== 'string') { + log('[GET_PROMPT] Invalid prompt name:', promptName); + return fallback; + } + + const existingPrompt = await env.prompts_storage.get(promptName); + if (!existingPrompt) { + await env.prompts_storage.put(promptName, fallback); + return fallback; + } + return existingPrompt; + } catch (error) { + log('[GET_PROMPT] Failed to get prompt:', { + promptName, + error: error instanceof Error ? error.message : String(error), + }); + return fallback; + } +}; + +export const getEmbeddingVector = async (text: string) => { + try { + if (!text || typeof text !== 'string' || text.trim().length === 0) { + log('[getEmbeddingVector] Empty or invalid text provided'); + return null; + } + + const embeddingResponse = await env.AI.run( + '@cf/baai/bge-large-en-v1.5', + { text: text.trim() }, + { + gateway: { + id: 'vectorize-save', + }, + }, + ); + const embeddingVector = (embeddingResponse as any).data?.[0]; + return embeddingVector ?? 
null; + } catch (error) { + log('[getEmbeddingVector] failed', error); + return null; + } +}; + +const getParticipants = (messages: ParsedMessage[]) => { + if (!messages || !Array.isArray(messages) || messages.length === 0) { + return []; + } + + const result = new Map(); + const setIfUnset = (sender: Sender) => { + if (sender?.email && !result.has(sender.email)) { + result.set(sender.email, sender.name || ''); + } + }; + + for (const msg of messages) { + if (msg?.sender) { + setIfUnset(msg.sender); + } + if (msg?.cc && Array.isArray(msg.cc)) { + for (const ccParticipant of msg.cc) { + if (ccParticipant) setIfUnset(ccParticipant); + } + } + if (msg?.to && Array.isArray(msg.to)) { + for (const toParticipant of msg.to) { + if (toParticipant) setIfUnset(toParticipant); + } + } + } + return Array.from(result.entries()); +}; + +const threadToXML = async (messages: ParsedMessage[], existingSummary?: string) => { + if (!messages || !Array.isArray(messages) || messages.length === 0) { + throw new Error('No messages provided for thread XML generation'); + } + + const firstMessage = messages[0]; + if (!firstMessage) { + throw new Error('First message is null or undefined'); + } + + const { subject = '', title = '' } = firstMessage; + const participants = getParticipants(messages); + const messagesXML = await Promise.all(messages.map(messageToXML)); + const validMessagesXML = messagesXML.filter((xml): xml is string => xml !== null); + + if (existingSummary) { + return ` + ${escapeXml(title)} + ${escapeXml(subject)} + + ${participants.map(([email, name]) => { + return `${escapeXml(name || email)} ${name ? 
`< ${escapeXml(email)} >` : ''}`; + })} + + + ${escapeXml(existingSummary)} + + + ${validMessagesXML.map((e) => e + '\n')} + + `; + } + return ` + ${escapeXml(title)} + ${escapeXml(subject)} + + ${participants.map(([email, name]) => { + return `${escapeXml(name || email)} < ${escapeXml(email)} >`; + })} + + + ${validMessagesXML.map((e) => e + '\n')} + + `; +}; + +const summarizeThread = async ( + connectionId: string, + messages: ParsedMessage[], + existingSummary?: string, +): Promise => { + try { + if (!messages || !Array.isArray(messages) || messages.length === 0) { + log('[SUMMARIZE_THREAD] No messages provided for summarization'); + return null; + } + + if (!connectionId || typeof connectionId !== 'string') { + log('[SUMMARIZE_THREAD] Invalid connection ID provided'); + return null; + } + + const prompt = await threadToXML(messages, existingSummary); + if (!prompt) { + log('[SUMMARIZE_THREAD] Failed to generate thread XML'); + return null; + } + + if (existingSummary) { + const ReSummarizeThreadPrompt = await getPrompt( + getPromptName(connectionId, EPrompts.ReSummarizeThread), + ReSummarizeThread, + ); + const promptMessages = [ + { role: 'system', content: ReSummarizeThreadPrompt }, + { + role: 'user', + content: prompt, + }, + ]; + const response: any = await env.AI.run('@cf/meta/llama-3.3-70b-instruct-fp8-fast', { + messages: promptMessages, + }); + const summary = response?.response; + return typeof summary === 'string' ? summary : null; + } else { + const SummarizeThreadPrompt = await getPrompt( + getPromptName(connectionId, EPrompts.SummarizeThread), + SummarizeThread, + ); + const promptMessages = [ + { role: 'system', content: SummarizeThreadPrompt }, + { + role: 'user', + content: prompt, + }, + ]; + const response: any = await env.AI.run('@cf/meta/llama-3.3-70b-instruct-fp8-fast', { + messages: promptMessages, + }); + const summary = response?.response; + return typeof summary === 'string' ? 
summary : null; + } + } catch (error) { + log('[SUMMARIZE_THREAD] Failed to summarize thread:', { + connectionId, + messageCount: messages?.length || 0, + hasExistingSummary: !!existingSummary, + error: error instanceof Error ? error.message : String(error), + }); + return null; + } +}; diff --git a/apps/server/src/pipelines.ts b/apps/server/src/pipelines.ts index cd2d1c96a3..f6926f1866 100644 --- a/apps/server/src/pipelines.ts +++ b/apps/server/src/pipelines.ts @@ -11,1170 +11,56 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import { - ReSummarizeThread, - SummarizeMessage, - SummarizeThread, - ThreadLabels, -} from './lib/brain.fallback.prompts'; -import { defaultLabels, EPrompts, EProviders, type ParsedMessage, type Sender } from './types'; -import { WorkflowEntrypoint, WorkflowStep, type WorkflowEvent } from 'cloudflare:workers'; -import { connectionToDriver, getZeroAgent, notifyUser } from './lib/server-utils'; -import { type gmail_v1 } from '@googleapis/gmail'; -import { env } from 'cloudflare:workers'; -import { connection } from './db/schema'; -import * as cheerio from 'cheerio'; -import { eq } from 'drizzle-orm'; -import { createDb } from './db'; -import { z } from 'zod'; - -const showLogs = true; - -const log = (message: string, ...args: any[]) => { - if (showLogs) { - console.log(message, ...args); - return message; - } - return 'no message'; -}; - -type VectorizeVectorMetadata = 'connection' | 'thread' | 'summary'; - -type IThreadSummaryMetadata = Record; - -export class MainWorkflow extends WorkflowEntrypoint { - async run( - event: Readonly>>, - step: WorkflowStep, - ) { - log('[MAIN_WORKFLOW] Starting workflow with payload:', event.payload); - try { - const { providerId, historyId, subscriptionName } = event.payload; - - if (!env.GOOGLE_S_ACCOUNT) { - throw new Error('GOOGLE_S_ACCOUNT environment variable is not set'); - } - - const serviceAccount = JSON.parse(env.GOOGLE_S_ACCOUNT); - 
const connectionId = await step.do( - `[MAIN_WORKFLOW] Validate Arguments ${providerId} ${subscriptionName} ${historyId}`, - async () => { - log('[MAIN_WORKFLOW] Validating arguments'); - const regex = new RegExp( - `projects/${serviceAccount.project_id}/subscriptions/notifications__([a-z0-9-]+)`, - ); - const match = subscriptionName.toString().match(regex); - if (!match) { - log('[MAIN_WORKFLOW] Invalid subscription name:', subscriptionName); - throw new Error(`Invalid subscription name ${subscriptionName}`); - } - const [, connectionId] = match; - log('[MAIN_WORKFLOW] Extracted connectionId:', connectionId); - return connectionId; - }, - ); - if (!isValidUUID(connectionId)) { - log('[MAIN_WORKFLOW] Invalid connection id format:', connectionId); - return 'Invalid connection id'; - } - const previousHistoryId = await env.gmail_history_id.get(connectionId); - if (providerId === EProviders.google) { - log('[MAIN_WORKFLOW] Processing Google provider workflow'); - await step.do( - `[MAIN_WORKFLOW] Send to Zero Workflow ${connectionId} ${historyId}`, - async () => { - log('[MAIN_WORKFLOW] Previous history ID:', previousHistoryId); - if (previousHistoryId) { - log('[MAIN_WORKFLOW] Creating workflow instance with previous history'); - const instance = await env.ZERO_WORKFLOW.create({ - params: { - connectionId, - historyId: previousHistoryId, - nextHistoryId: historyId, - }, - }); - log('[MAIN_WORKFLOW] Created instance:', { - id: instance.id, - status: await instance.status(), - }); - } else { - log('[MAIN_WORKFLOW] Creating workflow instance with current history'); - // const existingInstance = await env.ZERO_WORKFLOW.get( - // `${connectionId}__${historyId}`, - // ).catch(() => null); - // if (existingInstance && (await existingInstance.status()).status === 'running') { - // log('[MAIN_WORKFLOW] History already processing:', existingInstance.id); - // return; - // } - const instance = await env.ZERO_WORKFLOW.create({ - // id: `${connectionId}__${historyId}`, - params: 
{ - connectionId, - historyId: historyId, - nextHistoryId: historyId, - }, - }); - log('[MAIN_WORKFLOW] Created instance:', { - id: instance.id, - status: await instance.status(), - }); - } - }, - ); - } else { - log('[MAIN_WORKFLOW] Unsupported provider:', providerId); - throw new Error(`Unsupported provider: ${providerId}`); - } - log('[MAIN_WORKFLOW] Workflow completed successfully'); - } catch (error) { - log('[MAIN_WORKFLOW] Error in workflow:', error); - log('[MAIN_WORKFLOW] Error details:', { - providerId: event.payload.providerId, - historyId: event.payload.historyId, - subscriptionName: event.payload.subscriptionName, - errorMessage: error instanceof Error ? error.message : String(error), - errorStack: error instanceof Error ? error.stack : undefined, - }); - throw error; - } - } -} - -export class ZeroWorkflow extends WorkflowEntrypoint { - async run( - event: Readonly>>, - step: WorkflowStep, - ) { - log('[ZERO_WORKFLOW] Starting workflow with payload:', event.payload); - try { - const { connectionId, historyId, nextHistoryId } = event.payload; - - const historyProcessingKey = `history_${connectionId}__${historyId}`; - const isProcessing = await env.gmail_processing_threads.get(historyProcessingKey); - if (isProcessing === 'true') { - return log('[ZERO_WORKFLOW] History already being processed:', { - connectionId, - historyId, - processingStatus: isProcessing, - }); - } - - await env.gmail_processing_threads.put(historyProcessingKey, 'true', { expirationTtl: 3600 }); - log('[ZERO_WORKFLOW] Set processing flag for history:', historyProcessingKey); - - const { db, conn } = createDb(env.HYPERDRIVE.connectionString); - - const foundConnection = await step.do( - `[ZERO_WORKFLOW] Find Connection ${connectionId}`, - async () => { - log('[ZERO_WORKFLOW] Finding connection:', connectionId); - const [foundConnection] = await db - .select() - .from(connection) - .where(eq(connection.id, connectionId.toString())); - if (!foundConnection) throw new Error(`Connection 
not found ${connectionId}`); - if (!foundConnection.accessToken || !foundConnection.refreshToken) - throw new Error(`Connection is not authorized ${connectionId}`); - log('[ZERO_WORKFLOW] Found connection:', foundConnection.id); - return foundConnection; - }, - ); - - const driver = connectionToDriver(foundConnection); - if (foundConnection.providerId === EProviders.google) { - log('[ZERO_WORKFLOW] Processing Google provider workflow'); - const history = await step.do( - `[ZERO_WORKFLOW] Get Gmail History ${foundConnection.id} ${historyId}`, - async () => { - try { - log('[ZERO_WORKFLOW] Getting Gmail history with ID:', historyId); - const { history } = await driver.listHistory( - historyId.toString(), - ); - if (!history.length) throw new Error(`No history found ${historyId} ${connectionId}`); - log('[ZERO_WORKFLOW] Found history entries:', history.length); - return history; - } catch (error) { - log('[ZERO_WORKFLOW] Failed to get Gmail history:', { - historyId, - connectionId: foundConnection.id, - error: error instanceof Error ? 
error.message : String(error), - }); - throw error; - } - }, - ); - await step.do( - `[ZERO_WORKFLOW] Update next history id ${foundConnection.id} ${nextHistoryId}`, - async () => { - log('[ZERO_WORKFLOW] Updating next history ID:', nextHistoryId); - await env.gmail_history_id.put(connectionId.toString(), nextHistoryId.toString()); - }, - ); - const threadsAdded = await step.do( - `[ZERO_WORKFLOW] Get new Threads ${connectionId}`, - async () => { - log('[ZERO_WORKFLOW] Finding threads with changed messages'); - const historiesWithChangedMessages = history.filter( - (history) => history.messagesAdded?.length, - ); - const threadsAdded = [ - ...new Set( - historiesWithChangedMessages.flatMap((history) => - history - .messagesAdded!.map((message) => message.message?.threadId) - .filter((threadId): threadId is string => threadId !== undefined), - ), - ), - ]; - log('[ZERO_WORKFLOW] Found new threads:', threadsAdded.length); - return threadsAdded; - }, - ); - - const threadsAddLabels = await step.do( - `[ZERO_WORKFLOW] Get Threads with new labels ${connectionId}`, - async () => { - log('[ZERO_WORKFLOW] Finding threads with new labels'); - const historiesWithNewLabels = history.filter((history) => history.labelsAdded?.length); - const threadsWithLabelsAdded = [ - ...new Set( - historiesWithNewLabels.flatMap((history) => - history - .labelsAdded!.filter((label) => label.message?.threadId) - .map((label) => label.message!.threadId) - .filter((threadId): threadId is string => threadId !== undefined), - ), - ), - ]; - log('[ZERO_WORKFLOW] Found threads with new labels:', threadsWithLabelsAdded.length); - return threadsWithLabelsAdded; - }, - ); - - const threadsRemoveLabels = await step.do( - `[ZERO_WORKFLOW] Get Threads with removed labels ${connectionId}`, - async () => { - log('[ZERO_WORKFLOW] Finding threads with removed labels'); - const historiesWithRemovedLabels = history.filter( - (history) => history.labelsRemoved?.length, - ); - const threadsWithLabelsRemoved = [ - 
...new Set( - historiesWithRemovedLabels.flatMap((history) => - history - .labelsRemoved!.filter((label) => label.message?.threadId) - .map((label) => label.message!.threadId) - .filter((threadId): threadId is string => threadId !== undefined), - ), - ), - ]; - log( - '[ZERO_WORKFLOW] Found threads with removed labels:', - threadsWithLabelsRemoved.length, - ); - return threadsWithLabelsRemoved; - }, - ); - - // const lastPage = await step.do( - // `[ZERO_WORKFLOW] Get last page ${connectionId}`, - // async () => { - // log('[ZERO_WORKFLOW] Getting last page of threads'); - // const lastThreads = await driver.list({ - // folder: 'inbox', - // query: 'NOT is:spam', - // maxResults: 10, - // }); - // log('[ZERO_WORKFLOW] Found threads in last page:', lastThreads.threads.length); - // return lastThreads.threads.map((thread) => thread.id); - // }, - // ); - - const threadsToProcess = await step.do( - `[ZERO_WORKFLOW] Get threads to process ${connectionId}`, - async () => { - log('[ZERO_WORKFLOW] Combining threads to process'); - const threadsToProcess = [ - ...new Set([ - ...threadsAdded, - // ...lastPage, - ...threadsAddLabels, - ...threadsRemoveLabels, - ]), - ]; - log('[ZERO_WORKFLOW] Total threads to process:', threadsToProcess.length); - return threadsToProcess; - }, - ); - - const agent = await getZeroAgent(connectionId.toString()); - - await step.do(`[ZERO_WORKFLOW] Sync Threads ${historyProcessingKey}`, async () => { - for (const threadId of threadsToProcess) { - try { - await agent.syncThread(threadId.toString()); - } catch (error) { - log('[ZERO_WORKFLOW] Failed to sync thread:', { - threadId, - error: error instanceof Error ? 
error.message : String(error), - }); - } - } - }); - - const status = await env.subscribed_accounts.get( - `${connectionId}__${foundConnection.providerId}`, - ); - if (!status || status === 'pending') { - log('[MAIN_WORKFLOW] Connection id is missing or not enabled %s', connectionId); - return 'Connection is not enabled, not processing threads'; - } - - await step.do( - `[ZERO_WORKFLOW] Send Thread Workflow Instances ${connectionId}`, - async () => { - const maxConcurrentThreads = 5; - const delayBetweenBatches = 2000; - - for (let i = 0; i < threadsToProcess.length; i += maxConcurrentThreads) { - const batch = threadsToProcess.slice(i, i + maxConcurrentThreads); - - await Promise.all( - batch.map(async (threadId) => { - try { - const isProcessing = await env.gmail_processing_threads.get( - threadId.toString(), - ); - if (isProcessing) { - log('[ZERO_WORKFLOW] Thread already processing:', isProcessing, threadId); - return; - } - await env.gmail_processing_threads.put(threadId.toString(), 'true', { - expirationTtl: 1800, - }); - // const existingInstance = await env.THREAD_WORKFLOW.get( - // `${threadId.toString()}__${connectionId.toString()}`, - // ).catch(() => null); - // if ( - // existingInstance && - // (await existingInstance.status()).status === 'running' - // ) { - // log('[ZERO_WORKFLOW] Thread already processing:', isProcessing, threadId); - // await env.gmail_processing_threads.delete(threadId.toString()); - // return; - // } - const instance = await env.THREAD_WORKFLOW.create({ - // id: `${threadId.toString()}__${connectionId.toString()}`, - params: { connectionId, threadId, providerId: foundConnection.providerId }, - }); - log('[ZERO_WORKFLOW] Created instance:', { - id: instance.id, - status: await instance.status(), - }); - } catch (error) { - log('[ZERO_WORKFLOW] Failed to process thread:', { - threadId, - connectionId, - error: error instanceof Error ? 
error.message : String(error), - }); - - try { - await env.gmail_processing_threads.delete(threadId.toString()); - } catch (cleanupError) { - log('[ZERO_WORKFLOW] Failed to cleanup processing flag:', { - threadId, - error: - cleanupError instanceof Error - ? cleanupError.message - : String(cleanupError), - }); - } - } - }), - ); - - if (i + maxConcurrentThreads < threadsToProcess.length) { - log('[ZERO_WORKFLOW] Sleeping between batches:', delayBetweenBatches); - await step.sleep( - `[ZERO_WORKFLOW] Sleeping between batches ${i} ${threadsToProcess.length}`, - delayBetweenBatches, - ); - } - } - }, - ); - } else { - log('[ZERO_WORKFLOW] Unsupported provider:', foundConnection.providerId); - throw new Error(`Unsupported provider: ${foundConnection.providerId}`); - } - - try { - await env.gmail_processing_threads.delete(historyProcessingKey); - log('[ZERO_WORKFLOW] Cleared processing flag for history:', historyProcessingKey); - } catch (cleanupError) { - log('[ZERO_WORKFLOW] Failed to clear history processing flag:', { - historyProcessingKey, - error: cleanupError instanceof Error ? cleanupError.message : String(cleanupError), - }); - } - - this.ctx.waitUntil(conn.end()); - } catch (error) { - const historyProcessingKey = `history_${event.payload.connectionId}__${event.payload.historyId}`; - try { - await env.gmail_processing_threads.delete(historyProcessingKey); - log( - '[ZERO_WORKFLOW] Cleared processing flag for history after error:', - historyProcessingKey, - ); - } catch (cleanupError) { - log('[ZERO_WORKFLOW] Failed to clear history processing flag after error:', { - historyProcessingKey, - error: cleanupError instanceof Error ? cleanupError.message : String(cleanupError), - }); - } - - log('[ZERO_WORKFLOW] Error in workflow:', error); - log('[ZERO_WORKFLOW] Error details:', { - connectionId: event.payload.connectionId, - historyId: event.payload.historyId, - nextHistoryId: event.payload.nextHistoryId, - errorMessage: error instanceof Error ? 
error.message : String(error), - errorStack: error instanceof Error ? error.stack : undefined, - }); - throw error; - } - } -} -export class ThreadWorkflow extends WorkflowEntrypoint { - async run( - event: Readonly>>, - step: WorkflowStep, - ) { - log('[THREAD_WORKFLOW] Starting workflow with payload:', event.payload); - try { - const { connectionId, threadId, providerId } = event.payload; - if (providerId === EProviders.google) { - log('[THREAD_WORKFLOW] Processing Google provider workflow'); - const { db, conn } = createDb(env.HYPERDRIVE.connectionString); - - const foundConnection = await step.do( - `[THREAD_WORKFLOW] Find Connection ${connectionId}`, - async () => { - log('[THREAD_WORKFLOW] Finding connection:', connectionId); - const [foundConnection] = await db - .select() - .from(connection) - .where(eq(connection.id, connectionId.toString())); - if (!foundConnection) throw new Error(`Connection not found ${connectionId}`); - if (!foundConnection.accessToken || !foundConnection.refreshToken) - throw new Error(`Connection is not authorized ${connectionId}`); - log('[THREAD_WORKFLOW] Found connection:', foundConnection.id); - return foundConnection; - }, - ); - const driver = connectionToDriver(foundConnection); - const thread = await step.do( - `[THREAD_WORKFLOW] Get Thread ${threadId} ${connectionId}`, - async () => { - log('[THREAD_WORKFLOW] Getting thread:', threadId); - const thread = await driver.get(threadId.toString()); - // await notifyUser({ - // connectionId: connectionId.toString(), - // result: thread, - // threadId: threadId.toString(), - // }); - log('[THREAD_WORKFLOW] Found thread with messages:', thread.messages.length); - return thread; - }, - ); - - if (!thread.messages || thread.messages.length === 0) { - log('[THREAD_WORKFLOW] Thread has no messages, skipping processing'); - return; - } - - const messagesToVectorize = await step.do( - `[THREAD_WORKFLOW] Get Thread Messages ${threadId} ${connectionId}`, - async () => { - 
log('[THREAD_WORKFLOW] Finding messages to vectorize'); - log('[THREAD_WORKFLOW] Getting message IDs from thread'); - const messageIds = thread.messages.map((message) => message.id); - log('[THREAD_WORKFLOW] Found message IDs:', messageIds); - - log('[THREAD_WORKFLOW] Fetching existing vectorized messages'); - const existingMessages = await env.VECTORIZE_MESSAGE.getByIds(messageIds); - log('[THREAD_WORKFLOW] Found existing messages:', existingMessages.length); - - const existingMessageIds = new Set(existingMessages.map((message) => message.id)); - log('[THREAD_WORKFLOW] Existing message IDs:', Array.from(existingMessageIds)); - - const messagesToVectorize = thread.messages.filter( - (message) => !existingMessageIds.has(message.id), - ); - log('[THREAD_WORKFLOW] Messages to vectorize:', messagesToVectorize.length); - - return messagesToVectorize; - }, - ); - - if (messagesToVectorize.length === 0) { - log('[THREAD_WORKFLOW] No messages to vectorize, skipping vectorization'); - } else { - const finalEmbeddings: VectorizeVector[] = await step.do( - `[THREAD_WORKFLOW] Vectorize Messages ${threadId} ${connectionId}`, - async () => { - log( - '[THREAD_WORKFLOW] Starting message vectorization for', - messagesToVectorize.length, - 'messages', - ); - - const maxConcurrentMessages = 3; - const results: VectorizeVector[] = []; - - for (let i = 0; i < messagesToVectorize.length; i += maxConcurrentMessages) { - const batch = messagesToVectorize.slice(i, i + maxConcurrentMessages); - const batchResults = await Promise.all( - batch.map(async (message) => { - return step.do( - `[THREAD_WORKFLOW] Vectorize Message ${message.id} ${threadId}`, - async () => { - try { - log('[THREAD_WORKFLOW] Converting message to XML:', message.id); - const prompt = await messageToXML(message); - if (!prompt) { - log('[THREAD_WORKFLOW] Message has no prompt, skipping:', message.id); - return null; - } - log('[THREAD_WORKFLOW] Got XML prompt for message:', message.id); - - const SummarizeMessagePrompt 
= await step.do( - `[THREAD_WORKFLOW] Get Summarize Message Prompt ${message.id} ${threadId}`, - async () => { - log( - '[THREAD_WORKFLOW] Getting summarize prompt for connection:', - message.connectionId ?? '', - ); - return await getPrompt( - getPromptName( - message.connectionId ?? '', - EPrompts.SummarizeMessage, - ), - SummarizeMessage, - ); - }, - ); - log('[THREAD_WORKFLOW] Got summarize prompt for message:', message.id); - - const summary: string = await step.do( - `[THREAD_WORKFLOW] Summarize Message ${message.id} ${threadId}`, - async () => { - try { - log( - '[THREAD_WORKFLOW] Generating summary for message:', - message.id, - ); - const messages = [ - { role: 'system', content: SummarizeMessagePrompt }, - { - role: 'user', - content: prompt, - }, - ]; - const response: any = await env.AI.run( - '@cf/meta/llama-4-scout-17b-16e-instruct', - { - messages, - }, - ); - log( - `[THREAD_WORKFLOW] Summary generated for message ${message.id}:`, - response, - ); - const summary = - 'response' in response ? response.response : response; - if (!summary || typeof summary !== 'string') { - throw new Error( - `Invalid summary response for message ${message.id}`, - ); - } - return summary; - } catch (error) { - log('[THREAD_WORKFLOW] Failed to generate summary for message:', { - messageId: message.id, - error: error instanceof Error ? error.message : String(error), - }); - throw error; - } - }, - ); - - const embeddingVector = await step.do( - `[THREAD_WORKFLOW] Get Message Embedding Vector ${message.id} ${threadId}`, - async () => { - try { - log( - '[THREAD_WORKFLOW] Getting embedding vector for message:', - message.id, - ); - const embeddingVector = await getEmbeddingVector(summary); - log( - '[THREAD_WORKFLOW] Got embedding vector for message:', - message.id, - ); - return embeddingVector; - } catch (error) { - log( - '[THREAD_WORKFLOW] Failed to get embedding vector for message:', - { - messageId: message.id, - error: error instanceof Error ? 
error.message : String(error), - }, - ); - throw error; - } - }, - ); - - if (!embeddingVector) - throw new Error(`Message Embedding vector is null ${message.id}`); - - return { - id: message.id, - metadata: { - connection: message.connectionId ?? '', - thread: message.threadId ?? '', - summary, - }, - values: embeddingVector, - } satisfies VectorizeVector; - } catch (error) { - log('[THREAD_WORKFLOW] Failed to vectorize message:', { - messageId: message.id, - error: error instanceof Error ? error.message : String(error), - }); - return null; - } - }, - ); - }), - ); - - const validResults = batchResults.filter( - (result): result is NonNullable => result !== null, - ); - results.push(...validResults); - - if (i + maxConcurrentMessages < messagesToVectorize.length) { - log('[THREAD_WORKFLOW] Sleeping between message batches'); - await step.sleep('[THREAD_WORKFLOW]', 1000); - } - } - - return results; - }, - ); - log('[THREAD_WORKFLOW] Generated embeddings for all messages'); - - if (finalEmbeddings.length > 0) { - await step.do( - `[THREAD_WORKFLOW] Thread Messages Vectors ${threadId} ${connectionId} ${finalEmbeddings.length}`, - async () => { - try { - log('[THREAD_WORKFLOW] Upserting message vectors:', finalEmbeddings.length); - await env.VECTORIZE_MESSAGE.upsert(finalEmbeddings); - log('[THREAD_WORKFLOW] Successfully upserted message vectors'); - } catch (error) { - log('[THREAD_WORKFLOW] Failed to upsert message vectors:', { - threadId, - vectorCount: finalEmbeddings.length, - error: error instanceof Error ? 
error.message : String(error), - }); - throw error; - } - }, - ); - } - } - - const existingThreadSummary = await step.do( - `[THREAD_WORKFLOW] Get Thread Summary ${threadId} ${connectionId}`, - async () => { - log('[THREAD_WORKFLOW] Getting existing thread summary for:', threadId); - const threadSummary = await env.VECTORIZE.getByIds([threadId.toString()]); - if (!threadSummary.length) { - log('[THREAD_WORKFLOW] No existing thread summary found'); - return null; - } - log('[THREAD_WORKFLOW] Found existing thread summary'); - return threadSummary[0].metadata as IThreadSummaryMetadata; - }, - ); - - const finalSummary = await step.do( - `[THREAD_WORKFLOW] Get Final Summary ${threadId} ${connectionId}`, - async () => { - log('[THREAD_WORKFLOW] Generating final thread summary'); - if (existingThreadSummary) { - log('[THREAD_WORKFLOW] Using existing summary as context'); - return await summarizeThread( - connectionId.toString(), - thread.messages, - existingThreadSummary.summary, - ); - } else { - log('[THREAD_WORKFLOW] Generating new summary without context'); - return await summarizeThread(connectionId.toString(), thread.messages); - } - }, - ); - - const userAccountLabels = await step.do( - `[THREAD_WORKFLOW] Get user-account labels ${connectionId}`, - async () => { - try { - const userAccountLabels = await driver.getUserLabels(); - return userAccountLabels; - } catch (error) { - log('[THREAD_WORKFLOW] Failed to get user account labels:', { - connectionId, - error: error instanceof Error ? 
error.message : String(error), - }); - throw error; - } - }, - ); - - if (finalSummary) { - log('[THREAD_WORKFLOW] Got final summary, processing labels'); - const userLabels = await step.do( - `[THREAD_WORKFLOW] Get user-defined labels ${connectionId}`, - async () => { - log('[THREAD_WORKFLOW] Getting user labels for connection:', connectionId); - let userLabels: { name: string; usecase: string }[] = []; - const connectionLabels = await env.connection_labels.get(connectionId.toString()); - if (connectionLabels) { - try { - log('[THREAD_WORKFLOW] Parsing existing connection labels'); - const parsed = JSON.parse(connectionLabels); - if ( - Array.isArray(parsed) && - parsed.every( - (label) => typeof label === 'object' && label.name && label.usecase, - ) - ) { - userLabels = parsed; - } else { - throw new Error('Invalid label format'); - } - } catch { - log('[THREAD_WORKFLOW] Failed to parse labels, using defaults'); - await env.connection_labels.put( - connectionId.toString(), - JSON.stringify(defaultLabels), - ); - userLabels = defaultLabels; - } - } else { - log('[THREAD_WORKFLOW] No labels found, using defaults'); - await env.connection_labels.put( - connectionId.toString(), - JSON.stringify(defaultLabels), - ); - userLabels = defaultLabels; - } - return userLabels.length ? 
userLabels : defaultLabels; - }, - ); - - const generatedLabels = await step.do( - `[THREAD_WORKFLOW] Generate Thread Labels ${threadId} ${connectionId} ${thread.messages.length}`, - async () => { - try { - log('[THREAD_WORKFLOW] Generating labels for thread:', threadId); - const labelsResponse: any = await env.AI.run( - '@cf/meta/llama-3.3-70b-instruct-fp8-fast', - { - messages: [ - { role: 'system', content: ThreadLabels(userLabels, thread.labels) }, - { role: 'user', content: finalSummary }, - ], - }, - ); - if (labelsResponse?.response?.replaceAll('!', '').trim()?.length) { - log('[THREAD_WORKFLOW] Labels generated:', labelsResponse.response); - const labels: string[] = labelsResponse?.response - ?.split(',') - .map((e: string) => e.trim()) - .filter((e: string) => e.length > 0) - .filter((e: string) => - userLabels.find((label) => label.name.toLowerCase() === e.toLowerCase()), - ); - return labels; - } else { - log('[THREAD_WORKFLOW] No labels generated'); - return []; - } - } catch (error) { - log('[THREAD_WORKFLOW] Failed to generate labels for thread:', { - threadId, - error: error instanceof Error ? 
error.message : String(error), - }); - return []; - } - }, - ); - - if (generatedLabels && generatedLabels.length > 0) { - await step.do( - `[THREAD_WORKFLOW] Modify Thread Labels ${threadId} ${connectionId}`, - async () => { - log('[THREAD_WORKFLOW] Modifying thread labels:', generatedLabels); - const validLabelIds = generatedLabels - .map((name) => userAccountLabels.find((e) => e.name === name)?.id) - .filter((id): id is string => id !== undefined && id !== ''); - - if (validLabelIds.length > 0) { - await driver.modifyLabels([threadId.toString()], { - addLabels: validLabelIds, - removeLabels: [], - }); - } - log('[THREAD_WORKFLOW] Successfully modified thread labels'); - }, - ); - } - - const embeddingVector = await step.do( - `[THREAD_WORKFLOW] Get Thread Embedding Vector ${threadId} ${connectionId}`, - async () => { - log('[THREAD_WORKFLOW] Getting thread embedding vector'); - const embeddingVector = await getEmbeddingVector(finalSummary); - log('[THREAD_WORKFLOW] Got thread embedding vector'); - return embeddingVector; - }, - ); - - if (!embeddingVector) { - log('[THREAD_WORKFLOW] Thread Embedding vector is null, skipping vector upsert'); - return; - } - - try { - log('[THREAD_WORKFLOW] Upserting thread vector'); - await env.VECTORIZE.upsert([ - { - id: threadId.toString(), - metadata: { - connection: connectionId.toString(), - thread: threadId.toString(), - summary: finalSummary, - }, - values: embeddingVector, - }, - ]); - log('[THREAD_WORKFLOW] Successfully upserted thread vector'); - } catch (error) { - log('[THREAD_WORKFLOW] Failed to upsert thread vector:', { - threadId, - connectionId, - error: error instanceof Error ? 
error.message : String(error), - }); - throw error; - } - } else { - log( - '[THREAD_WORKFLOW] No summary generated for thread', - threadId, - thread.messages.length, - ); - } - - this.ctx.waitUntil(conn.end()); - } else { - log('[THREAD_WORKFLOW] Unsupported provider:', providerId); - throw new Error(`Unsupported provider: ${providerId}`); - } - } catch (error) { - log('[THREAD_WORKFLOW] Error in workflow:', error); - log('[THREAD_WORKFLOW] Error details:', { - connectionId: event.payload.connectionId, - threadId: event.payload.threadId, - providerId: event.payload.providerId, - errorMessage: error instanceof Error ? error.message : String(error), - errorStack: error instanceof Error ? error.stack : undefined, - }); - throw error; - } - } -} - -export async function htmlToText(decodedBody: string): Promise { - try { - if (!decodedBody || typeof decodedBody !== 'string') { - return ''; - } - const $ = cheerio.load(decodedBody); - $('script').remove(); - $('style').remove(); - return $('body') - .text() - .replace(/\r?\n|\r/g, ' ') - .replace(/\s+/g, ' ') - .trim(); - } catch (error) { - log('Error extracting text from HTML:', error); - return ''; - } -} - -const escapeXml = (text: string): string => { - if (!text) return ''; - return text - .replace(/&/g, '&') - .replace(//g, '>') - .replace(/"/g, '"') - .replace(/'/g, '''); -}; - -const messageToXML = async (message: ParsedMessage) => { - try { - if (!message.decodedBody) return null; - const body = await htmlToText(message.decodedBody || ''); - log('[MESSAGE_TO_XML] Body', body); - if (!body || body.length < 10) { - log('Skipping message with body length < 10', body); - return null; - } - - const safeSenderName = escapeXml(message.sender?.name || 'Unknown'); - const safeSubject = escapeXml(message.subject || ''); - const safeDate = escapeXml(message.receivedOn || ''); - - const toElements = (message.to || []) - .map((r) => `${escapeXml(r?.email || '')}`) - .join(''); - const ccElements = (message.cc || []) - 
.map((r) => `${escapeXml(r?.email || '')}`) - .join(''); - - return ` - - ${safeSenderName} - ${toElements} - ${ccElements} - ${safeDate} - ${safeSubject} - ${escapeXml(body)} - - `; - } catch (error) { - log('[MESSAGE_TO_XML] Failed to convert message to XML:', { - messageId: message.id, - error: error instanceof Error ? error.message : String(error), - }); - return null; - } -}; +import { runMainWorkflow, runZeroWorkflow, runThreadWorkflow } from './pipelines.effect'; +import { EPrompts } from './types'; +import { Effect } from 'effect'; +// Helper function for generating prompt names export const getPromptName = (connectionId: string, prompt: EPrompts) => { return `${connectionId}-${prompt}`; }; -export const getPrompt = async (promptName: string, fallback: string) => { - try { - if (!promptName || typeof promptName !== 'string') { - log('[GET_PROMPT] Invalid prompt name:', promptName); - return fallback; - } - - const existingPrompt = await env.prompts_storage.get(promptName); - if (!existingPrompt) { - await env.prompts_storage.put(promptName, fallback); - return fallback; - } - return existingPrompt; - } catch (error) { - log('[GET_PROMPT] Failed to get prompt:', { - promptName, - error: error instanceof Error ? error.message : String(error), - }); - return fallback; - } -}; - -export const getEmbeddingVector = async (text: string) => { - try { - if (!text || typeof text !== 'string' || text.trim().length === 0) { - log('[getEmbeddingVector] Empty or invalid text provided'); - return null; - } - - const embeddingResponse = await env.AI.run( - '@cf/baai/bge-large-en-v1.5', - { text: text.trim() }, - { - gateway: { - id: 'vectorize-save', - }, - }, - ); - const embeddingVector = (embeddingResponse as any).data?.[0]; - return embeddingVector ?? 
null; - } catch (error) { - log('[getEmbeddingVector] failed', error); - return null; - } -}; - -const isValidUUID = (string: string) => { - if (!string || typeof string !== 'string') return false; - return z.string().uuid().safeParse(string).success; +export type ZeroWorkflowParams = { + connectionId: string; + historyId: string; + nextHistoryId: string; }; -const getParticipants = (messages: ParsedMessage[]) => { - if (!messages || !Array.isArray(messages) || messages.length === 0) { - return []; - } - - const result = new Map(); - const setIfUnset = (sender: Sender) => { - if (sender?.email && !result.has(sender.email)) { - result.set(sender.email, sender.name || ''); - } - }; - - for (const msg of messages) { - if (msg?.sender) { - setIfUnset(msg.sender); - } - if (msg?.cc && Array.isArray(msg.cc)) { - for (const ccParticipant of msg.cc) { - if (ccParticipant) setIfUnset(ccParticipant); - } - } - if (msg?.to && Array.isArray(msg.to)) { - for (const toParticipant of msg.to) { - if (toParticipant) setIfUnset(toParticipant); - } - } - } - return Array.from(result.entries()); +export type ThreadWorkflowParams = { + connectionId: string; + threadId: string; + providerId: string; }; -const threadToXML = async (messages: ParsedMessage[], existingSummary?: string) => { - if (!messages || !Array.isArray(messages) || messages.length === 0) { - throw new Error('No messages provided for thread XML generation'); - } - - const firstMessage = messages[0]; - if (!firstMessage) { - throw new Error('First message is null or undefined'); - } - - const { subject = '', title = '' } = firstMessage; - const participants = getParticipants(messages); - const messagesXML = await Promise.all(messages.map(messageToXML)); - const validMessagesXML = messagesXML.filter((xml): xml is string => xml !== null); - - if (existingSummary) { - return ` - ${escapeXml(title)} - ${escapeXml(subject)} - - ${participants.map(([email, name]) => { - return `${escapeXml(name || email)} ${name ? 
`< ${escapeXml(email)} >` : ''}`; - })} - - - ${escapeXml(existingSummary)} - - - ${validMessagesXML.map((e) => e + '\n')} - - `; - } - return ` - ${escapeXml(title)} - ${escapeXml(subject)} - - ${participants.map(([email, name]) => { - return `${escapeXml(name || email)} < ${escapeXml(email)} >`; - })} - - - ${validMessagesXML.map((e) => e + '\n')} - - `; +export type MainWorkflowParams = { + providerId: string; + historyId: string; + subscriptionName: string; }; -const summarizeThread = async ( - connectionId: string, - messages: ParsedMessage[], - existingSummary?: string, -): Promise => { - try { - if (!messages || !Array.isArray(messages) || messages.length === 0) { - log('[SUMMARIZE_THREAD] No messages provided for summarization'); - return null; - } - - if (!connectionId || typeof connectionId !== 'string') { - log('[SUMMARIZE_THREAD] Invalid connection ID provided'); - return null; - } - - const prompt = await threadToXML(messages, existingSummary); - if (!prompt) { - log('[SUMMARIZE_THREAD] Failed to generate thread XML'); - return null; - } +export enum EWorkflowType { + MAIN = 'main', + THREAD = 'thread', + ZERO = 'zero', +} - if (existingSummary) { - const ReSummarizeThreadPrompt = await getPrompt( - getPromptName(connectionId, EPrompts.ReSummarizeThread), - ReSummarizeThread, - ); - const promptMessages = [ - { role: 'system', content: ReSummarizeThreadPrompt }, - { - role: 'user', - content: prompt, - }, - ]; - const response: any = await env.AI.run('@cf/meta/llama-3.3-70b-instruct-fp8-fast', { - messages: promptMessages, - }); - const summary = response?.response; - return typeof summary === 'string' ? 
summary : null; - } else { - const SummarizeThreadPrompt = await getPrompt( - getPromptName(connectionId, EPrompts.SummarizeThread), - SummarizeThread, - ); - const promptMessages = [ - { role: 'system', content: SummarizeThreadPrompt }, - { - role: 'user', - content: prompt, - }, - ]; - const response: any = await env.AI.run('@cf/meta/llama-3.3-70b-instruct-fp8-fast', { - messages: promptMessages, - }); - const summary = response?.response; - return typeof summary === 'string' ? summary : null; - } - } catch (error) { - log('[SUMMARIZE_THREAD] Failed to summarize thread:', { - connectionId, - messageCount: messages?.length || 0, - hasExistingSummary: !!existingSummary, - error: error instanceof Error ? error.message : String(error), - }); - return null; +export type WorkflowParams = + | { workflowType: 'main'; params: MainWorkflowParams } + | { workflowType: 'thread'; params: ThreadWorkflowParams } + | { workflowType: 'zero'; params: ZeroWorkflowParams }; + +export const runWorkflow = ( + workflowType: EWorkflowType, + params: MainWorkflowParams | ThreadWorkflowParams | ZeroWorkflowParams, +): Effect.Effect => { + switch (workflowType) { + case EWorkflowType.MAIN: + return runMainWorkflow(params as MainWorkflowParams); + case EWorkflowType.ZERO: + return runZeroWorkflow(params as ZeroWorkflowParams); + case EWorkflowType.THREAD: + return runThreadWorkflow(params as ThreadWorkflowParams); + default: + return Effect.fail({ _tag: 'UnsupportedWorkflow', workflowType }); } }; diff --git a/apps/server/src/routes/chat.ts b/apps/server/src/routes/chat.ts index 441eebeb51..d36c0dc741 100644 --- a/apps/server/src/routes/chat.ts +++ b/apps/server/src/routes/chat.ts @@ -1,3 +1,16 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ import { streamText, generateObject, @@ -301,6 +314,10 @@ export class AgentRpcDO extends RpcTarget { // return await this.mainDo.getThreadFromDB(id); // } + async listHistory(historyId: string) { + return await this.mainDo.listHistory(historyId); + } + async syncThreads(folder: string) { return await this.mainDo.syncThreads(folder); } @@ -354,6 +371,7 @@ export class ZeroAgent extends AIChatAgent { const dataStreamResponse = createDataStreamResponse({ execute: async (dataStream) => { const connectionId = this.name; + if (connectionId === 'general') return; if (!connectionId || !this.driver) { console.log('Unauthorized no driver or connectionId [1]', connectionId, this.driver); await this.setupAuth(connectionId); @@ -396,6 +414,7 @@ export class ZeroAgent extends AIChatAgent { } public async setupAuth(connectionId: string) { + if (connectionId === 'general') return; if (!this.driver) { const { db, conn } = createDb(env.HYPERDRIVE.connectionString); const _connection = await db.query.connection.findFirst({ @@ -668,6 +687,13 @@ export class ZeroAgent extends AIChatAgent { }); } + async listHistory(historyId: string) { + if (!this.driver) { + throw new Error('No driver available'); + } + return await this.driver.listHistory(historyId); + } + async getUserLabels() { if (!this.driver) { throw new Error('No driver available'); @@ -852,7 +878,7 @@ export class ZeroAgent extends AIChatAgent { } async syncThread(threadId: string) { - if (!this.driver) { + if (!this.driver && this.name !== 'general') { await this.setupAuth(this.name); } @@ -1147,7 
+1173,7 @@ export class ZeroAgent extends AIChatAgent { } } - async getThreadFromDB(id: string): Promise { + async getThreadFromDB(id: string, lastAttempt = false): Promise { try { const result = this.sql` SELECT @@ -1165,17 +1191,13 @@ export class ZeroAgent extends AIChatAgent { LIMIT 1 `; - if (result.length === 0) { - this.ctx.waitUntil(this.syncThread(id)); - return { - messages: [], - latest: undefined, - hasUnread: false, - totalReplies: 0, - labels: [], - } satisfies IGetThreadResponse; + if (!result || result.length === 0) { + if (lastAttempt) { + throw new Error('Thread not found in database, Sync Failed once'); + } + await this.syncThread(id); + return this.getThreadFromDB(id, true); } - const row = result[0] as any; const storedThread = await env.THREADS_BUCKET.get(this.getThreadKey(id)); diff --git a/apps/server/src/types.ts b/apps/server/src/types.ts index 689effb6a6..dc759868f2 100644 --- a/apps/server/src/types.ts +++ b/apps/server/src/types.ts @@ -11,6 +11,12 @@ export interface ISubscribeBatch { providerId: EProviders; } +export interface IThreadBatch { + providerId: EProviders; + historyId: string; + subscriptionName: string; +} + export const defaultLabels = [ { name: 'to respond', diff --git a/apps/server/wrangler.jsonc b/apps/server/wrangler.jsonc index d6c97fe925..2707e45349 100644 --- a/apps/server/wrangler.jsonc +++ b/apps/server/wrangler.jsonc @@ -79,23 +79,7 @@ "new_classes": ["ZeroDB"], }, ], - "workflows": [ - { - "name": "main-workflow-staging", - "binding": "MAIN_WORKFLOW", - "class_name": "MainWorkflow", - }, - { - "name": "zero-workflow-staging", - "binding": "ZERO_WORKFLOW", - "class_name": "ZeroWorkflow", - }, - { - "name": "thread-workflow-staging", - "binding": "THREAD_WORKFLOW", - "class_name": "ThreadWorkflow", - }, - ], + "observability": { "enabled": true, }, @@ -227,23 +211,6 @@ "new_classes": ["ZeroDB"], }, ], - "workflows": [ - { - "name": "main-workflow-staging", - "binding": "MAIN_WORKFLOW", - "class_name": 
"MainWorkflow", - }, - { - "name": "zero-workflow-staging", - "binding": "ZERO_WORKFLOW", - "class_name": "ZeroWorkflow", - }, - { - "name": "thread-workflow-staging", - "binding": "THREAD_WORKFLOW", - "class_name": "ThreadWorkflow", - }, - ], "observability": { "enabled": true, }, @@ -314,23 +281,6 @@ "index_name": "messages-vector", }, ], - "workflows": [ - { - "name": "main-workflow-prod", - "binding": "MAIN_WORKFLOW", - "class_name": "MainWorkflow", - }, - { - "name": "zero-workflow-prod", - "binding": "ZERO_WORKFLOW", - "class_name": "ZeroWorkflow", - }, - { - "name": "thread-workflow-prod", - "binding": "THREAD_WORKFLOW", - "class_name": "ThreadWorkflow", - }, - ], "observability": { "enabled": true, }, diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 1a1df2feab..db43fb9532 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -557,6 +557,9 @@ importers: drizzle-orm: specifier: 'catalog:' version: 0.43.1(@cloudflare/workers-types@4.20250628.0)(@opentelemetry/api@1.9.0)(kysely@0.28.2)(postgres@3.4.5) + effect: + specifier: 3.16.12 + version: 3.16.12 elevenlabs: specifier: 1.59.0 version: 1.59.0 @@ -3344,6 +3347,9 @@ packages: resolution: {integrity: sha512-hI6twvUkzOmyGZhQMza1gpfqErZxXRw6JEsiVjUbo7tFanVD+8Oil0Ih3l2nGzHdxPI41zFmfUQG7GHqhciKZQ==} hasBin: true + '@standard-schema/spec@1.0.0': + resolution: {integrity: sha512-m2bOd0f2RT9k8QJx1JN85cZYyH1RqFBdlwtkSlf4tBDYLCiiZnv1fIIwacK6cqwXavOydf0NPToMQgpKq+dVlA==} + '@standard-schema/utils@0.3.0': resolution: {integrity: sha512-e7Mew686owMaPJVNNLs55PUvgz371nKgwsc4vxE49zsODpJEnxgxRo2y/OKrqueavXgZNMDVj3DdHFlaSAeU8g==} @@ -4629,6 +4635,9 @@ packages: ee-first@1.1.1: resolution: {integrity: sha512-WMwm9LhRUo+WUaRN+vRuETqG89IgZphVSNkdFgeb6sS/E4OrDIN7t48CAewSHXc6C8lefD8KKfr5vY61brQlow==} + effect@3.16.12: + resolution: {integrity: sha512-N39iBk0K71F9nb442TLbTkjl24FLUzuvx2i1I2RsEAQsdAdUTuUoW0vlfUXgkMTUOnYqKnWcFfqw4hK4Pw27hg==} + electron-to-chromium@1.5.177: resolution: {integrity: 
sha512-7EH2G59nLsEMj97fpDuvVcYi6lwTcM1xuWw3PssD8xzboAW7zj7iB3COEEEATUfjLHrs5uKBLQT03V/8URx06g==} @@ -4908,6 +4917,10 @@ packages: extend@3.0.2: resolution: {integrity: sha512-fjquC59cD7CyW6urNXK0FBufkZcoiGG80wTuPujX590cB5Ttln20E2UB4S/WARVqhXffZl2LNgS+gQdPIIim/g==} + fast-check@3.23.2: + resolution: {integrity: sha512-h5+1OzzfCC3Ef7VbtKdcv7zsstUQwUDlYpUTvjeUsJAssPgLn7QzbboPtL5ro04Mq0rPOsMzl7q5hIbRs2wD1A==} + engines: {node: '>=8.0.0'} + fast-deep-equal@2.0.1: resolution: {integrity: sha512-bCK/2Z4zLidyB4ReuIsvALH6w31YfAQDmXMqMx6FyfHqvBxtjC0eRumeSu4Bs3XtXwpyIywtSTrVT99BxY1f9w==} @@ -6499,6 +6512,9 @@ packages: resolution: {integrity: sha512-vYt7UD1U9Wg6138shLtLOvdAu+8DsC/ilFtEVHcH+wydcSpNE20AfSOduf6MkRFahL5FY7X1oU7nKVZFtfq8Fg==} engines: {node: '>=6'} + pure-rand@6.1.0: + resolution: {integrity: sha512-bVWawvoZoBYpp6yIoQtQXHZjmz35RSVHnUOTefl8Vcjr8snTPY1wnpSPMWekcFwbxI6gtmT7rSYPFvz71ldiOA==} + pvtsutils@1.3.6: resolution: {integrity: sha512-PLgQXQ6H2FWCaeRak8vvk1GW462lMxB5s3Jm673N82zI4vqtVUPuZdffdZbPDFRoU8kAhItWFtPCWiPpp4/EDg==} @@ -10351,6 +10367,8 @@ snapshots: '@sqlite.org/sqlite-wasm@3.48.0-build4': {} + '@standard-schema/spec@1.0.0': {} + '@standard-schema/utils@0.3.0': {} '@swc/helpers@0.5.17': @@ -11638,6 +11656,11 @@ snapshots: ee-first@1.1.1: {} + effect@3.16.12: + dependencies: + '@standard-schema/spec': 1.0.0 + fast-check: 3.23.2 + electron-to-chromium@1.5.177: {} elevenlabs@1.59.0: @@ -12128,6 +12151,10 @@ snapshots: extend@3.0.2: {} + fast-check@3.23.2: + dependencies: + pure-rand: 6.1.0 + fast-deep-equal@2.0.1: {} fast-deep-equal@3.1.3: {} @@ -13824,6 +13851,8 @@ snapshots: punycode@2.3.1: {} + pure-rand@6.1.0: {} + pvtsutils@1.3.6: dependencies: tslib: 2.8.1