Skip to content

Commit

Permalink
chore(rethinkdb): Organization: Phase 1 (#9883)
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Krick <matt.krick@gmail.com>
  • Loading branch information
mattkrick authored Jul 4, 2024
1 parent 4663e9e commit 6bb5fb2
Show file tree
Hide file tree
Showing 49 changed files with 417 additions and 156 deletions.
15 changes: 0 additions & 15 deletions packages/embedder/indexing/orgIdsWithFeatureFlag.ts

This file was deleted.

49 changes: 24 additions & 25 deletions packages/server/billing/helpers/adjustUserCount.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import {InvoiceItemType} from 'parabol-client/types/constEnums'
import getRethink from '../../database/rethinkDriver'
import {RDatum} from '../../database/stricterR'
import Organization from '../../database/types/Organization'
import OrganizationUser from '../../database/types/OrganizationUser'
import {DataLoaderWorker} from '../../graphql/graphql'
import isValid from '../../graphql/isValid'
import getKysely from '../../postgres/getKysely'
import insertOrgUserAudit from '../../postgres/helpers/insertOrgUserAudit'
import {OrganizationUserAuditEventTypeEnum} from '../../postgres/queries/generated/insertOrgUserAuditQuery'
import {getUserById} from '../../postgres/queries/getUsersByIds'
Expand All @@ -22,7 +23,7 @@ const maybeUpdateOrganizationActiveDomain = async (
dataLoader: DataLoaderWorker
) => {
const r = await getRethink()
const organization = await r.table('Organization').get(orgId).run()
const organization = await dataLoader.get('organizations').load(orgId)
const {isActiveDomainTouched, activeDomain} = organization
// don't modify if the domain was set manually
if (isActiveDomainTouched) return
Expand All @@ -38,14 +39,18 @@ const maybeUpdateOrganizationActiveDomain = async (
// don't modify if we can't guess the domain or the domain we guess is the current domain
const domain = await getActiveDomainForOrgId(orgId)
if (!domain || domain === activeDomain) return

await r
.table('Organization')
.get(orgId)
.update({
activeDomain: domain
})
.run()
organization.activeDomain = domain
const pg = getKysely()
await Promise.all([
pg.updateTable('Organization').set({activeDomain: domain}).where('id', '=', orgId).execute(),
r
.table('Organization')
.get(orgId)
.update({
activeDomain: domain
})
.run()
])
}

const changePause = (inactive: boolean) => async (_orgIds: string[], user: IUser) => {
Expand Down Expand Up @@ -76,18 +81,16 @@ const changePause = (inactive: boolean) => async (_orgIds: string[], user: IUser
const addUser = async (orgIds: string[], user: IUser, dataLoader: DataLoaderWorker) => {
const {id: userId} = user
const r = await getRethink()
const {organizations, organizationUsers} = await r({
organizationUsers: r
const [rawOrganizations, organizationUsers] = await Promise.all([
dataLoader.get('organizations').loadMany(orgIds),
r
.table('OrganizationUser')
.getAll(userId, {index: 'userId'})
.orderBy(r.desc('newUserUntil'))
.coerceTo('array') as unknown as OrganizationUser[],
organizations: r
.table('Organization')
.getAll(r.args(orgIds))
.coerceTo('array') as unknown as Organization[]
}).run()

.coerceTo('array')
.run()
])
const organizations = rawOrganizations.filter(isValid)
const docs = orgIds.map((orgId) => {
const oldOrganizationUser = organizationUsers.find(
(organizationUser) => organizationUser.orgId === orgId
Expand Down Expand Up @@ -153,7 +156,6 @@ export default async function adjustUserCount(
type: InvoiceItemType,
dataLoader: DataLoaderWorker
) {
const r = await getRethink()
const orgIds = Array.isArray(orgInput) ? orgInput : [orgInput]

const user = (await getUserById(userId))!
Expand All @@ -164,11 +166,8 @@ export default async function adjustUserCount(
const auditEventType = auditEventTypeLookup[type]
await insertOrgUserAudit(orgIds, userId, auditEventType)

const paidOrgs = await r
.table('Organization')
.getAll(r.args(orgIds), {index: 'id'})
.filter((org: RDatum) => org('stripeSubscriptionId').default(null).ne(null))
.run()
const organizations = await dataLoader.get('organizations').loadMany(orgIds)
const paidOrgs = organizations.filter(isValid).filter((org) => org.stripeSubscriptionId)

handleEnterpriseOrgQuantityChanges(paidOrgs, dataLoader).catch()
handleTeamOrgQuantityChanges(paidOrgs).catch(Logger.error)
Expand Down
13 changes: 6 additions & 7 deletions packages/server/billing/helpers/generateInvoice.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import {InvoiceLineItemEnum} from '../../database/types/InvoiceLineItem'
import InvoiceLineItemDetail from '../../database/types/InvoiceLineItemDetail'
import InvoiceLineItemOtherAdjustments from '../../database/types/InvoiceLineItemOtherAdjustments'
import NextPeriodCharges from '../../database/types/NextPeriodCharges'
import Organization from '../../database/types/Organization'
import QuantityChangeLineItem from '../../database/types/QuantityChangeLineItem'
import generateUID from '../../generateUID'
import {DataLoaderWorker} from '../../graphql/graphql'
Expand Down Expand Up @@ -354,16 +353,16 @@ export default async function generateInvoice(
invoice.status === 'paid' && invoice.status_transitions.paid_at
? fromEpochSeconds(invoice.status_transitions.paid_at)
: undefined

const {organization, billingLeaderIds} = await r({
organization: r.table('Organization').get(orgId) as unknown as Organization,
billingLeaderIds: r
const [organization, billingLeaderIds] = await Promise.all([
dataLoader.get('organizations').load(orgId),
r
.table('OrganizationUser')
.getAll(orgId, {index: 'orgId'})
.filter({removedAt: null})
.filter((row: RDatum) => r.expr(['BILLING_LEADER', 'ORG_ADMIN']).contains(row('role')))
.coerceTo('array')('userId') as unknown as string[]
}).run()
.coerceTo('array')('userId')
.run() as any as string[]
])

const billingLeaders = (await dataLoader.get('users').loadMany(billingLeaderIds)).filter(isValid)
const billingLeaderEmails = billingLeaders.map((user) => user.email)
Expand Down
5 changes: 2 additions & 3 deletions packages/server/billing/helpers/generateUpcomingInvoice.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import getRethink from '../../database/rethinkDriver'
import {DataLoaderWorker} from '../../graphql/graphql'
import getUpcomingInvoiceId from '../../utils/getUpcomingInvoiceId'
import {getStripeManager} from '../../utils/stripe'
import fetchAllLines from './fetchAllLines'
import generateInvoice from './generateInvoice'

const generateUpcomingInvoice = async (orgId: string, dataLoader: DataLoaderWorker) => {
const r = await getRethink()
const invoiceId = getUpcomingInvoiceId(orgId)
const {stripeId} = await r.table('Organization').get(orgId).pluck('stripeId').run()
const organization = await dataLoader.get('organizations').load(orgId)
const {stripeId} = organization
const manager = getStripeManager()
const [stripeLineItems, upcomingInvoice] = await Promise.all([
fetchAllLines('upcoming', stripeId),
Expand Down
36 changes: 26 additions & 10 deletions packages/server/billing/helpers/teamLimitsCheck.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,13 @@ export const maybeRemoveRestrictions = async (orgId: string, dataLoader: DataLoa

if (!(await isLimitExceeded(orgId))) {
const billingLeadersIds = await dataLoader.get('billingLeadersIdsByOrgId').load(orgId)
const pg = getKysely()
await Promise.all([
pg
.updateTable('Organization')
.set({tierLimitExceededAt: null, scheduledLockAt: null, lockedAt: null})
.where('id', '=', orgId)
.execute(),
r
.table('Organization')
.get(orgId)
Expand Down Expand Up @@ -129,16 +135,26 @@ export const checkTeamsLimit = async (orgId: string, dataLoader: DataLoaderWorke

const now = new Date()
const scheduledLockAt = new Date(now.getTime() + ms(`${Threshold.STARTER_TIER_LOCK_AFTER_DAYS}d`))

await r
.table('Organization')
.get(orgId)
.update({
tierLimitExceededAt: now,
scheduledLockAt,
updatedAt: now
})
.run()
const pg = getKysely()
await Promise.all([
pg
.updateTable('Organization')
.set({
tierLimitExceededAt: now,
scheduledLockAt
})
.where('id', '=', orgId)
.execute(),
r
.table('Organization')
.get(orgId)
.update({
tierLimitExceededAt: now,
scheduledLockAt,
updatedAt: now
})
.run()
])
dataLoader.get('organizations').clear(orgId)

const billingLeaders = await getBillingLeadersByOrgId(orgId, dataLoader)
Expand Down
48 changes: 31 additions & 17 deletions packages/server/billing/helpers/terminateSubscription.ts
Original file line number Diff line number Diff line change
@@ -1,31 +1,45 @@
import getRethink from '../../database/rethinkDriver'
import Organization from '../../database/types/Organization'
import getKysely from '../../postgres/getKysely'
import {Logger} from '../../utils/Logger'
import sendToSentry from '../../utils/sendToSentry'
import {getStripeManager} from '../../utils/stripe'

const terminateSubscription = async (orgId: string) => {
const r = await getRethink()
const pg = getKysely()
const now = new Date()
// flag teams as unpaid
const [rethinkResult] = await Promise.all([
r({
organization: r
.table('Organization')
.get(orgId)
.update(
{
// periodEnd should always be redundant, but useful for testing purposes
periodEnd: now,
stripeSubscriptionId: null
},
{returnChanges: true}
)('changes')(0)('old_val')
.default(null) as unknown as Organization
}).run()
const [pgOrganization, organization] = await Promise.all([
pg
.with('OldOrg', (qc) =>
qc.selectFrom('Organization').select('stripeSubscriptionId').where('id', '=', orgId)
)
.updateTable('Organization')
.set({periodEnd: now, stripeSubscriptionId: null})
.where('id', '=', orgId)
.returning((qc) =>
qc.selectFrom('OldOrg').select('stripeSubscriptionId').as('stripeSubscriptionId')
)
.executeTakeFirst(),
r
.table('Organization')
.get(orgId)
.update(
{
// periodEnd should always be redundant, but useful for testing purposes
periodEnd: now,
stripeSubscriptionId: null
},
{returnChanges: true}
)('changes')(0)('old_val')
.default(null)
.run() as unknown as Organization
])
const {organization} = rethinkResult
const {stripeSubscriptionId} = organization

if (stripeSubscriptionId !== pgOrganization?.stripeSubscriptionId) {
sendToSentry(new Error(`stripeSubscriptionId mismatch for orgId ${orgId}`))
}
if (stripeSubscriptionId) {
const manager = getStripeManager()
try {
Expand Down
7 changes: 0 additions & 7 deletions packages/server/database/types/GoogleAnalyzedEntity.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import {sql} from 'kysely'

interface Input {
lemma?: string
name: string
Expand All @@ -17,8 +15,3 @@ export default class GoogleAnalyzedEntity {
this.salience = salience
}
}

export const toGoogleAnalyzedEntityPG = (entities: GoogleAnalyzedEntity[]) =>
sql<
string[]
>`(select coalesce(array_agg((name, salience, lemma)::"GoogleAnalyzedEntity"), '{}') from json_populate_recordset(null::"GoogleAnalyzedEntity", ${JSON.stringify(entities)}))`
10 changes: 9 additions & 1 deletion packages/server/database/types/processTeamsLimitsJob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import sendTeamsLimitEmail from '../../billing/helpers/sendTeamsLimitEmail'
import {DataLoaderWorker} from '../../graphql/graphql'
import isValid from '../../graphql/isValid'
import publishNotification from '../../graphql/public/mutations/helpers/publishNotification'
import getKysely from '../../postgres/getKysely'
import NotificationTeamsLimitReminder from './NotificationTeamsLimitReminder'
import ScheduledTeamLimitsJob from './ScheduledTeamLimitsJob'

Expand All @@ -27,7 +28,14 @@ const processTeamsLimitsJob = async (job: ScheduledTeamLimitsJob, dataLoader: Da

if (type === 'LOCK_ORGANIZATION') {
const now = new Date()
await r.table('Organization').get(orgId).update({lockedAt: now}).run()
await Promise.all([
getKysely()
.updateTable('Organization')
.set({lockedAt: now})
.where('id', '=', 'orgId')
.execute(),
r.table('Organization').get(orgId).update({lockedAt: now}).run()
])
organization.lockedAt = lockedAt
} else if (type === 'WARN_ORGANIZATION') {
const notificationsToInsert = billingLeadersIds.map((userId) => {
Expand Down
6 changes: 5 additions & 1 deletion packages/server/graphql/mutations/addOrg.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,11 @@ export default {
const teamId = generateUID()
const {email} = viewer
await createNewOrg(orgId, orgName, viewerId, email, dataLoader)
await createTeamAndLeader(viewer, {id: teamId, orgId, isOnboardTeam: false, ...newTeam})
await createTeamAndLeader(
viewer,
{id: teamId, orgId, isOnboardTeam: false, ...newTeam},
dataLoader
)

const {tms} = authToken
// MUTATIVE
Expand Down
2 changes: 1 addition & 1 deletion packages/server/graphql/mutations/addTeam.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ export default {

// RESOLUTION
const teamId = generateUID()
await createTeamAndLeader(viewer, {id: teamId, isOnboardTeam: false, ...newTeam})
await createTeamAndLeader(viewer, {id: teamId, isOnboardTeam: false, ...newTeam}, dataLoader)

const {tms} = authToken
// MUTATIVE
Expand Down
4 changes: 2 additions & 2 deletions packages/server/graphql/mutations/createReflection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import getGroupSmartTitle from 'parabol-client/utils/smartGroup/getGroupSmartTit
import unlockAllStagesForPhase from 'parabol-client/utils/unlockAllStagesForPhase'
import normalizeRawDraftJS from 'parabol-client/validation/normalizeRawDraftJS'
import getRethink from '../../database/rethinkDriver'
import {toGoogleAnalyzedEntityPG} from '../../database/types/GoogleAnalyzedEntity'
import ReflectionGroup from '../../database/types/ReflectionGroup'
import generateUID from '../../generateUID'
import getKysely from '../../postgres/getKysely'
import {toGoogleAnalyzedEntity} from '../../postgres/helpers/toGoogleAnalyzedEntity'
import {analytics} from '../../utils/analytics/analytics'
import {getUserId} from '../../utils/authorization'
import publish from '../../utils/publish'
Expand Down Expand Up @@ -102,7 +102,7 @@ export default {
await pg
.with('Group', (qc) => qc.insertInto('RetroReflectionGroup').values(reflectionGroup))
.insertInto('RetroReflection')
.values({...reflection, entities: toGoogleAnalyzedEntityPG(entities)})
.values({...reflection, entities: toGoogleAnalyzedEntity(entities)})
.execute()

const groupPhase = phases.find((phase) => phase.phaseType === 'group')!
Expand Down
6 changes: 3 additions & 3 deletions packages/server/graphql/mutations/downgradeToStarter.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import {GraphQLID, GraphQLList, GraphQLNonNull, GraphQLString} from 'graphql'
import {SubscriptionChannel} from 'parabol-client/types/constEnums'
import getRethink from '../../database/rethinkDriver'
import {getUserId, isSuperUser, isUserBillingLeader} from '../../utils/authorization'
import publish from '../../utils/publish'
import standardError from '../../utils/standardError'
Expand Down Expand Up @@ -37,7 +36,6 @@ export default {
}: {orgId: string; reasonsForLeaving?: TReasonToDowngradeEnum[]; otherTool?: string},
{authToken, dataLoader, socketId: mutatorId}: GQLContext
) {
const r = await getRethink()
const operationId = dataLoader.share()
const subOptions = {mutatorId, operationId}

Expand All @@ -56,7 +54,8 @@ export default {
return standardError(new Error('Other tool name is too long'), {userId: viewerId})
}

const {stripeSubscriptionId, tier} = await r.table('Organization').get(orgId).run()
const {stripeSubscriptionId, tier} = await dataLoader.get('organizations').load(orgId)
dataLoader.get('organizations').clear(orgId)

if (tier === 'starter') {
return standardError(new Error('Already on free tier'), {userId: viewerId})
Expand All @@ -68,6 +67,7 @@ export default {
orgId,
stripeSubscriptionId!,
viewer,
dataLoader,
reasonsForLeaving,
otherTool
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ const bootstrapNewUser = async (
const orgName = `${newUser.preferredName}’s Org`
await createNewOrg(orgId, orgName, userId, email, dataLoader)
await Promise.all([
createTeamAndLeader(newUser as IUser, validNewTeam),
createTeamAndLeader(newUser as IUser, validNewTeam, dataLoader),
addSeedTasks(userId, teamId),
r.table('SuggestedAction').insert(new SuggestedActionInviteYourTeam({userId, teamId})).run(),
sendPromptToJoinOrg(newUser, dataLoader)
Expand Down
Loading

0 comments on commit 6bb5fb2

Please sign in to comment.