Skip to content

Commit 5c90105

Browse files
committed
Rebase from refactor
1 parent 02b067b commit 5c90105

File tree

3 files changed

+54
-19
lines changed

3 files changed

+54
-19
lines changed

src/client.ts

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { Compression, CompressionType, GzipCompression, NoneCompression } from "
44
import { Consumer, ConsumerFunc, StreamConsumer } from "./consumer"
55
import { STREAM_ALREADY_EXISTS_ERROR_CODE } from "./error_codes"
66
import { Logger, NullLogger } from "./logger"
7-
import { Message, Publisher, StreamPublisher } from "./publisher"
7+
import { FilterFunc, Message, Publisher, StreamPublisher } from "./publisher"
88
import { ConsumerUpdateResponse } from "./requests/consumer_update_response"
99
import { CreateStreamArguments, CreateStreamRequest } from "./requests/create_stream_request"
1010
import { CreditRequest, CreditRequestParams } from "./requests/credit_request"
@@ -138,7 +138,7 @@ export class Client {
138138
await connection.close()
139139
throw new Error(`Declare Publisher command returned error with code ${res.code} - ${errorMessageOf(res.code)}`)
140140
}
141-
if (filter && !connection.filteringEnabled) {
141+
if (filter && !connection.isFilteringEnabled) {
142142
await connection.close()
143143
throw new Error(`Broker does not support message filtering.`)
144144
}
@@ -418,17 +418,31 @@ export class Client {
418418
return consumerId
419419
}
420420

421-
private getDeliverCallback() {
422-
return async (deliverVersion: "deliverV1" | "deliverV2", deliverResponse: DeliverResponse | DeliverResponseV2) => {
423-
const consumer = this.consumers.get(deliverResponse.subscriptionId)
421+
private getDeliverV1Callback() {
422+
return async (response: DeliverResponse) => {
423+
const consumer = this.consumers.get(response.subscriptionId)
424+
if (!consumer) {
425+
this.logger.error(`On deliverV1 no consumer found`)
426+
return
427+
}
428+
this.logger.debug(`on deliverV1 -> ${consumer.consumerRef}`)
429+
this.logger.debug(`response.messages.length: ${response.messages.length}`)
430+
await this.askForCredit({ credit: 1, subscriptionId: response.subscriptionId })
431+
response.messages.map((x) => consumer.handle(x))
432+
}
433+
}
434+
435+
private getDeliverV2Callback() {
436+
return async (response: DeliverResponseV2) => {
437+
const consumer = this.consumers.get(response.subscriptionId)
424438
if (!consumer) {
425-
this.logger.error(`On ${deliverVersion} no consumer found`)
439+
this.logger.error(`On deliverV2 no consumer found`)
426440
return
427441
}
428-
this.logger.debug(`on ${deliverVersion} -> ${consumer.consumerRef}`)
429-
this.logger.debug(`deliverResponse.messages.length: ${deliverResponse.messages.length}`)
430-
await this.askForCredit({ credit: 1, subscriptionId: deliverResponse.subscriptionId })
431-
deliverResponse.messages.map((x) => consumer.handle(x))
442+
this.logger.debug(`on deliverV2 -> ${consumer.consumerRef}`)
443+
this.logger.debug(`response.messages.length: ${response.messages.length}`)
444+
await this.askForCredit({ credit: 1, subscriptionId: response.subscriptionId })
445+
response.messages.map((x) => consumer.handle(x))
432446
}
433447
}
434448

@@ -501,7 +515,8 @@ export class Client {
501515
const connectionListeners = {
502516
...this.params.listeners,
503517
connection_closed: connectionClosedListener,
504-
deliver: this.getDeliverCallback(),
518+
deliverV1: this.getDeliverV1Callback(),
519+
deliverV2: this.getDeliverV2Callback(),
505520
consumer_update_query: this.getConsumerUpdateCallback(),
506521
}
507522
return { ...this.params, listeners: connectionListeners, leader: leader, streamName: streamName }

src/connection.ts

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import { TuneRequest } from "./requests/tune_request"
1616
import {
1717
ConsumerUpdateQueryListener,
1818
DeliverListener,
19+
DeliverV2Listener,
1920
MetadataUpdateListener,
2021
PublishConfirmListener,
2122
PublishErrorListener,
@@ -29,7 +30,7 @@ import { Response } from "./responses/response"
2930
import { SaslAuthenticateResponse } from "./responses/sasl_authenticate_response"
3031
import { SaslHandshakeResponse } from "./responses/sasl_handshake_response"
3132
import { TuneResponse } from "./responses/tune_response"
32-
import { DEFAULT_FRAME_MAX, DEFAULT_UNLIMITED_FRAME_MAX, removeFrom } from "./util"
33+
import { DEFAULT_FRAME_MAX, DEFAULT_UNLIMITED_FRAME_MAX, REQUIRED_MANAGEMENT_VERSION, removeFrom } from "./util"
3334
import { Version, checkServerDeclaredVersions, getClientSupportedVersions } from "./versions"
3435
import { WaitingResponse } from "./waiting_response"
3536
import { ClientListenersParams, ClientParams, ClosingParams, QueryOffsetParams, StoreOffsetParams } from "./client"
@@ -38,11 +39,13 @@ import { QueryPublisherRequest } from "./requests/query_publisher_request"
3839
import { StoreOffsetRequest } from "./requests/store_offset_request"
3940
import { QueryOffsetResponse } from "./responses/query_offset_response"
4041
import { QueryOffsetRequest } from "./requests/query_offset_request"
42+
import { coerce, lt } from "semver"
4143

4244
export type ConnectionClosedListener = (hadError: boolean) => void
4345

4446
export type ConnectionProxyListenersParams = ClientListenersParams & {
45-
deliver?: DeliverListener
47+
deliverV1?: DeliverListener
48+
deliverV2?: DeliverV2Listener
4649
consumer_update_query?: ConsumerUpdateQueryListener
4750
}
4851

@@ -82,6 +85,7 @@ export class Connection {
8285
private serverEndpoint: { host: string; port: number } = { host: "", port: 5552 }
8386
private readonly serverDeclaredVersions: Version[] = []
8487
private refs: number = 0
88+
private filteringEnabled: boolean = false
8589

8690
constructor(private readonly params: ConnectionProxyParams, private readonly logger: Logger) {
8791
this.hostname = params.hostname
@@ -125,6 +129,7 @@ export class Connection {
125129
this.socket.on("connect", async () => {
126130
this.logger.info(`Connected to RabbitMQ ${this.params.hostname}:${this.params.port}`)
127131
this.peerProperties = (await this.exchangeProperties()).properties
132+
this.filteringEnabled = lt(coerce(this.rabbitManagementVersion)!, REQUIRED_MANAGEMENT_VERSION) ? false : true
128133
await this.auth({ username: this.params.username, password: this.params.password })
129134
const { heartbeat } = await this.tune(this.params.heartbeat ?? 0)
130135
await this.open({ virtualHost: this.params.vhost })
@@ -151,15 +156,23 @@ export class Connection {
151156
public on(event: "metadata_update", listener: MetadataUpdateListener): void
152157
public on(event: "publish_confirm", listener: PublishConfirmListener): void
153158
public on(event: "publish_error", listener: PublishErrorListener): void
154-
public on(event: "deliver", listener: DeliverListener): void
159+
public on(event: "deliverV1", listener: DeliverListener): void
160+
public on(event: "deliverV2", listener: DeliverV2Listener): void
155161
public on(event: "consumer_update_query", listener: ConsumerUpdateQueryListener): void
156162
public on(
157-
event: "metadata_update" | "publish_confirm" | "publish_error" | "deliver" | "consumer_update_query",
163+
event:
164+
| "metadata_update"
165+
| "publish_confirm"
166+
| "publish_error"
167+
| "deliverV1"
168+
| "deliverV2"
169+
| "consumer_update_query",
158170
listener:
159171
| MetadataUpdateListener
160172
| PublishConfirmListener
161173
| PublishErrorListener
162174
| DeliverListener
175+
| DeliverV2Listener
163176
| ConsumerUpdateQueryListener
164177
) {
165178
switch (event) {
@@ -172,8 +185,11 @@ export class Connection {
172185
case "publish_error":
173186
this.decoder.on("publish_error", listener as PublishErrorListener)
174187
break
175-
case "deliver":
176-
this.decoder.on("deliver", listener as DeliverListener)
188+
case "deliverV1":
189+
this.decoder.on("deliverV1", listener as DeliverListener)
190+
break
191+
case "deliverV2":
192+
this.decoder.on("deliverV2", listener as DeliverV2Listener)
177193
break
178194
case "consumer_update_query":
179195
this.decoder.on("consumer_update_query", listener as ConsumerUpdateQueryListener)
@@ -187,7 +203,8 @@ export class Connection {
187203
if (listeners?.metadata_update) this.decoder.on("metadata_update", listeners.metadata_update)
188204
if (listeners?.publish_confirm) this.decoder.on("publish_confirm", listeners.publish_confirm)
189205
if (listeners?.publish_error) this.decoder.on("publish_error", listeners.publish_error)
190-
if (listeners?.deliver) this.decoder.on("deliver", listeners.deliver)
206+
if (listeners?.deliverV1) this.decoder.on("deliverV1", listeners.deliverV1)
207+
if (listeners?.deliverV2) this.decoder.on("deliverV2", listeners.deliverV2)
191208
if (listeners?.consumer_update_query) this.decoder.on("consumer_update_query", listeners.consumer_update_query)
192209
}
193210

@@ -332,6 +349,10 @@ export class Connection {
332349
return this.peerProperties.version
333350
}
334351

352+
public get isFilteringEnabled() {
353+
return this.filteringEnabled
354+
}
355+
335356
private async auth(params: { username: string; password: string }) {
336357
this.logger.debug(`Start authentication process ...`)
337358
this.logger.debug(`Start SASL handshake ...`)

src/publisher.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import { DEFAULT_UNLIMITED_FRAME_MAX } from "./util"
1111
import { MetadataUpdateListener } from "./response_decoder"
1212
import { ConnectionInfo, Connection } from "./connection"
1313
import { ConnectionPool } from "./connection_pool"
14-
import { coerce, lt } from "semver"
1514
import { PublishRequestV2 } from "./requests/publish_request_v2"
1615

1716
export type MessageApplicationProperties = Record<string, string | number>

0 commit comments

Comments
 (0)