Skip to content

Comments

Implement Gmail push notifications with workflows#1318

Merged
MrgSub merged 1 commit intostagingfrom
06-16-background_pipelines
Jun 17, 2025
Merged

Implement Gmail push notifications with workflows#1318
MrgSub merged 1 commit intostagingfrom
06-16-background_pipelines

Conversation

@MrgSub
Copy link
Collaborator

@MrgSub MrgSub commented Jun 17, 2025

Implement Gmail Push Notifications with Workflows

This PR implements a comprehensive Gmail push notification system using Cloudflare Workflows. It creates a subscription factory pattern to handle different email providers, with Google implementation completed. The system processes Gmail history updates, vectorizes messages, generates thread summaries, and applies AI-generated labels.

Type of Change

  • ✨ New feature (non-breaking change which adds functionality)
  • ⚡ Performance improvement

Areas Affected

  • Email Integration (Gmail, IMAP, etc.)
  • User Interface/Experience
  • Data Storage/Management
  • API Endpoints

Testing Done

  • Manual testing performed

Checklist

  • I have performed a self-review of my code
  • I have commented my code, particularly in complex areas
  • My changes generate no new warnings

Additional Notes

Key changes:

  1. Created subscription factory pattern with Google implementation
  2. Added Cloudflare Workflows for processing Gmail notifications
  3. Implemented thread and message vectorization
  4. Added AI-based thread labeling
  5. Fixed string template in party.tsx component
  6. Added cheerio for HTML parsing
  7. Enabled interactive dev session in wrangler config

The system now processes Gmail notifications in real-time, extracts meaningful data, and provides AI-generated labels for better email organization.

Summary by CodeRabbit

  • New Features

    • Introduced advanced workflows for processing and labeling email threads using AI, including summarization, vectorization, and automatic label assignment.
    • Added support for managing subscriptions and notifications for Google and Outlook email providers.
    • Implemented user notification and token verification features for enhanced security and real-time updates.
    • Added a new subscription factory system with Google subscription management and token verification.
  • Improvements

    • Streamlined subscription management with new queue-based and scheduled background processing for renewals and state tracking.
    • Enhanced label management with detailed default categories for email threads.
    • Improved connection and subscription state handling for greater reliability and clarity.
    • Refined brain function enabling/disabling flows with better error handling and provider-specific subscription management.
  • Bug Fixes

    • Adjusted connection handling to ensure correct context usage and state updates.
    • Fixed string conversion for connection IDs in UI components and notification logic.
  • Chores

    • Updated environment configurations to support new workflows, queues, and storage namespaces.
    • Added new dependencies including cheerio for HTML parsing.
    • Added new utilities for managing subscription states and notification URLs.
  • Documentation

    • Expanded type definitions and documentation for new features and provider support.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Jun 17, 2025

Walkthrough

This update introduces a comprehensive subscription and workflow management system for handling email provider integrations (Google and Outlook) in a mail server application. It adds new subscription factory classes, workflow pipelines for processing Gmail data, utility functions, and updates configuration files to support queues and workflows. Type definitions, prompt templates, and notification mechanisms are also enhanced.

Changes

