Skip to content

Comments

Optimize thread synchronization with rate limiting and concurrency control#1845

Merged
MrgSub merged 1 commit intostagingfrom
ZEROOptimize_thread_synchronization_with_rate_limiting_and_concurrency_control
Jul 28, 2025
Merged

Optimize thread synchronization with rate limiting and concurrency control#1845
MrgSub merged 1 commit intostagingfrom
ZEROOptimize_thread_synchronization_with_rate_limiting_and_concurrency_control

Conversation

@MrgSub
Copy link
Collaborator

@MrgSub MrgSub commented Jul 27, 2025

READ CAREFULLY THEN REMOVE

Remove bullet points that are not relevant.

PLEASE REFRAIN FROM USING AI TO WRITE YOUR CODE AND PR DESCRIPTION. IF YOU DO USE AI TO WRITE YOUR CODE PLEASE PROVIDE A DESCRIPTION AND REVIEW IT CAREFULLY. MAKE SURE YOU UNDERSTAND THE CODE YOU ARE SUBMITTING USING AI.

  • Pull requests that do not follow these guidelines will be closed without review or comment.
  • If you use AI to write your PR description your pr will be close without review or comment.
  • If you are unsure about anything, feel free to ask for clarification.

Description

Please provide a clear description of your changes.


Type of Change

Please delete options that are not relevant.

  • 🐛 Bug fix (non-breaking change which fixes an issue)
  • ✨ New feature (non-breaking change which adds functionality)
  • 💥 Breaking change (fix or feature with breaking changes)
  • 📝 Documentation update
  • 🎨 UI/UX improvement
  • 🔒 Security enhancement
  • ⚡ Performance improvement

Areas Affected

Please check all that apply:

  • Email Integration (Gmail, IMAP, etc.)
  • User Interface/Experience
  • Authentication/Authorization
  • Data Storage/Management
  • API Endpoints
  • Documentation
  • Testing Infrastructure
  • Development Workflow
  • Deployment/Infrastructure

Testing Done

Describe the tests you've done:

  • Unit tests added/updated
  • Integration tests added/updated
  • Manual testing performed
  • Cross-browser testing (if UI changes)
  • Mobile responsiveness verified (if UI changes)

Security Considerations

For changes involving data or authentication:

  • No sensitive data is exposed
  • Authentication checks are in place
  • Input validation is implemented
  • Rate limiting is considered (if applicable)

Checklist

  • I have read the CONTRIBUTING document
  • My code follows the project's style guidelines
  • I have performed a self-review of my code
  • I have commented my code, particularly in complex areas
  • I have updated the documentation
  • My changes generate no new warnings
  • I have added tests that prove my fix/feature works
  • All tests pass locally
  • Any dependent changes are merged and published

Additional Notes

Add any other context about the pull request here.

Screenshots/Recordings

Add screenshots or recordings here if applicable.


By submitting this pull request, I confirm that my contribution is made under the terms of the project's license.


Summary by cubic

Improved thread synchronization by adding rate limiting and stricter concurrency control to prevent API overload and duplicate syncs.

  • Performance
    • Added delays between thread and page syncs to respect rate limits.
    • Limited concurrent thread syncs to one at a time.

Summary by CodeRabbit

  • Refactor
    • Enhanced topic retrieval with caching and detailed status reporting.
    • Improved thread and folder synchronization with robust error handling, concurrency control, and progress tracking.
    • Added detailed sync result reporting and broadcast updates for better synchronization visibility.
    • Minor fix to label handling to prevent data issues.

No new user-facing features introduced; improvements focus on reliability and transparency of background sync processes.

@jazzberry-ai
Copy link

jazzberry-ai bot commented Jul 27, 2025

Bug Report

