Dedicated Per-thread Workers for Thread Syncing#1842
Dedicated Per-thread Workers for Thread Syncing#1842jonahseguin wants to merge 6 commits intoMail-0:stagingfrom
Conversation
… with thread sync worker durable objects
Bug Report
Comments? Email us. Your free trial ends in 2 days. |
There was a problem hiding this comment.
Your free trial has ended. If you'd like to continue receiving code reviews, you can add a payment method here.
WalkthroughThis change introduces a new Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant ZeroDriver
participant ThreadSyncWorker
participant KVBucket
Client->>ZeroDriver: syncThread({ threadId })
ZeroDriver->>ThreadSyncWorker: syncThread(connectionId, _connection, threadId)
ThreadSyncWorker->>ZeroDriver: get driver from connection
ThreadSyncWorker->>ZeroDriver: fetch thread data with retry
ThreadSyncWorker->>KVBucket: store thread data
ThreadSyncWorker-->>ZeroDriver: return latest message
ZeroDriver-->>Client: respond (without thread data)
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~15 minutes Possibly related PRs
Suggested labels
Poem
Note ⚡️ Unit Test Generation is now available in beta!Learn more here, or try it out under "Finishing Touches" below. ✨ Finishing Touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
cubic analysis
1 issue found across 4 files • Review in cubic
React with 👍 or 👎 to teach cubic. You can also tag @cubic-dev-ai to give feedback, ask questions, or re-run the review.
| "class_name": "ThinkingMCP", | ||
| }, | ||
| { | ||
| "name": "THREAD_SYNC_WORKER", |
There was a problem hiding this comment.
THREAD_SYNC_WORKER binding is only added for the local environment, so env.THREAD_SYNC_WORKER will be undefined in staging and production causing runtime failures.
Prompt for AI agents
Address the following comment on apps/server/wrangler.jsonc at line 51:
<comment>THREAD_SYNC_WORKER binding is only added for the local environment, so `env.THREAD_SYNC_WORKER` will be undefined in staging and production causing runtime failures.</comment>
<file context>
@@ -47,6 +47,10 @@
"name": "THINKING_MCP",
"class_name": "ThinkingMCP",
},
+ {
+ "name": "THREAD_SYNC_WORKER",
+ "class_name": "ThreadSyncWorker",
+ },
</file context>
There was a problem hiding this comment.
Actionable comments posted: 10
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (4)
apps/server/src/main.ts(2 hunks)apps/server/src/routes/agent/index.ts(8 hunks)apps/server/src/routes/agent/sync-worker.ts(1 hunks)apps/server/wrangler.jsonc(2 hunks)
🧰 Additional context used
📓 Path-based instructions (3)
**/*.{js,jsx,ts,tsx}
📄 CodeRabbit Inference Engine (AGENT.md)
**/*.{js,jsx,ts,tsx}: Use 2-space indentation
Use single quotes
Limit lines to 100 characters in width
Semicolons are required
Files:
apps/server/src/main.tsapps/server/src/routes/agent/sync-worker.tsapps/server/src/routes/agent/index.ts
**/*.{js,jsx,ts,tsx,css}
📄 CodeRabbit Inference Engine (AGENT.md)
Use Prettier with sort-imports and Tailwind plugins
Files:
apps/server/src/main.tsapps/server/src/routes/agent/sync-worker.tsapps/server/src/routes/agent/index.ts
**/*.{ts,tsx}
📄 CodeRabbit Inference Engine (AGENT.md)
Enable TypeScript strict mode
Files:
apps/server/src/main.tsapps/server/src/routes/agent/sync-worker.tsapps/server/src/routes/agent/index.ts
🧠 Learnings (2)
apps/server/src/routes/agent/sync-worker.ts (1)
Learnt from: retrogtx
PR: #1468
File: apps/server/src/trpc/routes/mail.ts:386-391
Timestamp: 2025-06-27T04:59:29.731Z
Learning: In apps/server/src/trpc/routes/mail.ts, the attachment processing logic conditionally handles mixed attachment types - it preserves existing File-like objects with arrayBuffer methods while only converting serialized attachments that need processing through toAttachmentFiles.
apps/server/src/routes/agent/index.ts (1)
Learnt from: MrgSub
PR: #1837
File: apps/server/src/lib/brain.fallback.prompts.ts:211-217
Timestamp: 2025-07-26T20:39:06.670Z
Learning: In the ThreadLabels prompt system, existing labels should not be automatically preserved. The AI agent should re-evaluate all labels (both existing and new) against the current thread summary and only return labels that currently apply to the thread content, even if it means dropping previously applied labels that are no longer relevant.
🔇 Additional comments (2)
apps/server/src/main.ts (1)
45-45: Good. The import and export are properly wired up.This correctly integrates the ThreadSyncWorker into the application. Clean and simple.
Also applies to: 858-858
apps/server/src/routes/agent/index.ts (1)
528-528: Good call removing threadData from the return. Less memory usage.This aligns with the memory optimization goals. The comment is helpful for future maintainers.
| export class ZeroDriver extends AIChatAgent<typeof env> { | ||
| private foldersInSync: Map<string, boolean> = new Map(); | ||
| private syncThreadsInProgress: Map<string, boolean> = new Map(); | ||
| private _connection: typeof connection.$inferSelect | undefined = undefined; |
There was a problem hiding this comment.
🧹 Nitpick (assertive)
Good move adding the connection field, but the naming could be better.
The underscore prefix is old school. Just call it connection and make it readonly to prevent accidental mutations.
- private _connection: typeof connection.$inferSelect | undefined = undefined;
+ private readonly connection: typeof connection.$inferSelect | undefined = undefined;Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In apps/server/src/routes/agent/index.ts at line 69, rename the private field
from `_connection` to `connection` and add the `readonly` modifier to it. This
change improves naming clarity and prevents accidental mutations by making the
field immutable.
| this._connection = await db.query.connection.findFirst({ | ||
| where: eq(connection.id, this.name), | ||
| }); | ||
| if (_connection) this.driver = connectionToDriver(_connection); | ||
| if (this._connection) this.driver = connectionToDriver(this._connection); | ||
| this.ctx.waitUntil(conn.end()); |
There was a problem hiding this comment.
This is problematic. You're assigning to a readonly field after construction.
If we make connection readonly as suggested, this won't work. Either initialize it in the constructor or keep it mutable but add proper null checks everywhere.
Consider this pattern instead:
- this._connection = await db.query.connection.findFirst({
+ const conn = await db.query.connection.findFirst({
where: eq(connection.id, this.name),
});
- if (this._connection) this.driver = connectionToDriver(this._connection);
+ if (!conn) throw new Error(`Connection not found for ${this.name}`);
+ this._connection = conn;
+ this.driver = connectionToDriver(conn);📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| this._connection = await db.query.connection.findFirst({ | |
| where: eq(connection.id, this.name), | |
| }); | |
| if (_connection) this.driver = connectionToDriver(_connection); | |
| if (this._connection) this.driver = connectionToDriver(this._connection); | |
| this.ctx.waitUntil(conn.end()); | |
| const conn = await db.query.connection.findFirst({ | |
| where: eq(connection.id, this.name), | |
| }); | |
| if (!conn) throw new Error(`Connection not found for ${this.name}`); | |
| this._connection = conn; | |
| this.driver = connectionToDriver(conn); | |
| this.ctx.waitUntil(conn.end()); |
🤖 Prompt for AI Agents
In apps/server/src/routes/agent/index.ts around lines 240 to 244, you are
assigning a value to a readonly field _connection after the object construction,
which is not allowed if the field is marked readonly. To fix this, either
initialize _connection during the constructor or remove the readonly modifier
and ensure you add null checks wherever _connection is accessed to handle its
potential absence safely.
| const threadSyncWorker = env.THREAD_SYNC_WORKER.get(env.THREAD_SYNC_WORKER.newUniqueId()); | ||
| // Sync thread with the worker | ||
| const latest = await threadSyncWorker.syncThread(this.name, this._connection, threadId); |
There was a problem hiding this comment.
🧹 Nitpick (assertive)
Creating a new Durable Object for each thread sync is wasteful.
I get that you want stateless workers, but creating a new DO instance for every single thread is like using a new rocket for every satellite. Consider a worker pool pattern.
The current approach will create thousands of DOs for large sync operations. Consider:
- Reusing DO instances with a cleanup mechanism between syncs
- Implementing a worker pool pattern with a fixed number of workers
- Using the Durable Object ID as a shard key to distribute load
Would you like me to open an issue to track this optimization?
🤖 Prompt for AI Agents
In apps/server/src/routes/agent/index.ts around lines 487 to 489, the code
creates a new Durable Object instance for each thread sync, which is inefficient
and can lead to excessive resource usage. Refactor this by implementing a worker
pool pattern that reuses a fixed number of Durable Object instances. Use the
Durable Object ID as a shard key to distribute load evenly, and add a cleanup
mechanism to reset worker state between syncs. This will optimize resource usage
and improve scalability.
| yield * Effect.sleep(200); | ||
|
|
||
| const threadData = yield* Effect.tryPromise(() => self.getWithRetry(threadId)); | ||
| // Uses the dedicated ThreadSyncWorker durable object | ||
| const result = yield * Effect.tryPromise(() => self.syncThread({ threadId })); | ||
|
|
||
| if (!threadData || !threadData.latest || !threadData.latest.threadId) { | ||
| if (result && result.success) { | ||
| return 1 as const; | ||
| } else { | ||
| console.log(`Failed to sync thread ${threadId}:`, result?.reason || 'Unknown error'); | ||
| return 0 as const; | ||
| } |
There was a problem hiding this comment.
🧹 Nitpick (assertive)
The sleep duration seems arbitrary. Why 200ms?
Magic numbers without explanation are bad. Either make it configurable or add a comment explaining the rationale.
+ // Small delay to prevent rate limiting when processing multiple threads
+ const THREAD_SYNC_DELAY_MS = 200;
- yield * Effect.sleep(200);
+ yield * Effect.sleep(THREAD_SYNC_DELAY_MS);📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| yield * Effect.sleep(200); | |
| const threadData = yield* Effect.tryPromise(() => self.getWithRetry(threadId)); | |
| // Uses the dedicated ThreadSyncWorker durable object | |
| const result = yield * Effect.tryPromise(() => self.syncThread({ threadId })); | |
| if (!threadData || !threadData.latest || !threadData.latest.threadId) { | |
| if (result && result.success) { | |
| return 1 as const; | |
| } else { | |
| console.log(`Failed to sync thread ${threadId}:`, result?.reason || 'Unknown error'); | |
| return 0 as const; | |
| } | |
| // Small delay to prevent rate limiting when processing multiple threads | |
| const THREAD_SYNC_DELAY_MS = 200; | |
| yield * Effect.sleep(THREAD_SYNC_DELAY_MS); | |
| // Uses the dedicated ThreadSyncWorker durable object | |
| const result = yield * Effect.tryPromise(() => self.syncThread({ threadId })); | |
| if (result && result.success) { | |
| return 1 as const; | |
| } else { | |
| console.log(`Failed to sync thread ${threadId}:`, result?.reason || 'Unknown error'); | |
| return 0 as const; | |
| } |
🤖 Prompt for AI Agents
In apps/server/src/routes/agent/index.ts around lines 580 to 590, the sleep
duration of 200ms is a magic number without explanation. To fix this, either
replace the hardcoded 200 with a named constant that clearly indicates its
purpose or make the duration configurable via a parameter or configuration
setting. Alternatively, add a comment above the sleep call explaining why 200ms
was chosen to clarify its intent.
| let currentLabels: string[]; | ||
| try { | ||
| currentLabels = JSON.parse(result[0].latest_label_ids || '[]') as string[]; | ||
| currentLabels = JSON.parse(result[0].latest_label_ids?.toString() || '[]') as string[]; |
There was a problem hiding this comment.
🧹 Nitpick (assertive)
The toString() fix is good but could be cleaner.
Nice catch on the JSON parsing issue. Consider using optional chaining for even better safety.
- currentLabels = JSON.parse(result[0].latest_label_ids?.toString() || '[]') as string[];
+ currentLabels = JSON.parse(result[0]?.latest_label_ids?.toString() ?? '[]') as string[];📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| currentLabels = JSON.parse(result[0].latest_label_ids?.toString() || '[]') as string[]; | |
| currentLabels = JSON.parse(result[0]?.latest_label_ids?.toString() ?? '[]') as string[]; |
🤖 Prompt for AI Agents
In apps/server/src/routes/agent/index.ts at line 1006, improve the JSON parsing
line by using optional chaining to safely access latest_label_ids before calling
toString(), replacing the current usage with something like
result[0].latest_label_ids?.toString() to avoid errors if latest_label_ids is
undefined or null, ensuring cleaner and safer code.
| private getThreadKey(connectionId: string, threadId: string) { | ||
| return `${connectionId}/${threadId}.json`; | ||
| } |
There was a problem hiding this comment.
🧹 Nitpick (assertive)
This method should be static. It doesn't use any instance state.
Private methods that don't access instance properties should be static for better performance and clarity.
- private getThreadKey(connectionId: string, threadId: string) {
+ private static getThreadKey(connectionId: string, threadId: string) {
return `${connectionId}/${threadId}.json`;
}And update the usage:
- this.getThreadKey(connectionId, threadId),
+ ThreadSyncWorker.getThreadKey(connectionId, threadId),🤖 Prompt for AI Agents
In apps/server/src/routes/agent/sync-worker.ts around lines 14 to 16, the method
getThreadKey does not use any instance state and should be made static. Change
the method declaration to static and update all calls to this method to use the
class name instead of an instance reference to reflect this change.
| public async syncThread( | ||
| connectionId: string, | ||
| _connection: typeof connection.$inferSelect, | ||
| threadId: string, | ||
| ): Promise<ParsedMessage | undefined> { | ||
| // Get driver from connection | ||
| const driver = connectionToDriver(_connection); | ||
| if (!driver) throw new Error('No driver available'); | ||
|
|
||
| // Get thread | ||
| const thread = await Effect.runPromise( | ||
| withRetry(Effect.tryPromise(() => driver!.get(threadId))), | ||
| ); | ||
|
|
||
| // Store thread | ||
| await env.THREADS_BUCKET.put( | ||
| this.getThreadKey(connectionId, threadId), | ||
| JSON.stringify(thread), | ||
| { | ||
| customMetadata: { | ||
| threadId, | ||
| }, | ||
| }, | ||
| ); | ||
|
|
||
| // Return latest message in thread | ||
| return thread.latest; | ||
| } | ||
| } |
There was a problem hiding this comment.
🛠️ Refactor suggestion
Add error handling for R2 operations and validate inputs.
This method needs better error handling and input validation. What happens if R2 is down? What if threadId is invalid?
public async syncThread(
connectionId: string,
_connection: typeof connection.$inferSelect,
threadId: string,
): Promise<ParsedMessage | undefined> {
+ // Validate inputs
+ if (!connectionId || !threadId) {
+ throw new Error('Invalid connectionId or threadId');
+ }
+
// Get driver from connection
const driver = connectionToDriver(_connection);
if (!driver) throw new Error('No driver available');
// Get thread
const thread = await Effect.runPromise(
withRetry(Effect.tryPromise(() => driver.get(threadId))),
);
- // Store thread
- await env.THREADS_BUCKET.put(
- ThreadSyncWorker.getThreadKey(connectionId, threadId),
- JSON.stringify(thread),
- {
- customMetadata: {
- threadId,
- },
- },
- );
+ // Store thread with error handling
+ try {
+ await env.THREADS_BUCKET.put(
+ ThreadSyncWorker.getThreadKey(connectionId, threadId),
+ JSON.stringify(thread),
+ {
+ customMetadata: {
+ threadId,
+ syncedAt: new Date().toISOString(),
+ },
+ },
+ );
+ } catch (error) {
+ console.error(`Failed to store thread ${threadId} in R2:`, error);
+ throw new Error(`R2 storage failed: ${error instanceof Error ? error.message : 'Unknown error'}`);
+ }
// Return latest message in thread
return thread.latest;
}Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In apps/server/src/routes/agent/sync-worker.ts lines 18 to 46, the syncThread
method lacks error handling for R2 storage operations and does not validate
inputs like threadId. Add checks to ensure threadId is valid before proceeding.
Wrap the R2 put operation in a try-catch block to handle potential failures such
as R2 downtime, and throw or return meaningful errors if these operations fail.
This will make the method more robust and prevent unhandled exceptions.
| const driver = connectionToDriver(_connection); | ||
| if (!driver) throw new Error('No driver available'); | ||
|
|
||
| // Get thread | ||
| const thread = await Effect.runPromise( | ||
| withRetry(Effect.tryPromise(() => driver!.get(threadId))), | ||
| ); |
There was a problem hiding this comment.
Remove the non-null assertion. Handle the error properly.
Using driver! with non-null assertion after checking for null is bad practice. The type system already knows driver is not null inside the Effect.
// Get thread
const thread = await Effect.runPromise(
- withRetry(Effect.tryPromise(() => driver!.get(threadId))),
+ withRetry(Effect.tryPromise(() => driver.get(threadId))),
);📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| const driver = connectionToDriver(_connection); | |
| if (!driver) throw new Error('No driver available'); | |
| // Get thread | |
| const thread = await Effect.runPromise( | |
| withRetry(Effect.tryPromise(() => driver!.get(threadId))), | |
| ); | |
| const driver = connectionToDriver(_connection); | |
| if (!driver) throw new Error('No driver available'); | |
| // Get thread | |
| const thread = await Effect.runPromise( | |
| withRetry(Effect.tryPromise(() => driver.get(threadId))), | |
| ); |
🤖 Prompt for AI Agents
In apps/server/src/routes/agent/sync-worker.ts around lines 24 to 30, remove the
non-null assertion operator (!) used on the driver variable inside the
Effect.tryPromise call. Since driver is already checked for null and an error is
thrown if it is null, the type system knows driver is not null here. Simply use
driver without the non-null assertion to handle the error properly and improve
code clarity.
| { | ||
| "name": "THREAD_SYNC_WORKER", | ||
| "class_name": "ThreadSyncWorker", | ||
| }, |
There was a problem hiding this comment.
This is broken. You forgot to add ThreadSyncWorker to staging and production.
The ThreadSyncWorker binding is only added to the local environment. This will cause deployment failures in staging and production. We need to move fast and break things, but not literally break production.
Add the missing configuration:
# In staging environment (after line 209):
{
"name": "THINKING_MCP",
"class_name": "ThinkingMCP",
},
+ {
+ "name": "THREAD_SYNC_WORKER",
+ "class_name": "ThreadSyncWorker",
+ },
# In production environment (after line 374):
{
"name": "THINKING_MCP",
"class_name": "ThinkingMCP",
},
+ {
+ "name": "THREAD_SYNC_WORKER",
+ "class_name": "ThreadSyncWorker",
+ },Also applies to: 189-210, 354-375
🤖 Prompt for AI Agents
In apps/server/wrangler.jsonc around lines 50 to 53, the ThreadSyncWorker
binding is only configured for the local environment, missing in staging and
production which will cause deployment failures. Add the ThreadSyncWorker
binding configuration to the staging and production sections of the file,
ensuring it matches the local environment setup. Also, apply the same fix to the
other specified line ranges 189-210 and 354-375 to maintain consistency across
all environments.
| { | ||
| "tag": "v7", | ||
| "new_classes": ["ThreadSyncWorker"], | ||
| }, |
There was a problem hiding this comment.
Missing migrations in staging and production. This is not good.
The v7 migration for ThreadSyncWorker is only in local. Need to add it to staging and production environments to actually deploy this thing.
Add after the existing migrations in both environments:
# In staging migrations (after line 266):
{
"tag": "v7",
"new_sqlite_classes": ["ThinkingMCP"],
},
+ {
+ "tag": "v8",
+ "new_classes": ["ThreadSyncWorker"],
+ },
# In production migrations (after line 425):
{
"tag": "v7",
"new_sqlite_classes": ["ThinkingMCP"],
},
+ {
+ "tag": "v8",
+ "new_classes": ["ThreadSyncWorker"],
+ },Also applies to: 238-267, 397-426
🤖 Prompt for AI Agents
In apps/server/wrangler.jsonc around lines 101 to 104 and also in lines 238-267
and 397-426, the v7 migration for ThreadSyncWorker is only added in the local
environment but missing in staging and production. To fix this, add the same v7
migration entry with "new_classes": ["ThreadSyncWorker"] after the existing
migrations in both the staging and production sections to ensure the migration
is deployed across all environments.
|
This PR has merge conflicts and has been open for more than 3 days. It will be automatically closed. Please resolve the conflicts and reopen the PR if you'd like to continue working on it. |
Description
Adds a new binding for the
SyncThreadWorkerDurable Object. This Durable Object is designed as a short-lived execution context for syncing an individual mail thread from a provider into R2.This improves the opportunity for increased parallelization, and critically avoids memory limit issues when syncing many/large threads. Since each Durable Object has a memory limit of 128mb, this allows large threads and large quantities of threads to be synced more reliably.
Type of Change
Areas Affected
Testing Done
Describe the tests you've done:
Security Considerations
For changes involving data or authentication:
Checklist
Additional Notes
There is potential for other patterns to be explored: such as creating a pool of the worker threads in a size that matches the concurrency number (currently
3) and re-using those durable objects for multiple thread syncs. A potential caveat of this is that memory may not be freed betweensyncThread()RPC invocations. This is why I chose the pattern of a new Durable Object for each thread sync execution.The
ThreadSyncWorkerDurable Object is intentionally designed with no state and no storage to make it as lightweight as possible.Important note: I removed the
threadDataproperty from the returned object insyncThreads()- from my testing and investigating I could not usage of this property.This is optimal for memory conservation within the
ZeroDriverDO.If this is incorrect, and the
threadDataproperty is used/needed, let me know.By submitting this pull request, I confirm that my contribution is made under the terms of the project's license.
Summary by cubic
Added a dedicated Durable Object worker for each mail thread sync to improve parallelization and prevent memory issues when syncing large or many threads.
Summary by CodeRabbit