Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: migrate ScheduledJob from rethinkdb to pg #9490

Merged
merged 4 commits into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions packages/server/billing/helpers/removeTeamsLimitObjects.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
import getKysely from '../../postgres/getKysely'
import {r} from 'rethinkdb-ts'
import {RValue} from '../../database/stricterR'
import {DataLoaderWorker} from '../../graphql/graphql'
import updateNotification from '../../graphql/public/mutations/helpers/updateNotification'

const removeTeamsLimitObjects = async (orgId: string, dataLoader: DataLoaderWorker) => {
const removeJobTypes = ['LOCK_ORGANIZATION', 'WARN_ORGANIZATION']
const removeNotificationTypes = ['TEAMS_LIMIT_EXCEEDED', 'TEAMS_LIMIT_REMINDER']
const removeJobTypes = ['LOCK_ORGANIZATION', 'WARN_ORGANIZATION'] as const
const removeNotificationTypes = ['TEAMS_LIMIT_EXCEEDED', 'TEAMS_LIMIT_REMINDER'] as const
const pg = getKysely()

// Remove team limits jobs and existing notifications
const [, updateNotificationsChanges] = await Promise.all([
r
.table('ScheduledJob')
.getAll(orgId, {index: 'orgId'})
.filter((row: RValue) => r.expr(removeJobTypes).contains(row('type')))
.delete()
.run(),
pg
.deleteFrom('ScheduledJob')
.where('orgId', '=', orgId)
.where('type', 'in', removeJobTypes)
.execute(),
r
.table('Notification')
.getAll(orgId, {index: 'orgId'})
Expand Down
3 changes: 0 additions & 3 deletions packages/server/database/types/ScheduledJob.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
import generateUID from '../../generateUID'

export type ScheduledJobType =
| 'MEETING_STAGE_TIME_LIMIT_END'
| 'LOCK_ORGANIZATION'
| 'WARN_ORGANIZATION'

export default abstract class ScheduledJob {
id = generateUID()
protected constructor(public type: ScheduledJobType, public runAt: Date) {}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Postgres now manages the id

}
20 changes: 11 additions & 9 deletions packages/server/database/types/scheduleTeamLimitsJobs.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
import ms from 'ms'
import {r} from 'rethinkdb-ts'
import getKysely from '../../postgres/getKysely'
import {Threshold} from '../../../client/types/constEnums'
import ScheduledTeamLimitsJob from './ScheduledTeamLimitsJob'

const scheduleTeamLimitsJobs = async (scheduledLockAt: Date, orgId: string) => {
const scheduledLock = r
.table('ScheduledJob')
.insert(new ScheduledTeamLimitsJob(scheduledLockAt, orgId, 'LOCK_ORGANIZATION'))
.run()
const pg = getKysely()
const scheduledLock = pg
.insertInto('ScheduledJob')
.values(new ScheduledTeamLimitsJob(scheduledLockAt, orgId, 'LOCK_ORGANIZATION'))
.execute()

const oneWeekBeforeLock = new Date(
scheduledLockAt.getTime() - ms(`${Threshold.FINAL_WARNING_DAYS_BEFORE_LOCK}d`)
)
const scheduledWarn = r
.table('ScheduledJob')
.insert(new ScheduledTeamLimitsJob(oneWeekBeforeLock, orgId, 'WARN_ORGANIZATION'))
.run()

const scheduledWarn = pg
.insertInto('ScheduledJob')
.values(new ScheduledTeamLimitsJob(oneWeekBeforeLock, orgId, 'WARN_ORGANIZATION'))
.execute()

await Promise.all([scheduledLock, scheduledWarn])
}
Expand Down
19 changes: 15 additions & 4 deletions packages/server/graphql/mutations/helpers/removeScheduledJobs.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,19 @@
import getRethink from '../../../database/rethinkDriver'
import {Updateable} from 'kysely'
import {DB} from '../../../postgres/pg'
import getKysely from '../../../postgres/getKysely'

const removeScheduledJobs = async (runAt: Date, filter: {[key: string]: any}) => {
const r = await getRethink()
return r.table('ScheduledJob').getAll(runAt, {index: 'runAt'}).filter(filter).delete().run()
type FilterType = Omit<Updateable<DB['ScheduledJob']>, 'runAt'>

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using Updatable makes all the fields optional

const removeScheduledJobs = async (runAt: Date, filter?: FilterType) => {
const pg = getKysely()
let query = pg.deleteFrom('ScheduledJob').where('runAt', '=', runAt)
if (filter) {
Object.keys(filter).forEach((key) => {
const value = filter[key as keyof FilterType]
if (value) query = query.where(key as keyof FilterType, '=', value)
})
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will "AND ${field} = ${value}" for anything specified in the filter object

}
return query.execute()
}

export default removeScheduledJobs
10 changes: 6 additions & 4 deletions packages/server/graphql/mutations/setStageTimer.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {GraphQLFloat, GraphQLID, GraphQLNonNull} from 'graphql'
import {SubscriptionChannel} from 'parabol-client/types/constEnums'
import findStageById from 'parabol-client/utils/meetings/findStageById'
import getKysely from '../../postgres/getKysely'
import getRethink from '../../database/rethinkDriver'
import ScheduledJobMeetingStageTimeLimit from '../../database/types/ScheduledJobMetingStageTimeLimit'
import {getUserId, isTeamMember} from '../../utils/authorization'
Expand Down Expand Up @@ -90,12 +91,13 @@ export default {
? new Date(now.getTime() + timeRemaining - AVG_PING)
: newScheduledEndTime
} else {
const pg = getKysely()
stage.isAsync = true
stage.scheduledEndTime = newScheduledEndTime
await r
.table('ScheduledJob')
.insert(new ScheduledJobMeetingStageTimeLimit(newScheduledEndTime, meetingId))
.run()
await pg
.insertInto('ScheduledJob')
.values(new ScheduledJobMeetingStageTimeLimit(newScheduledEndTime, meetingId))
.execute()
IntegrationNotifier.startTimeLimit(dataLoader, newScheduledEndTime, meetingId, teamId)
}
} else {
Expand Down
22 changes: 13 additions & 9 deletions packages/server/graphql/private/mutations/runScheduledJobs.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import {Selectable} from 'kysely'
import {SubscriptionChannel} from 'parabol-client/types/constEnums'
import getKysely from '../../../postgres/getKysely'
import {DB} from '../../../postgres/pg'
import getRethink from '../../../database/rethinkDriver'
import NotificationMeetingStageTimeLimitEnd from '../../../database/types/NotificationMeetingStageTimeLimitEnd'
import processTeamsLimitsJob from '../../../database/types/processTeamsLimitsJob'
Expand Down Expand Up @@ -39,11 +42,11 @@ const processMeetingStageTimeLimits = async (

export type ScheduledJobUnion = ScheduledJobMeetingStageTimeLimit | ScheduledTeamLimitsJob

const processJob = async (job: ScheduledJobUnion, dataLoader: DataLoaderWorker) => {
const r = await getRethink()
const res = await r.table('ScheduledJob').get(job.id).delete().run()
const processJob = async (job: Selectable<DB['ScheduledJob']>, dataLoader: DataLoaderWorker) => {
const pg = getKysely()
const res = await pg.deleteFrom('ScheduledJob').where('id', '=', job.id).executeTakeFirst()
// prevent duplicates. after this point, we assume the job finishes to completion (ignores server crashes, etc.)
if (res.deleted !== 1) return
if (res.numDeletedRows !== BigInt(1)) return

if (job.type === 'MEETING_STAGE_TIME_LIMIT_END') {
return processMeetingStageTimeLimits(
Expand All @@ -60,15 +63,16 @@ const runScheduledJobs: MutationResolvers['runScheduledJobs'] = async (
{seconds},
{dataLoader}
) => {
const r = await getRethink()
const pg = getKysely()
const now = new Date()

// RESOLUTION
const before = new Date(now.getTime() + seconds * 1000)
const upcomingJobs = (await r
.table('ScheduledJob')
.between(r.minval, before, {index: 'runAt'})
.run()) as ScheduledJobUnion[]
const upcomingJobs = await pg
.selectFrom('ScheduledJob')
.selectAll()
.where('runAt', '<', before)
.execute()

upcomingJobs.forEach((job) => {
const {runAt} = job
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import {Client} from 'pg'
import getPgConfig from '../getPgConfig'

export async function up() {
const client = new Client(getPgConfig())
await client.connect()
await client.query(`
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'ScheduledJobTypeEnum') THEN
EXECUTE 'CREATE TYPE "ScheduledJobTypeEnum" AS ENUM (''MEETING_STAGE_TIME_LIMIT_END'', ''LOCK_ORGANIZATION'', ''WARN_ORGANIZATION'')';
END IF;
END $$;

CREATE TABLE "ScheduledJob" (
"id" SERIAL PRIMARY KEY,
"runAt" TIMESTAMP WITH TIME ZONE DEFAULT NOW() NOT NULL,
"type" "ScheduledJobTypeEnum" NOT NULL,
"orgId" VARCHAR(100),
"meetingId" VARCHAR(100)
);

CREATE INDEX IF NOT EXISTS "idx_ScheduledJob_orgId" ON "ScheduledJob"("orgId");
CREATE INDEX IF NOT EXISTS "idx_ScheduledJob_runAt" ON "ScheduledJob"("runAt");
CREATE INDEX IF NOT EXISTS "idx_ScheduledJob_type" ON "ScheduledJob"("type");
`)
await client.end()
}

export async function down() {
const client = new Client(getPgConfig())
await client.connect()
await client.query(`
DROP TABLE IF EXISTS "ScheduledJob";
DROP TYPE IF EXISTS "ScheduledJobTypeEnum";
`)
await client.end()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import {FirstParam} from 'parabol-client/types/generics'
import {Client} from 'pg'
import {r} from 'rethinkdb-ts'
import getPgConfig from '../getPgConfig'
import connectRethinkDB from '../../database/connectRethinkDB'
import getPgp from '../getPgp'

export async function up() {
await connectRethinkDB()
const {pgp, pg} = getPgp()
const batchSize = 1000

const columnSet = new pgp.helpers.ColumnSet(
['runAt', 'type', {name: 'orgId', def: null}, {name: 'meetingId', def: null}],
{table: 'ScheduledJob'}
)

const getNextData = async (leftBoundCursor: Date | undefined) => {
const startAt = leftBoundCursor || r.minval
const nextBatch = await r
.table('ScheduledJob')
.between(startAt, r.maxval, {index: 'runAt', leftBound: 'open'})
.orderBy({index: 'runAt'})
.limit(batchSize)
.run()
if (nextBatch.length === 0) return null
if (nextBatch.length < batchSize) return nextBatch
const lastItem = nextBatch.pop()
const lastMatchingRunAt = nextBatch.findLastIndex((item) => item.runAt !== lastItem!.runAt)
if (lastMatchingRunAt === -1) {
throw new Error(
'batchSize is smaller than the number of items that share the same cursor. Increase batchSize'
)
}
return nextBatch.slice(0, lastMatchingRunAt)
}

await pg.tx('ScheduledJob', (task) => {
const fetchAndProcess: FirstParam<typeof task.sequence> = async (
_index,
leftBoundCursor: undefined | Date
) => {
const nextData = await getNextData(leftBoundCursor)
if (!nextData) return undefined
const insert = pgp.helpers.insert(nextData, columnSet)
await task.none(insert)
return nextData.at(-1)!.runAt
}
return task.sequence(fetchAndProcess)
})
await r.getPoolMaster()?.drain()
}

export async function down() {
const client = new Client(getPgConfig())
await client.connect()
await client.query(`DELETE FROM "ScheduledJob"`)
await client.end()
}
Loading
Loading