Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions apps/server/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,7 @@ export class DbRpcDO extends RpcTarget {
return await this.mainDo.deleteEmailTemplate(this.userId, templateId);
}

async updateEmailTemplate(
templateId: string,
data: Partial<typeof emailTemplate.$inferInsert>,
) {
async updateEmailTemplate(templateId: string, data: Partial<typeof emailTemplate.$inferInsert>) {
return await this.mainDo.updateEmailTemplate(this.userId, templateId, data);
}
}
Expand Down Expand Up @@ -522,7 +519,10 @@ class ZeroDB extends DurableObject<Env> {
});
}

async createEmailTemplate(userId: string, payload: Omit<typeof emailTemplate.$inferInsert, 'userId'>) {
async createEmailTemplate(
userId: string,
payload: Omit<typeof emailTemplate.$inferInsert, 'userId'>,
) {
return await this.db
.insert(emailTemplate)
.values({
Expand Down
34 changes: 6 additions & 28 deletions apps/server/src/pipelines.ts
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ export class WorkflowRunner extends DurableObject<Env> {
{ concurrency: 6 }, // Limit concurrency to avoid rate limits
);

const syncedCount = syncResults.length;
const syncedCount = syncResults.filter((result) => result.result.success).length;
const failedCount = threadWorkflowParams.length - syncedCount;

if (failedCount > 0) {
Expand Down Expand Up @@ -627,37 +627,15 @@ export class WorkflowRunner extends DurableObject<Env> {
// Execute configured workflows using the workflow engine
const workflowResults = yield* Effect.tryPromise({
try: async () => {
const allResults = new Map<string, any>();
const allErrors = new Map<string, Error>();

// Execute all workflows registered in the engine
const workflowNames = workflowEngine.getWorkflowNames();

for (const workflowName of workflowNames) {
console.log(`[THREAD_WORKFLOW] Executing workflow: ${workflowName}`);

try {
const { results, errors } = await workflowEngine.executeWorkflow(
workflowName,
workflowContext,
);

// Merge results and errors using efficient Map operations
results.forEach((value, key) => allResults.set(key, value));
errors.forEach((value, key) => allErrors.set(key, value));

console.log(`[THREAD_WORKFLOW] Completed workflow: ${workflowName}`);
} catch (error) {
console.error(
`[THREAD_WORKFLOW] Failed to execute workflow ${workflowName}:`,
error,
);
const errorObj = error instanceof Error ? error : new Error(String(error));
allErrors.set(workflowName, errorObj);
}
}
const { results, errors } = await workflowEngine.executeWorkflowChain(
workflowNames,
workflowContext,
);

return { results: allResults, errors: allErrors };
return { results, errors };
},
catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }),
});
Expand Down
17 changes: 12 additions & 5 deletions apps/server/src/routes/agent/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1779,7 +1779,7 @@ export class ZeroAgent extends AIChatAgent<typeof env> {
private getDataStreamResponse(
onFinish: StreamTextOnFinishCallback<{}>,
options?: {
abortSignal: AbortSignal | undefined;
abortSignal?: AbortSignal;
connectionId?: string;
},
) {
Expand Down Expand Up @@ -1822,7 +1822,10 @@ export class ZeroAgent extends AIChatAgent<typeof env> {
onError: (error) => {
console.error('Error in streamText', error);
},
system: await getPrompt(getPromptName(connectionId, EPrompts.Chat), AiChatPrompt(currentThreadId)),
system: await getPrompt(
getPromptName(connectionId, EPrompts.Chat),
AiChatPrompt(currentThreadId),
),
});

result.mergeIntoDataStream(dataStream);
Expand Down Expand Up @@ -1912,7 +1915,9 @@ export class ZeroAgent extends AIChatAgent<typeof env> {
await this.persistMessages(finalMessages, [connection.id]);
this.removeAbortController(chatMessageId);
},
abortSignal ? { abortSignal, connectionId: connection.id } : { connectionId: connection.id },
abortSignal
? { abortSignal, connectionId: connection.id }
: { connectionId: connection.id },
);

if (response) {
Expand Down Expand Up @@ -1955,7 +1960,9 @@ export class ZeroAgent extends AIChatAgent<typeof env> {
}
case IncomingMessageType.ThreadIdUpdate: {
this.connectionThreadIds.set(connection.id, data.threadId);
console.log(`[ZeroAgent] Updated threadId for connection ${connection.id}: ${data.threadId}`);
console.log(
`[ZeroAgent] Updated threadId for connection ${connection.id}: ${data.threadId}`,
);
break;
}
// case IncomingMessageType.Mail_List: {
Expand Down Expand Up @@ -2023,7 +2030,7 @@ export class ZeroAgent extends AIChatAgent<typeof env> {
async onChatMessage(
onFinish: StreamTextOnFinishCallback<{}>,
options?: {
abortSignal: AbortSignal | undefined;
abortSignal?: AbortSignal;
connectionId?: string;
},
) {
Expand Down
46 changes: 43 additions & 3 deletions apps/server/src/thread-workflow-utils/workflow-engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,14 @@ export class WorkflowEngine {
async executeWorkflow(
workflowName: string,
context: WorkflowContext,
existingResults?: Map<string, any>,
): Promise<{ results: Map<string, any>; errors: Map<string, Error> }> {
const workflow = this.workflows.get(workflowName);
if (!workflow) {
throw new Error(`Workflow "${workflowName}" not found`);
}

const results = new Map<string, any>();
const results = new Map<string, any>(existingResults || []);
const errors = new Map<string, Error>();

for (const step of workflow.steps) {
Expand Down Expand Up @@ -83,6 +84,45 @@ export class WorkflowEngine {
return { results, errors };
}

async executeWorkflowChain(
workflowNames: string[],
context: WorkflowContext,
): Promise<{ results: Map<string, any>; errors: Map<string, Error> }> {
let sharedResults = new Map<string, any>();
let allErrors = new Map<string, Error>();

for (const workflowName of workflowNames) {
console.log(`[WORKFLOW_ENGINE] Executing workflow in chain: ${workflowName}`);
try {
const { results, errors } = await this.executeWorkflow(
workflowName,
context,
sharedResults,
);

// Merge results
for (const [key, value] of results) {
sharedResults.set(key, value);
}

// Merge errors
for (const [key, error] of errors) {
allErrors.set(key, error);
}

console.log(
`[WORKFLOW_ENGINE] Completed workflow: ${workflowName}, total results: ${sharedResults.size}`,
);
} catch (error) {
const errorObj = error instanceof Error ? error : new Error(String(error));
console.error(`[WORKFLOW_ENGINE] Failed to execute workflow ${workflowName}:`, errorObj);
allErrors.set(workflowName, errorObj);
}
}

return { results: sharedResults, errors: allErrors };
}

clearContext(context: WorkflowContext): void {
if (context.results) {
context.results.clear();
Expand Down Expand Up @@ -158,7 +198,7 @@ export const createDefaultWorkflows = (): WorkflowEngine => {
],
};

const _vectorizationWorkflow: WorkflowDefinition = {
const vectorizationWorkflow: WorkflowDefinition = {
name: 'message-vectorization',
description: 'Vectorizes thread messages for search and analysis',
steps: [
Expand Down Expand Up @@ -272,7 +312,7 @@ export const createDefaultWorkflows = (): WorkflowEngine => {
};

engine.registerWorkflow(autoDraftWorkflow);
// engine.registerWorkflow(vectorizationWorkflow);
engine.registerWorkflow(vectorizationWorkflow);
engine.registerWorkflow(threadSummaryWorkflow);
engine.registerWorkflow(labelGenerationWorkflow);

Expand Down