Name Severity Example test case Description
Rate Limiting Inefficiency Medium Set a very low rate limit on the Gmail API and observe the sync behavior. The static sleep durations may not be optimal.
Concurrency Bottleneck Medium Measure the sync time for a large inbox with concurrency set to 1 and then compare it to a higher concurrency level. Limiting concurrency to 1 might be too restrictive.
Missing Error Reporting Medium Intentionally cause errors during thread synchronization and verify that the user receives an error message. Failed threads are not reported to the user.
Lack of Progress Indication Low Start a sync and observe the UI to see if there's any indication of progress. The user lacks feedback on the sync's progress.
Potential Data Loss Medium Simulate a network outage during thread synchronization and verify that the failed threads are eventually synced. If self.syncThread persistently fails, the thread is skipped with no recourse.
shouldLoop Dependency High Set THREAD_SYNC_LOOP to false, sync, and ensure more than the first page is not synced. If THREAD_SYNC_LOOP is not properly configured, only the first page of threads will be synchronized.
No Error Logging for Initial API Calls Medium Simulate a failure in the initial driver.list or driver.get call and observe whether the error is logged and handled correctly. Errors occurring during the initial API calls in listWithRetry and getWithRetry are not properly caught or logged.

Comments? Email us. Your free trial ends in 2 days.

@MrgSub MrgSub marked this pull request as ready for review July 27, 2025 23:30
@coderabbitai
Copy link
Contributor

coderabbitai bot commented Jul 27, 2025

Caution

Review failed

The pull request is closed.

Walkthrough

The ZeroDriver class underwent a major overhaul with effect-based asynchronous control flow and robust typed error handling introduced in getUserTopics, syncThread, and syncThreads. These methods now return detailed result objects, incorporate retries, rate limiting, concurrency control, and improved logging. Minor signature and import adjustments were made in related RPC code.

Changes

Cohort / File(s) Change Summary
apps/server/src/routes/agent/index.ts Comprehensive refactor of ZeroDriver: effect-based async control with retries, typed errors, detailed result types, concurrency control (sequential syncThreads), caching logic, label management, and broadcast updates. Minor fix in JSON parsing.
apps/server/src/routes/agent/rpc.ts Consolidated imports for ZeroDriver and FolderSyncResult; updated DriverRpcDO.syncThreads method signature to explicitly return Promise<FolderSyncResult>.

Sequence Diagram(s)

sequenceDiagram
    participant Caller
    participant ZeroDriver
    participant DurableStorage
    participant LabelService
    participant Agent
    participant Database

    Caller->>ZeroDriver: getUserTopics()
    ZeroDriver->>DurableStorage: fetch cached topics
    alt Cache valid and fresh
        DurableStorage-->>ZeroDriver: cached topics
    else Cache missing or stale
        ZeroDriver->>LabelService: generate topics & ensure labels
        LabelService-->>ZeroDriver: topics with labels
        ZeroDriver->>DurableStorage: store topics
    end
    ZeroDriver->>Agent: broadcast topic update (if available)
    ZeroDriver-->>Caller: topics array

    Caller->>ZeroDriver: syncThread(threadId)
    alt Driver unavailable or already syncing
        ZeroDriver-->>Caller: early exit with failure reason
    else
        ZeroDriver->>ZeroDriver: set syncing flag
        ZeroDriver->>Driver: authenticate if needed
        Driver-->>ZeroDriver: auth success
        ZeroDriver->>Driver: fetch thread data (with retry)
        Driver-->>ZeroDriver: thread data
        ZeroDriver->>Database: store thread data
        Database-->>ZeroDriver: ack
        ZeroDriver->>Agent: broadcast thread update (if available)
        ZeroDriver->>ZeroDriver: clear syncing flag
        ZeroDriver-->>Caller: detailed sync result
    end

    Caller->>ZeroDriver: syncThreads(folder)
    alt Driver unavailable or syncing in progress
        ZeroDriver-->>Caller: early exit with failure reason
    else
        ZeroDriver->>ZeroDriver: set folder syncing flag
        loop For each page of threads
            ZeroDriver->>Driver: list threads (with retry)
            Driver-->>ZeroDriver: thread list
            loop For each thread (sequentially)
                ZeroDriver->>ZeroDriver: rate-limit delay
                ZeroDriver->>ZeroDriver: syncThread(threadId)
            end
        end
        ZeroDriver->>Agent: broadcast folder sync complete (if available)
        ZeroDriver->>ZeroDriver: clear folder syncing flag
        ZeroDriver-->>Caller: detailed folder sync result
    end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~40 minutes

