Skip to content

Commit a56a632

Browse files
authored
Adds filtering feature on consumer side (#172)
* Adds filtering feature with publish request v2 and deliver v2 * Npm check fix * Revert of request interfaces * Revert to standard time out for test * PR review fixes * Adds filtering on consuming side * Rebase from filtering on publisher side * Remove connection close on declare consumer with filter error * Increase filtering test timeout * Fix on consumer filtering --------- Co-authored-by: magne <magnello@coders51.com>
1 parent 5485562 commit a56a632

File tree

3 files changed

+137
-10
lines changed

3 files changed

+137
-10
lines changed

src/client.ts

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -177,14 +177,23 @@ export class Client {
177177
public async declareConsumer(params: DeclareConsumerParams, handle: ConsumerFunc): Promise<Consumer> {
178178
const consumerId = this.incConsumerId()
179179
const properties: Record<string, string> = {}
180-
const client = await this.getConnection(params.stream, false, params.connectionClosedListener)
181-
const consumer = new StreamConsumer(addOffsetFilterToHandle(handle, params.offset), {
182-
connection: client,
183-
stream: params.stream,
184-
consumerId,
185-
consumerRef: params.consumerRef,
186-
offset: params.offset,
187-
})
180+
const connection = await this.getConnection(params.stream, false, params.connectionClosedListener)
181+
182+
if (params.filter && !connection.isFilteringEnabled) {
183+
throw new Error(`Broker does not support message filtering.`)
184+
}
185+
186+
const consumer = new StreamConsumer(
187+
addOffsetFilterToHandle(handle, params.offset),
188+
{
189+
connection,
190+
stream: params.stream,
191+
consumerId,
192+
consumerRef: params.consumerRef,
193+
offset: params.offset,
194+
},
195+
params.filter
196+
)
188197
this.consumers.set(consumerId, consumer)
189198

190199
if (params.singleActive && !params.consumerRef) {
@@ -194,6 +203,12 @@ export class Client {
194203
properties["single-active-consumer"] = "true"
195204
properties["name"] = params.consumerRef!
196205
}
206+
if (params.filter) {
207+
for (let i = 0; i < params.filter.values.length; i++) {
208+
properties[`filter.${i}`] = params.filter.values[i]
209+
}
210+
properties["match-unfiltered"] = `${params.filter.matchUnfiltered}`
211+
}
197212

198213
const res = await this.connection.sendAndWait<SubscribeResponse>(
199214
new SubscribeRequest({ ...params, subscriptionId: consumerId, credit: 10, properties: properties })
@@ -441,6 +456,10 @@ export class Client {
441456
this.logger.debug(`on deliverV2 -> ${consumer.consumerRef}`)
442457
this.logger.debug(`response.messages.length: ${response.messages.length}`)
443458
await this.askForCredit({ credit: 1, subscriptionId: response.subscriptionId })
459+
if (consumer.filter) {
460+
response.messages.filter((x) => consumer.filter?.postFilterFunc(x)).map((x) => consumer.handle(x))
461+
return
462+
}
444463
response.messages.map((x) => consumer.handle(x))
445464
}
446465
}
@@ -610,12 +629,19 @@ export interface DeclareSuperStreamPublisherParams {
610629
routingStrategy?: RoutingStrategy
611630
}
612631

632+
export interface ConsumerFilter {
633+
values: string[]
634+
postFilterFunc: (msg: Message) => boolean
635+
matchUnfiltered: boolean
636+
}
637+
613638
export interface DeclareConsumerParams {
614639
stream: string
615640
consumerRef?: string
616641
offset: Offset
617642
connectionClosedListener?: ConnectionClosedListener
618643
singleActive?: boolean
644+
filter?: ConsumerFilter
619645
}
620646

621647
export interface DeclareSuperStreamConsumerParams {

src/consumer.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { ConsumerFilter } from "./client"
12
import { ConnectionInfo, Connection } from "./connection"
23
import { ConnectionPool } from "./connection_pool"
34
import { Message } from "./publisher"
@@ -29,7 +30,8 @@ export class StreamConsumer implements Consumer {
2930
consumerId: number
3031
consumerRef?: string
3132
offset: Offset
32-
}
33+
},
34+
readonly filter?: ConsumerFilter
3335
) {
3436
this.connection = params.connection
3537
this.stream = params.stream

test/e2e/filtering.test.ts

Lines changed: 100 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { expect } from "chai"
22
import { randomUUID } from "crypto"
3-
import { Client } from "../../src"
3+
import { Client, Offset } from "../../src"
44
import { createClient, createStreamName } from "../support/fake_data"
55
import { Rabbit } from "../support/rabbit"
66
import { eventually, username, password } from "../support/util"
@@ -47,4 +47,103 @@ describe("filtering", () => {
4747
expect((await rabbit.getQueueInfo(streamName)).messages).eql(3)
4848
}, 10000)
4949
}).timeout(10000)
50+
51+
it("published messages are filtered on the consumer side", async () => {
52+
const filteredMsg: string[] = []
53+
const publisher = await client.declarePublisher(
54+
{ stream: streamName, publisherRef: `my-publisher-${randomUUID()}` },
55+
(msg) => msg.applicationProperties!["test"].toString()
56+
)
57+
const message1 = "test1"
58+
const message2 = "test2"
59+
const message3 = "test3"
60+
const applicationProperties1 = { test: "A" }
61+
const applicationProperties2 = { test: "B" }
62+
const applicationProperties3 = { test: "C" }
63+
await publisher.send(Buffer.from(message1), { applicationProperties: applicationProperties1 })
64+
await publisher.send(Buffer.from(message2), { applicationProperties: applicationProperties2 })
65+
await publisher.send(Buffer.from(message3), { applicationProperties: applicationProperties3 })
66+
67+
await client.declareConsumer(
68+
{
69+
stream: streamName,
70+
offset: Offset.first(),
71+
filter: {
72+
values: ["A", "B"],
73+
postFilterFunc: (msg) => msg.applicationProperties!["test"] === "A",
74+
matchUnfiltered: true,
75+
},
76+
},
77+
(msg) => filteredMsg.push(msg.content.toString("utf-8"))
78+
)
79+
80+
await eventually(async () => {
81+
expect(filteredMsg[0]).eql("test1")
82+
expect(filteredMsg.length).eql(1)
83+
}, 10000)
84+
}).timeout(10000)
85+
86+
it("published messages are filtered on the server side keeping only the ones with filter value", async () => {
87+
const filteredMsg: string[] = []
88+
const publisher = await client.declarePublisher(
89+
{ stream: streamName, publisherRef: `my-publisher-${randomUUID()}` },
90+
(msg) => (msg.applicationProperties ? msg.applicationProperties["test"].toString() : undefined)
91+
)
92+
const applicationProperties1 = { test: "A" }
93+
const applicationProperties2 = { test: "B" }
94+
for (let i = 0; i < 1000; i++)
95+
await publisher.send(Buffer.from(`test${i + 1}`), { applicationProperties: applicationProperties1 })
96+
for (let i = 0; i < 1000; i++)
97+
await publisher.send(Buffer.from(`test${i + 1}`), { applicationProperties: applicationProperties2 })
98+
for (let i = 0; i < 1000; i++) await publisher.send(Buffer.from(`test${i + 1}`))
99+
100+
await client.declareConsumer(
101+
{
102+
stream: streamName,
103+
offset: Offset.first(),
104+
filter: {
105+
values: ["A", "B"],
106+
postFilterFunc: (_msg) => true,
107+
matchUnfiltered: false,
108+
},
109+
},
110+
(msg) => filteredMsg.push(msg.content.toString("utf-8"))
111+
)
112+
113+
await eventually(async () => {
114+
expect(filteredMsg.length).eql(2000)
115+
}, 10000)
116+
}).timeout(10000)
117+
118+
it("published messages are filtered on the server side keeping even the ones with filter value", async () => {
119+
const filteredMsg: string[] = []
120+
const publisher = await client.declarePublisher(
121+
{ stream: streamName, publisherRef: `my-publisher-${randomUUID()}` },
122+
(msg) => (msg.applicationProperties ? msg.applicationProperties["test"].toString() : undefined)
123+
)
124+
const applicationProperties1 = { test: "A" }
125+
const applicationProperties2 = { test: "B" }
126+
for (let i = 0; i < 1000; i++)
127+
await publisher.send(Buffer.from(`test${i + 1}`), { applicationProperties: applicationProperties1 })
128+
for (let i = 0; i < 1000; i++)
129+
await publisher.send(Buffer.from(`test${i + 1}`), { applicationProperties: applicationProperties2 })
130+
for (let i = 0; i < 1000; i++) await publisher.send(Buffer.from(`test${i + 1}`))
131+
132+
await client.declareConsumer(
133+
{
134+
stream: streamName,
135+
offset: Offset.first(),
136+
filter: {
137+
values: ["A", "B"],
138+
postFilterFunc: (_msg) => true,
139+
matchUnfiltered: true,
140+
},
141+
},
142+
(msg) => filteredMsg.push(msg.content.toString("utf-8"))
143+
)
144+
145+
await eventually(async () => {
146+
expect(filteredMsg.length).eql(3000)
147+
}, 10000)
148+
}).timeout(10000)
50149
})

0 commit comments

Comments
 (0)