diff --git a/README.md b/README.md index fdf88967..0103aea9 100644 --- a/README.md +++ b/README.md @@ -276,7 +276,49 @@ await client.close() ### Filtering -Work in progress ⚠️ +It is possible to tag messages while publishing and filter them on both the broker side and client side + +```typescript +const client = await connect({ + hostname: "localhost", + port: 5552, + username: "rabbit", + password: "rabbit", + vhost: "/", +}) + +const publisher = await client.declarePublisher( + { stream: streamName, publisherRef: `my-publisher-${randomUUID()}` }, + (msg) => msg.applicationProperties!["test"].toString() // Tags the message +) +const message1 = "test1" +const message2 = "test2" +const message3 = "test3" +const applicationProperties1 = { test: "A" } +const applicationProperties2 = { test: "B" } + +await publisher.send(Buffer.from(message1), { applicationProperties: applicationProperties1 }) +await publisher.send(Buffer.from(message2), { applicationProperties: applicationProperties1 }) +await publisher.send(Buffer.from(message3), { applicationProperties: applicationProperties2 }) + +await client.declareConsumer( + { + stream: streamName, + offset: Offset.first(), + // Filter option for the consumer + filter: { + values: ["A", "B"], + postFilterFunc: (msg) => msg.applicationProperties!["test"] === "A", + matchUnfiltered: true, + }, + }, + (msg) => filteredMsg.push(msg.content.toString("utf-8")) +) + +await sleep(2000) + +await client.close() +``` ## Running Examples diff --git a/src/requests/create_stream_request.ts b/src/requests/create_stream_request.ts index 85b8b221..4481ec28 100644 --- a/src/requests/create_stream_request.ts +++ b/src/requests/create_stream_request.ts @@ -3,11 +3,11 @@ import { AbstractRequest } from "./abstract_request" import { DataWriter } from "./data_writer" export interface CreateStreamArguments { - "x-queue-leader-locator"?: string - "x-max-age"?: string - "x-stream-max-segment-size-bytes"?: number - "x-initial-cluster-size"?: number - "x-max-length-bytes"?: number + "queue-leader-locator"?: "random" | "client-local" | "least-leaders" + "max-age"?: string + "stream-max-segment-size-bytes"?: number + "initial-cluster-size"?: number + "max-length-bytes"?: number } export class CreateStreamRequest extends AbstractRequest { diff --git a/test/support/rabbit.ts b/test/support/rabbit.ts index 6da32791..270ec283 100644 --- a/test/support/rabbit.ts +++ b/test/support/rabbit.ts @@ -1,6 +1,7 @@ import got from "got" import { getTestNodesFromEnv } from "./util" import { range } from "../../src/util" +import { CreateStreamArguments } from "../../src/requests/create_stream_request" export interface RabbitConnectionResponse { name: string @@ -26,6 +27,7 @@ interface MessageInfoResponse { messages_unacknowledged: number types: "stream" | "quorum" | "classic" node: string + arguments?: CreateStreamArguments } interface RabbitPublishersResponse { @@ -96,6 +98,7 @@ export class Rabbit { responseType: "json", } ) + return ret.body } diff --git a/test/unit/create_stream.test.ts b/test/unit/create_stream.test.ts index dc821429..204255f1 100644 --- a/test/unit/create_stream.test.ts +++ b/test/unit/create_stream.test.ts @@ -9,11 +9,11 @@ describe("Stream", () => { const rabbit = new Rabbit(username, password) const streamName = `test-stream-${randomUUID()}` const payload = { - "x-queue-leader-locator": "test", - "x-max-age": "test", - "x-stream-max-segment-size-bytes": 42, - "x-initial-cluster-size": 42, - "x-max-length-bytes": 42, + "queue-leader-locator": "random" as const, + "max-age": "120s", + "stream-max-segment-size-bytes": 1000, + "initial-cluster-size": 5, + "max-length-bytes": 20000, } let client: Client @@ -43,6 +43,21 @@ describe("Stream", () => { expect(result.name).to.be.eql(streamName) }) + it("Should create a new Stream with the given arguments", async () => { + const resp = await client.createStream({ stream: streamName, arguments: payload }) + + expect(resp).to.be.true + const result = await rabbit.getQueueInfo(streamName) + expect(result.arguments).to.be.eql({ + "x-queue-type": "stream", + "x-queue-leader-locator": payload["queue-leader-locator"], + "x-max-age": payload["max-age"], + "x-stream-max-segment-size-bytes": payload["stream-max-segment-size-bytes"], + "x-initial-cluster-size": payload["initial-cluster-size"], + "x-max-length-bytes": payload["max-length-bytes"], + }) + }) + it("Should be idempotent and ignore a duplicate Stream error", async () => { await client.createStream({ stream: streamName, arguments: payload }) const resp = await client.createStream({ stream: streamName, arguments: payload }) diff --git a/test/unit/create_super_stream.test.ts b/test/unit/create_super_stream.test.ts index b60a1078..c108685e 100644 --- a/test/unit/create_super_stream.test.ts +++ b/test/unit/create_super_stream.test.ts @@ -10,11 +10,11 @@ describe("Super Stream", () => { const rabbit = new Rabbit(username, password) const streamName = `test-stream-${randomUUID()}` const payload = { - "x-queue-leader-locator": "test", - "x-max-age": "test", - "x-stream-max-segment-size-bytes": 42, - "x-initial-cluster-size": 42, - "x-max-length-bytes": 42, + "queue-leader-locator": "random" as const, + "max-age": "120s", + "stream-max-segment-size-bytes": 1000, + "initial-cluster-size": 5, + "max-length-bytes": 20000, } let client: Client @@ -47,6 +47,27 @@ describe("Super Stream", () => { expect(result.map((r) => r.name)).to.have.members(Array.from(Array(3).keys()).map((n) => `${streamName}-${n}`)) }) + it("Should create a new Super Stream with 3 partitions by default with the given arguments", async () => { + const resp = await client.createSuperStream({ streamName, arguments: payload }) + + expect(resp).to.be.true + const result = await rabbit.getSuperStreamQueues("%2F", streamName) + expect(result.map((r) => r.name)).to.have.members(Array.from(Array(3).keys()).map((n) => `${streamName}-${n}`)) + await Promise.all( + Array.from(Array(3).keys()).map(async (n) => { + const queue = await rabbit.getQueueInfo(`${streamName}-${n}`) + expect(queue.arguments).to.be.eql({ + "x-queue-type": "stream", + "x-queue-leader-locator": payload["queue-leader-locator"], + "x-max-age": payload["max-age"], + "x-stream-max-segment-size-bytes": payload["stream-max-segment-size-bytes"], + "x-initial-cluster-size": payload["initial-cluster-size"], + "x-max-length-bytes": payload["max-length-bytes"], + }) + }) + ) + }) + it("Should create a new Super Stream with 2 partitions", async () => { const resp = await client.createSuperStream({ streamName, arguments: payload }, undefined, 2)