Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,12 @@ export class Client {
if (!chosenNode) {
throw new Error(`Stream was not found on any node`)
}
const cachedConnection = ConnectionPool.getUsableCachedConnection(purpose, streamName, chosenNode.host)
const cachedConnection = ConnectionPool.getUsableCachedConnection(
purpose,
streamName,
this.connection.vhost,
chosenNode.host
)
if (cachedConnection) return cachedConnection

const newConnection = await this.getConnectionOnChosenNode(
Expand All @@ -622,7 +627,7 @@ export class Client {
connectionClosedListener
)

ConnectionPool.cacheConnection(purpose, streamName, newConnection.hostname, newConnection)
ConnectionPool.cacheConnection(purpose, streamName, this.connection.vhost, newConnection.hostname, newConnection)
return newConnection
}

Expand Down
6 changes: 5 additions & 1 deletion src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ export type ConnectionInfo = {
port: number
id: string
ready: boolean
vhost: string
readable?: boolean
writable?: boolean
localPort?: number
Expand All @@ -78,6 +79,7 @@ type ListenerEntry = {

export class Connection {
public readonly hostname: string
public readonly vhost: string
public readonly leader: boolean
public readonly streamName: string | undefined
private socket: Socket
Expand Down Expand Up @@ -109,6 +111,7 @@ export class Connection {
private readonly logger: Logger
) {
this.hostname = params.hostname
this.vhost = params.vhost
this.leader = params.leader ?? false
this.streamName = params.streamName
if (params.frameMax) this.frameMax = params.frameMax
Expand Down Expand Up @@ -374,6 +377,7 @@ export class Connection {
writable: this.socket.writable,
localPort: this.socket.localPort,
ready: this.ready,
vhost: this.vhost,
}
}

Expand Down Expand Up @@ -484,7 +488,7 @@ export class Connection {
}

private virtualHostIsNotValid(virtualHost: string) {
if (!virtualHost || virtualHost.split("/").length !== 2) {
if (!virtualHost) {
return true
}

Expand Down
22 changes: 14 additions & 8 deletions src/connection_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,26 @@ export class ConnectionPool {
private static consumerConnectionProxies = new Map<InstanceKey, Connection[]>()
private static publisherConnectionProxies = new Map<InstanceKey, Connection[]>()

public static getUsableCachedConnection(purpose: ConnectionPurpose, streamName: string, host: string) {
public static getUsableCachedConnection(purpose: ConnectionPurpose, streamName: string, vhost: string, host: string) {
const map =
purpose === "publisher" ? ConnectionPool.publisherConnectionProxies : ConnectionPool.consumerConnectionProxies
const key = ConnectionPool.getCacheKey(streamName, host)
const key = ConnectionPool.getCacheKey(streamName, vhost, host)
const proxies = map.get(key) || []
const connection = proxies.at(-1)
const refCount = connection?.refCount
return refCount !== undefined && refCount < getMaxSharedConnectionInstances() ? connection : undefined
}

public static cacheConnection(purpose: ConnectionPurpose, streamName: string, host: string, client: Connection) {
public static cacheConnection(
purpose: ConnectionPurpose,
streamName: string,
vhost: string,
host: string,
client: Connection
) {
const map =
purpose === "publisher" ? ConnectionPool.publisherConnectionProxies : ConnectionPool.consumerConnectionProxies
const key = ConnectionPool.getCacheKey(streamName, host)
const key = ConnectionPool.getCacheKey(streamName, vhost, host)
const currentlyCached = map.get(key) || []
currentlyCached.push(client)
map.set(key, currentlyCached)
Expand All @@ -36,18 +42,18 @@ export class ConnectionPool {
}

public static removeCachedConnection(connection: Connection) {
const { leader, streamName, hostname: host } = connection
const { leader, streamName, hostname: host, vhost } = connection
if (streamName === undefined) return
const m = leader ? ConnectionPool.publisherConnectionProxies : ConnectionPool.consumerConnectionProxies
const k = ConnectionPool.getCacheKey(streamName, host)
const k = ConnectionPool.getCacheKey(streamName, vhost, host)
const mappedClientList = m.get(k)
if (mappedClientList) {
const filtered = mappedClientList.filter((c) => c !== connection)
m.set(k, filtered)
}
}

private static getCacheKey(streamName: string, host: string) {
return `${streamName}@${host}`
private static getCacheKey(streamName: string, vhost: string, host: string) {
return `${streamName}@${vhost}@${host}`
}
}
4 changes: 2 additions & 2 deletions src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ export class StreamConsumer implements Consumer {
}

public getConnectionInfo(): ConnectionInfo {
const { host, port, id, readable, localPort, ready } = this.connection.getConnectionInfo()
return { host, port, id, readable, localPort, ready }
const { host, port, id, readable, localPort, ready, vhost } = this.connection.getConnectionInfo()
return { host, port, id, readable, localPort, ready, vhost }
}

public get localOffset() {
Expand Down
4 changes: 2 additions & 2 deletions src/publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,8 @@ export class StreamPublisher implements Publisher {
}

public getConnectionInfo(): ConnectionInfo {
const { host, port, id, writable, localPort, ready } = this.connection.getConnectionInfo()
return { host, port, id, writable, localPort, ready }
const { host, port, id, writable, localPort, ready, vhost } = this.connection.getConnectionInfo()
return { host, port, id, writable, localPort, ready, vhost }
}

public on(event: "metadata_update", listener: MetadataUpdateListener): void
Expand Down
78 changes: 78 additions & 0 deletions test/e2e/stream_cache.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import { expect } from "chai"
import got from "got"
import { Client } from "../../src"
import { createClient, createStreamName } from "../support/fake_data"
import { Rabbit, RabbitConnectionResponse } from "../support/rabbit"
import { getTestNodesFromEnv, password, username } from "../support/util"

async function createVhost(vhost: string): Promise<undefined> {
const uriVhost = encodeURIComponent(vhost)
const port = process.env.RABBIT_MQ_MANAGEMENT_PORT || 15672
const firstNode = getTestNodesFromEnv().shift()!
await got.put<RabbitConnectionResponse>(`http://${firstNode.host}:${port}/api/vhosts/${uriVhost}`, {
username: username,
password: password,
})
await got
.put<RabbitConnectionResponse>(`http://${firstNode.host}:${port}/api/permissions/${uriVhost}/${username}`, {
json: {
read: ".*",
write: ".*",
configure: ".*",
},
username: username,
password: password,
})
.json()
}

async function deleteVhost(vhost: string): Promise<RabbitConnectionResponse> {
const uriVhost = encodeURIComponent(vhost)
const port = process.env.RABBIT_MQ_MANAGEMENT_PORT || 15672
const firstNode = getTestNodesFromEnv().shift()!
const r = await got.delete<RabbitConnectionResponse>(`http://${firstNode.host}:${port}/api/vhosts/${uriVhost}`, {
username: username,
password: password,
})

return r.body
}

describe("cache", () => {
const vhost1 = "vhost1"
let streamName: string
const rabbit = new Rabbit(username, password)
let client: Client
let client2: Client
before(async () => {
await createVhost(vhost1)
})
beforeEach(async () => {
client = await createClient(username, password)
client2 = await createClient(username, password, undefined, undefined, undefined, undefined, undefined, vhost1)
streamName = createStreamName()
await client.createStream({ stream: streamName })
await client2.createStream({ stream: streamName })
})
afterEach(async () => {
try {
await client.close()
await client2.close()
await deleteVhost(vhost1)
await rabbit.deleteStream(streamName)
await rabbit.closeAllConnections()
await rabbit.deleteAllQueues({ match: /my-stream-/ })
} catch (_e) {}
})

it("should cache using the vhost as well as the stream name", async () => {
const publisher1 = await client.declarePublisher({
stream: streamName,
})
expect(publisher1.getConnectionInfo().vhost).eql("/")
const publisher2 = await client2.declarePublisher({
stream: streamName,
})
expect(publisher2.getConnectionInfo().vhost).eql(vhost1)
})
})
5 changes: 3 additions & 2 deletions test/support/fake_data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ export async function createClient(
frameMax?: number,
bufferSizeSettings?: BufferSizeSettings,
port?: number,
connectionName?: string
connectionName?: string,
vhost?: string
): Promise<Client> {
const [firstNode] = getTestNodesFromEnv()
return connect(
Expand All @@ -74,7 +75,7 @@ export async function createClient(
port: port ?? firstNode.port,
username,
password,
vhost: "/",
vhost: vhost ?? "/",
frameMax: frameMax ?? 0,
heartbeat: 0,
listeners: listeners,
Expand Down