Skip to content

Comments

refactor pipelines#1696

Merged
MrgSub merged 1 commit intostagingfrom
07-09-refactor_pipelines
Jul 9, 2025
Merged

refactor pipelines#1696
MrgSub merged 1 commit intostagingfrom
07-09-refactor_pipelines

Conversation

@MrgSub
Copy link
Collaborator

@MrgSub MrgSub commented Jul 9, 2025

Refactor Email Processing Pipeline to Use Effect Library

This PR refactors the email processing workflow from using Cloudflare Workers' Workflow API to using the Effect library for better error handling and functional programming patterns. The main changes include:

  1. Replaced workflow-based implementation with Effect-based implementation
  2. Added queue-based processing for thread notifications
  3. Improved CORS handling with more robust origin validation
  4. Fixed Google service account validation to properly handle empty JSON
  5. Changed UI label from "Labels" to "Categories" in the mail component
  6. Added selective query persistence in the query provider
  7. Improved error handling throughout the notification pipeline

Type of Change

  • ✨ New feature (non-breaking change which adds functionality)
  • ⚡ Performance improvement

Areas Affected

  • Email Integration (Gmail, IMAP, etc.)
  • User Interface/Experience
  • Data Storage/Management
  • API Endpoints

Testing Done

  • Manual testing performed

Security Considerations

  • No sensitive data is exposed
  • Authentication checks are in place
  • Input validation is implemented

Checklist

  • I have performed a self-review of my code
  • I have commented my code, particularly in complex areas
  • My changes generate no new warnings

Additional Notes

The Effect library provides better error handling and composability compared to the previous workflow implementation. This change should improve reliability and maintainability of the email processing pipeline.

Summary by CodeRabbit

  • New Features

    • Enhanced email processing with new effect-based workflows for email data handling, summarization, and vectorization.
    • Improved thread and message history retrieval capabilities with retry logic.
  • Bug Fixes

    • Fixed label text in the mail UI from "Labels" to "Categories".
    • Improved error handling and validation for environment variables.
  • Refactor

    • Major restructuring of workflow logic to use effect-based functions for better reliability and maintainability.
    • Simplified query cache invalidation and persistence logic.
    • Updated server workflow imports and exports to use new pipeline functions.
    • Streamlined server route logic for thread synchronization and authorization.
    • Replaced local prompt name helper with centralized implementation.
  • Chores

    • Updated server dependencies.
    • Cleaned up configuration files by removing unused workflow bindings.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Jul 9, 2025

Caution

Review failed

The pull request is closed.

Walkthrough

This update replaces class-based workflow logic in the server's pipelines with new effect-based workflow functions, introduces a new effect-driven workflow module, and updates server routing and queue handling accordingly. Additional changes include improved environment variable validation, configuration updates, cache invalidation simplification, UI label adjustment, and refined query persistence and agent methods.

Changes

File(s) Change Summary
apps/mail/components/mail/mail.tsx Changed the visible label in the CategoryDropdown trigger from "Labels" to "Categories".
apps/mail/components/party.tsx Simplified query invalidation by removing filtering options and adding a console log for invalidation events.
apps/mail/providers/query-provider.tsx Added a shouldDehydrateQuery function to restrict query persistence to those with keys containing 'listThreads'.
apps/server/package.json Added the "effect" dependency (v3.16.12).
apps/server/src/lib/factories/google-subscription.factory.ts Enhanced validation for the GOOGLE_S_ACCOUNT environment variable, treating '{}' as invalid and logging invalid JSON on parse failure.
apps/server/src/main.ts Switched workflow imports to effect-based functions, updated CORS origin validation, revised the /a8n/notify/:providerId handler to use new queueing logic, and adjusted queue processing to use new workflow runners. Updated exports accordingly.
apps/server/src/pipelines.effect.ts New file: Implements effect-based workflow runners (runMainWorkflow, runZeroWorkflow, runThreadWorkflow) for email processing, vectorization, AI summarization, and concurrency control, with extensive error handling and helper functions.
apps/server/src/pipelines.ts Removed all workflow classes and helper functions; now dispatches to effect-based workflow runners from pipelines.effect.ts. Added runWorkflow as a new export.
apps/server/src/routes/chat.ts Added listHistory methods to agent classes, improved handling for 'general' connections, and updated getThreadFromDB to add retry logic and stricter error handling.
apps/server/src/types.ts Added IThreadBatch interface for use in thread queue processing.
apps/server/wrangler.jsonc Removed all "workflows" entries from configuration for all environments.
apps/server/src/lib/brain.ts Removed local getPromptName function and replaced with import from ../pipelines to unify prompt naming logic.

Sequence Diagram(s)

sequenceDiagram
    participant API as /a8n/notify/:providerId
    participant Queue as thread_queue
    participant Pipeline as runMainWorkflow
    participant Zero as runZeroWorkflow
    participant Thread as runThreadWorkflow
    participant DB as Database
    participant Gmail as Gmail API
    participant AI as AI Service
    participant Vector as Vector Store

    API->>Queue: Send {providerId, historyId, subscriptionName}
    Queue->>Pipeline: For each message, call runMainWorkflow
    Pipeline->>Zero: If provider is Google, call runZeroWorkflow
    Zero->>DB: Get connection info
    Zero->>Gmail: Fetch Gmail history
    Zero->>Thread: For each thread, call runThreadWorkflow
    Thread->>Gmail: Fetch thread data
    Thread->>AI: Summarize messages, generate embeddings
    Thread->>Vector: Upsert vectors
    Thread->>Gmail: Apply generated labels (if needed)
    Thread->>DB: Update thread summary
    Thread-->>Zero: Return processing result
    Zero-->>Pipeline: Return processing result
    Pipeline-->>Queue: Log result or error
Loading

Possibly related PRs

  • Typo #1608: Changes method signatures and logic for getThreadFromDB and listHistory in apps/server/src/routes/chat.ts, related to agent and thread fetching improvements.
  • Minor Fixes #1347: Modifies internal logic of the original MainWorkflow class in apps/server/src/pipelines.ts before it was replaced by effect-based workflows in this PR.
  • fix(i18n): fixed i18n support for previously untranslated fields (labels) #1172: Updates category UI labels in the mail component, related to the label text change in apps/mail/components/mail/mail.tsx here.

Poem

🐇✨
Pipelines leap from class to effect,
Async flows now neatly connect.
Labels are clearer, queries persist,
Agents retry, no thread is missed.
With every hop, the code refines—
A rabbit’s joy in streamlined lines!
🥕🚀


📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 181e5f8 and 200d4e8.

⛔ Files ignored due to path filters (1)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (12)
  • apps/mail/components/mail/mail.tsx (1 hunks)
  • apps/mail/components/party.tsx (1 hunks)
  • apps/mail/providers/query-provider.tsx (1 hunks)
  • apps/server/package.json (1 hunks)
  • apps/server/src/lib/brain.ts (1 hunks)
  • apps/server/src/lib/factories/google-subscription.factory.ts (1 hunks)
  • apps/server/src/main.ts (7 hunks)
  • apps/server/src/pipelines.effect.ts (1 hunks)
  • apps/server/src/pipelines.ts (1 hunks)
  • apps/server/src/routes/chat.ts (8 hunks)
  • apps/server/src/types.ts (1 hunks)
  • apps/server/wrangler.jsonc (1 hunks)
✨ Finishing Touches
  • 📝 Generate Docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Explain this complex logic.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@MrgSub MrgSub marked this pull request as ready for review July 9, 2025 18:30
Copy link
Collaborator Author

MrgSub commented Jul 9, 2025

@coderabbitai review

Copy link
Collaborator Author

MrgSub commented Jul 9, 2025

This stack of pull requests is managed by Graphite. Learn more about stacking.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Jul 9, 2025

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

cursor[bot]

This comment was marked as outdated.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 7

🧹 Nitpick comments (4)
apps/mail/providers/query-provider.tsx (1)

123-128: LGTM! Consider clarifying the comment.

The selective persistence strategy for listThreads queries is a good optimization. However, the comment on line 123 appears incomplete.

-        maxAge: 1000 * 60 * 1, // 1 minute, we're storing in DOs,
+        maxAge: 1000 * 60 * 1, // 1 minute (storing in Durable Objects)
apps/server/src/pipelines.ts (1)

35-35: Remove unnecessary type cast

The as any cast is not needed here since the params type should already match.

-      return runZeroWorkflow(params as any);
+      return runZeroWorkflow(params);
apps/server/src/pipelines.effect.ts (2)

31-39: Make logging configurable via environment variable

The showLogs flag is hardcoded to true. Consider making it configurable for different environments.

-const showLogs = true;
+const showLogs = env.ENABLE_DEBUG_LOGS === 'true';

 const log = (message: string, ...args: any[]) => {
   if (showLogs) {
     console.log(message, ...args);
-    return message;
   }
-  return 'no message';
 };

Also consider using a proper logging library that supports log levels and structured logging.


930-950: Consider propagating errors instead of silently returning fallbacks

The getPrompt and getEmbeddingVector functions catch errors and return fallback values, which could hide important issues.

Consider using Effect for consistent error handling:

const getPrompt = (promptName: string, fallback: string): Effect.Effect<string, { _tag: 'PromptError'; error: unknown }> =>
  Effect.gen(function* () {
    if (!promptName || typeof promptName !== 'string') {
      return fallback; // This is intentional fallback for invalid input
    }
    
    return yield* Effect.tryPromise({
      try: async () => {
        const existingPrompt = await env.prompts_storage.get(promptName);
        if (!existingPrompt) {
          await env.prompts_storage.put(promptName, fallback);
          return fallback;
        }
        return existingPrompt;
      },
      catch: (error) => ({ _tag: 'PromptError' as const, error })
    });
  });

This allows callers to decide how to handle errors rather than hiding them.

Also applies to: 952-974

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7cc9838 and ce9e86d.

⛔ Files ignored due to path filters (1)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (11)
  • apps/mail/components/mail/mail.tsx (1 hunks)
  • apps/mail/components/party.tsx (1 hunks)
  • apps/mail/providers/query-provider.tsx (1 hunks)
  • apps/server/package.json (1 hunks)
  • apps/server/src/lib/factories/google-subscription.factory.ts (1 hunks)
  • apps/server/src/main.ts (7 hunks)
  • apps/server/src/pipelines.effect.ts (1 hunks)
  • apps/server/src/pipelines.ts (1 hunks)
  • apps/server/src/routes/chat.ts (8 hunks)
  • apps/server/src/types.ts (1 hunks)
  • apps/server/wrangler.jsonc (1 hunks)
🧰 Additional context used
🧠 Learnings (4)
apps/server/package.json (1)
Learnt from: JagjeevanAK
PR: Mail-0/Zero#1583
File: apps/docs/package.json:1-0
Timestamp: 2025-07-01T12:53:32.495Z
Learning: The Zero project prefers to handle dependency updates through automated tools like Dependabot rather than immediate manual updates, allowing for proper testing and validation through their established workflow.
apps/mail/components/mail/mail.tsx (1)
Learnt from: retrogtx
PR: Mail-0/Zero#1328
File: apps/mail/lib/hotkeys/mail-list-hotkeys.tsx:202-209
Timestamp: 2025-06-18T17:26:50.918Z
Learning: In apps/mail/lib/hotkeys/mail-list-hotkeys.tsx, the switchCategoryByIndex function using hardcoded indices for category hotkeys does not break when users reorder categories, contrary to the theoretical index-shifting issue. The actual implementation has constraints or mechanisms that prevent hotkey targeting issues.
apps/mail/components/party.tsx (3)
Learnt from: retrogtx
PR: Mail-0/Zero#1468
File: apps/server/src/trpc/routes/mail.ts:331-331
Timestamp: 2025-06-28T03:56:09.376Z
Learning: In apps/server/src/trpc/routes/mail.ts, the user indicated they are not using ISO format for the scheduleAt parameter, despite the frontend code showing toISOString() usage in the ScheduleSendPicker component.
Learnt from: retrogtx
PR: Mail-0/Zero#1328
File: apps/mail/lib/hotkeys/mail-list-hotkeys.tsx:202-209
Timestamp: 2025-06-18T17:26:50.918Z
Learning: In apps/mail/lib/hotkeys/mail-list-hotkeys.tsx, the switchCategoryByIndex function using hardcoded indices for category hotkeys does not break when users reorder categories, contrary to the theoretical index-shifting issue. The actual implementation has constraints or mechanisms that prevent hotkey targeting issues.
Learnt from: retrogtx
PR: Mail-0/Zero#1468
File: apps/server/src/trpc/routes/mail.ts:386-391
Timestamp: 2025-06-27T04:59:29.731Z
Learning: In apps/server/src/trpc/routes/mail.ts, the attachment processing logic conditionally handles mixed attachment types - it preserves existing File-like objects with arrayBuffer methods while only converting serialized attachments that need processing through toAttachmentFiles.
apps/server/src/pipelines.effect.ts (1)
Learnt from: retrogtx
PR: Mail-0/Zero#1468
File: apps/server/src/trpc/routes/mail.ts:386-391
Timestamp: 2025-06-27T04:59:29.731Z
Learning: In apps/server/src/trpc/routes/mail.ts, the attachment processing logic conditionally handles mixed attachment types - it preserves existing File-like objects with arrayBuffer methods while only converting serialized attachments that need processing through toAttachmentFiles.
🧬 Code Graph Analysis (1)
apps/server/src/routes/chat.ts (4)
apps/server/src/db/index.ts (1)
  • createDb (7-11)
apps/server/src/db/schema.ts (1)
  • connection (118-142)
apps/server/src/lib/server-utils.ts (1)
  • connectionToDriver (45-58)
apps/server/src/lib/driver/types.ts (1)
  • IGetThreadResponse (6-12)
🪛 Gitleaks (8.26.0)
apps/server/src/pipelines.effect.ts

89-89: Identified a Private Key, which may compromise cryptographic security and sensitive data encryption.

(private-key)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Cursor BugBot
🔇 Additional comments (14)
apps/server/package.json (1)

51-51: effect package version and security status confirmed

The pinned version 3.16.12 is the latest stable release of the effect package, and there are no known security advisories against it. Approving this addition.

• apps/server/package.json: line 51 [ "effect": "3.16.12" ]

apps/mail/components/mail/mail.tsx (1)

1002-1002: LGTM: UI terminology improvement.

The change from "Labels" to "Categories" improves UI consistency and aligns with the application's terminology.

apps/server/src/types.ts (1)

14-18: LGTM: Well-defined interface for queue processing.

The IThreadBatch interface provides a clear contract for the new queue-based thread processing system. The properties are appropriately typed and follow consistent naming conventions.

