From e0b900dd6903922b66bbfd2e17f00450d12f66b6 Mon Sep 17 00:00:00 2001 From: AVVS Date: Tue, 27 Feb 2024 12:27:25 -0800 Subject: [PATCH 01/11] feat: channelz improvements, idle timeout implementation --- packages/grpc-js/.eslintrc | 1 + packages/grpc-js/gulpfile.ts | 33 +- packages/grpc-js/package.json | 37 +- packages/grpc-js/src/channel.ts | 2 +- packages/grpc-js/src/channelz.ts | 468 ++++++------ .../src/load-balancer-child-handler.ts | 2 +- packages/grpc-js/src/load-balancer.ts | 2 +- packages/grpc-js/src/server-call.ts | 18 +- packages/grpc-js/src/server.ts | 678 ++++++++++++------ packages/grpc-js/src/subchannel-interface.ts | 2 +- packages/grpc-js/src/subchannel.ts | 38 +- packages/grpc-js/src/transport.ts | 16 +- packages/grpc-js/test/common.ts | 4 +- 13 files changed, 784 insertions(+), 517 deletions(-) diff --git a/packages/grpc-js/.eslintrc b/packages/grpc-js/.eslintrc index 2f6bfd62d..9a72b31de 100644 --- a/packages/grpc-js/.eslintrc +++ b/packages/grpc-js/.eslintrc @@ -50,6 +50,7 @@ "@typescript-eslint/explicit-module-boundary-types": "off", "@typescript-eslint/ban-types": "off", "@typescript-eslint/camelcase": "off", + "@typescript-eslint/no-explicit-any": "off", "node/no-missing-import": "off", "node/no-empty-function": "off", "node/no-unsupported-features/es-syntax": "off", diff --git a/packages/grpc-js/gulpfile.ts b/packages/grpc-js/gulpfile.ts index d85900364..e4e9071ff 100644 --- a/packages/grpc-js/gulpfile.ts +++ b/packages/grpc-js/gulpfile.ts @@ -35,14 +35,17 @@ const pkgPath = path.resolve(jsCoreDir, 'package.json'); const supportedVersionRange = require(pkgPath).engines.node; const versionNotSupported = () => { console.log(`Skipping grpc-js task for Node ${process.version}`); - return () => { return Promise.resolve(); }; + return () => { + return Promise.resolve(); + }; }; const identity = (value: any): any => value; -const checkTask = semver.satisfies(process.version, supportedVersionRange) ? - identity : versionNotSupported; +const checkTask = semver.satisfies(process.version, supportedVersionRange) + ? identity + : versionNotSupported; const execNpmVerb = (verb: string, ...args: string[]) => - execa('npm', [verb, ...args], {cwd: jsCoreDir, stdio: 'inherit'}); + execa('npm', [verb, ...args], { cwd: jsCoreDir, stdio: 'inherit' }); const execNpmCommand = execNpmVerb.bind(null, 'run'); const install = checkTask(() => execNpmVerb('install', '--unsafe-perm')); @@ -64,22 +67,20 @@ const cleanAll = gulp.parallel(clean); */ const compile = checkTask(() => execNpmCommand('compile')); -const copyTestFixtures = checkTask(() => ncpP(`${jsCoreDir}/test/fixtures`, `${outDir}/test/fixtures`)); +const copyTestFixtures = checkTask(() => + ncpP(`${jsCoreDir}/test/fixtures`, `${outDir}/test/fixtures`) +); const runTests = checkTask(() => { process.env.GRPC_EXPERIMENTAL_ENABLE_OUTLIER_DETECTION = 'true'; - return gulp.src(`${outDir}/test/**/*.js`) - .pipe(mocha({reporter: 'mocha-jenkins-reporter', - require: ['ts-node/register']})); + return gulp.src(`${outDir}/test/**/*.js`).pipe( + mocha({ + reporter: 'mocha-jenkins-reporter', + require: ['ts-node/register'], + }) + ); }); const test = gulp.series(install, copyTestFixtures, runTests); -export { - install, - lint, - clean, - cleanAll, - compile, - test -} +export { install, lint, clean, cleanAll, compile, test }; diff --git a/packages/grpc-js/package.json b/packages/grpc-js/package.json index d1cb3d561..34d8b558b 100644 --- a/packages/grpc-js/package.json +++ b/packages/grpc-js/package.json @@ -6,7 +6,7 @@ "repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js", "main": "build/src/index.js", "engines": { - "node": "^8.13.0 || >=10.10.0" + "node": ">=12.10.0" }, "keywords": [], "author": { @@ -15,17 +15,18 @@ "types": "build/src/index.d.ts", "license": "Apache-2.0", "devDependencies": { - "@types/gulp": "^4.0.6", - "@types/gulp-mocha": "0.0.32", - "@types/lodash": "^4.14.186", - "@types/mocha": "^5.2.6", - "@types/ncp": "^2.0.1", - "@types/pify": "^3.0.2", - "@types/semver": "^7.3.9", - "@typescript-eslint/eslint-plugin": "^5.59.11", - "@typescript-eslint/parser": "^5.59.11", - "@typescript-eslint/typescript-estree": "^5.59.11", - "clang-format": "^1.0.55", + "@types/gulp": "^4.0.17", + "@types/gulp-mocha": "0.0.37", + "@types/lodash": "^4.14.202", + "@types/mocha": "^10.0.6", + "@types/ncp": "^2.0.8", + "@types/pify": "^5.0.4", + "@types/semver": "^7.5.8", + "@types/node": ">=20.11.20", + "@typescript-eslint/eslint-plugin": "^7.1.0", + "@typescript-eslint/parser": "^7.1.0", + "@typescript-eslint/typescript-estree": "^7.1.0", + "clang-format": "^1.8.0", "eslint": "^8.42.0", "eslint-config-prettier": "^8.8.0", "eslint-plugin-node": "^11.1.0", @@ -33,16 +34,16 @@ "execa": "^2.0.3", "gulp": "^4.0.2", "gulp-mocha": "^6.0.0", - "lodash": "^4.17.4", + "lodash": "^4.17.21", "madge": "^5.0.1", "mocha-jenkins-reporter": "^0.4.1", "ncp": "^2.0.0", "pify": "^4.0.1", "prettier": "^2.8.8", "rimraf": "^3.0.2", - "semver": "^7.3.5", - "ts-node": "^10.9.1", - "typescript": "^5.1.3" + "semver": "^7.6.0", + "ts-node": "^10.9.2", + "typescript": "^5.3.3" }, "contributors": [ { @@ -65,8 +66,8 @@ "generate-test-types": "proto-loader-gen-types --keepCase --longs String --enums String --defaults --oneofs --includeComments --include-dirs test/fixtures/ -O test/generated/ --grpcLib ../../src/index test_service.proto" }, "dependencies": { - "@grpc/proto-loader": "^0.7.8", - "@types/node": ">=12.12.47" + "@grpc/proto-loader": "^0.7.10", + "@js-sdsl/ordered-map": "^4.4.2" }, "files": [ "src/**/*.ts", diff --git a/packages/grpc-js/src/channel.ts b/packages/grpc-js/src/channel.ts index 7ce5a15f7..514920c8f 100644 --- a/packages/grpc-js/src/channel.ts +++ b/packages/grpc-js/src/channel.ts @@ -20,7 +20,7 @@ import { ChannelOptions } from './channel-options'; import { ServerSurfaceCall } from './server-call'; import { ConnectivityState } from './connectivity-state'; -import { ChannelRef } from './channelz'; +import type { ChannelRef } from './channelz'; import { Call } from './call-interface'; import { InternalChannel } from './internal-channel'; import { Deadline } from './deadline'; diff --git a/packages/grpc-js/src/channelz.ts b/packages/grpc-js/src/channelz.ts index 1e2627a97..6d70b7543 100644 --- a/packages/grpc-js/src/channelz.ts +++ b/packages/grpc-js/src/channelz.ts @@ -16,6 +16,7 @@ */ import { isIPv4, isIPv6 } from 'net'; +import { OrderedMap, type OrderedMapIterator } from '@js-sdsl/ordered-map'; import { ConnectivityState } from './connectivity-state'; import { Status } from './constants'; import { Timestamp } from './generated/google/protobuf/Timestamp'; @@ -66,24 +67,25 @@ export type TraceSeverity = | 'CT_ERROR'; export interface ChannelRef { - kind: 'channel'; + kind: EntityTypes.channel; id: number; name: string; } export interface SubchannelRef { - kind: 'subchannel'; + kind: EntityTypes.subchannel; id: number; name: string; } export interface ServerRef { - kind: 'server'; + kind: EntityTypes.server; id: number; + name: string; } export interface SocketRef { - kind: 'socket'; + kind: EntityTypes.socket; id: number; name: string; } @@ -131,6 +133,21 @@ interface TraceEvent { */ const TARGET_RETAINED_TRACES = 32; +export class ChannelzTraceStub { + readonly events: TraceEvent[] = []; + readonly creationTimestamp: Date = new Date(); + readonly eventsLogged = 0; + + addTrace(): void {} + getTraceMessage(): ChannelTrace { + return { + creation_timestamp: dateToProtoTimestamp(this.creationTimestamp), + num_events_logged: this.eventsLogged, + events: [], + }; + } +} + export class ChannelzTrace { events: TraceEvent[] = []; creationTimestamp: Date; @@ -182,105 +199,64 @@ export class ChannelzTrace { } export class ChannelzChildrenTracker { - private channelChildren: Map = - new Map(); - private subchannelChildren: Map< + private channelChildren = new OrderedMap< + number, + { ref: ChannelRef; count: number } + >(); + private subchannelChildren = new OrderedMap< number, { ref: SubchannelRef; count: number } - > = new Map(); - private socketChildren: Map = - new Map(); + >(); + private socketChildren = new OrderedMap< + number, + { ref: SocketRef; count: number } + >(); + private trackerMap = { + [EntityTypes.channel]: this.channelChildren, + [EntityTypes.subchannel]: this.subchannelChildren, + [EntityTypes.socket]: this.socketChildren, + } as const; refChild(child: ChannelRef | SubchannelRef | SocketRef) { - switch (child.kind) { - case 'channel': { - const trackedChild = this.channelChildren.get(child.id) ?? { - ref: child, - count: 0, - }; - trackedChild.count += 1; - this.channelChildren.set(child.id, trackedChild); - break; - } - case 'subchannel': { - const trackedChild = this.subchannelChildren.get(child.id) ?? { - ref: child, - count: 0, - }; - trackedChild.count += 1; - this.subchannelChildren.set(child.id, trackedChild); - break; - } - case 'socket': { - const trackedChild = this.socketChildren.get(child.id) ?? { - ref: child, - count: 0, - }; - trackedChild.count += 1; - this.socketChildren.set(child.id, trackedChild); - break; - } + const tracker = this.trackerMap[child.kind]; + const trackedChild = tracker.getElementByKey(child.id); + + if (trackedChild === undefined) { + tracker.setElement(child.id, { + // @ts-expect-error union issues + ref: child, + count: 1, + }); + } else { + trackedChild.count += 1; } } unrefChild(child: ChannelRef | SubchannelRef | SocketRef) { - switch (child.kind) { - case 'channel': { - const trackedChild = this.channelChildren.get(child.id); - if (trackedChild !== undefined) { - trackedChild.count -= 1; - if (trackedChild.count === 0) { - this.channelChildren.delete(child.id); - } else { - this.channelChildren.set(child.id, trackedChild); - } - } - break; - } - case 'subchannel': { - const trackedChild = this.subchannelChildren.get(child.id); - if (trackedChild !== undefined) { - trackedChild.count -= 1; - if (trackedChild.count === 0) { - this.subchannelChildren.delete(child.id); - } else { - this.subchannelChildren.set(child.id, trackedChild); - } - } - break; - } - case 'socket': { - const trackedChild = this.socketChildren.get(child.id); - if (trackedChild !== undefined) { - trackedChild.count -= 1; - if (trackedChild.count === 0) { - this.socketChildren.delete(child.id); - } else { - this.socketChildren.set(child.id, trackedChild); - } - } - break; + const tracker = this.trackerMap[child.kind]; + const trackedChild = tracker.getElementByKey(child.id); + if (trackedChild !== undefined) { + trackedChild.count -= 1; + if (trackedChild.count === 0) { + tracker.eraseElementByKey(child.id); } } } getChildLists(): ChannelzChildren { - const channels: ChannelRef[] = []; - for (const { ref } of this.channelChildren.values()) { - channels.push(ref); - } - const subchannels: SubchannelRef[] = []; - for (const { ref } of this.subchannelChildren.values()) { - subchannels.push(ref); - } - const sockets: SocketRef[] = []; - for (const { ref } of this.socketChildren.values()) { - sockets.push(ref); - } - return { channels, subchannels, sockets }; + return { + channels: this.channelChildren, + subchannels: this.subchannelChildren, + sockets: this.socketChildren, + }; } } +export class ChannelzChildrenTrackerStub extends ChannelzChildrenTracker { + override refChild(): void {} + override unrefChild(): void {} +} + export class ChannelzCallTracker { callsStarted = 0; callsSucceeded = 0; @@ -299,17 +275,23 @@ export class ChannelzCallTracker { } } +export class ChannelzCallTrackerStub extends ChannelzCallTracker { + override addCallStarted() {} + override addCallSucceeded() {} + override addCallFailed() {} +} + export interface ChannelzChildren { - channels: ChannelRef[]; - subchannels: SubchannelRef[]; - sockets: SocketRef[]; + channels: OrderedMap; + subchannels: OrderedMap; + sockets: OrderedMap; } export interface ChannelInfo { target: string; state: ConnectivityState; - trace: ChannelzTrace; - callTracker: ChannelzCallTracker; + trace: ChannelzTrace | ChannelzTraceStub; + callTracker: ChannelzCallTracker | ChannelzCallTrackerStub; children: ChannelzChildren; } @@ -348,105 +330,102 @@ export interface SocketInfo { remoteFlowControlWindow: number | null; } -interface ChannelEntry { +type ChannelEntry = { ref: ChannelRef; getInfo(): ChannelInfo; -} +}; -interface SubchannelEntry { +type SubchannelEntry = { ref: SubchannelRef; getInfo(): SubchannelInfo; -} +}; -interface ServerEntry { +type ServerEntry = { ref: ServerRef; getInfo(): ServerInfo; -} +}; -interface SocketEntry { +type SocketEntry = { ref: SocketRef; getInfo(): SocketInfo; -} - -let nextId = 1; - -function getNextId(): number { - return nextId++; -} - -const channels: (ChannelEntry | undefined)[] = []; -const subchannels: (SubchannelEntry | undefined)[] = []; -const servers: (ServerEntry | undefined)[] = []; -const sockets: (SocketEntry | undefined)[] = []; - -export function registerChannelzChannel( - name: string, - getInfo: () => ChannelInfo, - channelzEnabled: boolean -): ChannelRef { - const id = getNextId(); - const ref: ChannelRef = { id, name, kind: 'channel' }; - if (channelzEnabled) { - channels[id] = { ref, getInfo }; - } - return ref; -} - -export function registerChannelzSubchannel( - name: string, - getInfo: () => SubchannelInfo, - channelzEnabled: boolean -): SubchannelRef { - const id = getNextId(); - const ref: SubchannelRef = { id, name, kind: 'subchannel' }; - if (channelzEnabled) { - subchannels[id] = { ref, getInfo }; +}; + +export const enum EntityTypes { + channel = 'channel', + subchannel = 'subchannel', + server = 'server', + socket = 'socket', +} + +const entityMaps = { + [EntityTypes.channel]: new OrderedMap(), + [EntityTypes.subchannel]: new OrderedMap(), + [EntityTypes.server]: new OrderedMap(), + [EntityTypes.socket]: new OrderedMap(), +} as const; + +export type RefByType = T extends EntityTypes.channel + ? ChannelRef + : T extends EntityTypes.server + ? ServerRef + : T extends EntityTypes.socket + ? SocketRef + : T extends EntityTypes.subchannel + ? SubchannelRef + : never; + +export type EntryByType = T extends EntityTypes.channel + ? ChannelEntry + : T extends EntityTypes.server + ? ServerEntry + : T extends EntityTypes.socket + ? SocketEntry + : T extends EntityTypes.subchannel + ? SubchannelEntry + : never; + +export type InfoByType = T extends EntityTypes.channel + ? ChannelInfo + : T extends EntityTypes.subchannel + ? SubchannelInfo + : T extends EntityTypes.server + ? ServerInfo + : T extends EntityTypes.socket + ? SocketInfo + : never; + +const generateRegisterFn = (kind: R) => { + let nextId = 1; + function getNextId(): number { + return nextId++; } - return ref; -} -export function registerChannelzServer( - getInfo: () => ServerInfo, - channelzEnabled: boolean -): ServerRef { - const id = getNextId(); - const ref: ServerRef = { id, kind: 'server' }; - if (channelzEnabled) { - servers[id] = { ref, getInfo }; - } - return ref; -} - -export function registerChannelzSocket( - name: string, - getInfo: () => SocketInfo, - channelzEnabled: boolean -): SocketRef { - const id = getNextId(); - const ref: SocketRef = { id, name, kind: 'socket' }; - if (channelzEnabled) { - sockets[id] = { ref, getInfo }; - } - return ref; -} + return ( + name: string, + getInfo: () => InfoByType, + channelzEnabled: boolean + ): RefByType => { + const id = getNextId(); + const ref = { id, name, kind } as RefByType; + if (channelzEnabled) { + // @ts-expect-error typing issues + entityMaps[kind].setElement(id, { ref, getInfo }); + } + return ref; + }; +}; + +export const registerChannelzChannel = generateRegisterFn(EntityTypes.channel); +export const registerChannelzSubchannel = generateRegisterFn( + EntityTypes.subchannel +); +export const registerChannelzServer = generateRegisterFn(EntityTypes.server); +export const registerChannelzSocket = generateRegisterFn(EntityTypes.socket); export function unregisterChannelzRef( ref: ChannelRef | SubchannelRef | ServerRef | SocketRef ) { - switch (ref.kind) { - case 'channel': - delete channels[ref.id]; - return; - case 'subchannel': - delete subchannels[ref.id]; - return; - case 'server': - delete servers[ref.id]; - return; - case 'socket': - delete sockets[ref.id]; - return; - } + entityMaps[ref.kind].eraseElementByKey(ref.id); } /** @@ -556,6 +535,17 @@ function dateToProtoTimestamp(date?: Date | null): Timestamp | null { function getChannelMessage(channelEntry: ChannelEntry): ChannelMessage { const resolvedInfo = channelEntry.getInfo(); + const channelRef: ChannelRefMessage[] = []; + const subchannelRef: SubchannelRefMessage[] = []; + + resolvedInfo.children.channels.forEach(el => { + channelRef.push(channelRefToMessage(el[1].ref)); + }); + + resolvedInfo.children.subchannels.forEach(el => { + subchannelRef.push(subchannelRefToMessage(el[1].ref)); + }); + return { ref: channelRefToMessage(channelEntry.ref), data: { @@ -569,12 +559,8 @@ function getChannelMessage(channelEntry: ChannelEntry): ChannelMessage { ), trace: resolvedInfo.trace.getTraceMessage(), }, - channel_ref: resolvedInfo.children.channels.map(ref => - channelRefToMessage(ref) - ), - subchannel_ref: resolvedInfo.children.subchannels.map(ref => - subchannelRefToMessage(ref) - ), + channel_ref: channelRef, + subchannel_ref: subchannelRef, }; } @@ -582,8 +568,9 @@ function GetChannel( call: ServerUnaryCall, callback: sendUnaryData ): void { - const channelId = Number.parseInt(call.request.channel_id); - const channelEntry = channels[channelId]; + const channelId = parseInt(call.request.channel_id, 10); + const channelEntry = + entityMaps[EntityTypes.channel].getElementByKey(channelId); if (channelEntry === undefined) { callback({ code: Status.NOT_FOUND, @@ -598,27 +585,34 @@ function GetTopChannels( call: ServerUnaryCall, callback: sendUnaryData ): void { - const maxResults = Number.parseInt(call.request.max_results); + const maxResults = parseInt(call.request.max_results, 10) || 100; const resultList: ChannelMessage[] = []; - let i = Number.parseInt(call.request.start_channel_id); - for (; i < channels.length; i++) { - const channelEntry = channels[i]; - if (channelEntry === undefined) { - continue; - } - resultList.push(getChannelMessage(channelEntry)); - if (resultList.length >= maxResults) { - break; - } + const startId = parseInt(call.request.start_channel_id, 10); + const channelEntries = entityMaps[EntityTypes.channel]; + + let i: OrderedMapIterator; + for ( + i = channelEntries.lowerBound(startId); + !i.equals(channelEntries.end()) && resultList.length < maxResults; + i = i.next() + ) { + resultList.push(getChannelMessage(i.pointer[1])); } + callback(null, { channel: resultList, - end: i >= servers.length, + end: i.equals(channelEntries.end()), }); } function getServerMessage(serverEntry: ServerEntry): ServerMessage { const resolvedInfo = serverEntry.getInfo(); + const listenSocket: SocketRefMessage[] = []; + + resolvedInfo.listenerChildren.sockets.forEach(el => { + listenSocket.push(socketRefToMessage(el[1].ref)); + }); + return { ref: serverRefToMessage(serverEntry.ref), data: { @@ -630,9 +624,7 @@ function getServerMessage(serverEntry: ServerEntry): ServerMessage { ), trace: resolvedInfo.trace.getTraceMessage(), }, - listen_socket: resolvedInfo.listenerChildren.sockets.map(ref => - socketRefToMessage(ref) - ), + listen_socket: listenSocket, }; } @@ -640,8 +632,9 @@ function GetServer( call: ServerUnaryCall, callback: sendUnaryData ): void { - const serverId = Number.parseInt(call.request.server_id); - const serverEntry = servers[serverId]; + const serverId = parseInt(call.request.server_id, 10); + const serverEntries = entityMaps[EntityTypes.server]; + const serverEntry = serverEntries.getElementByKey(serverId); if (serverEntry === undefined) { callback({ code: Status.NOT_FOUND, @@ -656,22 +649,23 @@ function GetServers( call: ServerUnaryCall, callback: sendUnaryData ): void { - const maxResults = Number.parseInt(call.request.max_results); + const maxResults = parseInt(call.request.max_results, 10) || 100; + const startId = parseInt(call.request.start_server_id, 10); + const serverEntries = entityMaps[EntityTypes.server]; const resultList: ServerMessage[] = []; - let i = Number.parseInt(call.request.start_server_id); - for (; i < servers.length; i++) { - const serverEntry = servers[i]; - if (serverEntry === undefined) { - continue; - } - resultList.push(getServerMessage(serverEntry)); - if (resultList.length >= maxResults) { - break; - } + + let i: OrderedMapIterator; + for ( + i = serverEntries.lowerBound(startId); + !i.equals(serverEntries.end()) && resultList.length < maxResults; + i = i.next() + ) { + resultList.push(getServerMessage(i.pointer[1])); } + callback(null, { server: resultList, - end: i >= servers.length, + end: i.equals(serverEntries.end()), }); } @@ -679,8 +673,9 @@ function GetSubchannel( call: ServerUnaryCall, callback: sendUnaryData ): void { - const subchannelId = Number.parseInt(call.request.subchannel_id); - const subchannelEntry = subchannels[subchannelId]; + const subchannelId = parseInt(call.request.subchannel_id, 10); + const subchannelEntry = + entityMaps[EntityTypes.subchannel].getElementByKey(subchannelId); if (subchannelEntry === undefined) { callback({ code: Status.NOT_FOUND, @@ -689,6 +684,12 @@ function GetSubchannel( return; } const resolvedInfo = subchannelEntry.getInfo(); + const listenSocket: SocketRefMessage[] = []; + + resolvedInfo.children.sockets.forEach(el => { + listenSocket.push(socketRefToMessage(el[1].ref)); + }); + const subchannelMessage: SubchannelMessage = { ref: subchannelRefToMessage(subchannelEntry.ref), data: { @@ -702,9 +703,7 @@ function GetSubchannel( ), trace: resolvedInfo.trace.getTraceMessage(), }, - socket_ref: resolvedInfo.children.sockets.map(ref => - socketRefToMessage(ref) - ), + socket_ref: listenSocket, }; callback(null, { subchannel: subchannelMessage }); } @@ -735,8 +734,8 @@ function GetSocket( call: ServerUnaryCall, callback: sendUnaryData ): void { - const socketId = Number.parseInt(call.request.socket_id); - const socketEntry = sockets[socketId]; + const socketId = parseInt(call.request.socket_id, 10); + const socketEntry = entityMaps[EntityTypes.socket].getElementByKey(socketId); if (socketEntry === undefined) { callback({ code: Status.NOT_FOUND, @@ -809,8 +808,9 @@ function GetServerSockets( >, callback: sendUnaryData ): void { - const serverId = Number.parseInt(call.request.server_id); - const serverEntry = servers[serverId]; + const serverId = parseInt(call.request.server_id, 10); + const serverEntry = entityMaps[EntityTypes.server].getElementByKey(serverId); + if (serverEntry === undefined) { callback({ code: Status.NOT_FOUND, @@ -818,28 +818,28 @@ function GetServerSockets( }); return; } - const startId = Number.parseInt(call.request.start_socket_id); - const maxResults = Number.parseInt(call.request.max_results); + + const startId = parseInt(call.request.start_socket_id, 10); + const maxResults = parseInt(call.request.max_results, 10) || 100; const resolvedInfo = serverEntry.getInfo(); // If we wanted to include listener sockets in the result, this line would // instead say // const allSockets = resolvedInfo.listenerChildren.sockets.concat(resolvedInfo.sessionChildren.sockets).sort((ref1, ref2) => ref1.id - ref2.id); - const allSockets = resolvedInfo.sessionChildren.sockets.sort( - (ref1, ref2) => ref1.id - ref2.id - ); + const allSockets = resolvedInfo.sessionChildren.sockets; const resultList: SocketRefMessage[] = []; - let i = 0; - for (; i < allSockets.length; i++) { - if (allSockets[i].id >= startId) { - resultList.push(socketRefToMessage(allSockets[i])); - if (resultList.length >= maxResults) { - break; - } - } + + let i: OrderedMapIterator; + for ( + i = allSockets.lowerBound(startId); + !i.equals(allSockets.end()) && resultList.length < maxResults; + i = i.next() + ) { + resultList.push(socketRefToMessage(i.pointer[1].ref)); } + callback(null, { socket_ref: resultList, - end: i >= allSockets.length, + end: i.equals(allSockets.end()), }); } diff --git a/packages/grpc-js/src/load-balancer-child-handler.ts b/packages/grpc-js/src/load-balancer-child-handler.ts index a29d6c92b..352ea7b81 100644 --- a/packages/grpc-js/src/load-balancer-child-handler.ts +++ b/packages/grpc-js/src/load-balancer-child-handler.ts @@ -25,7 +25,7 @@ import { Endpoint, SubchannelAddress } from './subchannel-address'; import { ChannelOptions } from './channel-options'; import { ConnectivityState } from './connectivity-state'; import { Picker } from './picker'; -import { ChannelRef, SubchannelRef } from './channelz'; +import type { ChannelRef, SubchannelRef } from './channelz'; import { SubchannelInterface } from './subchannel-interface'; const TYPE_NAME = 'child_load_balancer_helper'; diff --git a/packages/grpc-js/src/load-balancer.ts b/packages/grpc-js/src/load-balancer.ts index f8071317a..fb353a59a 100644 --- a/packages/grpc-js/src/load-balancer.ts +++ b/packages/grpc-js/src/load-balancer.ts @@ -19,7 +19,7 @@ import { ChannelOptions } from './channel-options'; import { Endpoint, SubchannelAddress } from './subchannel-address'; import { ConnectivityState } from './connectivity-state'; import { Picker } from './picker'; -import { ChannelRef, SubchannelRef } from './channelz'; +import type { ChannelRef, SubchannelRef } from './channelz'; import { SubchannelInterface } from './subchannel-interface'; import { LoadBalancingConfig } from './service-config'; import { log } from './logging'; diff --git a/packages/grpc-js/src/server-call.ts b/packages/grpc-js/src/server-call.ts index 95393fba9..d5aababfa 100644 --- a/packages/grpc-js/src/server-call.ts +++ b/packages/grpc-js/src/server-call.ts @@ -19,12 +19,12 @@ import { EventEmitter } from 'events'; import { Duplex, Readable, Writable } from 'stream'; import { Status } from './constants'; -import { Deserialize, Serialize } from './make-client'; +import type { Deserialize, Serialize } from './make-client'; import { Metadata } from './metadata'; -import { ObjectReadable, ObjectWritable } from './object-stream'; -import { StatusObject, PartialStatusObject } from './call-interface'; -import { Deadline } from './deadline'; -import { ServerInterceptingCallInterface } from './server-interceptors'; +import type { ObjectReadable, ObjectWritable } from './object-stream'; +import type { StatusObject, PartialStatusObject } from './call-interface'; +import type { Deadline } from './deadline'; +import type { ServerInterceptingCallInterface } from './server-interceptors'; export type ServerStatusResponse = Partial; @@ -330,7 +330,7 @@ export interface UnaryHandler { func: handleUnaryCall; serialize: Serialize; deserialize: Deserialize; - type: HandlerType; + type: 'unary'; path: string; } @@ -338,7 +338,7 @@ export interface ClientStreamingHandler { func: handleClientStreamingCall; serialize: Serialize; deserialize: Deserialize; - type: HandlerType; + type: 'clientStream'; path: string; } @@ -346,7 +346,7 @@ export interface ServerStreamingHandler { func: handleServerStreamingCall; serialize: Serialize; deserialize: Deserialize; - type: HandlerType; + type: 'serverStream'; path: string; } @@ -354,7 +354,7 @@ export interface BidiStreamingHandler { func: handleBidiStreamingCall; serialize: Serialize; deserialize: Deserialize; - type: HandlerType; + type: 'bidi'; path: string; } diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index 46bd22ead..0a5bc0e9b 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -64,8 +64,11 @@ import { } from './uri-parser'; import { ChannelzCallTracker, + ChannelzCallTrackerStub, ChannelzChildrenTracker, + ChannelzChildrenTrackerStub, ChannelzTrace, + ChannelzTraceStub, registerChannelzServer, registerChannelzSocket, ServerInfo, @@ -87,6 +90,7 @@ import { CallEventTracker } from './transport'; const UNLIMITED_CONNECTION_AGE_MS = ~(1 << 31); const KEEPALIVE_MAX_TIME_MS = ~(1 << 31); const KEEPALIVE_TIMEOUT_MS = 20000; +const MAX_CONNECTION_IDLE_MS = 30 * 60 * 1e3; // 30 min const { HTTP2_HEADER_PATH } = http2.constants; @@ -177,9 +181,10 @@ function getDefaultHandler(handlerType: HandlerType, methodName: string) { interface ChannelzSessionInfo { ref: SocketRef; - streamTracker: ChannelzCallTracker; + streamTracker: ChannelzCallTracker | ChannelzCallTrackerStub; messagesSent: number; messagesReceived: number; + keepAlivesSent: number; lastMessageSentTimestamp: Date | null; lastMessageReceivedTimestamp: Date | null; } @@ -243,6 +248,13 @@ export interface ServerOptions extends ChannelOptions { export class Server { private boundPorts: Map = new Map(); private http2Servers: Map = new Map(); + private sessionIdleTimeouts = new Map< + http2.Http2Session, + { + activeStreams: number; + timeout: NodeJS.Timeout | null; + } + >(); private handlers: Map = new Map< string, @@ -261,10 +273,14 @@ export class Server { // Channelz Info private readonly channelzEnabled: boolean = true; private channelzRef: ServerRef; - private channelzTrace = new ChannelzTrace(); - private callTracker = new ChannelzCallTracker(); - private listenerChildrenTracker = new ChannelzChildrenTracker(); - private sessionChildrenTracker = new ChannelzChildrenTracker(); + private channelzTrace: ChannelzTrace | ChannelzTraceStub; + private callTracker: ChannelzCallTracker | ChannelzCallTrackerStub; + private listenerChildrenTracker: + | ChannelzChildrenTracker + | ChannelzChildrenTrackerStub; + private sessionChildrenTracker: + | ChannelzChildrenTracker + | ChannelzChildrenTrackerStub; private readonly maxConnectionAgeMs: number; private readonly maxConnectionAgeGraceMs: number; @@ -272,6 +288,8 @@ export class Server { private readonly keepaliveTimeMs: number; private readonly keepaliveTimeoutMs: number; + private readonly sessionIdleTimeout: number; + private readonly interceptors: ServerInterceptor[]; /** @@ -284,14 +302,24 @@ export class Server { this.options = options ?? {}; if (this.options['grpc.enable_channelz'] === 0) { this.channelzEnabled = false; + this.channelzTrace = new ChannelzTraceStub(); + this.callTracker = new ChannelzCallTrackerStub(); + this.listenerChildrenTracker = new ChannelzChildrenTrackerStub(); + this.sessionChildrenTracker = new ChannelzChildrenTrackerStub(); + } else { + this.channelzTrace = new ChannelzTrace(); + this.callTracker = new ChannelzCallTracker(); + this.listenerChildrenTracker = new ChannelzChildrenTracker(); + this.sessionChildrenTracker = new ChannelzChildrenTracker(); } + this.channelzRef = registerChannelzServer( + 'server', () => this.getChannelzInfo(), this.channelzEnabled ); - if (this.channelzEnabled) { - this.channelzTrace.addTrace('CT_INFO', 'Server created'); - } + + this.channelzTrace.addTrace('CT_INFO', 'Server created'); this.maxConnectionAgeMs = this.options['grpc.max_connection_age_ms'] ?? UNLIMITED_CONNECTION_AGE_MS; this.maxConnectionAgeGraceMs = @@ -301,6 +329,9 @@ export class Server { this.options['grpc.keepalive_time_ms'] ?? KEEPALIVE_MAX_TIME_MS; this.keepaliveTimeoutMs = this.options['grpc.keepalive_timeout_ms'] ?? KEEPALIVE_TIMEOUT_MS; + this.sessionIdleTimeout = + this.options['grpc.max_connection_idle'] ?? MAX_CONNECTION_IDLE_MS; + this.commonServerOptions = { maxSendHeaderBlockLength: Number.MAX_SAFE_INTEGER, }; @@ -382,7 +413,7 @@ export class Server { streamsFailed: sessionInfo.streamTracker.callsFailed, messagesSent: sessionInfo.messagesSent, messagesReceived: sessionInfo.messagesReceived, - keepAlivesSent: 0, + keepAlivesSent: sessionInfo.keepAlivesSent, lastLocalStreamCreatedTimestamp: null, lastRemoteStreamCreatedTimestamp: sessionInfo.streamTracker.lastCallStartedTimestamp, @@ -581,9 +612,8 @@ export class Server { const channelzRef = this.registerListenerToChannelz( boundSubchannelAddress ); - if (this.channelzEnabled) { - this.listenerChildrenTracker.refChild(channelzRef); - } + this.listenerChildrenTracker.refChild(channelzRef); + this.http2Servers.set(http2Server, { channelzRef: channelzRef, sessions: new Set(), @@ -854,7 +884,7 @@ export class Server { ); const serverInfo = this.http2Servers.get(server); server.close(() => { - if (this.channelzEnabled && serverInfo) { + if (serverInfo) { this.listenerChildrenTracker.unrefChild(serverInfo.channelzRef); unregisterChannelzRef(serverInfo.channelzRef); } @@ -870,15 +900,15 @@ export class Server { this.trace('Closing session initiated by ' + session.socket?.remoteAddress); const sessionInfo = this.sessions.get(session); const closeCallback = () => { - if (this.channelzEnabled && sessionInfo) { + if (sessionInfo) { this.sessionChildrenTracker.unrefChild(sessionInfo.ref); unregisterChannelzRef(sessionInfo.ref); + this.sessions.delete(session); } - this.sessions.delete(session); callback?.(); }; if (session.closed) { - process.nextTick(closeCallback); + queueMicrotask(closeCallback); } else { session.close(closeCallback); } @@ -956,14 +986,13 @@ export class Server { const allSessions: Set = new Set(); for (const http2Server of boundPortObject.listeningServers) { const serverEntry = this.http2Servers.get(http2Server); - if (!serverEntry) { - continue; - } - for (const session of serverEntry.sessions) { - allSessions.add(session); - this.closeSession(session, () => { - allSessions.delete(session); - }); + if (serverEntry) { + for (const session of serverEntry.sessions) { + allSessions.add(session); + this.closeSession(session, () => { + allSessions.delete(session); + }); + } } } /* After the grace time ends, send another goaway to all remaining sessions @@ -995,9 +1024,7 @@ export class Server { session.destroy(http2.constants.NGHTTP2_CANCEL as any); }); this.sessions.clear(); - if (this.channelzEnabled) { - unregisterChannelzRef(this.channelzRef); - } + unregisterChannelzRef(this.channelzRef); this.shutdown = true; } @@ -1049,9 +1076,7 @@ export class Server { tryShutdown(callback: (error?: Error) => void): void { const wrappedCallback = (error?: Error) => { - if (this.channelzEnabled) { - unregisterChannelzRef(this.channelzRef); - } + unregisterChannelzRef(this.channelzRef); callback(error); }; let pendingChecks = 0; @@ -1065,24 +1090,26 @@ export class Server { } this.shutdown = true; - for (const server of this.http2Servers.keys()) { + for (const [serverKey, server] of this.http2Servers.entries()) { pendingChecks++; - const serverString = this.http2Servers.get(server)!.channelzRef.name; + const serverString = server.channelzRef.name; this.trace('Waiting for server ' + serverString + ' to close'); - this.closeServer(server, () => { + this.closeServer(serverKey, () => { this.trace('Server ' + serverString + ' finished closing'); maybeCallback(); }); + + for (const session of server.sessions.keys()) { + pendingChecks++; + const sessionString = session.socket?.remoteAddress; + this.trace('Waiting for session ' + sessionString + ' to close'); + this.closeSession(session, () => { + this.trace('Session ' + sessionString + ' finished closing'); + maybeCallback(); + }); + } } - for (const session of this.sessions.keys()) { - pendingChecks++; - const sessionString = session.socket?.remoteAddress; - this.trace('Waiting for session ' + sessionString + ' to close'); - this.closeSession(session, () => { - this.trace('Session ' + sessionString + ' finished closing'); - maybeCallback(); - }); - } + if (pendingChecks === 0) { wrappedCallback(); } @@ -1160,213 +1187,161 @@ export class Server { }; stream.respond(trailersToSend, { endStream: true }); - if (this.channelzEnabled) { - this.callTracker.addCallFailed(); - channelzSessionInfo?.streamTracker.addCallFailed(); - } + this.callTracker.addCallFailed(); + channelzSessionInfo?.streamTracker.addCallFailed(); } - private _channelzHandler( - stream: http2.ServerHttp2Stream, - headers: http2.IncomingHttpHeaders + private _sessionHandler( + http2Server: http2.Http2Server | http2.Http2SecureServer ) { - const channelzSessionInfo = this.sessions.get( - stream.session as http2.ServerHttp2Session - ); + return (session: http2.ServerHttp2Session) => { + this.http2Servers.get(http2Server)?.sessions.add(session); + + let connectionAgeTimer: NodeJS.Timeout | null = null; + let connectionAgeGraceTimer: NodeJS.Timeout | null = null; + let sessionClosedByServer = false; - this.callTracker.addCallStarted(); - channelzSessionInfo?.streamTracker.addCallStarted(); + const idleTimeoutObj = this.enableIdleTimeout(session); - if (!this._verifyContentType(stream, headers)) { - this.callTracker.addCallFailed(); - channelzSessionInfo?.streamTracker.addCallFailed(); - return; - } + if (this.maxConnectionAgeMs !== UNLIMITED_CONNECTION_AGE_MS) { + // Apply a random jitter within a +/-10% range + const jitterMagnitude = this.maxConnectionAgeMs / 10; + const jitter = Math.random() * jitterMagnitude * 2 - jitterMagnitude; - const path = headers[HTTP2_HEADER_PATH] as string; + connectionAgeTimer = setTimeout(() => { + sessionClosedByServer = true; - const handler = this._retrieveHandler(path); - if (!handler) { - this._respondWithError( - getUnimplementedStatusResponse(path), - stream, - channelzSessionInfo - ); - return; - } + this.trace( + `Connection dropped by max connection age: ${session.socket?.remoteAddress}` + ); - const callEventTracker: CallEventTracker = { - addMessageSent: () => { - if (channelzSessionInfo) { - channelzSessionInfo.messagesSent += 1; - channelzSessionInfo.lastMessageSentTimestamp = new Date(); - } - }, - addMessageReceived: () => { - if (channelzSessionInfo) { - channelzSessionInfo.messagesReceived += 1; - channelzSessionInfo.lastMessageReceivedTimestamp = new Date(); - } - }, - onCallEnd: status => { - if (status.code === Status.OK) { - this.callTracker.addCallSucceeded(); - } else { - this.callTracker.addCallFailed(); - } - }, - onStreamEnd: success => { - if (channelzSessionInfo) { - if (success) { - channelzSessionInfo.streamTracker.addCallSucceeded(); - } else { - channelzSessionInfo.streamTracker.addCallFailed(); + try { + session.goaway( + http2.constants.NGHTTP2_NO_ERROR, + ~(1 << 31), + Buffer.from('max_age') + ); + } catch (e) { + // The goaway can't be sent because the session is already closed + session.destroy(); + return; } - } - }, - }; + session.close(); - const call = getServerInterceptingCall( - this.interceptors, - stream, - headers, - callEventTracker, - handler, - this.options - ); + /* Allow a grace period after sending the GOAWAY before forcibly + * closing the connection. */ + if (this.maxConnectionAgeGraceMs !== UNLIMITED_CONNECTION_AGE_MS) { + connectionAgeGraceTimer = setTimeout(() => { + session.destroy(); + }, this.maxConnectionAgeGraceMs).unref?.(); + } + }, this.maxConnectionAgeMs + jitter).unref?.(); + } - if (!this._runHandlerForCall(call, handler)) { - this.callTracker.addCallFailed(); - channelzSessionInfo?.streamTracker.addCallFailed(); + const keeapliveTimeTimer: NodeJS.Timeout | null = setInterval(() => { + const timeoutTImer = setTimeout(() => { + sessionClosedByServer = true; + session.close(); + }, this.keepaliveTimeoutMs).unref?.(); - call.sendStatus({ - code: Status.INTERNAL, - details: `Unknown handler type: ${handler.type}`, - }); - } - } + try { + session.ping( + (err: Error | null, duration: number, payload: Buffer) => { + clearTimeout(timeoutTImer); - private _streamHandler( - stream: http2.ServerHttp2Stream, - headers: http2.IncomingHttpHeaders - ) { - if (this._verifyContentType(stream, headers) !== true) { - return; - } + if (err) { + sessionClosedByServer = true; + this.trace( + `Connection dropped due to error of a ping frame ${err.message} return in ${duration}` + ); + session.close(); + } + } + ); + } catch (e) { + // The ping can't be sent because the session is already closed + session.destroy(); + } + }, this.keepaliveTimeMs).unref?.(); - const path = headers[HTTP2_HEADER_PATH] as string; + session.on('close', () => { + if (!sessionClosedByServer) { + this.trace( + `Connection dropped by client ${session.socket?.remoteAddress}` + ); + } - const handler = this._retrieveHandler(path); - if (!handler) { - this._respondWithError( - getUnimplementedStatusResponse(path), - stream, - null - ); - return; - } + if (connectionAgeTimer) { + clearTimeout(connectionAgeTimer); + } - const call = getServerInterceptingCall( - this.interceptors, - stream, - headers, - null, - handler, - this.options - ); + if (connectionAgeGraceTimer) { + clearTimeout(connectionAgeGraceTimer); + } - if (!this._runHandlerForCall(call, handler)) { - call.sendStatus({ - code: Status.INTERNAL, - details: `Unknown handler type: ${handler.type}`, - }); - } - } + if (keeapliveTimeTimer) { + clearTimeout(keeapliveTimeTimer); + } - private _runHandlerForCall( - call: ServerInterceptingCallInterface, - handler: Handler - ): boolean { - const { type } = handler; - if (type === 'unary') { - handleUnary(call, handler as UntypedUnaryHandler); - } else if (type === 'clientStream') { - handleClientStreaming(call, handler as UntypedClientStreamingHandler); - } else if (type === 'serverStream') { - handleServerStreaming(call, handler as UntypedServerStreamingHandler); - } else if (type === 'bidi') { - handleBidiStreaming(call, handler as UntypedBidiStreamingHandler); - } else { - return false; - } + clearTimeout(idleTimeoutObj.timeout); + this.sessionIdleTimeouts.delete(session); - return true; + this.http2Servers.get(http2Server)?.sessions.delete(session); + }); + }; } - private _setupHandlers( + private _channelzSessionHandler( http2Server: http2.Http2Server | http2.Http2SecureServer - ): void { - if (http2Server === null) { - return; - } - - const serverAddress = http2Server.address(); - let serverAddressString = 'null'; - if (serverAddress) { - if (typeof serverAddress === 'string') { - serverAddressString = serverAddress; - } else { - serverAddressString = serverAddress.address + ':' + serverAddress.port; - } - } - this.serverAddressString = serverAddressString; - - const handler = this.channelzEnabled - ? this._channelzHandler - : this._streamHandler; - - http2Server.on('stream', handler.bind(this)); - http2Server.on('session', session => { + ) { + return (session: http2.ServerHttp2Session) => { const channelzRef = registerChannelzSocket( - session.socket.remoteAddress ?? 'unknown', + session.socket?.remoteAddress ?? 'unknown', this.getChannelzSessionInfoGetter(session), this.channelzEnabled ); const channelzSessionInfo: ChannelzSessionInfo = { ref: channelzRef, - streamTracker: new ChannelzCallTracker(), + streamTracker: this.channelzEnabled + ? new ChannelzCallTracker() + : new ChannelzCallTrackerStub(), messagesSent: 0, messagesReceived: 0, + keepAlivesSent: 0, lastMessageSentTimestamp: null, lastMessageReceivedTimestamp: null, }; this.http2Servers.get(http2Server)?.sessions.add(session); this.sessions.set(session, channelzSessionInfo); - const clientAddress = session.socket.remoteAddress; - if (this.channelzEnabled) { - this.channelzTrace.addTrace( - 'CT_INFO', - 'Connection established by client ' + clientAddress - ); - this.sessionChildrenTracker.refChild(channelzRef); - } + const clientAddress = `${session.socket.remoteAddress}:${session.socket.remotePort}`; + + this.channelzTrace.addTrace( + 'CT_INFO', + 'Connection established by client ' + clientAddress + ); + this.trace('Connection established by client ' + clientAddress); + this.sessionChildrenTracker.refChild(channelzRef); + let connectionAgeTimer: NodeJS.Timeout | null = null; let connectionAgeGraceTimer: NodeJS.Timeout | null = null; let sessionClosedByServer = false; + + const idleTimeoutObj = this.enableIdleTimeout(session); + if (this.maxConnectionAgeMs !== UNLIMITED_CONNECTION_AGE_MS) { // Apply a random jitter within a +/-10% range const jitterMagnitude = this.maxConnectionAgeMs / 10; const jitter = Math.random() * jitterMagnitude * 2 - jitterMagnitude; + connectionAgeTimer = setTimeout(() => { sessionClosedByServer = true; - if (this.channelzEnabled) { - this.channelzTrace.addTrace( - 'CT_INFO', - 'Connection dropped by max connection age from ' + clientAddress - ); - } + this.channelzTrace.addTrace( + 'CT_INFO', + 'Connection dropped by max connection age from ' + clientAddress + ); + try { session.goaway( http2.constants.NGHTTP2_NO_ERROR, @@ -1379,6 +1354,7 @@ export class Server { return; } session.close(); + /* Allow a grace period after sending the GOAWAY before forcibly * closing the connection. */ if (this.maxConnectionAgeGraceMs !== UNLIMITED_CONNECTION_AGE_MS) { @@ -1388,52 +1364,316 @@ export class Server { } }, this.maxConnectionAgeMs + jitter).unref?.(); } + const keeapliveTimeTimer: NodeJS.Timeout | null = setInterval(() => { const timeoutTImer = setTimeout(() => { sessionClosedByServer = true; - if (this.channelzEnabled) { - this.channelzTrace.addTrace( - 'CT_INFO', - 'Connection dropped by keepalive timeout from ' + clientAddress - ); - } + this.channelzTrace.addTrace( + 'CT_INFO', + 'Connection dropped by keepalive timeout from ' + clientAddress + ); + session.close(); }, this.keepaliveTimeoutMs).unref?.(); try { session.ping( (err: Error | null, duration: number, payload: Buffer) => { clearTimeout(timeoutTImer); + + if (err) { + sessionClosedByServer = true; + this.channelzTrace.addTrace( + 'CT_INFO', + `Connection dropped due to error of a ping frame ${err.message} return in ${duration}` + ); + + session.close(); + } } ); + channelzSessionInfo.keepAlivesSent += 1; } catch (e) { // The ping can't be sent because the session is already closed session.destroy(); } }, this.keepaliveTimeMs).unref?.(); + session.on('close', () => { - if (this.channelzEnabled) { - if (!sessionClosedByServer) { - this.channelzTrace.addTrace( - 'CT_INFO', - 'Connection dropped by client ' + clientAddress - ); - } - this.sessionChildrenTracker.unrefChild(channelzRef); - unregisterChannelzRef(channelzRef); + if (!sessionClosedByServer) { + this.channelzTrace.addTrace( + 'CT_INFO', + 'Connection dropped by client ' + clientAddress + ); } + this.trace( + `DROPPING ${channelzRef.name} - ${channelzRef.kind} - ${channelzRef.id}` + ); + this.sessionChildrenTracker.unrefChild(channelzRef); + unregisterChannelzRef(channelzRef); + if (connectionAgeTimer) { clearTimeout(connectionAgeTimer); } + if (connectionAgeGraceTimer) { clearTimeout(connectionAgeGraceTimer); } + if (keeapliveTimeTimer) { clearTimeout(keeapliveTimeTimer); } + + clearTimeout(idleTimeoutObj.timeout); + this.sessionIdleTimeouts.delete(session); + this.http2Servers.get(http2Server)?.sessions.delete(session); this.sessions.delete(session); }); - }); + }; + } + + private _channelzHandler(http2Server: http2.Http2Server) { + return ( + stream: http2.ServerHttp2Stream, + headers: http2.IncomingHttpHeaders + ) => { + // for handling idle timeout + this.onStreamOpened(stream); + + const channelzSessionInfo = this.sessions.get( + stream.session as http2.ServerHttp2Session + ); + + this.callTracker.addCallStarted(); + channelzSessionInfo?.streamTracker.addCallStarted(); + + if (!this._verifyContentType(stream, headers)) { + this.callTracker.addCallFailed(); + channelzSessionInfo?.streamTracker.addCallFailed(); + return; + } + + const path = headers[HTTP2_HEADER_PATH] as string; + + const handler = this._retrieveHandler(path); + if (!handler) { + this._respondWithError( + getUnimplementedStatusResponse(path), + stream, + channelzSessionInfo + ); + return; + } + + const callEventTracker: CallEventTracker = { + addMessageSent: () => { + if (channelzSessionInfo) { + channelzSessionInfo.messagesSent += 1; + channelzSessionInfo.lastMessageSentTimestamp = new Date(); + } + }, + addMessageReceived: () => { + if (channelzSessionInfo) { + channelzSessionInfo.messagesReceived += 1; + channelzSessionInfo.lastMessageReceivedTimestamp = new Date(); + } + }, + onCallEnd: status => { + if (status.code === Status.OK) { + this.callTracker.addCallSucceeded(); + } else { + this.callTracker.addCallFailed(); + } + }, + onStreamEnd: success => { + if (channelzSessionInfo) { + if (success) { + channelzSessionInfo.streamTracker.addCallSucceeded(); + } else { + channelzSessionInfo.streamTracker.addCallFailed(); + } + } + }, + }; + + const call = getServerInterceptingCall( + this.interceptors, + stream, + headers, + callEventTracker, + handler, + this.options + ); + + if (!this._runHandlerForCall(call, handler)) { + this.callTracker.addCallFailed(); + channelzSessionInfo?.streamTracker.addCallFailed(); + + call.sendStatus({ + code: Status.INTERNAL, + details: `Unknown handler type: ${handler.type}`, + }); + } + }; + } + + private _streamHandler(http2Server: http2.Http2Server) { + return ( + stream: http2.ServerHttp2Stream, + headers: http2.IncomingHttpHeaders + ) => { + // for handling idle timeout + this.onStreamOpened(stream); + + if (this._verifyContentType(stream, headers) !== true) { + return; + } + + const path = headers[HTTP2_HEADER_PATH] as string; + + const handler = this._retrieveHandler(path); + if (!handler) { + this._respondWithError( + getUnimplementedStatusResponse(path), + stream, + null + ); + return; + } + + const call = getServerInterceptingCall( + this.interceptors, + stream, + headers, + null, + handler, + this.options + ); + + if (!this._runHandlerForCall(call, handler)) { + call.sendStatus({ + code: Status.INTERNAL, + details: `Unknown handler type: ${handler.type}`, + }); + } + }; + } + + private _runHandlerForCall( + call: ServerInterceptingCallInterface, + handler: + | UntypedUnaryHandler + | UntypedClientStreamingHandler + | UntypedServerStreamingHandler + | UntypedBidiStreamingHandler + ): boolean { + const { type } = handler; + if (type === 'unary') { + handleUnary(call, handler); + } else if (type === 'clientStream') { + handleClientStreaming(call, handler); + } else if (type === 'serverStream') { + handleServerStreaming(call, handler); + } else if (type === 'bidi') { + handleBidiStreaming(call, handler); + } else { + return false; + } + + return true; + } + + private _setupHandlers( + http2Server: http2.Http2Server | http2.Http2SecureServer + ): void { + if (http2Server === null) { + return; + } + + const serverAddress = http2Server.address(); + let serverAddressString = 'null'; + if (serverAddress) { + if (typeof serverAddress === 'string') { + serverAddressString = serverAddress; + } else { + serverAddressString = serverAddress.address + ':' + serverAddress.port; + } + } + this.serverAddressString = serverAddressString; + + const handler = this.channelzEnabled + ? this._channelzHandler(http2Server) + : this._streamHandler(http2Server); + + const sessionHandler = this.channelzEnabled + ? this._channelzSessionHandler(http2Server) + : this._sessionHandler(http2Server); + + http2Server.on('stream', handler); + http2Server.on('session', sessionHandler); + } + + private enableIdleTimeout(session: http2.ServerHttp2Session) { + const idleTimeoutObj = { + activeStreams: 0, + timeout: setTimeout( + this.onIdleTimeout, + this.sessionIdleTimeout, + this, + session + ).unref(), + }; + this.sessionIdleTimeouts.set(session, idleTimeoutObj); + + this.trace(`Enable idle timeout for ${session.socket?.remoteAddress}`); + + return idleTimeoutObj; + } + + private onIdleTimeout(ctx: Server, session: http2.ServerHttp2Session) { + ctx.trace(`Idle timeout for ${session.socket?.remoteAddress}`); + ctx.closeSession(session); + } + + private onStreamOpened(stream: http2.ServerHttp2Stream) { + const session = stream.session as http2.ServerHttp2Session; + this.trace(`Stream opened for ${session.socket?.remoteAddress}`); + const idleTimeoutObj = this.sessionIdleTimeouts.get(session); + if (idleTimeoutObj) { + idleTimeoutObj.activeStreams += 1; + if (idleTimeoutObj.timeout) { + clearTimeout(idleTimeoutObj.timeout); + idleTimeoutObj.timeout = null; + } + + this.trace( + `onStreamOpened: adding on stream close event for ${session.socket?.remoteAddress}` + ); + stream.once('close', () => this.onStreamClose(session)); + } else { + this.trace( + `onStreamOpened: missing stream for ${session.socket?.remoteAddress}` + ); + } + } + + private onStreamClose(session: http2.ServerHttp2Session) { + this.trace(`Stream closed for ${session.socket?.remoteAddress}`); + const idleTimeoutObj = this.sessionIdleTimeouts.get(session); + if (idleTimeoutObj) { + idleTimeoutObj.activeStreams -= 1; + if (idleTimeoutObj.activeStreams === 0) { + this.trace( + `onStreamClose: set idle timeout for ${this.sessionIdleTimeout}ms ${session.socket?.remoteAddress}` + ); + idleTimeoutObj.timeout = setTimeout( + this.onIdleTimeout, + this.sessionIdleTimeout, + this, + session + ).unref(); + } + } } } diff --git a/packages/grpc-js/src/subchannel-interface.ts b/packages/grpc-js/src/subchannel-interface.ts index c26669ba3..6c314189a 100644 --- a/packages/grpc-js/src/subchannel-interface.ts +++ b/packages/grpc-js/src/subchannel-interface.ts @@ -15,7 +15,7 @@ * */ -import { SubchannelRef } from './channelz'; +import type { SubchannelRef } from './channelz'; import { ConnectivityState } from './connectivity-state'; import { Subchannel } from './subchannel'; diff --git a/packages/grpc-js/src/subchannel.ts b/packages/grpc-js/src/subchannel.ts index 63e254cf3..95b600c4c 100644 --- a/packages/grpc-js/src/subchannel.ts +++ b/packages/grpc-js/src/subchannel.ts @@ -31,10 +31,13 @@ import { SubchannelRef, ChannelzTrace, ChannelzChildrenTracker, + ChannelzChildrenTrackerStub, SubchannelInfo, registerChannelzSubchannel, ChannelzCallTracker, + ChannelzCallTrackerStub, unregisterChannelzRef, + ChannelzTraceStub, } from './channelz'; import { ConnectivityStateListener, @@ -89,12 +92,15 @@ export class Subchannel { // Channelz info private readonly channelzEnabled: boolean = true; private channelzRef: SubchannelRef; - private channelzTrace: ChannelzTrace; - private callTracker = new ChannelzCallTracker(); - private childrenTracker = new ChannelzChildrenTracker(); + + private channelzTrace: ChannelzTrace | ChannelzTraceStub; + private callTracker: ChannelzCallTracker | ChannelzCallTrackerStub; + private childrenTracker: + | ChannelzChildrenTracker + | ChannelzChildrenTrackerStub; // Channelz socket info - private streamTracker = new ChannelzCallTracker(); + private streamTracker: ChannelzCallTracker | ChannelzCallTrackerStub; /** * A class representing a connection to a single backend. @@ -127,16 +133,24 @@ export class Subchannel { if (options['grpc.enable_channelz'] === 0) { this.channelzEnabled = false; + this.channelzTrace = new ChannelzTraceStub(); + this.callTracker = new ChannelzCallTrackerStub(); + this.childrenTracker = new ChannelzChildrenTrackerStub(); + this.streamTracker = new ChannelzCallTrackerStub(); + } else { + this.channelzTrace = new ChannelzTrace(); + this.callTracker = new ChannelzCallTracker(); + this.childrenTracker = new ChannelzChildrenTracker(); + this.streamTracker = new ChannelzCallTracker(); } - this.channelzTrace = new ChannelzTrace(); + this.channelzRef = registerChannelzSubchannel( this.subchannelAddressString, () => this.getChannelzInfo(), this.channelzEnabled ); - if (this.channelzEnabled) { - this.channelzTrace.addTrace('CT_INFO', 'Subchannel created'); - } + + this.channelzTrace.addTrace('CT_INFO', 'Subchannel created'); this.trace( 'Subchannel constructed with options ' + JSON.stringify(options, undefined, 2) @@ -338,12 +352,8 @@ export class Subchannel { this.refTrace('refcount ' + this.refcount + ' -> ' + (this.refcount - 1)); this.refcount -= 1; if (this.refcount === 0) { - if (this.channelzEnabled) { - this.channelzTrace.addTrace('CT_INFO', 'Shutting down'); - } - if (this.channelzEnabled) { - unregisterChannelzRef(this.channelzRef); - } + this.channelzTrace.addTrace('CT_INFO', 'Shutting down'); + unregisterChannelzRef(this.channelzRef); process.nextTick(() => { this.transitionToState( [ConnectivityState.CONNECTING, ConnectivityState.READY], diff --git a/packages/grpc-js/src/transport.ts b/packages/grpc-js/src/transport.ts index c4941b068..620488635 100644 --- a/packages/grpc-js/src/transport.ts +++ b/packages/grpc-js/src/transport.ts @@ -28,6 +28,7 @@ import { ChannelCredentials } from './channel-credentials'; import { ChannelOptions } from './channel-options'; import { ChannelzCallTracker, + ChannelzCallTrackerStub, registerChannelzSocket, SocketInfo, SocketRef, @@ -136,7 +137,7 @@ class Http2Transport implements Transport { // Channelz info private channelzRef: SocketRef; private readonly channelzEnabled: boolean = true; - private streamTracker = new ChannelzCallTracker(); + private streamTracker: ChannelzCallTracker | ChannelzCallTrackerStub; private keepalivesSent = 0; private messagesSent = 0; private messagesReceived = 0; @@ -159,12 +160,17 @@ class Http2Transport implements Transport { if (options['grpc.enable_channelz'] === 0) { this.channelzEnabled = false; + this.streamTracker = new ChannelzCallTrackerStub(); + } else { + this.streamTracker = new ChannelzCallTracker(); } + this.channelzRef = registerChannelzSocket( this.subchannelAddressString, () => this.getChannelzInfo(), this.channelzEnabled ); + // Build user-agent string. this.userAgent = [ options['grpc.primary_user_agent'], @@ -192,6 +198,7 @@ class Http2Transport implements Transport { this.stopKeepalivePings(); this.handleDisconnect(); }); + session.once( 'goaway', (errorCode: number, lastStreamID: number, opaqueData?: Buffer) => { @@ -214,11 +221,13 @@ class Http2Transport implements Transport { this.reportDisconnectToOwner(tooManyPings); } ); + session.once('error', error => { /* Do nothing here. Any error should also trigger a close event, which is * where we want to handle that. */ this.trace('connection closed with error ' + (error as Error).message); }); + if (logging.isTracerEnabled(TRACER_NAME)) { session.on('remoteSettings', (settings: http2.Settings) => { this.trace( @@ -237,6 +246,7 @@ class Http2Transport implements Transport { ); }); } + /* Start the keepalive timer last, because this can trigger trace logs, * which should only happen after everything else is set up. */ if (this.keepaliveWithoutCalls) { @@ -625,6 +635,7 @@ export class Http2SubchannelConnector implements SubchannelConnector { private session: http2.ClientHttp2Session | null = null; private isShutdown = false; constructor(private channelTarget: GrpcUri) {} + private trace(text: string) { logging.trace( LogVerbosity.DEBUG, @@ -632,6 +643,7 @@ export class Http2SubchannelConnector implements SubchannelConnector { uriToString(this.channelTarget) + ' ' + text ); } + private createSession( address: SubchannelAddress, credentials: ChannelCredentials, @@ -641,6 +653,7 @@ export class Http2SubchannelConnector implements SubchannelConnector { if (this.isShutdown) { return Promise.reject(); } + return new Promise((resolve, reject) => { let remoteName: string | null; if (proxyConnectionResult.realTarget) { @@ -767,6 +780,7 @@ export class Http2SubchannelConnector implements SubchannelConnector { }); }); } + connect( address: SubchannelAddress, credentials: ChannelCredentials, diff --git a/packages/grpc-js/test/common.ts b/packages/grpc-js/test/common.ts index 88aa129aa..eaa701f18 100644 --- a/packages/grpc-js/test/common.ts +++ b/packages/grpc-js/test/common.ts @@ -31,7 +31,7 @@ import { HealthListener, SubchannelInterface, } from '../src/subchannel-interface'; -import { SubchannelRef } from '../src/channelz'; +import { EntityTypes, SubchannelRef } from '../src/channelz'; import { Subchannel } from '../src/subchannel'; import { ConnectivityState } from '../src/connectivity-state'; @@ -196,7 +196,7 @@ export class MockSubchannel implements SubchannelInterface { unref(): void {} getChannelzRef(): SubchannelRef { return { - kind: 'subchannel', + kind: EntityTypes.subchannel, id: -1, name: this.address, }; From a4a676d3788976ef3e8bb1049f37b971eb8d8d4d Mon Sep 17 00:00:00 2001 From: AVVS Date: Tue, 27 Feb 2024 14:17:32 -0800 Subject: [PATCH 02/11] chore: move new functions towards the end of the class --- packages/grpc-js/src/server.ts | 362 ++++++++++++++++----------------- 1 file changed, 181 insertions(+), 181 deletions(-) diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index 0a5bc0e9b..1229b8c6c 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -1191,6 +1191,187 @@ export class Server { channelzSessionInfo?.streamTracker.addCallFailed(); } + private _channelzHandler(http2Server: http2.Http2Server) { + return ( + stream: http2.ServerHttp2Stream, + headers: http2.IncomingHttpHeaders + ) => { + // for handling idle timeout + this.onStreamOpened(stream); + + const channelzSessionInfo = this.sessions.get( + stream.session as http2.ServerHttp2Session + ); + + this.callTracker.addCallStarted(); + channelzSessionInfo?.streamTracker.addCallStarted(); + + if (!this._verifyContentType(stream, headers)) { + this.callTracker.addCallFailed(); + channelzSessionInfo?.streamTracker.addCallFailed(); + return; + } + + const path = headers[HTTP2_HEADER_PATH] as string; + + const handler = this._retrieveHandler(path); + if (!handler) { + this._respondWithError( + getUnimplementedStatusResponse(path), + stream, + channelzSessionInfo + ); + return; + } + + const callEventTracker: CallEventTracker = { + addMessageSent: () => { + if (channelzSessionInfo) { + channelzSessionInfo.messagesSent += 1; + channelzSessionInfo.lastMessageSentTimestamp = new Date(); + } + }, + addMessageReceived: () => { + if (channelzSessionInfo) { + channelzSessionInfo.messagesReceived += 1; + channelzSessionInfo.lastMessageReceivedTimestamp = new Date(); + } + }, + onCallEnd: status => { + if (status.code === Status.OK) { + this.callTracker.addCallSucceeded(); + } else { + this.callTracker.addCallFailed(); + } + }, + onStreamEnd: success => { + if (channelzSessionInfo) { + if (success) { + channelzSessionInfo.streamTracker.addCallSucceeded(); + } else { + channelzSessionInfo.streamTracker.addCallFailed(); + } + } + }, + }; + + const call = getServerInterceptingCall( + this.interceptors, + stream, + headers, + callEventTracker, + handler, + this.options + ); + + if (!this._runHandlerForCall(call, handler)) { + this.callTracker.addCallFailed(); + channelzSessionInfo?.streamTracker.addCallFailed(); + + call.sendStatus({ + code: Status.INTERNAL, + details: `Unknown handler type: ${handler.type}`, + }); + } + }; + } + + private _streamHandler(http2Server: http2.Http2Server) { + return ( + stream: http2.ServerHttp2Stream, + headers: http2.IncomingHttpHeaders + ) => { + // for handling idle timeout + this.onStreamOpened(stream); + + if (this._verifyContentType(stream, headers) !== true) { + return; + } + + const path = headers[HTTP2_HEADER_PATH] as string; + + const handler = this._retrieveHandler(path); + if (!handler) { + this._respondWithError( + getUnimplementedStatusResponse(path), + stream, + null + ); + return; + } + + const call = getServerInterceptingCall( + this.interceptors, + stream, + headers, + null, + handler, + this.options + ); + + if (!this._runHandlerForCall(call, handler)) { + call.sendStatus({ + code: Status.INTERNAL, + details: `Unknown handler type: ${handler.type}`, + }); + } + }; + } + + private _runHandlerForCall( + call: ServerInterceptingCallInterface, + handler: + | UntypedUnaryHandler + | UntypedClientStreamingHandler + | UntypedServerStreamingHandler + | UntypedBidiStreamingHandler + ): boolean { + const { type } = handler; + if (type === 'unary') { + handleUnary(call, handler); + } else if (type === 'clientStream') { + handleClientStreaming(call, handler); + } else if (type === 'serverStream') { + handleServerStreaming(call, handler); + } else if (type === 'bidi') { + handleBidiStreaming(call, handler); + } else { + return false; + } + + return true; + } + + private _setupHandlers( + http2Server: http2.Http2Server | http2.Http2SecureServer + ): void { + if (http2Server === null) { + return; + } + + const serverAddress = http2Server.address(); + let serverAddressString = 'null'; + if (serverAddress) { + if (typeof serverAddress === 'string') { + serverAddressString = serverAddress; + } else { + serverAddressString = serverAddress.address + ':' + serverAddress.port; + } + } + this.serverAddressString = serverAddressString; + + const handler = this.channelzEnabled + ? this._channelzHandler(http2Server) + : this._streamHandler(http2Server); + + const sessionHandler = this.channelzEnabled + ? this._channelzSessionHandler(http2Server) + : this._sessionHandler(http2Server); + + http2Server.on('stream', handler); + http2Server.on('session', sessionHandler); + } + private _sessionHandler( http2Server: http2.Http2Server | http2.Http2SecureServer ) { @@ -1432,187 +1613,6 @@ export class Server { }; } - private _channelzHandler(http2Server: http2.Http2Server) { - return ( - stream: http2.ServerHttp2Stream, - headers: http2.IncomingHttpHeaders - ) => { - // for handling idle timeout - this.onStreamOpened(stream); - - const channelzSessionInfo = this.sessions.get( - stream.session as http2.ServerHttp2Session - ); - - this.callTracker.addCallStarted(); - channelzSessionInfo?.streamTracker.addCallStarted(); - - if (!this._verifyContentType(stream, headers)) { - this.callTracker.addCallFailed(); - channelzSessionInfo?.streamTracker.addCallFailed(); - return; - } - - const path = headers[HTTP2_HEADER_PATH] as string; - - const handler = this._retrieveHandler(path); - if (!handler) { - this._respondWithError( - getUnimplementedStatusResponse(path), - stream, - channelzSessionInfo - ); - return; - } - - const callEventTracker: CallEventTracker = { - addMessageSent: () => { - if (channelzSessionInfo) { - channelzSessionInfo.messagesSent += 1; - channelzSessionInfo.lastMessageSentTimestamp = new Date(); - } - }, - addMessageReceived: () => { - if (channelzSessionInfo) { - channelzSessionInfo.messagesReceived += 1; - channelzSessionInfo.lastMessageReceivedTimestamp = new Date(); - } - }, - onCallEnd: status => { - if (status.code === Status.OK) { - this.callTracker.addCallSucceeded(); - } else { - this.callTracker.addCallFailed(); - } - }, - onStreamEnd: success => { - if (channelzSessionInfo) { - if (success) { - channelzSessionInfo.streamTracker.addCallSucceeded(); - } else { - channelzSessionInfo.streamTracker.addCallFailed(); - } - } - }, - }; - - const call = getServerInterceptingCall( - this.interceptors, - stream, - headers, - callEventTracker, - handler, - this.options - ); - - if (!this._runHandlerForCall(call, handler)) { - this.callTracker.addCallFailed(); - channelzSessionInfo?.streamTracker.addCallFailed(); - - call.sendStatus({ - code: Status.INTERNAL, - details: `Unknown handler type: ${handler.type}`, - }); - } - }; - } - - private _streamHandler(http2Server: http2.Http2Server) { - return ( - stream: http2.ServerHttp2Stream, - headers: http2.IncomingHttpHeaders - ) => { - // for handling idle timeout - this.onStreamOpened(stream); - - if (this._verifyContentType(stream, headers) !== true) { - return; - } - - const path = headers[HTTP2_HEADER_PATH] as string; - - const handler = this._retrieveHandler(path); - if (!handler) { - this._respondWithError( - getUnimplementedStatusResponse(path), - stream, - null - ); - return; - } - - const call = getServerInterceptingCall( - this.interceptors, - stream, - headers, - null, - handler, - this.options - ); - - if (!this._runHandlerForCall(call, handler)) { - call.sendStatus({ - code: Status.INTERNAL, - details: `Unknown handler type: ${handler.type}`, - }); - } - }; - } - - private _runHandlerForCall( - call: ServerInterceptingCallInterface, - handler: - | UntypedUnaryHandler - | UntypedClientStreamingHandler - | UntypedServerStreamingHandler - | UntypedBidiStreamingHandler - ): boolean { - const { type } = handler; - if (type === 'unary') { - handleUnary(call, handler); - } else if (type === 'clientStream') { - handleClientStreaming(call, handler); - } else if (type === 'serverStream') { - handleServerStreaming(call, handler); - } else if (type === 'bidi') { - handleBidiStreaming(call, handler); - } else { - return false; - } - - return true; - } - - private _setupHandlers( - http2Server: http2.Http2Server | http2.Http2SecureServer - ): void { - if (http2Server === null) { - return; - } - - const serverAddress = http2Server.address(); - let serverAddressString = 'null'; - if (serverAddress) { - if (typeof serverAddress === 'string') { - serverAddressString = serverAddress; - } else { - serverAddressString = serverAddress.address + ':' + serverAddress.port; - } - } - this.serverAddressString = serverAddressString; - - const handler = this.channelzEnabled - ? this._channelzHandler(http2Server) - : this._streamHandler(http2Server); - - const sessionHandler = this.channelzEnabled - ? this._channelzSessionHandler(http2Server) - : this._sessionHandler(http2Server); - - http2Server.on('stream', handler); - http2Server.on('session', sessionHandler); - } - private enableIdleTimeout(session: http2.ServerHttp2Session) { const idleTimeoutObj = { activeStreams: 0, From b8f157ed21add2daffe11e320781e345d7cc38fb Mon Sep 17 00:00:00 2001 From: AVVS Date: Tue, 27 Feb 2024 14:30:55 -0800 Subject: [PATCH 03/11] chore: revert interface -> type change in channelz --- packages/grpc-js/src/channelz.ts | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/packages/grpc-js/src/channelz.ts b/packages/grpc-js/src/channelz.ts index 6d70b7543..4eab762a8 100644 --- a/packages/grpc-js/src/channelz.ts +++ b/packages/grpc-js/src/channelz.ts @@ -330,25 +330,25 @@ export interface SocketInfo { remoteFlowControlWindow: number | null; } -type ChannelEntry = { +interface ChannelEntry { ref: ChannelRef; getInfo(): ChannelInfo; -}; +} -type SubchannelEntry = { +interface SubchannelEntry { ref: SubchannelRef; getInfo(): SubchannelInfo; -}; +} -type ServerEntry = { +interface ServerEntry { ref: ServerRef; getInfo(): ServerInfo; -}; +} -type SocketEntry = { +interface SocketEntry { ref: SocketRef; getInfo(): SocketInfo; -}; +} export const enum EntityTypes { channel = 'channel', From 0b79b7420a77564babca5e8718ef95c6aedbea71 Mon Sep 17 00:00:00 2001 From: AVVS Date: Tue, 27 Feb 2024 14:35:02 -0800 Subject: [PATCH 04/11] chore: cleanup traces --- packages/grpc-js/src/server.ts | 18 +++--------------- 1 file changed, 3 insertions(+), 15 deletions(-) diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index 1229b8c6c..67b5fdf73 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -1586,9 +1586,7 @@ export class Server { 'Connection dropped by client ' + clientAddress ); } - this.trace( - `DROPPING ${channelzRef.name} - ${channelzRef.kind} - ${channelzRef.id}` - ); + this.sessionChildrenTracker.unrefChild(channelzRef); unregisterChannelzRef(channelzRef); @@ -1637,7 +1635,7 @@ export class Server { private onStreamOpened(stream: http2.ServerHttp2Stream) { const session = stream.session as http2.ServerHttp2Session; - this.trace(`Stream opened for ${session.socket?.remoteAddress}`); + const idleTimeoutObj = this.sessionIdleTimeouts.get(session); if (idleTimeoutObj) { idleTimeoutObj.activeStreams += 1; @@ -1646,26 +1644,16 @@ export class Server { idleTimeoutObj.timeout = null; } - this.trace( - `onStreamOpened: adding on stream close event for ${session.socket?.remoteAddress}` - ); stream.once('close', () => this.onStreamClose(session)); - } else { - this.trace( - `onStreamOpened: missing stream for ${session.socket?.remoteAddress}` - ); } } private onStreamClose(session: http2.ServerHttp2Session) { - this.trace(`Stream closed for ${session.socket?.remoteAddress}`); const idleTimeoutObj = this.sessionIdleTimeouts.get(session); + if (idleTimeoutObj) { idleTimeoutObj.activeStreams -= 1; if (idleTimeoutObj.activeStreams === 0) { - this.trace( - `onStreamClose: set idle timeout for ${this.sessionIdleTimeout}ms ${session.socket?.remoteAddress}` - ); idleTimeoutObj.timeout = setTimeout( this.onIdleTimeout, this.sessionIdleTimeout, From 74102fcc872759d18a62cf5098256eb521064b0c Mon Sep 17 00:00:00 2001 From: AVVS Date: Tue, 27 Feb 2024 14:39:24 -0800 Subject: [PATCH 05/11] chore: extraneous closure, dont need server ref --- packages/grpc-js/src/server.ts | 216 ++++++++++++++++----------------- 1 file changed, 106 insertions(+), 110 deletions(-) diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index 67b5fdf73..eed097f83 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -1191,131 +1191,127 @@ export class Server { channelzSessionInfo?.streamTracker.addCallFailed(); } - private _channelzHandler(http2Server: http2.Http2Server) { - return ( - stream: http2.ServerHttp2Stream, - headers: http2.IncomingHttpHeaders - ) => { - // for handling idle timeout - this.onStreamOpened(stream); - - const channelzSessionInfo = this.sessions.get( - stream.session as http2.ServerHttp2Session - ); + private _channelzHandler( + stream: http2.ServerHttp2Stream, + headers: http2.IncomingHttpHeaders + ) { + // for handling idle timeout + this.onStreamOpened(stream); - this.callTracker.addCallStarted(); - channelzSessionInfo?.streamTracker.addCallStarted(); + const channelzSessionInfo = this.sessions.get( + stream.session as http2.ServerHttp2Session + ); - if (!this._verifyContentType(stream, headers)) { - this.callTracker.addCallFailed(); - channelzSessionInfo?.streamTracker.addCallFailed(); - return; - } + this.callTracker.addCallStarted(); + channelzSessionInfo?.streamTracker.addCallStarted(); - const path = headers[HTTP2_HEADER_PATH] as string; + if (!this._verifyContentType(stream, headers)) { + this.callTracker.addCallFailed(); + channelzSessionInfo?.streamTracker.addCallFailed(); + return; + } - const handler = this._retrieveHandler(path); - if (!handler) { - this._respondWithError( - getUnimplementedStatusResponse(path), - stream, - channelzSessionInfo - ); - return; - } + const path = headers[HTTP2_HEADER_PATH] as string; - const callEventTracker: CallEventTracker = { - addMessageSent: () => { - if (channelzSessionInfo) { - channelzSessionInfo.messagesSent += 1; - channelzSessionInfo.lastMessageSentTimestamp = new Date(); - } - }, - addMessageReceived: () => { - if (channelzSessionInfo) { - channelzSessionInfo.messagesReceived += 1; - channelzSessionInfo.lastMessageReceivedTimestamp = new Date(); - } - }, - onCallEnd: status => { - if (status.code === Status.OK) { - this.callTracker.addCallSucceeded(); + const handler = this._retrieveHandler(path); + if (!handler) { + this._respondWithError( + getUnimplementedStatusResponse(path), + stream, + channelzSessionInfo + ); + return; + } + + const callEventTracker: CallEventTracker = { + addMessageSent: () => { + if (channelzSessionInfo) { + channelzSessionInfo.messagesSent += 1; + channelzSessionInfo.lastMessageSentTimestamp = new Date(); + } + }, + addMessageReceived: () => { + if (channelzSessionInfo) { + channelzSessionInfo.messagesReceived += 1; + channelzSessionInfo.lastMessageReceivedTimestamp = new Date(); + } + }, + onCallEnd: status => { + if (status.code === Status.OK) { + this.callTracker.addCallSucceeded(); + } else { + this.callTracker.addCallFailed(); + } + }, + onStreamEnd: success => { + if (channelzSessionInfo) { + if (success) { + channelzSessionInfo.streamTracker.addCallSucceeded(); } else { - this.callTracker.addCallFailed(); - } - }, - onStreamEnd: success => { - if (channelzSessionInfo) { - if (success) { - channelzSessionInfo.streamTracker.addCallSucceeded(); - } else { - channelzSessionInfo.streamTracker.addCallFailed(); - } + channelzSessionInfo.streamTracker.addCallFailed(); } - }, - }; + } + }, + }; - const call = getServerInterceptingCall( - this.interceptors, - stream, - headers, - callEventTracker, - handler, - this.options - ); + const call = getServerInterceptingCall( + this.interceptors, + stream, + headers, + callEventTracker, + handler, + this.options + ); - if (!this._runHandlerForCall(call, handler)) { - this.callTracker.addCallFailed(); - channelzSessionInfo?.streamTracker.addCallFailed(); + if (!this._runHandlerForCall(call, handler)) { + this.callTracker.addCallFailed(); + channelzSessionInfo?.streamTracker.addCallFailed(); - call.sendStatus({ - code: Status.INTERNAL, - details: `Unknown handler type: ${handler.type}`, - }); - } - }; + call.sendStatus({ + code: Status.INTERNAL, + details: `Unknown handler type: ${handler.type}`, + }); + } } - private _streamHandler(http2Server: http2.Http2Server) { - return ( - stream: http2.ServerHttp2Stream, - headers: http2.IncomingHttpHeaders - ) => { - // for handling idle timeout - this.onStreamOpened(stream); - - if (this._verifyContentType(stream, headers) !== true) { - return; - } + private _streamHandler( + stream: http2.ServerHttp2Stream, + headers: http2.IncomingHttpHeaders + ) { + // for handling idle timeout + this.onStreamOpened(stream); - const path = headers[HTTP2_HEADER_PATH] as string; + if (this._verifyContentType(stream, headers) !== true) { + return; + } - const handler = this._retrieveHandler(path); - if (!handler) { - this._respondWithError( - getUnimplementedStatusResponse(path), - stream, - null - ); - return; - } + const path = headers[HTTP2_HEADER_PATH] as string; - const call = getServerInterceptingCall( - this.interceptors, + const handler = this._retrieveHandler(path); + if (!handler) { + this._respondWithError( + getUnimplementedStatusResponse(path), stream, - headers, - null, - handler, - this.options + null ); + return; + } - if (!this._runHandlerForCall(call, handler)) { - call.sendStatus({ - code: Status.INTERNAL, - details: `Unknown handler type: ${handler.type}`, - }); - } - }; + const call = getServerInterceptingCall( + this.interceptors, + stream, + headers, + null, + handler, + this.options + ); + + if (!this._runHandlerForCall(call, handler)) { + call.sendStatus({ + code: Status.INTERNAL, + details: `Unknown handler type: ${handler.type}`, + }); + } } private _runHandlerForCall( @@ -1361,14 +1357,14 @@ export class Server { this.serverAddressString = serverAddressString; const handler = this.channelzEnabled - ? this._channelzHandler(http2Server) - : this._streamHandler(http2Server); + ? this._channelzHandler + : this._streamHandler; const sessionHandler = this.channelzEnabled ? this._channelzSessionHandler(http2Server) : this._sessionHandler(http2Server); - http2Server.on('stream', handler); + http2Server.on('stream', handler.bind(this)); http2Server.on('session', sessionHandler); } From 11a98b5f373ff32ba5f4aceabfa8a98197ec1675 Mon Sep 17 00:00:00 2001 From: AVVS Date: Tue, 27 Feb 2024 16:49:20 -0800 Subject: [PATCH 06/11] chore: updated docs, cached onStreamClose per session --- packages/grpc-js/README.md | 3 + packages/grpc-js/src/channel-options.ts | 1 + packages/grpc-js/src/server.ts | 188 ++++++++++++++---------- 3 files changed, 116 insertions(+), 76 deletions(-) diff --git a/packages/grpc-js/README.md b/packages/grpc-js/README.md index eb04ece2f..f3b682f3c 100644 --- a/packages/grpc-js/README.md +++ b/packages/grpc-js/README.md @@ -60,6 +60,9 @@ Many channel arguments supported in `grpc` are not supported in `@grpc/grpc-js`. - `grpc.enable_channelz` - `grpc.dns_min_time_between_resolutions_ms` - `grpc.enable_retries` + - `grpc.max_connection_age_ms` + - `grpc.max_connection_age_grace_ms` + - `grpc.max_connection_idle_ms` - `grpc.per_rpc_retry_buffer_size` - `grpc.retry_buffer_size` - `grpc.service_config_disable_resolution` diff --git a/packages/grpc-js/src/channel-options.ts b/packages/grpc-js/src/channel-options.ts index aa1e6c83e..6804852e2 100644 --- a/packages/grpc-js/src/channel-options.ts +++ b/packages/grpc-js/src/channel-options.ts @@ -54,6 +54,7 @@ export interface ChannelOptions { 'grpc.retry_buffer_size'?: number; 'grpc.max_connection_age_ms'?: number; 'grpc.max_connection_age_grace_ms'?: number; + 'grpc.max_connection_idle_ms'?: number; 'grpc-node.max_session_memory'?: number; 'grpc.service_config_disable_resolution'?: number; 'grpc.client_idle_timeout_ms'?: number; diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index eed097f83..8a3a29fd8 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -90,7 +90,7 @@ import { CallEventTracker } from './transport'; const UNLIMITED_CONNECTION_AGE_MS = ~(1 << 31); const KEEPALIVE_MAX_TIME_MS = ~(1 << 31); const KEEPALIVE_TIMEOUT_MS = 20000; -const MAX_CONNECTION_IDLE_MS = 30 * 60 * 1e3; // 30 min +const MAX_CONNECTION_IDLE_MS = ~(1 << 31); const { HTTP2_HEADER_PATH } = http2.constants; @@ -241,6 +241,12 @@ interface Http2ServerInfo { sessions: Set; } +interface SessionIdleTimeoutTracker { + activeStreams: number; + timeout: NodeJS.Timeout | null; + onClose: (session: http2.ServerHttp2Session) => void | null; +} + export interface ServerOptions extends ChannelOptions { interceptors?: ServerInterceptor[]; } @@ -249,11 +255,8 @@ export class Server { private boundPorts: Map = new Map(); private http2Servers: Map = new Map(); private sessionIdleTimeouts = new Map< - http2.Http2Session, - { - activeStreams: number; - timeout: NodeJS.Timeout | null; - } + http2.ServerHttp2Session, + SessionIdleTimeoutTracker >(); private handlers: Map = new Map< @@ -330,7 +333,7 @@ export class Server { this.keepaliveTimeoutMs = this.options['grpc.keepalive_timeout_ms'] ?? KEEPALIVE_TIMEOUT_MS; this.sessionIdleTimeout = - this.options['grpc.max_connection_idle'] ?? MAX_CONNECTION_IDLE_MS; + this.options['grpc.max_connection_idle_ms'] ?? MAX_CONNECTION_IDLE_MS; this.commonServerOptions = { maxSendHeaderBlockLength: Number.MAX_SAFE_INTEGER, @@ -903,7 +906,6 @@ export class Server { if (sessionInfo) { this.sessionChildrenTracker.unrefChild(sessionInfo.ref); unregisterChannelzRef(sessionInfo.ref); - this.sessions.delete(session); } callback?.(); }; @@ -1001,7 +1003,7 @@ export class Server { for (const session of allSessions) { session.destroy(http2.constants.NGHTTP2_CANCEL as any); } - }, graceTimeMs).unref?.(); + }, graceTimeMs).unref(); } forceShutdown(): void { @@ -1376,6 +1378,7 @@ export class Server { let connectionAgeTimer: NodeJS.Timeout | null = null; let connectionAgeGraceTimer: NodeJS.Timeout | null = null; + let keeapliveTimeTimer: NodeJS.Timeout | null = null; let sessionClosedByServer = false; const idleTimeoutObj = this.enableIdleTimeout(session); @@ -1389,7 +1392,8 @@ export class Server { sessionClosedByServer = true; this.trace( - `Connection dropped by max connection age: ${session.socket?.remoteAddress}` + 'Connection dropped by max connection age: ' + + session.socket?.remoteAddress ); try { @@ -1410,36 +1414,38 @@ export class Server { if (this.maxConnectionAgeGraceMs !== UNLIMITED_CONNECTION_AGE_MS) { connectionAgeGraceTimer = setTimeout(() => { session.destroy(); - }, this.maxConnectionAgeGraceMs).unref?.(); + }, this.maxConnectionAgeGraceMs).unref(); } - }, this.maxConnectionAgeMs + jitter).unref?.(); + }, this.maxConnectionAgeMs + jitter).unref(); } - const keeapliveTimeTimer: NodeJS.Timeout | null = setInterval(() => { - const timeoutTImer = setTimeout(() => { - sessionClosedByServer = true; - session.close(); - }, this.keepaliveTimeoutMs).unref?.(); - - try { - session.ping( - (err: Error | null, duration: number, payload: Buffer) => { - clearTimeout(timeoutTImer); - - if (err) { - sessionClosedByServer = true; - this.trace( - `Connection dropped due to error of a ping frame ${err.message} return in ${duration}` - ); - session.close(); + if (this.keepaliveTimeMs < KEEPALIVE_MAX_TIME_MS) { + keeapliveTimeTimer = setInterval(() => { + const timeoutTimer = setTimeout(() => { + sessionClosedByServer = true; + session.close(); + }, this.keepaliveTimeoutMs).unref(); + + try { + session.ping( + (err: Error | null, duration: number, payload: Buffer) => { + clearTimeout(timeoutTimer); + + if (err) { + sessionClosedByServer = true; + this.trace( + `Connection dropped due to error of a ping frame ${err.message} return in ${duration}` + ); + session.close(); + } } - } - ); - } catch (e) { - // The ping can't be sent because the session is already closed - session.destroy(); - } - }, this.keepaliveTimeMs).unref?.(); + ); + } catch (e) { + // The ping can't be sent because the session is already closed + session.destroy(); + } + }, this.keepaliveTimeMs).unref(); + } session.on('close', () => { if (!sessionClosedByServer) { @@ -1460,8 +1466,12 @@ export class Server { clearTimeout(keeapliveTimeTimer); } - clearTimeout(idleTimeoutObj.timeout); - this.sessionIdleTimeouts.delete(session); + if (idleTimeoutObj !== null) { + if (idleTimeoutObj.timeout !== null) { + clearTimeout(idleTimeoutObj.timeout); + } + this.sessionIdleTimeouts.delete(session); + } this.http2Servers.get(http2Server)?.sessions.delete(session); }); @@ -1503,6 +1513,7 @@ export class Server { let connectionAgeTimer: NodeJS.Timeout | null = null; let connectionAgeGraceTimer: NodeJS.Timeout | null = null; + let keeapliveTimeTimer: NodeJS.Timeout | null = null; let sessionClosedByServer = false; const idleTimeoutObj = this.enableIdleTimeout(session); @@ -1537,43 +1548,48 @@ export class Server { if (this.maxConnectionAgeGraceMs !== UNLIMITED_CONNECTION_AGE_MS) { connectionAgeGraceTimer = setTimeout(() => { session.destroy(); - }, this.maxConnectionAgeGraceMs).unref?.(); + }, this.maxConnectionAgeGraceMs).unref(); } - }, this.maxConnectionAgeMs + jitter).unref?.(); + }, this.maxConnectionAgeMs + jitter).unref(); } - const keeapliveTimeTimer: NodeJS.Timeout | null = setInterval(() => { - const timeoutTImer = setTimeout(() => { - sessionClosedByServer = true; - this.channelzTrace.addTrace( - 'CT_INFO', - 'Connection dropped by keepalive timeout from ' + clientAddress - ); + if (this.keepaliveTimeMs < KEEPALIVE_MAX_TIME_MS) { + keeapliveTimeTimer = setInterval(() => { + const timeoutTImer = setTimeout(() => { + sessionClosedByServer = true; + this.channelzTrace.addTrace( + 'CT_INFO', + 'Connection dropped by keepalive timeout from ' + clientAddress + ); - session.close(); - }, this.keepaliveTimeoutMs).unref?.(); - try { - session.ping( - (err: Error | null, duration: number, payload: Buffer) => { - clearTimeout(timeoutTImer); - - if (err) { - sessionClosedByServer = true; - this.channelzTrace.addTrace( - 'CT_INFO', - `Connection dropped due to error of a ping frame ${err.message} return in ${duration}` - ); - - session.close(); + session.close(); + }, this.keepaliveTimeoutMs).unref(); + try { + session.ping( + (err: Error | null, duration: number, payload: Buffer) => { + clearTimeout(timeoutTImer); + + if (err) { + sessionClosedByServer = true; + this.channelzTrace.addTrace( + 'CT_INFO', + 'Connection dropped due to error of a ping frame ' + + err.message + + ' return in ' + + duration + ); + + session.close(); + } } - } - ); - channelzSessionInfo.keepAlivesSent += 1; - } catch (e) { - // The ping can't be sent because the session is already closed - session.destroy(); - } - }, this.keepaliveTimeMs).unref?.(); + ); + channelzSessionInfo.keepAlivesSent += 1; + } catch (e) { + // The ping can't be sent because the session is already closed + session.destroy(); + } + }, this.keepaliveTimeMs).unref(); + } session.on('close', () => { if (!sessionClosedByServer) { @@ -1598,8 +1614,12 @@ export class Server { clearTimeout(keeapliveTimeTimer); } - clearTimeout(idleTimeoutObj.timeout); - this.sessionIdleTimeouts.delete(session); + if (idleTimeoutObj !== null) { + if (idleTimeoutObj.timeout !== null) { + clearTimeout(idleTimeoutObj.timeout); + } + this.sessionIdleTimeouts.delete(session); + } this.http2Servers.get(http2Server)?.sessions.delete(session); this.sessions.delete(session); @@ -1607,9 +1627,16 @@ export class Server { }; } - private enableIdleTimeout(session: http2.ServerHttp2Session) { + private enableIdleTimeout( + session: http2.ServerHttp2Session + ): SessionIdleTimeoutTracker | null { + if (this.sessionIdleTimeout >= MAX_CONNECTION_IDLE_MS) { + return null; + } + const idleTimeoutObj = { activeStreams: 0, + onClose: this.onStreamClose.bind(this, session), // so that we don't recreate it each time timeout: setTimeout( this.onIdleTimeout, this.sessionIdleTimeout, @@ -1619,13 +1646,22 @@ export class Server { }; this.sessionIdleTimeouts.set(session, idleTimeoutObj); - this.trace(`Enable idle timeout for ${session.socket?.remoteAddress}`); + const { socket } = session; + this.trace( + 'Enable idle timeout for ' + + socket.remoteAddress + + ':' + + socket.remotePort + ); return idleTimeoutObj; } private onIdleTimeout(ctx: Server, session: http2.ServerHttp2Session) { - ctx.trace(`Idle timeout for ${session.socket?.remoteAddress}`); + const { socket } = session; + ctx.trace( + 'Idle timeout for ' + socket?.remoteAddress + ':' + socket?.remotePort + ); ctx.closeSession(session); } @@ -1640,7 +1676,7 @@ export class Server { idleTimeoutObj.timeout = null; } - stream.once('close', () => this.onStreamClose(session)); + stream.once('close', idleTimeoutObj.onClose); } } From bedb5055e89b39125d9466ee6f3c504f130792f6 Mon Sep 17 00:00:00 2001 From: AVVS Date: Wed, 28 Feb 2024 13:36:24 -0800 Subject: [PATCH 07/11] refactor: no clearTimeout/null timers, use .refresh() + count refs --- packages/grpc-js/src/server.ts | 51 +++++++++++++++++++--------------- 1 file changed, 29 insertions(+), 22 deletions(-) diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index 8a3a29fd8..32c18ea94 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -243,7 +243,8 @@ interface Http2ServerInfo { interface SessionIdleTimeoutTracker { activeStreams: number; - timeout: NodeJS.Timeout | null; + lastIdle: number; + timeout: NodeJS.Timeout; onClose: (session: http2.ServerHttp2Session) => void | null; } @@ -292,6 +293,7 @@ export class Server { private readonly keepaliveTimeoutMs: number; private readonly sessionIdleTimeout: number; + private readonly sessionHalfIdleTimeout: number; private readonly interceptors: ServerInterceptor[]; @@ -334,6 +336,7 @@ export class Server { this.options['grpc.keepalive_timeout_ms'] ?? KEEPALIVE_TIMEOUT_MS; this.sessionIdleTimeout = this.options['grpc.max_connection_idle_ms'] ?? MAX_CONNECTION_IDLE_MS; + this.sessionHalfIdleTimeout = Math.ceil(this.sessionIdleTimeout / 2); this.commonServerOptions = { maxSendHeaderBlockLength: Number.MAX_SAFE_INTEGER, @@ -1467,9 +1470,7 @@ export class Server { } if (idleTimeoutObj !== null) { - if (idleTimeoutObj.timeout !== null) { - clearTimeout(idleTimeoutObj.timeout); - } + clearTimeout(idleTimeoutObj.timeout); this.sessionIdleTimeouts.delete(session); } @@ -1615,9 +1616,7 @@ export class Server { } if (idleTimeoutObj !== null) { - if (idleTimeoutObj.timeout !== null) { - clearTimeout(idleTimeoutObj.timeout); - } + clearTimeout(idleTimeoutObj.timeout); this.sessionIdleTimeouts.delete(session); } @@ -1634,12 +1633,14 @@ export class Server { return null; } - const idleTimeoutObj = { + const idleTimeoutObj: SessionIdleTimeoutTracker = { activeStreams: 0, + lastIdle: Date.now(), onClose: this.onStreamClose.bind(this, session), // so that we don't recreate it each time + // this is 50% of the actual timeout, we will check half-way through and .refresh() for a subsequent check timeout: setTimeout( this.onIdleTimeout, - this.sessionIdleTimeout, + this.sessionHalfIdleTimeout, this, session ).unref(), @@ -1660,9 +1661,25 @@ export class Server { private onIdleTimeout(ctx: Server, session: http2.ServerHttp2Session) { const { socket } = session; ctx.trace( - 'Idle timeout for ' + socket?.remoteAddress + ':' + socket?.remotePort + 'Session idle timeout checkpoint for ' + + socket?.remoteAddress + + ':' + + socket?.remotePort ); - ctx.closeSession(session); + + const sessionInfo = ctx.sessionIdleTimeouts.get(session); + // if it is called while we have activeStreams - timer will not be rescheduled + // until last active stream is closed, then it will call .refresh() on the timer + // important part is to not clearTimeout(timer) or it becomes unusable + // for future refreshes + if (sessionInfo && sessionInfo.activeStreams === 0) { + const idleFor = Date.now() - sessionInfo.lastIdle; + if (idleFor >= this.sessionIdleTimeout) { + ctx.closeSession(session); + } else { + sessionInfo.timeout.refresh(); + } + } } private onStreamOpened(stream: http2.ServerHttp2Stream) { @@ -1671,11 +1688,6 @@ export class Server { const idleTimeoutObj = this.sessionIdleTimeouts.get(session); if (idleTimeoutObj) { idleTimeoutObj.activeStreams += 1; - if (idleTimeoutObj.timeout) { - clearTimeout(idleTimeoutObj.timeout); - idleTimeoutObj.timeout = null; - } - stream.once('close', idleTimeoutObj.onClose); } } @@ -1686,12 +1698,7 @@ export class Server { if (idleTimeoutObj) { idleTimeoutObj.activeStreams -= 1; if (idleTimeoutObj.activeStreams === 0) { - idleTimeoutObj.timeout = setTimeout( - this.onIdleTimeout, - this.sessionIdleTimeout, - this, - session - ).unref(); + idleTimeoutObj.timeout.refresh(); } } } From b873dce908ddfefc42da43be8a612c4cdc2eefcc Mon Sep 17 00:00:00 2001 From: AVVS Date: Wed, 28 Feb 2024 14:26:42 -0800 Subject: [PATCH 08/11] chore: simplify idle timeout further, fix wrong ref --- packages/grpc-js/src/server.ts | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index 32c18ea94..46fecc2e3 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -293,7 +293,6 @@ export class Server { private readonly keepaliveTimeoutMs: number; private readonly sessionIdleTimeout: number; - private readonly sessionHalfIdleTimeout: number; private readonly interceptors: ServerInterceptor[]; @@ -336,7 +335,6 @@ export class Server { this.options['grpc.keepalive_timeout_ms'] ?? KEEPALIVE_TIMEOUT_MS; this.sessionIdleTimeout = this.options['grpc.max_connection_idle_ms'] ?? MAX_CONNECTION_IDLE_MS; - this.sessionHalfIdleTimeout = Math.ceil(this.sessionIdleTimeout / 2); this.commonServerOptions = { maxSendHeaderBlockLength: Number.MAX_SAFE_INTEGER, @@ -1636,11 +1634,10 @@ export class Server { const idleTimeoutObj: SessionIdleTimeoutTracker = { activeStreams: 0, lastIdle: Date.now(), - onClose: this.onStreamClose.bind(this, session), // so that we don't recreate it each time - // this is 50% of the actual timeout, we will check half-way through and .refresh() for a subsequent check + onClose: this.onStreamClose.bind(this, session), timeout: setTimeout( this.onIdleTimeout, - this.sessionHalfIdleTimeout, + this.sessionIdleTimeout, this, session ).unref(), @@ -1658,7 +1655,11 @@ export class Server { return idleTimeoutObj; } - private onIdleTimeout(ctx: Server, session: http2.ServerHttp2Session) { + private onIdleTimeout( + this: undefined, + ctx: Server, + session: http2.ServerHttp2Session + ) { const { socket } = session; ctx.trace( 'Session idle timeout checkpoint for ' + @@ -1668,17 +1669,17 @@ export class Server { ); const sessionInfo = ctx.sessionIdleTimeouts.get(session); + // if it is called while we have activeStreams - timer will not be rescheduled // until last active stream is closed, then it will call .refresh() on the timer // important part is to not clearTimeout(timer) or it becomes unusable // for future refreshes - if (sessionInfo && sessionInfo.activeStreams === 0) { - const idleFor = Date.now() - sessionInfo.lastIdle; - if (idleFor >= this.sessionIdleTimeout) { - ctx.closeSession(session); - } else { - sessionInfo.timeout.refresh(); - } + if ( + sessionInfo !== undefined && + sessionInfo.activeStreams === 0 && + Date.now() - sessionInfo.lastIdle >= ctx.sessionIdleTimeout + ) { + ctx.closeSession(session); } } @@ -1698,6 +1699,7 @@ export class Server { if (idleTimeoutObj) { idleTimeoutObj.activeStreams -= 1; if (idleTimeoutObj.activeStreams === 0) { + idleTimeoutObj.lastIdle = Date.now(); idleTimeoutObj.timeout.refresh(); } } From 62e8ea97e659ea3c98ab77a38702fb5e0b67fed8 Mon Sep 17 00:00:00 2001 From: AVVS Date: Sat, 2 Mar 2024 07:58:54 -0800 Subject: [PATCH 09/11] chore: tests & cleanup of unref?.() --- packages/grpc-js/package.json | 2 +- .../grpc-js/src/load-balancer-pick-first.ts | 3 +- packages/grpc-js/src/resolver-dns.ts | 3 +- .../grpc-js/src/resolving-load-balancer.ts | 1 + packages/grpc-js/src/server.ts | 207 ++++++++++-------- packages/grpc-js/src/transport.ts | 3 +- packages/grpc-js/test/common.ts | 21 ++ packages/grpc-js/test/test-idle-timer.ts | 86 ++++++++ 8 files changed, 235 insertions(+), 91 deletions(-) diff --git a/packages/grpc-js/package.json b/packages/grpc-js/package.json index 34d8b558b..1c7935473 100644 --- a/packages/grpc-js/package.json +++ b/packages/grpc-js/package.json @@ -20,9 +20,9 @@ "@types/lodash": "^4.14.202", "@types/mocha": "^10.0.6", "@types/ncp": "^2.0.8", + "@types/node": ">=20.11.20", "@types/pify": "^5.0.4", "@types/semver": "^7.5.8", - "@types/node": ">=20.11.20", "@typescript-eslint/eslint-plugin": "^7.1.0", "@typescript-eslint/parser": "^7.1.0", "@typescript-eslint/typescript-estree": "^7.1.0", diff --git a/packages/grpc-js/src/load-balancer-pick-first.ts b/packages/grpc-js/src/load-balancer-pick-first.ts index 29bbfbf07..f6c43b33d 100644 --- a/packages/grpc-js/src/load-balancer-pick-first.ts +++ b/packages/grpc-js/src/load-balancer-pick-first.ts @@ -415,7 +415,8 @@ export class PickFirstLoadBalancer implements LoadBalancer { } this.connectionDelayTimeout = setTimeout(() => { this.startNextSubchannelConnecting(subchannelIndex + 1); - }, CONNECTION_DELAY_INTERVAL_MS).unref?.(); + }, CONNECTION_DELAY_INTERVAL_MS); + this.connectionDelayTimeout.unref?.(); } private pickSubchannel(subchannel: SubchannelInterface) { diff --git a/packages/grpc-js/src/resolver-dns.ts b/packages/grpc-js/src/resolver-dns.ts index 6652839b0..6463c2656 100644 --- a/packages/grpc-js/src/resolver-dns.ts +++ b/packages/grpc-js/src/resolver-dns.ts @@ -309,7 +309,8 @@ class DnsResolver implements Resolver { if (this.continueResolving) { this.startResolutionWithBackoff(); } - }, this.minTimeBetweenResolutionsMs).unref?.(); + }, this.minTimeBetweenResolutionsMs); + this.nextResolutionTimer.unref?.(); this.isNextResolutionTimerRunning = true; } diff --git a/packages/grpc-js/src/resolving-load-balancer.ts b/packages/grpc-js/src/resolving-load-balancer.ts index 82c4ff436..72aef0dfd 100644 --- a/packages/grpc-js/src/resolving-load-balancer.ts +++ b/packages/grpc-js/src/resolving-load-balancer.ts @@ -212,6 +212,7 @@ export class ResolvingLoadBalancer implements LoadBalancer { methodConfig: [], }; } + this.updateState(ConnectivityState.IDLE, new QueuePicker(this)); this.childLoadBalancer = new ChildLoadBalancerHandler( { diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index 46fecc2e3..b0fd5e7d3 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -95,6 +95,7 @@ const MAX_CONNECTION_IDLE_MS = ~(1 << 31); const { HTTP2_HEADER_PATH } = http2.constants; const TRACER_NAME = 'server'; +const kMaxAge = Buffer.from('max_age'); type AnyHttp2Server = http2.Http2Server | http2.Http2SecureServer; @@ -369,65 +370,61 @@ export class Server { private getChannelzSessionInfoGetter( session: http2.ServerHttp2Session - ): () => SocketInfo { - return () => { - const sessionInfo = this.sessions.get(session)!; - const sessionSocket = session.socket; - const remoteAddress = sessionSocket.remoteAddress - ? stringToSubchannelAddress( - sessionSocket.remoteAddress, - sessionSocket.remotePort - ) - : null; - const localAddress = sessionSocket.localAddress - ? stringToSubchannelAddress( - sessionSocket.localAddress!, - sessionSocket.localPort - ) - : null; - let tlsInfo: TlsInfo | null; - if (session.encrypted) { - const tlsSocket: TLSSocket = sessionSocket as TLSSocket; - const cipherInfo: CipherNameAndProtocol & { standardName?: string } = - tlsSocket.getCipher(); - const certificate = tlsSocket.getCertificate(); - const peerCertificate = tlsSocket.getPeerCertificate(); - tlsInfo = { - cipherSuiteStandardName: cipherInfo.standardName ?? null, - cipherSuiteOtherName: cipherInfo.standardName - ? null - : cipherInfo.name, - localCertificate: - certificate && 'raw' in certificate ? certificate.raw : null, - remoteCertificate: - peerCertificate && 'raw' in peerCertificate - ? peerCertificate.raw - : null, - }; - } else { - tlsInfo = null; - } - const socketInfo: SocketInfo = { - remoteAddress: remoteAddress, - localAddress: localAddress, - security: tlsInfo, - remoteName: null, - streamsStarted: sessionInfo.streamTracker.callsStarted, - streamsSucceeded: sessionInfo.streamTracker.callsSucceeded, - streamsFailed: sessionInfo.streamTracker.callsFailed, - messagesSent: sessionInfo.messagesSent, - messagesReceived: sessionInfo.messagesReceived, - keepAlivesSent: sessionInfo.keepAlivesSent, - lastLocalStreamCreatedTimestamp: null, - lastRemoteStreamCreatedTimestamp: - sessionInfo.streamTracker.lastCallStartedTimestamp, - lastMessageSentTimestamp: sessionInfo.lastMessageSentTimestamp, - lastMessageReceivedTimestamp: sessionInfo.lastMessageReceivedTimestamp, - localFlowControlWindow: session.state.localWindowSize ?? null, - remoteFlowControlWindow: session.state.remoteWindowSize ?? null, + ): SocketInfo { + const sessionInfo = this.sessions.get(session)!; + const sessionSocket = session.socket; + const remoteAddress = sessionSocket.remoteAddress + ? stringToSubchannelAddress( + sessionSocket.remoteAddress, + sessionSocket.remotePort + ) + : null; + const localAddress = sessionSocket.localAddress + ? stringToSubchannelAddress( + sessionSocket.localAddress!, + sessionSocket.localPort + ) + : null; + let tlsInfo: TlsInfo | null; + if (session.encrypted) { + const tlsSocket: TLSSocket = sessionSocket as TLSSocket; + const cipherInfo: CipherNameAndProtocol & { standardName?: string } = + tlsSocket.getCipher(); + const certificate = tlsSocket.getCertificate(); + const peerCertificate = tlsSocket.getPeerCertificate(); + tlsInfo = { + cipherSuiteStandardName: cipherInfo.standardName ?? null, + cipherSuiteOtherName: cipherInfo.standardName ? null : cipherInfo.name, + localCertificate: + certificate && 'raw' in certificate ? certificate.raw : null, + remoteCertificate: + peerCertificate && 'raw' in peerCertificate + ? peerCertificate.raw + : null, }; - return socketInfo; + } else { + tlsInfo = null; + } + const socketInfo: SocketInfo = { + remoteAddress: remoteAddress, + localAddress: localAddress, + security: tlsInfo, + remoteName: null, + streamsStarted: sessionInfo.streamTracker.callsStarted, + streamsSucceeded: sessionInfo.streamTracker.callsSucceeded, + streamsFailed: sessionInfo.streamTracker.callsFailed, + messagesSent: sessionInfo.messagesSent, + messagesReceived: sessionInfo.messagesReceived, + keepAlivesSent: sessionInfo.keepAlivesSent, + lastLocalStreamCreatedTimestamp: null, + lastRemoteStreamCreatedTimestamp: + sessionInfo.streamTracker.lastCallStartedTimestamp, + lastMessageSentTimestamp: sessionInfo.lastMessageSentTimestamp, + lastMessageReceivedTimestamp: sessionInfo.lastMessageReceivedTimestamp, + localFlowControlWindow: session.state.localWindowSize ?? null, + remoteFlowControlWindow: session.state.remoteWindowSize ?? null, }; + return socketInfo; } private trace(text: string): void { @@ -1004,7 +1001,7 @@ export class Server { for (const session of allSessions) { session.destroy(http2.constants.NGHTTP2_CANCEL as any); } - }, graceTimeMs).unref(); + }, graceTimeMs).unref?.(); } forceShutdown(): void { @@ -1380,6 +1377,7 @@ export class Server { let connectionAgeTimer: NodeJS.Timeout | null = null; let connectionAgeGraceTimer: NodeJS.Timeout | null = null; let keeapliveTimeTimer: NodeJS.Timeout | null = null; + let keepaliveTimeoutTimer: NodeJS.Timeout | null = null; let sessionClosedByServer = false; const idleTimeoutObj = this.enableIdleTimeout(session); @@ -1401,7 +1399,7 @@ export class Server { session.goaway( http2.constants.NGHTTP2_NO_ERROR, ~(1 << 31), - Buffer.from('max_age') + kMaxAge ); } catch (e) { // The goaway can't be sent because the session is already closed @@ -1415,37 +1413,47 @@ export class Server { if (this.maxConnectionAgeGraceMs !== UNLIMITED_CONNECTION_AGE_MS) { connectionAgeGraceTimer = setTimeout(() => { session.destroy(); - }, this.maxConnectionAgeGraceMs).unref(); + }, this.maxConnectionAgeGraceMs); + connectionAgeGraceTimer.unref?.(); } - }, this.maxConnectionAgeMs + jitter).unref(); + }, this.maxConnectionAgeMs + jitter); + connectionAgeTimer.unref?.(); } if (this.keepaliveTimeMs < KEEPALIVE_MAX_TIME_MS) { keeapliveTimeTimer = setInterval(() => { - const timeoutTimer = setTimeout(() => { + keepaliveTimeoutTimer = setTimeout(() => { sessionClosedByServer = true; session.close(); - }, this.keepaliveTimeoutMs).unref(); + }, this.keepaliveTimeoutMs); + keepaliveTimeoutTimer.unref?.(); try { session.ping( (err: Error | null, duration: number, payload: Buffer) => { - clearTimeout(timeoutTimer); + if (keepaliveTimeoutTimer) { + clearTimeout(keepaliveTimeoutTimer); + } if (err) { sessionClosedByServer = true; this.trace( - `Connection dropped due to error of a ping frame ${err.message} return in ${duration}` + 'Connection dropped due to error of a ping frame ' + + err.message + + ' return in ' + + duration ); session.close(); } } ); } catch (e) { + clearTimeout(keepaliveTimeoutTimer); // The ping can't be sent because the session is already closed session.destroy(); } - }, this.keepaliveTimeMs).unref(); + }, this.keepaliveTimeMs); + keeapliveTimeTimer.unref?.(); } session.on('close', () => { @@ -1464,7 +1472,10 @@ export class Server { } if (keeapliveTimeTimer) { - clearTimeout(keeapliveTimeTimer); + clearInterval(keeapliveTimeTimer); + if (keepaliveTimeoutTimer) { + clearTimeout(keepaliveTimeoutTimer); + } } if (idleTimeoutObj !== null) { @@ -1483,15 +1494,13 @@ export class Server { return (session: http2.ServerHttp2Session) => { const channelzRef = registerChannelzSocket( session.socket?.remoteAddress ?? 'unknown', - this.getChannelzSessionInfoGetter(session), + this.getChannelzSessionInfoGetter.bind(this, session), this.channelzEnabled ); const channelzSessionInfo: ChannelzSessionInfo = { ref: channelzRef, - streamTracker: this.channelzEnabled - ? new ChannelzCallTracker() - : new ChannelzCallTrackerStub(), + streamTracker: new ChannelzCallTracker(), messagesSent: 0, messagesReceived: 0, keepAlivesSent: 0, @@ -1513,6 +1522,7 @@ export class Server { let connectionAgeTimer: NodeJS.Timeout | null = null; let connectionAgeGraceTimer: NodeJS.Timeout | null = null; let keeapliveTimeTimer: NodeJS.Timeout | null = null; + let keepaliveTimeoutTimer: NodeJS.Timeout | null = null; let sessionClosedByServer = false; const idleTimeoutObj = this.enableIdleTimeout(session); @@ -1533,7 +1543,7 @@ export class Server { session.goaway( http2.constants.NGHTTP2_NO_ERROR, ~(1 << 31), - Buffer.from('max_age') + kMaxAge ); } catch (e) { // The goaway can't be sent because the session is already closed @@ -1547,14 +1557,16 @@ export class Server { if (this.maxConnectionAgeGraceMs !== UNLIMITED_CONNECTION_AGE_MS) { connectionAgeGraceTimer = setTimeout(() => { session.destroy(); - }, this.maxConnectionAgeGraceMs).unref(); + }, this.maxConnectionAgeGraceMs); + connectionAgeGraceTimer.unref?.(); } - }, this.maxConnectionAgeMs + jitter).unref(); + }, this.maxConnectionAgeMs + jitter); + connectionAgeTimer.unref?.(); } if (this.keepaliveTimeMs < KEEPALIVE_MAX_TIME_MS) { keeapliveTimeTimer = setInterval(() => { - const timeoutTImer = setTimeout(() => { + keepaliveTimeoutTimer = setTimeout(() => { sessionClosedByServer = true; this.channelzTrace.addTrace( 'CT_INFO', @@ -1562,11 +1574,15 @@ export class Server { ); session.close(); - }, this.keepaliveTimeoutMs).unref(); + }, this.keepaliveTimeoutMs); + keepaliveTimeoutTimer.unref?.(); + try { session.ping( (err: Error | null, duration: number, payload: Buffer) => { - clearTimeout(timeoutTImer); + if (keepaliveTimeoutTimer) { + clearTimeout(keepaliveTimeoutTimer); + } if (err) { sessionClosedByServer = true; @@ -1584,10 +1600,12 @@ export class Server { ); channelzSessionInfo.keepAlivesSent += 1; } catch (e) { + clearTimeout(keepaliveTimeoutTimer); // The ping can't be sent because the session is already closed session.destroy(); } - }, this.keepaliveTimeMs).unref(); + }, this.keepaliveTimeMs); + keeapliveTimeTimer.unref?.(); } session.on('close', () => { @@ -1610,7 +1628,10 @@ export class Server { } if (keeapliveTimeTimer) { - clearTimeout(keeapliveTimeTimer); + clearInterval(keeapliveTimeTimer); + if (keepaliveTimeoutTimer) { + clearTimeout(keepaliveTimeoutTimer); + } } if (idleTimeoutObj !== null) { @@ -1640,8 +1661,9 @@ export class Server { this.sessionIdleTimeout, this, session - ).unref(), + ), }; + idleTimeoutObj.timeout.unref?.(); this.sessionIdleTimeouts.set(session, idleTimeoutObj); const { socket } = session; @@ -1661,13 +1683,6 @@ export class Server { session: http2.ServerHttp2Session ) { const { socket } = session; - ctx.trace( - 'Session idle timeout checkpoint for ' + - socket?.remoteAddress + - ':' + - socket?.remotePort - ); - const sessionInfo = ctx.sessionIdleTimeouts.get(session); // if it is called while we have activeStreams - timer will not be rescheduled @@ -1679,6 +1694,15 @@ export class Server { sessionInfo.activeStreams === 0 && Date.now() - sessionInfo.lastIdle >= ctx.sessionIdleTimeout ) { + ctx.trace( + 'Session idle timeout triggered for ' + + socket?.remoteAddress + + ':' + + socket?.remotePort + + ' last idle at ' + + sessionInfo.lastIdle + ); + ctx.closeSession(session); } } @@ -1701,6 +1725,15 @@ export class Server { if (idleTimeoutObj.activeStreams === 0) { idleTimeoutObj.lastIdle = Date.now(); idleTimeoutObj.timeout.refresh(); + + this.trace( + 'Session onStreamClose' + + session.socket?.remoteAddress + + ':' + + session.socket?.remotePort + + ' at ' + + idleTimeoutObj.lastIdle + ); } } } diff --git a/packages/grpc-js/src/transport.ts b/packages/grpc-js/src/transport.ts index 620488635..71d0f26b3 100644 --- a/packages/grpc-js/src/transport.ts +++ b/packages/grpc-js/src/transport.ts @@ -477,7 +477,8 @@ class Http2Transport implements Transport { ); this.keepaliveTimerId = setTimeout(() => { this.maybeSendPing(); - }, this.keepaliveTimeMs).unref?.(); + }, this.keepaliveTimeMs); + this.keepaliveTimerId.unref?.(); } /* Otherwise, there is already either a keepalive timer or a ping pending, * wait for those to resolve. */ diff --git a/packages/grpc-js/test/common.ts b/packages/grpc-js/test/common.ts index eaa701f18..fcdbb4500 100644 --- a/packages/grpc-js/test/common.ts +++ b/packages/grpc-js/test/common.ts @@ -140,6 +140,27 @@ export class TestClient { return this.client.getChannel().getConnectivityState(false); } + waitForClientState( + deadline: grpc.Deadline, + state: ConnectivityState, + callback: (error?: Error) => void + ) { + this.client + .getChannel() + .watchConnectivityState(this.getChannelState(), deadline, err => { + if (err) { + return callback(err); + } + + const currentState = this.getChannelState(); + if (currentState === state) { + callback(); + } else { + return this.waitForClientState(deadline, currentState, callback); + } + }); + } + close() { this.client.close(); } diff --git a/packages/grpc-js/test/test-idle-timer.ts b/packages/grpc-js/test/test-idle-timer.ts index 3fdeb1f64..a8f457e3f 100644 --- a/packages/grpc-js/test/test-idle-timer.ts +++ b/packages/grpc-js/test/test-idle-timer.ts @@ -128,3 +128,89 @@ describe('Channel idle timer', () => { }); }); }); + +describe('Server idle timer', () => { + let server: TestServer; + let client: TestClient | null = null; + before(() => { + server = new TestServer(false, { + 'grpc.max_connection_idle_ms': 500, // small for testing purposes + }); + return server.start(); + }); + afterEach(() => { + if (client) { + client.close(); + client = null; + } + }); + after(() => { + server.shutdown(); + }); + + it('Should go idle after the specified time after a request ends', function (done) { + this.timeout(5000); + client = TestClient.createFromServer(server); + client.sendRequest(error => { + assert.ifError(error); + assert.strictEqual( + client!.getChannelState(), + grpc.connectivityState.READY + ); + client?.waitForClientState( + Date.now() + 600, + grpc.connectivityState.IDLE, + done + ); + }); + }); + + it('Should be able to make a request after going idle', function (done) { + this.timeout(5000); + client = TestClient.createFromServer(server); + client.sendRequest(error => { + assert.ifError(error); + assert.strictEqual( + client!.getChannelState(), + grpc.connectivityState.READY + ); + + client!.waitForClientState( + Date.now() + 600, + grpc.connectivityState.IDLE, + err => { + if (err) return done(err); + + assert.strictEqual( + client!.getChannelState(), + grpc.connectivityState.IDLE + ); + client!.sendRequest(error => { + assert.ifError(error); + done(); + }); + } + ); + }); + }); + + it('Should go idle after the specified time after waitForReady ends', function (done) { + this.timeout(5000); + client = TestClient.createFromServer(server); + const deadline = new Date(); + deadline.setSeconds(deadline.getSeconds() + 3); + client.waitForReady(deadline, error => { + assert.ifError(error); + assert.strictEqual( + client!.getChannelState(), + grpc.connectivityState.READY + ); + + client!.waitForClientState( + Date.now() + 600, + grpc.connectivityState.IDLE, + done + ); + }); + }); +}); From cf321a80b1918affaa41da9fe7e6424876ca5b48 Mon Sep 17 00:00:00 2001 From: AVVS Date: Mon, 4 Mar 2024 18:25:23 -0800 Subject: [PATCH 10/11] chore: use iterators for tracking map, const for default values --- packages/grpc-js/src/channelz.ts | 61 ++++++++++++++++++-------------- packages/grpc-js/src/server.ts | 4 +-- 2 files changed, 36 insertions(+), 29 deletions(-) diff --git a/packages/grpc-js/src/channelz.ts b/packages/grpc-js/src/channelz.ts index 4eab762a8..697ea5d8a 100644 --- a/packages/grpc-js/src/channelz.ts +++ b/packages/grpc-js/src/channelz.ts @@ -133,6 +133,11 @@ interface TraceEvent { */ const TARGET_RETAINED_TRACES = 32; +/** + * Default number of sockets/servers/channels/subchannels to return + */ +const DEFAULT_MAX_RESULTS = 100; + export class ChannelzTraceStub { readonly events: TraceEvent[] = []; readonly creationTimestamp: Date = new Date(); @@ -198,19 +203,15 @@ export class ChannelzTrace { } } +type RefOrderedMap = OrderedMap< + number, + { ref: { id: number; kind: EntityTypes; name: string }; count: number } +>; + export class ChannelzChildrenTracker { - private channelChildren = new OrderedMap< - number, - { ref: ChannelRef; count: number } - >(); - private subchannelChildren = new OrderedMap< - number, - { ref: SubchannelRef; count: number } - >(); - private socketChildren = new OrderedMap< - number, - { ref: SocketRef; count: number } - >(); + private channelChildren: RefOrderedMap = new OrderedMap(); + private subchannelChildren: RefOrderedMap = new OrderedMap(); + private socketChildren: RefOrderedMap = new OrderedMap(); private trackerMap = { [EntityTypes.channel]: this.channelChildren, [EntityTypes.subchannel]: this.subchannelChildren, @@ -219,16 +220,19 @@ export class ChannelzChildrenTracker { refChild(child: ChannelRef | SubchannelRef | SocketRef) { const tracker = this.trackerMap[child.kind]; - const trackedChild = tracker.getElementByKey(child.id); - - if (trackedChild === undefined) { - tracker.setElement(child.id, { - // @ts-expect-error union issues - ref: child, - count: 1, - }); + const trackedChild = tracker.find(child.id); + + if (trackedChild.equals(tracker.end())) { + tracker.setElement( + child.id, + { + ref: child, + count: 1, + }, + trackedChild + ); } else { - trackedChild.count += 1; + trackedChild.pointer[1].count += 1; } } @@ -245,9 +249,9 @@ export class ChannelzChildrenTracker { getChildLists(): ChannelzChildren { return { - channels: this.channelChildren, - subchannels: this.subchannelChildren, - sockets: this.socketChildren, + channels: this.channelChildren as ChannelzChildren['channels'], + subchannels: this.subchannelChildren as ChannelzChildren['subchannels'], + sockets: this.socketChildren as ChannelzChildren['sockets'], }; } } @@ -585,7 +589,8 @@ function GetTopChannels( call: ServerUnaryCall, callback: sendUnaryData ): void { - const maxResults = parseInt(call.request.max_results, 10) || 100; + const maxResults = + parseInt(call.request.max_results, 10) || DEFAULT_MAX_RESULTS; const resultList: ChannelMessage[] = []; const startId = parseInt(call.request.start_channel_id, 10); const channelEntries = entityMaps[EntityTypes.channel]; @@ -649,7 +654,8 @@ function GetServers( call: ServerUnaryCall, callback: sendUnaryData ): void { - const maxResults = parseInt(call.request.max_results, 10) || 100; + const maxResults = + parseInt(call.request.max_results, 10) || DEFAULT_MAX_RESULTS; const startId = parseInt(call.request.start_server_id, 10); const serverEntries = entityMaps[EntityTypes.server]; const resultList: ServerMessage[] = []; @@ -820,7 +826,8 @@ function GetServerSockets( } const startId = parseInt(call.request.start_socket_id, 10); - const maxResults = parseInt(call.request.max_results, 10) || 100; + const maxResults = + parseInt(call.request.max_results, 10) || DEFAULT_MAX_RESULTS; const resolvedInfo = serverEntry.getInfo(); // If we wanted to include listener sockets in the result, this line would // instead say diff --git a/packages/grpc-js/src/server.ts b/packages/grpc-js/src/server.ts index b0fd5e7d3..feb511b41 100644 --- a/packages/grpc-js/src/server.ts +++ b/packages/grpc-js/src/server.ts @@ -368,7 +368,7 @@ export class Server { }; } - private getChannelzSessionInfoGetter( + private getChannelzSessionInfo( session: http2.ServerHttp2Session ): SocketInfo { const sessionInfo = this.sessions.get(session)!; @@ -1494,7 +1494,7 @@ export class Server { return (session: http2.ServerHttp2Session) => { const channelzRef = registerChannelzSocket( session.socket?.remoteAddress ?? 'unknown', - this.getChannelzSessionInfoGetter.bind(this, session), + this.getChannelzSessionInfo.bind(this, session), this.channelzEnabled ); From 74ddb3bd6fb0d37b716bf6b4f0eb694b8db761ec Mon Sep 17 00:00:00 2001 From: AVVS Date: Tue, 5 Mar 2024 15:34:29 -0800 Subject: [PATCH 11/11] chore: address ts errors --- packages/grpc-js/src/channelz.ts | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/packages/grpc-js/src/channelz.ts b/packages/grpc-js/src/channelz.ts index 697ea5d8a..c207e567c 100644 --- a/packages/grpc-js/src/channelz.ts +++ b/packages/grpc-js/src/channelz.ts @@ -66,28 +66,26 @@ export type TraceSeverity = | 'CT_WARNING' | 'CT_ERROR'; -export interface ChannelRef { - kind: EntityTypes.channel; +interface Ref { + kind: EntityTypes; id: number; name: string; } -export interface SubchannelRef { +export interface ChannelRef extends Ref { + kind: EntityTypes.channel; +} + +export interface SubchannelRef extends Ref { kind: EntityTypes.subchannel; - id: number; - name: string; } -export interface ServerRef { +export interface ServerRef extends Ref { kind: EntityTypes.server; - id: number; - name: string; } -export interface SocketRef { +export interface SocketRef extends Ref { kind: EntityTypes.socket; - id: number; - name: string; } function channelRefToMessage(ref: ChannelRef): ChannelRefMessage { @@ -361,6 +359,8 @@ export const enum EntityTypes { socket = 'socket', } +type EntryOrderedMap = OrderedMap any }>; + const entityMaps = { [EntityTypes.channel]: new OrderedMap(), [EntityTypes.subchannel]: new OrderedMap(), @@ -404,6 +404,8 @@ const generateRegisterFn = (kind: R) => { return nextId++; } + const entityMap: EntryOrderedMap = entityMaps[kind]; + return ( name: string, getInfo: () => InfoByType, @@ -412,8 +414,7 @@ const generateRegisterFn = (kind: R) => { const id = getNextId(); const ref = { id, name, kind } as RefByType; if (channelzEnabled) { - // @ts-expect-error typing issues - entityMaps[kind].setElement(id, { ref, getInfo }); + entityMap.setElement(id, { ref, getInfo }); } return ref; };