Files/Paths Change Summary
apps/mail/components/party.tsx, apps/mail/components/ui/ai-sidebar.tsx Changed string conversion of activeConnection.id to explicit String() calls for room/name.
apps/server/package.json Added cheerio dependency.
apps/server/src/lib/auth.ts Added brain function disabling/enabling calls in connection and user deletion flows.
apps/server/src/lib/brain.fallback.prompts.ts Added ThreadLabels prompt template; updated existing prompts to use dedent.
apps/server/src/lib/brain.ts Refactored brain function enable/disable to use provider-specific subscription factories with error handling.
apps/server/src/lib/driver/google.ts Added listHistory<T> method to GoogleMailManager.
apps/server/src/lib/driver/types.ts Added listHistory<T> method to MailManager interface.
apps/server/src/lib/factories/base-subscription.factory.ts Added abstract BaseSubscriptionFactory class and related interfaces for subscription management.
apps/server/src/lib/factories/google-subscription.factory.ts Added GoogleSubscriptionFactory with Google Pub/Sub and Gmail watch integration.
apps/server/src/lib/factories/outlook-subscription.factory.ts Added stub OutlookSubscriptionFactory with unimplemented methods.
apps/server/src/lib/factories/subscription-factory.registry.ts Added registry for subscription factories keyed by provider.
apps/server/src/lib/server-utils.ts Added user notification and Google token verification utilities.
apps/server/src/lib/utils.ts Added utilities for notification URLs, subscription state management, and a mock app context.
apps/server/src/main.ts Added POST route for provider notifications; added queue and scheduled methods for subscription renewal and workflow triggering.
apps/server/src/pipelines.ts Added MainWorkflow, ZeroWorkflow, and ThreadWorkflow classes for Gmail data processing pipelines; added helper functions.
apps/server/src/trpc/routes/brain.ts Simplified brain mutations to use active connection context; added subscription state setting and queue messaging; updated summary and state queries.
apps/server/src/types.ts Added enum EProviders, interface ISubscribeBatch, and defaultLabels array.
apps/server/wrangler.jsonc Added queue and workflow configuration; replaced services with workflows; updated KV namespaces and environment variables.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant ServerAPI
    participant SubscriptionFactory
    participant ProviderAPI
    participant Workflow

    Client->>ServerAPI: Enable brain function (mutation)
    ServerAPI->>SubscriptionFactory: getSubscriptionFactory(provider)
    ServerAPI->>SubscriptionFactory: subscribe({connectionId, providerId})
    SubscriptionFactory->>ProviderAPI: Setup subscription (e.g., Gmail watch)
    ProviderAPI-->>SubscriptionFactory: Subscription confirmation
    SubscriptionFactory-->>ServerAPI: Subscription result

    Note over ServerAPI,Workflow: On notification from provider
    ProviderAPI->>ServerAPI: POST /a8n/notify/:providerId
    ServerAPI->>Workflow: Trigger MainWorkflow(historyId, subscriptionName)
    Workflow->>Workflow: Orchestrate ZeroWorkflow and ThreadWorkflow for processing
Loading

Possibly related PRs

  • Mail-0/Zero#937: Adds a NotificationProvider React component using usePartySocket with a room name based on the active email connection, directly related to changes in party socket room naming.

Poem

🐇
In the burrow, wires hum and spin,
New workflows and factories join in.
Subscriptions queue, the emails flow,
Labels and threads in tidy rows.
With prompts and tokens, brains awake—
This rabbit’s code, for progress’ sake!
📨✨

✨ Finishing Touches
  • 📝 Generate Docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Explain this complex logic.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Collaborator Author

MrgSub commented Jun 17, 2025

This stack of pull requests is managed by Graphite. Learn more about stacking.

@MrgSub MrgSub changed the title Background Pipelines Implement Gmail push notifications with workflows Jun 17, 2025
@MrgSub MrgSub marked this pull request as ready for review June 17, 2025 02:06
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 16

🔭 Outside diff range comments (1)
apps/server/wrangler.jsonc (1)

95-105: Add missing secrets & replace the placeholder service-account

setupGmailWatch() relies on env.GOOGLE_CLIENT_ID and env.GOOGLE_CLIENT_SECRET, but these variables are not present in any environment block.
Likewise, keeping "GOOGLE_S_ACCOUNT": "{}" will parse, yet every required field (client_email, private_key, …) is empty and will make JWT signing fail.

• Inject the real service-account JSON through Wrangler secrets or .dev.vars.
• Declare GOOGLE_CLIENT_ID / GOOGLE_CLIENT_SECRET in each vars section (again via secrets in prod).

Failing to do so will make the whole Gmail subscription flow break at runtime.

Also applies to: 223-230, 345-352

🧹 Nitpick comments (10)
apps/server/package.json (1)

6-8: --show-interactive-dev-session=true can leak env/secrets in CI

