diff --git a/packages/server/billing/helpers/removeTeamsLimitObjects.ts b/packages/server/billing/helpers/removeTeamsLimitObjects.ts index 2f38ee51122..53bfe84b76c 100644 --- a/packages/server/billing/helpers/removeTeamsLimitObjects.ts +++ b/packages/server/billing/helpers/removeTeamsLimitObjects.ts @@ -1,20 +1,22 @@ +import getKysely from '../../postgres/getKysely' +import {DB} from '../../postgres/pg' import {r} from 'rethinkdb-ts' import {RValue} from '../../database/stricterR' import {DataLoaderWorker} from '../../graphql/graphql' import updateNotification from '../../graphql/public/mutations/helpers/updateNotification' const removeTeamsLimitObjects = async (orgId: string, dataLoader: DataLoaderWorker) => { - const removeJobTypes = ['LOCK_ORGANIZATION', 'WARN_ORGANIZATION'] + const removeJobTypes = ['LOCK_ORGANIZATION', 'WARN_ORGANIZATION'] as DB['ScheduledJob']['type'][] const removeNotificationTypes = ['TEAMS_LIMIT_EXCEEDED', 'TEAMS_LIMIT_REMINDER'] + const pg = getKysely() // Remove team limits jobs and existing notifications const [, updateNotificationsChanges] = await Promise.all([ - r - .table('ScheduledJob') - .getAll(orgId, {index: 'orgId'}) - .filter((row: RValue) => r.expr(removeJobTypes).contains(row('type'))) - .delete() - .run(), + pg + .deleteFrom('ScheduledJob') + .where('orgId', '=', orgId) + .where('type', 'in', removeJobTypes) + .execute(), r .table('Notification') .getAll(orgId, {index: 'orgId'}) diff --git a/packages/server/database/types/ScheduledJob.ts b/packages/server/database/types/ScheduledJob.ts index 06071851a89..ad42dce95a6 100644 --- a/packages/server/database/types/ScheduledJob.ts +++ b/packages/server/database/types/ScheduledJob.ts @@ -1,11 +1,8 @@ -import generateUID from '../../generateUID' - export type ScheduledJobType = | 'MEETING_STAGE_TIME_LIMIT_END' | 'LOCK_ORGANIZATION' | 'WARN_ORGANIZATION' export default abstract class ScheduledJob { - id = generateUID() protected constructor(public type: ScheduledJobType, public runAt: Date) {} } diff --git a/packages/server/database/types/scheduleTeamLimitsJobs.ts b/packages/server/database/types/scheduleTeamLimitsJobs.ts index 368d75edc6a..427d328e608 100644 --- a/packages/server/database/types/scheduleTeamLimitsJobs.ts +++ b/packages/server/database/types/scheduleTeamLimitsJobs.ts @@ -1,21 +1,23 @@ import ms from 'ms' -import {r} from 'rethinkdb-ts' +import getKysely from '../../postgres/getKysely' import {Threshold} from '../../../client/types/constEnums' import ScheduledTeamLimitsJob from './ScheduledTeamLimitsJob' const scheduleTeamLimitsJobs = async (scheduledLockAt: Date, orgId: string) => { - const scheduledLock = r - .table('ScheduledJob') - .insert(new ScheduledTeamLimitsJob(scheduledLockAt, orgId, 'LOCK_ORGANIZATION')) - .run() + const pg = getKysely() + const scheduledLock = pg + .insertInto('ScheduledJob') + .values(new ScheduledTeamLimitsJob(scheduledLockAt, orgId, 'LOCK_ORGANIZATION')) + .execute() const oneWeekBeforeLock = new Date( scheduledLockAt.getTime() - ms(`${Threshold.FINAL_WARNING_DAYS_BEFORE_LOCK}d`) ) - const scheduledWarn = r - .table('ScheduledJob') - .insert(new ScheduledTeamLimitsJob(oneWeekBeforeLock, orgId, 'WARN_ORGANIZATION')) - .run() + + const scheduledWarn = pg + .insertInto('ScheduledJob') + .values(new ScheduledTeamLimitsJob(oneWeekBeforeLock, orgId, 'WARN_ORGANIZATION')) + .execute() await Promise.all([scheduledLock, scheduledWarn]) } diff --git a/packages/server/graphql/mutations/helpers/removeScheduledJobs.ts b/packages/server/graphql/mutations/helpers/removeScheduledJobs.ts index cd6a5eef813..b4931f9ca3b 100644 --- a/packages/server/graphql/mutations/helpers/removeScheduledJobs.ts +++ b/packages/server/graphql/mutations/helpers/removeScheduledJobs.ts @@ -1,8 +1,19 @@ -import getRethink from '../../../database/rethinkDriver' +import {Updateable} from 'kysely' +import {DB} from '../../../postgres/pg' +import getKysely from '../../../postgres/getKysely' -const removeScheduledJobs = async (runAt: Date, filter: {[key: string]: any}) => { - const r = await getRethink() - return r.table('ScheduledJob').getAll(runAt, {index: 'runAt'}).filter(filter).delete().run() +type FilterType = Omit, 'runAt'> + +const removeScheduledJobs = async (runAt: Date, filter?: FilterType) => { + const pg = getKysely() + let query = pg.deleteFrom('ScheduledJob').where('runAt', '=', runAt) + if (filter) { + Object.keys(filter).forEach((key) => { + const value = filter[key as keyof FilterType] + if (value) query = query.where(key as keyof FilterType, '=', value) + }) + } + return query.execute() } export default removeScheduledJobs diff --git a/packages/server/graphql/mutations/setStageTimer.ts b/packages/server/graphql/mutations/setStageTimer.ts index 81d262d220e..40901c6baa0 100644 --- a/packages/server/graphql/mutations/setStageTimer.ts +++ b/packages/server/graphql/mutations/setStageTimer.ts @@ -1,6 +1,7 @@ import {GraphQLFloat, GraphQLID, GraphQLNonNull} from 'graphql' import {SubscriptionChannel} from 'parabol-client/types/constEnums' import findStageById from 'parabol-client/utils/meetings/findStageById' +import getKysely from '../../postgres/getKysely' import getRethink from '../../database/rethinkDriver' import ScheduledJobMeetingStageTimeLimit from '../../database/types/ScheduledJobMetingStageTimeLimit' import {getUserId, isTeamMember} from '../../utils/authorization' @@ -90,12 +91,13 @@ export default { ? new Date(now.getTime() + timeRemaining - AVG_PING) : newScheduledEndTime } else { + const pg = getKysely() stage.isAsync = true stage.scheduledEndTime = newScheduledEndTime - await r - .table('ScheduledJob') - .insert(new ScheduledJobMeetingStageTimeLimit(newScheduledEndTime, meetingId)) - .run() + await pg + .insertInto('ScheduledJob') + .values(new ScheduledJobMeetingStageTimeLimit(newScheduledEndTime, meetingId)) + .execute() IntegrationNotifier.startTimeLimit(dataLoader, newScheduledEndTime, meetingId, teamId) } } else { diff --git a/packages/server/graphql/private/mutations/runScheduledJobs.ts b/packages/server/graphql/private/mutations/runScheduledJobs.ts index faee38517ec..a4f5b5f055f 100644 --- a/packages/server/graphql/private/mutations/runScheduledJobs.ts +++ b/packages/server/graphql/private/mutations/runScheduledJobs.ts @@ -1,4 +1,7 @@ +import {Selectable} from 'kysely' import {SubscriptionChannel} from 'parabol-client/types/constEnums' +import getKysely from '../../../postgres/getKysely' +import {DB} from '../../../postgres/pg' import getRethink from '../../../database/rethinkDriver' import NotificationMeetingStageTimeLimitEnd from '../../../database/types/NotificationMeetingStageTimeLimitEnd' import processTeamsLimitsJob from '../../../database/types/processTeamsLimitsJob' @@ -39,11 +42,11 @@ const processMeetingStageTimeLimits = async ( export type ScheduledJobUnion = ScheduledJobMeetingStageTimeLimit | ScheduledTeamLimitsJob -const processJob = async (job: ScheduledJobUnion, dataLoader: DataLoaderWorker) => { - const r = await getRethink() - const res = await r.table('ScheduledJob').get(job.id).delete().run() +const processJob = async (job: Selectable, dataLoader: DataLoaderWorker) => { + const pg = getKysely() + const res = await pg.deleteFrom('ScheduledJob').where('id', '=', job.id).executeTakeFirst() // prevent duplicates. after this point, we assume the job finishes to completion (ignores server crashes, etc.) - if (res.deleted !== 1) return + if (res.numDeletedRows !== BigInt(1)) return if (job.type === 'MEETING_STAGE_TIME_LIMIT_END') { return processMeetingStageTimeLimits( @@ -60,15 +63,17 @@ const runScheduledJobs: MutationResolvers['runScheduledJobs'] = async ( {seconds}, {dataLoader} ) => { - const r = await getRethink() + const pg = getKysely() const now = new Date() // RESOLUTION const before = new Date(now.getTime() + seconds * 1000) - const upcomingJobs = (await r - .table('ScheduledJob') - .between(r.minval, before, {index: 'runAt'}) - .run()) as ScheduledJobUnion[] + const upcomingJobs = await pg + .selectFrom('ScheduledJob') + .selectAll() + .where('runAt', '>=', new Date(0)) + .where('runAt', '<', before) + .execute() upcomingJobs.forEach((job) => { const {runAt} = job diff --git a/packages/server/postgres/migrations/1709306178000_addScheduledJob.ts b/packages/server/postgres/migrations/1709306178000_addScheduledJob.ts new file mode 100644 index 00000000000..dffd9217014 --- /dev/null +++ b/packages/server/postgres/migrations/1709306178000_addScheduledJob.ts @@ -0,0 +1,38 @@ +import {Client} from 'pg' +import getPgConfig from '../getPgConfig' + +export async function up() { + const client = new Client(getPgConfig()) + await client.connect() + await client.query(` + DO $$ + BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'ScheduledJobTypeEnum') THEN + EXECUTE 'CREATE TYPE "ScheduledJobTypeEnum" AS ENUM (''MEETING_STAGE_TIME_LIMIT_END'', ''LOCK_ORGANIZATION'', ''WARN_ORGANIZATION'')'; + END IF; + END $$; + + CREATE TABLE "ScheduledJob" ( + "id" SERIAL PRIMARY KEY, + "runAt" TIMESTAMP WITH TIME ZONE DEFAULT NOW() NOT NULL, + "type" "ScheduledJobTypeEnum" NOT NULL, + "orgId" VARCHAR(100), + "meetingId" VARCHAR(100) + ); + + CREATE INDEX IF NOT EXISTS "idx_ScheduledJob_orgId" ON "ScheduledJob"("orgId"); + CREATE INDEX IF NOT EXISTS "idx_ScheduledJob_runAt" ON "ScheduledJob"("runAt"); + CREATE INDEX IF NOT EXISTS "idx_ScheduledJob_type" ON "ScheduledJob"("type"); + `) + await client.end() +} + +export async function down() { + const client = new Client(getPgConfig()) + await client.connect() + await client.query(` + DROP TABLE IF EXISTS "ScheduledJob"; + DROP TYPE IF EXISTS "ScheduledJobTypeEnum"; + `) + await client.end() +} diff --git a/packages/server/postgres/migrations/1709312768000_scheduledJobMove.ts b/packages/server/postgres/migrations/1709312768000_scheduledJobMove.ts new file mode 100644 index 00000000000..6896f2846dc --- /dev/null +++ b/packages/server/postgres/migrations/1709312768000_scheduledJobMove.ts @@ -0,0 +1,72 @@ +import {FirstParam} from 'parabol-client/types/generics' +import {Client} from 'pg' +import {r} from 'rethinkdb-ts' +import getPgConfig from '../getPgConfig' +import connectRethinkDB from '../../database/connectRethinkDB' +import getPgp from '../getPgp' +import {ScheduledJob as PgScheduledJob} from '../pg' + +export async function up() { + await connectRethinkDB() + const {pgp, pg} = getPgp() + const batchSize = 1000 + + const columnSet = new pgp.helpers.ColumnSet( + ['runAt', 'type', {name: 'orgId', def: null}, {name: 'meetingId', def: null}], + {table: 'ScheduledJob'} + ) + + const transformRethinkRow = (row: any): Omit => { + const {id, runAt, type, orgId, meetingId} = row + return { + runAt, + type, + orgId, + meetingId + } + } + + const getNextData = async (leftBoundCursor: Date | undefined) => { + const startAt = leftBoundCursor || r.minval + const nextBatch = ( + await r + .table('ScheduledJob') + .between(startAt, r.maxval, {index: 'runAt', leftBound: 'open'}) + .orderBy({index: 'runAt'}) + .limit(batchSize) + .run() + ).map(transformRethinkRow) + if (nextBatch.length === 0) return null + if (nextBatch.length < batchSize) return nextBatch + const lastItem = nextBatch.pop() + const lastMatchingRunAt = nextBatch.findLastIndex((item) => item.runAt !== lastItem!.runAt) + if (lastMatchingRunAt === -1) { + throw new Error( + 'batchSize is smaller than the number of items that share the same cursor. Increase batchSize' + ) + } + return nextBatch.slice(0, lastMatchingRunAt + 1) + } + + await pg.tx('ScheduledJob', (task) => { + const fetchAndProcess: FirstParam = async ( + _index, + leftBoundCursor: undefined | Date + ) => { + const nextData = await getNextData(leftBoundCursor) + if (!nextData) return undefined + const insert = pgp.helpers.insert(nextData, columnSet) + await task.none(insert) + return nextData.at(-1)!.runAt + } + return task.sequence(fetchAndProcess) + }) + await r.getPoolMaster()?.drain() +} + +export async function down() { + const client = new Client(getPgConfig()) + await client.connect() + await client.query(`DELETE FROM "ScheduledJob"`) + await client.end() +}