Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 43 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,49 @@ await client.close()

### Filtering

Work in progress ⚠️
It is possible to tag messages while publishing and filter them on both the broker side and client side

```typescript
const client = await connect({
hostname: "localhost",
port: 5552,
username: "rabbit",
password: "rabbit",
vhost: "/",
})

const publisher = await client.declarePublisher(
{ stream: streamName, publisherRef: `my-publisher-${randomUUID()}` },
(msg) => msg.applicationProperties!["test"].toString() // Tags the message
)
const message1 = "test1"
const message2 = "test2"
const message3 = "test3"
const applicationProperties1 = { test: "A" }
const applicationProperties2 = { test: "B" }

await publisher.send(Buffer.from(message1), { applicationProperties: applicationProperties1 })
await publisher.send(Buffer.from(message2), { applicationProperties: applicationProperties1 })
await publisher.send(Buffer.from(message3), { applicationProperties: applicationProperties2 })

await client.declareConsumer(
{
stream: streamName,
offset: Offset.first(),
// Filter option for the consumer
filter: {
values: ["A", "B"],
postFilterFunc: (msg) => msg.applicationProperties!["test"] === "A",
matchUnfiltered: true,
},
},
(msg) => filteredMsg.push(msg.content.toString("utf-8"))
)

await sleep(2000)

await client.close()
```

## Running Examples

Expand Down
10 changes: 5 additions & 5 deletions src/requests/create_stream_request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ import { AbstractRequest } from "./abstract_request"
import { DataWriter } from "./data_writer"

export interface CreateStreamArguments {
"x-queue-leader-locator"?: string
"x-max-age"?: string
"x-stream-max-segment-size-bytes"?: number
"x-initial-cluster-size"?: number
"x-max-length-bytes"?: number
"queue-leader-locator"?: "random" | "client-local" | "least-leaders"
"max-age"?: string
"stream-max-segment-size-bytes"?: number
"initial-cluster-size"?: number
"max-length-bytes"?: number
}

export class CreateStreamRequest extends AbstractRequest {
Expand Down
3 changes: 3 additions & 0 deletions test/support/rabbit.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import got from "got"
import { getTestNodesFromEnv } from "./util"
import { range } from "../../src/util"
import { CreateStreamArguments } from "../../src/requests/create_stream_request"

export interface RabbitConnectionResponse {
name: string
Expand All @@ -26,6 +27,7 @@ interface MessageInfoResponse {
messages_unacknowledged: number
types: "stream" | "quorum" | "classic"
node: string
arguments?: CreateStreamArguments
}

interface RabbitPublishersResponse {
Expand Down Expand Up @@ -96,6 +98,7 @@ export class Rabbit {
responseType: "json",
}
)

return ret.body
}

Expand Down
25 changes: 20 additions & 5 deletions test/unit/create_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ describe("Stream", () => {
const rabbit = new Rabbit(username, password)
const streamName = `test-stream-${randomUUID()}`
const payload = {
"x-queue-leader-locator": "test",
"x-max-age": "test",
"x-stream-max-segment-size-bytes": 42,
"x-initial-cluster-size": 42,
"x-max-length-bytes": 42,
"queue-leader-locator": "random" as const,
"max-age": "120s",
"stream-max-segment-size-bytes": 1000,
"initial-cluster-size": 5,
"max-length-bytes": 20000,
}
let client: Client

Expand Down Expand Up @@ -43,6 +43,21 @@ describe("Stream", () => {
expect(result.name).to.be.eql(streamName)
})

it("Should create a new Stream with the given arguments", async () => {
const resp = await client.createStream({ stream: streamName, arguments: payload })

expect(resp).to.be.true
const result = await rabbit.getQueueInfo(streamName)
expect(result.arguments).to.be.eql({
"x-queue-type": "stream",
"x-queue-leader-locator": payload["queue-leader-locator"],
"x-max-age": payload["max-age"],
"x-stream-max-segment-size-bytes": payload["stream-max-segment-size-bytes"],
"x-initial-cluster-size": payload["initial-cluster-size"],
"x-max-length-bytes": payload["max-length-bytes"],
})
})

it("Should be idempotent and ignore a duplicate Stream error", async () => {
await client.createStream({ stream: streamName, arguments: payload })
const resp = await client.createStream({ stream: streamName, arguments: payload })
Expand Down
31 changes: 26 additions & 5 deletions test/unit/create_super_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ describe("Super Stream", () => {
const rabbit = new Rabbit(username, password)
const streamName = `test-stream-${randomUUID()}`
const payload = {
"x-queue-leader-locator": "test",
"x-max-age": "test",
"x-stream-max-segment-size-bytes": 42,
"x-initial-cluster-size": 42,
"x-max-length-bytes": 42,
"queue-leader-locator": "random" as const,
"max-age": "120s",
"stream-max-segment-size-bytes": 1000,
"initial-cluster-size": 5,
"max-length-bytes": 20000,
}
let client: Client

Expand Down Expand Up @@ -47,6 +47,27 @@ describe("Super Stream", () => {
expect(result.map((r) => r.name)).to.have.members(Array.from(Array(3).keys()).map((n) => `${streamName}-${n}`))
})

it("Should create a new Super Stream with 3 partitions by default with the given arguments", async () => {
const resp = await client.createSuperStream({ streamName, arguments: payload })

expect(resp).to.be.true
const result = await rabbit.getSuperStreamQueues("%2F", streamName)
expect(result.map((r) => r.name)).to.have.members(Array.from(Array(3).keys()).map((n) => `${streamName}-${n}`))
await Promise.all(
Array.from(Array(3).keys()).map(async (n) => {
const queue = await rabbit.getQueueInfo(`${streamName}-${n}`)
expect(queue.arguments).to.be.eql({
"x-queue-type": "stream",
"x-queue-leader-locator": payload["queue-leader-locator"],
"x-max-age": payload["max-age"],
"x-stream-max-segment-size-bytes": payload["stream-max-segment-size-bytes"],
"x-initial-cluster-size": payload["initial-cluster-size"],
"x-max-length-bytes": payload["max-length-bytes"],
})
})
)
})

it("Should create a new Super Stream with 2 partitions", async () => {
const resp = await client.createSuperStream({ streamName, arguments: payload }, undefined, 2)

Expand Down