From 32f21eca1bc974360e915e8e82de66a6f2cb0117 Mon Sep 17 00:00:00 2001 From: Diego Ceccacci Date: Tue, 21 Jan 2025 10:35:01 +0100 Subject: [PATCH 01/15] increased timeout value --- test/e2e/offset.test.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/e2e/offset.test.ts b/test/e2e/offset.test.ts index 8e8e01dd..343e50da 100644 --- a/test/e2e/offset.test.ts +++ b/test/e2e/offset.test.ts @@ -150,7 +150,7 @@ describe("offset", () => { const receivedMessages: Message[] = [] const publisher = await client.declarePublisher({ stream: testStreamName }) const previousMessages = await sendANumberOfRandomMessages(publisher) - await wait(10) + await wait(1000) const offset = new Date() const laterMessages = await sendANumberOfRandomMessages(publisher, previousMessages.length) @@ -164,7 +164,7 @@ describe("offset", () => { await eventually(async () => { expect(receivedMessages).to.have.length(laterMessages.length) }) - }) + }).timeout(5000) }) describe("store", () => { From 5bcf76b1ad1dfbd09ba3120fd911abbb396401f8 Mon Sep 17 00:00:00 2001 From: Diego Ceccacci Date: Tue, 21 Jan 2025 13:27:40 +0100 Subject: [PATCH 02/15] fix filter flaky test --- test/e2e/filtering.test.ts | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/test/e2e/filtering.test.ts b/test/e2e/filtering.test.ts index ddba0dad..e84a9485 100644 --- a/test/e2e/filtering.test.ts +++ b/test/e2e/filtering.test.ts @@ -86,7 +86,8 @@ describe("filtering", () => { }).timeout(10000) it("published messages are filtered on the server side keeping only the ones with filter value", async () => { - const filteredMsg: string[] = [] + const expectedMessages: string[] = [] + const notCorrectlyFilteredMessages: string[] = [] const publisher = await client.declarePublisher( { stream: streamName, publisherRef: `my-publisher-${randomUUID()}` }, (msg) => (msg.applicationProperties ? msg.applicationProperties["test"].toString() : undefined) @@ -110,12 +111,20 @@ describe("filtering", () => { }, }, (msg) => { - filteredMsg.push(msg.content.toString("utf-8")) + if (msg.applicationProperties?.test === "A" || msg.applicationProperties?.test === "B") + expectedMessages.push(msg.content.toString("utf-8")) + else notCorrectlyFilteredMessages.push(msg.content.toString("utf-8")) } ) + //RabbitMQ uses a Bloom filter for server side filtering. + //A Bloom filter is very efficient in terms of storage and speed, but it is probabilistic: it can return false positives. + //Because of this, the broker can send messages it believes match the expected filter values whereas they do not. That's why some client-side filtering logic is necessary. + //For this reason some messages may not be correctly filtered, but we expecect the number of them to be very low. + //For more information: https://www.rabbitmq.com/blog/2023/10/16/stream-filtering await eventually(async () => { - expect(filteredMsg.length).eql(2000) + expect(expectedMessages.length).eql(2000) + expect(notCorrectlyFilteredMessages.length).below(100) }, 10000) }).timeout(15000) From 4ad1411bbcb5912462d1fe86324f19ba531fa149 Mon Sep 17 00:00:00 2001 From: Diego Ceccacci Date: Tue, 21 Jan 2025 15:10:32 +0100 Subject: [PATCH 03/15] update rabbitmq image --- .github/workflows/main.yml | 2 +- cluster/docker-compose.yml | 6 +++--- docker-compose.yaml | 2 +- example/docker-compose.yaml | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index ad4d990b..e8517106 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -20,7 +20,7 @@ jobs: services: rabbitmq: - image: rabbitmq:3.13-rc-management + image: rabbitmq:4-management options: --hostname test-node --name test-node env: RABBITMQ_DEFAULT_USER: "test-user" diff --git a/cluster/docker-compose.yml b/cluster/docker-compose.yml index 0e7f408d..5df2d51a 100755 --- a/cluster/docker-compose.yml +++ b/cluster/docker-compose.yml @@ -9,7 +9,7 @@ services: networks: - back hostname: node0 - image: rabbitmq:3.13-rc-management + image: rabbitmq:4-management ports: - "5560:5550" - "5561:5551" @@ -27,7 +27,7 @@ services: networks: - back hostname: node1 - image: rabbitmq:3.13-rc-management + image: rabbitmq:4-management ports: - "5570:5550" - "5571:5551" @@ -45,7 +45,7 @@ services: networks: - back hostname: node2 - image: rabbitmq:3.13-rc-management + image: rabbitmq:4-management ports: - "5580:5550" - "5581:5551" diff --git a/docker-compose.yaml b/docker-compose.yaml index c0f2ece1..52a7fb14 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,6 +1,6 @@ services: rabbitmq-stream: - image: rabbitmq:3.13-rc-management + image: rabbitmq:4-management container_name: rabbitmq-stream restart: unless-stopped hostname: "rabbitmq" diff --git a/example/docker-compose.yaml b/example/docker-compose.yaml index ca23d71d..0d162f9a 100644 --- a/example/docker-compose.yaml +++ b/example/docker-compose.yaml @@ -2,7 +2,7 @@ version: "2" services: rabbitmq-stream: - image: rabbitmq:3-management + image: rabbitmq:4-management container_name: rabbitmq-stream restart: unless-stopped hostname: "rabbitmq" From 0576f6d5a4b9df13ad95ecf0bbab2c3df7058d83 Mon Sep 17 00:00:00 2001 From: Diego Ceccacci Date: Tue, 21 Jan 2025 15:35:00 +0100 Subject: [PATCH 04/15] delete queues after tests --- test/e2e/declare_consumer.test.ts | 6 +++--- test/e2e/route_query.test.ts | 6 +++++- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/test/e2e/declare_consumer.test.ts b/test/e2e/declare_consumer.test.ts index 9cf17221..22d8b03b 100644 --- a/test/e2e/declare_consumer.test.ts +++ b/test/e2e/declare_consumer.test.ts @@ -277,15 +277,15 @@ describe("declare consumer", () => { it("messageAnnotations with bytes are read correctly", async () => { const messageAnnotations: MessageAnnotations[] = [] const annotations = { test: new AmqpByte(123) } - await rabbit.createStream("testQ") + await rabbit.createStream(streamName) await client.declareConsumer( - { stream: "testQ", offset: Offset.next(), consumerRef: "test" }, + { stream: streamName, offset: Offset.next(), consumerRef: "test" }, (message: Message) => { messageAnnotations.push(message.messageAnnotations ?? {}) } ) - const testP = await client.declarePublisher({ stream: "testQ" }) + const testP = await client.declarePublisher({ stream: streamName }) await testP.send(Buffer.from("Hello"), { messageAnnotations: annotations }) await eventually(async () => { diff --git a/test/e2e/route_query.test.ts b/test/e2e/route_query.test.ts index 98376730..1c84965a 100644 --- a/test/e2e/route_query.test.ts +++ b/test/e2e/route_query.test.ts @@ -37,6 +37,10 @@ describe("RouteQuery command", () => { const streamName = randomUUID() await rabbit.createStream(streamName) - await expectToThrowAsync(() => client.routeQuery({ routingKey: "0", superStream: streamName }), Error) + try { + await expectToThrowAsync(() => client.routeQuery({ routingKey: "0", superStream: streamName }), Error) + } finally { + await rabbit.deleteStream(streamName) + } }) }) From 34c0268b0c601464da060dee0ad7897ffd5c1750 Mon Sep 17 00:00:00 2001 From: Diego Ceccacci Date: Tue, 21 Jan 2025 16:00:09 +0100 Subject: [PATCH 05/15] flush messages and increase timeout --- test/e2e/offset.test.ts | 2 +- test/support/util.ts | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/test/e2e/offset.test.ts b/test/e2e/offset.test.ts index 343e50da..733c789a 100644 --- a/test/e2e/offset.test.ts +++ b/test/e2e/offset.test.ts @@ -163,7 +163,7 @@ describe("offset", () => { await eventually(async () => { expect(receivedMessages).to.have.length(laterMessages.length) - }) + }, 2500) }).timeout(5000) }) diff --git a/test/support/util.ts b/test/support/util.ts index 373d31cf..3839ec85 100644 --- a/test/support/util.ts +++ b/test/support/util.ts @@ -216,6 +216,7 @@ export const sendANumberOfRandomMessages = async (publisher: Publisher, offset = const noOfMessages = Math.floor(Math.random() * 10) + 1 const messages = Array.from(Array(noOfMessages).keys()).map((_, i) => `Message number ${i + offset + 1}`) await Promise.all(messages.map((m) => publisher.send(Buffer.from(m)))) + await publisher.flush() return messages } From 27286ee799168c4617c19b5e284fc71c2c74118f Mon Sep 17 00:00:00 2001 From: Diego Ceccacci Date: Tue, 21 Jan 2025 17:18:42 +0100 Subject: [PATCH 06/15] another flaky test fixed --- test/e2e/offset.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/e2e/offset.test.ts b/test/e2e/offset.test.ts index 733c789a..97afa7cc 100644 --- a/test/e2e/offset.test.ts +++ b/test/e2e/offset.test.ts @@ -150,8 +150,8 @@ describe("offset", () => { const receivedMessages: Message[] = [] const publisher = await client.declarePublisher({ stream: testStreamName }) const previousMessages = await sendANumberOfRandomMessages(publisher) - await wait(1000) const offset = new Date() + await wait(1000) const laterMessages = await sendANumberOfRandomMessages(publisher, previousMessages.length) await client.declareConsumer( From e7cf9765e8c4030395a7d30bebaaec8cde1e649f Mon Sep 17 00:00:00 2001 From: Diego Ceccacci Date: Tue, 21 Jan 2025 17:42:12 +0100 Subject: [PATCH 07/15] typo --- test/e2e/filtering.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/e2e/filtering.test.ts b/test/e2e/filtering.test.ts index e84a9485..f12997f7 100644 --- a/test/e2e/filtering.test.ts +++ b/test/e2e/filtering.test.ts @@ -120,7 +120,7 @@ describe("filtering", () => { //RabbitMQ uses a Bloom filter for server side filtering. //A Bloom filter is very efficient in terms of storage and speed, but it is probabilistic: it can return false positives. //Because of this, the broker can send messages it believes match the expected filter values whereas they do not. That's why some client-side filtering logic is necessary. - //For this reason some messages may not be correctly filtered, but we expecect the number of them to be very low. + //For this reason some messages may not be correctly filtered, but we expect the number of them to be very low. //For more information: https://www.rabbitmq.com/blog/2023/10/16/stream-filtering await eventually(async () => { expect(expectedMessages.length).eql(2000) From b21b3318c3d7570632d05d6ccd3aa026431ad4a6 Mon Sep 17 00:00:00 2001 From: Diego Ceccacci Date: Tue, 21 Jan 2025 17:49:56 +0100 Subject: [PATCH 08/15] increase timeout --- test/e2e/offset.test.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/e2e/offset.test.ts b/test/e2e/offset.test.ts index 97afa7cc..ece69435 100644 --- a/test/e2e/offset.test.ts +++ b/test/e2e/offset.test.ts @@ -150,6 +150,7 @@ describe("offset", () => { const receivedMessages: Message[] = [] const publisher = await client.declarePublisher({ stream: testStreamName }) const previousMessages = await sendANumberOfRandomMessages(publisher) + await wait(1000) const offset = new Date() await wait(1000) const laterMessages = await sendANumberOfRandomMessages(publisher, previousMessages.length) @@ -164,7 +165,7 @@ describe("offset", () => { await eventually(async () => { expect(receivedMessages).to.have.length(laterMessages.length) }, 2500) - }).timeout(5000) + }).timeout(7000) }) describe("store", () => { From 42744bdfb34aa1e1f96fc566b44c96abc19da61a Mon Sep 17 00:00:00 2001 From: Diego Ceccacci Date: Wed, 22 Jan 2025 10:50:35 +0100 Subject: [PATCH 09/15] rabbit version with minor --- .github/workflows/main.yml | 2 +- cluster/docker-compose.yml | 6 +++--- docker-compose.yaml | 2 +- example/docker-compose.yaml | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index e8517106..5854472f 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -20,7 +20,7 @@ jobs: services: rabbitmq: - image: rabbitmq:4-management + image: rabbitmq:4.0.5-management options: --hostname test-node --name test-node env: RABBITMQ_DEFAULT_USER: "test-user" diff --git a/cluster/docker-compose.yml b/cluster/docker-compose.yml index 5df2d51a..9fff48b7 100755 --- a/cluster/docker-compose.yml +++ b/cluster/docker-compose.yml @@ -9,7 +9,7 @@ services: networks: - back hostname: node0 - image: rabbitmq:4-management + image: rabbitmq:4.0.5-management ports: - "5560:5550" - "5561:5551" @@ -27,7 +27,7 @@ services: networks: - back hostname: node1 - image: rabbitmq:4-management + image: rabbitmq:4.0.5-management ports: - "5570:5550" - "5571:5551" @@ -45,7 +45,7 @@ services: networks: - back hostname: node2 - image: rabbitmq:4-management + image: rabbitmq:4.0.5-management ports: - "5580:5550" - "5581:5551" diff --git a/docker-compose.yaml b/docker-compose.yaml index 52a7fb14..167e150d 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,6 +1,6 @@ services: rabbitmq-stream: - image: rabbitmq:4-management + image: rabbitmq:4.0.5-management container_name: rabbitmq-stream restart: unless-stopped hostname: "rabbitmq" diff --git a/example/docker-compose.yaml b/example/docker-compose.yaml index 0d162f9a..9025cb77 100644 --- a/example/docker-compose.yaml +++ b/example/docker-compose.yaml @@ -2,7 +2,7 @@ version: "2" services: rabbitmq-stream: - image: rabbitmq:4-management + image: rabbitmq:4.0.5-management container_name: rabbitmq-stream restart: unless-stopped hostname: "rabbitmq" From 93d6a1a242e3ba07c88eea7e0a2cc02960bf0f51 Mon Sep 17 00:00:00 2001 From: Diego Ceccacci Date: Wed, 22 Jan 2025 10:59:45 +0100 Subject: [PATCH 10/15] Update test/e2e/offset.test.ts Co-authored-by: icappello --- test/e2e/offset.test.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/test/e2e/offset.test.ts b/test/e2e/offset.test.ts index ece69435..701988f6 100644 --- a/test/e2e/offset.test.ts +++ b/test/e2e/offset.test.ts @@ -152,7 +152,9 @@ describe("offset", () => { const previousMessages = await sendANumberOfRandomMessages(publisher) await wait(1000) const offset = new Date() - await wait(1000) + await publisher.flush() + await wait(1000) + const offset = new Date() const laterMessages = await sendANumberOfRandomMessages(publisher, previousMessages.length) await client.declareConsumer( From 9d2e3684a76f914fea137a3997e5359e33ba4a57 Mon Sep 17 00:00:00 2001 From: Diego Ceccacci Date: Wed, 22 Jan 2025 11:21:43 +0100 Subject: [PATCH 11/15] rollback --- test/e2e/offset.test.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/test/e2e/offset.test.ts b/test/e2e/offset.test.ts index 701988f6..ece69435 100644 --- a/test/e2e/offset.test.ts +++ b/test/e2e/offset.test.ts @@ -152,9 +152,7 @@ describe("offset", () => { const previousMessages = await sendANumberOfRandomMessages(publisher) await wait(1000) const offset = new Date() - await publisher.flush() - await wait(1000) - const offset = new Date() + await wait(1000) const laterMessages = await sendANumberOfRandomMessages(publisher, previousMessages.length) await client.declareConsumer( From 5c7e87da4e6513cc6b7ec1c8d7bcef450c1f834b Mon Sep 17 00:00:00 2001 From: Diego Ceccacci Date: Wed, 22 Jan 2025 11:26:03 +0100 Subject: [PATCH 12/15] moved message sending later --- test/e2e/offset.test.ts | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/test/e2e/offset.test.ts b/test/e2e/offset.test.ts index ece69435..41ef1ad2 100644 --- a/test/e2e/offset.test.ts +++ b/test/e2e/offset.test.ts @@ -150,10 +150,9 @@ describe("offset", () => { const receivedMessages: Message[] = [] const publisher = await client.declarePublisher({ stream: testStreamName }) const previousMessages = await sendANumberOfRandomMessages(publisher) - await wait(1000) + await publisher.flush() + await wait(100) const offset = new Date() - await wait(1000) - const laterMessages = await sendANumberOfRandomMessages(publisher, previousMessages.length) await client.declareConsumer( { stream: testStreamName, consumerRef: "my consumer", offset: Offset.timestamp(offset) }, @@ -162,10 +161,12 @@ describe("offset", () => { } ) + const laterMessages = await sendANumberOfRandomMessages(publisher, previousMessages.length) + await eventually(async () => { expect(receivedMessages).to.have.length(laterMessages.length) - }, 2500) - }).timeout(7000) + }) + }) }) describe("store", () => { From dbdade5ec87ca292a595d850141293922920c810 Mon Sep 17 00:00:00 2001 From: Diego Ceccacci Date: Wed, 22 Jan 2025 11:28:25 +0100 Subject: [PATCH 13/15] added always --- test/e2e/offset.test.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/test/e2e/offset.test.ts b/test/e2e/offset.test.ts index 41ef1ad2..1623de7b 100644 --- a/test/e2e/offset.test.ts +++ b/test/e2e/offset.test.ts @@ -146,7 +146,7 @@ describe("offset", () => { }, 5000) }).timeout(10000) - it("if offset is of type timestamp, all the messages belonging to batches sent earlier than the timestamp should be skipped", async () => { + it.only("if offset is of type timestamp, all the messages belonging to batches sent earlier than the timestamp should be skipped", async () => { const receivedMessages: Message[] = [] const publisher = await client.declarePublisher({ stream: testStreamName }) const previousMessages = await sendANumberOfRandomMessages(publisher) @@ -166,6 +166,10 @@ describe("offset", () => { await eventually(async () => { expect(receivedMessages).to.have.length(laterMessages.length) }) + + await always(async () => { + expect(receivedMessages).to.have.length(laterMessages.length) + }) }) }) From 40ddea7832b29d02926134349e02b48cf522738c Mon Sep 17 00:00:00 2001 From: Diego Ceccacci Date: Wed, 22 Jan 2025 11:33:42 +0100 Subject: [PATCH 14/15] removed only --- test/e2e/offset.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/e2e/offset.test.ts b/test/e2e/offset.test.ts index 1623de7b..8a79db85 100644 --- a/test/e2e/offset.test.ts +++ b/test/e2e/offset.test.ts @@ -146,7 +146,7 @@ describe("offset", () => { }, 5000) }).timeout(10000) - it.only("if offset is of type timestamp, all the messages belonging to batches sent earlier than the timestamp should be skipped", async () => { + it("if offset is of type timestamp, all the messages belonging to batches sent earlier than the timestamp should be skipped", async () => { const receivedMessages: Message[] = [] const publisher = await client.declarePublisher({ stream: testStreamName }) const previousMessages = await sendANumberOfRandomMessages(publisher) From 0abb2be168fedd6c6d1cab0b745ef1cc48845890 Mon Sep 17 00:00:00 2001 From: Diego Ceccacci Date: Wed, 22 Jan 2025 11:39:51 +0100 Subject: [PATCH 15/15] updated below threshold --- test/e2e/filtering.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/e2e/filtering.test.ts b/test/e2e/filtering.test.ts index f12997f7..7cfe967a 100644 --- a/test/e2e/filtering.test.ts +++ b/test/e2e/filtering.test.ts @@ -124,7 +124,7 @@ describe("filtering", () => { //For more information: https://www.rabbitmq.com/blog/2023/10/16/stream-filtering await eventually(async () => { expect(expectedMessages.length).eql(2000) - expect(notCorrectlyFilteredMessages.length).below(100) + expect(notCorrectlyFilteredMessages.length).below(150) }, 10000) }).timeout(15000)