Skip to content
Merged

k #1786

Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 72 additions & 32 deletions apps/server/src/routes/agent/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,17 @@ import { processToolCalls } from './utils';
import { env } from 'cloudflare:workers';
import type { Connection } from 'agents';
import { openai } from '@ai-sdk/openai';
import { parseISO } from 'date-fns';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Remove unused import.

The parseISO import from 'date-fns' is not used anywhere in the code and should be removed to clean up the imports.

-import { parseISO } from 'date-fns';
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
import { parseISO } from 'date-fns';
🧰 Tools
🪛 GitHub Actions: autofix.ci

[warning] 54-54: ESLint (no-unused-vars): Identifier 'parseISO' is imported but never used. Consider removing this import.

🤖 Prompt for AI Agents
In apps/server/src/routes/agent/index.ts at line 54, the import statement for
parseISO from 'date-fns' is unused. Remove this import line entirely to clean up
the code and avoid unnecessary dependencies.

import { createDb } from '../../db';
import { DriverRpcDO } from './rpc';
import { eq } from 'drizzle-orm';
import { Effect } from 'effect';

const decoder = new TextDecoder();

const shouldDropTables = false;
const shouldDropTables = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hard-coding shouldDropTables to true causes the database table to be dropped on every start, leading to data loss

Prompt for AI agents
Address the following comment on apps/server/src/routes/agent/index.ts at line 62:

<comment>Hard-coding shouldDropTables to true causes the database table to be dropped on every start, leading to data loss</comment>

<file context>
@@ -51,16 +51,17 @@ import { processToolCalls } from &#39;./utils&#39;;
 import { env } from &#39;cloudflare:workers&#39;;
 import type { Connection } from &#39;agents&#39;;
 import { openai } from &#39;@ai-sdk/openai&#39;;
+import { parseISO } from &#39;date-fns&#39;;
 import { createDb } from &#39;../../db&#39;;
 import { DriverRpcDO } from &#39;./rpc&#39;;
 import { eq } from &#39;drizzle-orm&#39;;
 import { Effect } from &#39;effect&#39;;
 
</file context>
Suggested change
const shouldDropTables = true;
const shouldDropTables = false;

const maxCount = 20;
const shouldLoop = env.THREAD_SYNC_LOOP !== 'false';
const shouldLoop = true;
Comment on lines +62 to +64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Verify the table dropping configuration change.

Setting shouldDropTables = true will cause database tables to be dropped on every initialization, which could lead to data loss. Please confirm this is intentional and safe for production environments.

Run this script to check if there are any safeguards or environment-specific handling for this setting:


🏁 Script executed:

#!/bin/bash
# Search for any conditional logic around shouldDropTables usage
rg -A 5 -B 5 "shouldDropTables"

Length of output: 1652


🏁 Script executed:

#!/bin/bash
# Locate the dropTables method to inspect its implementation and assess any safeguards
rg -A5 -B5 "dropTables" -n --glob "*.ts"

Length of output: 1728


Confirm and safeguard the table-dropping behavior

The dropTables method unconditionally runs

DROP TABLE IF EXISTS threads;

because shouldDropTables is hardcoded to true. This means every constructor invocation—including in production—will delete the threads table before re-creating it.

Please verify that:

  • You intend to drop the threads table on every startup in all environments.
  • No other tables require dropping (e.g., if you later add a messages table, it won’t be reset).
  • You’ve considered gating this behind an environment variable or adding a production-safe check.

Locations to review:

  • apps/server/src/routes/agent/index.ts
    • Lines 62–64 (const shouldDropTables = true;)
    • Line 72 (if (shouldDropTables) this.dropTables();)
  • The dropTables implementation at lines 366–369

If this behavior isn’t intentional for production, please revert shouldDropTables to use your environment flag or add appropriate safeguards.

🤖 Prompt for AI Agents
In apps/server/src/routes/agent/index.ts around lines 62 to 64 and 72, the
variable shouldDropTables is hardcoded to true, causing the threads table to be
dropped on every startup, including production. To fix this, replace the
hardcoded true with a check against an environment variable (e.g.,
process.env.DROP_TABLES) or a condition that disables dropping tables in
production environments. Also, review the dropTables method at lines 366 to 369
to ensure only intended tables are dropped and update the logic to prevent
accidental data loss in production.

