Skip to content

Commit 5485562

Browse files
authored
Adds filtering feature on publishing side (#171)
* Adds filtering feature with publish request v2 and deliver v2 * Npm check fix * Rebase from refactor * Revert of request interfaces * Revert to standard time out for test * PR review fixes * Rename of test * Increase timeout for cluster --------- Co-authored-by: magne <magnello@coders51.com>
1 parent 2651eb9 commit 5485562

26 files changed

+436
-66
lines changed

src/client.ts

Lines changed: 38 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { Compression, CompressionType, GzipCompression, NoneCompression } from "
44
import { Consumer, ConsumerFunc, StreamConsumer } from "./consumer"
55
import { STREAM_ALREADY_EXISTS_ERROR_CODE } from "./error_codes"
66
import { Logger, NullLogger } from "./logger"
7-
import { Message, Publisher, StreamPublisher } from "./publisher"
7+
import { FilterFunc, Message, Publisher, StreamPublisher } from "./publisher"
88
import { ConsumerUpdateResponse } from "./requests/consumer_update_response"
99
import { CreateStreamArguments, CreateStreamRequest } from "./requests/create_stream_request"
1010
import { CreditRequest, CreditRequestParams } from "./requests/credit_request"
@@ -41,6 +41,7 @@ import { DeleteSuperStreamRequest } from "./requests/delete_super_stream_request
4141
import { lt, coerce } from "semver"
4242
import { ConnectionInfo, Connection, errorMessageOf } from "./connection"
4343
import { ConnectionPool } from "./connection_pool"
44+
import { DeliverResponseV2 } from "./responses/deliver_response_v2"
4445

4546
export type ConnectionClosedListener = (hadError: boolean) => void
4647

@@ -125,7 +126,7 @@ export class Client {
125126
return res.streams
126127
}
127128

128-
public async declarePublisher(params: DeclarePublisherParams): Promise<Publisher> {
129+
public async declarePublisher(params: DeclarePublisherParams, filter?: FilterFunc): Promise<Publisher> {
129130
const { stream, publisherRef } = params
130131
const publisherId = this.incPublisherId()
131132

@@ -137,16 +138,22 @@ export class Client {
137138
await connection.close()
138139
throw new Error(`Declare Publisher command returned error with code ${res.code} - ${errorMessageOf(res.code)}`)
139140
}
140-
const publisher = new StreamPublisher({
141-
connection: connection,
142-
stream: params.stream,
143-
publisherId: publisherId,
144-
publisherRef: params.publisherRef,
145-
boot: params.boot,
146-
maxFrameSize: this.maxFrameSize,
147-
maxChunkLength: params.maxChunkLength,
148-
logger: this.logger,
149-
})
141+
if (filter && !connection.isFilteringEnabled) {
142+
throw new Error(`Broker does not support message filtering.`)
143+
}
144+
const publisher = new StreamPublisher(
145+
{
146+
connection: connection,
147+
stream: params.stream,
148+
publisherId: publisherId,
149+
publisherRef: params.publisherRef,
150+
boot: params.boot,
151+
maxFrameSize: this.maxFrameSize,
152+
maxChunkLength: params.maxChunkLength,
153+
logger: this.logger,
154+
},
155+
filter
156+
)
150157
this.publishers.set(publisherId, { publisher: publisher, connection: connection })
151158
this.logger.info(
152159
`New publisher created with stream name ${params.stream}, publisher id ${publisherId} and publisher reference ${params.publisherRef}`
@@ -410,14 +417,28 @@ export class Client {
410417
return consumerId
411418
}
412419

413-
private getDeliverCallback() {
420+
private getDeliverV1Callback() {
414421
return async (response: DeliverResponse) => {
415422
const consumer = this.consumers.get(response.subscriptionId)
416423
if (!consumer) {
417-
this.logger.error(`On deliver no consumer found`)
424+
this.logger.error(`On deliverV1 no consumer found`)
425+
return
426+
}
427+
this.logger.debug(`on deliverV1 -> ${consumer.consumerRef}`)
428+
this.logger.debug(`response.messages.length: ${response.messages.length}`)
429+
await this.askForCredit({ credit: 1, subscriptionId: response.subscriptionId })
430+
response.messages.map((x) => consumer.handle(x))
431+
}
432+
}
433+
434+
private getDeliverV2Callback() {
435+
return async (response: DeliverResponseV2) => {
436+
const consumer = this.consumers.get(response.subscriptionId)
437+
if (!consumer) {
438+
this.logger.error(`On deliverV2 no consumer found`)
418439
return
419440
}
420-
this.logger.debug(`on deliver -> ${consumer.consumerRef}`)
441+
this.logger.debug(`on deliverV2 -> ${consumer.consumerRef}`)
421442
this.logger.debug(`response.messages.length: ${response.messages.length}`)
422443
await this.askForCredit({ credit: 1, subscriptionId: response.subscriptionId })
423444
response.messages.map((x) => consumer.handle(x))
@@ -493,7 +514,8 @@ export class Client {
493514
const connectionListeners = {
494515
...this.params.listeners,
495516
connection_closed: connectionClosedListener,
496-
deliver: this.getDeliverCallback(),
517+
deliverV1: this.getDeliverV1Callback(),
518+
deliverV2: this.getDeliverV2Callback(),
497519
consumer_update_query: this.getConsumerUpdateCallback(),
498520
}
499521
return { ...this.params, listeners: connectionListeners, leader: leader, streamName: streamName }

src/connection.ts

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import { TuneRequest } from "./requests/tune_request"
1616
import {
1717
ConsumerUpdateQueryListener,
1818
DeliverListener,
19+
DeliverV2Listener,
1920
MetadataUpdateListener,
2021
PublishConfirmListener,
2122
PublishErrorListener,
@@ -29,7 +30,7 @@ import { Response } from "./responses/response"
2930
import { SaslAuthenticateResponse } from "./responses/sasl_authenticate_response"
3031
import { SaslHandshakeResponse } from "./responses/sasl_handshake_response"
3132
import { TuneResponse } from "./responses/tune_response"
32-
import { DEFAULT_FRAME_MAX, DEFAULT_UNLIMITED_FRAME_MAX, removeFrom } from "./util"
33+
import { DEFAULT_FRAME_MAX, DEFAULT_UNLIMITED_FRAME_MAX, REQUIRED_MANAGEMENT_VERSION, removeFrom } from "./util"
3334
import { Version, checkServerDeclaredVersions, getClientSupportedVersions } from "./versions"
3435
import { WaitingResponse } from "./waiting_response"
3536
import { ClientListenersParams, ClientParams, ClosingParams, QueryOffsetParams, StoreOffsetParams } from "./client"
@@ -38,11 +39,13 @@ import { QueryPublisherRequest } from "./requests/query_publisher_request"
3839
import { StoreOffsetRequest } from "./requests/store_offset_request"
3940
import { QueryOffsetResponse } from "./responses/query_offset_response"
4041
import { QueryOffsetRequest } from "./requests/query_offset_request"
42+
import { coerce, lt } from "semver"
4143

4244
export type ConnectionClosedListener = (hadError: boolean) => void
4345

4446
export type ConnectionProxyListenersParams = ClientListenersParams & {
45-
deliver?: DeliverListener
47+
deliverV1?: DeliverListener
48+
deliverV2?: DeliverV2Listener
4649
consumer_update_query?: ConsumerUpdateQueryListener
4750
}
4851

@@ -82,6 +85,7 @@ export class Connection {
8285
private serverEndpoint: { host: string; port: number } = { host: "", port: 5552 }
8386
private readonly serverDeclaredVersions: Version[] = []
8487
private refs: number = 0
88+
private filteringEnabled: boolean = false
8589

8690
constructor(private readonly params: ConnectionProxyParams, private readonly logger: Logger) {
8791
this.hostname = params.hostname
@@ -125,6 +129,7 @@ export class Connection {
125129
this.socket.on("connect", async () => {
126130
this.logger.info(`Connected to RabbitMQ ${this.params.hostname}:${this.params.port}`)
127131
this.peerProperties = (await this.exchangeProperties()).properties
132+
this.filteringEnabled = lt(coerce(this.rabbitManagementVersion)!, REQUIRED_MANAGEMENT_VERSION) ? false : true
128133
await this.auth({ username: this.params.username, password: this.params.password })
129134
const { heartbeat } = await this.tune(this.params.heartbeat ?? 0)
130135
await this.open({ virtualHost: this.params.vhost })
@@ -151,15 +156,23 @@ export class Connection {
151156
public on(event: "metadata_update", listener: MetadataUpdateListener): void
152157
public on(event: "publish_confirm", listener: PublishConfirmListener): void
153158
public on(event: "publish_error", listener: PublishErrorListener): void
154-
public on(event: "deliver", listener: DeliverListener): void
159+
public on(event: "deliverV1", listener: DeliverListener): void
160+
public on(event: "deliverV2", listener: DeliverV2Listener): void
155161
public on(event: "consumer_update_query", listener: ConsumerUpdateQueryListener): void
156162
public on(
157-
event: "metadata_update" | "publish_confirm" | "publish_error" | "deliver" | "consumer_update_query",
163+
event:
164+
| "metadata_update"
165+
| "publish_confirm"
166+
| "publish_error"
167+
| "deliverV1"
168+
| "deliverV2"
169+
| "consumer_update_query",
158170
listener:
159171
| MetadataUpdateListener
160172
| PublishConfirmListener
161173
| PublishErrorListener
162174
| DeliverListener
175+
| DeliverV2Listener
163176
| ConsumerUpdateQueryListener
164177
) {
165178
switch (event) {
@@ -172,8 +185,11 @@ export class Connection {
172185
case "publish_error":
173186
this.decoder.on("publish_error", listener as PublishErrorListener)
174187
break
175-
case "deliver":
176-
this.decoder.on("deliver", listener as DeliverListener)
188+
case "deliverV1":
189+
this.decoder.on("deliverV1", listener as DeliverListener)
190+
break
191+
case "deliverV2":
192+
this.decoder.on("deliverV2", listener as DeliverV2Listener)
177193
break
178194
case "consumer_update_query":
179195
this.decoder.on("consumer_update_query", listener as ConsumerUpdateQueryListener)
@@ -187,7 +203,8 @@ export class Connection {
187203
if (listeners?.metadata_update) this.decoder.on("metadata_update", listeners.metadata_update)
188204
if (listeners?.publish_confirm) this.decoder.on("publish_confirm", listeners.publish_confirm)
189205
if (listeners?.publish_error) this.decoder.on("publish_error", listeners.publish_error)
190-
if (listeners?.deliver) this.decoder.on("deliver", listeners.deliver)
206+
if (listeners?.deliverV1) this.decoder.on("deliverV1", listeners.deliverV1)
207+
if (listeners?.deliverV2) this.decoder.on("deliverV2", listeners.deliverV2)
191208
if (listeners?.consumer_update_query) this.decoder.on("consumer_update_query", listeners.consumer_update_query)
192209
}
193210

@@ -332,6 +349,10 @@ export class Connection {
332349
return this.peerProperties.version
333350
}
334351

352+
public get isFilteringEnabled() {
353+
return this.filteringEnabled
354+
}
355+
335356
private async auth(params: { username: string; password: string }) {
336357
this.logger.debug(`Start authentication process ...`)
337358
this.logger.debug(`Start SASL handshake ...`)

src/publisher.ts

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import { DEFAULT_UNLIMITED_FRAME_MAX } from "./util"
1111
import { MetadataUpdateListener } from "./response_decoder"
1212
import { ConnectionInfo, Connection } from "./connection"
1313
import { ConnectionPool } from "./connection_pool"
14+
import { PublishRequestV2 } from "./requests/publish_request_v2"
1415

1516
export type MessageApplicationProperties = Record<string, string | number>
1617

@@ -70,6 +71,7 @@ export interface Publisher {
7071
readonly publisherId: number
7172
}
7273

74+
export type FilterFunc = (msg: Message) => string | undefined
7375
type PublishConfirmCallback = (err: number | null, publishingIds: bigint[]) => void
7476
export class StreamPublisher implements Publisher {
7577
private connection: Connection
@@ -85,16 +87,19 @@ export class StreamPublisher implements Publisher {
8587
private maxChunkLength: number
8688
private closed = false
8789

88-
constructor(params: {
89-
connection: Connection
90-
stream: string
91-
publisherId: number
92-
publisherRef?: string
93-
boot?: boolean
94-
maxFrameSize: number
95-
maxChunkLength?: number
96-
logger: Logger
97-
}) {
90+
constructor(
91+
params: {
92+
connection: Connection
93+
stream: string
94+
publisherId: number
95+
publisherRef?: string
96+
boot?: boolean
97+
maxFrameSize: number
98+
maxChunkLength?: number
99+
logger: Logger
100+
},
101+
private readonly filter?: FilterFunc
102+
) {
98103
this.connection = params.connection
99104
this.stream = params.stream
100105
this.publisherId = params.publisherId
@@ -187,6 +192,12 @@ export class StreamPublisher implements Publisher {
187192
}
188193

189194
private async enqueue(publishRequestMessage: PublishRequestMessage) {
195+
if (this.filter) {
196+
publishRequestMessage.filterValue = this.filter(publishRequestMessage.message)
197+
}
198+
if (!this.connection.isFilteringEnabled && this.filter) {
199+
throw new Error(`Your rabbit server management version does not support filtering.`)
200+
}
190201
this.checkMessageSize(publishRequestMessage)
191202
const sendCycleNeeded = this.add(publishRequestMessage)
192203
let sent = false
@@ -211,12 +222,19 @@ export class StreamPublisher implements Publisher {
211222
private async sendBuffer() {
212223
const chunk = this.popChunk()
213224
if (chunk.length > 0) {
214-
await this.connection.send(
215-
new PublishRequest({
216-
publisherId: this.publisherId,
217-
messages: chunk,
218-
})
219-
)
225+
this.filter
226+
? await this.connection.send(
227+
new PublishRequestV2({
228+
publisherId: this.publisherId,
229+
messages: chunk,
230+
})
231+
)
232+
: await this.connection.send(
233+
new PublishRequest({
234+
publisherId: this.publisherId,
235+
messages: chunk,
236+
})
237+
)
220238
}
221239
}
222240

src/requests/abstract_request.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,12 @@ export class BufferDataWriter implements DataWriter {
4444
this._offset = this.buffer.writeInt8(data, this._offset)
4545
}
4646

47+
writeInt16(data: number) {
48+
const bytes = 2
49+
this.growIfNeeded(bytes)
50+
this._offset = this.buffer.writeInt16BE(data, this._offset)
51+
}
52+
4753
writeUInt8(data: number): void {
4854
const bytes = 1
4955
this.growIfNeeded(bytes)
@@ -113,7 +119,9 @@ export class BufferDataWriter implements DataWriter {
113119
export abstract class AbstractRequest implements Request {
114120
abstract get key(): number
115121
abstract get responseKey(): number
116-
readonly version = 1
122+
get version() {
123+
return 1
124+
}
117125

118126
toBuffer(bufferSizeParams?: BufferSizeParams, correlationId?: number): Buffer {
119127
const initialSize = bufferSizeParams?.initialSize ?? 65536

0 commit comments

Comments
 (0)