Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions apps/server/src/pipelines.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ export class MainWorkflow extends WorkflowEntrypoint<Env, Params> {
const match = subscriptionName.toString().match(regex);
if (!match) {
log('[MAIN_WORKFLOW] Invalid subscription name:', subscriptionName);
throw new Error('Invalid subscription name');
throw new Error(`Invalid subscription name ${subscriptionName}`);
}
const [, connectionId] = match;
log('[MAIN_WORKFLOW] Extracted connectionId:', connectionId);
Expand All @@ -69,11 +69,11 @@ export class MainWorkflow extends WorkflowEntrypoint<Env, Params> {
const status = await env.subscribed_accounts.get(`${connectionId}__${providerId}`);
if (!status || status === 'pending') {
log('[MAIN_WORKFLOW] Connection id is missing or not enabled %s', connectionId);
throw new Error('Connection is not enabled');
return 'Connection is not enabled';
}
Comment on lines +72 to 73
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: returning error string instead of throwing could break error handling expectations in caller code

if (!isValidUUID(connectionId)) {
log('[MAIN_WORKFLOW] Invalid connection id format:', connectionId);
throw new Error('Invalid connection id');
return 'Invalid connection id';
}
const previousHistoryId = await env.gmail_history_id.get(connectionId);
if (providerId === EProviders.google) {
Expand Down Expand Up @@ -164,9 +164,9 @@ export class ZeroWorkflow extends WorkflowEntrypoint<Env, Params> {
.select()
.from(connection)
.where(eq(connection.id, connectionId.toString()));
if (!foundConnection) throw new Error('Connection not found');
if (!foundConnection) throw new Error(`Connection not found ${connectionId}`);
if (!foundConnection.accessToken || !foundConnection.refreshToken)
throw new Error('Connection is not authorized');
throw new Error(`Connection is not authorized ${connectionId}`);
log('[ZERO_WORKFLOW] Found connection:', foundConnection.id);
return foundConnection;
});
Expand All @@ -182,7 +182,7 @@ export class ZeroWorkflow extends WorkflowEntrypoint<Env, Params> {
const { history } = await driver.listHistory<gmail_v1.Schema$History>(
historyId.toString(),
);
if (!history.length) throw new Error('No history found');
if (!history.length) throw new Error(`No history found ${historyId} ${connectionId}`);
log('[ZERO_WORKFLOW] Found history entries:', history.length);
return history;
} catch (error) {
Expand Down Expand Up @@ -386,9 +386,9 @@ export class ThreadWorkflow extends WorkflowEntrypoint<Env, Params> {
.from(connection)
.where(eq(connection.id, connectionId.toString()));
this.ctx.waitUntil(conn.end());
if (!foundConnection) throw new Error('Connection not found');
if (!foundConnection) throw new Error(`Connection not found ${connectionId}`);
if (!foundConnection.accessToken || !foundConnection.refreshToken)
throw new Error('Connection is not authorized');
throw new Error(`Connection is not authorized ${connectionId}`);
log('[THREAD_WORKFLOW] Found connection:', foundConnection.id);
return foundConnection;
},
Expand Down Expand Up @@ -441,7 +441,7 @@ export class ThreadWorkflow extends WorkflowEntrypoint<Env, Params> {
return step.do(`[ZERO] Vectorize Message ${message.id}`, async () => {
log('[THREAD_WORKFLOW] Converting message to XML:', message.id);
const prompt = await messageToXML(message);
if (!prompt) throw new Error('Message has no prompt');
if (!prompt) throw new Error(`Message has no prompt ${message.id}`);
log('[THREAD_WORKFLOW] Got XML prompt for message:', message.id);
log('[THREAD_WORKFLOW] Message:', message);

Expand Down Expand Up @@ -512,7 +512,8 @@ export class ThreadWorkflow extends WorkflowEntrypoint<Env, Params> {
},
);

if (!embeddingVector) throw new Error('Message Embedding vector is null');
if (!embeddingVector)
throw new Error(`Message Embedding vector is null ${message.id}`);

return {
id: message.id,
Expand Down Expand Up @@ -688,7 +689,7 @@ export class ThreadWorkflow extends WorkflowEntrypoint<Env, Params> {
},
);

if (!embeddingVector) throw new Error('Thread Embedding vector is null');
if (!embeddingVector) return console.error('Thread Embedding vector is null');

try {
log('[THREAD_WORKFLOW] Upserting thread vector');
Expand Down
100 changes: 50 additions & 50 deletions apps/server/src/routes/chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -189,19 +189,19 @@ export class AgentRpcDO extends RpcTarget {

async markThreadsRead(threadIds: string[]) {
const result = await this.mainDo.markThreadsRead(threadIds);
await Promise.all(threadIds.map((id) => this.mainDo.syncThread(id)));
// await Promise.all(threadIds.map((id) => this.mainDo.syncThread(id)));
return result;
Comment on lines 191 to 193
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: Sync after read/unread has been disabled. Consider implementing debounced sync or event-based updates to prevent stale UI state.

}

async markThreadsUnread(threadIds: string[]) {
const result = await this.mainDo.markThreadsUnread(threadIds);
await Promise.all(threadIds.map((id) => this.mainDo.syncThread(id)));
// await Promise.all(threadIds.map((id) => this.mainDo.syncThread(id)));
return result;
}

async modifyLabels(threadIds: string[], addLabelIds: string[], removeLabelIds: string[]) {
const result = await this.mainDo.modifyLabels(threadIds, addLabelIds, removeLabelIds);
await Promise.all(threadIds.map((id) => this.mainDo.syncThread(id)));
// await Promise.all(threadIds.map((id) => this.mainDo.syncThread(id)));
return result;
}

Expand Down Expand Up @@ -233,13 +233,13 @@ export class AgentRpcDO extends RpcTarget {

async markAsRead(threadIds: string[]) {
const result = await this.mainDo.markAsRead(threadIds);
await Promise.all(threadIds.map((id) => this.mainDo.syncThread(id)));
// await Promise.all(threadIds.map((id) => this.mainDo.syncThread(id)));
return result;
}

async markAsUnread(threadIds: string[]) {
const result = await this.mainDo.markAsUnread(threadIds);
await Promise.all(threadIds.map((id) => this.mainDo.syncThread(id)));
// await Promise.all(threadIds.map((id) => this.mainDo.syncThread(id)));
return result;
}

Expand Down Expand Up @@ -310,19 +310,19 @@ export class ZeroAgent extends AIChatAgent<typeof env> {
constructor(ctx: DurableObjectState, env: Env) {
super(ctx, env);
if (shouldDropTables) this.dropTables();
this.sql`
CREATE TABLE IF NOT EXISTS threads (
id TEXT PRIMARY KEY,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
thread_id TEXT NOT NULL,
provider_id TEXT NOT NULL,
latest_sender TEXT,
latest_received_on TEXT,
latest_subject TEXT,
latest_label_ids TEXT
);
`;
// this.sql`
// CREATE TABLE IF NOT EXISTS threads (
// id TEXT PRIMARY KEY,
// created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
// updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
// thread_id TEXT NOT NULL,
// provider_id TEXT NOT NULL,
// latest_sender TEXT,
// latest_received_on TEXT,
// latest_subject TEXT,
// latest_label_ids TEXT
// );
// `;
}

async dropTables() {
Expand Down Expand Up @@ -388,10 +388,10 @@ export class ZeroAgent extends AIChatAgent<typeof env> {
});
if (_connection) this.driver = connectionToDriver(_connection);
this.ctx.waitUntil(conn.end());
this.ctx.waitUntil(this.syncThreads('inbox'));
this.ctx.waitUntil(this.syncThreads('sent'));
this.ctx.waitUntil(this.syncThreads('spam'));
this.ctx.waitUntil(this.syncThreads('archive'));
// this.ctx.waitUntil(this.syncThreads('inbox'));
// this.ctx.waitUntil(this.syncThreads('sent'));
// this.ctx.waitUntil(this.syncThreads('spam'));
// this.ctx.waitUntil(this.syncThreads('archive'));
Comment on lines +391 to +394
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: Removing automatic sync after auth setup could lead to initial data loading issues. Consider adding a manual sync trigger or user-controlled refresh mechanism.

}
}

Expand Down Expand Up @@ -515,34 +515,34 @@ export class ZeroAgent extends AIChatAgent<typeof env> {
this.cancelChatRequest(data.id);
break;
}
case IncomingMessageType.Mail_List: {
const result = await this.getThreadsFromDB({
labelIds: data.labelIds,
folder: data.folder,
q: data.query,
max: data.maxResults,
cursor: data.pageToken,
});
this.currentFolder = data.folder;
connection.send(
JSON.stringify({
type: OutgoingMessageType.Mail_List,
result,
}),
);
break;
}
case IncomingMessageType.Mail_Get: {
const result = await this.getThreadFromDB(data.threadId);
connection.send(
JSON.stringify({
type: OutgoingMessageType.Mail_Get,
result,
threadId: data.threadId,
}),
);
break;
}
// case IncomingMessageType.Mail_List: {
// const result = await this.getThreadsFromDB({
// labelIds: data.labelIds,
// folder: data.folder,
// q: data.query,
// max: data.maxResults,
// cursor: data.pageToken,
// });
// this.currentFolder = data.folder;
// connection.send(
// JSON.stringify({
// type: OutgoingMessageType.Mail_List,
// result,
// }),
// );
// break;
// }
// case IncomingMessageType.Mail_Get: {
// const result = await this.getThreadFromDB(data.threadId);
// connection.send(
// JSON.stringify({
// type: OutgoingMessageType.Mail_Get,
// result,
// threadId: data.threadId,
// }),
// );
// break;
// }
}
}
}
Expand Down
38 changes: 19 additions & 19 deletions apps/server/src/trpc/routes/mail.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ export const mailRouter = router({
.query(async ({ input, ctx }) => {
const { activeConnection } = ctx;
const agent = await getZeroAgent(activeConnection.id);
return await agent.getThreadFromDB(input.id);
return await agent.getThread(input.id);
}),
count: activeDriverProcedure
.output(
Expand Down Expand Up @@ -75,24 +75,24 @@ export const mailRouter = router({
});
return drafts;
}
if (q) {
const threadsResponse = await agent.listThreads({
labelIds: labelIds,
maxResults: max,
pageToken: cursor,
query: q,
folder,
});
return threadsResponse;
}
const folderLabelId = getFolderLabelId(folder);
const labelIdsToUse = folderLabelId ? [...labelIds, folderLabelId] : labelIds;
const threadsResponse = await agent.getThreadsFromDB({
labelIds: labelIdsToUse,
max: max,
cursor: cursor,
// if (q) {
const threadsResponse = await agent.listThreads({
labelIds: labelIds,
maxResults: max,
pageToken: cursor,
query: q,
folder,
});
return threadsResponse;
// }
// const folderLabelId = getFolderLabelId(folder);
// const labelIdsToUse = folderLabelId ? [...labelIds, folderLabelId] : labelIds;
// const threadsResponse = await agent.getThreadsFromDB({
// labelIds: labelIdsToUse,
// max: max,
// cursor: cursor,
// });
// return threadsResponse;
}),
markAsRead: activeDriverProcedure
.input(
Expand Down Expand Up @@ -173,7 +173,7 @@ export const mailRouter = router({
}

const threadResults: PromiseSettledResult<{ messages: { tags: { name: string }[] }[] }>[] =
await Promise.allSettled(threadIds.map((id) => agent.getThreadFromDB(id)));
await Promise.allSettled(threadIds.map((id) => agent.getThread(id)));

let anyStarred = false;
let processedThreads = 0;
Expand Down Expand Up @@ -217,7 +217,7 @@ export const mailRouter = router({
}

const threadResults: PromiseSettledResult<{ messages: { tags: { name: string }[] }[] }>[] =
await Promise.allSettled(threadIds.map((id) => agent.getThreadFromDB(id)));
await Promise.allSettled(threadIds.map((id) => agent.getThread(id)));

let anyImportant = false;
let processedThreads = 0;
Expand Down