From 2df0c15de4ea93c03c280e59f27710ba5374b037 Mon Sep 17 00:00:00 2001 From: Igor Cappello Date: Tue, 30 Jan 2024 18:14:29 +0100 Subject: [PATCH 1/2] feature: specify connection name when creating a client --- src/client.ts | 11 ++++++----- src/connection.ts | 26 ++++++++++++++----------- src/requests/peer_properties_request.ts | 4 ++-- test/e2e/connect.test.ts | 12 ++++++++++++ test/e2e/declare_consumer.test.ts | 26 ++++++++++++++++++++++++- test/e2e/declare_publisher.test.ts | 26 ++++++++++++++++++++++++- test/support/fake_data.ts | 4 +++- test/support/rabbit.ts | 10 +++++++++- 8 files changed, 97 insertions(+), 22 deletions(-) diff --git a/src/client.ts b/src/client.ts index 49acf748..fe73b8a1 100644 --- a/src/client.ts +++ b/src/client.ts @@ -493,10 +493,10 @@ export class Client { if (!chosenNode) { throw new Error(`Stream was not found on any node`) } - const cachedConnectionProxy = ConnectionPool.getUsableCachedConnection(leader, streamName, chosenNode.host) - if (cachedConnectionProxy) return cachedConnectionProxy + const cachedConnection = ConnectionPool.getUsableCachedConnection(leader, streamName, chosenNode.host) + if (cachedConnection) return cachedConnection - const newConnectionProxy = await this.getConnectionOnChosenNode( + const newConnection = await this.getConnectionOnChosenNode( leader, streamName, chosenNode, @@ -504,8 +504,8 @@ export class Client { connectionClosedListener ) - ConnectionPool.cacheConnection(leader, streamName, newConnectionProxy.hostname, newConnectionProxy) - return newConnectionProxy + ConnectionPool.cacheConnection(leader, streamName, newConnection.hostname, newConnection) + return newConnection } private createSuperStreamPartitionsAndBindingKeys( @@ -611,6 +611,7 @@ export interface ClientParams { addressResolver?: AddressResolverParams leader?: boolean streamName?: string + connectionName?: string } export interface DeclarePublisherParams { diff --git a/src/connection.ts b/src/connection.ts index 343061ef..bd276e14 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -8,7 +8,7 @@ import { Logger } from "./logger" import { CloseRequest } from "./requests/close_request" import { ExchangeCommandVersionsRequest } from "./requests/exchange_command_versions_request" import { OpenRequest } from "./requests/open_request" -import { PeerPropertiesRequest } from "./requests/peer_properties_request" +import { PROPERTIES as PEER_PROPERTIES, PeerPropertiesRequest } from "./requests/peer_properties_request" import { BufferSizeParams, BufferSizeSettings, Request } from "./requests/request" import { SaslAuthenticateRequest } from "./requests/sasl_authenticate_request" import { SaslHandshakeRequest } from "./requests/sasl_handshake_request" @@ -43,14 +43,14 @@ import { coerce, lt } from "semver" export type ConnectionClosedListener = (hadError: boolean) => void -export type ConnectionProxyListenersParams = ClientListenersParams & { +export type ConnectionListenersParams = ClientListenersParams & { deliverV1?: DeliverListener deliverV2?: DeliverV2Listener consumer_update_query?: ConsumerUpdateQueryListener } -export type ConnectionProxyParams = ClientParams & { - listeners?: ConnectionProxyListenersParams +export type ConnectionParams = ClientParams & { + listeners?: ConnectionListenersParams } export type ConnectionInfo = { @@ -87,7 +87,7 @@ export class Connection { private refs: number = 0 private filteringEnabled: boolean = false - constructor(private readonly params: ConnectionProxyParams, private readonly logger: Logger) { + constructor(private readonly params: ConnectionParams, private readonly logger: Logger) { this.hostname = params.hostname this.leader = params.leader ?? false this.streamName = params.streamName @@ -108,11 +108,11 @@ export class Connection { this.connectionClosedListener = params.listeners?.connection_closed } - public static connect(params: ConnectionProxyParams, logger: Logger): Promise { + public static connect(params: ConnectionParams, logger: Logger): Promise { return new Connection(params, logger).start() } - public static create(params: ConnectionProxyParams, logger: Logger): Connection { + public static create(params: ConnectionParams, logger: Logger): Connection { return new Connection(params, logger) } @@ -199,7 +199,7 @@ export class Connection { } } - private registerListeners(listeners?: ConnectionProxyListenersParams) { + private registerListeners(listeners?: ConnectionListenersParams) { if (listeners?.metadata_update) this.decoder.on("metadata_update", listeners.metadata_update) if (listeners?.publish_confirm) this.decoder.on("publish_confirm", listeners.publish_confirm) if (listeners?.publish_error) this.decoder.on("publish_error", listeners.publish_error) @@ -301,7 +301,11 @@ export class Connection { private async exchangeProperties(): Promise { this.logger.debug(`Exchange peer properties ...`) - const res = await this.sendAndWait(new PeerPropertiesRequest()) + const peerProperties = { + ...PEER_PROPERTIES, + connection_name: this.params.connectionName ?? PEER_PROPERTIES.connection_name, + } + const res = await this.sendAndWait(new PeerPropertiesRequest(peerProperties)) if (!res.ok) { throw new Error(`Unable to exchange peer properties ${res.code} `) } @@ -469,10 +473,10 @@ export function errorMessageOf(code: number): string { } } -export function connect(logger: Logger, params: ConnectionProxyParams) { +export function connect(logger: Logger, params: ConnectionParams) { return Connection.connect(params, logger) } -export function create(logger: Logger, params: ConnectionProxyParams) { +export function create(logger: Logger, params: ConnectionParams) { return Connection.create(params, logger) } diff --git a/src/requests/peer_properties_request.ts b/src/requests/peer_properties_request.ts index 135a3b89..b6bc2065 100644 --- a/src/requests/peer_properties_request.ts +++ b/src/requests/peer_properties_request.ts @@ -6,9 +6,9 @@ import { DataWriter } from "./data_writer" export const PROPERTIES = { product: "RabbitMQ Stream", - version: "0.0.1", + version: "0.2.0", platform: "javascript", - copyright: "Copyright (c) 2020-2021 Coders51 srl", + copyright: "Copyright (c) 2020-2024 Coders51 srl", information: "Licensed under the Apache 2.0 and MPL 2.0 licenses. See https://www.rabbitmq.com/", connection_name: "Unknown", } diff --git a/test/e2e/connect.test.ts b/test/e2e/connect.test.ts index a5b7f202..4dd55999 100644 --- a/test/e2e/connect.test.ts +++ b/test/e2e/connect.test.ts @@ -4,6 +4,7 @@ import { createClient } from "../support/fake_data" import { Rabbit } from "../support/rabbit" import { eventually, username, password } from "../support/util" import { Version } from "../../src/versions" +import { randomUUID } from "node:crypto" describe("connect", () => { let client: Client @@ -27,6 +28,17 @@ describe("connect", () => { }, 5000) }).timeout(10000) + it("declaring connection name", async () => { + const connectionName = `connection-name-${randomUUID()}` + client = await createClient(username, password, undefined, undefined, undefined, undefined, connectionName) + + await eventually(async () => { + const connections = await rabbit.getConnections() + expect(connections.length).eql(1) + expect(connections[0].client_properties?.connection_name).eql(connectionName) + }, 5000) + }).timeout(10000) + it("and receive server-side message version declarations during handshake", async () => { client = await createClient(username, password) diff --git a/test/e2e/declare_consumer.test.ts b/test/e2e/declare_consumer.test.ts index 9c2613f7..64a3e25a 100644 --- a/test/e2e/declare_consumer.test.ts +++ b/test/e2e/declare_consumer.test.ts @@ -15,7 +15,7 @@ import { createPublisher, createStreamName, } from "../support/fake_data" -import { Rabbit } from "../support/rabbit" +import { Rabbit, RabbitConnectionResponse } from "../support/rabbit" import { getMaxSharedConnectionInstances, range } from "../../src/util" import { BufferDataReader } from "../../src/response_decoder" import { @@ -29,6 +29,7 @@ import { } from "../support/util" import { readFileSync } from "fs" import path from "path" +import { randomUUID } from "crypto" describe("declare consumer", () => { let streamName: string @@ -311,6 +312,29 @@ describe("declare consumer", () => { expect(countConsumersOverLimit).is.undefined expect(Array.from(counts.keys()).length).gt(1) }).timeout(10000) + + describe("when the client declares a named connection", () => { + let connectionName: string | undefined = undefined + + beforeEach(async () => { + try { + await client.close() + connectionName = `consumer-${randomUUID()}` + client = await createClient(username, password, undefined, undefined, undefined, undefined, connectionName) + } catch (e) {} + }) + it("the name is inherited on the consumer connection", async () => { + await createConsumer(streamName, client) + + await eventually(async () => { + const connections = await rabbit.getConnections() + expect(connections.length).eql(2) + expect(connections).to.satisfy((conns: RabbitConnectionResponse[]) => { + return conns.every((conn) => conn.client_properties?.connection_name === connectionName) + }) + }, 5000) + }).timeout(6000) + }) }) function createProperties(): MessageProperties { diff --git a/test/e2e/declare_publisher.test.ts b/test/e2e/declare_publisher.test.ts index 41973818..1f737fef 100644 --- a/test/e2e/declare_publisher.test.ts +++ b/test/e2e/declare_publisher.test.ts @@ -1,9 +1,10 @@ import { expect } from "chai" import { Client } from "../../src" import { createClient, createPublisher, createStreamName } from "../support/fake_data" -import { Rabbit } from "../support/rabbit" +import { Rabbit, RabbitConnectionResponse } from "../support/rabbit" import { eventually, expectToThrowAsync, username, password } from "../support/util" import { getMaxSharedConnectionInstances } from "../../src/util" +import { randomUUID } from "crypto" describe("declare publisher", () => { let streamName: string @@ -94,4 +95,27 @@ describe("declare publisher", () => { expect(countPublishersOverLimit).is.undefined expect(Array.from(counts.keys()).length).gt(1) }).timeout(10000) + + describe("when the client declares a named connection", () => { + let connectionName: string | undefined = undefined + + beforeEach(async () => { + try { + await client.close() + connectionName = `publisher-${randomUUID()}` + client = await createClient(username, password, undefined, undefined, undefined, undefined, connectionName) + } catch (e) {} + }) + it("the name is inherited on the consumer connection", async () => { + await createPublisher(streamName, client) + + await eventually(async () => { + const connections = await rabbit.getConnections() + expect(connections.length).eql(2) + expect(connections).to.satisfy((conns: RabbitConnectionResponse[]) => { + return conns.every((conn) => conn.client_properties?.connection_name === connectionName) + }) + }, 5000) + }).timeout(10000) + }) }) diff --git a/test/support/fake_data.ts b/test/support/fake_data.ts index 7872ce43..23eabcd9 100644 --- a/test/support/fake_data.ts +++ b/test/support/fake_data.ts @@ -57,7 +57,8 @@ export async function createClient( listeners?: ClientListenersParams, frameMax?: number, bufferSizeSettings?: BufferSizeSettings, - port?: number + port?: number, + connectionName?: string ): Promise { const [firstNode] = getTestNodesFromEnv() return connect({ @@ -70,5 +71,6 @@ export async function createClient( heartbeat: 0, listeners: listeners, bufferSizeSettings: bufferSizeSettings, + connectionName: connectionName, }) } diff --git a/test/support/rabbit.ts b/test/support/rabbit.ts index cb0fad83..6da32791 100644 --- a/test/support/rabbit.ts +++ b/test/support/rabbit.ts @@ -2,8 +2,16 @@ import got from "got" import { getTestNodesFromEnv } from "./util" import { range } from "../../src/util" -interface RabbitConnectionResponse { +export interface RabbitConnectionResponse { name: string + client_properties?: { + connection_name?: string + copyright?: string + information?: string + platform?: string + product?: string + version?: string + } } interface RabbitConsumerCredits { From 20bc2792e401c490dc766747c271a96e63b5a28f Mon Sep 17 00:00:00 2001 From: Igor Cappello Date: Wed, 31 Jan 2024 09:25:01 +0100 Subject: [PATCH 2/2] chore: fix test name --- test/e2e/declare_publisher.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/e2e/declare_publisher.test.ts b/test/e2e/declare_publisher.test.ts index 1f737fef..53a3c079 100644 --- a/test/e2e/declare_publisher.test.ts +++ b/test/e2e/declare_publisher.test.ts @@ -106,7 +106,7 @@ describe("declare publisher", () => { client = await createClient(username, password, undefined, undefined, undefined, undefined, connectionName) } catch (e) {} }) - it("the name is inherited on the consumer connection", async () => { + it("the name is inherited on the publisher connection", async () => { await createPublisher(streamName, client) await eventually(async () => {