diff --git a/src/client.ts b/src/client.ts index c6bb3951..72980001 100644 --- a/src/client.ts +++ b/src/client.ts @@ -67,7 +67,7 @@ export class Client { private consumers = new Map() private publishers = new Map() private compressions = new Map() - private connection: Connection + private locatorConnection: Connection private pool: ConnectionPool = new ConnectionPool() private constructor( @@ -76,20 +76,19 @@ export class Client { ) { this.compressions.set(CompressionType.None, NoneCompression.create()) this.compressions.set(CompressionType.Gzip, GzipCompression.create()) - this.connection = this.getLocatorConnection() - this.connection.incrRefCount() + this.locatorConnection = this.getLocatorConnection() } getCompression(compressionType: CompressionType) { - return this.connection.getCompression(compressionType) + return this.locatorConnection.getCompression(compressionType) } registerCompression(compression: Compression) { - this.connection.registerCompression(compression) + this.locatorConnection.registerCompression(compression) } public start(): Promise { - return this.connection.start().then( + return this.locatorConnection.start().then( (_res) => { return this }, @@ -104,26 +103,18 @@ export class Client { this.logger.info(`${this.id} Closing client...`) if (this.publisherCounts()) { this.logger.info(`Stopping all producers...`) - await this.closeAllPublishers(true) + await this.closeAllPublishers() } if (this.consumerCounts()) { this.logger.info(`Stopping all consumers...`) - await this.closeAllConsumers(true) - } - this.connection.decrRefCount() - await this.closeConnectionIfUnused(this.connection, params) - } - - private async closeConnectionIfUnused(connection: Connection, params: ClosingParams) { - if (connection.refCount <= 0) { - this.pool.removeCachedConnection(this.connection) - await this.connection.close({ ...params, manuallyClose: true }) + await this.closeAllConsumers() } + await this.locatorConnection.close({ ...params, manuallyClose: true }) } public async queryMetadata(params: QueryMetadataParams): Promise { const { streams } = params - const res = await this.connection.sendAndWait(new MetadataRequest({ streams })) + const res = await this.locatorConnection.sendAndWait(new MetadataRequest({ streams })) if (!res.ok) { throw new Error(`Query Metadata command returned error with code ${res.code} - ${errorMessageOf(res.code)}`) } @@ -135,7 +126,7 @@ export class Client { public async queryPartitions(params: QueryPartitionsParams): Promise { const { superStream } = params - const res = await this.connection.sendAndWait(new PartitionsQuery({ superStream })) + const res = await this.locatorConnection.sendAndWait(new PartitionsQuery({ superStream })) if (!res.ok) { throw new Error(`Query Partitions command returned error with code ${res.code} - ${errorMessageOf(res.code)}`) } @@ -158,7 +149,7 @@ export class Client { } let lastPublishingId = 0n if (streamPublisherParams.publisherRef) { - lastPublishingId = await this.connection.queryPublisherSequence({ + lastPublishingId = await this.locatorConnection.queryPublisherSequence({ stream: streamPublisherParams.stream, publisherRef: streamPublisherParams.publisherRef, }) @@ -178,7 +169,7 @@ export class Client { public async deletePublisher(extendedPublisherId: string) { const { publisher, connection } = this.publishers.get(extendedPublisherId) ?? { publisher: undefined, - connection: this.connection, + connection: this.locatorConnection, } const publisherId = extractPublisherId(extendedPublisherId) const res = await connection.sendAndWait(new DeletePublisherRequest(publisherId)) @@ -241,7 +232,7 @@ export class Client { } const consumerId = extractConsumerId(extendedConsumerId) - const { streamInfos } = await this.connection.sendAndWait( + const { streamInfos } = await this.locatorConnection.sendAndWait( new MetadataRequest({ streams: [activeConsumer.consumer.streamName] }) ) if (streamInfos.length > 0 && streamExists(streamInfos[0])) { @@ -280,16 +271,16 @@ export class Client { } public queryOffset(params: QueryOffsetParams) { - return this.connection.queryOffset(params) + return this.locatorConnection.queryOffset(params) } - private async closeAllConsumers(manuallyClose: boolean) { - await Promise.all([...this.consumers.values()].map(({ consumer }) => consumer.close(manuallyClose))) + private async closeAllConsumers() { + await Promise.all([...this.consumers.values()].map(({ consumer }) => consumer.close())) this.consumers = new Map() } - private async closeAllPublishers(manuallyClose: boolean) { - await Promise.all([...this.publishers.values()].map((c) => c.publisher.close(manuallyClose))) + private async closeAllPublishers() { + await Promise.all([...this.publishers.values()].map((c) => c.publisher.close(true))) this.publishers = new Map() } @@ -307,7 +298,7 @@ export class Client { public async createStream(params: { stream: string; arguments?: CreateStreamArguments }): Promise { this.logger.debug(`Create Stream...`) - const res = await this.connection.sendAndWait(new CreateStreamRequest(params)) + const res = await this.locatorConnection.sendAndWait(new CreateStreamRequest(params)) if (res.code === STREAM_ALREADY_EXISTS_ERROR_CODE) { return true } @@ -321,7 +312,7 @@ export class Client { public async deleteStream(params: { stream: string }): Promise { this.logger.debug(`Delete Stream...`) - const res = await this.connection.sendAndWait(new DeleteStreamRequest(params.stream)) + const res = await this.locatorConnection.sendAndWait(new DeleteStreamRequest(params.stream)) if (!res.ok) { throw new Error(`Delete Stream command returned error with code ${res.code}`) } @@ -349,7 +340,7 @@ export class Client { numberOfPartitions, bindingKeys ) - const res = await this.connection.sendAndWait( + const res = await this.locatorConnection.sendAndWait( new CreateSuperStreamRequest({ ...params, partitions, bindingKeys: streamBindingKeys }) ) if (res.code === STREAM_ALREADY_EXISTS_ERROR_CODE) { @@ -371,7 +362,7 @@ export class Client { } this.logger.debug(`Delete Super Stream...`) - const res = await this.connection.sendAndWait( + const res = await this.locatorConnection.sendAndWait( new DeleteSuperStreamRequest(params.streamName) ) if (!res.ok) { @@ -382,7 +373,7 @@ export class Client { } public async streamStatsRequest(streamName: string) { - const res = await this.connection.sendAndWait(new StreamStatsRequest(streamName)) + const res = await this.locatorConnection.sendAndWait(new StreamStatsRequest(streamName)) if (!res.ok) { throw new Error(`Stream Stats command returned error with code ${res.code} - ${errorMessageOf(res.code)}`) } @@ -391,11 +382,11 @@ export class Client { } public getConnectionInfo(): ConnectionInfo { - return this.connection.getConnectionInfo() + return this.locatorConnection.getConnectionInfo() } public async subscribe(params: SubscribeParams): Promise { - const res = await this.connection.sendAndWait(new SubscribeRequest({ ...params })) + const res = await this.locatorConnection.sendAndWait(new SubscribeRequest({ ...params })) if (!res.ok) { throw new Error(`Subscribe command returned error with code ${res.code} - ${errorMessageOf(res.code)}`) } @@ -403,12 +394,12 @@ export class Client { } public async restart() { - this.logger.info(`Restarting client connection ${this.connection.connectionId}`) + this.logger.info(`Restarting client connection ${this.locatorConnection.connectionId}`) const uniqueConnectionIds = new Set() - uniqueConnectionIds.add(this.connection.connectionId) + uniqueConnectionIds.add(this.locatorConnection.connectionId) await wait(5000) - await this.connection.restart() + await this.locatorConnection.restart() for (const { consumer, connection, params } of this.consumers.values()) { if (!uniqueConnectionIds.has(connection.connectionId)) { @@ -431,19 +422,19 @@ export class Client { } public get maxFrameSize() { - return this.connection.maxFrameSize ?? DEFAULT_FRAME_MAX + return this.locatorConnection.maxFrameSize ?? DEFAULT_FRAME_MAX } public get serverVersions() { - return this.connection.serverVersions + return this.locatorConnection.serverVersions } public get rabbitManagementVersion() { - return this.connection.rabbitManagementVersion + return this.locatorConnection.rabbitManagementVersion } public async routeQuery(params: { routingKey: string; superStream: string }) { - const res = await this.connection.sendAndWait(new RouteQuery(params)) + const res = await this.locatorConnection.sendAndWait(new RouteQuery(params)) if (!res.ok) { throw new Error(`Route Query command returned error with code ${res.code} - ${errorMessageOf(res.code)}`) } @@ -452,7 +443,7 @@ export class Client { } public async partitionsQuery(params: { superStream: string }) { - const res = await this.connection.sendAndWait(new PartitionsQuery(params)) + const res = await this.locatorConnection.sendAndWait(new PartitionsQuery(params)) if (!res.ok) { throw new Error(`Partitions Query command returned error with code ${res.code} - ${errorMessageOf(res.code)}`) } @@ -629,23 +620,27 @@ export class Client { connectionClosedListener?: ConnectionClosedListener ): Promise { const [metadata] = await this.queryMetadata({ streams: [streamName] }) - const chosenNode = chooseNode(metadata, purpose === "publisher") + const isPublisher = purpose === "publisher" + const chosenNode = chooseNode(metadata, isPublisher) if (!chosenNode) { throw new Error(`Stream was not found on any node`) } - const cachedConnection = this.pool.getCachedConnection(purpose, streamName, this.connection.vhost, chosenNode.host) - if (cachedConnection) return cachedConnection - - const newConnection = await this.getConnectionOnChosenNode( + const connection = await this.pool.getConnection( purpose, streamName, - chosenNode, - metadata, - connectionClosedListener + this.locatorConnection.vhost, + chosenNode.host, + async () => { + return await this.getConnectionOnChosenNode( + isPublisher, + streamName, + chosenNode, + metadata, + connectionClosedListener + ) + } ) - - this.pool.cacheConnection(purpose, streamName, this.connection.vhost, newConnection.hostname, newConnection) - return newConnection + return connection } private createSuperStreamPartitionsAndBindingKeys( @@ -688,13 +683,13 @@ export class Client { } private async getConnectionOnChosenNode( - purpose: ConnectionPurpose, + isPublisher: boolean, streamName: string, chosenNode: { host: string; port: number }, metadata: StreamMetadata, connectionClosedListener?: ConnectionClosedListener ): Promise { - const connectionParams = this.buildConnectionParams(purpose === "publisher", streamName, connectionClosedListener) + const connectionParams = this.buildConnectionParams(isPublisher, streamName, connectionClosedListener) if (this.params.addressResolver && this.params.addressResolver.enabled) { const maxAttempts = computeMaxAttempts(metadata) const resolver = this.params.addressResolver @@ -727,7 +722,7 @@ export class Client { } private async closing(consumer: StreamConsumer, extendedConsumerId: string) { - await consumer.close(true) + await consumer.close() this.consumers.delete(extendedConsumerId) this.logger.info(`Closed consumer with id: ${extendedConsumerId}`) } diff --git a/src/connection_pool.ts b/src/connection_pool.ts index 8910ec10..6bb067a6 100644 --- a/src/connection_pool.ts +++ b/src/connection_pool.ts @@ -5,53 +5,58 @@ type InstanceKey = string export type ConnectionPurpose = "consumer" | "publisher" export class ConnectionPool { - private consumerConnectionProxies: Map = new Map() - private publisherConnectionProxies: Map = new Map() - - public getCachedConnection(purpose: ConnectionPurpose, streamName: string, vhost: string, host: string) { - const map = purpose === "publisher" ? this.publisherConnectionProxies : this.consumerConnectionProxies - const key = this.getCacheKey(streamName, vhost, host) - const proxies = map.get(key) || [] - const connection = proxies.at(-1) - const refCount = connection?.refCount - return refCount !== undefined && refCount < getMaxSharedConnectionInstances() ? connection : undefined - } + private connectionsMap: Map = new Map() - public cacheConnection( - purpose: ConnectionPurpose, + public async getConnection( + entityType: ConnectionPurpose, streamName: string, vhost: string, host: string, - client: Connection + connectionCreator: () => Promise ) { - const map = purpose === "publisher" ? this.publisherConnectionProxies : this.consumerConnectionProxies - const key = this.getCacheKey(streamName, vhost, host) - const currentlyCached = map.get(key) || [] - currentlyCached.push(client) - map.set(key, currentlyCached) + const key = this.getCacheKey(streamName, vhost, host, entityType) + const connections = this.connectionsMap.get(key) || [] + const connection = connections.at(-1) + const refCount = connection?.refCount + const cachedConnection = + refCount !== undefined && refCount < getMaxSharedConnectionInstances() ? connection : undefined + + if (cachedConnection) { + return cachedConnection + } else { + const newConnection = await connectionCreator() + this.cacheConnection(key, newConnection) + return newConnection + } } - public removeIfUnused(connection: Connection) { + public async releaseConnection(connection: Connection, manuallyClose = true): Promise { + connection.decrRefCount() if (connection.refCount <= 0) { + await connection.close({ closingCode: 0, closingReason: "", manuallyClose }) this.removeCachedConnection(connection) - return true } - return false } - public removeCachedConnection(connection: Connection) { + private cacheConnection(key: string, connection: Connection) { + const currentlyCached = this.connectionsMap.get(key) || [] + currentlyCached.push(connection) + this.connectionsMap.set(key, currentlyCached) + } + + private removeCachedConnection(connection: Connection) { const { leader, streamName, hostname: host, vhost } = connection if (streamName === undefined) return - const m = leader ? this.publisherConnectionProxies : this.consumerConnectionProxies - const k = this.getCacheKey(streamName, vhost, host) - const mappedClientList = m.get(k) + const entityType = leader ? "publisher" : "consumer" + const k = this.getCacheKey(streamName, vhost, host, entityType) + const mappedClientList = this.connectionsMap.get(k) if (mappedClientList) { const filtered = mappedClientList.filter((c) => c !== connection) - m.set(k, filtered) + this.connectionsMap.set(k, filtered) } } - private getCacheKey(streamName: string, vhost: string, host: string) { - return `${streamName}@${vhost}@${host}` + private getCacheKey(streamName: string, vhost: string, host: string, entityType: ConnectionPurpose) { + return `${streamName}@${vhost}@${host}@${entityType}` } } diff --git a/src/consumer.ts b/src/consumer.ts index 5058a963..d6f09f56 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -97,12 +97,9 @@ export class StreamConsumer implements Consumer { this.singleActive = params.singleActive ?? false } - async close(manuallyClose: boolean): Promise { + async close(): Promise { this.closed = true - this.connection.decrRefCount() - if (this.pool.removeIfUnused(this.connection)) { - await this.connection.close({ closingCode: 0, closingReason: "", manuallyClose }) - } + await this.pool.releaseConnection(this.connection) } public storeOffset(offsetValue: bigint): Promise { diff --git a/src/publisher.ts b/src/publisher.ts index 9987b40c..9f83403f 100644 --- a/src/publisher.ts +++ b/src/publisher.ts @@ -281,10 +281,7 @@ export class StreamPublisher implements Publisher { public async close(manuallyClose: boolean): Promise { if (!this.closed) { await this.flush() - this.connection.decrRefCount() - if (this.pool.removeIfUnused(this.connection)) { - await this.connection.close({ closingCode: 0, closingReason: "", manuallyClose }) - } + await this.pool.releaseConnection(this.connection, manuallyClose) } this._closed = true }