Interactive Dev Sessions surface request/response logs in a public URL.
If you ever run npm run dev in CI or a shared environment those logs may contain cookies or bearer tokens set in wrangler.toml / wrangler.jsonc.

Consider guarding the flag with an env check:

-    "dev": "wrangler dev --show-interactive-dev-session=true --experimental-vectorize-bind-to-prod --env local",
+    "dev": "wrangler dev $([[ $CI == \"true\" ]] && echo \"\" || echo \"--show-interactive-dev-session=true\") --experimental-vectorize-bind-to-prod --env local",
apps/server/src/types.ts (1)

13-44: Duplicate hard-coded defaultLabels – consider centralising
An almost identical defaultLabels array already lives in apps/mail/components/mail/mail.tsx. Maintaining two sources is error-prone and violates DRY. Export the array from a shared module (e.g. shared/labels.ts) and import it in both client & server to guarantee they stay in lock-step.

apps/server/src/lib/server-utils.ts (2)

53-59: Avoid banned {} type – give the payload an explicit shape
Biome flags this line. You already define distinct payload types below,
so the umbrella type can be a union instead of bare {}:

-type ListThreadsNotification = {
-  type: 'listThreads';
-  payload: {};
-};
+type ListThreadsNotification = {
+  type: 'listThreads';
+  payload: Record<string, never>;
+};

or simply drop the redundant wrapper and use void where no data is
expected.


116-130: verifyToken hits Google endpoint on every call – cache or use certs
Round-tripping to https://oauth2.googleapis.com/tokeninfo per request
adds latency and may run into quota limits. Google-issued tokens are JWTs
that can be verified locally with the certs from
https://www.googleapis.com/oauth2/v3/certs. Consider switching to
offline verification with periodic cert refresh.

apps/server/src/lib/factories/outlook-subscription.factory.ts (2)

8-15: Method stubs throw generic errors – return 501 Not Implemented Response
These factories sit behind an HTTP-style interface. Throwing raw Errors
bubbles up as 500s and pollutes logs. Return an explicit Response with
status = 501 to make the intent clear until implementation lands.

-    throw new Error('Outlook subscription not implemented yet');
+    return new Response('Outlook subscription not implemented', { status: 501 });

24-28: Token verification placeholder
Be aware that Microsoft Graph token validation requires hitting
https://login.microsoftonline.com/common/v2.0/.well-known/openid-configuration
to fetch keys, similar to Google. Plan for local JWT validation as with
Google to avoid per-request latency.

apps/server/src/lib/utils.ts (1)

13-23: Move test-only c stub out of production code

The exported mock c leaks into runtime bundles and risks accidental use in real handlers. Relocate it to a dedicated test helper and exclude it from the worker build.

apps/server/src/lib/factories/base-subscription.factory.ts (1)

26-36: Reuse database handle instead of reconnecting per call

createDb spawns a fresh Postgres connection each time getConnectionFromDb runs, which can overwhelm connection limits under load. Pass a shared DB instance to the factory or cache it at module scope.

apps/server/src/trpc/routes/brain.ts (1)

40-43: Null-safety guard for metadata

result.connection is accessed without checking that the metadata actually contains the connection key. Add a nullish check to avoid undefined comparisons that would erroneously pass the ownership test.

apps/server/src/pipelines.ts (1)

240-255: Serial 4 s sleep limits throughput

Sleeping 4 seconds per thread (threadsToProcess.forEach) caps you at 900 threads/hour even when Gmail quota allows more.

Consider:

• Using a concurrency counter & Promise.allSettled up to N parallel threads.
• Back-off only on 429 / rate-limit responses.

This dramatically improves latency while remaining quota-safe.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 99ba8d4 and 7a5b9b1.

