diff --git a/apps/server/src/main.ts b/apps/server/src/main.ts index ea45b65a52..19886f2aa5 100644 --- a/apps/server/src/main.ts +++ b/apps/server/src/main.ts @@ -691,7 +691,7 @@ export default class extends WorkerEntrypoint { await env.thread_queue.send({ providerId, historyId: body.historyId, - subscriptionName: subHeader!, + subscriptionName: subHeader, }); } catch (error) { console.error('Error sending to thread queue', error, { @@ -705,12 +705,6 @@ export default class extends WorkerEntrypoint { }); async fetch(request: Request): Promise { - if (request.url.includes('/zero/durable-mailbox')) { - const res = await routePartykitRequest(request, env as unknown as Record, { - prefix: 'zero', - }); - if (res) return res; - } return this.app.fetch(request, this.env, this.ctx); } @@ -723,8 +717,6 @@ export default class extends WorkerEntrypoint { batch.messages.map(async (msg: Message) => { const connectionId = msg.body.connectionId; const providerId = msg.body.providerId; - console.log('connectionId', connectionId); - console.log('providerId', providerId); try { await enableBrainFunction({ id: connectionId, providerId }); } catch (error) { @@ -742,7 +734,6 @@ export default class extends WorkerEntrypoint { return; } case batch.queue.startsWith('thread-queue'): { - console.log('batch', batch); try { await Promise.all( batch.messages.map(async (msg: Message) => { diff --git a/apps/server/src/pipelines.effect.ts b/apps/server/src/pipelines.effect.ts index 1fcc4b44f0..669add8fe8 100644 --- a/apps/server/src/pipelines.effect.ts +++ b/apps/server/src/pipelines.effect.ts @@ -65,7 +65,7 @@ type MainWorkflowError = const validateArguments = ( params: MainWorkflowParams, - serviceAccount: any, + serviceAccount: { project_id: string }, ): Effect.Effect => Effect.gen(function* () { yield* Console.log('[MAIN_WORKFLOW] Validating arguments'); @@ -87,6 +87,12 @@ const validateArguments = ( const override = false; +/** + * This function runs the main workflow. The main workflow is responsible for processing incoming messages from a Pub/Sub subscription and passing them to the appropriate pipeline. + * It validates the subscription name and extracts the connection ID. + * @param params + * @returns + */ export const runMainWorkflow = ( params: MainWorkflowParams, ): Effect.Effect => @@ -265,36 +271,82 @@ export const runZeroWorkflow = ( }); // Extract thread IDs from history - const threadIds = new Set(); + const threadsChanged = new Set(); + const threadsAdded = new Set(); history.forEach((historyItem) => { if (historyItem.messagesAdded) { historyItem.messagesAdded.forEach((messageAdded) => { if (messageAdded.message?.threadId) { - threadIds.add(messageAdded.message.threadId); + threadsChanged.add(messageAdded.message.threadId); + threadsAdded.add(messageAdded.message.threadId); } }); } if (historyItem.labelsAdded) { historyItem.labelsAdded.forEach((labelAdded) => { if (labelAdded.message?.threadId) { - threadIds.add(labelAdded.message.threadId); + threadsChanged.add(labelAdded.message.threadId); } }); } if (historyItem.labelsRemoved) { historyItem.labelsRemoved.forEach((labelRemoved) => { if (labelRemoved.message?.threadId) { - threadIds.add(labelRemoved.message.threadId); + threadsChanged.add(labelRemoved.message.threadId); } }); } }); - yield* Console.log('[ZERO_WORKFLOW] Found unique thread IDs:', Array.from(threadIds)); + yield* Console.log( + '[ZERO_WORKFLOW] Found unique thread IDs:', + Array.from(threadsChanged), + Array.from(threadsAdded), + ); + + if (threadsAdded.size > 0) { + const threadWorkflowParams = Array.from(threadsAdded); + + // Sync threads with proper error handling - use allSuccesses to collect successful syncs + const syncResults = yield* Effect.allSuccesses( + threadWorkflowParams.map((threadId) => + Effect.tryPromise({ + try: async () => { + const result = await agent.syncThread({ threadId }); + console.log(`[ZERO_WORKFLOW] Successfully synced thread ${threadId}`); + return { threadId, result }; + }, + catch: (error) => { + console.error(`[ZERO_WORKFLOW] Failed to sync thread ${threadId}:`, error); + // Let this effect fail so allSuccesses will exclude it + throw new Error( + `Failed to sync thread ${threadId}: ${error instanceof Error ? error.message : String(error)}`, + ); + }, + }), + ), + { concurrency: 1 }, // Limit concurrency to avoid rate limits + ); + + const syncedCount = syncResults.length; + const failedCount = threadWorkflowParams.length - syncedCount; + + if (failedCount > 0) { + yield* Console.log( + `[ZERO_WORKFLOW] Warning: ${failedCount}/${threadWorkflowParams.length} thread syncs failed. Successfully synced: ${syncedCount}`, + ); + // Continue with processing - sync failures shouldn't stop the entire workflow + // The thread processing will continue with whatever data is available + } else { + yield* Console.log(`[ZERO_WORKFLOW] Successfully synced all ${syncedCount} threads`); + } + + yield* Console.log('[ZERO_WORKFLOW] Synced threads:', syncResults); + } // Process all threads concurrently using Effect.all - if (threadIds.size > 0) { - const threadWorkflowParams = Array.from(threadIds).map((threadId) => ({ + if (threadsChanged.size > 0) { + const threadWorkflowParams = Array.from(threadsChanged).map((threadId) => ({ connectionId, threadId, providerId: foundConnection.providerId, @@ -339,7 +391,7 @@ export const runZeroWorkflow = ( ); }), ), - { concurrency: 1 }, // Process up to 5 threads concurrently + { concurrency: 1, discard: true }, // Process up to 5 threads concurrently ); yield* Console.log('[ZERO_WORKFLOW] All thread workflows completed:', threadResults.length); @@ -407,6 +459,11 @@ type ThreadWorkflowError = | { _tag: 'GmailApiError'; error: unknown } | { _tag: 'VectorizationError'; error: unknown }; +/** + * Runs the main workflow for processing a thread. The workflow is responsible for processing incoming messages from a Pub/Sub subscription and passing them to the appropriate pipeline. + * @param params + * @returns + */ export const runThreadWorkflow = ( params: ThreadWorkflowParams, ): Effect.Effect => @@ -613,12 +670,9 @@ export const runThreadWorkflow = ( { role: 'system', content: SummarizeMessagePrompt }, { role: 'user', content: prompt }, ]; - const response: any = await env.AI.run( - '@cf/meta/llama-4-scout-17b-16e-instruct', - { - messages, - }, - ); + const response = await env.AI.run('@cf/meta/llama-4-scout-17b-16e-instruct', { + messages, + }); console.log( `[THREAD_WORKFLOW] Summary generated for message ${message.id}:`, response, @@ -696,7 +750,7 @@ export const runThreadWorkflow = ( return null; } console.log('[THREAD_WORKFLOW] Found existing thread summary'); - return threadSummary[0].metadata as any; + return threadSummary[0].metadata as { summary: string; lastMsg: string }; }, catch: (error) => ({ _tag: 'VectorizationError' as const, error }), }); diff --git a/apps/server/src/routes/agent/index.ts b/apps/server/src/routes/agent/index.ts index 5790deab17..f773ba3905 100644 --- a/apps/server/src/routes/agent/index.ts +++ b/apps/server/src/routes/agent/index.ts @@ -15,11 +15,11 @@ */ import { - type StreamTextOnFinishCallback, - createDataStreamResponse, - streamText, appendResponseMessages, + createDataStreamResponse, generateText, + streamText, + type StreamTextOnFinishCallback, } from 'ai'; import { IncomingMessageType, @@ -30,10 +30,10 @@ import { import { EPrompts, type IOutgoingMessage, - type ParsedMessage, type ISnoozeBatch, + type ParsedMessage, } from '../../types'; -import type { MailManager, IGetThreadResponse, IGetThreadsResponse } from '../../lib/driver/types'; +import type { IGetThreadResponse, IGetThreadsResponse, MailManager } from '../../lib/driver/types'; import { DurableObjectOAuthClientProvider } from 'agents/mcp/do-oauth-client-provider'; import { AiChatPrompt, GmailSearchAssistantSystemPrompt } from '../../lib/prompts'; import { connectionToDriver, getZeroSocketAgent } from '../../lib/server-utils'; @@ -852,8 +852,7 @@ export class ZeroDriver extends AIChatAgent { `; if (!result || result.length === 0) { - const res = await this.queue('syncThread', { threadId: id }); - console.log('res', res); + await this.queue('syncThread', { threadId: id }); return { messages: [], latest: undefined,