Skip to content

Commit

Permalink
Use sse (#9)
Browse files Browse the repository at this point in the history
* move all to chat router + local redis

* redis pubsub

* first attempt at ui impl

* server side events works now for streaming in messages

* auth callback with  broken cookie dep

* bumped fastify + deps to v5

* uses supabase cookies

* fix login when identity already exists
  • Loading branch information
zolinthecow authored Sep 25, 2024
1 parent 9e10c1d commit 5edab15
Show file tree
Hide file tree
Showing 39 changed files with 1,010 additions and 771 deletions.
2 changes: 2 additions & 0 deletions apps/server/.env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@ SUPABASE_SERVICE_ROLE_KEY=
SUPABASE_DB_PASSWORD=
POSTGRES_URL="postgresql://postgres:postgres@localhost:54322/postgres"

UPSTASH_REDIS_REST_URL=
UPSTASH_REDIS_REST_TOKEN=
11 changes: 7 additions & 4 deletions apps/server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@
"author": "",
"license": "ISC",
"dependencies": {
"@fastify/cors": "^9.0.1",
"@fastify/request-context": "^5.1.0",
"@fastify/cookie": "^10.0.1",
"@fastify/cors": "^10.0.1",
"@fastify/request-context": "^6.0.1",
"@repo/db": "workspace:*",
"@supabase/supabase-js": "^2.45.1",
"@trpc/server": "11.0.0-rc.477",
"@upstash/redis": "^1.34.0",
"dotenv": "^16.4.5",
"fastify": "^4.28.1",
"fastify-plugin": "^4.5.1",
"fastify": "^5.0.0",
"fastify-plugin": "^5.0.1",
"ioredis": "^5.4.1",
"luxon": "^3.5.0",
"openai": "^4.55.3",
"slonik": "^46.0.1",
Expand Down
1 change: 1 addition & 0 deletions apps/server/src/ChatService/ChatService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ export default class ChatService implements IChatService {
chatID: input.chatID,
messageType: 'assistant',
messageContent: chunk.choices[0]?.delta.content || '',
status: 'streaming',
createdAt: DateTime.now().toJSDate(),
updatedAt: DateTime.now().toJSDate(),
} satisfies DBChatMessage;
Expand Down
70 changes: 41 additions & 29 deletions apps/server/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import 'dotenv/config';

import fastifyCookie from '@fastify/cookie';
import fastifyCors from '@fastify/cors';
import { fastifyRequestContext } from '@fastify/request-context';
import type { User } from '@supabase/supabase-js';
Expand All @@ -8,9 +9,9 @@ import {
fastifyTRPCPlugin,
} from '@trpc/server/adapters/fastify';
import fastify from 'fastify';
import AIServiceSingletonPlugin from './fastifyPlugins/AIServiceSingletonPlugin';
import ChatServicePlugin from './fastifyPlugins/ChatServicePlugin';
import SlonikDBSingletonPlugin from './fastifyPlugins/SlonikDBSingletonPlugin';
import AIServiceSingletonPlugin from './fastify/plugins/AIServiceSingletonPlugin';
import ChatServicePlugin from './fastify/plugins/ChatServicePlugin';
import SlonikDBSingletonPlugin from './fastify/plugins/SlonikDBSingletonPlugin';
import { createContext } from './trpc/context';
import { type AppRouter, appRouter } from './trpc/router';
import { supabase } from './utils/supabase';
Expand All @@ -28,28 +29,54 @@ declare module '@fastify/request-context' {
}
}
server.register(fastifyRequestContext);

// Cors config
server.register(fastifyCors, {
origin: 'http://localhost:5173',
methods: ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS'],
allowedHeaders: ['Content-Type', 'authorization'],
credentials: true,
});

// Cookies
server.register(fastifyCookie, {
secret: 'abc123',
hook: 'onRequest',
parseOptions: {},
});

// Add AI Service as a singleton across fastify
server.register(AIServiceSingletonPlugin);

// Add a chat service
server.register(ChatServicePlugin);

// Add a db connection pool as a singleton across fastify
server.register(SlonikDBSingletonPlugin);

// Auth
server.addHook('onRequest', async (req, reply) => {
let authToken = req.headers.authorization;
// It should always be in the form of `Bearer ${token}`
if (typeof authToken !== 'string' || !authToken.startsWith('Bearer ')) {
// Could be CORS requests or something
req.requestContext.set('user', null);
// const accessToken = req.cookies['sb-access-token'];
let accessToken = req.headers.authorization;
accessToken = accessToken?.substring(7);

if (!accessToken) {
console.log('NO ACCESS TOKEN');
return;
}

// Start of the actual token after `Bearer`
authToken = authToken.substring(7);

let user: User;
try {
const { data, error } = await supabase.auth.getUser(authToken);
const { data, error } = await supabase.auth.getUser(accessToken);

if (error) {
console.error('[AUTH ERROR]:', error);
reply.code(401).send({ error: 'Invalid token.' });
return reply;
}

if (!data.user) {
console.error('[ERROR]: NO USER FOUND');
reply.code(401).send({ error: 'User not found.' });
return reply;
}
Expand All @@ -60,26 +87,10 @@ server.addHook('onRequest', async (req, reply) => {
reply.code(500).send({ error: 'Internal server error.' });
return reply;
}
req.requestContext.set('user', user);
});

// Cors config
server.register(fastifyCors, {
origin: ['http://localhost:5173'],
methods: ['GET', 'POST', 'OPTIONS'],
allowedHeaders: ['*'],
credentials: true,
req.requestContext.set('user', user);
});

// Add AI Service as a singleton across fastify
server.register(AIServiceSingletonPlugin);

// Add a chat service
server.register(ChatServicePlugin);

// Add a db connection pool as a singleton across fastify
server.register(SlonikDBSingletonPlugin);

// TRPC
server.register(fastifyTRPCPlugin, {
prefix: '/trpc',
Expand All @@ -102,6 +113,7 @@ server.register(fastifyTRPCPlugin, {
console.info(`[INFO]: Listening on port ${SERVER_PORT}`);
} catch (err) {
server.log.error(err);
console.error(err);
process.exit(1);
}
})();
11 changes: 2 additions & 9 deletions apps/server/src/trpc/context.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,17 @@
import { requestContext } from '@fastify/request-context';
import type { User } from '@supabase/supabase-js';
import { TRPCError } from '@trpc/server';
import type { CreateFastifyContextOptions } from '@trpc/server/adapters/fastify';

export async function createContext({ req, res }: CreateFastifyContextOptions) {
const user = requestContext.get('user');
console.log('NO USER');
if (!user) {
throw new TRPCError({
code: 'UNAUTHORIZED',
message: 'Unauthorized',
});
}
return {
req,
res,
user,
aiService: req.server.aiService,
chatService: req.server.chatService,
dbPool: req.server.dbPool,
};
}

export type Context = Awaited<ReturnType<typeof createContext>>;
export type AuthedContext = Context & { user: User };
2 changes: 0 additions & 2 deletions apps/server/src/trpc/router.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import { chatRouter } from './routers/chat';
import { chatMessagesRouter } from './routers/chatMessages';
import { router } from './trpc';

export const appRouter = router({
chat: chatRouter,
chatMessages: chatMessagesRouter,
});

export type AppRouter = typeof appRouter;
82 changes: 16 additions & 66 deletions apps/server/src/trpc/routers/chat/create.ts
Original file line number Diff line number Diff line change
@@ -1,37 +1,24 @@
// Creates a DB chat and streams down the response. Chats can only be created from the home page.

import type { IAIService } from '@/AIService/AIService.interface';
import { upsertDBChatMessage } from '@/utils/sql';
import { type DBChat, type DBChatMessage, DBChatSchema } from '@repo/db';
import { type DatabasePool, sql } from 'slonik';
import { ulid } from 'ulid';
import { z } from 'zod';
import { publicProcedure } from '../../trpc';
import {
type SendMessageOutput,
updateDBChatMessage,
upsertDBChatMessage,
} from '../chatMessages/send';
import { authedProcedure } from '../../trpc';
import { generateAssistantMessage } from './sendMessage';

export const CreateChatSchema = z.object({
initialMessage: z.string(),
});

type CreateChatOutput =
| {
type: 'chat';
chat: DBChat;
}
| SendMessageOutput;

export const create = publicProcedure
export const create = authedProcedure
.input(CreateChatSchema)
.mutation(async function* ({
input,
ctx,
}): AsyncGenerator<CreateChatOutput> {
.mutation(async ({ input, ctx }) => {
const chatID = ulid();

// Try to overlap this request as best as possible with the db insertions
// TODO: Change to pub sub sse with redis
const previewMessagePromise = maybeSetChatPreview(
{
chatID,
Expand Down Expand Up @@ -64,60 +51,23 @@ export const create = publicProcedure
chatID,
messageContent: input.initialMessage,
messageType: 'user',
responseStatus: 'streaming',
status: 'done',
},
ctx.dbPool,
);
messages.push(initialMessage);

yield {
type: 'chat',
chat: newChat,
};

const chatIterator = ctx.chatService.generateResponse({
userID: ctx.user.id,
chatID,
message: input.initialMessage,
previousMessages: [],
});

let fullMessage = '';
let messageID = '';
for await (const chunk of chatIterator) {
yield {
type: 'messageChunk',
messageChunk: chunk,
};
messageID = chunk.id;
fullMessage += chunk.messageContent;
}

const completedAssistantMessage = await upsertDBChatMessage(
{
id: messageID,
userID: ctx.user.id,
chatID: chatID,
messageType: 'assistant',
messageContent: fullMessage,
},
ctx.dbPool,
generateAssistantMessage(
{ message: input.initialMessage, chatID },
ctx,
).catch((e) =>
console.error(
'[ERROR] Failed to generate assistant message in send:',
e,
),
);

await previewMessagePromise;

yield {
type: 'completeMessage',
message: completedAssistantMessage,
};

await updateDBChatMessage(
{
messageID: messageID,
responseStatus: 'done',
},
ctx.dbPool,
);
return newChat;
});

type MaybeSetChatPreviewParams = {
Expand Down
4 changes: 2 additions & 2 deletions apps/server/src/trpc/routers/chat/get.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { type DBChat, DBChatSchema } from '@repo/db';
import { sql } from 'slonik';
import { z } from 'zod';
import { publicProcedure } from '../../trpc';
import { authedProcedure } from '../../trpc';

export const get = publicProcedure
export const get = authedProcedure
.input(z.object({ id: z.string().ulid() }))
.query(async ({ input, ctx }) => {
return (await ctx.dbPool.one(sql.type(DBChatSchema)`
Expand Down
6 changes: 6 additions & 0 deletions apps/server/src/trpc/routers/chat/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,15 @@ import { router } from '../../trpc';
import { create } from './create';
import { get } from './get';
import { infiniteList } from './infiniteList';
import { infiniteListMessages } from './infiniteListMessages';
import { listenNewMessages } from './listenNewMessages';
import { sendMessage } from './sendMessage';

export const chatRouter = router({
get,
create,
infiniteList,
infiniteListMessages,
listenNewMessages,
sendMessage,
});
4 changes: 2 additions & 2 deletions apps/server/src/trpc/routers/chat/infiniteList.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ import { type DBChat, DBChatSchema } from '@repo/db';
import { TRPCError } from '@trpc/server';
import { sql } from 'slonik';
import { z } from 'zod';
import { publicProcedure } from '../../trpc';
import { authedProcedure } from '../../trpc';

export const infiniteList = publicProcedure
export const infiniteList = authedProcedure
.input(
z.object({
limit: z.number().min(1).max(100).default(50),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import { type DBChatMessage, DBChatMessageSchema } from '@repo/db';
import { TRPCError } from '@trpc/server';
import { sql } from 'slonik';
import { z } from 'zod';
import { publicProcedure } from '../../trpc';
import { authedProcedure } from '../../trpc';

export const infiniteList = publicProcedure
export const infiniteListMessages = authedProcedure
.input(
z.object({
chatID: z.string().ulid(),
Expand Down
Loading

0 comments on commit 5edab15

Please sign in to comment.