diff --git a/apps/server/src/pipelines.effect.ts b/apps/server/src/pipelines.effect.ts index ee9fcf879f..78208bfa6b 100644 --- a/apps/server/src/pipelines.effect.ts +++ b/apps/server/src/pipelines.effect.ts @@ -17,12 +17,17 @@ import { SummarizeThread, ThreadLabels, } from './lib/brain.fallback.prompts'; +import { + generateAutomaticDraft, + shouldGenerateDraft, + analyzeEmailIntent, +} from './thread-workflow-utils'; import { defaultLabels, EPrompts, EProviders, type ParsedMessage, type Sender } from './types'; import { getZeroAgent } from './lib/server-utils'; import { type gmail_v1 } from '@googleapis/gmail'; +import { connection, summary } from './db/schema'; import { getPromptName } from './pipelines'; import { env } from 'cloudflare:workers'; -import { connection } from './db/schema'; import { Effect, Console } from 'effect'; import * as cheerio from 'cheerio'; import { eq } from 'drizzle-orm'; @@ -30,7 +35,7 @@ import { createDb } from './db'; const showLogs = true; -const log = (message: string, ...args: any[]) => { +export const log = (message: string, ...args: any[]) => { if (showLogs) { console.log(message, ...args); return message; @@ -415,7 +420,6 @@ export const runThreadWorkflow = ( .select() .from(connection) .where(eq(connection.id, connectionId.toString())); - await conn.end(); if (!foundConnection) { throw new Error(`Connection not found ${connectionId}`); } @@ -448,6 +452,90 @@ export const runThreadWorkflow = ( return 'Thread has no messages'; } + const autoDraftId = yield* Effect.tryPromise({ + try: async () => { + if (!shouldGenerateDraft(thread, foundConnection)) { + console.log('[THREAD_WORKFLOW] Skipping draft generation for thread:', threadId); + return null; + } + + const latestMessage = thread.messages[thread.messages.length - 1]; + const emailIntent = analyzeEmailIntent(latestMessage); + + console.log('[THREAD_WORKFLOW] Analyzed email intent:', { + threadId, + isQuestion: emailIntent.isQuestion, + isRequest: emailIntent.isRequest, + isMeeting: emailIntent.isMeeting, + isUrgent: emailIntent.isUrgent, + }); + + if ( + !emailIntent.isQuestion && + !emailIntent.isRequest && + !emailIntent.isMeeting && + !emailIntent.isUrgent + ) { + console.log( + '[THREAD_WORKFLOW] Email does not require a response, skipping draft generation', + ); + return null; + } + + console.log('[THREAD_WORKFLOW] Generating automatic draft for thread:', threadId); + const draftContent = await generateAutomaticDraft( + connectionId.toString(), + thread, + foundConnection, + ); + + if (draftContent) { + const latestMessage = thread.messages[thread.messages.length - 1]; + + const replyTo = latestMessage.sender?.email || ''; + const cc = + latestMessage.cc + ?.map((r) => r.email) + .filter((email) => email && email !== foundConnection.email) || []; + + const originalSubject = latestMessage.subject || ''; + const replySubject = originalSubject.startsWith('Re: ') + ? originalSubject + : `Re: ${originalSubject}`; + + const draftData = { + to: replyTo, + cc: cc.join(', '), + bcc: '', + subject: replySubject, + message: draftContent, + attachments: [], + id: null, + threadId: threadId.toString(), + fromEmail: foundConnection.email, + }; + + try { + const createdDraft = await agent.createDraft(draftData); + console.log('[THREAD_WORKFLOW] Created automatic draft:', { + threadId, + draftId: createdDraft?.id, + }); + return createdDraft?.id || null; + } catch (error) { + console.log('[THREAD_WORKFLOW] Failed to create automatic draft:', { + threadId, + error: error instanceof Error ? error.message : String(error), + }); + return null; + } + } + + return null; + }, + catch: (error) => ({ _tag: 'DatabaseError' as const, error }), + }); + yield* Console.log('[THREAD_WORKFLOW] Processing thread messages and vectorization'); const messagesToVectorize = yield* Effect.tryPromise({ @@ -805,6 +893,14 @@ export const runThreadWorkflow = ( catch: (error) => ({ _tag: 'DatabaseError' as const, error }), }).pipe(Effect.orElse(() => Effect.succeed(null))); + yield* Effect.tryPromise({ + try: async () => { + await conn.end(); + console.log('[THREAD_WORKFLOW] Closed database connection'); + }, + catch: (error) => ({ _tag: 'DatabaseError' as const, error }), + }).pipe(Effect.orElse(() => Effect.succeed(null))); + yield* Console.log('[THREAD_WORKFLOW] Thread processing complete'); return 'Thread workflow completed successfully'; } else { diff --git a/apps/server/src/thread-workflow-utils/index.ts b/apps/server/src/thread-workflow-utils/index.ts new file mode 100644 index 0000000000..9f8e2f00d5 --- /dev/null +++ b/apps/server/src/thread-workflow-utils/index.ts @@ -0,0 +1,111 @@ +import type { IGetThreadResponse } from '../lib/driver/types'; +import { composeEmail } from '../trpc/routes/ai/compose'; +import { type ParsedMessage } from '../types'; +import { log } from '../pipelines.effect'; +import { connection } from '../db/schema'; + +const shouldGenerateDraft = ( + thread: IGetThreadResponse, + foundConnection: typeof connection.$inferSelect, +): boolean => { + if (!thread.messages || thread.messages.length === 0) return false; + + const latestMessage = thread.messages[thread.messages.length - 1]; + + if (latestMessage.sender?.email?.toLowerCase() === foundConnection.email?.toLowerCase()) { + return false; + } + + if ( + latestMessage.sender?.email?.toLowerCase().includes('no-reply') || + latestMessage.sender?.email?.toLowerCase().includes('noreply') || + latestMessage.sender?.email?.toLowerCase().includes('donotreply') || + latestMessage.sender?.email?.toLowerCase().includes('do-not-reply') || + latestMessage.subject?.toLowerCase().includes('newsletter') || + latestMessage.subject?.toLowerCase().includes('unsubscribe') || + latestMessage.subject?.toLowerCase().includes('notification') || + latestMessage.decodedBody?.toLowerCase().includes('do not reply') || + latestMessage.decodedBody?.toLowerCase().includes('this is an automated') + ) { + return false; + } + + if (latestMessage.receivedOn) { + const messageDate = new Date(latestMessage.receivedOn); + const sevenDaysAgo = new Date(Date.now() - 7 * 24 * 60 * 60 * 1000); + if (messageDate < sevenDaysAgo) { + return false; + } + } + + return true; +}; + +const analyzeEmailIntent = (message: ParsedMessage) => { + const content = (message.decodedBody || message.body || '').toLowerCase(); + const subject = (message.subject || '').toLowerCase(); + + return { + isQuestion: + /\?/.test(content) || + /\b(what|when|where|how|why|can you|could you|would you)\b/.test(content), + isRequest: /\b(please|request|need|require|can you|could you|would you mind)\b/.test(content), + isMeeting: /\b(meeting|schedule|calendar|appointment|call|zoom|teams|meet)\b/.test( + content + ' ' + subject, + ), + isUrgent: /\b(urgent|asap|immediate|priority|rush)\b/.test(content + ' ' + subject), + }; +}; + +const generateAutomaticDraft = async ( + connectionId: string, + thread: IGetThreadResponse, + foundConnection: typeof connection.$inferSelect, +): Promise => { + try { + const latestMessage = thread.messages[thread.messages.length - 1]; + + const emailAnalysis = analyzeEmailIntent(latestMessage); + + let prompt = 'Generate a professional and contextually appropriate reply to this email thread.'; + + if (emailAnalysis.isQuestion) { + prompt = + 'This email contains questions. Generate a helpful response that addresses the questions asked. Be thorough but concise.'; + } else if (emailAnalysis.isRequest) { + prompt = + 'This email contains a request. Generate a response that acknowledges the request and provides next steps or asks for clarification if needed.'; + } else if (emailAnalysis.isMeeting) { + prompt = + 'This email is about scheduling or meetings. Generate an appropriate response about availability, meeting coordination, or confirmation.'; + } else if (emailAnalysis.isUrgent) { + prompt = + 'This email appears urgent. Generate a prompt acknowledgment response that addresses the urgency and provides next steps.'; + } + + const threadMessages = thread.messages.map((message) => ({ + from: message.sender?.name || message.sender?.email || 'Unknown', + to: message.to?.map((r) => r.name || r.email) || [], + cc: message.cc?.map((r) => r.name || r.email) || [], + subject: message.subject || '', + body: message.decodedBody || message.body || '', + })); + + const draftContent = await composeEmail({ + prompt, + threadMessages, + username: foundConnection.name || foundConnection.email || 'User', + connectionId, + }); + + return draftContent; + } catch (error) { + log('[THREAD_WORKFLOW] Failed to generate automatic draft:', { + connectionId, + error: error instanceof Error ? error.message : String(error), + }); + return null; + } +}; + +export { analyzeEmailIntent, generateAutomaticDraft, shouldGenerateDraft };