From 84e3f86e884a01754d6e796331b9c8fa9db81540 Mon Sep 17 00:00:00 2001 From: Andrey Sobolev Date: Mon, 22 Apr 2024 10:04:56 +0700 Subject: [PATCH] UBERF-6653: Fix minor issue and add force-close Signed-off-by: Andrey Sobolev --- dev/client-resources/src/connection.ts | 12 +- dev/prod/package.json | 3 +- dev/prod/src/platform.ts | 17 +- dev/tool/src/index.ts | 7 +- packages/core/src/__tests__/client.test.ts | 3 +- packages/core/src/__tests__/connection.ts | 3 +- packages/core/src/backup.ts | 2 + packages/core/src/client.ts | 4 + packages/platform/src/platform.ts | 5 +- packages/platform/src/resource.ts | 29 ++- packages/query/src/__tests__/connection.ts | 3 +- packages/ui/src/popups.ts | 11 +- plugins/client-resources/src/connection.ts | 14 +- .../teamspace/CreateTeamspace.svelte | 11 +- .../src/components/ServerManager.svelte | 64 ++++-- .../src/components/WorkbenchApp.svelte | 2 + server/account/src/operations.ts | 105 ++++----- server/backup/src/backup.ts | 6 +- server/core/src/fulltext.ts | 6 +- server/core/src/server/storage.ts | 3 +- server/mongo/src/__tests__/storage.test.ts | 3 +- server/tool/src/index.ts | 63 +++--- server/ws/src/client.ts | 22 +- server/ws/src/server.ts | 200 +++++++++++------- server/ws/src/server_http.ts | 19 +- server/ws/src/stats.ts | 27 ++- server/ws/src/types.ts | 29 ++- 27 files changed, 421 insertions(+), 252 deletions(-) diff --git a/dev/client-resources/src/connection.ts b/dev/client-resources/src/connection.ts index 8e2808d0770..b2ee4037f1d 100644 --- a/dev/client-resources/src/connection.ts +++ b/dev/client-resources/src/connection.ts @@ -25,17 +25,17 @@ import core, { FindOptions, FindResult, getWorkspaceId, + MeasureDoneOperation, MeasureMetricsContext, Ref, + SearchOptions, + SearchQuery, + SearchResult, ServerStorage, Timestamp, Tx, TxHandler, - TxResult, - SearchQuery, - SearchOptions, - SearchResult, - MeasureDoneOperation + TxResult } from '@hcengineering/core' import { createInMemoryTxAdapter } from '@hcengineering/dev-storage' import devmodel from '@hcengineering/devmodel' @@ -109,6 +109,8 @@ class ServerStorageWrapper implements ClientConnection { async measure (operationName: string): Promise { return async () => ({ time: 0, serverTime: 0 }) } + + async sendForceClose (): Promise {} } async function createNullFullTextAdapter (): Promise { diff --git a/dev/prod/package.json b/dev/prod/package.json index ee73328fe56..367e27acde3 100644 --- a/dev/prod/package.json +++ b/dev/prod/package.json @@ -185,6 +185,7 @@ "@hcengineering/document-resources": "^0.6.0", "@hcengineering/guest": "^0.6.0", "@hcengineering/guest-assets": "^0.6.0", - "@hcengineering/guest-resources": "^0.6.0" + "@hcengineering/guest-resources": "^0.6.0", + "@hcengineering/analytics": "^0.6.0" } } diff --git a/dev/prod/src/platform.ts b/dev/prod/src/platform.ts index 79c213f4a6a..5e332d62452 100644 --- a/dev/prod/src/platform.ts +++ b/dev/prod/src/platform.ts @@ -13,7 +13,7 @@ // limitations under the License. // -import { Plugin, addLocation, addStringsLoader, platformId } from '@hcengineering/platform' +import platform, { Plugin, addLocation, addStringsLoader, platformId } from '@hcengineering/platform' import { activityId } from '@hcengineering/activity' import { attachmentId } from '@hcengineering/attachment' @@ -86,6 +86,8 @@ import { preferenceId } from '@hcengineering/preference' import { setDefaultLanguage } from '@hcengineering/theme' import { uiId } from '@hcengineering/ui/src/plugin' +import { Analytics } from '@hcengineering/analytics' + interface Config { ACCOUNTS_URL: string UPLOAD_URL: string @@ -143,6 +145,19 @@ function configureI18n(): void { } export async function configurePlatform() { + setMetadata(platform.metadata.LoadHelper, async (loader) => { + for (let i = 0; i < 3; i++) { + try { + return loader() + } catch (err: any) { + if (err.message.includes('Loading chunk') && i != 2) { + continue + } + Analytics.handleError(err) + } + } + }) + configureI18n() const config: Config = await (await fetch(devConfig? '/config-dev.json' : '/config.json')).json() diff --git a/dev/tool/src/index.ts b/dev/tool/src/index.ts index fd421f4a4d3..c2d49461765 100644 --- a/dev/tool/src/index.ts +++ b/dev/tool/src/index.ts @@ -30,8 +30,8 @@ import { replacePassword, setAccountAdmin, setRole, - upgradeWorkspace, - UpgradeWorker + UpgradeWorker, + upgradeWorkspace } from '@hcengineering/account' import { setMetadata } from '@hcengineering/platform' import { @@ -264,7 +264,7 @@ export function devTool ( .action(async (workspace, cmd) => { const { mongodbUri, txes, version, migrateOperations } = prepareTools() await withDatabase(mongodbUri, async (db) => { - const { client } = await createWorkspace( + await createWorkspace( toolCtx, version, txes, @@ -275,7 +275,6 @@ export function devTool ( cmd.workspaceName, workspace ) - await client?.close() }) }) diff --git a/packages/core/src/__tests__/client.test.ts b/packages/core/src/__tests__/client.test.ts index 587fa28720b..1d8231258e3 100644 --- a/packages/core/src/__tests__/client.test.ts +++ b/packages/core/src/__tests__/client.test.ts @@ -123,7 +123,8 @@ describe('client', () => { getAccount: async () => null as unknown as Account, measure: async () => { return async () => ({ time: 0, serverTime: 0 }) - } + }, + sendForceClose: async () => {} } } const spyCreate = jest.spyOn(TxProcessor, 'createDoc2Doc') diff --git a/packages/core/src/__tests__/connection.ts b/packages/core/src/__tests__/connection.ts index 60fade29580..e9c5affe370 100644 --- a/packages/core/src/__tests__/connection.ts +++ b/packages/core/src/__tests__/connection.ts @@ -72,6 +72,7 @@ export async function connect (handler: (tx: Tx) => void): Promise[]) => {}, loadModel: async (last: Timestamp) => txes, getAccount: async () => null as unknown as Account, - measure: async () => async () => ({ time: 0, serverTime: 0 }) + measure: async () => async () => ({ time: 0, serverTime: 0 }), + sendForceClose: async () => {} } } diff --git a/packages/core/src/backup.ts b/packages/core/src/backup.ts index 103188e0843..7e052bb97cb 100644 --- a/packages/core/src/backup.ts +++ b/packages/core/src/backup.ts @@ -23,4 +23,6 @@ export interface BackupClient { loadDocs: (domain: Domain, docs: Ref[]) => Promise upload: (domain: Domain, docs: Doc[]) => Promise clean: (domain: Domain, docs: Ref[]) => Promise + + sendForceClose: () => Promise } diff --git a/packages/core/src/client.ts b/packages/core/src/client.ts index 9af45dcfa61..e18e8df1f1d 100644 --- a/packages/core/src/client.ts +++ b/packages/core/src/client.ts @@ -207,6 +207,10 @@ class ClientImpl implements AccountClient, BackupClient, MeasureClient { async getAccount (): Promise { return await this.conn.getAccount() } + + async sendForceClose (): Promise { + await this.conn.sendForceClose() + } } /** diff --git a/packages/platform/src/platform.ts b/packages/platform/src/platform.ts index f49327754f9..3c152c55e77 100644 --- a/packages/platform/src/platform.ts +++ b/packages/platform/src/platform.ts @@ -14,7 +14,7 @@ // limitations under the License. */ -import { Metadata } from '.' +import { Metadata, PluginLoader, PluginModule, Resources } from '.' /** * Id in format 'plugin.resource-kind.id' @@ -156,6 +156,7 @@ export default plugin(platformId, { ProductIdMismatch: '' as StatusCode<{ productId: string }> }, metadata: { - locale: '' as Metadata + locale: '' as Metadata, + LoadHelper: '' as Metadata<(loader: PluginLoader) => Promise>> } }) diff --git a/packages/platform/src/resource.ts b/packages/platform/src/resource.ts index dfe52e0326f..a48db31eb25 100644 --- a/packages/platform/src/resource.ts +++ b/packages/platform/src/resource.ts @@ -19,6 +19,7 @@ import { _parseId } from './ident' import type { Plugin, Resource } from './platform' import { PlatformError, Severity, Status } from './status' +import { getMetadata } from './metadata' import platform from './platform' /** @@ -77,19 +78,25 @@ async function loadPlugin (id: Plugin): Promise { const status = new Status(Severity.INFO, platform.status.LoadingPlugin, { plugin: id }) - pluginLoader = monitor(status, getLocation(id)()).then(async (plugin) => { - try { - // In case of ts-node, we have a bit different import structure, so let's check for it. - if (typeof plugin.default === 'object') { - // eslint-disable-next-line @typescript-eslint/return-await - return await (plugin as any).default.default() + + const loadHelper = getMetadata(platform.metadata.LoadHelper) + + const locationLoader = getLocation(id) + pluginLoader = monitor(status, loadHelper !== undefined ? loadHelper(locationLoader) : locationLoader()).then( + async (plugin) => { + try { + // In case of ts-node, we have a bit different import structure, so let's check for it. + if (typeof plugin.default === 'object') { + // eslint-disable-next-line @typescript-eslint/return-await + return await (plugin as any).default.default() + } + return await plugin.default() + } catch (err: any) { + console.error(err) + throw err } - return await plugin.default() - } catch (err: any) { - console.error(err) - throw err } - }) + ) loading.set(id, pluginLoader) } return await pluginLoader diff --git a/packages/query/src/__tests__/connection.ts b/packages/query/src/__tests__/connection.ts index b78b59e9fc8..fa3d58ab428 100644 --- a/packages/query/src/__tests__/connection.ts +++ b/packages/query/src/__tests__/connection.ts @@ -96,6 +96,7 @@ FulltextStorage & { searchFulltext: async (query: SearchQuery, options: SearchOptions): Promise => { return { docs: [] } }, - measure: async () => async () => ({ time: 0, serverTime: 0 }) + measure: async () => async () => ({ time: 0, serverTime: 0 }), + sendForceClose: async () => {} } } diff --git a/packages/ui/src/popups.ts b/packages/ui/src/popups.ts index a105268e528..b45a602b1ce 100644 --- a/packages/ui/src/popups.ts +++ b/packages/ui/src/popups.ts @@ -30,6 +30,9 @@ export interface CompAndProps { refId?: string } dock?: boolean + + // Internal + closing?: boolean } export interface PopupResult { @@ -116,7 +119,13 @@ export function closePopup (category?: string): void { } else { for (let i = popups.length - 1; i >= 0; i--) { if (popups[i].options.fixed !== true) { - popups[i].onClose?.(undefined) + const isClosing = popups[i].closing ?? false + popups[i].closing = true + if (!isClosing) { + // To prevent possible recursion, we need to check if we call some code from popup close, to do close. + popups[i].onClose?.(undefined) + } + popups[i].closing = false popups.splice(i, 1) break } diff --git a/plugins/client-resources/src/connection.ts b/plugins/client-resources/src/connection.ts index 34f94ac1ef3..5cb4e2cc66b 100644 --- a/plugins/client-resources/src/connection.ts +++ b/plugins/client-resources/src/connection.ts @@ -134,22 +134,12 @@ class Connection implements ClientConnection { async close (): Promise { this.closed = true clearInterval(this.interval) - const closeEvt = serialize( - { - method: 'close', - params: [], - id: -1 - }, - false - ) if (this.websocket !== null) { if (this.websocket instanceof Promise) { await this.websocket.then((ws) => { - ws.send(closeEvt) ws.close(1000) }) } else { - this.websocket.send(closeEvt) this.websocket.close(1000) } this.websocket = null @@ -547,6 +537,10 @@ class Connection implements ClientConnection { searchFulltext (query: SearchQuery, options: SearchOptions): Promise { return this.sendRequest({ method: 'searchFulltext', params: [query, options] }) } + + sendForceClose (): Promise { + return this.sendRequest({ method: 'forceClose', params: [] }) + } } /** diff --git a/plugins/document-resources/src/components/teamspace/CreateTeamspace.svelte b/plugins/document-resources/src/components/teamspace/CreateTeamspace.svelte index 0936767755d..9877f062acc 100644 --- a/plugins/document-resources/src/components/teamspace/CreateTeamspace.svelte +++ b/plugins/document-resources/src/components/teamspace/CreateTeamspace.svelte @@ -29,7 +29,7 @@ } from '@hcengineering/core' import document, { Teamspace } from '@hcengineering/document' import { Asset } from '@hcengineering/platform' - import presentation, { Card, getClient } from '@hcengineering/presentation' + import presentation, { Card, getClient, reduceCalls } from '@hcengineering/presentation' import { Button, EditBox, @@ -72,7 +72,7 @@ let spaceType: WithLookup | undefined $: void loadSpaceType(typeId) - async function loadSpaceType (id: typeof typeId): Promise { + const loadSpaceType = reduceCalls(async (id: typeof typeId): Promise => { spaceType = id !== undefined ? await client @@ -85,7 +85,7 @@ } rolesAssignment = getRolesAssignment() - } + }) function getRolesAssignment (): RolesAssignment { if (teamspace === undefined || spaceType?.targetClass === undefined || spaceType?.$lookup?.roles === undefined) { @@ -243,7 +243,10 @@ label={isNew ? documentRes.string.NewTeamspace : documentRes.string.EditTeamspace} okLabel={isNew ? presentation.string.Create : presentation.string.Save} okAction={handleSave} - canSave={name.trim().length > 0 && !(members.length === 0 && isPrivate) && typeId !== undefined} + canSave={name.trim().length > 0 && + !(members.length === 0 && isPrivate) && + typeId !== undefined && + spaceType?.targetClass !== undefined} accentHeader width={'medium'} gap={'gapV-6'} diff --git a/plugins/workbench-resources/src/components/ServerManager.svelte b/plugins/workbench-resources/src/components/ServerManager.svelte index 149fced809a..d173da7d699 100644 --- a/plugins/workbench-resources/src/components/ServerManager.svelte +++ b/plugins/workbench-resources/src/components/ServerManager.svelte @@ -3,7 +3,7 @@ import { Metrics, systemAccountEmail } from '@hcengineering/core' import login from '@hcengineering/login' import { getEmbeddedLabel, getMetadata } from '@hcengineering/platform' - import presentation, { createQuery } from '@hcengineering/presentation' + import presentation, { createQuery, isAdminUser } from '@hcengineering/presentation' import { Button, CheckBox, @@ -88,13 +88,20 @@ $: activeSessions = (data?.statistics?.activeSessions as Record< string, - Array<{ - userId: string - data?: Record - total: StatisticsElement - mins5: StatisticsElement - current: StatisticsElement - }> + { + sessions: Array<{ + userId: string + data?: Record + total: StatisticsElement + mins5: StatisticsElement + current: StatisticsElement + }> + name: string + wsId: string + sessionsTotal: number + upgrading: boolean + closing: boolean + } >) ?? {} const employeeQuery = createQuery() @@ -116,8 +123,8 @@ $: totalStats = Array.from(Object.entries(activeSessions).values()).reduce( (cur, it) => { - const totalFind = it[1].reduce((it, itm) => itm.current.find + it, 0) - const totalTx = it[1].reduce((it, itm) => itm.current.tx + it, 0) + const totalFind = it[1].sessions.reduce((it, itm) => itm.current.find + it, 0) + const totalTx = it[1].sessions.reduce((it, itm) => itm.current.tx + it, 0) return { find: cur.find + totalFind, tx: cur.tx + totalTx @@ -197,26 +204,49 @@
{#each Object.entries(activeSessions) as act} {@const wsInstance = $workspacesStore.find((it) => it.workspaceId === act[0])} - {@const totalFind = act[1].reduce((it, itm) => itm.current.find + it, 0)} - {@const totalTx = act[1].reduce((it, itm) => itm.current.tx + it, 0)} - {@const employeeGroups = Array.from(new Set(act[1].map((it) => it.userId))).filter( + {@const totalFind = act[1].sessions.reduce((it, itm) => itm.current.find + it, 0)} + {@const totalTx = act[1].sessions.reduce((it, itm) => itm.current.tx + it, 0)} + {@const employeeGroups = Array.from(new Set(act[1].sessions.map((it) => it.userId))).filter( (it) => systemAccountEmail !== it || !realUsers )} - {@const realGroup = Array.from(new Set(act[1].map((it) => it.userId))).filter( + {@const realGroup = Array.from(new Set(act[1].sessions.map((it) => it.userId))).filter( (it) => systemAccountEmail !== it )} {#if employeeGroups.length > 0} -
- Workspace: {wsInstance?.workspaceName ?? act[0]}: {employeeGroups.length} current 5 mins => {totalFind}/{totalTx} +
+
+ Workspace: {wsInstance?.workspaceName ?? act[0]}: {employeeGroups.length} current 5 mins => {totalFind}/{totalTx} + {#if act[1].upgrading} + (Upgrading) + {/if} + {#if act[1].closing} + (Closing) + {/if} +
+ {#if isAdminUser()} +
{#each employeeGroups as employeeId} {@const employee = employees.get(employeeId)} - {@const connections = act[1].filter((it) => it.userId === employeeId)} + {@const connections = act[1].sessions.filter((it) => it.userId === employeeId)} {@const find = connections.reduce((it, itm) => itm.current.find + it, 0)} {@const txes = connections.reduce((it, itm) => itm.current.tx + it, 0)} diff --git a/plugins/workbench-resources/src/components/WorkbenchApp.svelte b/plugins/workbench-resources/src/components/WorkbenchApp.svelte index 61553fbf3d8..d13c1204bb5 100644 --- a/plugins/workbench-resources/src/components/WorkbenchApp.svelte +++ b/plugins/workbench-resources/src/components/WorkbenchApp.svelte @@ -96,6 +96,7 @@ diff --git a/server/account/src/operations.ts b/server/account/src/operations.ts index b3d08eeb0ab..982855706b7 100644 --- a/server/account/src/operations.ts +++ b/server/account/src/operations.ts @@ -839,8 +839,9 @@ export async function createWorkspace ( email: string, workspaceName: string, workspace?: string, - notifyHandler?: (workspace: Workspace) => void -): Promise<{ workspaceInfo: Workspace, err?: any, client?: Client }> { + notifyHandler?: (workspace: Workspace) => void, + postInitHandler?: (workspace: Workspace, model: Tx[]) => Promise +): Promise<{ workspaceInfo: Workspace, err?: any, model?: Tx[] }> { return await rateLimiter.exec(async () => { // We need to search for duplicate workspaceUrl await searchPromise @@ -861,7 +862,6 @@ export async function createWorkspace ( await updateInfo({ createProgress: 10 }) - let client: Client | undefined const childLogger = ctx.newChild('createWorkspace', { workspace: workspaceInfo.workspace }) const ctxModellogger: ModelLogger = { log: (msg, data) => { @@ -871,6 +871,7 @@ export async function createWorkspace ( void childLogger.error(msg, data) } } + let model: Tx[] = [] try { const initWS = getMetadata(toolPlugin.metadata.InitWorkspace) const wsId = getWorkspaceId(workspaceInfo.workspace, productId) @@ -882,11 +883,10 @@ export async function createWorkspace ( initWS !== workspaceInfo.workspace ) { // Just any valid model for transactor to be able to function - await ( - await initModel(ctx, getTransactor(), wsId, txes, [], ctxModellogger, async (value) => { - await updateInfo({ createProgress: Math.round((Math.min(value, 100) / 100) * 20) }) - }) - ).close() + await initModel(ctx, getTransactor(), wsId, txes, [], ctxModellogger, async (value) => { + await updateInfo({ createProgress: Math.round((Math.min(value, 100) / 100) * 20) }) + }) + await updateInfo({ createProgress: 20 }) // Clone init workspace. await cloneWorkspace( @@ -899,7 +899,7 @@ export async function createWorkspace ( } ) await updateInfo({ createProgress: 50 }) - client = await upgradeModel( + model = await upgradeModel( ctx, getTransactor(), wsId, @@ -913,26 +913,23 @@ export async function createWorkspace ( ) await updateInfo({ createProgress: 90 }) } else { - client = await initModel( - ctx, - getTransactor(), - wsId, - txes, - migrationOperation, - ctxModellogger, - async (value) => { - await updateInfo({ createProgress: Math.round(Math.min(value, 100)) }) - } - ) + await initModel(ctx, getTransactor(), wsId, txes, migrationOperation, ctxModellogger, async (value) => { + await updateInfo({ createProgress: Math.round(Math.min(value, 100)) }) + }) } } catch (err: any) { Analytics.handleError(err) return { workspaceInfo, err, client: null as any } } + + if (postInitHandler !== undefined) { + await postInitHandler?.(workspaceInfo, model) + } + childLogger.end() // Workspace is created, we need to clear disabled flag. await updateInfo({ createProgress: 100, disabled: false, creating: false }) - return { workspaceInfo, client } + return { workspaceInfo, model } }) } @@ -970,18 +967,16 @@ export async function upgradeWorkspace ( toVersion: versionStr, workspace: ws.workspace }) - await ( - await upgradeModel( - ctx, - getTransactor(), - getWorkspaceId(ws.workspace, productId), - txes, - migrationOperation, - logger, - false, - async (value) => {} - ) - ).close() + await upgradeModel( + ctx, + getTransactor(), + getWorkspaceId(ws.workspace, productId), + txes, + migrationOperation, + logger, + false, + async (value) => {} + ) await db.collection(WORKSPACE_COLLECTION).updateOne( { _id: ws._id }, @@ -1020,7 +1015,7 @@ export const createUserWorkspace = } async function doCreate (info: Account, notifyHandler: (workspace: Workspace) => void): Promise { - const { workspaceInfo, err, client } = await createWorkspace( + const { workspaceInfo, err } = await createWorkspace( ctx, version, txes, @@ -1030,7 +1025,29 @@ export const createUserWorkspace = email, workspaceName, undefined, - notifyHandler + notifyHandler, + async (workspace, model) => { + const initWS = getMetadata(toolPlugin.metadata.InitWorkspace) + const shouldUpdateAccount = initWS !== undefined && (await getWorkspaceById(db, productId, initWS)) !== null + const client = await connect( + getTransactor(), + getWorkspaceId(workspace.workspace, productId), + undefined, + { + admin: 'true' + }, + model + ) + try { + await assignWorkspace(ctx, db, productId, email, workspace.workspace, shouldUpdateAccount, client) + await setRole(email, workspace.workspace, productId, AccountRole.Owner, client) + await ctx.info('Creating server side done', { workspaceName, email }) + } catch (err: any) { + Analytics.handleError(err) + } finally { + await client.close() + } + } ) if (err != null) { @@ -1045,20 +1062,10 @@ export const createUserWorkspace = ) throw err } - try { - info.lastWorkspace = Date.now() - - // Update last workspace time. - await db.collection(ACCOUNT_COLLECTION).updateOne({ _id: info._id }, { $set: { lastWorkspace: Date.now() } }) - - const initWS = getMetadata(toolPlugin.metadata.InitWorkspace) - const shouldUpdateAccount = initWS !== undefined && (await getWorkspaceById(db, productId, initWS)) !== null - await assignWorkspace(ctx, db, productId, email, workspaceInfo.workspace, shouldUpdateAccount, client) - await setRole(email, workspaceInfo.workspace, productId, AccountRole.Owner, client) - await ctx.info('Creating server side done', { workspaceName, email }) - } finally { - await client?.close() - } + info.lastWorkspace = Date.now() + + // Update last workspace time. + await db.collection(ACCOUNT_COLLECTION).updateOne({ _id: info._id }, { $set: { lastWorkspace: Date.now() } }) } const workspaceInfo = await new Promise((resolve) => { diff --git a/server/backup/src/backup.ts b/server/backup/src/backup.ts index f9d456cc6c0..2a9ba454bf6 100644 --- a/server/backup/src/backup.ts +++ b/server/backup/src/backup.ts @@ -215,7 +215,9 @@ export async function cloneWorkspace ( mode: 'backup' })) as unknown as CoreClient & BackupClient const targetConnection = (await connect(transactorUrl, targetWorkspaceId, undefined, { - mode: 'backup' + mode: 'backup', + model: 'upgrade', + admin: 'true' })) as unknown as CoreClient & BackupClient try { const domains = sourceConnection @@ -338,6 +340,7 @@ export async function cloneWorkspace ( } finally { console.log('end clone') await sourceConnection.close() + await targetConnection.sendForceClose() await targetConnection.close() } } @@ -997,6 +1000,7 @@ export async function restore ( } } } finally { + await connection.sendForceClose() await connection.close() } } diff --git a/server/core/src/fulltext.ts b/server/core/src/fulltext.ts index 89ffac58178..f93bdda2af1 100644 --- a/server/core/src/fulltext.ts +++ b/server/core/src/fulltext.ts @@ -37,10 +37,10 @@ import core, { type TxResult, type WorkspaceId, docKey, + isClassIndexable, isFullTextAttribute, isIndexedAttribute, - toFindResult, - isClassIndexable + toFindResult } from '@hcengineering/core' import { type FullTextIndexPipeline } from './indexer' import { createStateDoc } from './indexer/utils' @@ -53,7 +53,6 @@ import type { FullTextAdapter, IndexedDoc, WithFind } from './types' */ export class FullTextIndex implements WithFind { txFactory = new TxFactory(core.account.System, true) - consistency: Promise | undefined constructor ( private readonly hierarchy: Hierarchy, @@ -72,7 +71,6 @@ export class FullTextIndex implements WithFind { async close (): Promise { await this.indexer.cancel() - await this.consistency } async tx (ctx: MeasureContext, txes: Tx[]): Promise { diff --git a/server/core/src/server/storage.ts b/server/core/src/server/storage.ts index d47fdce9356..ab7ee7de7fa 100644 --- a/server/core/src/server/storage.ts +++ b/server/core/src/server/storage.ts @@ -52,6 +52,7 @@ import core, { type TxResult, type TxUpdateDoc, type WorkspaceIdWithUrl, + cutObjectArray, toFindResult } from '@hcengineering/core' import { type Metadata, getResource } from '@hcengineering/platform' @@ -393,7 +394,7 @@ export class TServerStorage implements ServerStorage { { clazz, query, options } ) if (Date.now() - st > 1000) { - await ctx.error('FindAll', { time: Date.now() - st, clazz, query, options }) + await ctx.error('FindAll', { time: Date.now() - st, clazz, query: cutObjectArray(query), options }) } return result } diff --git a/server/mongo/src/__tests__/storage.test.ts b/server/mongo/src/__tests__/storage.test.ts index be88df7f87f..0dbcb91812c 100644 --- a/server/mongo/src/__tests__/storage.test.ts +++ b/server/mongo/src/__tests__/storage.test.ts @@ -183,7 +183,8 @@ describe('mongo operations', () => { clean: async (domain: Domain, docs: Ref[]) => {}, loadModel: async () => txes, getAccount: async () => ({}) as any, - measure: async () => async () => ({ time: 0, serverTime: 0 }) + measure: async () => async () => ({ time: 0, serverTime: 0 }), + sendForceClose: async () => {} } return st }) diff --git a/server/tool/src/index.ts b/server/tool/src/index.ts index ef10ca7aa5c..dc570f563f6 100644 --- a/server/tool/src/index.ts +++ b/server/tool/src/index.ts @@ -117,7 +117,7 @@ export async function initModel ( migrateOperations: [string, MigrateOperation][], logger: ModelLogger = consoleModelLogger, progress: (value: number) => Promise -): Promise { +): Promise { const { mongodbUri, storageAdapter: minio, txes } = prepareTools(rawTxes) if (txes.some((tx) => tx.objectSpace !== core.space.Model)) { throw Error('Model txes must target only core.space.Model') @@ -125,7 +125,7 @@ export async function initModel ( const _client = getMongoClient(mongodbUri) const client = await _client.getClient() - let connection: CoreClient & BackupClient + let connection: (CoreClient & BackupClient) | undefined try { const db = getWorkspaceDB(client, workspaceId) @@ -156,12 +156,8 @@ export async function initModel ( )) as unknown as CoreClient & BackupClient const states = await connection.findAll(core.class.MigrationState, {}) - const migrateState = new Map( - Array.from(groupByArray(states, (it) => it.plugin).entries()).map((it) => [ - it[0], - new Set(it[1].map((q) => q.state)) - ]) - ) + const sts = Array.from(groupByArray(states, (it) => it.plugin).entries()) + const migrateState = new Map(sts.map((it) => [it[0], new Set(it[1].map((q) => q.state))])) ;(connection as any).migrateState = migrateState try { @@ -183,8 +179,9 @@ export async function initModel ( logger.error('error', { error: e }) throw e } - return connection } finally { + await connection?.sendForceClose() + await connection?.close() _client.close() } } @@ -201,7 +198,7 @@ export async function upgradeModel ( logger: ModelLogger = consoleModelLogger, skipTxUpdate: boolean = false, progress: (value: number) => Promise -): Promise { +): Promise { const { mongodbUri, txes } = prepareTools(rawTxes) if (txes.some((tx) => tx.objectSpace !== core.space.Model)) { @@ -266,7 +263,7 @@ export async function upgradeModel ( }) logger.log('Apply upgrade operations', { workspaceId: workspaceId.name }) - const connection = await ctx.with( + const connection = (await ctx.with( 'connect-platform', {}, async (ctx) => @@ -281,29 +278,33 @@ export async function upgradeModel ( }, model ) - ) - - await ctx.with('upgrade', {}, async () => { - let i = 0 - for (const op of migrateOperations) { - const t = Date.now() - ;(connection as any).migrateState = migrateState - await op[1].upgrade(connection as any, logger) - logger.log('upgrade:', { operation: op[0], time: Date.now() - t, workspaceId: workspaceId.name }) - await progress(60 + ((100 / migrateOperations.length) * i * 40) / 100) - i++ - } - }) + )) as CoreClient & BackupClient + try { + await ctx.with('upgrade', {}, async () => { + let i = 0 + for (const op of migrateOperations) { + const t = Date.now() + ;(connection as any).migrateState = migrateState + await op[1].upgrade(connection as any, logger) + logger.log('upgrade:', { operation: op[0], time: Date.now() - t, workspaceId: workspaceId.name }) + await progress(60 + ((100 / migrateOperations.length) * i * 40) / 100) + i++ + } + }) - if (!skipTxUpdate) { - // Create update indexes - await ctx.with('create-indexes', {}, async (ctx) => { - await createUpdateIndexes(ctx, connection, db, logger, async (value) => { - await progress(40 + (Math.min(value, 100) / 100) * 20) + if (!skipTxUpdate) { + // Create update indexes + await ctx.with('create-indexes', {}, async (ctx) => { + await createUpdateIndexes(ctx, connection, db, logger, async (value) => { + await progress(40 + (Math.min(value, 100) / 100) * 20) + }) }) - }) + } + } finally { + await connection.sendForceClose() + await connection.close() } - return connection + return model } finally { _client.close() } diff --git a/server/ws/src/client.ts b/server/ws/src/client.ts index 07e8f00541b..b78963a2115 100644 --- a/server/ws/src/client.ts +++ b/server/ws/src/client.ts @@ -14,14 +14,9 @@ // import core, { + type Account, AccountRole, type BulkUpdateEvent, - TxFactory, - TxProcessor, - type TxWorkspaceEvent, - WorkspaceEvent, - generateId, - type Account, type Class, type Doc, type DocumentQuery, @@ -38,7 +33,12 @@ import core, { type TxApplyIf, type TxApplyResult, type TxCUD, - type TxResult + TxFactory, + TxProcessor, + type TxResult, + type TxWorkspaceEvent, + WorkspaceEvent, + generateId } from '@hcengineering/core' import { type Pipeline, type SessionContext } from '@hcengineering/server-core' import { type Token } from '@hcengineering/server-token' @@ -71,6 +71,14 @@ export class ClientSession implements Session { return this.token.email } + isUpgradeClient (): boolean { + return this.token.extra?.model === 'upgrade' + } + + getMode (): string { + return this.token.extra?.mode ?? 'normal' + } + pipeline (): Pipeline { return this._pipeline } diff --git a/server/ws/src/server.ts b/server/ws/src/server.ts index 7732c153041..6134d341583 100644 --- a/server/ws/src/server.ts +++ b/server/ws/src/server.ts @@ -45,13 +45,14 @@ import { type Workspace } from './types' -interface WorkspaceLoginInfo extends BaseWorkspaceInfo { +interface WorkspaceLoginInfo extends Omit { upgrade?: { toProcess: number total: number elapsed: number eta: number } + workspaceId: string } function timeoutPromise (time: number): Promise { @@ -66,7 +67,6 @@ function timeoutPromise (time: number): Promise { export interface Timeouts { // Timeout preferences pingTimeout: number // Default 1 second - shutdownWarmTimeout: number // Default 1 minute reconnectTimeout: number // Default 3 seconds } @@ -145,8 +145,8 @@ class TSessionManager implements SessionManager { ticks = 0 handleInterval (): void { - for (const h of this.workspaces.entries()) { - for (const s of h[1].sessions) { + for (const [wsId, workspace] of this.workspaces.entries()) { + for (const s of workspace.sessions) { if (this.ticks % (5 * 60) === 0) { s[1].session.mins5.find = s[1].session.current.find s[1].session.mins5.tx = s[1].session.current.tx @@ -162,13 +162,15 @@ class TSessionManager implements SessionManager { } if (diff > timeout && this.ticks % 10 === 0) { - void this.ctx.error('session hang, closing...', { sessionId: h[0], user: s[1].session.getUser() }) - void this.close(s[1].socket, h[1].workspaceId, 1001, 'CLIENT_HANGOUT') + void this.ctx.error('session hang, closing...', { wsId, user: s[1].session.getUser() }) + + // Force close workspace if only one client and it hang. + void this.close(s[1].socket, workspace.workspaceId) continue } if (diff > 20000 && diff < 60000 && this.ticks % 10 === 0) { void s[1].socket.send( - h[1].context, + workspace.context, { result: 'ping' }, s[1].session.binaryResponseMode, s[1].session.useCompression @@ -178,13 +180,29 @@ class TSessionManager implements SessionManager { for (const r of s[1].session.requests.values()) { if (now - r.start > 30000) { void this.ctx.info('request hang found, 30sec', { - sessionId: h[0], + wsId, user: s[1].session.getUser(), ...r.params }) } } } + + // Wait some time for new client to appear before closing workspace. + if (workspace.sessions.size === 0 && workspace.closing === undefined) { + workspace.softShutdown-- + if (workspace.softShutdown <= 0) { + void this.ctx.info('closing workspace, no users', { + workspace: workspace.workspaceId.name, + wsId, + upgrade: workspace.upgrade, + backup: workspace.backup + }) + workspace.closing = this.performWorkspaceCloseCheck(workspace, workspace.workspaceId, wsId) + } + } else { + workspace.softShutdown = 3 + } } this.ticks++ } @@ -221,7 +239,7 @@ class TSessionManager implements SessionManager { sessionId: string | undefined, accountsUrl: string ): Promise< - | { session: Session, context: MeasureContext, workspaceName: string } + | { session: Session, context: MeasureContext, workspaceId: string } | { upgrade: true, upgradeInfo?: WorkspaceLoginInfo['upgrade'] } | { error: any } > { @@ -258,11 +276,9 @@ class TSessionManager implements SessionManager { } let workspace = this.workspaces.get(wsString) - if (workspace?.closeTimeout !== undefined) { - await ctx.info('Cancel workspace warm close', { wsString }) - clearTimeout(workspace?.closeTimeout) + if (workspace?.closing !== undefined) { + await workspace?.closing } - await workspace?.closing workspace = this.workspaces.get(wsString) if (sessionId !== undefined && workspace?.sessions?.has(sessionId) === true) { const helloResponse: HelloResponse = { @@ -275,14 +291,20 @@ class TSessionManager implements SessionManager { await ws.send(ctx, helloResponse, false, false) return { error: new Error('Session already exists') } } - const workspaceName = workspaceInfo.workspaceName ?? workspaceInfo.workspaceUrl ?? workspaceInfo.workspace + const workspaceName = workspaceInfo.workspaceName ?? workspaceInfo.workspaceUrl ?? workspaceInfo.workspaceId if (workspace === undefined) { + await ctx.info('open workspace', { + email: token.email, + workspace: workspaceInfo.workspaceId, + wsUrl: workspaceInfo.workspaceUrl, + ...token.extra + }) workspace = this.createWorkspace( baseCtx, pipelineFactory, token, - workspaceInfo.workspaceUrl ?? workspaceInfo.workspace, + workspaceInfo.workspaceUrl ?? workspaceInfo.workspaceId, workspaceName ) } @@ -290,9 +312,20 @@ class TSessionManager implements SessionManager { let pipeline: Pipeline if (token.extra?.model === 'upgrade') { if (workspace.upgrade) { + await ctx.info('reconnect workspace in upgrade', { + email: token.email, + workspace: workspaceInfo.workspaceId, + wsUrl: workspaceInfo.workspaceUrl + }) pipeline = await ctx.with('💤 wait', { workspaceName }, async () => await (workspace as Workspace).pipeline) } else { - pipeline = await this.createUpgradeSession( + await ctx.info('reconnect workspace in upgrade switch', { + email: token.email, + workspace: workspaceInfo.workspaceId, + wsUrl: workspaceInfo.workspaceUrl + }) + // We need to wait in case previous upgeade connection is already closing. + pipeline = await this.switchToUpgradeSession( token, sessionId, ctx, @@ -300,7 +333,7 @@ class TSessionManager implements SessionManager { workspace, pipelineFactory, ws, - workspaceInfo.workspaceUrl ?? workspaceInfo.workspace, + workspaceInfo.workspaceUrl ?? workspaceInfo.workspaceId, workspaceName ) } @@ -336,13 +369,13 @@ class TSessionManager implements SessionManager { session.useCompression ) } - return { session, context: workspace.context, workspaceName } + return { session, context: workspace.context, workspaceId: workspaceInfo.workspaceId } }) } private wsFromToken (token: Token): WorkspaceLoginInfo { return { - workspace: token.workspace.name, + workspaceId: token.workspace.name, workspaceUrl: token.workspace.name, workspaceName: token.workspace.name, createdBy: '', @@ -355,7 +388,7 @@ class TSessionManager implements SessionManager { } } - private async createUpgradeSession ( + private async switchToUpgradeSession ( token: Token, sessionId: string | undefined, ctx: MeasureContext, @@ -369,16 +402,20 @@ class TSessionManager implements SessionManager { if (LOGGING_ENABLED) { await ctx.info('reloading workspace', { workspaceName, token: JSON.stringify(token) }) } + + // Mark as upgrade, to prevent any new clients to connect during close + workspace.upgrade = true + workspace.backup = token.extra?.mode === 'backup' // If upgrade client is used. // Drop all existing clients - await this.closeAll(wsString, workspace, 0, 'upgrade') + workspace.closing = this.closeAll(wsString, workspace, 0, 'upgrade') + await workspace.closing // Wipe workspace and update values. workspace.workspaceName = workspaceName if (!workspace.upgrade) { // This is previous workspace, intended to be closed. workspace.id = generateId() workspace.sessions = new Map() - workspace.upgrade = token.extra?.model === 'upgrade' } // Re-create pipeline. workspace.pipeline = pipelineFactory( @@ -432,6 +469,7 @@ class TSessionManager implements SessionManager { workspaceName: string ): Workspace { const upgrade = token.extra?.model === 'upgrade' + const backup = token.extra?.mode === 'backup' const context = ctx.newChild('🧲 session', {}) const pipelineCtx = context.newChild('🧲 pipeline-factory', {}) const workspace: Workspace = { @@ -446,7 +484,9 @@ class TSessionManager implements SessionManager { } ), sessions: new Map(), + softShutdown: 3, upgrade, + backup, workspaceId: token.workspace, workspaceName } @@ -494,7 +534,7 @@ class TSessionManager implements SessionManager { } catch {} } - async close (ws: ConnectionSocket, workspaceId: WorkspaceId, code: number, reason: string): Promise { + async close (ws: ConnectionSocket, workspaceId: WorkspaceId): Promise { const wsid = toWorkspaceString(workspaceId) const workspace = this.workspaces.get(wsid) if (workspace === undefined) { @@ -516,25 +556,29 @@ class TSessionManager implements SessionManager { } const user = sessionRef.session.getUser() const another = Array.from(workspace.sessions.values()).findIndex((p) => p.session.getUser() === user) - if (another === -1) { + if (another === -1 && !workspace.upgrade) { await this.trySetStatus(workspace.context, sessionRef.session, false) } - if (!workspace.upgrade) { - // Wait some time for new client to appear before closing workspace. - if (workspace.sessions.size === 0) { - clearTimeout(workspace.closeTimeout) - void this.ctx.info('schedule warm closing', { workspace: workspace.workspaceName, wsid }) - workspace.closeTimeout = setTimeout(() => { - void this.performWorkspaceCloseCheck(workspace, workspaceId, wsid) - }, this.timeouts.shutdownWarmTimeout) - } - } else { - await this.performWorkspaceCloseCheck(workspace, workspaceId, wsid) - } } } - async closeAll (wsId: string, workspace: Workspace, code: number, reason: 'upgrade' | 'shutdown'): Promise { + async forceClose (wsId: string, ignoreSocket?: ConnectionSocket): Promise { + const ws = this.workspaces.get(wsId) + if (ws !== undefined) { + ws.upgrade = true // We need to similare upgrade to refresh all clients. + ws.closing = this.closeAll(wsId, ws, 99, 'force-close', ignoreSocket) + await ws.closing + this.workspaces.delete(wsId) + } + } + + async closeAll ( + wsId: string, + workspace: Workspace, + code: number, + reason: 'upgrade' | 'shutdown' | 'force-close', + ignoreSocket?: ConnectionSocket + ): Promise { if (LOGGING_ENABLED) { await this.ctx.info('closing workspace', { workspace: workspace.id, @@ -550,12 +594,11 @@ class TSessionManager implements SessionManager { const closeS = async (s: Session, webSocket: ConnectionSocket): Promise => { s.workspaceClosed = true - if (reason === 'upgrade') { + if (reason === 'upgrade' || reason === 'force-close') { // Override message handler, to wait for upgrading response from clients. await this.sendUpgrade(workspace.context, webSocket, s.binaryResponseMode) } webSocket.close() - await this.trySetStatus(workspace.context, s, false) } if (LOGGING_ENABLED) { @@ -565,7 +608,9 @@ class TSessionManager implements SessionManager { wsName: workspace.workspaceName }) } - await Promise.all(sessions.map((s) => closeS(s[1].session, s[1].socket))) + await Promise.all( + sessions.filter((it) => it[1].socket.id !== ignoreSocket?.id).map((s) => closeS(s[1].session, s[1].socket)) + ) const closePipeline = async (): Promise => { try { @@ -578,7 +623,7 @@ class TSessionManager implements SessionManager { } } await this.ctx.with('closing', {}, async () => { - await Promise.race([closePipeline(), timeoutPromise(15000)]) + await Promise.race([closePipeline(), timeoutPromise(120000)]) }) if (LOGGING_ENABLED) { await this.ctx.info('Workspace closed...', { workspace: workspace.id, wsId, wsName: workspace.workspaceName }) @@ -612,40 +657,40 @@ class TSessionManager implements SessionManager { workspaceId: WorkspaceId, wsid: string ): Promise { + const wsUID = workspace.id + const logParams = { wsid, workspace: workspace.id, wsName: workspaceId.name } if (workspace.sessions.size === 0) { - const wsUID = workspace.id - const logParams = { wsid, workspace: workspace.id, wsName: workspaceId.name } if (LOGGING_ENABLED) { await this.ctx.info('no sessions for workspace', logParams) } + try { + if (workspace.sessions.size === 0) { + const pl = await workspace.pipeline + await Promise.race([pl, timeoutPromise(60000)]) + await Promise.race([pl.close(), timeoutPromise(60000)]) - if (workspace.closing === undefined) { - const waitAndClose = async (workspace: Workspace): Promise => { - try { - if (workspace.sessions.size === 0) { - const pl = await workspace.pipeline - await Promise.race([pl, timeoutPromise(60000)]) - await Promise.race([pl.close(), timeoutPromise(60000)]) - - if (this.workspaces.get(wsid)?.id === wsUID) { - this.workspaces.delete(wsid) - } - workspace.context.end() - if (LOGGING_ENABLED) { - await this.ctx.info('Closed workspace', logParams) - } - } - } catch (err: any) { - Analytics.handleError(err) + if (this.workspaces.get(wsid)?.id === wsUID) { this.workspaces.delete(wsid) - if (LOGGING_ENABLED) { - await this.ctx.error('failed', { ...logParams, error: err }) - } + } + workspace.context.end() + if (LOGGING_ENABLED) { + await this.ctx.info('Closed workspace', logParams) } } - workspace.closing = waitAndClose(workspace) + } catch (err: any) { + Analytics.handleError(err) + this.workspaces.delete(wsid) + if (LOGGING_ENABLED) { + await this.ctx.error('failed', { ...logParams, error: err }) + } + } + } else { + if (LOGGING_ENABLED) { + await this.ctx.info('few sessions for workspace, close skipped', { + ...logParams, + sessions: workspace.sessions.size + }) } - await workspace.closing } } @@ -720,13 +765,22 @@ class TSessionManager implements SessionManager { const backupMode = 'loadChunk' in service await userCtx.with(`🧭 ${backupMode ? 'handleBackup' : 'handleRequest'}`, {}, async (ctx) => { const request = await ctx.with('📥 read', {}, async () => readRequest(msg, false)) - if (request.id === -1 && request.method === 'close') { + if (request.method === 'forceClose') { const wsRef = this.workspaces.get(workspace) + let done = false if (wsRef !== undefined) { - await this.close(ws, wsRef?.workspaceId, 1000, 'client request to close workspace') - } else { - ws.close() + if (wsRef.upgrade) { + done = true + console.log('FORCE CLOSE', workspace) + // In case of upgrade, we need to force close workspace not in interval handler + await this.forceClose(workspace, ws) + } + } + const forceCloseResponse: Response = { + id: request.id, + result: done } + await ws.send(ctx, forceCloseResponse, service.binaryResponseMode, service.useCompression) return } if (request.id === -1 && request.method === 'hello') { @@ -737,6 +791,7 @@ class TSessionManager implements SessionManager { if (LOGGING_ENABLED) { await ctx.info('hello happen', { + workspace, user: service.getUser(), binary: service.binaryResponseMode, compression: service.useCompression, @@ -894,9 +949,8 @@ export function start ( } & Partial ): () => Promise { const sessions = new TSessionManager(ctx, opt.sessionFactory, { - pingTimeout: opt.pingTimeout ?? 1000, - shutdownWarmTimeout: opt.shutdownWarmTimeout ?? 60 * 1000, - reconnectTimeout: 3000 + pingTimeout: opt.pingTimeout ?? 10000, + reconnectTimeout: 500 }) return opt.serverFactory( sessions, diff --git a/server/ws/src/server_http.ts b/server/ws/src/server_http.ts index b6775dfbd6a..e947eb6fa9d 100644 --- a/server/ws/src/server_http.ts +++ b/server/ws/src/server_http.ts @@ -127,6 +127,13 @@ export function startHttpServer ( res.end() return } + case 'force-close': { + const wsId = req.query.wsId as string + void sessions.forceClose(wsId) + res.writeHead(200) + res.end() + return + } case 'reboot': { process.exit(0) } @@ -227,11 +234,7 @@ export function startHttpServer ( void ctx.error('error', { error: session.error?.message, stack: session.error?.stack }) } await cs.send(ctx, { id: -1, result: { state: 'upgrading', stats: (session as any).upgradeInfo } }, false, false) - - // Wait 1 second before closing the connection - setTimeout(() => { - cs.close() - }, 10000) + cs.close() return } // eslint-disable-next-line @typescript-eslint/no-misused-promises @@ -244,7 +247,7 @@ export function startHttpServer ( buff = Buffer.concat(msg).toString() } if (buff !== undefined) { - void handleRequest(session.context, session.session, cs, buff, session.workspaceName) + void handleRequest(session.context, session.session, cs, buff, session.workspaceId) } } catch (err: any) { Analytics.handleError(err) @@ -259,12 +262,12 @@ export function startHttpServer ( return } // remove session after 1seconds, give a time to reconnect. - void sessions.close(cs, token.workspace, code, reason.toString()) + void sessions.close(cs, token.workspace) }) const b = buffer buffer = undefined for (const msg of b) { - await handleRequest(session.context, session.session, cs, msg, session.workspaceName) + await handleRequest(session.context, session.session, cs, msg, session.workspaceId) } } wss.on('connection', handleConnection as any) diff --git a/server/ws/src/stats.ts b/server/ws/src/stats.ts index 013e13adff6..abda15364bb 100644 --- a/server/ws/src/stats.ts +++ b/server/ws/src/stats.ts @@ -3,7 +3,8 @@ import { MeasureMetricsContext, type Metrics, metricsAggregate, - type MetricsData + type MetricsData, + toWorkspaceString } from '@hcengineering/core' import os from 'os' import { type SessionManager } from './types' @@ -20,14 +21,22 @@ export function getStatistics (ctx: MeasureContext, sessions: SessionManager, ad } data.statistics.totalClients = sessions.sessions.size if (admin) { - for (const [k, v] of sessions.workspaces) { - data.statistics.activeSessions[k] = Array.from(v.sessions.entries()).map(([k, v]) => ({ - userId: v.session.getUser(), - data: v.socket.data(), - mins5: v.session.mins5, - total: v.session.total, - current: v.session.current - })) + for (const [k, vv] of sessions.workspaces) { + data.statistics.activeSessions[k] = { + sessions: Array.from(vv.sessions.entries()).map(([k, v]) => ({ + userId: v.session.getUser(), + data: v.socket.data(), + mins5: v.session.mins5, + total: v.session.total, + current: v.session.current, + upgrade: v.session.isUpgradeClient() + })), + name: vv.workspaceName, + wsId: toWorkspaceString(vv.workspaceId), + sessionsTotal: vv.sessions.size, + upgrading: vv.upgrade, + closing: vv.closing !== undefined + } } } diff --git a/server/ws/src/types.ts b/server/ws/src/types.ts index ef78c888791..31c5c071f8b 100644 --- a/server/ws/src/types.ts +++ b/server/ws/src/types.ts @@ -1,5 +1,4 @@ import { - type WorkspaceIdWithUrl, type Class, type Doc, type DocumentQuery, @@ -9,7 +8,8 @@ import { type Ref, type Tx, type TxResult, - type WorkspaceId + type WorkspaceId, + type WorkspaceIdWithUrl } from '@hcengineering/core' import { type Response } from '@hcengineering/rpc' import { type BroadcastFunc, type Pipeline } from '@hcengineering/server-core' @@ -65,6 +65,10 @@ export interface Session { measureCtx?: { ctx: MeasureContext, time: number } lastRequest: number + + isUpgradeClient: () => boolean + + getMode: () => string } /** @@ -118,9 +122,10 @@ export interface Workspace { pipeline: Promise sessions: Map upgrade: boolean + backup: boolean closing?: Promise - closeTimeout?: any + softShutdown: number workspaceId: WorkspaceId workspaceName: string @@ -144,15 +149,21 @@ export interface SessionManager { productId: string, sessionId: string | undefined, accountsUrl: string - ) => Promise< - { session: Session, context: MeasureContext, workspaceName: string } | { upgrade: true } | { error: any } - > + ) => Promise<{ session: Session, context: MeasureContext, workspaceId: string } | { upgrade: true } | { error: any }> broadcastAll: (workspace: Workspace, tx: Tx[], targets?: string[]) => void - close: (ws: ConnectionSocket, workspaceId: WorkspaceId, code: number, reason: string) => Promise + close: (ws: ConnectionSocket, workspaceId: WorkspaceId) => Promise + + closeAll: ( + wsId: string, + workspace: Workspace, + code: number, + reason: 'upgrade' | 'shutdown', + ignoreSocket?: ConnectionSocket + ) => Promise - closeAll: (wsId: string, workspace: Workspace, code: number, reason: 'upgrade' | 'shutdown') => Promise + forceClose: (wsId: string, ignoreSocket?: ConnectionSocket) => Promise closeWorkspaces: (ctx: MeasureContext) => Promise @@ -169,7 +180,7 @@ export type HandleRequestFunction = ( service: S, ws: ConnectionSocket, msg: Buffer, - workspace: string + workspaceId: string ) => Promise /**