Skip to content

Commit

Permalink
UBERF-6374: Improve server logging and improve startup performance
Browse files Browse the repository at this point in the history
UBERF-6374: Improve server logging and improve startup performance
+ Add connection stats logging on workbench client
  • Loading branch information
haiodo committed Apr 5, 2024
1 parent ceac67f commit cf8890b
Show file tree
Hide file tree
Showing 27 changed files with 526 additions and 325 deletions.
6 changes: 0 additions & 6 deletions dev/storage/src/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,6 @@ class InMemoryTxAdapter extends DummyDbAdapter implements TxAdapter {
return r
}

async init (model: Tx[]): Promise<void> {
for (const tx of model) {
await this.txdb.tx(tx)
}
}

async getModel (): Promise<Tx[]> {
return builder().getTxes()
}
Expand Down
130 changes: 92 additions & 38 deletions packages/core/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { BackupClient, DocChunk } from './backup'
import { Account, AttachedDoc, Class, DOMAIN_MODEL, Doc, Domain, PluginConfiguration, Ref, Timestamp } from './classes'
import core from './component'
import { Hierarchy } from './hierarchy'
import { MeasureContext, MeasureMetricsContext } from './measurements'
import { ModelDb } from './memdb'
import type { DocumentQuery, FindOptions, FindResult, FulltextStorage, Storage, TxResult, WithLookup } from './storage'
import { SearchOptions, SearchQuery, SearchResult, SortingOrder } from './storage'
Expand Down Expand Up @@ -222,8 +223,10 @@ export async function createClient (
connect: (txHandler: TxHandler) => Promise<ClientConnection>,
// If set will build model with only allowed plugins.
allowedPlugins?: Plugin[],
txPersistence?: TxPersistenceStore
txPersistence?: TxPersistenceStore,
_ctx?: MeasureContext
): Promise<AccountClient> {
const ctx = _ctx ?? new MeasureMetricsContext('createClient', {})
let client: ClientImpl | null = null

// Temporal buffer, while we apply model
Expand All @@ -248,9 +251,13 @@ export async function createClient (
}
const configs = new Map<Ref<PluginConfiguration>, PluginConfiguration>()

const conn = await connect(txHandler)
const conn = await ctx.with('connect', {}, async () => await connect(txHandler))

await loadModel(conn, allowedPlugins, configs, hierarchy, model, false, txPersistence)
await ctx.with(
'load-model',
{ reload: false },
async (ctx) => await loadModel(ctx, conn, allowedPlugins, configs, hierarchy, model, false, txPersistence)
)

txBuffer = txBuffer.filter((tx) => tx.space !== core.space.Model)

Expand All @@ -264,14 +271,20 @@ export async function createClient (
conn.onConnect = async (event) => {
console.log('Client: onConnect', event)
// Find all new transactions and apply
const loadModelResponse = await loadModel(conn, allowedPlugins, configs, hierarchy, model, true, txPersistence)
const loadModelResponse = await ctx.with(
'connect',
{ reload: true },
async (ctx) => await loadModel(ctx, conn, allowedPlugins, configs, hierarchy, model, true, txPersistence)
)

if (event === ClientConnectEvent.Reconnected && loadModelResponse.full) {
// We have upgrade procedure and need rebuild all stuff.
hierarchy = new Hierarchy()
model = new ModelDb(hierarchy)

await buildModel(loadModelResponse, allowedPlugins, configs, hierarchy, model)
await ctx.with('build-model', {}, async (ctx) => {
await buildModel(ctx, loadModelResponse, allowedPlugins, configs, hierarchy, model)
})
await oldOnConnect?.(ClientConnectEvent.Upgraded)

// No need to fetch more stuff since upgrade was happened.
Expand All @@ -285,10 +298,15 @@ export async function createClient (
}

// We need to look for last {transactionThreshold} transactions and if it is more since lastTx one we receive, we need to perform full refresh.
const atxes = await conn.findAll(
core.class.Tx,
{ modifiedOn: { $gt: lastTx }, objectSpace: { $ne: core.space.Model } },
{ sort: { modifiedOn: SortingOrder.Ascending, _id: SortingOrder.Ascending }, limit: transactionThreshold }
const atxes = await ctx.with(
'find-atx',
{},
async () =>
await conn.findAll(
core.class.Tx,
{ modifiedOn: { $gt: lastTx }, objectSpace: { $ne: core.space.Model } },
{ sort: { modifiedOn: SortingOrder.Ascending, _id: SortingOrder.Ascending }, limit: transactionThreshold }
)
)

let needFullRefresh = false
Expand Down Expand Up @@ -318,14 +336,23 @@ export async function createClient (
}

async function tryLoadModel (
ctx: MeasureContext,
conn: ClientConnection,
reload: boolean,
persistence?: TxPersistenceStore
): Promise<LoadModelResponse> {
const current = (await persistence?.load()) ?? { full: true, transactions: [], hash: '' }
const current = (await ctx.with('persistence-load', {}, async () => await persistence?.load())) ?? {
full: true,
transactions: [],
hash: ''
}

const lastTxTime = getLastTxTime(current.transactions)
const result = await conn.loadModel(lastTxTime, current.hash)
const result = await ctx.with(
'connection-load-model',
{},
async (ctx) => await conn.loadModel(lastTxTime, current.hash)
)

if (Array.isArray(result)) {
// Fallback to old behavior, only for tests
Expand All @@ -337,10 +364,15 @@ async function tryLoadModel (
}

// Save concatenated
await persistence?.store({
...result,
transactions: !result.full ? current.transactions.concat(result.transactions) : result.transactions
})
void (await ctx.with(
'persistence-store',
{},
async (ctx) =>
await persistence?.store({
...result,
transactions: !result.full ? current.transactions.concat(result.transactions) : result.transactions
})
))

if (!result.full && !reload) {
result.transactions = current.transactions.concat(result.transactions)
Expand All @@ -361,6 +393,7 @@ function isPersonAccount (tx: Tx): boolean {
}

async function loadModel (
ctx: MeasureContext,
conn: ClientConnection,
allowedPlugins: Plugin[] | undefined,
configs: Map<Ref<PluginConfiguration>, PluginConfiguration>,
Expand All @@ -371,7 +404,11 @@ async function loadModel (
): Promise<LoadModelResponse> {
const t = Date.now()

const modelResponse = await tryLoadModel(conn, reload, persistence)
const modelResponse = await ctx.with(
'try-load-model',
{},
async (ctx) => await tryLoadModel(ctx, conn, reload, persistence)
)

if (reload && modelResponse.full) {
return modelResponse
Expand All @@ -385,11 +422,14 @@ async function loadModel (
)
}

await buildModel(modelResponse, allowedPlugins, configs, hierarchy, model)
await ctx.with('build-model', {}, async (ctx) => {
await buildModel(ctx, modelResponse, allowedPlugins, configs, hierarchy, model)
})
return modelResponse
}

async function buildModel (
ctx: MeasureContext,
modelResponse: LoadModelResponse,
allowedPlugins: Plugin[] | undefined,
configs: Map<Ref<PluginConfiguration>, PluginConfiguration>,
Expand All @@ -400,38 +440,52 @@ async function buildModel (
const userTx: Tx[] = []

const atxes = modelResponse.transactions
atxes.forEach((tx) =>
((tx.modifiedBy === core.account.ConfigUser || tx.modifiedBy === core.account.System) && !isPersonAccount(tx)
? systemTx
: userTx
).push(tx)
)

await ctx.with('split txes', {}, async () => {
atxes.forEach((tx) =>
((tx.modifiedBy === core.account.ConfigUser || tx.modifiedBy === core.account.System) && !isPersonAccount(tx)
? systemTx
: userTx
).push(tx)
)
})

if (allowedPlugins != null) {
fillConfiguration(systemTx, configs)
fillConfiguration(userTx, configs)
await ctx.with('fill config system', {}, async () => {
fillConfiguration(systemTx, configs)
})
await ctx.with('fill config user', {}, async () => {
fillConfiguration(userTx, configs)
})
const excludedPlugins = Array.from(configs.values()).filter(
(it) => !it.enabled || !allowedPlugins.includes(it.pluginId)
)
systemTx = pluginFilterTx(excludedPlugins, configs, systemTx)
await ctx.with('filter txes', {}, async () => {
systemTx = pluginFilterTx(excludedPlugins, configs, systemTx)
})
}

const txes = systemTx.concat(userTx)

for (const tx of txes) {
try {
hierarchy.tx(tx)
} catch (err: any) {
console.error('failed to apply model transaction, skipping', tx._id, tx._class, err?.message)
await ctx.with('build hierarchy', {}, async () => {
for (const tx of txes) {
try {
hierarchy.tx(tx)
} catch (err: any) {
console.error('failed to apply model transaction, skipping', tx._id, tx._class, err?.message)
}
}
}
for (const tx of txes) {
try {
await model.tx(tx)
} catch (err: any) {
console.error('failed to apply model transaction, skipping', tx._id, tx._class, err?.message)
})
await ctx.with('build model', {}, async (ctx) => {
model.ctx = ctx // update model context, so we will track all metrics
for (const tx of txes) {
try {
model.addTx(tx, true)
} catch (err: any) {
console.error('failed to apply model transaction, skipping', tx._id, tx._class, err?.message)
}
}
}
})
}

function getLastTxTime (txes: Tx[]): number {
Expand Down
Loading

0 comments on commit cf8890b

Please sign in to comment.