Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gergo/web 2158 previews module multi region #3492

Merged
merged 5 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions packages/preview-service/src/bin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,14 @@ import '@/bootstrap.js' // This has side-effects and has to be imported first

import { startServer } from '@/server/server.js'
import { startPreviewService } from '@/server/background.js'
import { db } from '@/clients/knex.js'

startServer({ db })
startPreviewService({ db })
const start = async () => {
await startServer()
await startPreviewService()
}

start()
.then()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the .then()/.catch() bits are redundant

.catch((err) => {
throw err
})
97 changes: 71 additions & 26 deletions packages/preview-service/src/clients/knex.ts
Original file line number Diff line number Diff line change
@@ -1,29 +1,74 @@
import { knexLogger as logger } from '@/observability/logging.js'
import { getPostgresConnectionString, getPostgresMaxConnections } from '@/utils/env.js'
import * as knex from 'knex'
import { get } from 'lodash-es'

// CJS interop (types are off)
const knexBuilder = (get(knex, 'default') ||
get(knex, 'knex')) as unknown as typeof knex.knex

export const db = knexBuilder({
client: 'pg',
connection: {
// eslint-disable-next-line camelcase
application_name: 'speckle_preview_service',
connectionString: getPostgresConnectionString()
},
pool: {
min: 0,
max: getPostgresMaxConnections(),
acquireTimeoutMillis: 16000, //allows for 3x creation attempts plus idle time between attempts
createTimeoutMillis: 5000
},
log: {
warn: (message) => logger.warn(message),
error: (message) => logger.error(message),
debug: (message) => logger.debug(message)
import Environment from '@speckle/shared/dist/commonjs/environment/index.js'
import {
loadMultiRegionsConfig,
configureKnexClient
} from '@speckle/shared/dist/commonjs/environment/multiRegionConfig.js'
import { Knex } from 'knex'

const { FF_WORKSPACES_MULTI_REGION_ENABLED } = Environment.getFeatureFlags()

type ConfiguredKnexClient = ReturnType<typeof configureKnexClient>
export type DbClients = Record<'main', ConfiguredKnexClient> &
Record<string, ConfiguredKnexClient>
let dbClients: DbClients

const isDevEnv = process.env.NODE_ENV === 'development'

export const getDbClients = async () => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is already better than it was, but in the future we can probably even extract a lot of this DB querying/"repository" logic that is common between these BG services into shared as well

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think in future, preview service should be given a short-lived API token and call the API on the server instead of duplicating the objects stuff.
But that's a discussion & refactor for the future, not one to hold up this PR.

if (dbClients) return dbClients
const maxConnections = getPostgresMaxConnections()

const configArgs = {
migrationDirs: [],
isTestEnv: isDevEnv,
isDevOrTestEnv: isDevEnv,
logger,
maxConnections,
applicationName: 'speckle_fileimport_service'
}
if (!FF_WORKSPACES_MULTI_REGION_ENABLED) {
const mainClient = configureKnexClient(
{
postgres: {
connectionUri: getPostgresConnectionString()
}
},
configArgs
)
dbClients = { main: mainClient }
} else {
const configPath = process.env.MULTI_REGION_CONFIG_PATH || 'multiregion.json'
const config = await loadMultiRegionsConfig({ path: configPath })
const clients = [['main', configureKnexClient(config.main, configArgs)]]
Object.entries(config.regions).map(([key, config]) => {
clients.push([key, configureKnexClient(config, configArgs)])
})
dbClients = Object.fromEntries(clients) as DbClients
}
// migrations are managed in the server package
})
return dbClients
}

export const getProjectDbClient = async ({
projectId
}: {
projectId: string
}): Promise<Knex> => {
const dbClients = await getDbClients()
const mainDb = dbClients.main.public
if (!FF_WORKSPACES_MULTI_REGION_ENABLED) return mainDb

const projectRegion = await mainDb<{ id: string; regionKey: string }>('streams')
.select('id', 'regionKey')
.where({ id: projectId })
.first()

if (!projectRegion) return mainDb

const regionDb = dbClients[projectRegion.regionKey]
if (!regionDb)
throw new Error(`Project region client not found for ${projectRegion.regionKey}`)

return regionDb.public
}
6 changes: 3 additions & 3 deletions packages/preview-service/src/observability/metricsApp.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import { getDbClients } from '@/clients/knex.js'
import { loggingExpressMiddleware } from '@/observability/expressLogging.js'
import { metricsRouterFactory } from '@/observability/metricsRoute.js'
import { initPrometheusMetrics } from '@/observability/prometheusMetrics.js'
import { errorHandler } from '@/utils/errorHandler.js'
import express from 'express'
import createError from 'http-errors'
import type { Knex } from 'knex'

export const appFactory = (deps: { db: Knex }) => {
const { db } = deps
export const appFactory = async () => {
const db = (await getDbClients()).main.public
initPrometheusMetrics({ db })
const app = express()

Expand Down
8 changes: 3 additions & 5 deletions packages/preview-service/src/server/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,9 @@ import previewRouterFactory from '@/server/routes/preview.js'
import { errorHandler } from '@/utils/errorHandler.js'
import express from 'express'
import createError from 'http-errors'
import type { Knex } from 'knex'
import path from 'path'

export const appFactory = (deps: { db: Knex }) => {
const { db } = deps
export const appFactory = () => {
const app = express()

app.use(loggingExpressMiddleware)
Expand All @@ -23,8 +21,8 @@ export const appFactory = (deps: { db: Knex }) => {

app.use('/', indexRouterFactory())
app.use('/preview', previewRouterFactory())
app.use('/objects', objectsRouterFactory({ db }))
app.use('/api', apiRouterFactory({ db }))
app.use('/objects', objectsRouterFactory())
app.use('/api', apiRouterFactory())

// catch 404 and forward to error handler
app.use(function (req, res, next) {
Expand Down
67 changes: 49 additions & 18 deletions packages/preview-service/src/server/background.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
//FIXME this doesn't quite fit in the /server directory, but it's not a service either. It's a background worker.
import { updateHealthcheckDataFactory } from '@/clients/execHealthcheck.js'
import { generatePreviewFactory } from '@/clients/previewService.js'
import { WorkStatus } from '@/domain/backgroundWorker.js'
import { extendLoggerComponent, logger } from '@/observability/logging.js'
import { initPrometheusMetrics } from '@/observability/prometheusMetrics.js'
import {
Expand All @@ -14,32 +15,42 @@ import {
import { insertPreviewFactory } from '@/repositories/previews.js'
import { generateAndStore360PreviewFactory } from '@/services/360preview.js'
import { pollForAndCreatePreviewFactory } from '@/services/pollForPreview.js'
import { forceExit, repeatedlyDoSomeWorkFactory } from '@/services/taskManager.js'
import { throwUncoveredError, wait } from '@speckle/shared'
import {
getHealthCheckFilePath,
serviceOrigin,
getPreviewTimeout
} from '@/utils/env.js'
import type { Knex } from 'knex'
import { DbClients, getDbClients } from '@/clients/knex.js'

export function startPreviewService(params: { db: Knex }) {
const { db } = params
let shouldExit = false

export async function startPreviewService() {
const backgroundLogger = extendLoggerComponent(logger, 'backgroundWorker')
backgroundLogger.info('📸 Starting Preview Service background worker')

process.on('SIGTERM', () => {
forceExit()
shouldExit = true
backgroundLogger.info('Shutting down...')
})

process.on('SIGINT', () => {
forceExit()
shouldExit = true
backgroundLogger.info('Shutting down...')
})

initPrometheusMetrics({ db })
repeatedlyDoSomeWorkFactory({
doSomeWork: pollForAndCreatePreviewFactory({
const dbClients = await getDbClients()

// TODO, this should also be initialized for all DBs
initPrometheusMetrics({ db: dbClients.main.public })

const clientGenerator = infiniteDbClientsIterator(dbClients)

while (!shouldExit) {
const db = clientGenerator.next().value
if (!db) throw new Error('The infinite client generator failed to return a client')

const status = await pollForAndCreatePreviewFactory({
updateHealthcheckData: updateHealthcheckDataFactory({
healthCheckFilePath: getHealthCheckFilePath()
}),
Expand All @@ -54,14 +65,34 @@ export function startPreviewService(params: { db: Knex }) {
updatePreviewMetadata: updatePreviewMetadataFactory({ db }),
notifyUpdate: notifyUpdateFactory({ db }),
logger: backgroundLogger
}),
onExit: () => {
process.exit(0)
},
delayPeriods: {
onSuccess: 10,
onNoWorkFound: 1000,
onFailed: 5000
})()

switch (status) {
case WorkStatus.SUCCESS:
await wait(10)
break
case WorkStatus.NOWORKFOUND:
await wait(1000)
break
case WorkStatus.FAILED:
await wait(5000)
break
default:
throwUncoveredError(status)
}
})()
}
process.exit(0)
}

function* infiniteDbClientsIterator(dbClients: DbClients) {
let index = 0
const dbClientEntries = Object.values(dbClients)
const clientCount = dbClientEntries.length
while (true) {
// reset index
if (index === clientCount) index = 0
const dbConnection = dbClientEntries[index]
index++
yield dbConnection.public
}
}
9 changes: 5 additions & 4 deletions packages/preview-service/src/server/routes/api.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import { getProjectDbClient } from '@/clients/knex.js'
import { getObjectsStreamFactory } from '@/repositories/objects.js'
import { isSimpleTextRequested, simpleTextOrJsonContentType } from '@/utils/headers.js'
import { SpeckleObjectsStream } from '@/utils/speckleObjectsStream.js'
import express from 'express'
import type { Knex } from 'knex'
import { PassThrough, pipeline } from 'stream'
import zlib from 'zlib'
import { z } from 'zod'

const apiRouterFactory = (deps: { db: Knex }) => {
const { db } = deps
const apiRouterFactory = () => {
const apiRouter = express.Router()

const getObjectsRequestBodySchema = z.object({
Expand All @@ -31,7 +30,9 @@ const apiRouterFactory = (deps: { db: Knex }) => {
'Content-Type': simpleTextOrJsonContentType(req)
})

const dbStream = getObjectsStreamFactory({ db })({
const projectDb = await getProjectDbClient({ projectId: req.params.streamId })

const dbStream = getObjectsStreamFactory({ db: projectDb })({
streamId: req.params.streamId,
objectIds: getObjectsRequestBody.objects
})
Expand Down
15 changes: 9 additions & 6 deletions packages/preview-service/src/server/routes/objects.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
import { getProjectDbClient } from '@/clients/knex.js'
import {
getObjectChildrenStreamFactory,
getObjectFactory
} from '@/repositories/objects.js'
import { isSimpleTextRequested, simpleTextOrJsonContentType } from '@/utils/headers.js'
import { SpeckleObjectsStream } from '@/utils/speckleObjectsStream.js'
import express, { RequestHandler } from 'express'
import type { Knex } from 'knex'
import { PassThrough, pipeline } from 'stream'
import zlib from 'zlib'

const objectsRouterFactory = (deps: { db: Knex }) => {
const { db } = deps
const objectsRouterFactory = () => {
const objectsRouter = express.Router()

// This method was copy-pasted from the server method, without authentication/authorization (this web service is an internal one)
Expand All @@ -21,8 +20,10 @@ const objectsRouterFactory = (deps: { db: Knex }) => {
streamId: req.params.streamId,
objectId: req.params.objectId
})

const projectDb = await getProjectDbClient({ projectId: req.params.streamId })
// Populate first object (the "commit")
const obj = await getObjectFactory({ db })({
const obj = await getObjectFactory({ db: projectDb })({
streamId: req.params.streamId,
objectId: req.params.objectId
})
Expand All @@ -36,7 +37,7 @@ const objectsRouterFactory = (deps: { db: Knex }) => {
'Content-Type': simpleTextOrJsonContentType(req)
})

const dbStream = await getObjectChildrenStreamFactory({ db })({
const dbStream = await getObjectChildrenStreamFactory({ db: projectDb })({
streamId: req.params.streamId,
objectId: req.params.objectId
})
Expand Down Expand Up @@ -80,7 +81,9 @@ const objectsRouterFactory = (deps: { db: Knex }) => {
streamId: req.params.streamId,
objectId: req.params.objectId
})
const obj = await getObjectFactory({ db })({

const projectDb = await getProjectDbClient({ projectId: req.params.streamId })
const obj = await getObjectFactory({ db: projectDb })({
streamId: req.params.streamId,
objectId: req.params.objectId
})
Expand Down
12 changes: 5 additions & 7 deletions packages/preview-service/src/server/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,21 @@ import { appFactory as metricsAppFactory } from '@/observability/metricsApp.js'
import { appFactory } from '@/server/app.js'
import { getAppPort, getHost, getMetricsHost, getMetricsPort } from '@/utils/env.js'
import http from 'http'
import type { Knex } from 'knex'
import { isNaN, isString, toNumber } from 'lodash-es'

export const startServer = (params: { db: Knex; serveOnRandomPort?: boolean }) => {
const { db } = params
export const startServer = async (params?: { serveOnRandomPort?: boolean }) => {
/**
* Get port from environment and store in Express.
*/
const inputPort = params.serveOnRandomPort ? 0 : normalizePort(getAppPort())
const app = appFactory({ db })
const inputPort = params?.serveOnRandomPort ? 0 : normalizePort(getAppPort())
const app = appFactory()
app.set('port', inputPort)

// we place the metrics on a separate port as we wish to expose it to external monitoring tools, but do not wish to expose other routes (for now)
const inputMetricsPort = params.serveOnRandomPort
const inputMetricsPort = params?.serveOnRandomPort
? 0
: normalizePort(getMetricsPort())
const metricsApp = metricsAppFactory({ db })
const metricsApp = await metricsAppFactory()
metricsApp.set('port', inputMetricsPort)

/**
Expand Down
Loading
Loading