Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/mail/components/party.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ export const NotificationProvider = ({ headers }: { headers: Record<string, stri

usePartySocket({
party: 'zero-agent',
room: activeConnection?.id ? `${activeConnection.id}` : 'general',
room: activeConnection?.id ? String(activeConnection.id) : 'general',
prefix: 'agents',
maxRetries: 1,
query: {
Expand Down
4 changes: 2 additions & 2 deletions apps/mail/components/ui/ai-sidebar.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ function AISidebar({ className }: AISidebarProps) {

const agent = useAgent({
agent: 'ZeroAgent',
name: activeConnection?.id ?? 'general',
name: activeConnection?.id ? String(activeConnection.id) : 'general',
host: `${import.meta.env.VITE_PUBLIC_BACKEND_URL}`,
});

Expand Down Expand Up @@ -452,7 +452,7 @@ function AISidebar({ className }: AISidebarProps) {
defaultSize={24}
minSize={24}
maxSize={24}
className="bg-panelLight dark:bg-panelDark mb-1 mr-1 hidden h-[calc(100dvh-8px)] shadow-sm md:block md:rounded-2xl md:shadow-sm "
className="bg-panelLight dark:bg-panelDark mb-1 mr-1 hidden h-[calc(100dvh-8px)] shadow-sm md:block md:rounded-2xl md:shadow-sm"
>
<div className={cn('h-[calc(98vh)]', 'flex flex-col', '', className)}>
<div className="flex h-full flex-col">
Expand Down
3 changes: 2 additions & 1 deletion apps/server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
"autumn-js": "catalog:",
"base64-js": "1.5.1",
"better-auth": "catalog:",
"cheerio": "1.1.0",
"date-fns": "^4.1.0",
"dedent": "^1.6.0",
"drizzle-orm": "catalog:",
Expand Down Expand Up @@ -81,4 +82,4 @@
"jiti": "2.4.2",
"typescript": "catalog:"
}
}
}
21 changes: 17 additions & 4 deletions apps/server/src/lib/auth.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ import { drizzleAdapter } from 'better-auth/adapters/drizzle';
import { getSocialProviders } from './auth-providers';
import { redis, resend, twilio } from './services';
import { getContext } from 'hono/context-storage';
import { getActiveDriver } from './driver/utils';
import { defaultUserSettings } from './schemas';
import { disableBrainFunction } from './brain';
import { APIError } from 'better-auth/api';
import type { EProviders } from '../types';
import type { HonoContext } from '../ctx';
import { env } from 'cloudflare:workers';
import { createDriver } from './driver';
Expand Down Expand Up @@ -57,11 +58,13 @@ const connectionHandlerHook = async (account: Account) => {
expiresAt: new Date(Date.now() + (account.accessTokenExpiresAt?.getTime() || 3600000)),
};

await c.var.db
const connectionId = crypto.randomUUID();

const [result] = await c.var.db
.insert(connection)
.values({
providerId: account.providerId as 'google' | 'microsoft',
id: crypto.randomUUID(),
id: connectionId,
email: userInfo.address,
userId: account.userId,
createdAt: new Date(),
Expand All @@ -74,7 +77,13 @@ const connectionHandlerHook = async (account: Account) => {
...updatingInfo,
updatedAt: new Date(),
},
});
})
.returning({ id: connection.id });

await env.subscribe_queue.send({
connectionId: result.id,
providerId: account.providerId,
});
};

export const createAuth = () => {
Expand Down Expand Up @@ -111,6 +120,10 @@ export const createAuth = () => {
await Promise.allSettled(
connections.map(async (connection) => {
if (!connection.accessToken || !connection.refreshToken) return false;
await disableBrainFunction({
id: connection.id,
providerId: connection.providerId as EProviders,
});
const driver = createDriver(connection.providerId, {
auth: {
accessToken: connection.accessToken,
Expand Down
71 changes: 68 additions & 3 deletions apps/server/src/lib/brain.fallback.prompts.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
export const SummarizeMessage = `
import { defaultLabels } from '../types';
import dedent from 'dedent';

export const SummarizeMessage = dedent`
<system_prompt>
<role>You are a high-accuracy email summarization agent. Your task is to extract and summarize emails in XML format with absolute precision, ensuring no critical details are lost while maintaining high efficiency.</role>

Expand Down Expand Up @@ -46,7 +49,7 @@ export const SummarizeMessage = `

<strict_guidelines>Strictly follow these rules. No missing details. No extra fluff. Just precise, high-performance summarization. Never say "Here is"</strict_guidelines>
</system_prompt>`;
export const SummarizeThread = `
export const SummarizeThread = dedent`
<system_prompt>
<role>You are a high-accuracy email thread summarization agent. Your task is to process a full email thread with multiple messages and generate a structured, limited-length summary that retains all critical details, ensuring no information is lost.</role>

Expand Down Expand Up @@ -119,7 +122,7 @@ export const SummarizeThread = `
<strict_guidelines>Never say "Here is"</strict_guidelines>
</system_prompt>
`;
export const ReSummarizeThread = `
export const ReSummarizeThread = dedent`
<system_prompt>
<role>You are a high-accuracy email thread summarization agent. Your task is to process a full email thread, including new messages and an existing summary, and generate a structured, limited-length updated summary that retains all critical details.</role>

Expand Down Expand Up @@ -190,3 +193,65 @@ export const ReSummarizeThread = `

<strict_guidelines>Maintain absolute accuracy. No missing details. No extra assumptions. No modifications to previous content beyond appending updates. Ensure clarity and brevity within the length limit. Never say "Here is"</strict_guidelines>
</system_prompt>`;

export const ThreadLabels = (labels: { name: string; usecase: string }[]) => dedent`
<system_prompt>
<role>You are a precise thread labeling agent. Your task is to analyze email thread summaries and assign relevant labels from a predefined set, ensuring accurate categorization while maintaining consistency.</role>
<strict_guidelines>Maintain absolute accuracy in labeling. Use only the predefined labels. Never generate new labels. Never include personal names. Always return labels in comma-separated format without spaces.</strict_guidelines>
<strict_guidelines>Never say "Here is" or explain the process of labeling.</strict_guidelines>
<instructions>
<input_structure>
<item>Thread summary containing participants, messages, and context</item>
</input_structure>

<labeling_rules>
<item>Use only the predefined set of labels</item>
<item>Return labels as comma-separated values without spaces</item>
<item>Include company names as labels when heavily referenced</item>
<item>Include bank names as labels when heavily referenced</item>
<item>Do not use personal names as labels</item>
<item>Choose the most relevant labels, typically 1-3 labels per thread</item>
</labeling_rules>

<allowed_labels>
${labels
.map(
(label) => `<item>
<name>${label.name}</name>
<usecase>${defaultLabels.find((e) => e.name === label.name)?.usecase || ''}</usecase>
</item>`,
)
.join('\n')}
</allowed_labels>
</instructions>

<example_input>
<thread_summary>
Thread: Product Launch Planning
Participants: Sarah, Mike, David

- March 15, 10:00 AM - Sarah requests urgent review of the new feature documentation before the launch.
- March 15, 11:30 AM - Mike suggests changes to the marketing strategy for better customer engagement.
- March 15, 2:00 PM - David approves the final product specifications and sets a launch date.
</thread_summary>
</example_input>

<expected_output>
<labels>urgent,product,marketing</labels>
</expected_output>

<example_input>
<thread_summary>
Thread: Stripe Integration Update
Participants: Alex, Jamie, Stripe Support

- March 16, 9:00 AM - Alex reports issues with Stripe payment processing.
- March 16, 10:15 AM - Stripe Support provides troubleshooting steps.
- March 16, 11:30 AM - Jamie confirms the fix and requests additional security review.
</thread_summary>
</example_input>

<expected_output>
<labels>support,finance,stripe</labels>
</expected_output>
</system_prompt>`;
27 changes: 17 additions & 10 deletions apps/server/src/lib/brain.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,26 @@
import { ReSummarizeThread, SummarizeMessage, SummarizeThread } from './brain.fallback.prompts';
import { getSubscriptionFactory } from './factories/subscription-factory.registry';
import { EPrompts, EProviders } from '../types';
import { env } from 'cloudflare:workers';
import { EPrompts } from '../types';

export const enableBrainFunction = async (connection: { id: string; providerId: string }) => {
return await env.zero.subscribe({
connectionId: connection.id,
providerId: connection.providerId,
});
export const enableBrainFunction = async (connection: { id: string; providerId: EProviders }) => {
const subscriptionFactory = getSubscriptionFactory(connection.providerId);
const response = await subscriptionFactory.subscribe({ body: { connectionId: connection.id } });
if (!response.ok) {
throw new Error(`Failed to enable brain function: ${response.status} ${response.statusText}`);
}
return response;
};

export const disableBrainFunction = async (connection: { id: string; providerId: string }) => {
return await env.zero.unsubscribe({
connectionId: connection.id,
providerId: connection.providerId,
export const disableBrainFunction = async (connection: { id: string; providerId: EProviders }) => {
const subscriptionFactory = getSubscriptionFactory(connection.providerId);
const response = await subscriptionFactory.unsubscribe({
body: { connectionId: connection.id, providerId: connection.providerId },
});
if (!response.ok) {
throw new Error(`Failed to disable brain function: ${response.status} ${response.statusText}`);
}
return response;
};

const getPromptName = (connectionId: string, prompt: EPrompts) => {
Expand Down
19 changes: 18 additions & 1 deletion apps/server/src/lib/driver/google.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,24 @@ export class GoogleMailManager implements MailManager {
'https://www.googleapis.com/auth/userinfo.email',
].join(' ');
}
public getAttachment(messageId: string, attachmentId: string) {
public async listHistory<T>(historyId: string): Promise<{ history: T[]; historyId: string }> {
return this.withErrorHandler(
'listHistory',
async () => {
const response = await this.gmail.users.history.list({
userId: 'me',
startHistoryId: historyId,
});

const history = response.data.history || [];
const nextHistoryId = response.data.historyId || historyId;

return { history: history as T[], historyId: nextHistoryId };
},
{ historyId },
);
}
Comment on lines +47 to +63
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Typed cast hides the actual Gmail Schema$History

history is force-cast to T[], losing type safety and IntelliSense.

If you want a generic return, expose the concrete type and let callers up-cast:

-  public async listHistory<T>(historyId: string): Promise<{ history: T[]; historyId: string }> {
+  public async listHistory(
+    historyId: string,
+  ): Promise<{ history: gmail_v1.Schema$History[]; historyId: string }> {-        const history = response.data.history || [];
+        const history = response.data.history ?? [];

Call sites that care about sub-shapes can as-cast, but the default is now correct.

Also consider handling the “History cursor is too old” 404 explicitly and marking the connection as stale so a new watch can be issued.

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In apps/server/src/lib/driver/google.ts lines 47 to 63, the method listHistory
force-casts the Gmail API response history to a generic type T[], which hides
the actual Gmail Schema$History type and reduces type safety. To fix this,
change the method to return the concrete Gmail Schema$History[] type by default
and let callers cast to more specific types if needed. Additionally, add
explicit handling for the "History cursor is too old" 404 error from the Gmail
API by catching this error, marking the connection as stale, and triggering a
new watch to refresh the history cursor.

public async getAttachment(messageId: string, attachmentId: string) {
return this.withErrorHandler(
'getAttachment',
async () => {
Expand Down
1 change: 1 addition & 0 deletions apps/server/src/lib/driver/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ export interface MailManager {
tokens?: ManagerConfig['auth'],
): Promise<{ address: string; name: string; photo: string }>;
getScope(): string;
listHistory<T>(historyId: string): Promise<{ history: T[]; historyId: string }>;
markAsRead(threadIds: string[]): Promise<void>;
markAsUnread(threadIds: string[]): Promise<void>;
normalizeIds(id: string[]): { threadIds: string[] };
Expand Down
44 changes: 44 additions & 0 deletions apps/server/src/lib/factories/base-subscription.factory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import { defaultLabels, EProviders, type AppContext } from '../../types';
import { connection } from '../../db/schema';
import { env } from 'cloudflare:workers';
import { createDb } from '../../db';

export interface SubscriptionData {
connectionId?: string;
silent?: boolean;
force?: boolean;
}

export interface UnsubscriptionData {
connectionId?: string;
providerId?: EProviders;
}

export abstract class BaseSubscriptionFactory {
abstract readonly providerId: EProviders;

abstract subscribe(data: { body: SubscriptionData }): Promise<Response>;

abstract unsubscribe(data: { body: UnsubscriptionData }): Promise<Response>;

abstract verifyToken(token: string): Promise<boolean>;

protected async getConnectionFromDb(connectionId: string): Promise<any> {
const db = createDb(env.HYPERDRIVE.connectionString);
const { eq } = await import('drizzle-orm');

const [connectionData] = await db
.select()
.from(connection)
.where(eq(connection.id, connectionId));

return connectionData;
}

protected async initializeConnectionLabels(connectionId: string): Promise<void> {
const existingLabels = await env.connection_labels.get(connectionId);
if (!existingLabels?.trim().length) {
await env.connection_labels.put(connectionId, JSON.stringify(defaultLabels));
}
}
}
Loading