Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 40 additions & 28 deletions apps/mail/app/api/driver/connections/route.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
import { processIP, getRatelimitModule, checkRateLimit, getAuthenticatedUserId } from '../../utils';
import {
processIP,
getRatelimitModule,
checkRateLimit,
getAuthenticatedUserId,
logoutUser,
} from '../../utils';
import { NextRequest, NextResponse } from 'next/server';
import { Ratelimit } from '@upstash/ratelimit';
import { connection } from '@zero/db/schema';
Expand All @@ -7,33 +13,39 @@ import { eq } from 'drizzle-orm';
import { db } from '@zero/db';

export const GET = async (req: NextRequest) => {
const userId = await getAuthenticatedUserId();
const finalIp = processIP(req);
const ratelimit = getRatelimitModule({
prefix: `ratelimit:get-connections-${userId}`,
limiter: Ratelimit.slidingWindow(60, '1m'),
});
const { success, headers } = await checkRateLimit(ratelimit, finalIp);
if (!success) {
return NextResponse.json(
{ error: 'Too many requests. Please try again later.' },
{ status: 429, headers },
);
}
try {
const userId = await getAuthenticatedUserId();
const finalIp = processIP(req);
const ratelimit = getRatelimitModule({
prefix: `ratelimit:get-connections-${userId}`,
limiter: Ratelimit.slidingWindow(60, '1m'),
});
const { success, headers } = await checkRateLimit(ratelimit, finalIp);
if (!success) {
return NextResponse.json(
{ error: 'Too many requests. Please try again later.' },
{ status: 429, headers },
);
}

const connections = (await db
.select({
id: connection.id,
email: connection.email,
name: connection.name,
picture: connection.picture,
createdAt: connection.createdAt,
})
.from(connection)
.where(eq(connection.userId, userId))) as IConnection[];
const connections = (await db
.select({
id: connection.id,
email: connection.email,
name: connection.name,
picture: connection.picture,
createdAt: connection.createdAt,
})
.from(connection)
.where(eq(connection.userId, userId))) as IConnection[];

return NextResponse.json(connections, {
status: 200,
headers,
});
return NextResponse.json(connections, {
status: 200,
headers,
});
} catch (error) {
console.warn('Error getting connections:', error);
await logoutUser();
return NextResponse.json([]);
}
};
50 changes: 31 additions & 19 deletions apps/mail/app/api/driver/count/route.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,38 @@
import { checkRateLimit, getAuthenticatedUserId, getRatelimitModule, processIP } from '../../utils';
import {
checkRateLimit,
getAuthenticatedUserId,
getRatelimitModule,
logoutUser,
processIP,
} from '../../utils';
import { type NextRequest, NextResponse } from 'next/server';
import { getActiveDriver } from '@/actions/utils';
import { Ratelimit } from '@upstash/ratelimit';

