From 7b6a46cfbfcb3bc5f0c35ad154258123a50bd892 Mon Sep 17 00:00:00 2001 From: Aj Wazzan Date: Mon, 28 Jul 2025 16:31:40 -0700 Subject: [PATCH] Refactor workflow processing to use durable objects for better concurrency --- apps/server/src/main.ts | 17 +- apps/server/src/pipelines.effect.ts | 639 ----------------- apps/server/src/pipelines.ts | 649 +++++++++++++++++- .../thread-workflow-utils/workflow-engine.ts | 83 +++ .../workflow-functions.ts | 56 +- apps/server/wrangler.jsonc | 24 + 6 files changed, 802 insertions(+), 666 deletions(-) diff --git a/apps/server/src/main.ts b/apps/server/src/main.ts index 7b90870260..1fc263736b 100644 --- a/apps/server/src/main.ts +++ b/apps/server/src/main.ts @@ -19,7 +19,6 @@ import { EProviders, type ISubscribeBatch, type IThreadBatch } from './types'; import { oAuthDiscoveryMetadata } from 'better-auth/plugins'; import { getZeroDB, verifyToken } from './lib/server-utils'; import { eq, and, desc, asc, inArray } from 'drizzle-orm'; -import { EWorkflowType, runWorkflow } from './pipelines'; import { ThinkingMCP } from './lib/sequential-thinking'; import { ZeroAgent, ZeroDriver } from './routes/agent'; import { contextStorage } from 'hono/context-storage'; @@ -31,6 +30,7 @@ import { trpcServer } from '@hono/trpc-server'; import { agentsMiddleware } from 'hono-agents'; import { ZeroMCP } from './routes/agent/mcp'; import { publicRouter } from './routes/auth'; +import { WorkflowRunner } from './pipelines'; import { autumnApi } from './routes/autumn'; import type { HonoContext } from './ctx'; import { createDb, type DB } from './db'; @@ -39,7 +39,6 @@ import { aiRouter } from './routes/ai'; import { Autumn } from 'autumn-js'; import { appRouter } from './trpc'; import { cors } from 'hono/cors'; -import { Effect } from 'effect'; import { Hono } from 'hono'; @@ -753,14 +752,14 @@ export default class extends WorkerEntrypoint { const providerId = msg.body.providerId; const historyId = msg.body.historyId; const subscriptionName = msg.body.subscriptionName; - const workflow = runWorkflow(EWorkflowType.MAIN, { - providerId, - historyId, - subscriptionName, - }); try { - const result = await Effect.runPromise(workflow); + const workflowRunner = env.WORKFLOW_RUNNER.get(env.WORKFLOW_RUNNER.newUniqueId()); + const result = await workflowRunner.runMainWorkflow({ + providerId, + historyId, + subscriptionName, + }); console.log('[THREAD_QUEUE] result', result); } catch (error) { console.error('Error running workflow', error); @@ -864,4 +863,4 @@ export default class extends WorkerEntrypoint { } } -export { ZeroAgent, ZeroMCP, ZeroDB, ZeroDriver, ThinkingMCP }; +export { ZeroAgent, ZeroMCP, ZeroDB, ZeroDriver, ThinkingMCP, WorkflowRunner }; diff --git a/apps/server/src/pipelines.effect.ts b/apps/server/src/pipelines.effect.ts index 848fd64ea9..58430fecf3 100644 --- a/apps/server/src/pipelines.effect.ts +++ b/apps/server/src/pipelines.effect.ts @@ -11,17 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import { createDefaultWorkflows } from './thread-workflow-utils/workflow-engine'; -import { getServiceAccount } from './lib/factories/google-subscription.factory'; -import { EWorkflowType, runWorkflow } from './pipelines'; -import { getZeroAgent } from './lib/server-utils'; -import { type gmail_v1 } from '@googleapis/gmail'; -import { Effect, Console, Logger } from 'effect'; import { env } from 'cloudflare:workers'; -import { connection } from './db/schema'; -import { EProviders } from './types'; -import { eq } from 'drizzle-orm'; -import { createDb } from './db'; const showLogs = true; @@ -34,641 +24,12 @@ const log = (message: string, ...args: any[]) => { }; // Configure pretty logger to stderr -export const loggerLayer = Logger.add(Logger.prettyLogger({ stderr: true })); - -const isValidUUID = (str: string): boolean => { - const regex = /^[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i; - return regex.test(str); -}; - -// Define the workflow parameters type -type MainWorkflowParams = { - providerId: string; - historyId: string; - subscriptionName: string; -}; - -// Define error types -type MainWorkflowError = - | { _tag: 'MissingEnvironmentVariable'; variable: string } - | { _tag: 'InvalidSubscriptionName'; subscriptionName: string } - | { _tag: 'InvalidConnectionId'; connectionId: string } - | { _tag: 'UnsupportedProvider'; providerId: string } - | { _tag: 'WorkflowCreationFailed'; error: unknown }; - -const validateArguments = ( - params: MainWorkflowParams, - serviceAccount: { project_id: string }, -): Effect.Effect => - Effect.gen(function* () { - yield* Console.log('[MAIN_WORKFLOW] Validating arguments'); - const regex = new RegExp( - `projects/${serviceAccount.project_id}/subscriptions/notifications__([a-z0-9-]+)`, - ); - const match = params.subscriptionName.toString().match(regex); - if (!match) { - yield* Console.log('[MAIN_WORKFLOW] Invalid subscription name:', params.subscriptionName); - return yield* Effect.fail({ - _tag: 'InvalidSubscriptionName' as const, - subscriptionName: params.subscriptionName, - }); - } - const [, connectionId] = match; - yield* Console.log('[MAIN_WORKFLOW] Extracted connectionId:', connectionId); - return connectionId; - }); - -/** - * This function runs the main workflow. The main workflow is responsible for processing incoming messages from a Pub/Sub subscription and passing them to the appropriate pipeline. - * It validates the subscription name and extracts the connection ID. - * @param params - * @returns - */ -export const runMainWorkflow = ( - params: MainWorkflowParams, -): Effect.Effect => - Effect.gen(function* () { - yield* Console.log('[MAIN_WORKFLOW] Starting workflow with payload:', params); - - const { providerId, historyId } = params; - - const serviceAccount = getServiceAccount(); - - const connectionId = yield* validateArguments(params, serviceAccount); - - if (!isValidUUID(connectionId)) { - yield* Console.log('[MAIN_WORKFLOW] Invalid connection id format:', connectionId); - return yield* Effect.fail({ - _tag: 'InvalidConnectionId' as const, - connectionId, - }); - } - - const previousHistoryId = yield* Effect.tryPromise({ - try: () => env.gmail_history_id.get(connectionId), - catch: () => ({ _tag: 'WorkflowCreationFailed' as const, error: 'Failed to get history ID' }), - }).pipe(Effect.orElse(() => Effect.succeed(null))); - - if (providerId === EProviders.google) { - yield* Console.log('[MAIN_WORKFLOW] Processing Google provider workflow'); - yield* Console.log('[MAIN_WORKFLOW] Previous history ID:', previousHistoryId); - - const zeroWorkflowParams = { - connectionId, - historyId: previousHistoryId || historyId, - nextHistoryId: historyId, - }; - - const result = yield* runWorkflow(EWorkflowType.ZERO, zeroWorkflowParams).pipe( - Effect.mapError( - (error): MainWorkflowError => ({ _tag: 'WorkflowCreationFailed' as const, error }), - ), - ); - - yield* Console.log('[MAIN_WORKFLOW] Zero workflow result:', result); - } else { - yield* Console.log('[MAIN_WORKFLOW] Unsupported provider:', providerId); - return yield* Effect.fail({ - _tag: 'UnsupportedProvider' as const, - providerId, - }); - } - - yield* Console.log('[MAIN_WORKFLOW] Workflow completed successfully'); - return 'Workflow completed successfully'; - }).pipe( - Effect.tapError((error) => Console.log('[MAIN_WORKFLOW] Error in workflow:', error)), - Effect.provide(loggerLayer), - ); - -// Define the ZeroWorkflow parameters type -type ZeroWorkflowParams = { - connectionId: string; - historyId: string; - nextHistoryId: string; -}; - -// Define error types for ZeroWorkflow -type ZeroWorkflowError = - | { _tag: 'HistoryAlreadyProcessing'; connectionId: string; historyId: string } - | { _tag: 'ConnectionNotFound'; connectionId: string } - | { _tag: 'ConnectionNotAuthorized'; connectionId: string } - | { _tag: 'HistoryNotFound'; historyId: string; connectionId: string } - | { _tag: 'UnsupportedProvider'; providerId: string } - | { _tag: 'DatabaseError'; error: unknown } - | { _tag: 'GmailApiError'; error: unknown } - | { _tag: 'WorkflowCreationFailed'; error: unknown } - | { _tag: 'LabelModificationFailed'; error: unknown; threadId: string }; - -export const runZeroWorkflow = ( - params: ZeroWorkflowParams, -): Effect.Effect => - Effect.gen(function* () { - yield* Console.log('[ZERO_WORKFLOW] Starting workflow with payload:', params); - const { connectionId, historyId, nextHistoryId } = params; - - const historyProcessingKey = `history_${connectionId}__${historyId}`; - - // Atomic lock acquisition to prevent race conditions - const lockAcquired = yield* Effect.tryPromise({ - try: async () => { - const response = await env.gmail_processing_threads.put(historyProcessingKey, 'true', { - expirationTtl: 3600, - }); - return response !== null; // null means key already existed - }, - catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }), - }); - - if (!lockAcquired) { - yield* Console.log('[ZERO_WORKFLOW] History already being processed:', { - connectionId, - historyId, - }); - return yield* Effect.fail({ - _tag: 'HistoryAlreadyProcessing' as const, - connectionId, - historyId, - }); - } - - yield* Console.log( - '[ZERO_WORKFLOW] Acquired processing lock for history:', - historyProcessingKey, - ); - - const { db, conn } = createDb(env.HYPERDRIVE.connectionString); - - const foundConnection = yield* Effect.tryPromise({ - try: async () => { - console.log('[ZERO_WORKFLOW] Finding connection:', connectionId); - const [foundConnection] = await db - .select() - .from(connection) - .where(eq(connection.id, connectionId.toString())); - await conn.end(); - if (!foundConnection) { - throw new Error(`Connection not found ${connectionId}`); - } - if (!foundConnection.accessToken || !foundConnection.refreshToken) { - throw new Error(`Connection is not authorized ${connectionId}`); - } - console.log('[ZERO_WORKFLOW] Found connection:', foundConnection.id); - return foundConnection; - }, - catch: (error) => ({ _tag: 'DatabaseError' as const, error }), - }); - - yield* Effect.tryPromise({ - try: async () => conn.end(), - catch: (error) => ({ _tag: 'DatabaseError' as const, error }), - }); - - const agent = yield* Effect.tryPromise({ - try: async () => await getZeroAgent(foundConnection.id), - catch: (error) => ({ _tag: 'DatabaseError' as const, error }), - }); - - if (foundConnection.providerId === EProviders.google) { - yield* Console.log('[ZERO_WORKFLOW] Processing Google provider workflow'); - - const history = yield* Effect.tryPromise({ - try: async () => { - console.log('[ZERO_WORKFLOW] Getting Gmail history with ID:', historyId); - const { history } = (await agent.listHistory(historyId.toString())) as { - history: gmail_v1.Schema$History[]; - }; - console.log('[ZERO_WORKFLOW] Found history entries:', history); - return history; - }, - catch: (error) => ({ _tag: 'GmailApiError' as const, error }), - }); - - yield* Effect.tryPromise({ - try: () => { - console.log('[ZERO_WORKFLOW] Updating next history ID:', nextHistoryId); - return env.gmail_history_id.put(connectionId.toString(), nextHistoryId.toString()); - }, - catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }), - }); - - if (!history.length) { - yield* Console.log('[ZERO_WORKFLOW] No history found, skipping'); - return 'No history found'; - } - - // Extract thread IDs from history and track label changes - const threadsAdded = new Set(); - const threadLabelChanges = new Map< - string, - { addLabels: Set; removeLabels: Set } - >(); - - // Optimal single-pass functional processing - const processLabelChange = ( - labelChange: { message?: gmail_v1.Schema$Message; labelIds?: string[] | null }, - isAddition: boolean, - ) => { - const threadId = labelChange.message?.threadId; - if (!threadId || !labelChange.labelIds?.length) return; - - let changes = threadLabelChanges.get(threadId); - if (!changes) { - changes = { addLabels: new Set(), removeLabels: new Set() }; - threadLabelChanges.set(threadId, changes); - } - - const targetSet = isAddition ? changes.addLabels : changes.removeLabels; - labelChange.labelIds.forEach((labelId) => targetSet.add(labelId)); - }; - - history.forEach((historyItem) => { - // Extract thread IDs from messages - historyItem.messagesAdded?.forEach((msg) => { - if (msg.message?.threadId) { - threadsAdded.add(msg.message.threadId); - } - }); - - // Process label changes using shared helper - historyItem.labelsAdded?.forEach((labelAdded) => processLabelChange(labelAdded, true)); - historyItem.labelsRemoved?.forEach((labelRemoved) => - processLabelChange(labelRemoved, false), - ); - }); - - yield* Console.log( - '[ZERO_WORKFLOW] Found unique thread IDs:', - Array.from(threadLabelChanges.keys()), - Array.from(threadsAdded), - ); - - if (threadsAdded.size > 0) { - const threadWorkflowParams = Array.from(threadsAdded); - - // Sync threads with proper error handling - use allSuccesses to collect successful syncs - const syncResults = yield* Effect.allSuccesses( - threadWorkflowParams.map((threadId) => - Effect.tryPromise({ - try: async () => { - const result = await agent.syncThread({ threadId }); - console.log(`[ZERO_WORKFLOW] Successfully synced thread ${threadId}`); - return { threadId, result }; - }, - catch: (error) => { - console.error(`[ZERO_WORKFLOW] Failed to sync thread ${threadId}:`, error); - // Let this effect fail so allSuccesses will exclude it - throw new Error( - `Failed to sync thread ${threadId}: ${error instanceof Error ? error.message : String(error)}`, - ); - }, - }), - ), - { concurrency: 1 }, // Limit concurrency to avoid rate limits - ); - - const syncedCount = syncResults.length; - const failedCount = threadWorkflowParams.length - syncedCount; - - if (failedCount > 0) { - yield* Console.log( - `[ZERO_WORKFLOW] Warning: ${failedCount}/${threadWorkflowParams.length} thread syncs failed. Successfully synced: ${syncedCount}`, - ); - // Continue with processing - sync failures shouldn't stop the entire workflow - // The thread processing will continue with whatever data is available - } else { - yield* Console.log(`[ZERO_WORKFLOW] Successfully synced all ${syncedCount} threads`); - } - - yield* Console.log('[ZERO_WORKFLOW] Synced threads:', syncResults); - - // Run thread workflow for each successfully synced thread - if (syncedCount > 0) { - yield* Effect.tryPromise({ - try: () => agent.reloadFolder('inbox'), - catch: (error) => ({ _tag: 'GmailApiError' as const, error }), - }).pipe( - Effect.tap(() => Console.log('[ZERO_WORKFLOW] Successfully reloaded inbox folder')), - Effect.orElse(() => - Effect.gen(function* () { - yield* Console.log('[ZERO_WORKFLOW] Failed to reload inbox folder'); - return undefined; - }), - ), - ); - - yield* Console.log( - `[ZERO_WORKFLOW] Running thread workflows for ${syncedCount} synced threads`, - ); - - const threadWorkflowResults = yield* Effect.allSuccesses( - syncResults.map(({ threadId }) => - runWorkflow(EWorkflowType.THREAD, { - connectionId, - threadId, - providerId: foundConnection.providerId, - }).pipe( - Effect.tap(() => - Console.log(`[ZERO_WORKFLOW] Successfully ran thread workflow for ${threadId}`), - ), - Effect.tapError((error) => - Console.log( - `[ZERO_WORKFLOW] Failed to run thread workflow for ${threadId}:`, - error, - ), - ), - ), - ), - { concurrency: 1 }, // Limit concurrency to avoid overwhelming the system - ); - - const threadWorkflowSuccessCount = threadWorkflowResults.length; - const threadWorkflowFailedCount = syncedCount - threadWorkflowSuccessCount; - - if (threadWorkflowFailedCount > 0) { - yield* Console.log( - `[ZERO_WORKFLOW] Warning: ${threadWorkflowFailedCount}/${syncedCount} thread workflows failed. Successfully processed: ${threadWorkflowSuccessCount}`, - ); - } else { - yield* Console.log( - `[ZERO_WORKFLOW] Successfully ran all ${threadWorkflowSuccessCount} thread workflows`, - ); - } - } - } - - // Process label changes for threads - if (threadLabelChanges.size > 0) { - yield* Console.log( - `[ZERO_WORKFLOW] Processing label changes for ${threadLabelChanges.size} threads`, - ); - - // Process each thread's label changes - for (const [threadId, changes] of threadLabelChanges) { - const addLabels = Array.from(changes.addLabels); - const removeLabels = Array.from(changes.removeLabels); - - // Only call if there are actual changes to make - if (addLabels.length > 0 || removeLabels.length > 0) { - yield* Console.log( - `[ZERO_WORKFLOW] Modifying labels for thread ${threadId}: +${addLabels.length} -${removeLabels.length}`, - ); - yield* Effect.tryPromise({ - try: () => agent.modifyThreadLabelsInDB(threadId, addLabels, removeLabels), - catch: (error) => ({ _tag: 'LabelModificationFailed' as const, error, threadId }), - }).pipe( - Effect.orElse(() => - Effect.gen(function* () { - yield* Console.log( - `[ZERO_WORKFLOW] Failed to modify labels for thread ${threadId}`, - ); - return undefined; - }), - ), - ); - } - } - - yield* Console.log('[ZERO_WORKFLOW] Completed label modifications'); - } else { - yield* Console.log('[ZERO_WORKFLOW] No threads with label changes to process'); - } - - // Clean up processing flag - yield* Effect.tryPromise({ - try: () => { - console.log( - '[ZERO_WORKFLOW] Clearing processing flag for history:', - historyProcessingKey, - ); - return env.gmail_processing_threads.delete(historyProcessingKey); - }, - catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }), - }).pipe(Effect.orElse(() => Effect.succeed(null))); - - yield* Console.log('[ZERO_WORKFLOW] Processing complete'); - return 'Zero workflow completed successfully'; - } else { - yield* Console.log('[ZERO_WORKFLOW] Unsupported provider:', foundConnection.providerId); - return yield* Effect.fail({ - _tag: 'UnsupportedProvider' as const, - providerId: foundConnection.providerId, - }); - } - }).pipe( - Effect.tapError((error) => Console.log('[ZERO_WORKFLOW] Error in workflow:', error)), - Effect.catchAll((error) => { - // Clean up processing flag on error - return Effect.tryPromise({ - try: () => { - console.log( - '[ZERO_WORKFLOW] Clearing processing flag for history after error:', - `history_${params.connectionId}__${params.historyId}`, - ); - return env.gmail_processing_threads.delete( - `history_${params.connectionId}__${params.historyId}`, - ); - }, - catch: () => ({ - _tag: 'WorkflowCreationFailed' as const, - error: 'Failed to cleanup processing flag', - }), - }).pipe( - Effect.orElse(() => Effect.succeed(null)), - Effect.flatMap(() => Effect.fail(error)), - ); - }), - Effect.provide(loggerLayer), - ); - -// Define the ThreadWorkflow parameters type -type ThreadWorkflowParams = { - connectionId: string; - threadId: string; - providerId: string; -}; - -// Define error types for ThreadWorkflow -type ThreadWorkflowError = - | { _tag: 'ConnectionNotFound'; connectionId: string } - | { _tag: 'ConnectionNotAuthorized'; connectionId: string } - | { _tag: 'ThreadNotFound'; threadId: string } - | { _tag: 'UnsupportedProvider'; providerId: string } - | { _tag: 'DatabaseError'; error: unknown } - | { _tag: 'GmailApiError'; error: unknown } - | { _tag: 'VectorizationError'; error: unknown } - | { _tag: 'WorkflowCreationFailed'; error: unknown }; /** * Runs the main workflow for processing a thread. The workflow is responsible for processing incoming messages from a Pub/Sub subscription and passing them to the appropriate pipeline. * @param params * @returns */ -export const runThreadWorkflow = ( - params: ThreadWorkflowParams, -): Effect.Effect => - Effect.gen(function* () { - yield* Console.log('[THREAD_WORKFLOW] Starting workflow with payload:', params); - const { connectionId, threadId, providerId } = params; - - if (providerId === EProviders.google) { - yield* Console.log('[THREAD_WORKFLOW] Processing Google provider workflow'); - const { db, conn } = createDb(env.HYPERDRIVE.connectionString); - - const foundConnection = yield* Effect.tryPromise({ - try: async () => { - console.log('[THREAD_WORKFLOW] Finding connection:', connectionId); - const [foundConnection] = await db - .select() - .from(connection) - .where(eq(connection.id, connectionId.toString())); - if (!foundConnection) { - throw new Error(`Connection not found ${connectionId}`); - } - if (!foundConnection.accessToken || !foundConnection.refreshToken) { - throw new Error(`Connection is not authorized ${connectionId}`); - } - console.log('[THREAD_WORKFLOW] Found connection:', foundConnection.id); - return foundConnection; - }, - catch: (error) => ({ _tag: 'DatabaseError' as const, error }), - }); - - yield* Effect.tryPromise({ - try: async () => conn.end(), - catch: (error) => ({ _tag: 'DatabaseError' as const, error }), - }); - - const agent = yield* Effect.tryPromise({ - try: async () => await getZeroAgent(foundConnection.id), - catch: (error) => ({ _tag: 'DatabaseError' as const, error }), - }); - - const thread = yield* Effect.tryPromise({ - try: async () => { - console.log('[THREAD_WORKFLOW] Getting thread:', threadId); - const thread = await agent.getThread(threadId.toString()); - console.log('[THREAD_WORKFLOW] Found thread with messages:', thread.messages.length); - return thread; - }, - catch: (error) => ({ _tag: 'GmailApiError' as const, error }), - }); - - if (!thread.messages || thread.messages.length === 0) { - yield* Console.log('[THREAD_WORKFLOW] Thread has no messages, skipping processing'); - return 'Thread has no messages'; - } - - // Initialize workflow engine with default workflows - const workflowEngine = createDefaultWorkflows(); - - // Create workflow context - const workflowContext = { - connectionId: connectionId.toString(), - threadId: threadId.toString(), - thread, - foundConnection, - agent, - env, - }; - - // Execute configured workflows using the workflow engine - const workflowResults = yield* Effect.tryPromise({ - try: async () => { - const allResults = new Map(); - const allErrors = new Map(); - - // Execute all workflows registered in the engine - const workflowNames = workflowEngine.getWorkflowNames(); - - for (const workflowName of workflowNames) { - console.log(`[THREAD_WORKFLOW] Executing workflow: ${workflowName}`); - - try { - const { results, errors } = await workflowEngine.executeWorkflow( - workflowName, - workflowContext, - ); - - // Merge results and errors using efficient Map operations - results.forEach((value, key) => allResults.set(key, value)); - errors.forEach((value, key) => allErrors.set(key, value)); - - console.log(`[THREAD_WORKFLOW] Completed workflow: ${workflowName}`); - } catch (error) { - console.error(`[THREAD_WORKFLOW] Failed to execute workflow ${workflowName}:`, error); - const errorObj = error instanceof Error ? error : new Error(String(error)); - allErrors.set(workflowName, errorObj); - } - } - - return { results: allResults, errors: allErrors }; - }, - catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }), - }); - - // Log workflow results - const successfulSteps = Array.from(workflowResults.results.keys()); - const failedSteps = Array.from(workflowResults.errors.keys()); - - if (successfulSteps.length > 0) { - yield* Console.log('[THREAD_WORKFLOW] Successfully executed steps:', successfulSteps); - } - - if (failedSteps.length > 0) { - yield* Console.log('[THREAD_WORKFLOW] Failed steps:', failedSteps); - // Log errors efficiently using forEach to avoid nested iteration - workflowResults.errors.forEach((error, stepId) => { - console.log(`[THREAD_WORKFLOW] Error in step ${stepId}:`, error.message); - }); - } - - // Clean up thread processing flag - yield* Effect.tryPromise({ - try: () => { - console.log('[THREAD_WORKFLOW] Clearing processing flag for thread:', threadId); - return env.gmail_processing_threads.delete(threadId.toString()); - }, - catch: (error) => ({ _tag: 'DatabaseError' as const, error }), - }).pipe(Effect.orElse(() => Effect.succeed(null))); - - yield* Effect.tryPromise({ - try: async () => conn.end(), - catch: (error) => ({ _tag: 'DatabaseError' as const, error }), - }); - - yield* Console.log('[THREAD_WORKFLOW] Thread processing complete'); - return 'Thread workflow completed successfully'; - } else { - yield* Console.log('[THREAD_WORKFLOW] Unsupported provider:', providerId); - return yield* Effect.fail({ - _tag: 'UnsupportedProvider' as const, - providerId, - }); - } - }).pipe( - Effect.tapError((error) => Console.log('[THREAD_WORKFLOW] Error in workflow:', error)), - Effect.catchAll((error) => { - // Clean up thread processing flag on error - return Effect.tryPromise({ - try: () => { - console.log( - '[THREAD_WORKFLOW] Clearing processing flag for thread after error:', - params.threadId, - ); - return env.gmail_processing_threads.delete(params.threadId.toString()); - }, - catch: () => ({ - _tag: 'DatabaseError' as const, - error: 'Failed to cleanup thread processing flag', - }), - }).pipe( - Effect.orElse(() => Effect.succeed(null)), - Effect.flatMap(() => Effect.fail(error)), - ); - }), - Effect.provide(loggerLayer), - ); export const getPrompt = async (promptName: string, fallback: string) => { try { diff --git a/apps/server/src/pipelines.ts b/apps/server/src/pipelines.ts index f6926f1866..fcfb281c70 100644 --- a/apps/server/src/pipelines.ts +++ b/apps/server/src/pipelines.ts @@ -11,9 +11,47 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import { runMainWorkflow, runZeroWorkflow, runThreadWorkflow } from './pipelines.effect'; +import { createDefaultWorkflows } from './thread-workflow-utils/workflow-engine'; +import { getServiceAccount } from './lib/factories/google-subscription.factory'; +import { DurableObject, env } from 'cloudflare:workers'; +import { getZeroAgent } from './lib/server-utils'; +import { type gmail_v1 } from '@googleapis/gmail'; +import { Effect, Console, Logger } from 'effect'; +import { connection } from './db/schema'; +import { EProviders } from './types'; import { EPrompts } from './types'; -import { Effect } from 'effect'; +import { eq } from 'drizzle-orm'; +import { createDb } from './db'; + +// Configure pretty logger to stderr +export const loggerLayer = Logger.add(Logger.prettyLogger({ stderr: true })); + +const isValidUUID = (str: string): boolean => { + const regex = /^[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i; + return regex.test(str); +}; + +const validateArguments = ( + params: MainWorkflowParams, + serviceAccount: { project_id: string }, +): Effect.Effect => + Effect.gen(function* () { + yield* Console.log('[MAIN_WORKFLOW] Validating arguments'); + const regex = new RegExp( + `projects/${serviceAccount.project_id}/subscriptions/notifications__([a-z0-9-]+)`, + ); + const match = params.subscriptionName.toString().match(regex); + if (!match) { + yield* Console.log('[MAIN_WORKFLOW] Invalid subscription name:', params.subscriptionName); + return yield* Effect.fail({ + _tag: 'InvalidSubscriptionName' as const, + subscriptionName: params.subscriptionName, + }); + } + const [, connectionId] = match; + yield* Console.log('[MAIN_WORKFLOW] Extracted connectionId:', connectionId); + return connectionId; + }); // Helper function for generating prompt names export const getPromptName = (connectionId: string, prompt: EPrompts) => { @@ -49,18 +87,597 @@ export type WorkflowParams = | { workflowType: 'thread'; params: ThreadWorkflowParams } | { workflowType: 'zero'; params: ZeroWorkflowParams }; -export const runWorkflow = ( - workflowType: EWorkflowType, - params: MainWorkflowParams | ThreadWorkflowParams | ZeroWorkflowParams, -): Effect.Effect => { - switch (workflowType) { - case EWorkflowType.MAIN: - return runMainWorkflow(params as MainWorkflowParams); - case EWorkflowType.ZERO: - return runZeroWorkflow(params as ZeroWorkflowParams); - case EWorkflowType.THREAD: - return runThreadWorkflow(params as ThreadWorkflowParams); - default: - return Effect.fail({ _tag: 'UnsupportedWorkflow', workflowType }); +export type MainWorkflowError = + | { _tag: 'MissingEnvironmentVariable'; variable: string } + | { _tag: 'InvalidSubscriptionName'; subscriptionName: string } + | { _tag: 'InvalidConnectionId'; connectionId: string } + | { _tag: 'UnsupportedProvider'; providerId: string } + | { _tag: 'WorkflowCreationFailed'; error: unknown }; + +export type ZeroWorkflowError = + | { _tag: 'HistoryAlreadyProcessing'; connectionId: string; historyId: string } + | { _tag: 'ConnectionNotFound'; connectionId: string } + | { _tag: 'ConnectionNotAuthorized'; connectionId: string } + | { _tag: 'HistoryNotFound'; historyId: string; connectionId: string } + | { _tag: 'UnsupportedProvider'; providerId: string } + | { _tag: 'DatabaseError'; error: unknown } + | { _tag: 'GmailApiError'; error: unknown } + | { _tag: 'WorkflowCreationFailed'; error: unknown } + | { _tag: 'LabelModificationFailed'; error: unknown; threadId: string }; + +export type ThreadWorkflowError = + | { _tag: 'ConnectionNotFound'; connectionId: string } + | { _tag: 'ConnectionNotAuthorized'; connectionId: string } + | { _tag: 'ThreadNotFound'; threadId: string } + | { _tag: 'UnsupportedProvider'; providerId: string } + | { _tag: 'DatabaseError'; error: unknown } + | { _tag: 'GmailApiError'; error: unknown } + | { _tag: 'VectorizationError'; error: unknown } + | { _tag: 'WorkflowCreationFailed'; error: unknown }; + +export type UnsupportedWorkflowError = { _tag: 'UnsupportedWorkflow'; workflowType: never }; + +export type WorkflowError = + | MainWorkflowError + | ZeroWorkflowError + | ThreadWorkflowError + | UnsupportedWorkflowError; + +export class WorkflowRunner extends DurableObject { + constructor(state: DurableObjectState, env: Env) { + super(state, env); } -}; + + /** + * This function runs the main workflow. The main workflow is responsible for processing incoming messages from a Pub/Sub subscription and passing them to the appropriate pipeline. + * It validates the subscription name and extracts the connection ID. + * @param params + * @returns + */ + public runMainWorkflow(params: MainWorkflowParams) { + return Effect.gen(this, function* () { + yield* Console.log('[MAIN_WORKFLOW] Starting workflow with payload:', params); + + const { providerId, historyId } = params; + + const serviceAccount = getServiceAccount(); + + const connectionId = yield* validateArguments(params, serviceAccount); + + if (!isValidUUID(connectionId)) { + yield* Console.log('[MAIN_WORKFLOW] Invalid connection id format:', connectionId); + return yield* Effect.fail({ + _tag: 'InvalidConnectionId' as const, + connectionId, + }); + } + + const previousHistoryId = yield* Effect.tryPromise({ + try: () => env.gmail_history_id.get(connectionId), + catch: () => ({ + _tag: 'WorkflowCreationFailed' as const, + error: 'Failed to get history ID', + }), + }).pipe(Effect.orElse(() => Effect.succeed(null))); + + if (providerId === EProviders.google) { + yield* Console.log('[MAIN_WORKFLOW] Processing Google provider workflow'); + yield* Console.log('[MAIN_WORKFLOW] Previous history ID:', previousHistoryId); + + const zeroWorkflowParams = { + connectionId, + historyId: previousHistoryId || historyId, + nextHistoryId: historyId, + }; + + const result = yield* Effect.tryPromise({ + try: () => this.runZeroWorkflow(zeroWorkflowParams), + catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }), + }); + + yield* Console.log('[MAIN_WORKFLOW] Zero workflow result:', result); + } else { + yield* Console.log('[MAIN_WORKFLOW] Unsupported provider:', providerId); + return yield* Effect.fail({ + _tag: 'UnsupportedProvider' as const, + providerId, + }); + } + + yield* Console.log('[MAIN_WORKFLOW] Workflow completed successfully'); + return 'Workflow completed successfully'; + }).pipe( + Effect.tapError((error) => Console.log('[MAIN_WORKFLOW] Error in workflow:', error)), + Effect.provide(loggerLayer), + Effect.runPromise, + ); + } + + private runZeroWorkflow(params: ZeroWorkflowParams) { + return Effect.gen(this, function* () { + yield* Console.log('[ZERO_WORKFLOW] Starting workflow with payload:', params); + const { connectionId, historyId, nextHistoryId } = params; + + const historyProcessingKey = `history_${connectionId}__${historyId}`; + + // Atomic lock acquisition to prevent race conditions + const lockAcquired = yield* Effect.tryPromise({ + try: async () => { + const response = await env.gmail_processing_threads.put(historyProcessingKey, 'true', { + expirationTtl: 3600, + }); + return response !== null; // null means key already existed + }, + catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }), + }); + + if (!lockAcquired) { + yield* Console.log('[ZERO_WORKFLOW] History already being processed:', { + connectionId, + historyId, + }); + return yield* Effect.fail({ + _tag: 'HistoryAlreadyProcessing' as const, + connectionId, + historyId, + }); + } + + yield* Console.log( + '[ZERO_WORKFLOW] Acquired processing lock for history:', + historyProcessingKey, + ); + + const { db, conn } = createDb(env.HYPERDRIVE.connectionString); + + const foundConnection = yield* Effect.tryPromise({ + try: async () => { + console.log('[ZERO_WORKFLOW] Finding connection:', connectionId); + const [foundConnection] = await db + .select() + .from(connection) + .where(eq(connection.id, connectionId.toString())); + await conn.end(); + if (!foundConnection) { + throw new Error(`Connection not found ${connectionId}`); + } + if (!foundConnection.accessToken || !foundConnection.refreshToken) { + throw new Error(`Connection is not authorized ${connectionId}`); + } + console.log('[ZERO_WORKFLOW] Found connection:', foundConnection.id); + return foundConnection; + }, + catch: (error) => ({ _tag: 'DatabaseError' as const, error }), + }); + + yield* Effect.tryPromise({ + try: async () => conn.end(), + catch: (error) => ({ _tag: 'DatabaseError' as const, error }), + }); + + const agent = yield* Effect.tryPromise({ + try: async () => await getZeroAgent(foundConnection.id), + catch: (error) => ({ _tag: 'DatabaseError' as const, error }), + }); + + if (foundConnection.providerId === EProviders.google) { + yield* Console.log('[ZERO_WORKFLOW] Processing Google provider workflow'); + + const history = yield* Effect.tryPromise({ + try: async () => { + console.log('[ZERO_WORKFLOW] Getting Gmail history with ID:', historyId); + const { history } = (await agent.listHistory(historyId.toString())) as { + history: gmail_v1.Schema$History[]; + }; + console.log('[ZERO_WORKFLOW] Found history entries:', history); + return history; + }, + catch: (error) => ({ _tag: 'GmailApiError' as const, error }), + }); + + yield* Effect.tryPromise({ + try: () => { + console.log('[ZERO_WORKFLOW] Updating next history ID:', nextHistoryId); + return env.gmail_history_id.put(connectionId.toString(), nextHistoryId.toString()); + }, + catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }), + }); + + if (!history.length) { + yield* Console.log('[ZERO_WORKFLOW] No history found, skipping'); + return 'No history found'; + } + + // Extract thread IDs from history and track label changes + const threadsAdded = new Set(); + const threadLabelChanges = new Map< + string, + { addLabels: Set; removeLabels: Set } + >(); + + // Optimal single-pass functional processing + const processLabelChange = ( + labelChange: { message?: gmail_v1.Schema$Message; labelIds?: string[] | null }, + isAddition: boolean, + ) => { + const threadId = labelChange.message?.threadId; + if (!threadId || !labelChange.labelIds?.length) return; + + let changes = threadLabelChanges.get(threadId); + if (!changes) { + changes = { addLabels: new Set(), removeLabels: new Set() }; + threadLabelChanges.set(threadId, changes); + } + + const targetSet = isAddition ? changes.addLabels : changes.removeLabels; + labelChange.labelIds.forEach((labelId) => targetSet.add(labelId)); + }; + + history.forEach((historyItem) => { + // Extract thread IDs from messages + historyItem.messagesAdded?.forEach((msg) => { + if (msg.message?.threadId) { + threadsAdded.add(msg.message.threadId); + } + }); + + // Process label changes using shared helper + historyItem.labelsAdded?.forEach((labelAdded) => processLabelChange(labelAdded, true)); + historyItem.labelsRemoved?.forEach((labelRemoved) => + processLabelChange(labelRemoved, false), + ); + }); + + yield* Console.log( + '[ZERO_WORKFLOW] Found unique thread IDs:', + Array.from(threadLabelChanges.keys()), + Array.from(threadsAdded), + ); + + if (threadsAdded.size > 0) { + const threadWorkflowParams = Array.from(threadsAdded); + + // Sync threads with proper error handling - use allSuccesses to collect successful syncs + const syncResults = yield* Effect.allSuccesses( + threadWorkflowParams.map((threadId) => + Effect.tryPromise({ + try: async () => { + const result = await agent.syncThread({ threadId }); + console.log(`[ZERO_WORKFLOW] Successfully synced thread ${threadId}`); + return { threadId, result }; + }, + catch: (error) => { + console.error(`[ZERO_WORKFLOW] Failed to sync thread ${threadId}:`, error); + // Let this effect fail so allSuccesses will exclude it + throw new Error( + `Failed to sync thread ${threadId}: ${error instanceof Error ? error.message : String(error)}`, + ); + }, + }), + ), + { concurrency: 6 }, // Limit concurrency to avoid rate limits + ); + + const syncedCount = syncResults.length; + const failedCount = threadWorkflowParams.length - syncedCount; + + if (failedCount > 0) { + yield* Console.log( + `[ZERO_WORKFLOW] Warning: ${failedCount}/${threadWorkflowParams.length} thread syncs failed. Successfully synced: ${syncedCount}`, + ); + // Continue with processing - sync failures shouldn't stop the entire workflow + // The thread processing will continue with whatever data is available + } else { + yield* Console.log(`[ZERO_WORKFLOW] Successfully synced all ${syncedCount} threads`); + } + + yield* Console.log('[ZERO_WORKFLOW] Synced threads:', syncResults); + + // Run thread workflow for each successfully synced thread + if (syncedCount > 0) { + yield* Effect.tryPromise({ + try: () => agent.reloadFolder('inbox'), + catch: (error) => ({ _tag: 'GmailApiError' as const, error }), + }).pipe( + Effect.tap(() => Console.log('[ZERO_WORKFLOW] Successfully reloaded inbox folder')), + Effect.orElse(() => + Effect.gen(function* () { + yield* Console.log('[ZERO_WORKFLOW] Failed to reload inbox folder'); + return undefined; + }), + ), + ); + + yield* Console.log( + `[ZERO_WORKFLOW] Running thread workflows for ${syncedCount} synced threads`, + ); + + const threadWorkflowResults = yield* Effect.allSuccesses( + syncResults.map(({ threadId }) => + this.runThreadWorkflow({ + connectionId, + threadId, + providerId: foundConnection.providerId, + }).pipe( + Effect.tap(() => + Console.log(`[ZERO_WORKFLOW] Successfully ran thread workflow for ${threadId}`), + ), + Effect.tapError((error) => + Console.log( + `[ZERO_WORKFLOW] Failed to run thread workflow for ${threadId}:`, + error, + ), + ), + ), + ), + { concurrency: 6 }, // Limit concurrency to avoid overwhelming the system + ); + + const threadWorkflowSuccessCount = threadWorkflowResults.length; + const threadWorkflowFailedCount = syncedCount - threadWorkflowSuccessCount; + + if (threadWorkflowFailedCount > 0) { + yield* Console.log( + `[ZERO_WORKFLOW] Warning: ${threadWorkflowFailedCount}/${syncedCount} thread workflows failed. Successfully processed: ${threadWorkflowSuccessCount}`, + ); + } else { + yield* Console.log( + `[ZERO_WORKFLOW] Successfully ran all ${threadWorkflowSuccessCount} thread workflows`, + ); + } + } + } + + // Process label changes for threads + if (threadLabelChanges.size > 0) { + yield* Console.log( + `[ZERO_WORKFLOW] Processing label changes for ${threadLabelChanges.size} threads`, + ); + + // Process each thread's label changes + for (const [threadId, changes] of threadLabelChanges) { + const addLabels = Array.from(changes.addLabels); + const removeLabels = Array.from(changes.removeLabels); + + // Only call if there are actual changes to make + if (addLabels.length > 0 || removeLabels.length > 0) { + yield* Console.log( + `[ZERO_WORKFLOW] Modifying labels for thread ${threadId}: +${addLabels.length} -${removeLabels.length}`, + ); + yield* Effect.tryPromise({ + try: () => agent.modifyThreadLabelsInDB(threadId, addLabels, removeLabels), + catch: (error) => ({ _tag: 'LabelModificationFailed' as const, error, threadId }), + }).pipe( + Effect.orElse(() => + Effect.gen(function* () { + yield* Console.log( + `[ZERO_WORKFLOW] Failed to modify labels for thread ${threadId}`, + ); + return undefined; + }), + ), + ); + } + } + + yield* Console.log('[ZERO_WORKFLOW] Completed label modifications'); + } else { + yield* Console.log('[ZERO_WORKFLOW] No threads with label changes to process'); + } + + // Clean up processing flag + yield* Effect.tryPromise({ + try: () => { + console.log( + '[ZERO_WORKFLOW] Clearing processing flag for history:', + historyProcessingKey, + ); + return env.gmail_processing_threads.delete(historyProcessingKey); + }, + catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }), + }).pipe(Effect.orElse(() => Effect.succeed(null))); + + yield* Console.log('[ZERO_WORKFLOW] Processing complete'); + return 'Zero workflow completed successfully'; + } else { + yield* Console.log('[ZERO_WORKFLOW] Unsupported provider:', foundConnection.providerId); + return yield* Effect.fail({ + _tag: 'UnsupportedProvider' as const, + providerId: foundConnection.providerId, + }); + } + }).pipe( + Effect.tapError((error) => Console.log('[ZERO_WORKFLOW] Error in workflow:', error)), + Effect.catchAll((error) => { + // Clean up processing flag on error + return Effect.tryPromise({ + try: () => { + console.log( + '[ZERO_WORKFLOW] Clearing processing flag for history after error:', + `history_${params.connectionId}__${params.historyId}`, + ); + return env.gmail_processing_threads.delete( + `history_${params.connectionId}__${params.historyId}`, + ); + }, + catch: () => ({ + _tag: 'WorkflowCreationFailed' as const, + error: 'Failed to cleanup processing flag', + }), + }).pipe( + Effect.orElse(() => Effect.succeed(null)), + Effect.flatMap(() => Effect.fail(error)), + ); + }), + Effect.provide(loggerLayer), + Effect.runPromise, + ); + } + + private runThreadWorkflow(params: ThreadWorkflowParams) { + return Effect.gen(this, function* () { + yield* Console.log('[THREAD_WORKFLOW] Starting workflow with payload:', params); + const { connectionId, threadId, providerId } = params; + + if (providerId === EProviders.google) { + yield* Console.log('[THREAD_WORKFLOW] Processing Google provider workflow'); + const { db, conn } = createDb(env.HYPERDRIVE.connectionString); + + const foundConnection = yield* Effect.tryPromise({ + try: async () => { + console.log('[THREAD_WORKFLOW] Finding connection:', connectionId); + const [foundConnection] = await db + .select() + .from(connection) + .where(eq(connection.id, connectionId.toString())); + if (!foundConnection) { + throw new Error(`Connection not found ${connectionId}`); + } + if (!foundConnection.accessToken || !foundConnection.refreshToken) { + throw new Error(`Connection is not authorized ${connectionId}`); + } + console.log('[THREAD_WORKFLOW] Found connection:', foundConnection.id); + return foundConnection; + }, + catch: (error) => ({ _tag: 'DatabaseError' as const, error }), + }); + + yield* Effect.tryPromise({ + try: async () => conn.end(), + catch: (error) => ({ _tag: 'DatabaseError' as const, error }), + }); + + const agent = yield* Effect.tryPromise({ + try: async () => await getZeroAgent(foundConnection.id), + catch: (error) => ({ _tag: 'DatabaseError' as const, error }), + }); + + const thread = yield* Effect.tryPromise({ + try: async () => { + console.log('[THREAD_WORKFLOW] Getting thread:', threadId); + const thread = await agent.getThread(threadId.toString()); + console.log('[THREAD_WORKFLOW] Found thread with messages:', thread.messages.length); + return thread; + }, + catch: (error) => ({ _tag: 'GmailApiError' as const, error }), + }); + + if (!thread.messages || thread.messages.length === 0) { + yield* Console.log('[THREAD_WORKFLOW] Thread has no messages, skipping processing'); + return 'Thread has no messages'; + } + + // Initialize workflow engine with default workflows + const workflowEngine = createDefaultWorkflows(); + + // Create workflow context + const workflowContext = { + connectionId: connectionId.toString(), + threadId: threadId.toString(), + thread, + foundConnection, + agent, + env, + results: new Map(), + }; + + // Execute configured workflows using the workflow engine + const workflowResults = yield* Effect.tryPromise({ + try: async () => { + const allResults = new Map(); + const allErrors = new Map(); + + // Execute all workflows registered in the engine + const workflowNames = workflowEngine.getWorkflowNames(); + + for (const workflowName of workflowNames) { + console.log(`[THREAD_WORKFLOW] Executing workflow: ${workflowName}`); + + try { + const { results, errors } = await workflowEngine.executeWorkflow( + workflowName, + workflowContext, + ); + + // Merge results and errors using efficient Map operations + results.forEach((value, key) => allResults.set(key, value)); + errors.forEach((value, key) => allErrors.set(key, value)); + + console.log(`[THREAD_WORKFLOW] Completed workflow: ${workflowName}`); + } catch (error) { + console.error( + `[THREAD_WORKFLOW] Failed to execute workflow ${workflowName}:`, + error, + ); + const errorObj = error instanceof Error ? error : new Error(String(error)); + allErrors.set(workflowName, errorObj); + } + } + + return { results: allResults, errors: allErrors }; + }, + catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }), + }); + + // Clear workflow context after execution + workflowEngine.clearContext(workflowContext); + + // Log workflow results + const successfulSteps = Array.from(workflowResults.results.keys()); + const failedSteps = Array.from(workflowResults.errors.keys()); + + if (successfulSteps.length > 0) { + yield* Console.log('[THREAD_WORKFLOW] Successfully executed steps:', successfulSteps); + } + + if (failedSteps.length > 0) { + yield* Console.log('[THREAD_WORKFLOW] Failed steps:', failedSteps); + // Log errors efficiently using forEach to avoid nested iteration + workflowResults.errors.forEach((error, stepId) => { + console.log(`[THREAD_WORKFLOW] Error in step ${stepId}:`, error.message); + }); + } + + // Clean up thread processing flag + yield* Effect.tryPromise({ + try: () => { + console.log('[THREAD_WORKFLOW] Clearing processing flag for thread:', threadId); + return env.gmail_processing_threads.delete(threadId.toString()); + }, + catch: (error) => ({ _tag: 'DatabaseError' as const, error }), + }).pipe(Effect.orElse(() => Effect.succeed(null))); + + yield* Console.log('[THREAD_WORKFLOW] Thread processing complete'); + return 'Thread workflow completed successfully'; + } else { + yield* Console.log('[THREAD_WORKFLOW] Unsupported provider:', providerId); + return yield* Effect.fail({ + _tag: 'UnsupportedProvider' as const, + providerId, + }); + } + }).pipe( + Effect.tapError((error) => Console.log('[THREAD_WORKFLOW] Error in workflow:', error)), + Effect.catchAll((error) => { + // Clean up thread processing flag on error + return Effect.tryPromise({ + try: () => { + console.log( + '[THREAD_WORKFLOW] Clearing processing flag for thread after error:', + params.threadId, + ); + return env.gmail_processing_threads.delete(params.threadId.toString()); + }, + catch: () => ({ + _tag: 'DatabaseError' as const, + error: 'Failed to cleanup thread processing flag', + }), + }).pipe( + Effect.orElse(() => Effect.succeed(null)), + Effect.flatMap(() => Effect.fail(error)), + ); + }), + Effect.provide(loggerLayer), + ); + } +} diff --git a/apps/server/src/thread-workflow-utils/workflow-engine.ts b/apps/server/src/thread-workflow-utils/workflow-engine.ts index 345b8c2c60..ef570e5e30 100644 --- a/apps/server/src/thread-workflow-utils/workflow-engine.ts +++ b/apps/server/src/thread-workflow-utils/workflow-engine.ts @@ -82,6 +82,13 @@ export class WorkflowEngine { return { results, errors }; } + + clearContext(context: WorkflowContext): void { + if (context.results) { + context.results.clear(); + } + console.log('[WORKFLOW_ENGINE] Context cleared'); + } } export const createDefaultWorkflows = (): WorkflowEngine => { @@ -91,12 +98,23 @@ export const createDefaultWorkflows = (): WorkflowEngine => { name: 'auto-draft-generation', description: 'Automatically generates drafts for threads that require responses', steps: [ + { + id: 'check-workflow-execution', + name: 'Check Workflow Execution', + description: 'Checks if this workflow has already been executed for this thread', + enabled: true, + action: workflowFunctions.checkWorkflowExecution, + }, { id: 'check-draft-eligibility', name: 'Check Draft Eligibility', description: 'Determines if a draft should be generated for this thread', enabled: true, condition: async (context) => { + const executionCheck = context.results?.get('check-workflow-execution'); + if (executionCheck?.alreadyExecuted) { + return false; + } return shouldGenerateDraft(context.thread, context.foundConnection); }, action: async (context) => { @@ -137,6 +155,14 @@ export const createDefaultWorkflows = (): WorkflowEngine => { action: workflowFunctions.createDraft, errorHandling: 'continue', }, + { + id: 'cleanup-workflow-execution', + name: 'Cleanup Workflow Execution', + description: 'Removes workflow execution tracking', + enabled: true, + action: workflowFunctions.cleanupWorkflowExecution, + errorHandling: 'continue', + }, ], }; @@ -144,11 +170,22 @@ export const createDefaultWorkflows = (): WorkflowEngine => { name: 'message-vectorization', description: 'Vectorizes thread messages for search and analysis', steps: [ + { + id: 'check-workflow-execution', + name: 'Check Workflow Execution', + description: 'Checks if this workflow has already been executed for this thread', + enabled: true, + action: workflowFunctions.checkWorkflowExecution, + }, { id: 'find-messages-to-vectorize', name: 'Find Messages to Vectorize', description: 'Identifies messages that need vectorization', enabled: true, + condition: async (context) => { + const executionCheck = context.results?.get('check-workflow-execution'); + return !executionCheck?.alreadyExecuted; + }, action: workflowFunctions.findMessagesToVectorize, }, { @@ -166,6 +203,14 @@ export const createDefaultWorkflows = (): WorkflowEngine => { action: workflowFunctions.upsertEmbeddings, errorHandling: 'continue', }, + { + id: 'cleanup-workflow-execution', + name: 'Cleanup Workflow Execution', + description: 'Removes workflow execution tracking', + enabled: true, + action: workflowFunctions.cleanupWorkflowExecution, + errorHandling: 'continue', + }, ], }; @@ -173,11 +218,22 @@ export const createDefaultWorkflows = (): WorkflowEngine => { name: 'thread-summary', description: 'Generates and stores thread summaries', steps: [ + { + id: 'check-workflow-execution', + name: 'Check Workflow Execution', + description: 'Checks if this workflow has already been executed for this thread', + enabled: true, + action: workflowFunctions.checkWorkflowExecution, + }, { id: 'check-existing-summary', name: 'Check Existing Summary', description: 'Checks if a thread summary already exists', enabled: true, + condition: async (context) => { + const executionCheck = context.results?.get('check-workflow-execution'); + return !executionCheck?.alreadyExecuted; + }, action: workflowFunctions.checkExistingSummary, }, { @@ -196,6 +252,14 @@ export const createDefaultWorkflows = (): WorkflowEngine => { action: workflowFunctions.upsertThreadSummary, errorHandling: 'continue', }, + { + id: 'cleanup-workflow-execution', + name: 'Cleanup Workflow Execution', + description: 'Removes workflow execution tracking', + enabled: true, + action: workflowFunctions.cleanupWorkflowExecution, + errorHandling: 'continue', + }, ], }; @@ -203,11 +267,22 @@ export const createDefaultWorkflows = (): WorkflowEngine => { name: 'label-generation', description: 'Generates and applies labels to threads', steps: [ + { + id: 'check-workflow-execution', + name: 'Check Workflow Execution', + description: 'Checks if this workflow has already been executed for this thread', + enabled: true, + action: workflowFunctions.checkWorkflowExecution, + }, { id: 'get-user-labels', name: 'Get User Labels', description: 'Retrieves user-defined labels', enabled: true, + condition: async (context) => { + const executionCheck = context.results?.get('check-workflow-execution'); + return !executionCheck?.alreadyExecuted; + }, action: workflowFunctions.getUserLabels, }, { @@ -226,6 +301,14 @@ export const createDefaultWorkflows = (): WorkflowEngine => { action: workflowFunctions.applyLabels, errorHandling: 'continue', }, + { + id: 'cleanup-workflow-execution', + name: 'Cleanup Workflow Execution', + description: 'Removes workflow execution tracking', + enabled: true, + action: workflowFunctions.cleanupWorkflowExecution, + errorHandling: 'continue', + }, ], }; diff --git a/apps/server/src/thread-workflow-utils/workflow-functions.ts b/apps/server/src/thread-workflow-utils/workflow-functions.ts index 40f38a0f16..d70c6ec223 100644 --- a/apps/server/src/thread-workflow-utils/workflow-functions.ts +++ b/apps/server/src/thread-workflow-utils/workflow-functions.ts @@ -21,6 +21,23 @@ export const workflowFunctions: Record = { return shouldGenerateDraft(context.thread, context.foundConnection); }, + checkWorkflowExecution: async (context) => { + const workflowKey = `workflow_${context.threadId}`; + const lastExecution = await env.gmail_processing_threads.get(workflowKey); + + if (lastExecution) { + console.log('[WORKFLOW_FUNCTIONS] Workflow already executed for thread:', context.threadId); + return { alreadyExecuted: true }; + } + + await env.gmail_processing_threads.put(workflowKey, Date.now().toString(), { + expirationTtl: 3600, + }); + + console.log('[WORKFLOW_FUNCTIONS] Marked workflow as executed for thread:', context.threadId); + return { alreadyExecuted: false }; + }, + analyzeEmailIntent: async (context) => { if (!context.thread.messages || context.thread.messages.length === 0) { throw new Error('Cannot analyze email intent: No messages in thread'); @@ -125,8 +142,33 @@ export const workflowFunctions: Record = { const messageIds = context.thread.messages.map((message) => message.id); console.log('[WORKFLOW_FUNCTIONS] Found message IDs:', messageIds); - const existingMessages = await env.VECTORIZE_MESSAGE.getByIds(messageIds); - console.log('[WORKFLOW_FUNCTIONS] Found existing messages:', existingMessages.length); + const batchSize = 20; + const batches = []; + for (let i = 0; i < messageIds.length; i += batchSize) { + batches.push(messageIds.slice(i, i + batchSize)); + } + + const getExistingMessagesBatch = (batch: string[]): Effect.Effect => + Effect.tryPromise(async () => { + console.log('[WORKFLOW_FUNCTIONS] Fetching batch of', batch.length, 'message IDs'); + return await env.VECTORIZE_MESSAGE.getByIds(batch); + }).pipe( + Effect.catchAll((error) => { + console.log('[WORKFLOW_FUNCTIONS] Failed to fetch batch:', error); + return Effect.succeed([]); + }), + ); + + const batchEffects = batches.map(getExistingMessagesBatch); + const program = Effect.all(batchEffects, { concurrency: 3 }).pipe( + Effect.map((results) => { + const allExistingMessages = results.flat(); + console.log('[WORKFLOW_FUNCTIONS] Found existing messages:', allExistingMessages.length); + return allExistingMessages; + }), + ); + + const existingMessages = await Effect.runPromise(program); const existingMessageIds = new Set(existingMessages.map((message: any) => message.id)); const messagesToVectorize = context.thread.messages.filter( @@ -248,6 +290,16 @@ export const workflowFunctions: Record = { return { upserted: vectorizeResult.embeddings.length }; }, + cleanupWorkflowExecution: async (context) => { + const workflowKey = `workflow_${context.threadId}`; + await env.gmail_processing_threads.delete(workflowKey); + console.log( + '[WORKFLOW_FUNCTIONS] Cleaned up workflow execution tracking for thread:', + context.threadId, + ); + return { cleaned: true }; + }, + checkExistingSummary: async (context) => { console.log('[WORKFLOW_FUNCTIONS] Getting existing thread summary for:', context.threadId); const threadSummary = await env.VECTORIZE.getByIds([context.threadId.toString()]); diff --git a/apps/server/wrangler.jsonc b/apps/server/wrangler.jsonc index 9e2d293623..879d87e370 100644 --- a/apps/server/wrangler.jsonc +++ b/apps/server/wrangler.jsonc @@ -47,6 +47,10 @@ "name": "THINKING_MCP", "class_name": "ThinkingMCP", }, + { + "name": "WORKFLOW_RUNNER", + "class_name": "WorkflowRunner", + }, ], }, "queues": { @@ -94,6 +98,10 @@ "tag": "v6", "new_sqlite_classes": ["ThinkingMCP"], }, + { + "tag": "v7", + "new_sqlite_classes": ["WorkflowRunner"], + }, ], "observability": { @@ -199,6 +207,10 @@ "name": "THINKING_MCP", "class_name": "ThinkingMCP", }, + { + "name": "WORKFLOW_RUNNER", + "class_name": "WorkflowRunner", + }, ], }, "r2_buckets": [ @@ -256,6 +268,10 @@ "tag": "v7", "new_sqlite_classes": ["ThinkingMCP"], }, + { + "tag": "v8", + "new_sqlite_classes": ["WorkflowRunner"], + }, ], "observability": { "enabled": true, @@ -364,6 +380,10 @@ "name": "THINKING_MCP", "class_name": "ThinkingMCP", }, + { + "name": "WORKFLOW_RUNNER", + "class_name": "WorkflowRunner", + }, ], }, "queues": { @@ -415,6 +435,10 @@ "tag": "v7", "new_sqlite_classes": ["ThinkingMCP"], }, + { + "tag": "v8", + "new_sqlite_classes": ["WorkflowRunner"], + }, ], "vars": { "NODE_ENV": "production",