Skip to content

Comments

Add Effect-free workflow implementation for direct Cloudflare Workers execution#1869

Merged
MrgSub merged 1 commit intostagingfrom
ZEROAdd_Effect-free_workflow_implementation_for_direct_Cloudflare_Workers_execution
Jul 29, 2025
Merged

Add Effect-free workflow implementation for direct Cloudflare Workers execution#1869
MrgSub merged 1 commit intostagingfrom
ZEROAdd_Effect-free_workflow_implementation_for_direct_Cloudflare_Workers_execution

Conversation

@MrgSub
Copy link
Collaborator

@MrgSub MrgSub commented Jul 29, 2025

Description

Added support for draft detection in email threads to improve the auto-draft generation workflow. This PR adds an isLatestDraft flag to thread responses, allowing the system to check if a draft already exists in a thread without making additional API calls. The workflow engine has been optimized to skip draft generation for threads that already have drafts, automated emails, or messages older than 7 days.

Additionally, implemented non-Effect.ts versions of the workflow functions to provide an alternative implementation path that doesn't rely on the Effect library, which will help with testing and debugging.

Type of Change

  • ✨ New feature (non-breaking change which adds functionality)
  • ⚡ Performance improvement

Areas Affected

  • Email Integration (Gmail, IMAP, etc.)
  • Data Storage/Management

Testing Done

  • Manual testing performed

Checklist

  • I have performed a self-review of my code
  • My changes generate no new warnings
  • I have commented my code, particularly in complex areas

Additional Notes

The draft detection improvements reduce unnecessary API calls to check for existing drafts, which should improve performance and reduce the likelihood of hitting API rate limits. The workflow engine now also properly skips automated emails and old threads, focusing resources on generating drafts only for relevant conversations.


By submitting this pull request, I confirm that my contribution is made under the terms of the project's license.

Summary by CodeRabbit

  • New Features

    • Added the ability to include or exclude draft messages when retrieving email threads.
    • Thread responses now indicate if the latest message is a draft.
  • Improvements

    • Enhanced logging throughout thread and workflow processing for better traceability.
    • Optimized initial thread synchronization by checking thread count before syncing folders.
    • Simplified and streamlined workflow execution logic.
    • Disabled message vectorization workflow.
  • Configuration

    • Updated environment variables for local and staging environments, including new Cloudflare integration settings.

@jazzberry-ai
Copy link

jazzberry-ai bot commented Jul 29, 2025

This repository is associated with MrgSub whose free trial has ended. Subscribe at jazzberry.ai.
If this is an error contact us at support@jazzberry.ai.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Jul 29, 2025

Caution

Review failed

The pull request is closed.

Walkthrough

This update introduces an isLatestDraft flag to thread responses and updates thread-fetching methods to optionally include drafts. Workflow execution logic is refactored for both Effect-based and standard async/await flows. Draft generation logic is streamlined, configuration variables are updated, and additional logging is added for observability.

Changes

Cohort / File(s) Change Summary
Thread Response & Types
apps/server/src/lib/driver/types.ts
Added isLatestDraft?: boolean to IGetThreadResponse; reformatted getMessageAttachments method signature.
Workflow Execution Refactor
apps/server/src/pipelines.ts
Added async/await (non-Effect) workflow implementations for main, zero, and thread workflows; made Effect-based methods public; added validation helper; improved error handling and logging; filtered out draft/spam messages in zero workflow.
Agent Thread Handling
apps/server/src/routes/agent/index.ts
getThread and getThreadFromDB now accept includeDrafts; include/exclude drafts in response; compute and return isLatestDraft; conditional sync logic in setupAuth.
Agent RPC Interface
apps/server/src/routes/agent/rpc.ts
getThread now accepts includeDrafts parameter and passes it to the main driver.
Draft Generation Logic
apps/server/src/thread-workflow-utils/index.ts
Enhanced logging in shouldGenerateDraft; refactored detection logic to use regex; removed external draft existence check, now relies on isLatestDraft property.
Workflow Engine & Steps
apps/server/src/thread-workflow-utils/workflow-engine.ts
Step execution now breaks on failed condition; removed "check-workflow-execution" steps; simplified draft eligibility; commented out vectorization workflow registration and some cleanup steps.
Workflow Functions
apps/server/src/thread-workflow-utils/workflow-functions.ts
Removed shouldGenerateDraft and checkWorkflowExecution from exports; added logging to existing functions.
Mail Router
apps/server/src/trpc/routes/mail.ts
getThread call now always includes includeDrafts=true argument.
Configuration
apps/server/wrangler.jsonc
Changed NODE_ENV to "local"; set DISABLE_WORKFLOWS to "true" in local and staging; added Cloudflare account and token variables.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant AgentRPC
    participant Agent
    participant DB

    Client->>AgentRPC: getThread(threadId, includeDrafts)
    AgentRPC->>Agent: getThread(threadId, includeDrafts)
    Agent->>DB: getThreadFromDB(threadId, includeDrafts)
    DB-->>Agent: Thread (with/without drafts, isLatestDraft)
    Agent-->>AgentRPC: Thread response
    AgentRPC-->>Client: Thread response
