Skip to content

UBERF-7543: Fix server memory usage #6044

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions server/ws/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,10 @@ import core, {
import { SessionContextImpl, createBroadcastEvent, type Pipeline } from '@hcengineering/server-core'
import { type Token } from '@hcengineering/server-token'
import { type ClientSessionCtx, type Session, type SessionRequest, type StatisticsElement } from './types'
import { RPCHandler } from '@hcengineering/rpc'

/**
* @public
*/
export class ClientSession implements Session {
handler = new RPCHandler()
createTime = Date.now()
requests = new Map<string, SessionRequest>()
binaryMode: boolean = false
Expand Down
26 changes: 17 additions & 9 deletions server/ws/src/server_http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ export function startHttpServer (
})
}

const rpcHandler = new RPCHandler()

const app = express()
app.use(cors())
app.use(
Expand Down Expand Up @@ -242,7 +244,6 @@ export function startHttpServer (
})

const httpServer = http.createServer(app)

const wss = new WebSocketServer({
noServer: true,
perMessageDeflate: enableCompression
Expand Down Expand Up @@ -283,7 +284,8 @@ export function startHttpServer (
language: request.headers['accept-language'] ?? '',
email: token.email,
mode: token.extra?.mode,
model: token.extra?.model
model: token.extra?.model,
rpcHandler
}
const cs: ConnectionSocket = createWebsocketClientSocket(ws, data)

Expand Down Expand Up @@ -385,19 +387,18 @@ export function startHttpServer (
if (LOGGING_ENABLED) {
ctx.error('invalid token', err)
}
const handler = new RPCHandler()
wss.handleUpgrade(request, socket, head, (ws) => {
const resp: Response<any> = {
id: -1,
error: UNAUTHORIZED,
result: 'hello'
}
ws.send(handler.serialize(resp, false), { binary: false })
ws.send(rpcHandler.serialize(resp, false), { binary: false })
ws.onmessage = (msg) => {
const resp: Response<any> = {
error: UNAUTHORIZED
}
ws.send(handler.serialize(resp, false), { binary: false })
ws.send(rpcHandler.serialize(resp, false), { binary: false })
}
})
}
Expand Down Expand Up @@ -431,9 +432,16 @@ export function startHttpServer (
}
function createWebsocketClientSocket (
ws: WebSocket,
data: { remoteAddress: string, userAgent: string, language: string, email: string, mode: any, model: any }
data: {
remoteAddress: string
userAgent: string
language: string
email: string
mode: any
model: any
rpcHandler: RPCHandler
}
): ConnectionSocket {
const handler = new RPCHandler()
const cs: ConnectionSocket = {
id: generateId(),
isClosed: false,
Expand All @@ -442,12 +450,12 @@ function createWebsocketClientSocket (
ws.close()
},
readRequest: (buffer: Buffer, binary: boolean) => {
return handler.readRequest(buffer, binary)
return data.rpcHandler.readRequest(buffer, binary)
},
data: () => data,
send: async (ctx: MeasureContext, msg, binary, compression) => {
const sst = Date.now()
const smsg = handler.serialize(msg, binary)
const smsg = data.rpcHandler.serialize(msg, binary)
ctx.measure('serialize', Date.now() - sst)

ctx.measure('send-data', smsg.length)
Expand Down
20 changes: 15 additions & 5 deletions server/ws/src/server_u.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ export function startUWebsocketServer (

const uAPP = uWebSockets.App()

const rpcHandler = new RPCHandler()

const writeStatus = (response: HttpResponse, status: string): HttpResponse => {
return response
.writeStatus(status)
Expand Down Expand Up @@ -124,7 +126,8 @@ export function startUWebsocketServer (
language: '',
email: data.payload.email,
mode: data.payload.extra?.mode,
model: data.payload.extra?.model
model: data.payload.extra?.model,
rpcHandler
}
const cs = createWebSocketClientSocket(wrData, ws, data)
data.connectionSocket = cs
Expand Down Expand Up @@ -370,11 +373,18 @@ export function startUWebsocketServer (
}
}
function createWebSocketClientSocket (
wrData: { remoteAddress: ArrayBuffer, userAgent: string, language: string, email: string, mode: any, model: any },
wrData: {
remoteAddress: ArrayBuffer
userAgent: string
language: string
email: string
mode: any
model: any
rpcHandler: RPCHandler
},
ws: uWebSockets.WebSocket<WebsocketUserData>,
data: WebsocketUserData
): ConnectionSocket {
const handler = new RPCHandler()
const cs: ConnectionSocket = {
id: generateId(),
isClosed: false,
Expand All @@ -388,13 +398,13 @@ function createWebSocketClientSocket (
}
},
readRequest: (buffer: Buffer, binary: boolean) => {
return handler.readRequest(buffer, binary)
return wrData.rpcHandler.readRequest(buffer, binary)
},
send: async (ctx, msg, binary, compression): Promise<number> => {
if (data.backPressure !== undefined) {
await data.backPressure
}
const serialized = handler.serialize(msg, binary)
const serialized = wrData.rpcHandler.serialize(msg, binary)
try {
const sendR = ws.send(serialized, binary, compression)
if (sendR === 2) {
Expand Down