Possibly related PRs

Suggested labels

high priority

Poem

🚀
Threads and topics, now in flight,
Effects and errors, handled right.
Syncs run smooth, one by one,
Labels created, cache well spun.
ZeroDriver’s power, fully unlocked,
Code refined, the future rocked!
🌠


📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0656c1d and 8a1871a.

📒 Files selected for processing (2)
  • apps/server/src/routes/agent/index.ts (5 hunks)
  • apps/server/src/routes/agent/rpc.ts (2 hunks)
✨ Finishing Touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch ZEROOptimize_thread_synchronization_with_rate_limiting_and_concurrency_control

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Explain this complex logic.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai generate unit tests to generate unit tests for this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your free trial has ended. If you'd like to continue receiving code reviews, you can add a payment method here.

Copy link

@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bugbot free trial expires on July 29, 2025
Learn more in the Cursor dashboard.

@coderabbitai coderabbitai bot added the High Priority High Priority Work label Jul 27, 2025
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

📜 Review details

Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7cd0e40 and 0656c1d.

📒 Files selected for processing (1)
  • apps/server/src/routes/agent/index.ts (3 hunks)
🧰 Additional context used
📓 Path-based instructions (3)
**/*.{js,jsx,ts,tsx}

📄 CodeRabbit Inference Engine (AGENT.md)

**/*.{js,jsx,ts,tsx}: Use 2-space indentation
Use single quotes
Limit lines to 100 characters in width
Semicolons are required

Files:

  • apps/server/src/routes/agent/index.ts
**/*.{js,jsx,ts,tsx,css}

📄 CodeRabbit Inference Engine (AGENT.md)

Use Prettier with sort-imports and Tailwind plugins

Files:

  • apps/server/src/routes/agent/index.ts
**/*.{ts,tsx}

📄 CodeRabbit Inference Engine (AGENT.md)

Enable TypeScript strict mode

Files:

  • apps/server/src/routes/agent/index.ts
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: cubic · AI code reviewer
  • GitHub Check: Cursor Bugbot
🔇 Additional comments (1)
apps/server/src/routes/agent/index.ts (1)

576-585: Solid error handling improvement here! 🚀

The new syncSingleThread function properly encapsulates the retry logic and handles errors gracefully by returning null instead of throwing. This prevents the entire sync operation from failing due to a single thread issue.

});

return 1 as const;
yield* Effect.sleep(500); // Rate limiting delay
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

These hardcoded delays need some flexibility! 🎯

Both the 500ms thread delay and 2000ms page delay are hardcoded. Different scenarios might need different rate limiting strategies.

Add environment variables for configurability:

-        yield* Effect.sleep(500); // Rate limiting delay
+        const threadDelay = env.THREAD_SYNC_DELAY ? parseInt(env.THREAD_SYNC_DELAY) : 500;
+        yield* Effect.sleep(threadDelay); // Rate limiting delay

And for the page delay:

-          yield* Effect.sleep(2000);
+          const pageDelay = env.THREAD_SYNC_PAGE_DELAY ? parseInt(env.THREAD_SYNC_PAGE_DELAY) : 2000;
+          yield* Effect.sleep(pageDelay);

Also applies to: 598-598

🤖 Prompt for AI Agents
In apps/server/src/routes/agent/index.ts at lines 578 and 598, replace the
hardcoded 500ms and 2000ms delays with values read from environment variables to
allow configurable rate limiting. Define new environment variables for these
delays, parse them as integers with fallback defaults (e.g., 500 and 2000), and
use these variables in place of the fixed numbers in the Effect.sleep calls.