apps/server/src/lib/factories/google-subscription.factory.ts (2)

34-34: LGTM: Enhanced validation prevents common misconfiguration.

The explicit check for the '{}' string as invalid prevents a common misconfiguration issue where the environment variable is set to an empty JSON object.


41-41: LGTM: Improved error diagnostics.

Adding logging of the invalid JSON string before throwing an error will help with debugging configuration issues.

apps/server/wrangler.jsonc (1)

82-82: LGTM: Workflow binding removal aligns with architectural changes.

The removal of workflow bindings is consistent with the migration from class-based to effect-based workflows, where workflows are now managed programmatically rather than through Cloudflare Workers bindings.

apps/mail/components/party.tsx (1)

55-57: Verify the removal of time-based query filtering

The simplified cache invalidation removes the predicate that filtered queries older than one minute. This means all matching queries will be invalidated regardless of age.

Was the removal of the time-based filtering intentional? This could potentially invalidate more queries than before, which might impact performance if there are many cached queries for a thread.

apps/server/src/main.ts (2)

571-585: Excellent CORS origin validation improvements!

The enhanced CORS validation properly parses URLs and supports subdomains, which is more robust than the previous substring check. The error handling for invalid URLs is also well implemented.


714-740: Well-structured Effect-based workflow execution

The new thread-queue handling is cleanly implemented with proper error handling and logging. The use of Effect.runPromise aligns well with the effect-based workflow architecture.

apps/server/src/routes/chat.ts (3)

1-13: LGTM! Proper license header added.

Good practice to include the Apache 2.0 license header.


317-320: Well-implemented listHistory methods

The listHistory methods in both AgentRpcDO and ZeroAgent follow the established pattern and properly handle the driver availability check.

Also applies to: 691-697


1177-1201: Excellent retry mechanism implementation

The addition of the lastAttempt parameter and retry logic improves reliability by attempting to sync the thread if it's not found on the first attempt. The error message is also clear and informative.

apps/server/src/pipelines.ts (1)

14-50: Clean refactoring to effect-based workflows!

The transformation to a thin wrapper that delegates to effect-based implementations is well-structured. The dispatcher pattern provides a clean interface for workflow execution.

apps/server/src/pipelines.effect.ts (1)

505-590: Good implementation of message batching for rate limiting

The batching logic with concurrent processing limit and delays between batches is well-implemented to prevent overwhelming the AI service.

@MrgSub MrgSub force-pushed the 07-09-refactor_pipelines branch from ce9e86d to 451f397 Compare July 9, 2025 18:41
cursor[bot]

This comment was marked as outdated.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

♻️ Duplicate comments (6)
apps/server/src/pipelines.effect.ts (6)

83-105: Remove unused override logic and simplify environment variable handling

The override variable is set to false, making the conditional check unnecessary. This appears to be leftover from development testing.

-const override = false;
-
 export const runMainWorkflow = (
   params: MainWorkflowParams,
 ): Effect.Effect<string, MainWorkflowError> =>
   Effect.gen(function* () {
     yield* Console.log('[MAIN_WORKFLOW] Starting workflow with payload:', params);

     const { providerId, historyId, subscriptionName } = params;

-    let serviceAccount = null;
-    if (override) {
-      serviceAccount = override;
-    } else {
-      if (!env.GOOGLE_S_ACCOUNT || env.GOOGLE_S_ACCOUNT === '{}') {
-        return yield* Effect.fail({
-          _tag: 'MissingEnvironmentVariable' as const,
-          variable: 'GOOGLE_S_ACCOUNT',
-        });
-      }
-
-      serviceAccount = JSON.parse(env.GOOGLE_S_ACCOUNT);
-    }
+    if (!env.GOOGLE_S_ACCOUNT || env.GOOGLE_S_ACCOUNT === '{}') {
+      return yield* Effect.fail({
+        _tag: 'MissingEnvironmentVariable' as const,
+        variable: 'GOOGLE_S_ACCOUNT',
+      });
+    }
+
+    const serviceAccount = JSON.parse(env.GOOGLE_S_ACCOUNT);

176-200: Fix race condition in processing flag check

The current implementation checks the processing flag before setting it, creating a race condition where multiple workflows could start processing the same history concurrently.

Consider using an atomic "check-and-set" operation or setting the flag first, then checking if it was already set by another process.


340-347: Uncomment the cleanup code to prevent stuck processing flags

The cleanup logic is essential for clearing processing flags after workflow completion. Without it, subsequent workflows may be incorrectly blocked.

-      //   // Clean up processing flag
-      //   yield* Effect.tryPromise({
-      //     try: () => {
-      //       console.log('[ZERO_WORKFLOW] Clearing processing flag for history:', historyProcessingKey);
-      //       return env.gmail_processing_threads.delete(historyProcessingKey);
-      //     },
-      //     catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }),
-      //   }).pipe(Effect.orElse(() => Effect.succeed(null)));
+      // Clean up processing flag
+      yield* Effect.tryPromise({
+        try: () => {
+          console.log('[ZERO_WORKFLOW] Clearing processing flag for history:', historyProcessingKey);
+          return env.gmail_processing_threads.delete(historyProcessingKey);
+        },
+        catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }),
+      }).pipe(Effect.orElse(() => Effect.succeed(null)));

202-222: Extract duplicated database connection logic into a shared function

The database connection and query logic is duplicated between runZeroWorkflow and runThreadWorkflow, violating DRY principles.

Consider creating a shared function as suggested in the previous review to reduce duplication and ensure consistent error handling.

Also applies to: 410-429


400-836: Refactor the long thread workflow function for better maintainability

The runThreadWorkflow function is over 400 lines long and handles multiple responsibilities. This makes it difficult to maintain, test, and understand.

Consider extracting logical sections into separate functions as suggested in the previous review for improved readability and maintainability.


296-319: Fix race condition in thread processing logic

The code sets the processing flag before checking if the thread is already being processed. This makes the check ineffective as it will never detect concurrent processing.

-              // Set processing flag for thread
-              yield* Effect.tryPromise({
-                try: () => {
-                  console.log(
-                    '[ZERO_WORKFLOW] Setting processing flag for thread:',
-                    params.threadId,
-                  );
-                  return env.gmail_processing_threads.put(params.threadId.toString(), 'true', {
-                    expirationTtl: 1800,
-                  });
-                },
-                catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }),
-              });
-
               // Check if thread is already processing
               const isProcessing = yield* Effect.tryPromise({
                 try: () => env.gmail_processing_threads.get(params.threadId.toString()),
                 catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }),
               });

               if (isProcessing === 'true') {
                 yield* Console.log('[ZERO_WORKFLOW] Thread already processing:', params.threadId);
                 return 'Thread already processing';
               }
+
+              // Set processing flag for thread only if not already processing
+              yield* Effect.tryPromise({
+                try: () => {
+                  console.log(
+                    '[ZERO_WORKFLOW] Setting processing flag for thread:',
+                    params.threadId,
+                  );
+                  return env.gmail_processing_threads.put(params.threadId.toString(), 'true', {
+                    expirationTtl: 1800,
+                  });
+                },
+                catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }),
+              });
🧹 Nitpick comments (1)
apps/server/src/pipelines.effect.ts (1)

31-39: Make logging configurable via environment variable

The showLogs flag is hardcoded to true. In production, you may want to control logging verbosity through configuration.

-const showLogs = true;
+const showLogs = env.ENABLE_PIPELINE_LOGS === 'true' || env.NODE_ENV === 'development';

 const log = (message: string, ...args: any[]) => {
   if (showLogs) {
     console.log(message, ...args);
     return message;
   }
   return 'no message';
 };
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ce9e86d and 451f397.

⛔ Files ignored due to path filters (1)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (11)
  • apps/mail/components/mail/mail.tsx (1 hunks)
  • apps/mail/components/party.tsx (1 hunks)
  • apps/mail/providers/query-provider.tsx (1 hunks)
  • apps/server/package.json (1 hunks)
  • apps/server/src/lib/factories/google-subscription.factory.ts (1 hunks)
  • apps/server/src/main.ts (7 hunks)
  • apps/server/src/pipelines.effect.ts (1 hunks)
  • apps/server/src/pipelines.ts (1 hunks)
  • apps/server/src/routes/chat.ts (8 hunks)
  • apps/server/src/types.ts (1 hunks)
  • apps/server/wrangler.jsonc (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • apps/server/package.json
🚧 Files skipped from review as they are similar to previous changes (8)
  • apps/server/wrangler.jsonc
  • apps/mail/components/mail/mail.tsx
  • apps/server/src/types.ts
  • apps/mail/components/party.tsx
  • apps/mail/providers/query-provider.tsx
  • apps/server/src/lib/factories/google-subscription.factory.ts
  • apps/server/src/main.ts
  • apps/server/src/routes/chat.ts
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: JagjeevanAK
PR: Mail-0/Zero#1583
File: apps/docs/package.json:1-0
Timestamp: 2025-07-01T12:53:32.495Z
Learning: The Zero project prefers to handle dependency updates through automated tools like Dependabot rather than immediate manual updates, allowing for proper testing and validation through their established workflow.
apps/server/src/pipelines.effect.ts (2)
Learnt from: retrogtx
PR: Mail-0/Zero#1468
File: apps/server/src/trpc/routes/mail.ts:386-391
Timestamp: 2025-06-27T04:59:29.731Z
Learning: In apps/server/src/trpc/routes/mail.ts, the attachment processing logic conditionally handles mixed attachment types - it preserves existing File-like objects with arrayBuffer methods while only converting serialized attachments that need processing through toAttachmentFiles.
Learnt from: retrogtx
PR: Mail-0/Zero#1622
File: apps/server/src/lib/email-verification.ts:189-189
Timestamp: 2025-07-05T05:27:24.592Z
Learning: During testing phases, debug logging should be kept active in apps/server/src/lib/email-verification.ts for BIMI validation and email verification debugging, even if it's verbose.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Cursor BugBot
🔇 Additional comments (2)
apps/server/src/pipelines.effect.ts (2)

360-381: Excellent error handling and cleanup implementation

The error handling pattern with proper cleanup of processing flags in the catchAll blocks ensures resources are released even when errors occur. This prevents resource leaks and stuck states.

Also applies to: 817-835


861-869: Secure XML escaping implementation

The escapeXml function properly escapes all XML special characters, preventing XML injection vulnerabilities. This is a critical security measure when dealing with user-generated content.

@MrgSub MrgSub force-pushed the 07-09-refactor_pipelines branch from 451f397 to 181e5f8 Compare July 9, 2025 18:55
cursor[bot]

This comment was marked as outdated.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (8)
apps/server/src/pipelines.ts (2)

21-24: Remove duplicate getPromptName function

This function is already defined and exported from ./pipelines.effect.ts. Import it instead of duplicating the implementation.

-// Helper function for generating prompt names
-export const getPromptName = (connectionId: string, prompt: EPrompts) => {
-  return `${connectionId}-${prompt}`;
-};
+// Re-export the helper function
+export { getPromptName } from './pipelines.effect';

27-41: Fix type casting to maintain type safety

The as any type casting defeats TypeScript's type safety. Define proper parameter types for each workflow to ensure compile-time type checking.

+type ZeroWorkflowParams = {
+  connectionId: string;
+  historyId: string;
+  nextHistoryId: string;
+};
+
+type ThreadWorkflowParams = {
+  connectionId: string;
+  threadId: string;
+  providerId: string;
+};
+
 export const runWorkflow = (
   workflowType: 'main' | 'zero' | 'thread',
-  params: { providerId: string; historyId: string; subscriptionName: string },
+  params: any,
 ): Effect.Effect<string, any> => {
   switch (workflowType) {
     case 'main':
-      return runMainWorkflow(params);
+      return runMainWorkflow(params as { providerId: string; historyId: string; subscriptionName: string });
     case 'zero':
-      return runZeroWorkflow(params as any);
+      return runZeroWorkflow(params as ZeroWorkflowParams);
     case 'thread':
-      return runThreadWorkflow(params as any);
+      return runThreadWorkflow(params as ThreadWorkflowParams);
     default:
       return Effect.fail({ _tag: 'UnsupportedWorkflow', workflowType });
   }
 };
apps/server/src/pipelines.effect.ts (6)

84-106: Fix unreachable environment variable logic

The environment variable check is unreachable because override is always falsy (set to false).

-    let serviceAccount = null;
-    if (override) {
-      serviceAccount = override;
-    } else {
-      if (!env.GOOGLE_S_ACCOUNT || env.GOOGLE_S_ACCOUNT === '{}') {
-        return yield* Effect.fail({
-          _tag: 'MissingEnvironmentVariable' as const,
-          variable: 'GOOGLE_S_ACCOUNT',
-        });
-      }
-
-      serviceAccount = JSON.parse(env.GOOGLE_S_ACCOUNT);
-    }
+    if (!env.GOOGLE_S_ACCOUNT || env.GOOGLE_S_ACCOUNT === '{}') {
+      return yield* Effect.fail({
+        _tag: 'MissingEnvironmentVariable' as const,
+        variable: 'GOOGLE_S_ACCOUNT',
+      });
+    }
+
+    const serviceAccount = JSON.parse(env.GOOGLE_S_ACCOUNT);

341-348: Uncomment the cleanup code to prevent stuck processing flags

The commented cleanup code should be active to ensure processing flags are properly cleared.

-      //   // Clean up processing flag
-      //   yield* Effect.tryPromise({
-      //     try: () => {
-      //       console.log('[ZERO_WORKFLOW] Clearing processing flag for history:', historyProcessingKey);
-      //       return env.gmail_processing_threads.delete(historyProcessingKey);
-      //     },
-      //     catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }),
-      //   }).pipe(Effect.orElse(() => Effect.succeed(null)));
+      // Clean up processing flag
+      yield* Effect.tryPromise({
+        try: () => {
+          console.log('[ZERO_WORKFLOW] Clearing processing flag for history:', historyProcessingKey);
+          return env.gmail_processing_threads.delete(historyProcessingKey);
+        },
+        catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }),
+      }).pipe(Effect.orElse(() => Effect.succeed(null)));

203-223: Extract duplicated database connection logic

The database connection and query logic is duplicated between runZeroWorkflow and runThreadWorkflow.

Consider extracting a shared function:

const getConnectionById = (connectionId: string): Effect.Effect<Connection, DatabaseError> =>
  Effect.gen(function* () {
    const { db, conn } = createDb(env.HYPERDRIVE.connectionString);
    
    try {
      const [foundConnection] = await db
        .select()
        .from(connection)
        .where(eq(connection.id, connectionId));
      
      if (!foundConnection) {
        throw new Error(`Connection not found ${connectionId}`);
      }
      
      if (!foundConnection.accessToken || !foundConnection.refreshToken) {
        throw new Error(`Connection is not authorized ${connectionId}`);
      }
      
      return foundConnection;
    } finally {
      await conn.end();
    }
  });