Loading
sequenceDiagram
    participant WorkflowRunner
    participant DB
    participant Gmail
    participant Logger

    WorkflowRunner->>DB: Fetch agent/thread data
    WorkflowRunner->>Gmail: Fetch Gmail history/threads
    WorkflowRunner->>Logger: Log execution steps
    WorkflowRunner->>DB: Sync threads, process labels
    WorkflowRunner->>DB: Clean up processing flags
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Suggested labels

design, content

Poem

🚀
Threads now know if a draft is near,
With logging bright and logic clear.
Async flows and workflows split,
Drafts and spam—now we omit.
Config tuned for local flight,
The future’s threaded, bold, and bright!
🌟


📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 762895a and f065829.

📒 Files selected for processing (9)
  • apps/server/src/lib/driver/types.ts (2 hunks)
  • apps/server/src/pipelines.ts (7 hunks)
  • apps/server/src/routes/agent/index.ts (6 hunks)
  • apps/server/src/routes/agent/rpc.ts (1 hunks)
  • apps/server/src/thread-workflow-utils/index.ts (1 hunks)
  • apps/server/src/thread-workflow-utils/workflow-engine.ts (4 hunks)
  • apps/server/src/thread-workflow-utils/workflow-functions.ts (5 hunks)
  • apps/server/src/trpc/routes/mail.ts (1 hunks)
  • apps/server/wrangler.jsonc (3 hunks)
✨ Finishing Touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch ZEROAdd_Effect-free_workflow_implementation_for_direct_Cloudflare_Workers_execution

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Explain this complex logic.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai generate unit tests to generate unit tests for this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Collaborator Author

MrgSub commented Jul 29, 2025

This stack of pull requests is managed by Graphite. Learn more about stacking.

@MrgSub MrgSub marked this pull request as ready for review July 29, 2025 23:37
Copy link
Contributor

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your free trial has ended. If you'd like to continue receiving code reviews, you can add a payment method here.

@cursor
Copy link

cursor bot commented Jul 29, 2025

🚨 Bugbot Trial Expired

Your Bugbot trial has expired. Please purchase a license in the Cursor dashboard to continue using Bugbot.

Copy link
Collaborator Author

MrgSub commented Jul 29, 2025

Merge activity

  • Jul 29, 11:38 PM UTC: A user started a stack merge that includes this pull request via Graphite.
  • Jul 29, 11:38 PM UTC: @MrgSub merged this pull request with Graphite.

@MrgSub MrgSub merged commit 15bbf89 into staging Jul 29, 2025
7 of 9 checks passed
@MrgSub MrgSub deleted the ZEROAdd_Effect-free_workflow_implementation_for_direct_Cloudflare_Workers_execution branch July 29, 2025 23:38
@coderabbitai coderabbitai bot added content design Improvements & changes to design & UX labels Jul 29, 2025
Copy link
Contributor

@cubic-dev-ai cubic-dev-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cubic analysis

7 issues found across 9 files • Review in cubic

