-
Notifications
You must be signed in to change notification settings - Fork 331
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
chore(rethinkdb): OrganizationUser: Phase 2 #9953
Changes from all commits
bfa5c21
7ac086e
9844960
62e123c
c38f86c
d318560
16843dc
abb3bdf
9911448
af03b31
2a3c0ca
8dfa870
3b48d22
24da04b
c60e502
0d2c7d7
9ef9346
76aeff9
0830c8e
a12b08a
a732657
954503d
c7775ad
d53aca4
6689e39
a6fd0d3
00964a8
50ca56b
b220f22
22ca4f3
9c55acd
9dfd35c
654aa42
ebbea29
7dbea0d
3fd2236
c552ea6
50d9e77
b7c092d
fe9efe9
4d838f3
34b05bf
02aa23d
dfba81e
81d68b7
f4f5ede
dcbcd78
b329fd3
0a549d6
5f6bdfc
21876d9
af856f2
129d1a9
6e368a4
27b0cf2
b5422dd
3c061e2
1cdf979
473c0c5
49be1bc
65614c6
ca01882
44d153e
303f493
3c3b46c
3e3634c
8d33999
78dcb46
24a2698
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
import {Kysely, PostgresDialect} from 'kysely' | ||
import {r} from 'rethinkdb-ts' | ||
import connectRethinkDB from '../../database/connectRethinkDB' | ||
import getPg from '../getPg' | ||
|
||
/**
 * Copies every row of the RethinkDB `OrganizationUser` table into the
 * PostgreSQL `OrganizationUser` table.
 *
 * Steps:
 *  1. Ensure the compound `joinedAtId` index (`[joinedAt, id]`) exists and is
 *     ready, so the table can be walked in a stable order.
 *  2. Page through the table by that index, `BATCH_SIZE` rows at a time,
 *     inserting each page into PostgreSQL with `ON CONFLICT DO NOTHING`.
 *  3. If a bulk insert fails, retry that page one row at a time. Rows that
 *     violate the `fk_userId` / `fk_orgId` constraints are skipped; any other
 *     per-row error is logged and the migration continues.
 */