This reduces duplication and ensures consistent error handling and resource cleanup.


401-837: Refactor the long thread workflow function for better maintainability

The runThreadWorkflow function is over 400 lines long and handles multiple responsibilities. Consider breaking it down into smaller, focused functions.

Consider extracting these logical sections into separate functions:

  • Message vectorization logic (lines 468-608)
  • Thread summarization logic (lines 610-649)
  • Label generation and application logic (lines 659-764)
  • Vector upsertion logic (lines 766-802)

This would improve readability, testability, and maintainability. Each extracted function should have a single responsibility and clear error handling.


912-932: Refactor getPromptName into a shared helper

There are three identical definitions of getPromptName in the repo, which violates DRY and makes future updates error-prone:

• apps/server/src/pipelines.ts
• apps/server/src/pipelines.effect.ts
• apps/server/src/lib/brain.ts

Action items:

  • Extract getPromptName into a single file (e.g. src/utils/getPromptName.ts).
  • Remove the duplicate definitions in pipelines.effect.ts and lib/brain.ts.
  • Update those files (and any others) to import the shared helper.

This will centralize changes to how prompt names are built and keep the codebase consistent.


310-320: Fix thread processing race condition

There's a logic error in the thread processing flow. The code sets the processing flag first, then immediately checks if the thread is already processing, which will always be true.

-              // Set processing flag for thread
-              yield* Effect.tryPromise({
-                try: () => {
-                  console.log(
-                    '[ZERO_WORKFLOW] Setting processing flag for thread:',
-                    params.threadId,
-                  );
-                  return env.gmail_processing_threads.put(params.threadId.toString(), 'true', {
-                    expirationTtl: 1800,
-                  });
-                },
-                catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }),
-              });
-
               // Check if thread is already processing
               const isProcessing = yield* Effect.tryPromise({
                 try: () => env.gmail_processing_threads.get(params.threadId.toString()),
                 catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }),
               });

               if (isProcessing === 'true') {
                 yield* Console.log('[ZERO_WORKFLOW] Thread already processing:', params.threadId);
                 return 'Thread already processing';
               }

+              // Set processing flag for thread
+              yield* Effect.tryPromise({
+                try: () => {
+                  console.log(
+                    '[ZERO_WORKFLOW] Setting processing flag for thread:',
+                    params.threadId,
+                  );
+                  return env.gmail_processing_threads.put(params.threadId.toString(), 'true', {
+                    expirationTtl: 1800,
+                  });
+                },
+                catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }),
+              });
🧹 Nitpick comments (1)
apps/server/src/pipelines.effect.ts (1)

32-40: Consider making logging configurable

The showLogs constant is hardcoded to true, which may generate excessive logging in production. Consider making this configurable via environment variables.

-const showLogs = true;
+const showLogs = env.DEBUG_LOGS === 'true' || env.NODE_ENV === 'development';
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 451f397 and 181e5f8.