React with 👍 or 👎 to teach cubic. You can also tag @cubic-dev-ai to give feedback, ask questions, or re-run the review.

): Promise<boolean> => {
if (!thread.messages || thread.messages.length === 0) return false;
if (!thread.messages || thread.messages.length === 0) {
console.log('[SHOULD_GENERATE_DRAFT] No messages in thread');
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Direct console.log calls in server-side code bypass the project’s structured logging system, making log filtering and correlation harder in production environments. Replace with the standard logger or remove these statements. (Based on your team's feedback about avoiding console statements in production code.)

Prompt for AI agents
Address the following comment on apps/server/src/thread-workflow-utils/index.ts at line 11:

<comment>Direct console.log calls in server-side code bypass the project’s structured logging system, making log filtering and correlation harder in production environments. Replace with the standard logger or remove these statements. (Based on your team&#39;s feedback about avoiding console statements in production code.)</comment>

<file context>
@@ -1,66 +1,70 @@
 import type { IGetThreadResponse } from &#39;../lib/driver/types&#39;;
 import { composeEmail } from &#39;../trpc/routes/ai/compose&#39;;
-import { getZeroAgent } from &#39;../lib/server-utils&#39;;
 import { type ParsedMessage } from &#39;../types&#39;;
 import { connection } from &#39;../db/schema&#39;;
 
 const shouldGenerateDraft = async (
   thread: IGetThreadResponse,
   foundConnection: typeof connection.$inferSelect,
</file context>

const { activeConnection } = ctx;
const agent = await getZeroAgent(activeConnection.id);
return await agent.getThread(input.id);
return await agent.getThread(input.id, true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agent.getThread is now called with includeDrafts=true, which (per the PR description) adds an isLatestDraft flag to the response. However, the tRPC endpoint still validates the response against IGetThreadResponseSchema, which does not contain this field. Zod will silently strip the flag, preventing the client from seeing it and defeating the purpose of the change. Update the schema (and the .output typing) to include isLatestDraft or use .passthrough() to allow the extra key. (Based on your team's feedback about keeping runtime schemas in sync with new response fields.)

Prompt for AI agents
Address the following comment on apps/server/src/trpc/routes/mail.ts at line 43:

<comment>agent.getThread is now called with includeDrafts=true, which (per the PR description) adds an `isLatestDraft` flag to the response. However, the tRPC endpoint still validates the response against `IGetThreadResponseSchema`, which does not contain this field. Zod will silently strip the flag, preventing the client from seeing it and defeating the purpose of the change. Update the schema (and the .output typing) to include `isLatestDraft` or use `.passthrough()` to allow the extra key. (Based on your team&#39;s feedback about keeping runtime schemas in sync with new response fields.)</comment>

<file context>
@@ -40,7 +40,7 @@ export const mailRouter = router({
     .query(async ({ input, ctx }) =&gt; {
       const { activeConnection } = ctx;
       const agent = await getZeroAgent(activeConnection.id);
-      return await agent.getThread(input.id);
+      return await agent.getThread(input.id, true);
     }),
   count: activeDriverProcedure
</file context>

try {
console.log('[THREAD_WORKFLOW] Getting thread:', threadId);
thread = await agent.getThread(threadId.toString());
console.log('[THREAD_WORKFLOW] Found thread with messages:', thread.messages.length);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assumes thread.messages is always defined; calling .length on undefined can throw at runtime.

Prompt for AI agents
Address the following comment on apps/server/src/pipelines.ts at line 792:

<comment>Assumes `thread.messages` is always defined; calling `.length` on undefined can throw at runtime.</comment>

<file context>
@@ -709,4 +734,533 @@ export class WorkflowRunner extends DurableObject&lt;Env&gt; {
       Effect.provide(loggerLayer),
     );
   }
+
+  /** Testing workflows without Effect */
+  public runThreadWorkflowWithoutEffect(params: ThreadWorkflowParams): Promise&lt;string&gt; {
+    return this.runThreadWorkflowWithoutEffectImpl(params);
+  }
+
</file context>
Suggested change
console.log('[THREAD_WORKFLOW] Found thread with messages:', thread.messages.length);
console.log('[THREAD_WORKFLOW] Found thread with messages:', thread.messages?.length ?? 0);

const response = await this.env.gmail_processing_threads.put(historyProcessingKey, 'true', {
expirationTtl: 3600,
});
lockAcquired = response !== null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lock acquisition check always succeeds because KV put() returns void (undefined); response !== null will never be false, so concurrent executions are not prevented.

Prompt for AI agents
Address the following comment on apps/server/src/pipelines.ts at line 987:

<comment>Lock acquisition check always succeeds because KV `put()` returns void (undefined); `response !== null` will never be false, so concurrent executions are not prevented.</comment>

<file context>
@@ -709,4 +734,533 @@ export class WorkflowRunner extends DurableObject&lt;Env&gt; {
       Effect.provide(loggerLayer),
     );
   }
+
+  /** Testing workflows without Effect */
+  public runThreadWorkflowWithoutEffect(params: ThreadWorkflowParams): Promise&lt;string&gt; {
+    return this.runThreadWorkflowWithoutEffectImpl(params);
+  }
+
</file context>

"AUTORAG_ID": "",
"USE_OPENAI": "true",
"CLOUDFLARE_ACCOUNT_ID": "397b3b4fac213b9b382d0f1fafdbb215",
"CLOUDFLARE_API_TOKEN": "wbrJ9McsQhjCxv1pzxLLK8keT-0tM1ab-QbmESg6",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A live Cloudflare API token is committed in plain text; this is a serious security leak that grants full account access.

Prompt for AI agents
Address the following comment on apps/server/wrangler.jsonc at line 137:

<comment>A live Cloudflare API token is committed in plain text; this is a serious security leak that grants full account access.</comment>

<file context>
@@ -130,9 +130,11 @@
         &quot;DROP_AGENT_TABLES&quot;: &quot;false&quot;,
         &quot;THREAD_SYNC_MAX_COUNT&quot;: &quot;5&quot;,
         &quot;THREAD_SYNC_LOOP&quot;: &quot;false&quot;,
-        &quot;DISABLE_WORKFLOWS&quot;: &quot;false&quot;,
+        &quot;DISABLE_WORKFLOWS&quot;: &quot;true&quot;,
         &quot;AUTORAG_ID&quot;: &quot;&quot;,
         &quot;USE_OPENAI&quot;: &quot;true&quot;,
+        &quot;CLOUDFLARE_ACCOUNT_ID&quot;: &quot;397b3b4fac213b9b382d0f1fafdbb215&quot;,
+        &quot;CLOUDFLARE_API_TOKEN&quot;: &quot;wbrJ9McsQhjCxv1pzxLLK8keT-0tM1ab-QbmESg6&quot;,
</file context>
Suggested change
"CLOUDFLARE_API_TOKEN": "wbrJ9McsQhjCxv1pzxLLK8keT-0tM1ab-QbmESg6",
"CLOUDFLARE_API_TOKEN": "${CLOUDFLARE_API_TOKEN}",


generateLabels: async (context) => {
const summaryResult = context.results?.get('generate-thread-summary');
console.log(summaryResult, context.results);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dumping summaryResult together with context.results can leak sensitive data and flood the logs; prefer structured logging or limit the output.

Prompt for AI agents
Address the following comment on apps/server/src/thread-workflow-utils/workflow-functions.ts at line 397:

<comment>Dumping summaryResult together with context.results can leak sensitive data and flood the logs; prefer structured logging or limit the output.</comment>

<file context>
@@ -413,6 +394,7 @@ export const workflowFunctions: Record&lt;string, WorkflowFunction&gt; = {
 
   generateLabels: async (context) =&gt; {
     const summaryResult = context.results?.get(&#39;generate-thread-summary&#39;);
+    console.log(summaryResult, context.results);
     if (!summaryResult?.summary) {
       console.log(&#39;[WORKFLOW_FUNCTIONS] No summary available for label generation&#39;);
</file context>
Suggested change
console.log(summaryResult, context.results);
console.log('[WORKFLOW_FUNCTIONS] Summary generation result:', { summary: summaryResult?.summary });

};

history.forEach((historyItem) => {
historyItem.messagesAdded?.forEach((msg) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rule violated: Detect Typescript Performance Bottlenecks

Nested forEach over messagesAdded inside another forEach over history can cause significant performance issues when Gmail history contains many entries. Consider flattening data or using maps/sets to avoid O(n²) iteration.

Prompt for AI agents
Address the following comment on apps/server/src/pipelines.ts at line 1101:

<comment>Nested forEach over `messagesAdded` inside another forEach over `history` can cause significant performance issues when Gmail history contains many entries. Consider flattening data or using maps/sets to avoid O(n²) iteration.</comment>

<file context>
@@ -709,4 +734,533 @@ export class WorkflowRunner extends DurableObject&lt;Env&gt; {
       Effect.provide(loggerLayer),
     );
   }
+
+  /** Testing workflows without Effect */
+  public runThreadWorkflowWithoutEffect(params: ThreadWorkflowParams): Promise&lt;string&gt; {
+    return this.runThreadWorkflowWithoutEffectImpl(params);
+  }
+
</file context>

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

content design Improvements & changes to design & UX

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant