Skip to content

Commit 37f6133

Browse files
authored
Specify connection name when creating a client (#177)
* feature: specify connection name when creating a client
1 parent a56a632 commit 37f6133

File tree

8 files changed

+97
-22
lines changed

8 files changed

+97
-22
lines changed

src/client.ts

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -493,19 +493,19 @@ export class Client {
493493
if (!chosenNode) {
494494
throw new Error(`Stream was not found on any node`)
495495
}
496-
const cachedConnectionProxy = ConnectionPool.getUsableCachedConnection(leader, streamName, chosenNode.host)
497-
if (cachedConnectionProxy) return cachedConnectionProxy
496+
const cachedConnection = ConnectionPool.getUsableCachedConnection(leader, streamName, chosenNode.host)
497+
if (cachedConnection) return cachedConnection
498498

499-
const newConnectionProxy = await this.getConnectionOnChosenNode(
499+
const newConnection = await this.getConnectionOnChosenNode(
500500
leader,
501501
streamName,
502502
chosenNode,
503503
metadata,
504504
connectionClosedListener
505505
)
506506

507-
ConnectionPool.cacheConnection(leader, streamName, newConnectionProxy.hostname, newConnectionProxy)
508-
return newConnectionProxy
507+
ConnectionPool.cacheConnection(leader, streamName, newConnection.hostname, newConnection)
508+
return newConnection
509509
}
510510

511511
private createSuperStreamPartitionsAndBindingKeys(
@@ -611,6 +611,7 @@ export interface ClientParams {
611611
addressResolver?: AddressResolverParams
612612
leader?: boolean
613613
streamName?: string
614+
connectionName?: string
614615
}
615616

616617
export interface DeclarePublisherParams {

src/connection.ts

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import { Logger } from "./logger"
88
import { CloseRequest } from "./requests/close_request"
99
import { ExchangeCommandVersionsRequest } from "./requests/exchange_command_versions_request"
1010
import { OpenRequest } from "./requests/open_request"
11-
import { PeerPropertiesRequest } from "./requests/peer_properties_request"
11+
import { PROPERTIES as PEER_PROPERTIES, PeerPropertiesRequest } from "./requests/peer_properties_request"
1212
import { BufferSizeParams, BufferSizeSettings, Request } from "./requests/request"
1313
import { SaslAuthenticateRequest } from "./requests/sasl_authenticate_request"
1414
import { SaslHandshakeRequest } from "./requests/sasl_handshake_request"
@@ -43,14 +43,14 @@ import { coerce, lt } from "semver"
4343

4444
export type ConnectionClosedListener = (hadError: boolean) => void
4545

46-
export type ConnectionProxyListenersParams = ClientListenersParams & {
46+
export type ConnectionListenersParams = ClientListenersParams & {
4747
deliverV1?: DeliverListener
4848
deliverV2?: DeliverV2Listener
4949
consumer_update_query?: ConsumerUpdateQueryListener
5050
}
5151

52-
export type ConnectionProxyParams = ClientParams & {
53-
listeners?: ConnectionProxyListenersParams
52+
export type ConnectionParams = ClientParams & {
53+
listeners?: ConnectionListenersParams
5454
}
5555

5656
export type ConnectionInfo = {
@@ -87,7 +87,7 @@ export class Connection {
8787
private refs: number = 0
8888
private filteringEnabled: boolean = false
8989

90-
constructor(private readonly params: ConnectionProxyParams, private readonly logger: Logger) {
90+
constructor(private readonly params: ConnectionParams, private readonly logger: Logger) {
9191
this.hostname = params.hostname
9292
this.leader = params.leader ?? false
9393
this.streamName = params.streamName
@@ -108,11 +108,11 @@ export class Connection {
108108
this.connectionClosedListener = params.listeners?.connection_closed
109109
}
110110

111-
public static connect(params: ConnectionProxyParams, logger: Logger): Promise<Connection> {
111+
public static connect(params: ConnectionParams, logger: Logger): Promise<Connection> {
112112
return new Connection(params, logger).start()
113113
}
114114

115-
public static create(params: ConnectionProxyParams, logger: Logger): Connection {
115+
public static create(params: ConnectionParams, logger: Logger): Connection {
116116
return new Connection(params, logger)
117117
}
118118

@@ -199,7 +199,7 @@ export class Connection {
199199
}
200200
}
201201

202-
private registerListeners(listeners?: ConnectionProxyListenersParams) {
202+
private registerListeners(listeners?: ConnectionListenersParams) {
203203
if (listeners?.metadata_update) this.decoder.on("metadata_update", listeners.metadata_update)
204204
if (listeners?.publish_confirm) this.decoder.on("publish_confirm", listeners.publish_confirm)
205205
if (listeners?.publish_error) this.decoder.on("publish_error", listeners.publish_error)
@@ -301,7 +301,11 @@ export class Connection {
301301

302302
private async exchangeProperties(): Promise<PeerPropertiesResponse> {
303303
this.logger.debug(`Exchange peer properties ...`)
304-
const res = await this.sendAndWait<PeerPropertiesResponse>(new PeerPropertiesRequest())
304+
const peerProperties = {
305+
...PEER_PROPERTIES,
306+
connection_name: this.params.connectionName ?? PEER_PROPERTIES.connection_name,
307+
}
308+
const res = await this.sendAndWait<PeerPropertiesResponse>(new PeerPropertiesRequest(peerProperties))
305309
if (!res.ok) {
306310
throw new Error(`Unable to exchange peer properties ${res.code} `)
307311
}
@@ -469,10 +473,10 @@ export function errorMessageOf(code: number): string {
469473
}
470474
}
471475

472-
export function connect(logger: Logger, params: ConnectionProxyParams) {
476+
export function connect(logger: Logger, params: ConnectionParams) {
473477
return Connection.connect(params, logger)
474478
}
475479

476-
export function create(logger: Logger, params: ConnectionProxyParams) {
480+
export function create(logger: Logger, params: ConnectionParams) {
477481
return Connection.create(params, logger)
478482
}

src/requests/peer_properties_request.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@ import { DataWriter } from "./data_writer"
66

77
export const PROPERTIES = {
88
product: "RabbitMQ Stream",
9-
version: "0.0.1",
9+
version: "0.2.0",
1010
platform: "javascript",
11-
copyright: "Copyright (c) 2020-2021 Coders51 srl",
11+
copyright: "Copyright (c) 2020-2024 Coders51 srl",
1212
information: "Licensed under the Apache 2.0 and MPL 2.0 licenses. See https://www.rabbitmq.com/",
1313
connection_name: "Unknown",
1414
}

test/e2e/connect.test.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { createClient } from "../support/fake_data"
44
import { Rabbit } from "../support/rabbit"
55
import { eventually, username, password } from "../support/util"
66
import { Version } from "../../src/versions"
7+
import { randomUUID } from "node:crypto"
78

89
describe("connect", () => {
910
let client: Client
@@ -27,6 +28,17 @@ describe("connect", () => {
2728
}, 5000)
2829
}).timeout(10000)
2930

31+
it("declaring connection name", async () => {
32+
const connectionName = `connection-name-${randomUUID()}`
33+
client = await createClient(username, password, undefined, undefined, undefined, undefined, connectionName)
34+
35+
await eventually(async () => {
36+
const connections = await rabbit.getConnections()
37+
expect(connections.length).eql(1)
38+
expect(connections[0].client_properties?.connection_name).eql(connectionName)
39+
}, 5000)
40+
}).timeout(10000)
41+
3042
it("and receive server-side message version declarations during handshake", async () => {
3143
client = await createClient(username, password)
3244

test/e2e/declare_consumer.test.ts

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import {
1515
createPublisher,
1616
createStreamName,
1717
} from "../support/fake_data"
18-
import { Rabbit } from "../support/rabbit"
18+
import { Rabbit, RabbitConnectionResponse } from "../support/rabbit"
1919
import { getMaxSharedConnectionInstances, range } from "../../src/util"
2020
import { BufferDataReader } from "../../src/response_decoder"
2121
import {
@@ -29,6 +29,7 @@ import {
2929
} from "../support/util"
3030
import { readFileSync } from "fs"
3131
import path from "path"
32+
import { randomUUID } from "crypto"
3233

3334
describe("declare consumer", () => {
3435
let streamName: string
@@ -311,6 +312,29 @@ describe("declare consumer", () => {
311312
expect(countConsumersOverLimit).is.undefined
312313
expect(Array.from(counts.keys()).length).gt(1)
313314
}).timeout(10000)
315+
316+
describe("when the client declares a named connection", () => {
317+
let connectionName: string | undefined = undefined
318+
319+
beforeEach(async () => {
320+
try {
321+
await client.close()
322+
connectionName = `consumer-${randomUUID()}`
323+
client = await createClient(username, password, undefined, undefined, undefined, undefined, connectionName)
324+
} catch (e) {}
325+
})
326+
it("the name is inherited on the consumer connection", async () => {
327+
await createConsumer(streamName, client)
328+
329+
await eventually(async () => {
330+
const connections = await rabbit.getConnections()
331+
expect(connections.length).eql(2)
332+
expect(connections).to.satisfy((conns: RabbitConnectionResponse[]) => {
333+
return conns.every((conn) => conn.client_properties?.connection_name === connectionName)
334+
})
335+
}, 5000)
336+
}).timeout(6000)
337+
})
314338
})
315339

316340
function createProperties(): MessageProperties {

test/e2e/declare_publisher.test.ts

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
import { expect } from "chai"
22
import { Client } from "../../src"
33
import { createClient, createPublisher, createStreamName } from "../support/fake_data"
4-
import { Rabbit } from "../support/rabbit"
4+
import { Rabbit, RabbitConnectionResponse } from "../support/rabbit"
55
import { eventually, expectToThrowAsync, username, password } from "../support/util"
66
import { getMaxSharedConnectionInstances } from "../../src/util"
7+
import { randomUUID } from "crypto"
78

89
describe("declare publisher", () => {
910
let streamName: string
@@ -94,4 +95,27 @@ describe("declare publisher", () => {
9495
expect(countPublishersOverLimit).is.undefined
9596
expect(Array.from(counts.keys()).length).gt(1)
9697
}).timeout(10000)
98+
99+
describe("when the client declares a named connection", () => {
100+
let connectionName: string | undefined = undefined
101+
102+
beforeEach(async () => {
103+
try {
104+
await client.close()
105+
connectionName = `publisher-${randomUUID()}`
106+
client = await createClient(username, password, undefined, undefined, undefined, undefined, connectionName)
107+
} catch (e) {}
108+
})
109+
it("the name is inherited on the publisher connection", async () => {
110+
await createPublisher(streamName, client)
111+
112+
await eventually(async () => {
113+
const connections = await rabbit.getConnections()
114+
expect(connections.length).eql(2)
115+
expect(connections).to.satisfy((conns: RabbitConnectionResponse[]) => {
116+
return conns.every((conn) => conn.client_properties?.connection_name === connectionName)
117+
})
118+
}, 5000)
119+
}).timeout(10000)
120+
})
97121
})

test/support/fake_data.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ export async function createClient(
5757
listeners?: ClientListenersParams,
5858
frameMax?: number,
5959
bufferSizeSettings?: BufferSizeSettings,
60-
port?: number
60+
port?: number,
61+
connectionName?: string
6162
): Promise<Client> {
6263
const [firstNode] = getTestNodesFromEnv()
6364
return connect({
@@ -70,5 +71,6 @@ export async function createClient(
7071
heartbeat: 0,
7172
listeners: listeners,
7273
bufferSizeSettings: bufferSizeSettings,
74+
connectionName: connectionName,
7375
})
7476
}

test/support/rabbit.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,16 @@ import got from "got"
22
import { getTestNodesFromEnv } from "./util"
33
import { range } from "../../src/util"
44

5-
interface RabbitConnectionResponse {
5+
export interface RabbitConnectionResponse {
66
name: string
7+
client_properties?: {
8+
connection_name?: string
9+
copyright?: string
10+
information?: string
11+
platform?: string
12+
product?: string
13+
version?: string
14+
}
715
}
816

917
interface RabbitConsumerCredits {

0 commit comments

Comments
 (0)