Skip to content

Commit

Permalink
UBERF-6653: Fix minor issue and add force-close
Browse files Browse the repository at this point in the history
Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
  • Loading branch information
haiodo committed Apr 22, 2024
1 parent 207b8f9 commit 375fbe5
Show file tree
Hide file tree
Showing 7 changed files with 141 additions and 58 deletions.
64 changes: 47 additions & 17 deletions plugins/workbench-resources/src/components/ServerManager.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import { Metrics, systemAccountEmail } from '@hcengineering/core'
import login from '@hcengineering/login'
import { getEmbeddedLabel, getMetadata } from '@hcengineering/platform'
import presentation, { createQuery } from '@hcengineering/presentation'
import presentation, { createQuery, isAdminUser } from '@hcengineering/presentation'
import {
Button,
CheckBox,
Expand Down Expand Up @@ -88,13 +88,20 @@
$: activeSessions =
(data?.statistics?.activeSessions as Record<
string,
Array<{
userId: string
data?: Record<string, any>
total: StatisticsElement
mins5: StatisticsElement
current: StatisticsElement
}>
{
sessions: Array<{
userId: string
data?: Record<string, any>
total: StatisticsElement
mins5: StatisticsElement
current: StatisticsElement
}>
name: string
wsId: string
sessionsTotal: number
upgrading: boolean
closing: boolean
}
>) ?? {}
const employeeQuery = createQuery()
Expand All @@ -116,8 +123,8 @@
$: totalStats = Array.from(Object.entries(activeSessions).values()).reduce(
(cur, it) => {
const totalFind = it[1].reduce((it, itm) => itm.current.find + it, 0)
const totalTx = it[1].reduce((it, itm) => itm.current.tx + it, 0)
const totalFind = it[1].sessions.reduce((it, itm) => itm.current.find + it, 0)
const totalTx = it[1].sessions.reduce((it, itm) => itm.current.tx + it, 0)
return {
find: cur.find + totalFind,
tx: cur.tx + totalTx
Expand Down Expand Up @@ -197,26 +204,49 @@
<div class="flex-column p-3 h-full" style:overflow="auto">
{#each Object.entries(activeSessions) as act}
{@const wsInstance = $workspacesStore.find((it) => it.workspaceId === act[0])}
{@const totalFind = act[1].reduce((it, itm) => itm.current.find + it, 0)}
{@const totalTx = act[1].reduce((it, itm) => itm.current.tx + it, 0)}
{@const employeeGroups = Array.from(new Set(act[1].map((it) => it.userId))).filter(
{@const totalFind = act[1].sessions.reduce((it, itm) => itm.current.find + it, 0)}
{@const totalTx = act[1].sessions.reduce((it, itm) => itm.current.tx + it, 0)}
{@const employeeGroups = Array.from(new Set(act[1].sessions.map((it) => it.userId))).filter(
(it) => systemAccountEmail !== it || !realUsers
)}
{@const realGroup = Array.from(new Set(act[1].map((it) => it.userId))).filter(
{@const realGroup = Array.from(new Set(act[1].sessions.map((it) => it.userId))).filter(
(it) => systemAccountEmail !== it
)}
{#if employeeGroups.length > 0}
<span class="flex-col">
<Expandable contentColor expanded={false} expandable={true} bordered>
<svelte:fragment slot="title">
<div class="fs-title" class:greyed={realGroup.length === 0}>
Workspace: {wsInstance?.workspaceName ?? act[0]}: {employeeGroups.length} current 5 mins => {totalFind}/{totalTx}
<div class="flex flex-row-center flex-between flex-grow p-1">
<div class="fs-title" class:greyed={realGroup.length === 0}>
Workspace: {wsInstance?.workspaceName ?? act[0]}: {employeeGroups.length} current 5 mins => {totalFind}/{totalTx}
{#if act[1].upgrading}
(Upgrading)
{/if}
{#if act[1].closing}
(Closing)
{/if}
</div>
{#if isAdminUser()}
<Button
label={getEmbeddedLabel('Force close')}
size={'small'}
kind={'ghost'}
on:click={() => {
void fetch(
endpoint + `/api/v1/manage?token=${token}&operation=force-close&wsId=${act[1].wsId}`,
{
method: 'PUT'
}
)
}}
/>
{/if}
</div>
</svelte:fragment>
<div class="flex-col">
{#each employeeGroups as employeeId}
{@const employee = employees.get(employeeId)}
{@const connections = act[1].filter((it) => it.userId === employeeId)}
{@const connections = act[1].sessions.filter((it) => it.userId === employeeId)}

{@const find = connections.reduce((it, itm) => itm.current.find + it, 0)}
{@const txes = connections.reduce((it, itm) => itm.current.tx + it, 0)}
Expand Down
6 changes: 2 additions & 4 deletions server/core/src/fulltext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ import core, {
type TxResult,
type WorkspaceId,
docKey,
isClassIndexable,
isFullTextAttribute,
isIndexedAttribute,
toFindResult,
isClassIndexable
toFindResult
} from '@hcengineering/core'
import { type FullTextIndexPipeline } from './indexer'
import { createStateDoc } from './indexer/utils'
Expand All @@ -53,7 +53,6 @@ import type { FullTextAdapter, IndexedDoc, WithFind } from './types'
*/
export class FullTextIndex implements WithFind {
txFactory = new TxFactory(core.account.System, true)
consistency: Promise<void> | undefined

constructor (
private readonly hierarchy: Hierarchy,
Expand All @@ -72,7 +71,6 @@ export class FullTextIndex implements WithFind {

async close (): Promise<void> {
await this.indexer.cancel()
await this.consistency
}

async tx (ctx: MeasureContext, txes: Tx[]): Promise<TxResult> {
Expand Down
22 changes: 15 additions & 7 deletions server/ws/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,9 @@
//

import core, {
type Account,
AccountRole,
type BulkUpdateEvent,
TxFactory,
TxProcessor,
type TxWorkspaceEvent,
WorkspaceEvent,
generateId,
type Account,
type Class,
type Doc,
type DocumentQuery,
Expand All @@ -38,7 +33,12 @@ import core, {
type TxApplyIf,
type TxApplyResult,
type TxCUD,
type TxResult
TxFactory,
TxProcessor,
type TxResult,
type TxWorkspaceEvent,
WorkspaceEvent,
generateId
} from '@hcengineering/core'
import { type Pipeline, type SessionContext } from '@hcengineering/server-core'
import { type Token } from '@hcengineering/server-token'
Expand Down Expand Up @@ -71,6 +71,14 @@ export class ClientSession implements Session {
return this.token.email
}

isUpgradeClient (): boolean {
return this.token.extra?.model === 'upgrade'
}

getMode (): string {
return this.token.extra?.mode ?? 'normal'
}

pipeline (): Pipeline {
return this._pipeline
}
Expand Down
61 changes: 43 additions & 18 deletions server/ws/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -289,9 +289,10 @@ class TSessionManager implements SessionManager {

let pipeline: Pipeline
if (token.extra?.model === 'upgrade') {
if (workspace.upgrade) {
if (workspace.upgrade && workspace.closing === undefined) {
pipeline = await ctx.with('💤 wait', { workspaceName }, async () => await (workspace as Workspace).pipeline)
} else {
// We need to wait in case previous upgeade connection is already closing.
pipeline = await this.createUpgradeSession(
token,
sessionId,
Expand Down Expand Up @@ -369,6 +370,9 @@ class TSessionManager implements SessionManager {
if (LOGGING_ENABLED) {
await ctx.info('reloading workspace', { workspaceName, token: JSON.stringify(token) })
}

// Mark as upgrade, to prevent any new clients to connect during close
workspace.upgrade = true
// If upgrade client is used.
// Drop all existing clients
await this.closeAll(wsString, workspace, 0, 'upgrade')
Expand All @@ -378,7 +382,6 @@ class TSessionManager implements SessionManager {
// This is previous workspace, intended to be closed.
workspace.id = generateId()
workspace.sessions = new Map()
workspace.upgrade = token.extra?.model === 'upgrade'
}
// Re-create pipeline.
workspace.pipeline = pipelineFactory(
Expand Down Expand Up @@ -494,7 +497,13 @@ class TSessionManager implements SessionManager {
} catch {}
}

async close (ws: ConnectionSocket, workspaceId: WorkspaceId, code: number, reason: string): Promise<void> {
async close (
ws: ConnectionSocket,
workspaceId: WorkspaceId,
code: number,
reason: string,
supportReconnect: boolean = true
): Promise<void> {
const wsid = toWorkspaceString(workspaceId)
const workspace = this.workspaces.get(wsid)
if (workspace === undefined) {
Expand All @@ -504,11 +513,13 @@ class TSessionManager implements SessionManager {
if (sessionRef !== undefined) {
this.sessions.delete(ws.id)
workspace.sessions.delete(sessionRef.session.sessionId)
this.reconnectIds.add(sessionRef.session.sessionId)
if (supportReconnect) {
this.reconnectIds.add(sessionRef.session.sessionId)

setTimeout(() => {
this.reconnectIds.delete(sessionRef.session.sessionId)
}, this.timeouts.reconnectTimeout)
setTimeout(() => {
this.reconnectIds.delete(sessionRef.session.sessionId)
}, this.timeouts.reconnectTimeout)
}
try {
sessionRef.socket.close()
} catch (err) {
Expand All @@ -519,21 +530,35 @@ class TSessionManager implements SessionManager {
if (another === -1) {
await this.trySetStatus(workspace.context, sessionRef.session, false)
}
if (!workspace.upgrade) {
// Wait some time for new client to appear before closing workspace.
if (workspace.sessions.size === 0) {
clearTimeout(workspace.closeTimeout)
// Wait some time for new client to appear before closing workspace.
if (workspace.sessions.size === 0 && workspace.closing === undefined) {
clearTimeout(workspace.closeTimeout)

if (!supportReconnect && workspace.upgrade) {
// We recieve a proper close from upgrade client
await this.performWorkspaceCloseCheck(workspace, workspaceId, wsid)
} else {
void this.ctx.info('schedule warm closing', { workspace: workspace.workspaceName, wsid })
workspace.closeTimeout = setTimeout(() => {
void this.performWorkspaceCloseCheck(workspace, workspaceId, wsid)
}, this.timeouts.shutdownWarmTimeout)
workspace.closeTimeout = setTimeout(
() => {
void this.performWorkspaceCloseCheck(workspace, workspaceId, wsid)
},
workspace.upgrade ? this.timeouts.reconnectTimeout : this.timeouts.shutdownWarmTimeout
)
}
} else {
await this.performWorkspaceCloseCheck(workspace, workspaceId, wsid)
}
}
}

async forceClose (wsId: string): Promise<void> {
const ws = this.workspaces.get(wsId)
if (ws !== undefined) {
ws.upgrade = true // We need to similare upgrade to refresh all clients.
await this.closeAll(wsId, ws, 99, 'upgrade')
this.workspaces.delete(wsId)
}
}

async closeAll (wsId: string, workspace: Workspace, code: number, reason: 'upgrade' | 'shutdown'): Promise<void> {
if (LOGGING_ENABLED) {
await this.ctx.info('closing workspace', {
Expand Down Expand Up @@ -578,7 +603,7 @@ class TSessionManager implements SessionManager {
}
}
await this.ctx.with('closing', {}, async () => {
await Promise.race([closePipeline(), timeoutPromise(15000)])
await Promise.race([closePipeline(), timeoutPromise(120000)])
})
if (LOGGING_ENABLED) {
await this.ctx.info('Workspace closed...', { workspace: workspace.id, wsId, wsName: workspace.workspaceName })
Expand Down Expand Up @@ -723,7 +748,7 @@ class TSessionManager implements SessionManager {
if (request.id === -1 && request.method === 'close') {
const wsRef = this.workspaces.get(workspace)
if (wsRef !== undefined) {
await this.close(ws, wsRef?.workspaceId, 1000, 'client request to close workspace')
await this.close(ws, wsRef?.workspaceId, 1000, 'client request to close workspace', false)
} else {
ws.close()
}
Expand Down
9 changes: 8 additions & 1 deletion server/ws/src/server_http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,13 @@ export function startHttpServer (
res.end()
return
}
case 'force-close': {
const wsId = req.query.wsId as string
void sessions.forceClose(wsId)
res.writeHead(200)
res.end()
return
}
case 'reboot': {
process.exit(0)
}
Expand Down Expand Up @@ -228,7 +235,7 @@ export function startHttpServer (
}
await cs.send(ctx, { id: -1, result: { state: 'upgrading', stats: (session as any).upgradeInfo } }, false, false)

// Wait 1 second before closing the connection
// Wait 10 seconds before closing the connection
setTimeout(() => {
cs.close()
}, 10000)
Expand Down
27 changes: 18 additions & 9 deletions server/ws/src/stats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ import {
MeasureMetricsContext,
type Metrics,
metricsAggregate,
type MetricsData
type MetricsData,
toWorkspaceString
} from '@hcengineering/core'
import os from 'os'
import { type SessionManager } from './types'
Expand All @@ -20,14 +21,22 @@ export function getStatistics (ctx: MeasureContext, sessions: SessionManager, ad
}
data.statistics.totalClients = sessions.sessions.size
if (admin) {
for (const [k, v] of sessions.workspaces) {
data.statistics.activeSessions[k] = Array.from(v.sessions.entries()).map(([k, v]) => ({
userId: v.session.getUser(),
data: v.socket.data(),
mins5: v.session.mins5,
total: v.session.total,
current: v.session.current
}))
for (const [k, vv] of sessions.workspaces) {
data.statistics.activeSessions[k] = {
sessions: Array.from(vv.sessions.entries()).map(([k, v]) => ({
userId: v.session.getUser(),
data: v.socket.data(),
mins5: v.session.mins5,
total: v.session.total,
current: v.session.current,
upgrade: v.session.isUpgradeClient()
})),
name: vv.workspaceName,
wsId: toWorkspaceString(vv.workspaceId),
sessionsTotal: vv.sessions.size,
upgrading: vv.upgrade,
closing: vv.closing !== undefined
}
}
}

Expand Down
Loading

0 comments on commit 375fbe5

Please sign in to comment.