From b919a2417ecebc71b9259999f0291f6ebdea5824 Mon Sep 17 00:00:00 2001 From: Aj Wazzan Date: Wed, 2 Jul 2025 22:17:36 -0700 Subject: [PATCH] Slight rollback --- apps/server/src/pipelines.ts | 23 ++++--- apps/server/src/routes/chat.ts | 100 ++++++++++++++-------------- apps/server/src/trpc/routes/mail.ts | 38 +++++------ 3 files changed, 81 insertions(+), 80 deletions(-) diff --git a/apps/server/src/pipelines.ts b/apps/server/src/pipelines.ts index 18ef0b4725..6a19acf62e 100644 --- a/apps/server/src/pipelines.ts +++ b/apps/server/src/pipelines.ts @@ -59,7 +59,7 @@ export class MainWorkflow extends WorkflowEntrypoint { const match = subscriptionName.toString().match(regex); if (!match) { log('[MAIN_WORKFLOW] Invalid subscription name:', subscriptionName); - throw new Error('Invalid subscription name'); + throw new Error(`Invalid subscription name ${subscriptionName}`); } const [, connectionId] = match; log('[MAIN_WORKFLOW] Extracted connectionId:', connectionId); @@ -69,11 +69,11 @@ export class MainWorkflow extends WorkflowEntrypoint { const status = await env.subscribed_accounts.get(`${connectionId}__${providerId}`); if (!status || status === 'pending') { log('[MAIN_WORKFLOW] Connection id is missing or not enabled %s', connectionId); - throw new Error('Connection is not enabled'); + return 'Connection is not enabled'; } if (!isValidUUID(connectionId)) { log('[MAIN_WORKFLOW] Invalid connection id format:', connectionId); - throw new Error('Invalid connection id'); + return 'Invalid connection id'; } const previousHistoryId = await env.gmail_history_id.get(connectionId); if (providerId === EProviders.google) { @@ -164,9 +164,9 @@ export class ZeroWorkflow extends WorkflowEntrypoint { .select() .from(connection) .where(eq(connection.id, connectionId.toString())); - if (!foundConnection) throw new Error('Connection not found'); + if (!foundConnection) throw new Error(`Connection not found ${connectionId}`); if (!foundConnection.accessToken || !foundConnection.refreshToken) - throw new Error('Connection is not authorized'); + throw new Error(`Connection is not authorized ${connectionId}`); log('[ZERO_WORKFLOW] Found connection:', foundConnection.id); return foundConnection; }); @@ -182,7 +182,7 @@ export class ZeroWorkflow extends WorkflowEntrypoint { const { history } = await driver.listHistory( historyId.toString(), ); - if (!history.length) throw new Error('No history found'); + if (!history.length) throw new Error(`No history found ${historyId} ${connectionId}`); log('[ZERO_WORKFLOW] Found history entries:', history.length); return history; } catch (error) { @@ -386,9 +386,9 @@ export class ThreadWorkflow extends WorkflowEntrypoint { .from(connection) .where(eq(connection.id, connectionId.toString())); this.ctx.waitUntil(conn.end()); - if (!foundConnection) throw new Error('Connection not found'); + if (!foundConnection) throw new Error(`Connection not found ${connectionId}`); if (!foundConnection.accessToken || !foundConnection.refreshToken) - throw new Error('Connection is not authorized'); + throw new Error(`Connection is not authorized ${connectionId}`); log('[THREAD_WORKFLOW] Found connection:', foundConnection.id); return foundConnection; }, @@ -441,7 +441,7 @@ export class ThreadWorkflow extends WorkflowEntrypoint { return step.do(`[ZERO] Vectorize Message ${message.id}`, async () => { log('[THREAD_WORKFLOW] Converting message to XML:', message.id); const prompt = await messageToXML(message); - if (!prompt) throw new Error('Message has no prompt'); + if (!prompt) throw new Error(`Message has no prompt ${message.id}`); log('[THREAD_WORKFLOW] Got XML prompt for message:', message.id); log('[THREAD_WORKFLOW] Message:', message); @@ -512,7 +512,8 @@ export class ThreadWorkflow extends WorkflowEntrypoint { }, ); - if (!embeddingVector) throw new Error('Message Embedding vector is null'); + if (!embeddingVector) + throw new Error(`Message Embedding vector is null ${message.id}`); return { id: message.id, @@ -688,7 +689,7 @@ export class ThreadWorkflow extends WorkflowEntrypoint { }, ); - if (!embeddingVector) throw new Error('Thread Embedding vector is null'); + if (!embeddingVector) return console.error('Thread Embedding vector is null'); try { log('[THREAD_WORKFLOW] Upserting thread vector'); diff --git a/apps/server/src/routes/chat.ts b/apps/server/src/routes/chat.ts index 35d36bd758..75df711a59 100644 --- a/apps/server/src/routes/chat.ts +++ b/apps/server/src/routes/chat.ts @@ -189,19 +189,19 @@ export class AgentRpcDO extends RpcTarget { async markThreadsRead(threadIds: string[]) { const result = await this.mainDo.markThreadsRead(threadIds); - await Promise.all(threadIds.map((id) => this.mainDo.syncThread(id))); + // await Promise.all(threadIds.map((id) => this.mainDo.syncThread(id))); return result; } async markThreadsUnread(threadIds: string[]) { const result = await this.mainDo.markThreadsUnread(threadIds); - await Promise.all(threadIds.map((id) => this.mainDo.syncThread(id))); + // await Promise.all(threadIds.map((id) => this.mainDo.syncThread(id))); return result; } async modifyLabels(threadIds: string[], addLabelIds: string[], removeLabelIds: string[]) { const result = await this.mainDo.modifyLabels(threadIds, addLabelIds, removeLabelIds); - await Promise.all(threadIds.map((id) => this.mainDo.syncThread(id))); + // await Promise.all(threadIds.map((id) => this.mainDo.syncThread(id))); return result; } @@ -233,13 +233,13 @@ export class AgentRpcDO extends RpcTarget { async markAsRead(threadIds: string[]) { const result = await this.mainDo.markAsRead(threadIds); - await Promise.all(threadIds.map((id) => this.mainDo.syncThread(id))); + // await Promise.all(threadIds.map((id) => this.mainDo.syncThread(id))); return result; } async markAsUnread(threadIds: string[]) { const result = await this.mainDo.markAsUnread(threadIds); - await Promise.all(threadIds.map((id) => this.mainDo.syncThread(id))); + // await Promise.all(threadIds.map((id) => this.mainDo.syncThread(id))); return result; } @@ -310,19 +310,19 @@ export class ZeroAgent extends AIChatAgent { constructor(ctx: DurableObjectState, env: Env) { super(ctx, env); if (shouldDropTables) this.dropTables(); - this.sql` - CREATE TABLE IF NOT EXISTS threads ( - id TEXT PRIMARY KEY, - created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - thread_id TEXT NOT NULL, - provider_id TEXT NOT NULL, - latest_sender TEXT, - latest_received_on TEXT, - latest_subject TEXT, - latest_label_ids TEXT - ); - `; + // this.sql` + // CREATE TABLE IF NOT EXISTS threads ( + // id TEXT PRIMARY KEY, + // created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + // updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + // thread_id TEXT NOT NULL, + // provider_id TEXT NOT NULL, + // latest_sender TEXT, + // latest_received_on TEXT, + // latest_subject TEXT, + // latest_label_ids TEXT + // ); + // `; } async dropTables() { @@ -388,10 +388,10 @@ export class ZeroAgent extends AIChatAgent { }); if (_connection) this.driver = connectionToDriver(_connection); this.ctx.waitUntil(conn.end()); - this.ctx.waitUntil(this.syncThreads('inbox')); - this.ctx.waitUntil(this.syncThreads('sent')); - this.ctx.waitUntil(this.syncThreads('spam')); - this.ctx.waitUntil(this.syncThreads('archive')); + // this.ctx.waitUntil(this.syncThreads('inbox')); + // this.ctx.waitUntil(this.syncThreads('sent')); + // this.ctx.waitUntil(this.syncThreads('spam')); + // this.ctx.waitUntil(this.syncThreads('archive')); } } @@ -515,34 +515,34 @@ export class ZeroAgent extends AIChatAgent { this.cancelChatRequest(data.id); break; } - case IncomingMessageType.Mail_List: { - const result = await this.getThreadsFromDB({ - labelIds: data.labelIds, - folder: data.folder, - q: data.query, - max: data.maxResults, - cursor: data.pageToken, - }); - this.currentFolder = data.folder; - connection.send( - JSON.stringify({ - type: OutgoingMessageType.Mail_List, - result, - }), - ); - break; - } - case IncomingMessageType.Mail_Get: { - const result = await this.getThreadFromDB(data.threadId); - connection.send( - JSON.stringify({ - type: OutgoingMessageType.Mail_Get, - result, - threadId: data.threadId, - }), - ); - break; - } + // case IncomingMessageType.Mail_List: { + // const result = await this.getThreadsFromDB({ + // labelIds: data.labelIds, + // folder: data.folder, + // q: data.query, + // max: data.maxResults, + // cursor: data.pageToken, + // }); + // this.currentFolder = data.folder; + // connection.send( + // JSON.stringify({ + // type: OutgoingMessageType.Mail_List, + // result, + // }), + // ); + // break; + // } + // case IncomingMessageType.Mail_Get: { + // const result = await this.getThreadFromDB(data.threadId); + // connection.send( + // JSON.stringify({ + // type: OutgoingMessageType.Mail_Get, + // result, + // threadId: data.threadId, + // }), + // ); + // break; + // } } } } diff --git a/apps/server/src/trpc/routes/mail.ts b/apps/server/src/trpc/routes/mail.ts index 2450d62bdd..623ffcb2db 100644 --- a/apps/server/src/trpc/routes/mail.ts +++ b/apps/server/src/trpc/routes/mail.ts @@ -36,7 +36,7 @@ export const mailRouter = router({ .query(async ({ input, ctx }) => { const { activeConnection } = ctx; const agent = await getZeroAgent(activeConnection.id); - return await agent.getThreadFromDB(input.id); + return await agent.getThread(input.id); }), count: activeDriverProcedure .output( @@ -75,24 +75,24 @@ export const mailRouter = router({ }); return drafts; } - if (q) { - const threadsResponse = await agent.listThreads({ - labelIds: labelIds, - maxResults: max, - pageToken: cursor, - query: q, - folder, - }); - return threadsResponse; - } - const folderLabelId = getFolderLabelId(folder); - const labelIdsToUse = folderLabelId ? [...labelIds, folderLabelId] : labelIds; - const threadsResponse = await agent.getThreadsFromDB({ - labelIds: labelIdsToUse, - max: max, - cursor: cursor, + // if (q) { + const threadsResponse = await agent.listThreads({ + labelIds: labelIds, + maxResults: max, + pageToken: cursor, + query: q, + folder, }); return threadsResponse; + // } + // const folderLabelId = getFolderLabelId(folder); + // const labelIdsToUse = folderLabelId ? [...labelIds, folderLabelId] : labelIds; + // const threadsResponse = await agent.getThreadsFromDB({ + // labelIds: labelIdsToUse, + // max: max, + // cursor: cursor, + // }); + // return threadsResponse; }), markAsRead: activeDriverProcedure .input( @@ -173,7 +173,7 @@ export const mailRouter = router({ } const threadResults: PromiseSettledResult<{ messages: { tags: { name: string }[] }[] }>[] = - await Promise.allSettled(threadIds.map((id) => agent.getThreadFromDB(id))); + await Promise.allSettled(threadIds.map((id) => agent.getThread(id))); let anyStarred = false; let processedThreads = 0; @@ -217,7 +217,7 @@ export const mailRouter = router({ } const threadResults: PromiseSettledResult<{ messages: { tags: { name: string }[] }[] }>[] = - await Promise.allSettled(threadIds.map((id) => agent.getThreadFromDB(id))); + await Promise.allSettled(threadIds.map((id) => agent.getThread(id))); let anyImportant = false; let processedThreads = 0;