const batchSynced = processedCounts.filter((c) => c === 1).length;
totalSynced += batchSynced;
yield* Effect.all(syncEffects, { concurrency: 1, discard: true });
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick (assertive)

Hold up - this concurrency change is pretty dramatic! 🤔

Reducing concurrency from 3 to 1 means thread syncing will take 3x longer. While this definitely helps with rate limiting, it might significantly impact user experience, especially for accounts with many threads.

Consider making this configurable or implementing adaptive concurrency that starts low and increases if no rate limit errors occur.

-          yield* Effect.all(syncEffects, { concurrency: 1, discard: true });
+          const concurrency = env.THREAD_SYNC_CONCURRENCY ? parseInt(env.THREAD_SYNC_CONCURRENCY) : 1;
+          yield* Effect.all(syncEffects, { concurrency, discard: true });
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
yield* Effect.all(syncEffects, { concurrency: 1, discard: true });
const concurrency = env.THREAD_SYNC_CONCURRENCY
? parseInt(env.THREAD_SYNC_CONCURRENCY)
: 1;
yield* Effect.all(syncEffects, { concurrency, discard: true });
🤖 Prompt for AI Agents
In apps/server/src/routes/agent/index.ts at line 612, the concurrency for thread
syncing is reduced from 3 to 1, which slows down syncing significantly. Modify
the code to make the concurrency level configurable via a parameter or
environment variable, or implement adaptive concurrency logic that begins with
concurrency 1 and increases it gradually if no rate limit errors are detected,
to balance rate limiting and performance.

Copy link
Contributor

@cubic-dev-ai cubic-dev-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cubic analysis

No issues found across 1 file. Review in cubic

@MrgSub MrgSub force-pushed the ZEROOptimize_thread_synchronization_with_rate_limiting_and_concurrency_control branch from 0656c1d to 8a1871a Compare July 28, 2025 00:00
@jazzberry-ai
Copy link

jazzberry-ai bot commented Jul 28, 2025

Bug Report

Name Severity Example test case Description
getUserTopics - Potential infinite loop and stale cache issues High Simulate a storage quota error and call getUserTopics. A persistent error during storage put could lead to the function repeatedly trying to regenerate topics and failing to update the cache, causing high resource consumption and stale data.
syncThread - Potential race condition in syncThreadsInProgress map High Simulate a failure in this.getWithRetry after syncThreadsInProgress.set. If setupAuth or getWithRetry throw and are not caught by Effect, the syncThreadsInProgress map entry will not be deleted, leading to a deadlock.
syncThreads - Concurrency issues with this.foldersInSync High Simulate a failure in self.listWithRetry after foldersInSync.set. If self.listWithRetry throw and are not caught by Effect, the foldersInSync map entry will not be deleted, leading to a deadlock.
Error Handling in getUserTopics and syncThreads Medium Simulate a critical error in topic generation and call getUserTopics. The catchAll blocks always return success. The errors should be thrown to the parent process or have an option to retry the operation.
Potential XSS vulnerability Critical Send an email with a subject containing JavaScript code. The latest.subject is inserted directly into the database without sanitization, potentially leading to XSS.

Comments? Email us. Your free trial ends in 2 days.

Copy link
Collaborator Author

MrgSub commented Jul 28, 2025

This stack of pull requests is managed by Graphite. Learn more about stacking.

Copy link
Collaborator Author

MrgSub commented Jul 28, 2025

Merge activity

  • Jul 28, 12:01 AM UTC: A user started a stack merge that includes this pull request via Graphite.
  • Jul 28, 12:01 AM UTC: @MrgSub merged this pull request with Graphite.

@MrgSub MrgSub merged commit 7cd7d85 into staging Jul 28, 2025
6 of 7 checks passed
@MrgSub MrgSub deleted the ZEROOptimize_thread_synchronization_with_rate_limiting_and_concurrency_control branch July 28, 2025 00:01
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

High Priority High Priority Work

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant