Skip to content

Commit 74fcb86

Browse files
l4mbymagne
andauthored
[bug] creation of stream with arguments (#176)
* Creation of stream with arguments fix * Adds tests to check for stream arguments * Add filtering example on readme * Fix readme filtering section --------- Co-authored-by: magne <magnello@coders51.com>
1 parent 37f6133 commit 74fcb86

File tree

5 files changed

+97
-16
lines changed

5 files changed

+97
-16
lines changed

README.md

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,49 @@ await client.close()
276276

277277
### Filtering
278278

279-
Work in progress ⚠️
279+
It is possible to tag messages while publishing and filter them on both the broker side and client side
280+
281+
```typescript
282+
const client = await connect({
283+
hostname: "localhost",
284+
port: 5552,
285+
username: "rabbit",
286+
password: "rabbit",
287+
vhost: "/",
288+
})
289+
290+
const publisher = await client.declarePublisher(
291+
{ stream: streamName, publisherRef: `my-publisher-${randomUUID()}` },
292+
(msg) => msg.applicationProperties!["test"].toString() // Tags the message
293+
)
294+
const message1 = "test1"
295+
const message2 = "test2"
296+
const message3 = "test3"
297+
const applicationProperties1 = { test: "A" }
298+
const applicationProperties2 = { test: "B" }
299+
300+
await publisher.send(Buffer.from(message1), { applicationProperties: applicationProperties1 })
301+
await publisher.send(Buffer.from(message2), { applicationProperties: applicationProperties1 })
302+
await publisher.send(Buffer.from(message3), { applicationProperties: applicationProperties2 })
303+
304+
await client.declareConsumer(
305+
{
306+
stream: streamName,
307+
offset: Offset.first(),
308+
// Filter option for the consumer
309+
filter: {
310+
values: ["A", "B"],
311+
postFilterFunc: (msg) => msg.applicationProperties!["test"] === "A",
312+
matchUnfiltered: true,
313+
},
314+
},
315+
(msg) => filteredMsg.push(msg.content.toString("utf-8"))
316+
)
317+
318+
await sleep(2000)
319+
320+
await client.close()
321+
```
280322

281323
## Running Examples
282324

src/requests/create_stream_request.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@ import { AbstractRequest } from "./abstract_request"
33
import { DataWriter } from "./data_writer"
44

55
export interface CreateStreamArguments {
6-
"x-queue-leader-locator"?: string
7-
"x-max-age"?: string
8-
"x-stream-max-segment-size-bytes"?: number
9-
"x-initial-cluster-size"?: number
10-
"x-max-length-bytes"?: number
6+
"queue-leader-locator"?: "random" | "client-local" | "least-leaders"
7+
"max-age"?: string
8+
"stream-max-segment-size-bytes"?: number
9+
"initial-cluster-size"?: number
10+
"max-length-bytes"?: number
1111
}
1212

1313
export class CreateStreamRequest extends AbstractRequest {

test/support/rabbit.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import got from "got"
22
import { getTestNodesFromEnv } from "./util"
33
import { range } from "../../src/util"
4+
import { CreateStreamArguments } from "../../src/requests/create_stream_request"
45

56
export interface RabbitConnectionResponse {
67
name: string
@@ -26,6 +27,7 @@ interface MessageInfoResponse {
2627
messages_unacknowledged: number
2728
types: "stream" | "quorum" | "classic"
2829
node: string
30+
arguments?: CreateStreamArguments
2931
}
3032

3133
interface RabbitPublishersResponse {
@@ -96,6 +98,7 @@ export class Rabbit {
9698
responseType: "json",
9799
}
98100
)
101+
99102
return ret.body
100103
}
101104

test/unit/create_stream.test.ts

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,11 @@ describe("Stream", () => {
99
const rabbit = new Rabbit(username, password)
1010
const streamName = `test-stream-${randomUUID()}`
1111
const payload = {
12-
"x-queue-leader-locator": "test",
13-
"x-max-age": "test",
14-
"x-stream-max-segment-size-bytes": 42,
15-
"x-initial-cluster-size": 42,
16-
"x-max-length-bytes": 42,
12+
"queue-leader-locator": "random" as const,
13+
"max-age": "120s",
14+
"stream-max-segment-size-bytes": 1000,
15+
"initial-cluster-size": 5,
16+
"max-length-bytes": 20000,
1717
}
1818
let client: Client
1919

@@ -43,6 +43,21 @@ describe("Stream", () => {
4343
expect(result.name).to.be.eql(streamName)
4444
})
4545

46+
it("Should create a new Stream with the given arguments", async () => {
47+
const resp = await client.createStream({ stream: streamName, arguments: payload })
48+
49+
expect(resp).to.be.true
50+
const result = await rabbit.getQueueInfo(streamName)
51+
expect(result.arguments).to.be.eql({
52+
"x-queue-type": "stream",
53+
"x-queue-leader-locator": payload["queue-leader-locator"],
54+
"x-max-age": payload["max-age"],
55+
"x-stream-max-segment-size-bytes": payload["stream-max-segment-size-bytes"],
56+
"x-initial-cluster-size": payload["initial-cluster-size"],
57+
"x-max-length-bytes": payload["max-length-bytes"],
58+
})
59+
})
60+
4661
it("Should be idempotent and ignore a duplicate Stream error", async () => {
4762
await client.createStream({ stream: streamName, arguments: payload })
4863
const resp = await client.createStream({ stream: streamName, arguments: payload })

test/unit/create_super_stream.test.ts

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,11 @@ describe("Super Stream", () => {
1010
const rabbit = new Rabbit(username, password)
1111
const streamName = `test-stream-${randomUUID()}`
1212
const payload = {
13-
"x-queue-leader-locator": "test",
14-
"x-max-age": "test",
15-
"x-stream-max-segment-size-bytes": 42,
16-
"x-initial-cluster-size": 42,
17-
"x-max-length-bytes": 42,
13+
"queue-leader-locator": "random" as const,
14+
"max-age": "120s",
15+
"stream-max-segment-size-bytes": 1000,
16+
"initial-cluster-size": 5,
17+
"max-length-bytes": 20000,
1818
}
1919
let client: Client
2020

@@ -47,6 +47,27 @@ describe("Super Stream", () => {
4747
expect(result.map((r) => r.name)).to.have.members(Array.from(Array(3).keys()).map((n) => `${streamName}-${n}`))
4848
})
4949

50+
it("Should create a new Super Stream with 3 partitions by default with the given arguments", async () => {
51+
const resp = await client.createSuperStream({ streamName, arguments: payload })
52+
53+
expect(resp).to.be.true
54+
const result = await rabbit.getSuperStreamQueues("%2F", streamName)
55+
expect(result.map((r) => r.name)).to.have.members(Array.from(Array(3).keys()).map((n) => `${streamName}-${n}`))
56+
await Promise.all(
57+
Array.from(Array(3).keys()).map(async (n) => {
58+
const queue = await rabbit.getQueueInfo(`${streamName}-${n}`)
59+
expect(queue.arguments).to.be.eql({
60+
"x-queue-type": "stream",
61+
"x-queue-leader-locator": payload["queue-leader-locator"],
62+
"x-max-age": payload["max-age"],
63+
"x-stream-max-segment-size-bytes": payload["stream-max-segment-size-bytes"],
64+
"x-initial-cluster-size": payload["initial-cluster-size"],
65+
"x-max-length-bytes": payload["max-length-bytes"],
66+
})
67+
})
68+
)
69+
})
70+
5071
it("Should create a new Super Stream with 2 partitions", async () => {
5172
const resp = await client.createSuperStream({ streamName, arguments: payload }, undefined, 2)
5273

0 commit comments

Comments
 (0)