diff --git a/apps/server/src/routes/agent/index.ts b/apps/server/src/routes/agent/index.ts index af4cda0caf..9637045d44 100644 --- a/apps/server/src/routes/agent/index.ts +++ b/apps/server/src/routes/agent/index.ts @@ -51,6 +51,7 @@ import { processToolCalls } from './utils'; import { env } from 'cloudflare:workers'; import type { Connection } from 'agents'; import { openai } from '@ai-sdk/openai'; +import { parseISO } from 'date-fns'; import { createDb } from '../../db'; import { DriverRpcDO } from './rpc'; import { eq } from 'drizzle-orm'; @@ -58,9 +59,9 @@ import { Effect } from 'effect'; const decoder = new TextDecoder(); -const shouldDropTables = false; +const shouldDropTables = true; const maxCount = 20; -const shouldLoop = env.THREAD_SYNC_LOOP !== 'false'; +const shouldLoop = true; export class ZeroDriver extends AIChatAgent { private foldersInSync: Map = new Map(); private syncThreadsInProgress: Map = new Map(); @@ -168,10 +169,8 @@ export class ZeroDriver extends AIChatAgent { if (_connection) this.driver = connectionToDriver(_connection); this.ctx.waitUntil(conn.end()); this.ctx.waitUntil(this.syncThreads('inbox')); - if (env.NODE_ENV === 'production') { - this.ctx.waitUntil(this.syncThreads('sent')); - this.ctx.waitUntil(this.syncThreads('spam')); - } + this.ctx.waitUntil(this.syncThreads('sent')); + this.ctx.waitUntil(this.syncThreads('spam')); } } async rawListThreads(params: { @@ -393,7 +392,12 @@ export class ZeroDriver extends AIChatAgent { if (latest) { // Convert receivedOn to ISO format for proper sorting - const normalizedReceivedOn = new Date(latest.receivedOn).toISOString(); + let normalizedReceivedOn: string; + try { + normalizedReceivedOn = new Date(latest.receivedOn).toISOString(); + } catch (error) { + normalizedReceivedOn = new Date().toISOString(); + } await env.THREADS_BUCKET.put(this.getThreadKey(threadId), JSON.stringify(threadData), { customMetadata: { @@ -476,38 +480,74 @@ export class ZeroDriver extends AIChatAgent { this.foldersInSync.set(folder, true); - try { - let totalSynced = 0; + const self = this; + + const syncSingleThread = (threadId: string) => + Effect.gen(function* () { + yield* Effect.sleep(500); // Rate limiting delay + return yield* withRetry(Effect.tryPromise(() => self.syncThread({ threadId }))); + }).pipe( + Effect.catchAll((error) => { + console.error(`Failed to sync thread ${threadId}:`, error); + return Effect.succeed(null); + }), + ); + + const syncProgram = Effect.gen( + function* () { + let totalSynced = 0; + let pageToken: string | null = null; + let hasMore = true; + let _pageCount = 0; + + while (hasMore) { + _pageCount++; + + // Rate limiting delay between pages + yield* Effect.sleep(2000); + + const result: IGetThreadsResponse = yield* Effect.tryPromise(() => + self.listWithRetry({ + folder, + maxResults: maxCount, + pageToken: pageToken || undefined, + }), + ); - // Process threads one by one without buffering - for await (const thread of this.streamThreads(folder)) { - try { - const id = await this.queue('syncThread', { threadId: thread.id }); - console.log(`Synced thread ${thread.id} to queue ${id}`); - totalSynced++; - } catch (error) { - console.error(`Failed to sync thread ${thread.id}:`, error); + // Process threads with controlled concurrency to avoid rate limits + const threadIds = result.threads.map((thread) => thread.id); + const syncEffects = threadIds.map(syncSingleThread); + + yield* Effect.all(syncEffects, { concurrency: 1, discard: true }); + + totalSynced += result.threads.length; + pageToken = result.nextPageToken; + hasMore = pageToken !== null && shouldLoop; } - // // Broadcast progress after each thread - // this.agent.broadcastChatMessage({ - // type: OutgoingMessageType.Mail_List, - // folder, - // }); - } + return { synced: totalSynced }; + }.bind(this), + ); - return { synced: totalSynced }; + try { + const result = await Effect.runPromise( + syncProgram.pipe( + Effect.ensuring( + Effect.sync(() => { + console.log('Setting isSyncing to false'); + this.foldersInSync.delete(folder); + this.agent?.broadcastChatMessage({ + type: OutgoingMessageType.Mail_List, + folder, + }); + }), + ), + ), + ); + return result; } catch (error) { console.error('Failed to sync inbox threads:', error); throw error; - } finally { - console.log('Setting isSyncing to false'); - this.foldersInSync.delete(folder); - if (this.agent) - this.agent.broadcastChatMessage({ - type: OutgoingMessageType.Mail_List, - folder, - }); } }