⛔ Files ignored due to path filters (1)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (12)
  • apps/mail/components/mail/mail.tsx (1 hunks)
  • apps/mail/components/party.tsx (1 hunks)
  • apps/mail/providers/query-provider.tsx (1 hunks)
  • apps/server/package.json (1 hunks)
  • apps/server/src/lib/brain.ts (1 hunks)
  • apps/server/src/lib/factories/google-subscription.factory.ts (1 hunks)
  • apps/server/src/main.ts (7 hunks)
  • apps/server/src/pipelines.effect.ts (1 hunks)
  • apps/server/src/pipelines.ts (1 hunks)
  • apps/server/src/routes/chat.ts (8 hunks)
  • apps/server/src/types.ts (1 hunks)
  • apps/server/wrangler.jsonc (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • apps/server/src/lib/brain.ts
🚧 Files skipped from review as they are similar to previous changes (9)
  • apps/server/package.json
  • apps/server/src/lib/factories/google-subscription.factory.ts
  • apps/mail/components/mail/mail.tsx
  • apps/mail/providers/query-provider.tsx
  • apps/server/src/types.ts
  • apps/server/wrangler.jsonc
  • apps/mail/components/party.tsx
  • apps/server/src/main.ts
  • apps/server/src/routes/chat.ts
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: JagjeevanAK
PR: Mail-0/Zero#1583
File: apps/docs/package.json:1-0
Timestamp: 2025-07-01T12:53:32.495Z
Learning: The Zero project prefers to handle dependency updates through automated tools like Dependabot rather than immediate manual updates, allowing for proper testing and validation through their established workflow.
apps/server/src/pipelines.effect.ts (2)
Learnt from: retrogtx
PR: Mail-0/Zero#1468
File: apps/server/src/trpc/routes/mail.ts:386-391
Timestamp: 2025-06-27T04:59:29.731Z
Learning: In apps/server/src/trpc/routes/mail.ts, the attachment processing logic conditionally handles mixed attachment types - it preserves existing File-like objects with arrayBuffer methods while only converting serialized attachments that need processing through toAttachmentFiles.
Learnt from: retrogtx
PR: Mail-0/Zero#1622
File: apps/server/src/lib/email-verification.ts:189-189
Timestamp: 2025-07-05T05:27:24.592Z
Learning: During testing phases, debug logging should be kept active in apps/server/src/lib/email-verification.ts for BIMI validation and email verification debugging, even if it's verbose.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Cursor BugBot
  • GitHub Check: Analyze (javascript-typescript)

@MrgSub MrgSub force-pushed the 07-09-refactor_pipelines branch from 181e5f8 to 200d4e8 Compare July 9, 2025 20:39
Copy link
Collaborator Author

MrgSub commented Jul 9, 2025

Merge activity

  • Jul 9, 8:41 PM UTC: A user started a stack merge that includes this pull request via Graphite.
  • Jul 9, 8:42 PM UTC: @MrgSub merged this pull request with Graphite.

@MrgSub MrgSub merged commit a48775d into staging Jul 9, 2025
6 of 7 checks passed
@MrgSub MrgSub deleted the 07-09-refactor_pipelines branch July 9, 2025 20:42
const dataStreamResponse = createDataStreamResponse({
execute: async (dataStream) => {
const connectionId = this.name;
if (connectionId === 'general') return;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding curly braces to the early return statement for consistency with the codebase's style conventions. Even for single-line control statements, braces help prevent potential bugs during future modifications:

if (connectionId === 'general') {
  return;
}

This pattern appears in multiple places throughout the PR and would benefit from consistent formatting.

Suggested change
if (connectionId === 'general') return;
if (connectionId === 'general') return;

Spotted by Diamond (based on custom rules)

Is this helpful? React 👍 or 👎 to let us know.

Copy link

@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Dead Code and Incorrect Assignment

The override variable is hardcoded to false, rendering the if (override) block dead code. Furthermore, if override were ever set to true, serviceAccount would be incorrectly assigned the boolean true instead of a valid service account object, leading to runtime errors.

apps/server/src/pipelines.effect.ts#L83-L97

const override = false;
export const runMainWorkflow = (
params: MainWorkflowParams,
): Effect.Effect<string, MainWorkflowError> =>
Effect.gen(function* () {
yield* Console.log('[MAIN_WORKFLOW] Starting workflow with payload:', params);
const { providerId, historyId, subscriptionName } = params;
let serviceAccount = null;
if (override) {
serviceAccount = override;
} else {

Fix in CursorFix in Web


Bug: Query Dehydration Fails on String Query Keys

The shouldDehydrateQuery function in the query provider incorrectly assumes query.queryKey[0] is always a string array. It attempts to call .some() on it, which causes a runtime TypeError and crashes the provider during query dehydration if query.queryKey[0] is a string (e.g., ['posts', ...]) instead of an array.

apps/mail/providers/query-provider.tsx#L123-L128

maxAge: 1000 * 60 * 1, // 1 minute, we're storing in DOs,
dehydrateOptions: {
shouldDehydrateQuery(query) {
return (query.queryKey[0] as string[]).some((e) => e === 'listThreads');
},
},

Fix in CursorFix in Web


Bug: Missing Return for Non-Google Providers

The /a8n/notify/:providerId endpoint handler fails to return a response for non-Google providers. A previously existing early return was removed, and the current return statement is now exclusively within the if (providerId === EProviders.google) block. This omission causes HTTP requests for other providers to hang or result in undefined behavior.

apps/server/src/main.ts#L643-L675

.get('/health', (c) => c.json({ message: 'Zero Server is Up!' }))
.get('/', (c) => c.redirect(`${env.VITE_PUBLIC_APP_URL}`))
.post('/a8n/notify/:providerId', async (c) => {
if (!c.req.header('Authorization')) return c.json({ error: 'Unauthorized' }, { status: 401 });
const providerId = c.req.param('providerId');
if (providerId === EProviders.google) {
const body = await c.req.json<{ historyId: string }>();
const subHeader = c.req.header('x-goog-pubsub-subscription-name');
if (!subHeader) {
console.log('[GOOGLE] no subscription header', body);
return c.json({}, { status: 200 });
}
const isValid = await verifyToken(c.req.header('Authorization')!.split(' ')[1]);
if (!isValid) {
console.log('[GOOGLE] invalid request', body);
return c.json({}, { status: 200 });
}
try {
await env.thread_queue.send({
providerId,
historyId: body.historyId,
subscriptionName: subHeader!,
});
} catch (error) {
console.error('Error sending to thread queue', error, {
providerId,
historyId: body.historyId,
subscriptionName: subHeader,
});
}
return c.json({ message: 'OK' }, { status: 200 });
}
});

Fix in CursorFix in Web


Was this report helpful? Give feedback by reacting with 👍 or 👎

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant