
chore: add embeddings table migration (#9372)
* chore: add embeddings table migration

* chore: code review changes

* feat: auto-add pgvector extension in production

Also:
   - fix: corrects types in standaloneMigrations.ts
   - fix: silly things I missed in the addEmbeddingTables migration

* fix: check for POSTGRES_USE_PGVECTOR

* fix: POSTGRES_USE_PGVECTOR strict check for === 'true'
jordanh committed Feb 13, 2024
1 parent 51f28a1 commit 012ca77
Showing 4 changed files with 112 additions and 2 deletions.
1 change: 1 addition & 0 deletions .env.example
@@ -61,6 +61,7 @@ POSTGRES_USER=pgparaboladmin
 POSTGRES_DB=parabol-saas
 POSTGRES_HOST=localhost
 POSTGRES_PORT=5432
+POSTGRES_USE_PGVECTOR=true
 # POSTGRES_POOL_SIZE=5
 # POSTGRES_SSL_REJECT_UNAUTHORIZED=false
 # POSTGRES_SSL_DIR='/var/lib/postgresql'
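A note on the flag added above: environment variables arrive as strings, so (as the follow-up commits in this PR point out) the consumer must compare against the literal 'true' rather than testing truthiness. A minimal TypeScript sketch of the distinction; the helper name is hypothetical and not part of this commit:

// Hypothetical helper showing why a strict string comparison is used:
// process.env values are strings (or undefined), so a bare truthiness check misfires.
const usePgvector = (value: string | undefined) => value === 'true'

usePgvector('true')     // true  -> the pgvector extension check will run
usePgvector('false')    // false -> skipped (a truthiness check would wrongly enable it here)
usePgvector(undefined)  // false -> skipped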
@@ -0,0 +1,93 @@
import {Client} from 'pg'
import getPgConfig from '../getPgConfig'
import {r} from 'rethinkdb-ts'
import connectRethinkDB from '../../database/connectRethinkDB'

export async function up() {
  const client = new Client(getPgConfig())
  await client.connect()
  await client.query(`
  DO $$
  BEGIN
    IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'EmbeddingsObjectTypeEnum') THEN
      EXECUTE 'CREATE TYPE "EmbeddingsObjectTypeEnum" AS ENUM (''retrospectiveDiscussionTopic'')';
    END IF;
    IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'EmbeddingsStateEnum') THEN
      EXECUTE 'CREATE TYPE "EmbeddingsStateEnum" AS ENUM (''queued'', ''embedding'', ''failed'')';
    END IF;
    EXECUTE 'CREATE TABLE IF NOT EXISTS "EmbeddingsMetadata" (
      "id" SERIAL PRIMARY KEY,
      "createdAt" TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP NOT NULL,
      "updatedAt" TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP NOT NULL,
      "objectType" "EmbeddingsObjectTypeEnum" NOT NULL,
      "refId" VARCHAR(100),
      UNIQUE("objectType", "refId"),
      "refUpdatedAt" TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP NOT NULL,
      "models" VARCHAR(255)[],
      "teamId" VARCHAR(100) NOT NULL,
      "embedText" TEXT
    )';
    DROP TRIGGER IF EXISTS "update_EmbeddingsMetadata_updatedAt" ON "EmbeddingsMetadata";
    CREATE TRIGGER "update_EmbeddingsMetadata_updatedAt" BEFORE UPDATE ON "EmbeddingsMetadata" FOR EACH ROW EXECUTE PROCEDURE "set_updatedAt"();
    EXECUTE 'CREATE INDEX IF NOT EXISTS "idx_EmbeddingsMetadata_objectType" ON "EmbeddingsMetadata"("objectType")';
    EXECUTE 'CREATE INDEX IF NOT EXISTS "idx_EmbeddingsMetadata_refUpdatedAt" ON "EmbeddingsMetadata"("refUpdatedAt")';
    EXECUTE 'CREATE INDEX IF NOT EXISTS "idx_EmbeddingsMetadata_models" ON "EmbeddingsMetadata" USING GIN (models)';
    EXECUTE 'CREATE INDEX IF NOT EXISTS "idx_EmbeddingsMetadata_teamId" ON "EmbeddingsMetadata"("teamId")';
    EXECUTE 'CREATE TABLE IF NOT EXISTS "EmbeddingsJobQueue" (
      "id" SERIAL PRIMARY KEY,
      "updatedAt" TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP NOT NULL,
      "objectType" "EmbeddingsObjectTypeEnum" NOT NULL,
      "refId" VARCHAR(100) NOT NULL,
      "model" VARCHAR(255) NOT NULL,
      UNIQUE("objectType", "refId", "model"),
      "state" "EmbeddingsStateEnum" DEFAULT ''queued'' NOT NULL,
      "stateMessage" TEXT
    )';
    DROP TRIGGER IF EXISTS "update_EmbeddingsJobQueue_updatedAt" ON "EmbeddingsJobQueue";
    CREATE TRIGGER "update_EmbeddingsJobQueue_updatedAt" BEFORE UPDATE ON "EmbeddingsJobQueue" FOR EACH ROW EXECUTE PROCEDURE "set_updatedAt"();
    EXECUTE 'CREATE INDEX IF NOT EXISTS "idx_EmbeddingsJobQueue_objectType" ON "EmbeddingsJobQueue"("objectType")';
    EXECUTE 'CREATE INDEX IF NOT EXISTS "idx_EmbeddingsJobQueue_refId" ON "EmbeddingsJobQueue"("refId")';
    EXECUTE 'CREATE INDEX IF NOT EXISTS "idx_EmbeddingsJobQueue_state" ON "EmbeddingsJobQueue"("state")';
  END $$;
  `)
  await client.end()
  try {
    await connectRethinkDB()
    await r
      .table('Organization')
      .indexCreate('featureFlagsIndex', r.row('featureFlags'), {multi: true})
      .run()
    await r.getPoolMaster()?.drain()
  } catch (e) {
    console.log(e)
  }
}

export async function down() {
  const client = new Client(getPgConfig())
  await client.connect()
  await client.query(`
  DO $$
  BEGIN
    EXECUTE 'DROP TABLE IF EXISTS "EmbeddingsJobQueue"';
    EXECUTE 'DROP TABLE IF EXISTS "EmbeddingsMetadata"';
    EXECUTE 'DROP TYPE IF EXISTS "EmbeddingsStateEnum"';
    EXECUTE 'DROP TYPE IF EXISTS "EmbeddingsObjectTypeEnum"';
  END $$;
  `)
  await client.end()
  try {
    await connectRethinkDB()
    await r.table('Organization').indexDrop('featureFlagsIndex').run()
    await r.getPoolMaster()?.drain()
  } catch (e) {
    console.log(e)
  }
}
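For orientation, here is a hypothetical sketch (not part of this commit) of how a producer and a worker might use the EmbeddingsJobQueue table created above: enqueueing relies on the DEFAULT ''queued'' state and the UNIQUE("objectType", "refId", "model") constraint, and a worker moves a claimed row through the EmbeddingsStateEnum states. Table, column, and enum names come from the migration; the function names and the claim query are assumptions for illustration only.

import {Client} from 'pg'
import getPgConfig from '../getPgConfig'

// Hypothetical: queue one embedding job per (objectType, refId, model); duplicates are ignored
export const enqueueEmbeddingJob = async (objectType: string, refId: string, model: string) => {
  const client = new Client(getPgConfig())
  await client.connect()
  await client.query(
    `INSERT INTO "EmbeddingsJobQueue" ("objectType", "refId", "model")
     VALUES ($1, $2, $3)
     ON CONFLICT ("objectType", "refId", "model") DO NOTHING`,
    [objectType, refId, model]
  )
  await client.end()
}

// Hypothetical: claim the oldest queued job and mark it as being embedded
export const claimNextEmbeddingJob = async () => {
  const client = new Client(getPgConfig())
  await client.connect()
  const {rows} = await client.query(
    `UPDATE "EmbeddingsJobQueue"
     SET "state" = 'embedding'
     WHERE "id" = (
       SELECT "id" FROM "EmbeddingsJobQueue"
       WHERE "state" = 'queued'
       ORDER BY "id"
       LIMIT 1
       FOR UPDATE SKIP LOCKED
     )
     RETURNING *`
  )
  await client.end()
  return rows[0] ?? null
}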
14 changes: 14 additions & 0 deletions scripts/toolboxSrc/pgEnsureExtensions.ts
@@ -0,0 +1,14 @@
import {sql} from 'kysely'
import getKysely from '../../packages/server/postgres/getKysely'

export default async () => {
  console.log('πŸ”© Postgres Extension Checks Started')
  if (process.env.POSTGRES_USE_PGVECTOR === 'true') {
    console.log(' pgvector')
    const pg = getKysely()
    await sql`CREATE EXTENSION IF NOT EXISTS "vector";`.execute(pg)
  } else {
    console.log(' pgvector: skipping check (POSTGRES_USE_PGVECTOR !== true)')
  }
  console.log('πŸ”© Postgres Extension Checks Completed')
}
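For context, enabling the "vector" extension is what lets later migrations or model-specific tables declare fixed-dimension embedding columns and run nearest-neighbor queries. The sketch below is hypothetical and not part of this commit; the table name, dimension, and sample values are assumptions, while the <=> cosine-distance operator is provided by pgvector itself:

import {sql} from 'kysely'
import getKysely from '../../packages/server/postgres/getKysely'

const pgvectorExample = async () => {
  const pg = getKysely()
  // hypothetical table with a 3-dimension embedding column, for illustration only
  await sql`CREATE TABLE IF NOT EXISTS "EmbeddingsVectorExample" (
    "id" SERIAL PRIMARY KEY,
    "embedding" vector(3)
  )`.execute(pg)
  await sql`INSERT INTO "EmbeddingsVectorExample" ("embedding") VALUES ('[0.1, 0.2, 0.3]')`.execute(pg)
  // nearest neighbors by cosine distance (the <=> operator comes with pgvector)
  await sql`SELECT "id" FROM "EmbeddingsVectorExample"
    ORDER BY "embedding" <=> '[0.2, 0.1, 0.4]'
    LIMIT 5`.execute(pg)
}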
6 changes: 4 additions & 2 deletions scripts/toolboxSrc/standaloneMigrations.ts
@@ -6,11 +6,12 @@ import {r} from 'rethinkdb-ts'
 import {parse} from 'url'
 import cliPgmConfig from '../../packages/server/postgres/pgmConfig'
 import '../webpack/utils/dotenv'
+import pgEnsureExtensions from './pgEnsureExtensions'
 import pgMigrate from './pgMigrateRunner'
 import * as rethinkMigrate from './rethinkMigrateRunner'
 
 const migrateRethinkDB = async () => {
-  console.log('πŸ‘΄ RethinkDB Migraiton Started')
+  console.log('πŸ‘΄ RethinkDB Migration Started')
   const {hostname, port, path: urlPath} = parse(process.env.RETHINKDB_URL!)
   process.env.host = hostname!
   process.env.port = port!
@@ -27,11 +28,12 @@ const migrateRethinkDB = async () => {
     collector[name] = context(relativePath)
   })
   await rethinkMigrate.up({all: true, migrations: collector})
-  console.log('πŸ‘΄ RethinkDB Migraiton Complete')
+  console.log('πŸ‘΄ RethinkDB Migration Complete')
 }
 
 const migratePG = async () => {
   console.log('🐘 Postgres Migration Started')
+  await pgEnsureExtensions()
   // pgm uses a dynamic require statement, which doesn't work with webpack
   // if we ignore that dynamic require, we'd still have to include the migrations directory AND any dependencies it might have
   // by processing through webpack's require.context, we let webpack handle everything
