From 375fbe50569615aa308dbc5c63796d53b0b444c2 Mon Sep 17 00:00:00 2001 From: Andrey Sobolev Date: Mon, 22 Apr 2024 10:04:56 +0700 Subject: [PATCH] UBERF-6653: Fix minor issue and add force-close Signed-off-by: Andrey Sobolev --- .../src/components/ServerManager.svelte | 64 ++++++++++++++----- server/core/src/fulltext.ts | 6 +- server/ws/src/client.ts | 22 +++++-- server/ws/src/server.ts | 61 ++++++++++++------ server/ws/src/server_http.ts | 9 ++- server/ws/src/stats.ts | 27 +++++--- server/ws/src/types.ts | 10 ++- 7 files changed, 141 insertions(+), 58 deletions(-) diff --git a/plugins/workbench-resources/src/components/ServerManager.svelte b/plugins/workbench-resources/src/components/ServerManager.svelte index 149fced809a..d173da7d699 100644 --- a/plugins/workbench-resources/src/components/ServerManager.svelte +++ b/plugins/workbench-resources/src/components/ServerManager.svelte @@ -3,7 +3,7 @@ import { Metrics, systemAccountEmail } from '@hcengineering/core' import login from '@hcengineering/login' import { getEmbeddedLabel, getMetadata } from '@hcengineering/platform' - import presentation, { createQuery } from '@hcengineering/presentation' + import presentation, { createQuery, isAdminUser } from '@hcengineering/presentation' import { Button, CheckBox, @@ -88,13 +88,20 @@ $: activeSessions = (data?.statistics?.activeSessions as Record< string, - Array<{ - userId: string - data?: Record - total: StatisticsElement - mins5: StatisticsElement - current: StatisticsElement - }> + { + sessions: Array<{ + userId: string + data?: Record + total: StatisticsElement + mins5: StatisticsElement + current: StatisticsElement + }> + name: string + wsId: string + sessionsTotal: number + upgrading: boolean + closing: boolean + } >) ?? {} const employeeQuery = createQuery() @@ -116,8 +123,8 @@ $: totalStats = Array.from(Object.entries(activeSessions).values()).reduce( (cur, it) => { - const totalFind = it[1].reduce((it, itm) => itm.current.find + it, 0) - const totalTx = it[1].reduce((it, itm) => itm.current.tx + it, 0) + const totalFind = it[1].sessions.reduce((it, itm) => itm.current.find + it, 0) + const totalTx = it[1].sessions.reduce((it, itm) => itm.current.tx + it, 0) return { find: cur.find + totalFind, tx: cur.tx + totalTx @@ -197,26 +204,49 @@
{#each Object.entries(activeSessions) as act} {@const wsInstance = $workspacesStore.find((it) => it.workspaceId === act[0])} - {@const totalFind = act[1].reduce((it, itm) => itm.current.find + it, 0)} - {@const totalTx = act[1].reduce((it, itm) => itm.current.tx + it, 0)} - {@const employeeGroups = Array.from(new Set(act[1].map((it) => it.userId))).filter( + {@const totalFind = act[1].sessions.reduce((it, itm) => itm.current.find + it, 0)} + {@const totalTx = act[1].sessions.reduce((it, itm) => itm.current.tx + it, 0)} + {@const employeeGroups = Array.from(new Set(act[1].sessions.map((it) => it.userId))).filter( (it) => systemAccountEmail !== it || !realUsers )} - {@const realGroup = Array.from(new Set(act[1].map((it) => it.userId))).filter( + {@const realGroup = Array.from(new Set(act[1].sessions.map((it) => it.userId))).filter( (it) => systemAccountEmail !== it )} {#if employeeGroups.length > 0} -
- Workspace: {wsInstance?.workspaceName ?? act[0]}: {employeeGroups.length} current 5 mins => {totalFind}/{totalTx} +
+
+ Workspace: {wsInstance?.workspaceName ?? act[0]}: {employeeGroups.length} current 5 mins => {totalFind}/{totalTx} + {#if act[1].upgrading} + (Upgrading) + {/if} + {#if act[1].closing} + (Closing) + {/if} +
+ {#if isAdminUser()} +
{#each employeeGroups as employeeId} {@const employee = employees.get(employeeId)} - {@const connections = act[1].filter((it) => it.userId === employeeId)} + {@const connections = act[1].sessions.filter((it) => it.userId === employeeId)} {@const find = connections.reduce((it, itm) => itm.current.find + it, 0)} {@const txes = connections.reduce((it, itm) => itm.current.tx + it, 0)} diff --git a/server/core/src/fulltext.ts b/server/core/src/fulltext.ts index 89ffac58178..f93bdda2af1 100644 --- a/server/core/src/fulltext.ts +++ b/server/core/src/fulltext.ts @@ -37,10 +37,10 @@ import core, { type TxResult, type WorkspaceId, docKey, + isClassIndexable, isFullTextAttribute, isIndexedAttribute, - toFindResult, - isClassIndexable + toFindResult } from '@hcengineering/core' import { type FullTextIndexPipeline } from './indexer' import { createStateDoc } from './indexer/utils' @@ -53,7 +53,6 @@ import type { FullTextAdapter, IndexedDoc, WithFind } from './types' */ export class FullTextIndex implements WithFind { txFactory = new TxFactory(core.account.System, true) - consistency: Promise | undefined constructor ( private readonly hierarchy: Hierarchy, @@ -72,7 +71,6 @@ export class FullTextIndex implements WithFind { async close (): Promise { await this.indexer.cancel() - await this.consistency } async tx (ctx: MeasureContext, txes: Tx[]): Promise { diff --git a/server/ws/src/client.ts b/server/ws/src/client.ts index 07e8f00541b..b78963a2115 100644 --- a/server/ws/src/client.ts +++ b/server/ws/src/client.ts @@ -14,14 +14,9 @@ // import core, { + type Account, AccountRole, type BulkUpdateEvent, - TxFactory, - TxProcessor, - type TxWorkspaceEvent, - WorkspaceEvent, - generateId, - type Account, type Class, type Doc, type DocumentQuery, @@ -38,7 +33,12 @@ import core, { type TxApplyIf, type TxApplyResult, type TxCUD, - type TxResult + TxFactory, + TxProcessor, + type TxResult, + type TxWorkspaceEvent, + WorkspaceEvent, + generateId } from '@hcengineering/core' import { type Pipeline, type SessionContext } from '@hcengineering/server-core' import { type Token } from '@hcengineering/server-token' @@ -71,6 +71,14 @@ export class ClientSession implements Session { return this.token.email } + isUpgradeClient (): boolean { + return this.token.extra?.model === 'upgrade' + } + + getMode (): string { + return this.token.extra?.mode ?? 'normal' + } + pipeline (): Pipeline { return this._pipeline } diff --git a/server/ws/src/server.ts b/server/ws/src/server.ts index 7732c153041..061cf20520b 100644 --- a/server/ws/src/server.ts +++ b/server/ws/src/server.ts @@ -289,9 +289,10 @@ class TSessionManager implements SessionManager { let pipeline: Pipeline if (token.extra?.model === 'upgrade') { - if (workspace.upgrade) { + if (workspace.upgrade && workspace.closing === undefined) { pipeline = await ctx.with('💤 wait', { workspaceName }, async () => await (workspace as Workspace).pipeline) } else { + // We need to wait in case previous upgeade connection is already closing. pipeline = await this.createUpgradeSession( token, sessionId, @@ -369,6 +370,9 @@ class TSessionManager implements SessionManager { if (LOGGING_ENABLED) { await ctx.info('reloading workspace', { workspaceName, token: JSON.stringify(token) }) } + + // Mark as upgrade, to prevent any new clients to connect during close + workspace.upgrade = true // If upgrade client is used. // Drop all existing clients await this.closeAll(wsString, workspace, 0, 'upgrade') @@ -378,7 +382,6 @@ class TSessionManager implements SessionManager { // This is previous workspace, intended to be closed. workspace.id = generateId() workspace.sessions = new Map() - workspace.upgrade = token.extra?.model === 'upgrade' } // Re-create pipeline. workspace.pipeline = pipelineFactory( @@ -494,7 +497,13 @@ class TSessionManager implements SessionManager { } catch {} } - async close (ws: ConnectionSocket, workspaceId: WorkspaceId, code: number, reason: string): Promise { + async close ( + ws: ConnectionSocket, + workspaceId: WorkspaceId, + code: number, + reason: string, + supportReconnect: boolean = true + ): Promise { const wsid = toWorkspaceString(workspaceId) const workspace = this.workspaces.get(wsid) if (workspace === undefined) { @@ -504,11 +513,13 @@ class TSessionManager implements SessionManager { if (sessionRef !== undefined) { this.sessions.delete(ws.id) workspace.sessions.delete(sessionRef.session.sessionId) - this.reconnectIds.add(sessionRef.session.sessionId) + if (supportReconnect) { + this.reconnectIds.add(sessionRef.session.sessionId) - setTimeout(() => { - this.reconnectIds.delete(sessionRef.session.sessionId) - }, this.timeouts.reconnectTimeout) + setTimeout(() => { + this.reconnectIds.delete(sessionRef.session.sessionId) + }, this.timeouts.reconnectTimeout) + } try { sessionRef.socket.close() } catch (err) { @@ -519,21 +530,35 @@ class TSessionManager implements SessionManager { if (another === -1) { await this.trySetStatus(workspace.context, sessionRef.session, false) } - if (!workspace.upgrade) { - // Wait some time for new client to appear before closing workspace. - if (workspace.sessions.size === 0) { - clearTimeout(workspace.closeTimeout) + // Wait some time for new client to appear before closing workspace. + if (workspace.sessions.size === 0 && workspace.closing === undefined) { + clearTimeout(workspace.closeTimeout) + + if (!supportReconnect && workspace.upgrade) { + // We recieve a proper close from upgrade client + await this.performWorkspaceCloseCheck(workspace, workspaceId, wsid) + } else { void this.ctx.info('schedule warm closing', { workspace: workspace.workspaceName, wsid }) - workspace.closeTimeout = setTimeout(() => { - void this.performWorkspaceCloseCheck(workspace, workspaceId, wsid) - }, this.timeouts.shutdownWarmTimeout) + workspace.closeTimeout = setTimeout( + () => { + void this.performWorkspaceCloseCheck(workspace, workspaceId, wsid) + }, + workspace.upgrade ? this.timeouts.reconnectTimeout : this.timeouts.shutdownWarmTimeout + ) } - } else { - await this.performWorkspaceCloseCheck(workspace, workspaceId, wsid) } } } + async forceClose (wsId: string): Promise { + const ws = this.workspaces.get(wsId) + if (ws !== undefined) { + ws.upgrade = true // We need to similare upgrade to refresh all clients. + await this.closeAll(wsId, ws, 99, 'upgrade') + this.workspaces.delete(wsId) + } + } + async closeAll (wsId: string, workspace: Workspace, code: number, reason: 'upgrade' | 'shutdown'): Promise { if (LOGGING_ENABLED) { await this.ctx.info('closing workspace', { @@ -578,7 +603,7 @@ class TSessionManager implements SessionManager { } } await this.ctx.with('closing', {}, async () => { - await Promise.race([closePipeline(), timeoutPromise(15000)]) + await Promise.race([closePipeline(), timeoutPromise(120000)]) }) if (LOGGING_ENABLED) { await this.ctx.info('Workspace closed...', { workspace: workspace.id, wsId, wsName: workspace.workspaceName }) @@ -723,7 +748,7 @@ class TSessionManager implements SessionManager { if (request.id === -1 && request.method === 'close') { const wsRef = this.workspaces.get(workspace) if (wsRef !== undefined) { - await this.close(ws, wsRef?.workspaceId, 1000, 'client request to close workspace') + await this.close(ws, wsRef?.workspaceId, 1000, 'client request to close workspace', false) } else { ws.close() } diff --git a/server/ws/src/server_http.ts b/server/ws/src/server_http.ts index b6775dfbd6a..0f15e4a9b8f 100644 --- a/server/ws/src/server_http.ts +++ b/server/ws/src/server_http.ts @@ -127,6 +127,13 @@ export function startHttpServer ( res.end() return } + case 'force-close': { + const wsId = req.query.wsId as string + void sessions.forceClose(wsId) + res.writeHead(200) + res.end() + return + } case 'reboot': { process.exit(0) } @@ -228,7 +235,7 @@ export function startHttpServer ( } await cs.send(ctx, { id: -1, result: { state: 'upgrading', stats: (session as any).upgradeInfo } }, false, false) - // Wait 1 second before closing the connection + // Wait 10 seconds before closing the connection setTimeout(() => { cs.close() }, 10000) diff --git a/server/ws/src/stats.ts b/server/ws/src/stats.ts index 013e13adff6..abda15364bb 100644 --- a/server/ws/src/stats.ts +++ b/server/ws/src/stats.ts @@ -3,7 +3,8 @@ import { MeasureMetricsContext, type Metrics, metricsAggregate, - type MetricsData + type MetricsData, + toWorkspaceString } from '@hcengineering/core' import os from 'os' import { type SessionManager } from './types' @@ -20,14 +21,22 @@ export function getStatistics (ctx: MeasureContext, sessions: SessionManager, ad } data.statistics.totalClients = sessions.sessions.size if (admin) { - for (const [k, v] of sessions.workspaces) { - data.statistics.activeSessions[k] = Array.from(v.sessions.entries()).map(([k, v]) => ({ - userId: v.session.getUser(), - data: v.socket.data(), - mins5: v.session.mins5, - total: v.session.total, - current: v.session.current - })) + for (const [k, vv] of sessions.workspaces) { + data.statistics.activeSessions[k] = { + sessions: Array.from(vv.sessions.entries()).map(([k, v]) => ({ + userId: v.session.getUser(), + data: v.socket.data(), + mins5: v.session.mins5, + total: v.session.total, + current: v.session.current, + upgrade: v.session.isUpgradeClient() + })), + name: vv.workspaceName, + wsId: toWorkspaceString(vv.workspaceId), + sessionsTotal: vv.sessions.size, + upgrading: vv.upgrade, + closing: vv.closing !== undefined + } } } diff --git a/server/ws/src/types.ts b/server/ws/src/types.ts index ef78c888791..4fec96ce16c 100644 --- a/server/ws/src/types.ts +++ b/server/ws/src/types.ts @@ -1,5 +1,4 @@ import { - type WorkspaceIdWithUrl, type Class, type Doc, type DocumentQuery, @@ -9,7 +8,8 @@ import { type Ref, type Tx, type TxResult, - type WorkspaceId + type WorkspaceId, + type WorkspaceIdWithUrl } from '@hcengineering/core' import { type Response } from '@hcengineering/rpc' import { type BroadcastFunc, type Pipeline } from '@hcengineering/server-core' @@ -65,6 +65,10 @@ export interface Session { measureCtx?: { ctx: MeasureContext, time: number } lastRequest: number + + isUpgradeClient: () => boolean + + getMode: () => string } /** @@ -154,6 +158,8 @@ export interface SessionManager { closeAll: (wsId: string, workspace: Workspace, code: number, reason: 'upgrade' | 'shutdown') => Promise + forceClose: (wsId: string) => Promise + closeWorkspaces: (ctx: MeasureContext) => Promise broadcast: (from: Session | null, workspaceId: WorkspaceId, resp: Response, target?: string[]) => void