Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/mail/components/mail/mail.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -999,7 +999,7 @@ function CategoryDropdown({ isMultiSelectMode }: CategoryDropdownProps) {
<div className="relative overflow-visible">
<Mail className="h-4 w-4 fill-white dark:fill-white" />
</div>
<span className="text-xs font-medium">Labels</span>
<span className="text-xs font-medium">Categories</span>
<ChevronDown
className={`h-2 w-2 text-white transition-transform duration-200 ${isOpen ? 'rotate-180' : 'rotate-0'}`}
/>
Expand Down
7 changes: 1 addition & 6 deletions apps/mail/components/party.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,8 @@ export const NotificationProvider = () => {
const { threadId } = JSON.parse(message.data);
queryClient.invalidateQueries({
queryKey: trpc.mail.get.queryKey({ id: threadId }),
refetchType: 'active',
exact: true,
predicate: (query) => {
const queryAge = Date.now() - (query.state.dataUpdatedAt || 0);
return queryAge > 60000; // 1 minute in milliseconds
},
});
console.log('invalidated mail get', threadId);
} else if (type === IncomingMessageType.Mail_List) {
const { folder } = JSON.parse(message.data);
queryClient.invalidateQueries({
Expand Down
7 changes: 6 additions & 1 deletion apps/mail/providers/query-provider.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,12 @@ export function QueryProvider({
persistOptions={{
persister,
buster: CACHE_BURST_KEY,
maxAge: 1000 * 60 * 1, // 1 minute, we're storing in DOs
maxAge: 1000 * 60 * 1, // 1 minute, we're storing in DOs,
dehydrateOptions: {
shouldDehydrateQuery(query) {
return (query.queryKey[0] as string[]).some((e) => e === 'listThreads');
},
},
}}
onSuccess={() => {
const threadQueryKey = [['mail', 'listThreads'], { type: 'infinite' }];
Expand Down
1 change: 1 addition & 0 deletions apps/server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
"date-fns": "^4.1.0",
"dedent": "^1.6.0",
"drizzle-orm": "catalog:",
"effect": "3.16.12",
"elevenlabs": "1.59.0",
"email-addresses": "^5.0.0",
"google-auth-library": "9.15.1",
Expand Down
7 changes: 2 additions & 5 deletions apps/server/src/lib/brain.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { ReSummarizeThread, SummarizeMessage, SummarizeThread } from './brain.fallback.prompts';
import { AiChatPrompt, StyledEmailAssistantSystemPrompt } from './prompts';
import { getSubscriptionFactory } from './factories/subscription-factory.registry';
import { AiChatPrompt, StyledEmailAssistantSystemPrompt } from './prompts';
import { EPrompts, EProviders } from '../types';
import { getPromptName } from '../pipelines';
import { env } from 'cloudflare:workers';

export const enableBrainFunction = async (connection: { id: string; providerId: EProviders }) => {
Expand All @@ -24,10 +25,6 @@ export const disableBrainFunction = async (connection: { id: string; providerId:
}
};

const getPromptName = (connectionId: string, prompt: EPrompts) => {
return `${connectionId}-${prompt}`;
};

export const getPrompt = async (promptName: string, fallback: string) => {
const existingPrompt = await env.prompts_storage.get(promptName);
if (!existingPrompt || existingPrompt === 'undefined') {
Expand Down
3 changes: 2 additions & 1 deletion apps/server/src/lib/factories/google-subscription.factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,14 @@ class GoogleSubscriptionFactory extends BaseSubscriptionFactory {
private getServiceAccount(): GoogleServiceAccount {
if (!this.serviceAccount) {
const serviceAccountJson = env.GOOGLE_S_ACCOUNT;
if (!serviceAccountJson) {
if (!serviceAccountJson || serviceAccountJson === '{}') {
throw new Error('GOOGLE_S_ACCOUNT environment variable is required');
}

try {
this.serviceAccount = JSON.parse(serviceAccountJson);
} catch (error) {
console.log('Invalid GOOGLE_S_ACCOUNT JSON format', serviceAccountJson);
throw new Error('Invalid GOOGLE_S_ACCOUNT JSON format');
}
return this.serviceAccount as GoogleServiceAccount;
Expand Down
74 changes: 55 additions & 19 deletions apps/server/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ import {
writingStyleMatrix,
} from './db/schema';
import { env, WorkerEntrypoint, DurableObject, RpcTarget } from 'cloudflare:workers';
import { EProviders, type ISubscribeBatch, type IThreadBatch } from './types';
import { getZeroAgent, getZeroDB, verifyToken } from './lib/server-utils';
import { MainWorkflow, ThreadWorkflow, ZeroWorkflow } from './pipelines';
import { oAuthDiscoveryMetadata } from 'better-auth/plugins';
import { EProviders, type ISubscribeBatch } from './types';
import { eq, and, desc, asc, inArray } from 'drizzle-orm';
import { EWorkflowType, runWorkflow } from './pipelines';
import { contextStorage } from 'hono/context-storage';
import { defaultUserSettings } from './lib/schemas';
import { createLocalJWKSet, jwtVerify } from 'jose';
Expand All @@ -40,6 +40,7 @@ import { aiRouter } from './routes/ai';
import { Autumn } from 'autumn-js';
import { appRouter } from './trpc';
import { cors } from 'hono/cors';
import { Effect } from 'effect';
import { Hono } from 'hono';

export class DbRpcDO extends RpcTarget {
Expand Down Expand Up @@ -567,12 +568,20 @@ export default class extends WorkerEntrypoint<typeof env> {
.use(
'*',
cors({
origin: (c) => {
if (c.includes(env.COOKIE_DOMAIN)) {
return c;
} else {
origin: (origin) => {
if (!origin) return null;
let hostname: string;
try {
hostname = new URL(origin).hostname;
} catch {
return null;
}
const cookieDomain = env.COOKIE_DOMAIN;
if (!cookieDomain) return null;
if (hostname === cookieDomain || hostname.endsWith('.' + cookieDomain)) {
return origin;
}
return null;
},
credentials: true,
allowHeaders: ['Content-Type', 'Authorization'],
Expand Down Expand Up @@ -635,27 +644,27 @@ export default class extends WorkerEntrypoint<typeof env> {
.get('/', (c) => c.redirect(`${env.VITE_PUBLIC_APP_URL}`))
.post('/a8n/notify/:providerId', async (c) => {
if (!c.req.header('Authorization')) return c.json({ error: 'Unauthorized' }, { status: 401 });
return c.json({ message: 'OK' }, { status: 200 });
const providerId = c.req.param('providerId');
if (providerId === EProviders.google) {
const body = await c.req.json<{ historyId: string }>();
const subHeader = c.req.header('x-goog-pubsub-subscription-name');
if (!subHeader) {
console.log('[GOOGLE] no subscription header', body);
return c.json({}, { status: 200 });
}
const isValid = await verifyToken(c.req.header('Authorization')!.split(' ')[1]);
if (!isValid) {
console.log('[GOOGLE] invalid request', body);
return c.json({}, { status: 200 });
}
try {
const instance = await env.MAIN_WORKFLOW.create({
params: {
providerId,
historyId: body.historyId,
subscriptionName: subHeader,
},
await env.thread_queue.send({
providerId,
historyId: body.historyId,
subscriptionName: subHeader!,
});
console.log('[GOOGLE] created instance', instance.id, instance.status);
} catch (error) {
console.error('Error creating instance', error, {
console.error('Error sending to thread queue', error, {
providerId,
historyId: body.historyId,
subscriptionName: subHeader,
Expand All @@ -675,13 +684,13 @@ export default class extends WorkerEntrypoint<typeof env> {
return this.app.fetch(request, this.env, this.ctx);
}

async queue(batch: MessageBatch<ISubscribeBatch>) {
async queue(batch: MessageBatch<any>) {
switch (true) {
case batch.queue.startsWith('subscribe-queue'): {
console.log('batch', batch);
try {
await Promise.all(
batch.messages.map(async (msg) => {
batch.messages.map(async (msg: Message<ISubscribeBatch>) => {
const connectionId = msg.body.connectionId;
const providerId = msg.body.providerId;
console.log('connectionId', connectionId);
Expand All @@ -696,12 +705,39 @@ export default class extends WorkerEntrypoint<typeof env> {
}
}),
);
console.log('batch done');
console.log('[SUBSCRIBE_QUEUE] batch done');
} finally {
batch.ackAll();
}
return;
}
case batch.queue.startsWith('thread-queue'): {
console.log('batch', batch);
try {
await Promise.all(
batch.messages.map(async (msg: Message<IThreadBatch>) => {
const providerId = msg.body.providerId;
const historyId = msg.body.historyId;
const subscriptionName = msg.body.subscriptionName;
const workflow = runWorkflow(EWorkflowType.MAIN, {
providerId,
historyId,
subscriptionName,
});

try {
const result = await Effect.runPromise(workflow);
console.log('[THREAD_QUEUE] result', result);
} catch (error) {
console.error('Error running workflow', error);
}
}),
);
} finally {
batch.ackAll();
}
break;
}
}
}

Expand Down Expand Up @@ -749,4 +785,4 @@ export default class extends WorkerEntrypoint<typeof env> {
}
}

export { DurableMailbox, ZeroAgent, ZeroMCP, MainWorkflow, ZeroWorkflow, ThreadWorkflow, ZeroDB };
export { DurableMailbox, ZeroAgent, ZeroMCP, ZeroDB };
Loading