Skip to content

Commit

Permalink
OSS Chat Server (#206)
Browse files Browse the repository at this point in the history
* add OSS_TODOs

* Add comments

* more comment edits

* fix build errs

* remove  abstraction from chat-core

* update mongodb imports

* Fix broken tests

* start OpenAiEmbedFunc refactor

* split data stores

* update ingest w new stores

* configurable reference links

* Make max number messages & tokens in single RAG in conversation configurable

* update tests w new route conf

* update SomeStreamEventUsage

* Refactor project as lib

* wrap close statements so second doesn't fail even if first does

* implement CB feedback

* add gitignore
  • Loading branch information
mongodben authored Oct 20, 2023
1 parent f8cdd3c commit 820e249
Show file tree
Hide file tree
Showing 59 changed files with 8,968 additions and 717 deletions.
182 changes: 4 additions & 178 deletions chat-core/src/DatabaseConnection.ts
Original file line number Diff line number Diff line change
@@ -1,187 +1,13 @@
import { strict as assert } from "assert";
import { MongoClient, Filter } from "mongodb";
import { PageStore, Page, PersistedPage } from "./Page";
import {
EmbeddedContentStore,
EmbeddedContent,
FindNearestNeighborsOptions,
WithScore,
} from "./EmbeddedContent";

export type DatabaseConnection = {
/**
Close the connection.
@param force - Force close, emitting no events
*/
close(force?: boolean): Promise<void>;
/**
@internal
Drop the database. Typically used for testing and debugging purposes.
*/
drop(): Promise<void>;
};

export interface MakeDatabaseConnectionParams {
connectionUri: string;
databaseName: string;
}

/**
Create a connection to the database.
*/
export const makeDatabaseConnection = ({
connectionUri,
databaseName,
}: MakeDatabaseConnectionParams): DatabaseConnection &
PageStore &
EmbeddedContentStore => {
const client = new MongoClient(connectionUri, {
serverSelectionTimeoutMS: 30000,
});
const db = client.db(databaseName);
const embeddedContentCollection =
db.collection<EmbeddedContent>("embedded_content");
const pagesCollection = db.collection<PersistedPage>("pages");
const instance: DatabaseConnection & PageStore & EmbeddedContentStore = {
async drop() {
await db.dropDatabase();
},

async close(force) {
client.close(force);
},

async loadEmbeddedContent({ page }) {
return await embeddedContentCollection.find(pageIdentity(page)).toArray();
},

async deleteEmbeddedContent({ page }) {
const deleteResult = await embeddedContentCollection.deleteMany(
pageIdentity(page)
);
if (!deleteResult.acknowledged) {
throw new Error("EmbeddedContent deletion not acknowledged!");
}
},

async updateEmbeddedContent({ page, embeddedContent }) {
assert(embeddedContent.length !== 0);
embeddedContent.forEach((embeddedContent) => {
assert(
embeddedContent.sourceName === page.sourceName &&
embeddedContent.url === page.url,
`EmbeddedContent source/url (${embeddedContent.sourceName} / ${embeddedContent.url}) must match give page source/url (${page.sourceName} / ${page.url})!`
);
});
await client.withSession(async (session) => {
await session.withTransaction(async () => {
// First delete all the embeddedContent for the given page
const deleteResult = await embeddedContentCollection.deleteMany(
pageIdentity(page),
{ session }
);
if (!deleteResult.acknowledged) {
throw new Error("EmbeddedContent deletion not acknowledged!");
}

// Insert the embedded content for the page
const insertResult = await embeddedContentCollection.insertMany(
[...embeddedContent],
{
session,
}
);

if (!insertResult.acknowledged) {
throw new Error("EmbeddedContent insertion not acknowledged!");
}
const { insertedCount } = insertResult;
if (insertedCount !== embeddedContent.length) {
throw new Error(
`Expected ${embeddedContent.length} inserted, got ${insertedCount}`
);
}
});
});
},

async findNearestNeighbors(vector, options) {
const {
indexName,
path,
k,
minScore,
filter,
}: Partial<FindNearestNeighborsOptions> = {
// Default options
indexName: "default",
path: "embedding",
k: 3,
minScore: 0.9,

// User options override
...(options ?? {}),
};
return embeddedContentCollection
.aggregate<WithScore<EmbeddedContent>>([
{
$search: {
index: indexName,
knnBeta: {
vector,
path,
k,
filter,
},
},
},
{
$addFields: {
score: {
$meta: "searchScore",
},
},
},
{ $match: { score: { $gte: minScore } } },
])
.toArray();
},

async loadPages(args) {
const filter: Filter<PersistedPage> = {};
if (args?.sources !== undefined) {
filter.sourceName = { $in: args.sources };
}
if (args?.updated !== undefined) {
filter.updated = { $gte: args.updated };
}
return pagesCollection.find(filter).toArray();
},

async updatePages(pages) {
await Promise.all(
pages.map(async (page) => {
const result = await pagesCollection.updateOne(
pageIdentity(page),
{ $set: page },
{ upsert: true }
);
if (!result.acknowledged) {
throw new Error(`update pages not acknowledged!`);
}
if (!result.modifiedCount && !result.upsertedCount) {
throw new Error(
`Page ${JSON.stringify(pageIdentity(page))} not updated!`
);
}
})
);
},
};
return instance;
};

/**
Returns a query filter that represents a unique page in the system.
*/
export const pageIdentity = ({ url, sourceName }: Page) => ({
url,
sourceName,
});
14 changes: 14 additions & 0 deletions chat-core/src/EmbedFunc.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,24 @@
export type EmbedArgs = {
/**
The text to embed.
*/
text: string;

/**
The user's IP address. Used to prevent abuse.
*/
userIp: string;
};

export type EmbedResult = {
/**
Vector embedding of the text.
*/
embedding: number[];
};

/**
Takes a string of text and returns an array of numbers representing the
vector embedding of the text.
*/
export type EmbedFunc = (args: EmbedArgs) => Promise<EmbedResult>;
16 changes: 15 additions & 1 deletion chat-core/src/EmbeddedContent.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import { Page } from "./Page";

/**
The embedded content of a chunk of text stored in the database.
*/
export interface EmbeddedContent {
/**
The URL of the page with the content.
Expand Down Expand Up @@ -50,6 +53,9 @@ export interface EmbeddedContent {
chunkAlgoHash?: string;
}

/**
Data store of the embedded content.
*/
export type EmbeddedContentStore = {
/**
Load the embedded content for the given page.
Expand All @@ -76,10 +82,18 @@ export type EmbeddedContentStore = {
vector: number[],
options?: Partial<FindNearestNeighborsOptions>
): Promise<WithScore<EmbeddedContent>[]>;

/**
Close connection to data store.
*/
close?: () => Promise<void>;
};

export type WithScore<T> = T & { score: number };

/**
Options for performing a nearest-neighbor search.
*/
export type FindNearestNeighborsOptions = {
/**
The name of the index to use.
Expand All @@ -102,7 +116,7 @@ export type FindNearestNeighborsOptions = {
minScore: number;

/**
Atlas Search filter expression.
Search filter expression.
*/
filter: Record<string, unknown>;
};
34 changes: 34 additions & 0 deletions chat-core/src/MongoDbDatabaseConnection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import { DatabaseConnection } from "./DatabaseConnection";
import { MongoClient, Db } from "mongodb";

export interface MakeMongoDbDatabaseConnectionParams {
connectionUri: string;
databaseName: string;
}

/**
Constructs connection to MongoDB database.
*/
export function makeMongoDbDatabaseConnection({
connectionUri,
databaseName,
}: MakeMongoDbDatabaseConnectionParams): DatabaseConnection & {
mongoClient: MongoClient;
db: Db;
} {
const mongoClient = new MongoClient(connectionUri, {
serverSelectionTimeoutMS: 30000,
});
const db = mongoClient.db(databaseName);
return {
mongoClient,
db,
async drop() {
await db.dropDatabase();
},

async close(force) {
mongoClient.close(force);
},
};
}
Loading

0 comments on commit 820e249

Please sign in to comment.