Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -493,19 +493,19 @@ export class Client {
if (!chosenNode) {
throw new Error(`Stream was not found on any node`)
}
const cachedConnectionProxy = ConnectionPool.getUsableCachedConnection(leader, streamName, chosenNode.host)
if (cachedConnectionProxy) return cachedConnectionProxy
const cachedConnection = ConnectionPool.getUsableCachedConnection(leader, streamName, chosenNode.host)
if (cachedConnection) return cachedConnection

const newConnectionProxy = await this.getConnectionOnChosenNode(
const newConnection = await this.getConnectionOnChosenNode(
leader,
streamName,
chosenNode,
metadata,
connectionClosedListener
)

ConnectionPool.cacheConnection(leader, streamName, newConnectionProxy.hostname, newConnectionProxy)
return newConnectionProxy
ConnectionPool.cacheConnection(leader, streamName, newConnection.hostname, newConnection)
return newConnection
}

private createSuperStreamPartitionsAndBindingKeys(
Expand Down Expand Up @@ -611,6 +611,7 @@ export interface ClientParams {
addressResolver?: AddressResolverParams
leader?: boolean
streamName?: string
connectionName?: string
}

export interface DeclarePublisherParams {
Expand Down
26 changes: 15 additions & 11 deletions src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { Logger } from "./logger"
import { CloseRequest } from "./requests/close_request"
import { ExchangeCommandVersionsRequest } from "./requests/exchange_command_versions_request"
import { OpenRequest } from "./requests/open_request"
import { PeerPropertiesRequest } from "./requests/peer_properties_request"
import { PROPERTIES as PEER_PROPERTIES, PeerPropertiesRequest } from "./requests/peer_properties_request"
import { BufferSizeParams, BufferSizeSettings, Request } from "./requests/request"
import { SaslAuthenticateRequest } from "./requests/sasl_authenticate_request"
import { SaslHandshakeRequest } from "./requests/sasl_handshake_request"
Expand Down Expand Up @@ -43,14 +43,14 @@ import { coerce, lt } from "semver"

export type ConnectionClosedListener = (hadError: boolean) => void

export type ConnectionProxyListenersParams = ClientListenersParams & {
export type ConnectionListenersParams = ClientListenersParams & {
deliverV1?: DeliverListener
deliverV2?: DeliverV2Listener
consumer_update_query?: ConsumerUpdateQueryListener
}

export type ConnectionProxyParams = ClientParams & {
listeners?: ConnectionProxyListenersParams
export type ConnectionParams = ClientParams & {
listeners?: ConnectionListenersParams
}

export type ConnectionInfo = {
Expand Down Expand Up @@ -87,7 +87,7 @@ export class Connection {
private refs: number = 0
private filteringEnabled: boolean = false

constructor(private readonly params: ConnectionProxyParams, private readonly logger: Logger) {
constructor(private readonly params: ConnectionParams, private readonly logger: Logger) {
this.hostname = params.hostname
this.leader = params.leader ?? false
this.streamName = params.streamName
Expand All @@ -108,11 +108,11 @@ export class Connection {
this.connectionClosedListener = params.listeners?.connection_closed
}

public static connect(params: ConnectionProxyParams, logger: Logger): Promise<Connection> {
public static connect(params: ConnectionParams, logger: Logger): Promise<Connection> {
return new Connection(params, logger).start()
}

public static create(params: ConnectionProxyParams, logger: Logger): Connection {
public static create(params: ConnectionParams, logger: Logger): Connection {
return new Connection(params, logger)
}

Expand Down Expand Up @@ -199,7 +199,7 @@ export class Connection {
}
}

private registerListeners(listeners?: ConnectionProxyListenersParams) {
private registerListeners(listeners?: ConnectionListenersParams) {
if (listeners?.metadata_update) this.decoder.on("metadata_update", listeners.metadata_update)
if (listeners?.publish_confirm) this.decoder.on("publish_confirm", listeners.publish_confirm)
if (listeners?.publish_error) this.decoder.on("publish_error", listeners.publish_error)
Expand Down Expand Up @@ -301,7 +301,11 @@ export class Connection {

private async exchangeProperties(): Promise<PeerPropertiesResponse> {
this.logger.debug(`Exchange peer properties ...`)
const res = await this.sendAndWait<PeerPropertiesResponse>(new PeerPropertiesRequest())
const peerProperties = {
...PEER_PROPERTIES,
connection_name: this.params.connectionName ?? PEER_PROPERTIES.connection_name,
}
const res = await this.sendAndWait<PeerPropertiesResponse>(new PeerPropertiesRequest(peerProperties))
if (!res.ok) {
throw new Error(`Unable to exchange peer properties ${res.code} `)
}
Expand Down Expand Up @@ -469,10 +473,10 @@ export function errorMessageOf(code: number): string {
}
}

export function connect(logger: Logger, params: ConnectionProxyParams) {
export function connect(logger: Logger, params: ConnectionParams) {
return Connection.connect(params, logger)
}

export function create(logger: Logger, params: ConnectionProxyParams) {
export function create(logger: Logger, params: ConnectionParams) {
return Connection.create(params, logger)
}
4 changes: 2 additions & 2 deletions src/requests/peer_properties_request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import { DataWriter } from "./data_writer"

export const PROPERTIES = {
product: "RabbitMQ Stream",
version: "0.0.1",
version: "0.2.0",
platform: "javascript",
copyright: "Copyright (c) 2020-2021 Coders51 srl",
copyright: "Copyright (c) 2020-2024 Coders51 srl",
information: "Licensed under the Apache 2.0 and MPL 2.0 licenses. See https://www.rabbitmq.com/",
connection_name: "Unknown",
}
Expand Down
12 changes: 12 additions & 0 deletions test/e2e/connect.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { createClient } from "../support/fake_data"
import { Rabbit } from "../support/rabbit"
import { eventually, username, password } from "../support/util"
import { Version } from "../../src/versions"
import { randomUUID } from "node:crypto"

describe("connect", () => {
let client: Client
Expand All @@ -27,6 +28,17 @@ describe("connect", () => {
}, 5000)
}).timeout(10000)

it("declaring connection name", async () => {
const connectionName = `connection-name-${randomUUID()}`
client = await createClient(username, password, undefined, undefined, undefined, undefined, connectionName)

await eventually(async () => {
const connections = await rabbit.getConnections()
expect(connections.length).eql(1)
expect(connections[0].client_properties?.connection_name).eql(connectionName)
}, 5000)
}).timeout(10000)

it("and receive server-side message version declarations during handshake", async () => {
client = await createClient(username, password)

Expand Down
26 changes: 25 additions & 1 deletion test/e2e/declare_consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import {
createPublisher,
createStreamName,
} from "../support/fake_data"
import { Rabbit } from "../support/rabbit"
import { Rabbit, RabbitConnectionResponse } from "../support/rabbit"
import { getMaxSharedConnectionInstances, range } from "../../src/util"
import { BufferDataReader } from "../../src/response_decoder"
import {
Expand All @@ -29,6 +29,7 @@ import {
} from "../support/util"
import { readFileSync } from "fs"
import path from "path"
import { randomUUID } from "crypto"

describe("declare consumer", () => {
let streamName: string
Expand Down Expand Up @@ -311,6 +312,29 @@ describe("declare consumer", () => {
expect(countConsumersOverLimit).is.undefined
expect(Array.from(counts.keys()).length).gt(1)
}).timeout(10000)

describe("when the client declares a named connection", () => {
let connectionName: string | undefined = undefined

beforeEach(async () => {
try {
await client.close()
connectionName = `consumer-${randomUUID()}`
client = await createClient(username, password, undefined, undefined, undefined, undefined, connectionName)
} catch (e) {}
})
it("the name is inherited on the consumer connection", async () => {
await createConsumer(streamName, client)

await eventually(async () => {
const connections = await rabbit.getConnections()
expect(connections.length).eql(2)
expect(connections).to.satisfy((conns: RabbitConnectionResponse[]) => {
return conns.every((conn) => conn.client_properties?.connection_name === connectionName)
})
}, 5000)
}).timeout(6000)
})
})

function createProperties(): MessageProperties {
Expand Down
26 changes: 25 additions & 1 deletion test/e2e/declare_publisher.test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { expect } from "chai"
import { Client } from "../../src"
import { createClient, createPublisher, createStreamName } from "../support/fake_data"
import { Rabbit } from "../support/rabbit"
import { Rabbit, RabbitConnectionResponse } from "../support/rabbit"
import { eventually, expectToThrowAsync, username, password } from "../support/util"
import { getMaxSharedConnectionInstances } from "../../src/util"
import { randomUUID } from "crypto"

describe("declare publisher", () => {
let streamName: string
Expand Down Expand Up @@ -94,4 +95,27 @@ describe("declare publisher", () => {
expect(countPublishersOverLimit).is.undefined
expect(Array.from(counts.keys()).length).gt(1)
}).timeout(10000)

describe("when the client declares a named connection", () => {
let connectionName: string | undefined = undefined

beforeEach(async () => {
try {
await client.close()
connectionName = `publisher-${randomUUID()}`
client = await createClient(username, password, undefined, undefined, undefined, undefined, connectionName)
} catch (e) {}
})
it("the name is inherited on the publisher connection", async () => {
await createPublisher(streamName, client)

await eventually(async () => {
const connections = await rabbit.getConnections()
expect(connections.length).eql(2)
expect(connections).to.satisfy((conns: RabbitConnectionResponse[]) => {
return conns.every((conn) => conn.client_properties?.connection_name === connectionName)
})
}, 5000)
}).timeout(10000)
})
})
4 changes: 3 additions & 1 deletion test/support/fake_data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ export async function createClient(
listeners?: ClientListenersParams,
frameMax?: number,
bufferSizeSettings?: BufferSizeSettings,
port?: number
port?: number,
connectionName?: string
): Promise<Client> {
const [firstNode] = getTestNodesFromEnv()
return connect({
Expand All @@ -70,5 +71,6 @@ export async function createClient(
heartbeat: 0,
listeners: listeners,
bufferSizeSettings: bufferSizeSettings,
connectionName: connectionName,
})
}
10 changes: 9 additions & 1 deletion test/support/rabbit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,16 @@ import got from "got"
import { getTestNodesFromEnv } from "./util"
import { range } from "../../src/util"

interface RabbitConnectionResponse {
export interface RabbitConnectionResponse {
name: string
client_properties?: {
connection_name?: string
copyright?: string
information?: string
platform?: string
product?: string
version?: string
}
}

interface RabbitConsumerCredits {
Expand Down