diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index ad4d990b..5854472f 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -20,7 +20,7 @@ jobs: services: rabbitmq: - image: rabbitmq:3.13-rc-management + image: rabbitmq:4.0.5-management options: --hostname test-node --name test-node env: RABBITMQ_DEFAULT_USER: "test-user" diff --git a/cluster/docker-compose.yml b/cluster/docker-compose.yml index 0e7f408d..9fff48b7 100755 --- a/cluster/docker-compose.yml +++ b/cluster/docker-compose.yml @@ -9,7 +9,7 @@ services: networks: - back hostname: node0 - image: rabbitmq:3.13-rc-management + image: rabbitmq:4.0.5-management ports: - "5560:5550" - "5561:5551" @@ -27,7 +27,7 @@ services: networks: - back hostname: node1 - image: rabbitmq:3.13-rc-management + image: rabbitmq:4.0.5-management ports: - "5570:5550" - "5571:5551" @@ -45,7 +45,7 @@ services: networks: - back hostname: node2 - image: rabbitmq:3.13-rc-management + image: rabbitmq:4.0.5-management ports: - "5580:5550" - "5581:5551" diff --git a/docker-compose.yaml b/docker-compose.yaml index c0f2ece1..167e150d 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,6 +1,6 @@ services: rabbitmq-stream: - image: rabbitmq:3.13-rc-management + image: rabbitmq:4.0.5-management container_name: rabbitmq-stream restart: unless-stopped hostname: "rabbitmq" diff --git a/example/docker-compose.yaml b/example/docker-compose.yaml index ca23d71d..9025cb77 100644 --- a/example/docker-compose.yaml +++ b/example/docker-compose.yaml @@ -2,7 +2,7 @@ version: "2" services: rabbitmq-stream: - image: rabbitmq:3-management + image: rabbitmq:4.0.5-management container_name: rabbitmq-stream restart: unless-stopped hostname: "rabbitmq" diff --git a/test/e2e/declare_consumer.test.ts b/test/e2e/declare_consumer.test.ts index 9cf17221..22d8b03b 100644 --- a/test/e2e/declare_consumer.test.ts +++ b/test/e2e/declare_consumer.test.ts @@ -277,15 +277,15 @@ describe("declare consumer", () => { it("messageAnnotations with bytes are read correctly", async () => { const messageAnnotations: MessageAnnotations[] = [] const annotations = { test: new AmqpByte(123) } - await rabbit.createStream("testQ") + await rabbit.createStream(streamName) await client.declareConsumer( - { stream: "testQ", offset: Offset.next(), consumerRef: "test" }, + { stream: streamName, offset: Offset.next(), consumerRef: "test" }, (message: Message) => { messageAnnotations.push(message.messageAnnotations ?? {}) } ) - const testP = await client.declarePublisher({ stream: "testQ" }) + const testP = await client.declarePublisher({ stream: streamName }) await testP.send(Buffer.from("Hello"), { messageAnnotations: annotations }) await eventually(async () => { diff --git a/test/e2e/filtering.test.ts b/test/e2e/filtering.test.ts index ddba0dad..7cfe967a 100644 --- a/test/e2e/filtering.test.ts +++ b/test/e2e/filtering.test.ts @@ -86,7 +86,8 @@ describe("filtering", () => { }).timeout(10000) it("published messages are filtered on the server side keeping only the ones with filter value", async () => { - const filteredMsg: string[] = [] + const expectedMessages: string[] = [] + const notCorrectlyFilteredMessages: string[] = [] const publisher = await client.declarePublisher( { stream: streamName, publisherRef: `my-publisher-${randomUUID()}` }, (msg) => (msg.applicationProperties ? msg.applicationProperties["test"].toString() : undefined) @@ -110,12 +111,20 @@ describe("filtering", () => { }, }, (msg) => { - filteredMsg.push(msg.content.toString("utf-8")) + if (msg.applicationProperties?.test === "A" || msg.applicationProperties?.test === "B") + expectedMessages.push(msg.content.toString("utf-8")) + else notCorrectlyFilteredMessages.push(msg.content.toString("utf-8")) } ) + //RabbitMQ uses a Bloom filter for server side filtering. + //A Bloom filter is very efficient in terms of storage and speed, but it is probabilistic: it can return false positives. + //Because of this, the broker can send messages it believes match the expected filter values whereas they do not. That's why some client-side filtering logic is necessary. + //For this reason some messages may not be correctly filtered, but we expect the number of them to be very low. + //For more information: https://www.rabbitmq.com/blog/2023/10/16/stream-filtering await eventually(async () => { - expect(filteredMsg.length).eql(2000) + expect(expectedMessages.length).eql(2000) + expect(notCorrectlyFilteredMessages.length).below(150) }, 10000) }).timeout(15000) diff --git a/test/e2e/offset.test.ts b/test/e2e/offset.test.ts index 8e8e01dd..8a79db85 100644 --- a/test/e2e/offset.test.ts +++ b/test/e2e/offset.test.ts @@ -150,9 +150,9 @@ describe("offset", () => { const receivedMessages: Message[] = [] const publisher = await client.declarePublisher({ stream: testStreamName }) const previousMessages = await sendANumberOfRandomMessages(publisher) - await wait(10) + await publisher.flush() + await wait(100) const offset = new Date() - const laterMessages = await sendANumberOfRandomMessages(publisher, previousMessages.length) await client.declareConsumer( { stream: testStreamName, consumerRef: "my consumer", offset: Offset.timestamp(offset) }, @@ -161,9 +161,15 @@ describe("offset", () => { } ) + const laterMessages = await sendANumberOfRandomMessages(publisher, previousMessages.length) + await eventually(async () => { expect(receivedMessages).to.have.length(laterMessages.length) }) + + await always(async () => { + expect(receivedMessages).to.have.length(laterMessages.length) + }) }) }) diff --git a/test/e2e/route_query.test.ts b/test/e2e/route_query.test.ts index 98376730..1c84965a 100644 --- a/test/e2e/route_query.test.ts +++ b/test/e2e/route_query.test.ts @@ -37,6 +37,10 @@ describe("RouteQuery command", () => { const streamName = randomUUID() await rabbit.createStream(streamName) - await expectToThrowAsync(() => client.routeQuery({ routingKey: "0", superStream: streamName }), Error) + try { + await expectToThrowAsync(() => client.routeQuery({ routingKey: "0", superStream: streamName }), Error) + } finally { + await rabbit.deleteStream(streamName) + } }) }) diff --git a/test/support/util.ts b/test/support/util.ts index 373d31cf..3839ec85 100644 --- a/test/support/util.ts +++ b/test/support/util.ts @@ -216,6 +216,7 @@ export const sendANumberOfRandomMessages = async (publisher: Publisher, offset = const noOfMessages = Math.floor(Math.random() * 10) + 1 const messages = Array.from(Array(noOfMessages).keys()).map((_, i) => `Message number ${i + offset + 1}`) await Promise.all(messages.map((m) => publisher.send(Buffer.from(m)))) + await publisher.flush() return messages }