diff --git a/apps/server/src/main.ts b/apps/server/src/main.ts index 7b3d491d10..88b11e2d37 100644 --- a/apps/server/src/main.ts +++ b/apps/server/src/main.ts @@ -187,10 +187,7 @@ export class DbRpcDO extends RpcTarget { return await this.mainDo.deleteEmailTemplate(this.userId, templateId); } - async updateEmailTemplate( - templateId: string, - data: Partial, - ) { + async updateEmailTemplate(templateId: string, data: Partial) { return await this.mainDo.updateEmailTemplate(this.userId, templateId, data); } } @@ -522,7 +519,10 @@ class ZeroDB extends DurableObject { }); } - async createEmailTemplate(userId: string, payload: Omit) { + async createEmailTemplate( + userId: string, + payload: Omit, + ) { return await this.db .insert(emailTemplate) .values({ diff --git a/apps/server/src/pipelines.ts b/apps/server/src/pipelines.ts index efd118edce..35b0c375ee 100644 --- a/apps/server/src/pipelines.ts +++ b/apps/server/src/pipelines.ts @@ -393,7 +393,7 @@ export class WorkflowRunner extends DurableObject { { concurrency: 6 }, // Limit concurrency to avoid rate limits ); - const syncedCount = syncResults.length; + const syncedCount = syncResults.filter((result) => result.result.success).length; const failedCount = threadWorkflowParams.length - syncedCount; if (failedCount > 0) { @@ -627,37 +627,15 @@ export class WorkflowRunner extends DurableObject { // Execute configured workflows using the workflow engine const workflowResults = yield* Effect.tryPromise({ try: async () => { - const allResults = new Map(); - const allErrors = new Map(); - // Execute all workflows registered in the engine const workflowNames = workflowEngine.getWorkflowNames(); - for (const workflowName of workflowNames) { - console.log(`[THREAD_WORKFLOW] Executing workflow: ${workflowName}`); - - try { - const { results, errors } = await workflowEngine.executeWorkflow( - workflowName, - workflowContext, - ); - - // Merge results and errors using efficient Map operations - results.forEach((value, key) => allResults.set(key, value)); - errors.forEach((value, key) => allErrors.set(key, value)); - - console.log(`[THREAD_WORKFLOW] Completed workflow: ${workflowName}`); - } catch (error) { - console.error( - `[THREAD_WORKFLOW] Failed to execute workflow ${workflowName}:`, - error, - ); - const errorObj = error instanceof Error ? error : new Error(String(error)); - allErrors.set(workflowName, errorObj); - } - } + const { results, errors } = await workflowEngine.executeWorkflowChain( + workflowNames, + workflowContext, + ); - return { results: allResults, errors: allErrors }; + return { results, errors }; }, catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }), }); diff --git a/apps/server/src/routes/agent/index.ts b/apps/server/src/routes/agent/index.ts index d8118be985..c8ef379f99 100644 --- a/apps/server/src/routes/agent/index.ts +++ b/apps/server/src/routes/agent/index.ts @@ -1779,7 +1779,7 @@ export class ZeroAgent extends AIChatAgent { private getDataStreamResponse( onFinish: StreamTextOnFinishCallback<{}>, options?: { - abortSignal: AbortSignal | undefined; + abortSignal?: AbortSignal; connectionId?: string; }, ) { @@ -1822,7 +1822,10 @@ export class ZeroAgent extends AIChatAgent { onError: (error) => { console.error('Error in streamText', error); }, - system: await getPrompt(getPromptName(connectionId, EPrompts.Chat), AiChatPrompt(currentThreadId)), + system: await getPrompt( + getPromptName(connectionId, EPrompts.Chat), + AiChatPrompt(currentThreadId), + ), }); result.mergeIntoDataStream(dataStream); @@ -1912,7 +1915,9 @@ export class ZeroAgent extends AIChatAgent { await this.persistMessages(finalMessages, [connection.id]); this.removeAbortController(chatMessageId); }, - abortSignal ? { abortSignal, connectionId: connection.id } : { connectionId: connection.id }, + abortSignal + ? { abortSignal, connectionId: connection.id } + : { connectionId: connection.id }, ); if (response) { @@ -1955,7 +1960,9 @@ export class ZeroAgent extends AIChatAgent { } case IncomingMessageType.ThreadIdUpdate: { this.connectionThreadIds.set(connection.id, data.threadId); - console.log(`[ZeroAgent] Updated threadId for connection ${connection.id}: ${data.threadId}`); + console.log( + `[ZeroAgent] Updated threadId for connection ${connection.id}: ${data.threadId}`, + ); break; } // case IncomingMessageType.Mail_List: { @@ -2023,7 +2030,7 @@ export class ZeroAgent extends AIChatAgent { async onChatMessage( onFinish: StreamTextOnFinishCallback<{}>, options?: { - abortSignal: AbortSignal | undefined; + abortSignal?: AbortSignal; connectionId?: string; }, ) { diff --git a/apps/server/src/thread-workflow-utils/workflow-engine.ts b/apps/server/src/thread-workflow-utils/workflow-engine.ts index 05709b1908..07b5fd4ce6 100644 --- a/apps/server/src/thread-workflow-utils/workflow-engine.ts +++ b/apps/server/src/thread-workflow-utils/workflow-engine.ts @@ -42,13 +42,14 @@ export class WorkflowEngine { async executeWorkflow( workflowName: string, context: WorkflowContext, + existingResults?: Map, ): Promise<{ results: Map; errors: Map }> { const workflow = this.workflows.get(workflowName); if (!workflow) { throw new Error(`Workflow "${workflowName}" not found`); } - const results = new Map(); + const results = new Map(existingResults || []); const errors = new Map(); for (const step of workflow.steps) { @@ -83,6 +84,45 @@ export class WorkflowEngine { return { results, errors }; } + async executeWorkflowChain( + workflowNames: string[], + context: WorkflowContext, + ): Promise<{ results: Map; errors: Map }> { + let sharedResults = new Map(); + let allErrors = new Map(); + + for (const workflowName of workflowNames) { + console.log(`[WORKFLOW_ENGINE] Executing workflow in chain: ${workflowName}`); + try { + const { results, errors } = await this.executeWorkflow( + workflowName, + context, + sharedResults, + ); + + // Merge results + for (const [key, value] of results) { + sharedResults.set(key, value); + } + + // Merge errors + for (const [key, error] of errors) { + allErrors.set(key, error); + } + + console.log( + `[WORKFLOW_ENGINE] Completed workflow: ${workflowName}, total results: ${sharedResults.size}`, + ); + } catch (error) { + const errorObj = error instanceof Error ? error : new Error(String(error)); + console.error(`[WORKFLOW_ENGINE] Failed to execute workflow ${workflowName}:`, errorObj); + allErrors.set(workflowName, errorObj); + } + } + + return { results: sharedResults, errors: allErrors }; + } + clearContext(context: WorkflowContext): void { if (context.results) { context.results.clear(); @@ -158,7 +198,7 @@ export const createDefaultWorkflows = (): WorkflowEngine => { ], }; - const _vectorizationWorkflow: WorkflowDefinition = { + const vectorizationWorkflow: WorkflowDefinition = { name: 'message-vectorization', description: 'Vectorizes thread messages for search and analysis', steps: [ @@ -272,7 +312,7 @@ export const createDefaultWorkflows = (): WorkflowEngine => { }; engine.registerWorkflow(autoDraftWorkflow); - // engine.registerWorkflow(vectorizationWorkflow); + engine.registerWorkflow(vectorizationWorkflow); engine.registerWorkflow(threadSummaryWorkflow); engine.registerWorkflow(labelGenerationWorkflow);