export async function up() {
  await connectRethinkDB()
  const pg = new Kysely<any>({
    dialect: new PostgresDialect({
      pool: getPg()
    })
  })
  try {
    console.log('Adding index')
    await r
      .table('OrganizationUser')
      .indexCreate('joinedAtId', (row: any) => [row('joinedAt'), row('id')])
      .run()
  } catch {
    // index already exists
  }
  // Wait outside the try block: when indexCreate throws because the index
  // already exists (e.g. a previous run was interrupted), the index may still
  // be building and must be ready before it is queried below.
  await r.table('OrganizationUser').indexWait().run()
  console.log('Adding index complete')

  // NOTE(review): hard-coded fix-up of a single well-known row. Presumably the
  // ghost row has no valid `tier` and would otherwise be rejected by
  // PostgreSQL; confirm and document the reason, or remove if unnecessary.
  await r.table('OrganizationUser').get('aGhostOrganizationUser').update({tier: 'enterprise'}).run()

  // The PostgreSQL wire protocol sends the number of bind parameters as a
  // 16-bit integer, so one statement can carry at most 65535 parameters.
  const MAX_PG_PARAMS = 65535
  const PG_COLS = [
    'id',
    'suggestedTier',
    'inactive',
    'joinedAt',
    'orgId',
    'removedAt',
    'role',
    'userId',
    'tier',
    'trialStartDate'
  ] as const
  type OrganizationUser = {
    [K in (typeof PG_COLS)[number]]: any
  }
  // Each row consumes one bind parameter per column.
  const BATCH_SIZE = Math.trunc(MAX_PG_PARAMS / PG_COLS.length)

  // Keyset pagination cursor: the [joinedAt, id] of the last row already copied.
  let curjoinedAt = r.minval
  let curId = r.minval
  // Bounded loop rather than `while (true)`: the break below is the real exit,
  // the 1e6 cap only guards against looping forever.
  for (let i = 0; i < 1e6; i++) {
    console.log('inserting row', i * BATCH_SIZE, curjoinedAt, curId)
    // pluck() restricts each document to exactly the PostgreSQL columns, so no
    // further field stripping is needed before inserting.
    const rowsToInsert = (await r
      .table('OrganizationUser')
      .between([curjoinedAt, curId], [r.maxval, r.maxval], {
        index: 'joinedAtId',
        leftBound: 'open',
        rightBound: 'closed'
      })
      .orderBy({index: 'joinedAtId'})
      .limit(BATCH_SIZE)
      .pluck(...PG_COLS)
      .run()) as OrganizationUser[]

    if (rowsToInsert.length === 0) break
    const lastRow = rowsToInsert[rowsToInsert.length - 1]!
    curjoinedAt = lastRow.joinedAt
    curId = lastRow.id
    try {
      await pg
        .insertInto('OrganizationUser')
        .values(rowsToInsert)
        .onConflict((oc) => oc.doNothing())
        .execute()
    } catch {
      // The bulk insert is all-or-nothing, so retry row by row to isolate the
      // offending rows and still copy the valid ones.
      await Promise.all(
        rowsToInsert.map(async (row) => {
          try {
            await pg
              .insertInto('OrganizationUser')
              .values(row)
              .onConflict((oc) => oc.doNothing())
              .execute()
          } catch (e) {
            const constraint = (e as {constraint?: string} | null)?.constraint
            if (constraint === 'fk_userId' || constraint === 'fk_orgId') {
              console.log(`Skipping ${row.id} because it has no user/org`)
              return
            }
            // Best effort: log anything unexpected and keep migrating.
            console.log(e, row)
          }
        })
      )
    }
  }
}
|
||
/**
 * Reverts the migration: drops the RethinkDB `joinedAtId` index (ignoring the
 * failure when it is already gone) and then deletes every row from the
 * PostgreSQL `OrganizationUser` table.
 */
export async function down() {
  await connectRethinkDB()

  const organizationUsers = r.table('OrganizationUser')
  try {
    await organizationUsers.indexDrop('joinedAtId').run()
  } catch {
    // index already dropped
  }

  // The PostgreSQL handle is only created after the RethinkDB cleanup attempt.
  const dialect = new PostgresDialect({pool: getPg()})
  const db = new Kysely<any>({dialect})
  await db.deleteFrom('OrganizationUser').execute()
}
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,7 +33,7 @@ export const checkRowCount = async (tableName: string) => { | |
} | ||
|
||
export async function checkTableEq( | ||
rethinkQuery: (updatedAt: Date, id: string | number) => RSelection, | ||
rethinkQuery: (joinedAt: Date, id: string | number) => RSelection, | ||
pgQuery: (ids: string[]) => Promise<PGDoc[] | null>, | ||
equalityMap: Record<string, (a: unknown, b: unknown) => boolean>, | ||
maxErrors: number | null | undefined | ||
|
@@ -51,7 +51,7 @@ export async function checkTableEq( | |
.run()) as RethinkDoc[] | ||
if (rethinkRows.length === 0) break | ||
const lastRow = rethinkRows[rethinkRows.length - 1]! | ||
curUpdatedDate = lastRow.updatedAt | ||
curUpdatedDate = lastRow.joinedAt | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Optimise the loop condition. The loop condition - for (let i = 0; i < 1e6; i++) {
+ while (true) {
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. using a while true loop would open us up to a threat of an infinite loop, right? isn't that worse than an arbitrarily big for loop? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @mattkrick, you are correct that using a Would you like me to propose a specific solution or gather more information on the data size to determine a better exit condition? |
||
curId = lastRow.id | ||
const ids = rethinkRows.map((t) => t.id) | ||
const pgRows = (await pgQuery(ids)) ?? [] | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove hardcoded ghost organization user
The hardcoded update for `aGhostOrganizationUser` should be documented or removed if not necessary.