⛔ Files ignored due to path filters (1)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (18)
  • apps/mail/components/party.tsx (1 hunks)
  • apps/server/package.json (3 hunks)
  • apps/server/src/lib/auth.ts (3 hunks)
  • apps/server/src/lib/brain.fallback.prompts.ts (2 hunks)
  • apps/server/src/lib/brain.ts (1 hunks)
  • apps/server/src/lib/driver/google.ts (1 hunks)
  • apps/server/src/lib/driver/types.ts (1 hunks)
  • apps/server/src/lib/factories/base-subscription.factory.ts (1 hunks)
  • apps/server/src/lib/factories/google-subscription.factory.ts (1 hunks)
  • apps/server/src/lib/factories/outlook-subscription.factory.ts (1 hunks)
  • apps/server/src/lib/factories/subscription-factory.registry.ts (1 hunks)
  • apps/server/src/lib/server-utils.ts (2 hunks)
  • apps/server/src/lib/utils.ts (1 hunks)
  • apps/server/src/main.ts (4 hunks)
  • apps/server/src/pipelines.ts (1 hunks)
  • apps/server/src/trpc/routes/brain.ts (3 hunks)
  • apps/server/src/types.ts (1 hunks)
  • apps/server/wrangler.jsonc (9 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (8)
apps/server/src/lib/brain.fallback.prompts.ts (1)
apps/server/src/types.ts (1)
  • defaultLabels (13-44)
apps/server/src/lib/factories/outlook-subscription.factory.ts (1)
apps/server/src/lib/factories/base-subscription.factory.ts (2)
  • SubscriptionData (6-10)
  • UnsubscriptionData (12-15)
apps/server/src/types.ts (1)
apps/mail/components/mail/mail.tsx (1)
  • defaultLabels (74-105)
apps/server/src/lib/brain.ts (2)
apps/server/src/db/schema.ts (1)
  • connection (87-106)
apps/server/src/lib/factories/subscription-factory.registry.ts (1)
  • getSubscriptionFactory (20-26)
apps/server/src/lib/factories/base-subscription.factory.ts (3)
apps/server/src/db/index.ts (1)
  • createDb (5-9)
apps/server/src/db/schema.ts (1)
  • connection (87-106)
apps/server/src/types.ts (1)
  • defaultLabels (13-44)
apps/server/src/lib/factories/subscription-factory.registry.ts (1)
apps/server/src/lib/factories/outlook-subscription.factory.ts (1)
  • OutlookSubscriptionFactory (8-29)
apps/server/src/lib/utils.ts (1)
apps/server/src/types.ts (1)
  • AppContext (211-211)
apps/server/src/trpc/routes/brain.ts (5)
apps/server/src/trpc/trpc.ts (1)
  • activeConnectionProcedure (31-42)
apps/server/src/db/schema.ts (1)
  • connection (87-106)
apps/server/src/lib/utils.ts (1)
  • setSubscribedState (33-41)
apps/server/src/types.ts (1)
  • ISubscribeBatch (8-11)
apps/server/src/lib/brain.ts (1)
  • disableBrainFunction (11-16)
🪛 Biome (1.9.4)
apps/server/src/lib/server-utils.ts

[error] 57-57: Don't use '{}' as a type.

Prefer explicitly define the object shape. '{}' means "any non-nullable value".

(lint/complexity/noBannedTypes)

🔇 Additional comments (6)
apps/server/package.json (1)

44-48: Pinning cheerio without a caret may block security patches

Most prod deps are locked to patch versions (1.2.9, 12.0.0, …) so this may be deliberate.
If not, switch to a caret so Dependabot/Snyk can surface CVEs automatically.

-    "cheerio": "1.1.0",
+    "cheerio": "^1.1.0",
apps/server/src/lib/driver/types.ts (1)

70-73: Interface change may break non-Google drivers

listHistory was added to MailManager, but only the Google implementation is provided in this PR.
Any Outlook or future providers that compile against this interface will now fail.

Add a stub (throwing) implementation to the other drivers or make the method optional:

-  listHistory<T>(historyId: string): Promise<{ history: T[]; historyId: string }>;
+  listHistory?<T = unknown>(historyId: string): Promise<{ history: T[]; historyId: string }>;
apps/server/src/lib/auth.ts (1)

121-125: Enum cast may hide unsupported providers
disableBrainFunction({ providerId: connection.providerId as EProviders })
assumes every stored string is a valid enum value. If the DB still holds
'microsoft' while the enum expects 'outlook', this will throw. Handle
unknown providers explicitly or normalise the stored values during
migration.

apps/server/src/main.ts (1)

203-240: Verify KV key used for expiry tracking

The scheduler reads last-subscribe timestamps from env.gmail_sub_age, but the only writer shown is setSubscribedState, which writes to env.subscribed_accounts. If nothing populates gmail_sub_age, every key appears expired. Confirm the correct namespace or update writer/reader to match.

apps/server/wrangler.jsonc (1)

38-57: Double-check queue binding names

The bindings are lowercase (thread_queue, subscribe_queue) while Workers runtime usually exposes them as the exact identifier (env.thread_queue).
If your code (e.g. main.ts) imports env.THREAD_QUEUE / SUBSCRIBE_QUEUE the mismatch will throw “binding not found” at deploy time.

Verify the names match the identifiers used in the handlers or rename here for consistency.

Also applies to: 166-185, 315-334

apps/server/src/lib/factories/google-subscription.factory.ts (1)

200-221: google-auth-library API usage may be outdated

auth.refreshAccessToken() was removed in v9; current API is auth.getAccessToken() or auth.refreshAccessTokenAsync().
If the project is on the latest google-auth-library, this call will throw.

Confirm library version and update the call accordingly.

Comment on lines +47 to +63
public async listHistory<T>(historyId: string): Promise<{ history: T[]; historyId: string }> {
return this.withErrorHandler(
'listHistory',
async () => {
const response = await this.gmail.users.history.list({
userId: 'me',
startHistoryId: historyId,
});

const history = response.data.history || [];
const nextHistoryId = response.data.historyId || historyId;

return { history: history as T[], historyId: nextHistoryId };
},
{ historyId },
);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Typed cast hides the actual Gmail Schema$History

history is force-cast to T[], losing type safety and IntelliSense.

If you want a generic return, expose the concrete type and let callers up-cast:

-  public async listHistory<T>(historyId: string): Promise<{ history: T[]; historyId: string }> {
+  public async listHistory(
+    historyId: string,
+  ): Promise<{ history: gmail_v1.Schema$History[]; historyId: string }> {-        const history = response.data.history || [];
+        const history = response.data.history ?? [];

Call sites that care about sub-shapes can as-cast, but the default is now correct.

Also consider handling the “History cursor is too old” 404 explicitly and marking the connection as stale so a new watch can be issued.

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In apps/server/src/lib/driver/google.ts lines 47 to 63, the method listHistory
force-casts the Gmail API response history to a generic type T[], which hides
the actual Gmail Schema$History type and reduces type safety. To fix this,
change the method to return the concrete Gmail Schema$History[] type by default
and let callers cast to more specific types if needed. Additionally, add
explicit handling for the "History cursor is too old" 404 error from the Gmail
API by catching this error, marking the connection as stale, and triggering a
new watch to refresh the history cursor.

@MrgSub MrgSub force-pushed the 06-16-background_pipelines branch from 151dbb1 to 306f5ed Compare June 17, 2025 02:37
Copy link
Collaborator Author

MrgSub commented Jun 17, 2025

Merge activity

  • Jun 17, 2:39 AM UTC: A user started a stack merge that includes this pull request via Graphite.
  • Jun 17, 2:39 AM UTC: @MrgSub merged this pull request with Graphite.

@MrgSub MrgSub merged commit f1e56a8 into staging Jun 17, 2025
2 of 3 checks passed
@MrgSub MrgSub deleted the 06-16-background_pipelines branch June 17, 2025 02:39
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

♻️ Duplicate comments (2)
apps/server/src/lib/brain.fallback.prompts.ts (1)

217-224: Fix malformed XML tags in label template.

The XML tags are incorrectly formatted:

  • Line 220: <name/> should be </name>
  • Line 221: Missing </usecase> closing tag
           ${labels
             .map(
               (label) => `<item>
-          <name>${label.name}<name/>
-          <usecase>${defaultLabels.find((e) => e.name === label.name)?.usecase || ''}</usecase>    
+          <name>${label.name}</name>
+          <usecase>${defaultLabels.find((e) => e.name === label.name)?.usecase || ''}</usecase>
           </item>`,
             )
             .join('\n')}
apps/server/src/lib/factories/google-subscription.factory.ts (1)

270-277: Move KV write after successful Gmail watch setup to prevent leaks.

The gmail_sub_age.put() operation occurs before setupGmailWatch(). If the watch setup fails, the KV entry remains, incorrectly marking the connection as subscribed.

         console.log(`[SUBSCRIPTION] Setting up Gmail watch for connection: ${connectionData.id}`);
         await this.setupGmailWatch(connectionData, pubSubName);
 
+        // Only mark as subscribed after successful watch setup
         await env.gmail_sub_age.put(
           `${connectionId}__${EProviders.google}`,
           new Date().toISOString(),
         );
🧹 Nitpick comments (2)
apps/server/src/lib/server-utils.ts (1)

117-131: Strengthen token validation beyond presence check.

The current implementation only verifies that the tokeninfo endpoint returns some data. Consider validating specific fields like aud (audience), iss (issuer), or email_verified to ensure the token is not just valid but also appropriate for your application.

 export const verifyToken = async (token: string) => {
   const response = await fetch(`https://oauth2.googleapis.com/tokeninfo?id_token=${token}`, {
     method: 'GET',
     headers: {
       'Content-Type': 'application/json',
     },
   });
 
   if (!response.ok) {
     throw new Error(`Failed to verify token: ${await response.text()}`);
   }
 
   const data = (await response.json()) as any;
-  return !!data;
+  // Validate essential token fields
+  return !!(data && data.email_verified && data.aud === env.GOOGLE_CLIENT_ID);
 };
apps/server/src/lib/factories/google-subscription.factory.ts (1)

326-339: Consolidate duplicate token verification logic.

This verifyToken method duplicates the logic from server-utils.ts. Consider:

  1. Using the shared verifyToken from server-utils
  2. Or if provider-specific validation is needed, extend the shared implementation
+  import { verifyToken as verifyGoogleToken } from '../../lib/server-utils';
+
   public async verifyToken(token: string): Promise<boolean> {
-    try {
-      const response = await fetch(`https://oauth2.googleapis.com/tokeninfo?id_token=${token}`);
-
-      if (!response.ok) {
-        return false;
-      }
-
-      const data = await response.json();
-      return !!data;
-    } catch {
-      return false;
-    }
+    return verifyGoogleToken(token);
   }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 151dbb1 and 306f5ed.

⛔ Files ignored due to path filters (1)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (19)
  • apps/mail/components/party.tsx (1 hunks)
  • apps/mail/components/ui/ai-sidebar.tsx (2 hunks)
  • apps/server/package.json (2 hunks)
  • apps/server/src/lib/auth.ts (4 hunks)
  • apps/server/src/lib/brain.fallback.prompts.ts (4 hunks)
  • apps/server/src/lib/brain.ts (1 hunks)
  • apps/server/src/lib/driver/google.ts (1 hunks)
  • apps/server/src/lib/driver/types.ts (1 hunks)
  • apps/server/src/lib/factories/base-subscription.factory.ts (1 hunks)
  • apps/server/src/lib/factories/google-subscription.factory.ts (1 hunks)
  • apps/server/src/lib/factories/outlook-subscription.factory.ts (1 hunks)
  • apps/server/src/lib/factories/subscription-factory.registry.ts (1 hunks)
  • apps/server/src/lib/server-utils.ts (2 hunks)
  • apps/server/src/lib/utils.ts (1 hunks)
  • apps/server/src/main.ts (4 hunks)
  • apps/server/src/pipelines.ts (1 hunks)
  • apps/server/src/trpc/routes/brain.ts (3 hunks)
  • apps/server/src/types.ts (1 hunks)
  • apps/server/wrangler.jsonc (9 hunks)
✅ Files skipped from review due to trivial changes (1)
  • apps/mail/components/ui/ai-sidebar.tsx
🚧 Files skipped from review as they are similar to previous changes (13)
  • apps/server/package.json
  • apps/server/src/lib/driver/types.ts
  • apps/mail/components/party.tsx
  • apps/server/src/lib/auth.ts
  • apps/server/src/lib/driver/google.ts
  • apps/server/src/lib/factories/outlook-subscription.factory.ts
  • apps/server/src/lib/brain.ts
  • apps/server/src/main.ts
  • apps/server/src/lib/utils.ts
  • apps/server/src/trpc/routes/brain.ts
  • apps/server/src/lib/factories/base-subscription.factory.ts
  • apps/server/wrangler.jsonc
  • apps/server/src/pipelines.ts
🧰 Additional context used
🧬 Code Graph Analysis (2)
apps/server/src/types.ts (1)
apps/mail/components/mail/mail.tsx (1)
  • defaultLabels (74-105)
apps/server/src/lib/brain.fallback.prompts.ts (1)
apps/server/src/types.ts (1)
  • defaultLabels (13-44)
🪛 Biome (1.9.4)
apps/server/src/lib/server-utils.ts

[error] 57-57: Don't use '{}' as a type.

Prefer explicitly define the object shape. '{}' means "any non-nullable value".

(lint/complexity/noBannedTypes)

🔇 Additional comments (1)
apps/server/src/lib/factories/subscription-factory.registry.ts (1)

1-27: Well-structured factory registry implementation.

Good job addressing the previous concern by commenting out the unfinished OutlookSubscriptionFactory. The registry pattern is cleanly implemented with proper error handling and accessor methods.

Comment on lines +13 to +44
export const defaultLabels = [
{
name: 'to respond',
usecase: 'emails you need to respond to. NOT sales, marketing, or promotions.',
},
{
name: 'FYI',
usecase:
'emails that are not important, but you should know about. NOT sales, marketing, or promotions.',
},
{
name: 'comment',
usecase:
'Team chats in tools like Google Docs, Slack, etc. NOT marketing, sales, or promotions.',
},
{
name: 'notification',
usecase: 'Automated updates from services you use. NOT sales, marketing, or promotions.',
},
{
name: 'promotion',
usecase: 'Sales, marketing, cold emails, special offers or promotions. NOT to respond to.',
},
{
name: 'meeting',
usecase: 'Calendar events, invites, etc. NOT sales, marketing, or promotions.',
},
{
name: 'billing',
usecase: 'Billing notifications. NOT sales, marketing, or promotions.',
},
];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Eliminate code duplication by centralizing label definitions.

The defaultLabels array is an exact duplicate of the one defined in apps/mail/components/mail/mail.tsx. This violates the DRY principle and creates maintenance risks.

Consider moving the label definitions to a shared location (e.g., a dedicated constants file) and importing them in both places.

🤖 Prompt for AI Agents
In apps/server/src/types.ts lines 13 to 44, the defaultLabels array duplicates
the same array in apps/mail/components/mail/mail.tsx, violating DRY principles.
To fix this, create a new shared constants file (e.g.,
apps/shared/constants/labels.ts) to hold the defaultLabels array, then import
and use this shared constant in both apps/server/src/types.ts and
apps/mail/components/mail/mail.tsx, removing the duplicated definitions from
both files.

Comment on lines +283 to +304
} catch (error) {
console.error('[SUBSCRIPTION] Setup failed:', error);

// Clean up on failure using base class method
// await this.cleanupOnFailure(connectionId, env);

if (error instanceof Error && error.message.includes('Already Exists')) {
console.log('Resource already exists, continuing...');
return c.json({});
}

throw error;
}
} catch (error) {
console.error('[SUBSCRIPTION] Error:', error);

// Clean up on error using base class method
// await this.cleanupOnFailure(connectionId, env);

return c.json({ error: 'Internal server error' }, { status: 500 });
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Enable cleanup on failure and manage subscription state.

The error handling has commented-out cleanup calls and doesn't set the subscription state. This can lead to:

  1. Resource leaks on failure
  2. Inconsistent subscription state
       } catch (error) {
         console.error('[SUBSCRIPTION] Setup failed:', error);
 
         // Clean up on failure using base class method
-        // await this.cleanupOnFailure(connectionId, env);
+        await this.cleanupOnFailure(connectionId, env);
 
         if (error instanceof Error && error.message.includes('Already Exists')) {
           console.log('Resource already exists, continuing...');
+          await this.setSubscribedState(connectionId, EProviders.google, 'active', env);
           return c.json({});
         }
 
         throw error;
       }
     } catch (error) {
       console.error('[SUBSCRIPTION] Error:', error);
 
       // Clean up on error using base class method
-      //   await this.cleanupOnFailure(connectionId, env);
+      await this.cleanupOnFailure(connectionId, env);
 
       return c.json({ error: 'Internal server error' }, { status: 500 });
     }

Additionally, add a call to set the subscription state to 'active' after successful setup:

         console.log(`[SUBSCRIPTION] Initializing labels for connection: ${connectionId}`);
         await this.initializeConnectionLabels(connectionId);
+        
+        // Mark subscription as active
+        await this.setSubscribedState(connectionId, EProviders.google, 'active', env);
 
         console.log(`[SUBSCRIPTION] Setup completed successfully for connection: ${connectionId}`);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
} catch (error) {
console.error('[SUBSCRIPTION] Setup failed:', error);
// Clean up on failure using base class method
// await this.cleanupOnFailure(connectionId, env);
if (error instanceof Error && error.message.includes('Already Exists')) {
console.log('Resource already exists, continuing...');
return c.json({});
}
throw error;
}
} catch (error) {
console.error('[SUBSCRIPTION] Error:', error);
// Clean up on error using base class method
// await this.cleanupOnFailure(connectionId, env);
return c.json({ error: 'Internal server error' }, { status: 500 });
}
}
} catch (error) {
console.error('[SUBSCRIPTION] Setup failed:', error);
// Clean up on failure using base class method
await this.cleanupOnFailure(connectionId, env);
if (error instanceof Error && error.message.includes('Already Exists')) {
console.log('Resource already exists, continuing...');
await this.setSubscribedState(connectionId, EProviders.google, 'active', env);
return c.json({});
}
throw error;
}
} catch (error) {
console.error('[SUBSCRIPTION] Error:', error);
// Clean up on error using base class method
await this.cleanupOnFailure(connectionId, env);
return c.json({ error: 'Internal server error' }, { status: 500 });
}
Suggested change
} catch (error) {
console.error('[SUBSCRIPTION] Setup failed:', error);
// Clean up on failure using base class method
// await this.cleanupOnFailure(connectionId, env);
if (error instanceof Error && error.message.includes('Already Exists')) {
console.log('Resource already exists, continuing...');
return c.json({});
}
throw error;
}
} catch (error) {
console.error('[SUBSCRIPTION] Error:', error);
// Clean up on error using base class method
// await this.cleanupOnFailure(connectionId, env);
return c.json({ error: 'Internal server error' }, { status: 500 });
}
}
console.log(`[SUBSCRIPTION] Initializing labels for connection: ${connectionId}`);
await this.initializeConnectionLabels(connectionId);
// Mark subscription as active
await this.setSubscribedState(connectionId, EProviders.google, 'active', env);
console.log(`[SUBSCRIPTION] Setup completed successfully for connection: ${connectionId}`);
🤖 Prompt for AI Agents
In apps/server/src/lib/factories/google-subscription.factory.ts between lines
283 and 304, uncomment and enable the cleanupOnFailure calls in both catch
blocks to prevent resource leaks on failure. Also, after a successful
subscription setup, add a call to set the subscription state to 'active' to
ensure consistent subscription state management.

@coderabbitai coderabbitai bot mentioned this pull request Jun 17, 2025
34 tasks
@coderabbitai coderabbitai bot mentioned this pull request Jul 9, 2025
11 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant