Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 51 additions & 56 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ export class Client {
private consumers = new Map<string, ConsumerMappedValue>()
private publishers = new Map<string, PublisherMappedValue>()
private compressions = new Map<CompressionType, Compression>()
private connection: Connection
private locatorConnection: Connection
private pool: ConnectionPool = new ConnectionPool()

private constructor(
Expand All @@ -76,20 +76,19 @@ export class Client {
) {
this.compressions.set(CompressionType.None, NoneCompression.create())
this.compressions.set(CompressionType.Gzip, GzipCompression.create())
this.connection = this.getLocatorConnection()
this.connection.incrRefCount()
this.locatorConnection = this.getLocatorConnection()
}

getCompression(compressionType: CompressionType) {
return this.connection.getCompression(compressionType)
return this.locatorConnection.getCompression(compressionType)
}

registerCompression(compression: Compression) {
this.connection.registerCompression(compression)
this.locatorConnection.registerCompression(compression)
}

public start(): Promise<Client> {
return this.connection.start().then(
return this.locatorConnection.start().then(
(_res) => {
return this
},
Expand All @@ -104,26 +103,18 @@ export class Client {
this.logger.info(`${this.id} Closing client...`)
if (this.publisherCounts()) {
this.logger.info(`Stopping all producers...`)
await this.closeAllPublishers(true)
await this.closeAllPublishers()
}
if (this.consumerCounts()) {
this.logger.info(`Stopping all consumers...`)
await this.closeAllConsumers(true)
}
this.connection.decrRefCount()
await this.closeConnectionIfUnused(this.connection, params)
}

private async closeConnectionIfUnused(connection: Connection, params: ClosingParams) {
if (connection.refCount <= 0) {
this.pool.removeCachedConnection(this.connection)
await this.connection.close({ ...params, manuallyClose: true })
await this.closeAllConsumers()
}
await this.locatorConnection.close({ ...params, manuallyClose: true })
}

public async queryMetadata(params: QueryMetadataParams): Promise<StreamMetadata[]> {
const { streams } = params
const res = await this.connection.sendAndWait<MetadataResponse>(new MetadataRequest({ streams }))
const res = await this.locatorConnection.sendAndWait<MetadataResponse>(new MetadataRequest({ streams }))
if (!res.ok) {
throw new Error(`Query Metadata command returned error with code ${res.code} - ${errorMessageOf(res.code)}`)
}
Expand All @@ -135,7 +126,7 @@ export class Client {

public async queryPartitions(params: QueryPartitionsParams): Promise<string[]> {
const { superStream } = params
const res = await this.connection.sendAndWait<PartitionsResponse>(new PartitionsQuery({ superStream }))
const res = await this.locatorConnection.sendAndWait<PartitionsResponse>(new PartitionsQuery({ superStream }))
if (!res.ok) {
throw new Error(`Query Partitions command returned error with code ${res.code} - ${errorMessageOf(res.code)}`)
}
Expand All @@ -158,7 +149,7 @@ export class Client {
}
let lastPublishingId = 0n
if (streamPublisherParams.publisherRef) {
lastPublishingId = await this.connection.queryPublisherSequence({
lastPublishingId = await this.locatorConnection.queryPublisherSequence({
stream: streamPublisherParams.stream,
publisherRef: streamPublisherParams.publisherRef,
})
Expand All @@ -178,7 +169,7 @@ export class Client {
public async deletePublisher(extendedPublisherId: string) {
const { publisher, connection } = this.publishers.get(extendedPublisherId) ?? {
publisher: undefined,
connection: this.connection,
connection: this.locatorConnection,
}
const publisherId = extractPublisherId(extendedPublisherId)
const res = await connection.sendAndWait<DeletePublisherResponse>(new DeletePublisherRequest(publisherId))
Expand Down Expand Up @@ -241,7 +232,7 @@ export class Client {
}

const consumerId = extractConsumerId(extendedConsumerId)
const { streamInfos } = await this.connection.sendAndWait<MetadataResponse>(
const { streamInfos } = await this.locatorConnection.sendAndWait<MetadataResponse>(
new MetadataRequest({ streams: [activeConsumer.consumer.streamName] })
)
if (streamInfos.length > 0 && streamExists(streamInfos[0])) {
Expand Down Expand Up @@ -280,16 +271,16 @@ export class Client {
}

public queryOffset(params: QueryOffsetParams) {
return this.connection.queryOffset(params)
return this.locatorConnection.queryOffset(params)
}

private async closeAllConsumers(manuallyClose: boolean) {
await Promise.all([...this.consumers.values()].map(({ consumer }) => consumer.close(manuallyClose)))
private async closeAllConsumers() {
await Promise.all([...this.consumers.values()].map(({ consumer }) => consumer.close()))
this.consumers = new Map<string, ConsumerMappedValue>()
}

private async closeAllPublishers(manuallyClose: boolean) {
await Promise.all([...this.publishers.values()].map((c) => c.publisher.close(manuallyClose)))
private async closeAllPublishers() {
await Promise.all([...this.publishers.values()].map((c) => c.publisher.close(true)))
this.publishers = new Map<string, PublisherMappedValue>()
}

Expand All @@ -307,7 +298,7 @@ export class Client {

public async createStream(params: { stream: string; arguments?: CreateStreamArguments }): Promise<true> {
this.logger.debug(`Create Stream...`)
const res = await this.connection.sendAndWait<CreateStreamResponse>(new CreateStreamRequest(params))
const res = await this.locatorConnection.sendAndWait<CreateStreamResponse>(new CreateStreamRequest(params))
if (res.code === STREAM_ALREADY_EXISTS_ERROR_CODE) {
return true
}
Expand All @@ -321,7 +312,7 @@ export class Client {

public async deleteStream(params: { stream: string }): Promise<true> {
this.logger.debug(`Delete Stream...`)
const res = await this.connection.sendAndWait<DeleteStreamResponse>(new DeleteStreamRequest(params.stream))
const res = await this.locatorConnection.sendAndWait<DeleteStreamResponse>(new DeleteStreamRequest(params.stream))
if (!res.ok) {
throw new Error(`Delete Stream command returned error with code ${res.code}`)
}
Expand Down Expand Up @@ -349,7 +340,7 @@ export class Client {
numberOfPartitions,
bindingKeys
)
const res = await this.connection.sendAndWait<CreateSuperStreamResponse>(
const res = await this.locatorConnection.sendAndWait<CreateSuperStreamResponse>(
new CreateSuperStreamRequest({ ...params, partitions, bindingKeys: streamBindingKeys })
)
if (res.code === STREAM_ALREADY_EXISTS_ERROR_CODE) {
Expand All @@ -371,7 +362,7 @@ export class Client {
}

this.logger.debug(`Delete Super Stream...`)
const res = await this.connection.sendAndWait<DeleteSuperStreamResponse>(
const res = await this.locatorConnection.sendAndWait<DeleteSuperStreamResponse>(
new DeleteSuperStreamRequest(params.streamName)
)
if (!res.ok) {
Expand All @@ -382,7 +373,7 @@ export class Client {
}

public async streamStatsRequest(streamName: string) {
const res = await this.connection.sendAndWait<StreamStatsResponse>(new StreamStatsRequest(streamName))
const res = await this.locatorConnection.sendAndWait<StreamStatsResponse>(new StreamStatsRequest(streamName))
if (!res.ok) {
throw new Error(`Stream Stats command returned error with code ${res.code} - ${errorMessageOf(res.code)}`)
}
Expand All @@ -391,24 +382,24 @@ export class Client {
}

public getConnectionInfo(): ConnectionInfo {
return this.connection.getConnectionInfo()
return this.locatorConnection.getConnectionInfo()
}

public async subscribe(params: SubscribeParams): Promise<SubscribeResponse> {
const res = await this.connection.sendAndWait<SubscribeResponse>(new SubscribeRequest({ ...params }))
const res = await this.locatorConnection.sendAndWait<SubscribeResponse>(new SubscribeRequest({ ...params }))
if (!res.ok) {
throw new Error(`Subscribe command returned error with code ${res.code} - ${errorMessageOf(res.code)}`)
}
return res
}

public async restart() {
this.logger.info(`Restarting client connection ${this.connection.connectionId}`)
this.logger.info(`Restarting client connection ${this.locatorConnection.connectionId}`)
const uniqueConnectionIds = new Set<string>()
uniqueConnectionIds.add(this.connection.connectionId)
uniqueConnectionIds.add(this.locatorConnection.connectionId)

await wait(5000)
await this.connection.restart()
await this.locatorConnection.restart()

for (const { consumer, connection, params } of this.consumers.values()) {
if (!uniqueConnectionIds.has(connection.connectionId)) {
Expand All @@ -431,19 +422,19 @@ export class Client {
}

public get maxFrameSize() {
return this.connection.maxFrameSize ?? DEFAULT_FRAME_MAX
return this.locatorConnection.maxFrameSize ?? DEFAULT_FRAME_MAX
}

public get serverVersions() {
return this.connection.serverVersions
return this.locatorConnection.serverVersions
}

public get rabbitManagementVersion() {
return this.connection.rabbitManagementVersion
return this.locatorConnection.rabbitManagementVersion
}

public async routeQuery(params: { routingKey: string; superStream: string }) {
const res = await this.connection.sendAndWait<RouteResponse>(new RouteQuery(params))
const res = await this.locatorConnection.sendAndWait<RouteResponse>(new RouteQuery(params))
if (!res.ok) {
throw new Error(`Route Query command returned error with code ${res.code} - ${errorMessageOf(res.code)}`)
}
Expand All @@ -452,7 +443,7 @@ export class Client {
}

public async partitionsQuery(params: { superStream: string }) {
const res = await this.connection.sendAndWait<PartitionsResponse>(new PartitionsQuery(params))
const res = await this.locatorConnection.sendAndWait<PartitionsResponse>(new PartitionsQuery(params))
if (!res.ok) {
throw new Error(`Partitions Query command returned error with code ${res.code} - ${errorMessageOf(res.code)}`)
}
Expand Down Expand Up @@ -629,23 +620,27 @@ export class Client {
connectionClosedListener?: ConnectionClosedListener
): Promise<Connection> {
const [metadata] = await this.queryMetadata({ streams: [streamName] })
const chosenNode = chooseNode(metadata, purpose === "publisher")
const isPublisher = purpose === "publisher"
const chosenNode = chooseNode(metadata, isPublisher)
if (!chosenNode) {
throw new Error(`Stream was not found on any node`)
}
const cachedConnection = this.pool.getCachedConnection(purpose, streamName, this.connection.vhost, chosenNode.host)
if (cachedConnection) return cachedConnection

const newConnection = await this.getConnectionOnChosenNode(
const connection = await this.pool.getConnection(
purpose,
streamName,
chosenNode,
metadata,
connectionClosedListener
this.locatorConnection.vhost,
chosenNode.host,
async () => {
return await this.getConnectionOnChosenNode(
isPublisher,
streamName,
chosenNode,
metadata,
connectionClosedListener
)
}
)

this.pool.cacheConnection(purpose, streamName, this.connection.vhost, newConnection.hostname, newConnection)
return newConnection
return connection
}

private createSuperStreamPartitionsAndBindingKeys(
Expand Down Expand Up @@ -688,13 +683,13 @@ export class Client {
}

private async getConnectionOnChosenNode(
purpose: ConnectionPurpose,
isPublisher: boolean,
streamName: string,
chosenNode: { host: string; port: number },
metadata: StreamMetadata,
connectionClosedListener?: ConnectionClosedListener
): Promise<Connection> {
const connectionParams = this.buildConnectionParams(purpose === "publisher", streamName, connectionClosedListener)
const connectionParams = this.buildConnectionParams(isPublisher, streamName, connectionClosedListener)
if (this.params.addressResolver && this.params.addressResolver.enabled) {
const maxAttempts = computeMaxAttempts(metadata)
const resolver = this.params.addressResolver
Expand Down Expand Up @@ -727,7 +722,7 @@ export class Client {
}

private async closing(consumer: StreamConsumer, extendedConsumerId: string) {
await consumer.close(true)
await consumer.close()
this.consumers.delete(extendedConsumerId)
this.logger.info(`Closed consumer with id: ${extendedConsumerId}`)
}
Expand Down
63 changes: 34 additions & 29 deletions src/connection_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,53 +5,58 @@ type InstanceKey = string
export type ConnectionPurpose = "consumer" | "publisher"

export class ConnectionPool {
private consumerConnectionProxies: Map<InstanceKey, Connection[]> = new Map<InstanceKey, Connection[]>()
private publisherConnectionProxies: Map<InstanceKey, Connection[]> = new Map<InstanceKey, Connection[]>()

public getCachedConnection(purpose: ConnectionPurpose, streamName: string, vhost: string, host: string) {
const map = purpose === "publisher" ? this.publisherConnectionProxies : this.consumerConnectionProxies
const key = this.getCacheKey(streamName, vhost, host)
const proxies = map.get(key) || []
const connection = proxies.at(-1)
const refCount = connection?.refCount
return refCount !== undefined && refCount < getMaxSharedConnectionInstances() ? connection : undefined
}
private connectionsMap: Map<InstanceKey, Connection[]> = new Map<InstanceKey, Connection[]>()

public cacheConnection(
purpose: ConnectionPurpose,
public async getConnection(
entityType: ConnectionPurpose,
streamName: string,
vhost: string,
host: string,
client: Connection
connectionCreator: () => Promise<Connection>
) {
const map = purpose === "publisher" ? this.publisherConnectionProxies : this.consumerConnectionProxies
const key = this.getCacheKey(streamName, vhost, host)
const currentlyCached = map.get(key) || []
currentlyCached.push(client)
map.set(key, currentlyCached)
const key = this.getCacheKey(streamName, vhost, host, entityType)
const connections = this.connectionsMap.get(key) || []
const connection = connections.at(-1)
const refCount = connection?.refCount
const cachedConnection =
refCount !== undefined && refCount < getMaxSharedConnectionInstances() ? connection : undefined

if (cachedConnection) {
return cachedConnection
} else {
const newConnection = await connectionCreator()
this.cacheConnection(key, newConnection)
return newConnection
}
}

public removeIfUnused(connection: Connection) {
public async releaseConnection(connection: Connection, manuallyClose = true): Promise<void> {
connection.decrRefCount()
if (connection.refCount <= 0) {
await connection.close({ closingCode: 0, closingReason: "", manuallyClose })
this.removeCachedConnection(connection)
return true
}
return false
}

public removeCachedConnection(connection: Connection) {
private cacheConnection(key: string, connection: Connection) {
const currentlyCached = this.connectionsMap.get(key) || []
currentlyCached.push(connection)
this.connectionsMap.set(key, currentlyCached)
}

private removeCachedConnection(connection: Connection) {
const { leader, streamName, hostname: host, vhost } = connection
if (streamName === undefined) return
const m = leader ? this.publisherConnectionProxies : this.consumerConnectionProxies
const k = this.getCacheKey(streamName, vhost, host)
const mappedClientList = m.get(k)
const entityType = leader ? "publisher" : "consumer"
const k = this.getCacheKey(streamName, vhost, host, entityType)
const mappedClientList = this.connectionsMap.get(k)
if (mappedClientList) {
const filtered = mappedClientList.filter((c) => c !== connection)
m.set(k, filtered)
this.connectionsMap.set(k, filtered)
}
}

private getCacheKey(streamName: string, vhost: string, host: string) {
return `${streamName}@${vhost}@${host}`
private getCacheKey(streamName: string, vhost: string, host: string, entityType: ConnectionPurpose) {
return `${streamName}@${vhost}@${host}@${entityType}`
}
}
7 changes: 2 additions & 5 deletions src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,9 @@ export class StreamConsumer implements Consumer {
this.singleActive = params.singleActive ?? false
}

async close(manuallyClose: boolean): Promise<void> {
async close(): Promise<void> {
this.closed = true
this.connection.decrRefCount()
if (this.pool.removeIfUnused(this.connection)) {
await this.connection.close({ closingCode: 0, closingReason: "", manuallyClose })
}
await this.pool.releaseConnection(this.connection)
}

public storeOffset(offsetValue: bigint): Promise<void> {
Expand Down
Loading