export const GET = async (req: NextRequest) => {
const userId = await getAuthenticatedUserId();
const finalIp = processIP(req);
const ratelimit = getRatelimitModule({
prefix: `ratelimit:get-count-${userId}`,
limiter: Ratelimit.slidingWindow(60, '1m'),
});
const { success, headers } = await checkRateLimit(ratelimit, finalIp);
if (!success) {
return NextResponse.json(
{ error: 'Too many requests. Please try again later.' },
{ status: 429, headers },
);
try {
const userId = await getAuthenticatedUserId();
const finalIp = processIP(req);
const ratelimit = getRatelimitModule({
prefix: `ratelimit:get-count-${userId}`,
limiter: Ratelimit.slidingWindow(60, '1m'),
});
const { success, headers } = await checkRateLimit(ratelimit, finalIp);
if (!success) {
return NextResponse.json(
{ error: 'Too many requests. Please try again later.' },
{ status: 429, headers },
);
}
const driver = await getActiveDriver();
const count = await driver.count();
return NextResponse.json(count, {
status: 200,
headers,
});
} catch (error) {
console.warn('Error getting count:', error);
await logoutUser();
return NextResponse.json({});
}
const driver = await getActiveDriver();
const count = await driver.count();
return NextResponse.json(count, {
status: 200,
headers,
});
};
31 changes: 9 additions & 22 deletions apps/mail/app/api/driver/google.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,22 +129,12 @@ export const driver = async (config: IConfig): Promise<MailManager> => {
process.env.GOOGLE_REDIRECT_URI as string,
);

const getScope = () =>
[
'https://www.googleapis.com/auth/gmail.modify',
'https://www.googleapis.com/auth/userinfo.profile',
'https://www.googleapis.com/auth/userinfo.email',
].join(' ');
const getScope = () => ['https://www.googleapis.com/auth/gmail.modify'].join(' ');
if (config.auth) {
auth.setCredentials({
refresh_token: config.auth.refresh_token,
scope: getScope(),
});
if (process.env.NODE_ENV === 'production') {
EnableBrain()
.then(() => console.log('✅ Driver: Enabled'))
.catch(() => console.log('✅ Driver: Enabled'));
}
}
const parse = ({
id,
Expand Down Expand Up @@ -535,18 +525,15 @@ export const driver = async (config: IConfig): Promise<MailManager> => {
const { folder: normalizedFolder, q: normalizedQ } = normalizeSearch(folder, q ?? '');
const labelIds = [..._labelIds];
if (normalizedFolder) labelIds.push(normalizedFolder.toUpperCase());

return withExponentialBackoff(async () => {
const res = await gmail.users.threads.list({
userId: 'me',
q: normalizedQ ? normalizedQ : undefined,
labelIds: folder === 'inbox' ? labelIds : [],
maxResults,
pageToken: pageToken ? pageToken : undefined,
quotaUser: config.auth?.email,
});
return { ...res.data, threads: res.data.threads } as any;
const res = await gmail.users.threads.list({
userId: 'me',
q: normalizedQ ? normalizedQ : undefined,
labelIds: folder === 'inbox' ? labelIds : [],
maxResults,
pageToken: pageToken ? pageToken : undefined,
quotaUser: config.auth?.email,
});
return { ...res.data, threads: res.data.threads } as any;
Comment on lines +528 to +536
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Removed retry logic for Gmail API calls.

The removal of the withExponentialBackoff wrapper means the Gmail API call no longer has built-in retry logic for rate limiting issues.

This change could reduce the resilience to transient rate limit errors. Ensure that rate limit handling is implemented elsewhere, or consider restoring the retry logic:


🏁 Script executed:

#!/bin/bash
# Check for rate limiting error handling elsewhere
echo "Searching for rate limit error handling:"
rg -A 5 -B 5 "rate.*limit|429" --type ts apps/mail/ | grep -v "gmail-v1"

Length of output: 25720


🏁 Script executed:

#!/bin/bash
# Display lines around retry logic and the list call in google.ts
sed -n '480,600p' apps/mail/app/api/driver/google.ts

Length of output: 4553


Add retry with exponential backoff to the list call

The list method in apps/mail/app/api/driver/google.ts now invokes gmail.users.threads.list directly, whereas the get method still uses your withExponentialBackoff wrapper. Without backoff retries, transient 429/rate‑limit errors will surface immediately, reducing resilience.

• File: apps/mail/app/api/driver/google.ts
• Around lines ~528–536

Suggested change:

     list: async (
       folder: string,
       q: string,
       maxResults = 20,
       _labelIds: string[] = [],
       pageToken?: string,
     ) => {
       const { folder: normalizedFolder, q: normalizedQ } = normalizeSearch(folder, q ?? '');
       const labelIds = [..._labelIds];
       if (normalizedFolder) labelIds.push(normalizedFolder.toUpperCase());
-      const res = await gmail.users.threads.list({
-        userId: 'me',
-        q: normalizedQ || undefined,
-        labelIds: folder === 'inbox' ? labelIds : [],
-        maxResults,
-        pageToken: pageToken || undefined,
-        quotaUser: config.auth?.email,
-      });
-      return { ...res.data, threads: res.data.threads } as any;
+      return withExponentialBackoff(async () => {
+        const res = await gmail.users.threads.list({
+          userId: 'me',
+          q: normalizedQ || undefined,
+          labelIds: folder === 'inbox' ? labelIds : [],
+          maxResults,
+          pageToken: pageToken || undefined,
+          quotaUser: config.auth?.email,
+        });
+        return { ...res.data, threads: res.data.threads } as any;
+      });
     },

This restores automatic retries on rate‑limit and other transient errors, matching your existing get implementation.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const res = await gmail.users.threads.list({
userId: 'me',
q: normalizedQ ? normalizedQ : undefined,
labelIds: folder === 'inbox' ? labelIds : [],
maxResults,
pageToken: pageToken ? pageToken : undefined,
quotaUser: config.auth?.email,
});
return { ...res.data, threads: res.data.threads } as any;
list: async (
folder: string,
q: string,
maxResults = 20,
_labelIds: string[] = [],
pageToken?: string,
) => {
const { folder: normalizedFolder, q: normalizedQ } = normalizeSearch(folder, q ?? '');
const labelIds = [..._labelIds];
if (normalizedFolder) labelIds.push(normalizedFolder.toUpperCase());
return withExponentialBackoff(async () => {
const res = await gmail.users.threads.list({
userId: 'me',
q: normalizedQ || undefined,
labelIds: folder === 'inbox' ? labelIds : [],
maxResults,
pageToken: pageToken || undefined,
quotaUser: config.auth?.email,
});
return { ...res.data, threads: res.data.threads } as any;
});
},

},
get: async (id: string) => {
return withExponentialBackoff(async () => {
Expand Down
58 changes: 35 additions & 23 deletions apps/mail/app/api/driver/notes/route.ts
Original file line number Diff line number Diff line change
@@ -1,33 +1,45 @@
import { processIP, getRatelimitModule, checkRateLimit, getAuthenticatedUserId } from '../../utils';
import {
processIP,
getRatelimitModule,
checkRateLimit,
getAuthenticatedUserId,
logoutUser,
} from '../../utils';
import { NextRequest, NextResponse } from 'next/server';
import { fetchThreadNotes } from '@/actions/notes';
import { Ratelimit } from '@upstash/ratelimit';
import { notesManager } from '../../notes/db';

export const GET = async (req: NextRequest) => {
const userId = await getAuthenticatedUserId();
const finalIp = processIP(req);
const ratelimit = getRatelimitModule({
prefix: `ratelimit:get-thread-notes-${userId}`,
limiter: Ratelimit.slidingWindow(60, '1m'),
});
const { success, headers } = await checkRateLimit(ratelimit, finalIp);
if (!success) {
return NextResponse.json(
{ error: 'Too many requests. Please try again later.' },
{ status: 429, headers },
);
}
const searchParams = req.nextUrl.searchParams;
try {
const userId = await getAuthenticatedUserId();
const finalIp = processIP(req);
const ratelimit = getRatelimitModule({
prefix: `ratelimit:get-thread-notes-${userId}`,
limiter: Ratelimit.slidingWindow(60, '1m'),
});
const { success, headers } = await checkRateLimit(ratelimit, finalIp);
if (!success) {
return NextResponse.json(
{ error: 'Too many requests. Please try again later.' },
{ status: 429, headers },
);
}
const searchParams = req.nextUrl.searchParams;

if (!searchParams.get('threadId')) {
return NextResponse.json({ error: 'Missing threadId' }, { status: 400 });
}
if (!searchParams.get('threadId')) {
return NextResponse.json({ error: 'Missing threadId' }, { status: 400 });
}

const notes = await notesManager.getThreadNotes(userId, searchParams.get('threadId')!);
const notes = await notesManager.getThreadNotes(userId, searchParams.get('threadId')!);

return NextResponse.json(notes, {
status: 200,
headers,
});
return NextResponse.json(notes, {
status: 200,
headers,
});
} catch (error) {
console.warn('Error getting thread notes:', error);
await logoutUser();
return NextResponse.json([]);
}
};
2 changes: 1 addition & 1 deletion apps/mail/app/api/driver/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,6 @@ export const GET = async (req: NextRequest) => {
} catch (error) {
console.warn('Error getting threads:', error);
await logoutUser();
// return NextResponse.redirect('/login?error=unauthorized');
return NextResponse.json({ messages: [], nextPageToken: null });
}
};
3 changes: 3 additions & 0 deletions apps/mail/app/api/utils.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Ratelimit, Algorithm, RatelimitConfig } from '@upstash/ratelimit';
import { deleteActiveConnection } from '@/actions/utils';
import { redirect } from 'next/navigation';
import { NextRequest } from 'next/server';
import { headers } from 'next/headers';
Expand Down Expand Up @@ -42,7 +43,9 @@ export async function getAuthenticatedUserId(): Promise<string> {
return session.user.id;
}

// Forcefully logout the user, this will delete the active connection
export async function logoutUser() {
await deleteActiveConnection();
const headersList = await headers();
await auth.api.signOut({ headers: headersList });
}
Expand Down
60 changes: 42 additions & 18 deletions apps/mail/app/api/v1/hotkeys/route.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { checkRateLimit, getAuthenticatedUserId, getRatelimitModule } from '../../utils';
import type { Shortcut } from '@/config/shortcuts';
import { Ratelimit } from '@upstash/ratelimit';
import { userHotkeys } from '@zero/db/schema';
import { NextResponse } from 'next/server';
import { headers } from 'next/headers';
Expand All @@ -7,33 +9,48 @@ import { eq } from 'drizzle-orm';
import { db } from '@zero/db';

export async function GET() {
const session = await auth.api.getSession({
headers: await headers(),
const userId = await getAuthenticatedUserId();

const ratelimit = getRatelimitModule({
prefix: 'ratelimit:hotkeys',
limiter: Ratelimit.slidingWindow(60, '1m'),
});
if (!session) {
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 });

const { success, headers } = await checkRateLimit(ratelimit, userId);
if (!success) {
return NextResponse.json(
{ error: 'Too many requests. Please try again later.' },
{ status: 429, headers },
);
}

const result = await db.select().from(userHotkeys).where(eq(userHotkeys.userId, session.user.id));
const result = await db.select().from(userHotkeys).where(eq(userHotkeys.userId, userId));

return NextResponse.json(result[0]?.shortcuts || []);
}

export async function POST(request: Request) {
const session = await auth.api.getSession({
headers: await headers(),
const userId = await getAuthenticatedUserId();

const ratelimit = getRatelimitModule({
prefix: 'ratelimit:hotkeys-post',
limiter: Ratelimit.slidingWindow(60, '1m'),
});
if (!session) {
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 });
}

const { success, headers } = await checkRateLimit(ratelimit, userId);
if (!success) {
return NextResponse.json(
{ error: 'Too many requests. Please try again later.' },
{ status: 429, headers },
);
}
const shortcuts = (await request.json()) as Shortcut[];
const now = new Date();

await db
.insert(userHotkeys)
.values({
userId: session.user.id,
userId: userId,
shortcuts,
createdAt: now,
updatedAt: now,
Expand All @@ -50,17 +67,24 @@ export async function POST(request: Request) {
}

export async function PUT(request: Request) {
const session = await auth.api.getSession({
headers: await headers(),
const userId = await getAuthenticatedUserId();

const ratelimit = getRatelimitModule({
prefix: 'ratelimit:hotkeys-put',
limiter: Ratelimit.slidingWindow(60, '1m'),
});
if (!session) {
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 });
}

const { success, headers } = await checkRateLimit(ratelimit, userId);
if (!success) {
return NextResponse.json(
{ error: 'Too many requests. Please try again later.' },
{ status: 429, headers },
);
}
const shortcut = (await request.json()) as Shortcut;
const now = new Date();

const result = await db.select().from(userHotkeys).where(eq(userHotkeys.userId, session.user.id));
const result = await db.select().from(userHotkeys).where(eq(userHotkeys.userId, userId));

const existingShortcuts = (result[0]?.shortcuts || []) as Shortcut[];
const updatedShortcuts = existingShortcuts.map((s: Shortcut) =>
Expand All @@ -74,7 +98,7 @@ export async function PUT(request: Request) {
await db
.insert(userHotkeys)
.values({
userId: session.user.id,
userId,
shortcuts: updatedShortcuts,
createdAt: now,
updatedAt: now,
Expand Down
Loading