-
-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Add sequential thinking processor and switch to OpenAI model #1808
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,303 @@ | ||
| /* | ||
| * Licensed to Zero Email Inc. under one or more contributor license agreements. | ||
| * You may not use this file except in compliance with the Apache License, Version 2.0 (the "License"). | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| * | ||
| * Reuse or distribution of this file requires a license from Zero Email Inc. | ||
| */ | ||
|
|
||
| import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js'; | ||
| import type { env } from 'cloudflare:workers'; | ||
| import { McpAgent } from 'agents/mcp'; | ||
| import { z } from 'zod'; | ||
|
|
||
| interface ThoughtData { | ||
| thought: string; | ||
| thoughtNumber: number; | ||
| totalThoughts: number; | ||
| isRevision?: boolean; | ||
| revisesThought?: number; | ||
| branchFromThought?: number; | ||
| branchId?: string; | ||
| needsMoreThoughts?: boolean; | ||
| nextThoughtNeeded: boolean; | ||
| } | ||
|
|
||
| interface SequentialThinkingParams { | ||
| thought: string; | ||
| nextThoughtNeeded: boolean; | ||
| thoughtNumber: number; | ||
| totalThoughts: number; | ||
| isRevision?: boolean; | ||
| revisesThought?: number; | ||
| branchFromThought?: number; | ||
| branchId?: string; | ||
| needsMoreThoughts?: boolean; | ||
| } | ||
|
|
||
| export class SequentialThinkingProcessor { | ||
| private thoughtHistory: ThoughtData[] = []; | ||
| private branches: Record<string, ThoughtData[]> = {}; | ||
| private disableThoughtLogging: boolean; | ||
|
|
||
| constructor() { | ||
| this.disableThoughtLogging = false; // Enable logging by default in Zero | ||
| } | ||
|
|
||
| private validateThoughtData(input: SequentialThinkingParams): ThoughtData { | ||
| if (!input.thought || typeof input.thought !== 'string') { | ||
| throw new Error('Invalid thought: must be a string'); | ||
| } | ||
| if (!input.thoughtNumber || typeof input.thoughtNumber !== 'number') { | ||
| throw new Error('Invalid thoughtNumber: must be a number'); | ||
| } | ||
| if (!input.totalThoughts || typeof input.totalThoughts !== 'number') { | ||
| throw new Error('Invalid totalThoughts: must be a number'); | ||
| } | ||
| if (typeof input.nextThoughtNeeded !== 'boolean') { | ||
| throw new Error('Invalid nextThoughtNeeded: must be a boolean'); | ||
| } | ||
|
|
||
| return { | ||
| thought: input.thought, | ||
| thoughtNumber: input.thoughtNumber, | ||
| totalThoughts: input.totalThoughts, | ||
| nextThoughtNeeded: input.nextThoughtNeeded, | ||
| isRevision: input.isRevision, | ||
| revisesThought: input.revisesThought, | ||
| branchFromThought: input.branchFromThought, | ||
| branchId: input.branchId, | ||
| needsMoreThoughts: input.needsMoreThoughts, | ||
| }; | ||
| } | ||
|
|
||
| private formatThought(thoughtData: ThoughtData): string { | ||
| const { | ||
| thoughtNumber, | ||
| totalThoughts, | ||
| thought, | ||
| isRevision, | ||
| revisesThought, | ||
| branchFromThought, | ||
| branchId, | ||
| } = thoughtData; | ||
|
|
||
| let prefix = ''; | ||
| let context = ''; | ||
|
|
||
| if (isRevision) { | ||
| prefix = '🔄 Revision'; | ||
| context = ` (revising thought ${revisesThought})`; | ||
| } else if (branchFromThought) { | ||
| prefix = '🌿 Branch'; | ||
| context = ` (from thought ${branchFromThought}, ID: ${branchId})`; | ||
| } else { | ||
| prefix = '💭 Thought'; | ||
| context = ''; | ||
| } | ||
|
|
||
| const header = `${prefix} ${thoughtNumber}/${totalThoughts}${context}`; | ||
| const border = '─'.repeat(Math.max(header.length, thought.length) + 4); | ||
|
|
||
| return ` | ||
| ┌${border}┐ | ||
| │ ${header} │ | ||
| ├${border}┤ | ||
| │ ${thought.padEnd(border.length - 2)} │ | ||
| └${border}┘`; | ||
| } | ||
|
|
||
| public processThought(input: SequentialThinkingParams) { | ||
| try { | ||
| const validatedInput = this.validateThoughtData(input); | ||
|
|
||
| if (validatedInput.thoughtNumber > validatedInput.totalThoughts) { | ||
| validatedInput.totalThoughts = validatedInput.thoughtNumber; | ||
| } | ||
|
|
||
| this.thoughtHistory.push(validatedInput); | ||
|
|
||
| if (validatedInput.branchFromThought && validatedInput.branchId) { | ||
| if (!this.branches[validatedInput.branchId]) { | ||
| this.branches[validatedInput.branchId] = []; | ||
| } | ||
| this.branches[validatedInput.branchId].push(validatedInput); | ||
| } | ||
|
|
||
| if (!this.disableThoughtLogging) { | ||
| const formattedThought = this.formatThought(validatedInput); | ||
| console.log(formattedThought); // Use console.log instead of console.error | ||
| } | ||
|
|
||
| return { | ||
| content: [ | ||
| { | ||
| type: 'text' as const, | ||
| text: JSON.stringify( | ||
| { | ||
| thoughtNumber: validatedInput.thoughtNumber, | ||
| totalThoughts: validatedInput.totalThoughts, | ||
| nextThoughtNeeded: validatedInput.nextThoughtNeeded, | ||
| branches: Object.keys(this.branches), | ||
| thoughtHistoryLength: this.thoughtHistory.length, | ||
| }, | ||
| null, | ||
| 2, | ||
| ), | ||
| }, | ||
| ], | ||
| }; | ||
| } catch (error) { | ||
| return { | ||
| content: [ | ||
| { | ||
| type: 'text' as const, | ||
| text: JSON.stringify( | ||
| { | ||
| error: error instanceof Error ? error.message : String(error), | ||
| status: 'failed', | ||
| }, | ||
| null, | ||
| 2, | ||
| ), | ||
| }, | ||
| ], | ||
| isError: true, | ||
| }; | ||
| } | ||
| } | ||
|
|
||
| public getThoughtHistory(): ThoughtData[] { | ||
| return this.thoughtHistory; | ||
| } | ||
|
|
||
| public getBranches(): Record<string, ThoughtData[]> { | ||
| return this.branches; | ||
| } | ||
|
|
||
| public reset(): void { | ||
| this.thoughtHistory = []; | ||
| this.branches = {}; | ||
| } | ||
| } | ||
|
|
||
| export class ThinkingMCP extends McpAgent<typeof env, Record<string, unknown>, { userId: string }> { | ||
| thinkingServer = new SequentialThinkingProcessor(); | ||
| server = new McpServer({ | ||
| name: 'thinking-mcp', | ||
| version: '1.0.0', | ||
| description: 'Thinking MCP', | ||
| }); | ||
|
|
||
| async init(): Promise<void> { | ||
| this.server.tool('Test', () => { | ||
| return { | ||
| content: [{ type: 'text' as const, text: 'Hello World' }], | ||
| }; | ||
| }); | ||
|
|
||
| console.log('Here!'); | ||
|
|
||
| // this.server.registerTool( | ||
| // 'sequentialthinking', | ||
| // { | ||
| // description: `A detailed tool for dynamic and reflective problem-solving through thoughts. | ||
| // This tool helps analyze problems through a flexible thinking process that can adapt and evolve. | ||
| // Each thought can build on, question, or revise previous insights as understanding deepens. | ||
|
|
||
| // When to use this tool: | ||
| // - Breaking down complex problems into steps | ||
| // - Planning and design with room for revision | ||
| // - Analysis that might need course correction | ||
| // - Problems where the full scope might not be clear initially | ||
| // - Problems that require a multi-step solution | ||
| // - Tasks that need to maintain context over multiple steps | ||
| // - Situations where irrelevant information needs to be filtered out | ||
|
|
||
| // Key features: | ||
| // - You can adjust total_thoughts up or down as you progress | ||
| // - You can question or revise previous thoughts | ||
| // - You can add more thoughts even after reaching what seemed like the end | ||
| // - You can express uncertainty and explore alternative approaches | ||
| // - Not every thought needs to build linearly - you can branch or backtrack | ||
| // - Generates a solution hypothesis | ||
| // - Verifies the hypothesis based on the Chain of Thought steps | ||
| // - Repeats the process until satisfied | ||
| // - Provides a correct answer | ||
|
|
||
| // Parameters explained: | ||
| // - thought: Your current thinking step, which can include: | ||
| // * Regular analytical steps | ||
| // * Revisions of previous thoughts | ||
| // * Questions about previous decisions | ||
| // * Realizations about needing more analysis | ||
| // * Changes in approach | ||
| // * Hypothesis generation | ||
| // * Hypothesis verification | ||
| // - next_thought_needed: True if you need more thinking, even if at what seemed like the end | ||
| // - thought_number: Current number in sequence (can go beyond initial total if needed) | ||
| // - total_thoughts: Current estimate of thoughts needed (can be adjusted up/down) | ||
| // - is_revision: A boolean indicating if this thought revises previous thinking | ||
| // - revises_thought: If is_revision is true, which thought number is being reconsidered | ||
| // - branch_from_thought: If branching, which thought number is the branching point | ||
| // - branch_id: Identifier for the current branch (if any) | ||
| // - needs_more_thoughts: If reaching end but realizing more thoughts needed | ||
|
|
||
| // You should: | ||
| // 1. Start with an initial estimate of needed thoughts, but be ready to adjust | ||
| // 2. Feel free to question or revise previous thoughts | ||
| // 3. Don't hesitate to add more thoughts if needed, even at the "end" | ||
| // 4. Express uncertainty when present | ||
| // 5. Mark thoughts that revise previous thinking or branch into new paths | ||
| // 6. Ignore information that is irrelevant to the current step | ||
| // 7. Generate a solution hypothesis when appropriate | ||
| // 8. Verify the hypothesis based on the Chain of Thought steps | ||
| // 9. Repeat the process until satisfied with the solution | ||
| // 10. Provide a single, ideally correct answer as the final output | ||
| // 11. Only set next_thought_needed to false when truly done and a satisfactory answer is reached`, | ||
| // inputSchema: { | ||
| // thought: z.string().describe('Your current thinking step'), | ||
| // nextThoughtNeeded: z.boolean().describe('Whether another thought step is needed'), | ||
| // thoughtNumber: z.number().int().min(1).describe('Current thought number'), | ||
| // totalThoughts: z.number().int().min(1).describe('Estimated total thoughts needed'), | ||
| // isRevision: z.boolean().optional().describe('Whether this revises previous thinking'), | ||
| // revisesThought: z | ||
| // .number() | ||
| // .int() | ||
| // .min(1) | ||
| // .optional() | ||
| // .describe('Which thought is being reconsidered'), | ||
| // branchFromThought: z | ||
| // .number() | ||
| // .int() | ||
| // .min(1) | ||
| // .optional() | ||
| // .describe('Branching point thought number'), | ||
| // branchId: z.string().optional().describe('Branch identifier'), | ||
| // needsMoreThoughts: z.boolean().optional().describe('If more thoughts are needed'), | ||
| // }, | ||
| // }, | ||
| // (params) => { | ||
| // return this.thinkingServer.processThought({ | ||
| // thought: params.thought, | ||
| // nextThoughtNeeded: params.nextThoughtNeeded, | ||
| // thoughtNumber: params.thoughtNumber, | ||
| // totalThoughts: params.totalThoughts, | ||
| // isRevision: params.isRevision, | ||
| // revisesThought: params.revisesThought, | ||
| // branchFromThought: params.branchFromThought, | ||
| // branchId: params.branchId, | ||
| // needsMoreThoughts: params.needsMoreThoughts, | ||
| // }); | ||
| // }, | ||
| // ); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -51,6 +51,7 @@ import { processToolCalls } from './utils'; | |||||
| import { env } from 'cloudflare:workers'; | ||||||
| import type { Connection } from 'agents'; | ||||||
| import { openai } from '@ai-sdk/openai'; | ||||||
| import { openai } from '@ai-sdk/openai'; | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove duplicate import. The -import { openai } from '@ai-sdk/openai';📝 Committable suggestion
Suggested change
🧰 Tools🪛 Biome (2.1.2)[error] 54-54: Shouldn't redeclare 'openai'. Consider to delete it or rename it. 'openai' is defined here: (lint/suspicious/noRedeclare) 🪛 GitHub Actions: autofix.ci[error] 53-54: Identifier 🤖 Prompt for AI Agents |
||||||
| import { createDb } from '../../db'; | ||||||
| import { DriverRpcDO } from './rpc'; | ||||||
| import { eq } from 'drizzle-orm'; | ||||||
|
|
@@ -1065,7 +1066,7 @@ export class ZeroAgent extends AIChatAgent<typeof env> { | |||||
| private chatMessageAbortControllers: Map<string, AbortController> = new Map(); | ||||||
|
|
||||||
| async registerZeroMCP() { | ||||||
| await this.mcp.connect(env.VITE_PUBLIC_BACKEND_URL + '/sse', { | ||||||
| await this.mcp.connect(env.VITE_PUBLIC_BACKEND_URL + '/sse?mcpId=zero-mcp', { | ||||||
| transport: { | ||||||
| authProvider: new DurableObjectOAuthClientProvider( | ||||||
| this.ctx.storage, | ||||||
|
|
@@ -1076,8 +1077,20 @@ export class ZeroAgent extends AIChatAgent<typeof env> { | |||||
| }); | ||||||
| } | ||||||
|
|
||||||
| async registerThinkingMCP() { | ||||||
| await this.mcp.connect(env.VITE_PUBLIC_BACKEND_URL + '/sse?mcpId=thinking-mcp', { | ||||||
| transport: { | ||||||
| authProvider: new DurableObjectOAuthClientProvider( | ||||||
| this.ctx.storage, | ||||||
| 'thinking-mcp', | ||||||
| env.VITE_PUBLIC_BACKEND_URL, | ||||||
| ), | ||||||
| }, | ||||||
| }); | ||||||
| } | ||||||
|
|
||||||
| onStart(): void | Promise<void> { | ||||||
| // this.registerZeroMCP(); | ||||||
| // this.registerThinkingMCP(); | ||||||
| } | ||||||
|
|
||||||
| private getDataStreamResponse( | ||||||
|
|
@@ -1091,11 +1104,14 @@ export class ZeroAgent extends AIChatAgent<typeof env> { | |||||
| if (this.name === 'general') return; | ||||||
| const connectionId = this.name; | ||||||
| const orchestrator = new ToolOrchestrator(dataStream, connectionId); | ||||||
| // const mcpTools = await this.mcp.unstable_getAITools(); | ||||||
|
|
||||||
| // const mcpTools = this.mcp.unstable_getAITools(); | ||||||
|
|
||||||
| const rawTools = { | ||||||
| ...(await authTools(connectionId)), | ||||||
| // ...mcpTools, | ||||||
| }; | ||||||
|
|
||||||
| const tools = orchestrator.processTools(rawTools); | ||||||
| const processedMessages = await processToolCalls( | ||||||
| { | ||||||
|
|
@@ -1106,8 +1122,13 @@ export class ZeroAgent extends AIChatAgent<typeof env> { | |||||
| {}, | ||||||
| ); | ||||||
|
|
||||||
| const model = | ||||||
| env.USE_OPENAI === 'true' | ||||||
| ? openai(env.OPENAI_MODEL || 'gpt-4o') | ||||||
| : anthropic(env.OPENAI_MODEL || 'claude-3-7-sonnet-20250219'); | ||||||
|
|
||||||
| const result = streamText({ | ||||||
| model: anthropic(env.OPENAI_MODEL || 'claude-3-5-haiku-latest'), | ||||||
| model, | ||||||
| maxSteps: 10, | ||||||
| messages: processedMessages, | ||||||
| tools, | ||||||
|
|
||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -116,6 +116,7 @@ | |
| "THREAD_SYNC_LOOP": "false", | ||
| "DISABLE_WORKFLOWS": "false", | ||
| "AUTORAG_ID": "", | ||
| "USE_OPENAI": "true", | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. USE_OPENAI is only defined for the local environment, so staging/production will silently fall back to the Anthropic model, leading to inconsistent behavior across environments. Prompt for AI agents
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Add USE_OPENAI to staging and production environments for consistency. The # In staging environment vars section (around line 254)
"vars": {
"NODE_ENV": "development",
"COOKIE_DOMAIN": "0.email",
"VITE_PUBLIC_BACKEND_URL": "https://sapi.0.email",
"VITE_PUBLIC_APP_URL": "https://staging.0.email",
"DISABLE_CALLS": "",
"DROP_AGENT_TABLES": "false",
"THREAD_SYNC_MAX_COUNT": "20",
"THREAD_SYNC_LOOP": "true",
"DISABLE_WORKFLOWS": "true",
+ "USE_OPENAI": "true",
},
# In production environment vars section (around line 395)
"vars": {
"NODE_ENV": "production",
"COOKIE_DOMAIN": "0.email",
"VITE_PUBLIC_BACKEND_URL": "https://api.0.email",
"VITE_PUBLIC_APP_URL": "https://0.email",
"DISABLE_CALLS": "true",
"DROP_AGENT_TABLES": "false",
"THREAD_SYNC_MAX_COUNT": "10",
"THREAD_SYNC_LOOP": "true",
"DISABLE_WORKFLOWS": "true",
+ "USE_OPENAI": "true",
},🤖 Prompt for AI Agents |
||
| }, | ||
| "kv_namespaces": [ | ||
| { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove unused import.
The
zimport from 'zod' is not used in the active code and should be removed to avoid linting errors.-import { z } from 'zod';📝 Committable suggestion
🧰 Tools
🪛 GitHub Actions: autofix.ci
[warning] 20-20: ESLint (no-unused-vars): Identifier 'z' is imported but never used. Consider removing this import.
🤖 Prompt for AI Agents