export class ZeroDriver extends AIChatAgent<typeof env> {
private foldersInSync: Map<string, boolean> = new Map();
private syncThreadsInProgress: Map<string, boolean> = new Map();
Expand Down Expand Up @@ -168,10 +169,8 @@ export class ZeroDriver extends AIChatAgent<typeof env> {
if (_connection) this.driver = connectionToDriver(_connection);
this.ctx.waitUntil(conn.end());
this.ctx.waitUntil(this.syncThreads('inbox'));
if (env.NODE_ENV === 'production') {
this.ctx.waitUntil(this.syncThreads('sent'));
this.ctx.waitUntil(this.syncThreads('spam'));
}
this.ctx.waitUntil(this.syncThreads('sent'));
this.ctx.waitUntil(this.syncThreads('spam'));
}
}
async rawListThreads(params: {
Expand Down Expand Up @@ -393,7 +392,12 @@ export class ZeroDriver extends AIChatAgent<typeof env> {

if (latest) {
// Convert receivedOn to ISO format for proper sorting
const normalizedReceivedOn = new Date(latest.receivedOn).toISOString();
let normalizedReceivedOn: string;
try {
normalizedReceivedOn = new Date(latest.receivedOn).toISOString();
} catch (error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename the unused catch parameter 'error' to '_error' to indicate it's intentionally unused, following the convention for unused variables.

Spotted by Diamond (based on CI logs)

Is this helpful? React 👍 or 👎 to let us know.

normalizedReceivedOn = new Date().toISOString();
}

await env.THREADS_BUCKET.put(this.getThreadKey(threadId), JSON.stringify(threadData), {
customMetadata: {
Expand Down Expand Up @@ -476,38 +480,74 @@ export class ZeroDriver extends AIChatAgent<typeof env> {

this.foldersInSync.set(folder, true);

try {
let totalSynced = 0;
const self = this;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the 'self = this' aliasing and instead use arrow functions or bind methods to preserve the 'this' context where needed.

Spotted by Diamond (based on CI logs)

Is this helpful? React 👍 or 👎 to let us know.


const syncSingleThread = (threadId: string) =>
Effect.gen(function* () {
yield* Effect.sleep(500); // Rate limiting delay
return yield* withRetry(Effect.tryPromise(() => self.syncThread({ threadId })));
}).pipe(
Effect.catchAll((error) => {
console.error(`Failed to sync thread ${threadId}:`, error);
return Effect.succeed(null);
}),
);
Comment on lines +483 to +494
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

⚠️ Potential issue

Address linting issue and consider error handling improvements.

The this aliasing violates TypeScript/ESLint rules. Use arrow functions or bind to maintain proper scope without aliasing.

-    const self = this;
-
-    const syncSingleThread = (threadId: string) =>
+    const syncSingleThread = (threadId: string) =>
       Effect.gen(function* () {
         yield* Effect.sleep(500); // Rate limiting delay
-        return yield* withRetry(Effect.tryPromise(() => self.syncThread({ threadId })));
+        return yield* withRetry(Effect.tryPromise(() => this.syncThread({ threadId })));
       }).pipe(
         Effect.catchAll((error) => {
           console.error(`Failed to sync thread ${threadId}:`, error);
           return Effect.succeed(null);
         }),
-      );
+      ).bind(this);

Also consider whether swallowing all errors in syncSingleThread is appropriate - you might want to track failed syncs for monitoring purposes.

Committable suggestion skipped: line range outside the PR's diff.

🧰 Tools
🪛 GitHub Actions: autofix.ci

[warning] 483-483: typescript-eslint (no-this-alias): Unexpected aliasing of 'this' to local variable. Assigning a variable to this instead of properly using arrow lambdas may be a symptom of pre-ES6 practices or not managing scope well.

🤖 Prompt for AI Agents
In apps/server/src/routes/agent/index.ts around lines 483 to 494, the code
aliases `this` to `self`, which violates TypeScript/ESLint rules. Refactor
`syncSingleThread` into an arrow function to preserve `this` context without
aliasing. Additionally, revise error handling to avoid silently swallowing all
errors; instead, log or track failed sync attempts for monitoring while still
allowing the function to complete gracefully.


const syncProgram = Effect.gen(
function* () {
let totalSynced = 0;
let pageToken: string | null = null;
let hasMore = true;
let _pageCount = 0;

while (hasMore) {
_pageCount++;

// Rate limiting delay between pages
yield* Effect.sleep(2000);

const result: IGetThreadsResponse = yield* Effect.tryPromise(() =>
self.listWithRetry({
folder,
maxResults: maxCount,
pageToken: pageToken || undefined,
}),
);

// Process threads one by one without buffering
for await (const thread of this.streamThreads(folder)) {
try {
const id = await this.queue('syncThread', { threadId: thread.id });
console.log(`Synced thread ${thread.id} to queue ${id}`);
totalSynced++;
} catch (error) {
console.error(`Failed to sync thread ${thread.id}:`, error);
// Process threads with controlled concurrency to avoid rate limits
const threadIds = result.threads.map((thread) => thread.id);
const syncEffects = threadIds.map(syncSingleThread);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rule violated: Detect Typescript Performance Bottlenecks

  Nested iteration over result pages and per-page thread arrays can create an O(pages × threads) bottleneck. Consider flattening the logic or processing threads in a streaming fashion without an explicit inner loop.
Prompt for AI agents
Address the following comment on apps/server/src/routes/agent/index.ts at line 519:

<comment>Nested iteration over result pages and per-page thread arrays can create an O(pages × threads) bottleneck. Consider flattening the logic or processing threads in a streaming fashion without an explicit inner loop.</comment>

<file context>
@@ -476,38 +480,74 @@ export class ZeroDriver extends AIChatAgent&lt;typeof env&gt; {
 
     this.foldersInSync.set(folder, true);
 
-    try {
-      let totalSynced = 0;
+    const self = this;
+
+    const syncSingleThread = (threadId: string) =&gt;
+      Effect.gen(function* () {
</file context>


yield* Effect.all(syncEffects, { concurrency: 1, discard: true });

totalSynced += result.threads.length;
pageToken = result.nextPageToken;
hasMore = pageToken !== null && shouldLoop;
}

// // Broadcast progress after each thread
// this.agent.broadcastChatMessage({
// type: OutgoingMessageType.Mail_List,
// folder,
// });
}
return { synced: totalSynced };
}.bind(this),
);

return { synced: totalSynced };
try {
const result = await Effect.runPromise(
syncProgram.pipe(
Effect.ensuring(
Effect.sync(() => {
console.log('Setting isSyncing to false');
this.foldersInSync.delete(folder);
this.agent?.broadcastChatMessage({
type: OutgoingMessageType.Mail_List,
folder,
});
}),
),
),
);
return result;
} catch (error) {
console.error('Failed to sync inbox threads:', error);
throw error;
} finally {
console.log('Setting isSyncing to false');
this.foldersInSync.delete(folder);
if (this.agent)
this.agent.broadcastChatMessage({
type: OutgoingMessageType.Mail_List,
folder,
});
}
}

Expand Down
Loading