Skip to content
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:

services:
rabbitmq:
image: rabbitmq:3.13-rc-management
image: rabbitmq:4.0.5-management
options: --hostname test-node --name test-node
env:
RABBITMQ_DEFAULT_USER: "test-user"
Expand Down
6 changes: 3 additions & 3 deletions cluster/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ services:
networks:
- back
hostname: node0
image: rabbitmq:3.13-rc-management
image: rabbitmq:4.0.5-management
ports:
- "5560:5550"
- "5561:5551"
Expand All @@ -27,7 +27,7 @@ services:
networks:
- back
hostname: node1
image: rabbitmq:3.13-rc-management
image: rabbitmq:4.0.5-management
ports:
- "5570:5550"
- "5571:5551"
Expand All @@ -45,7 +45,7 @@ services:
networks:
- back
hostname: node2
image: rabbitmq:3.13-rc-management
image: rabbitmq:4.0.5-management
ports:
- "5580:5550"
- "5581:5551"
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
services:
rabbitmq-stream:
image: rabbitmq:3.13-rc-management
image: rabbitmq:4.0.5-management
container_name: rabbitmq-stream
restart: unless-stopped
hostname: "rabbitmq"
Expand Down
2 changes: 1 addition & 1 deletion example/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: "2"

services:
rabbitmq-stream:
image: rabbitmq:3-management
image: rabbitmq:4.0.5-management
container_name: rabbitmq-stream
restart: unless-stopped
hostname: "rabbitmq"
Expand Down
6 changes: 3 additions & 3 deletions test/e2e/declare_consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -277,15 +277,15 @@ describe("declare consumer", () => {
it("messageAnnotations with bytes are read correctly", async () => {
const messageAnnotations: MessageAnnotations[] = []
const annotations = { test: new AmqpByte(123) }
await rabbit.createStream("testQ")
await rabbit.createStream(streamName)
await client.declareConsumer(
{ stream: "testQ", offset: Offset.next(), consumerRef: "test" },
{ stream: streamName, offset: Offset.next(), consumerRef: "test" },
(message: Message) => {
messageAnnotations.push(message.messageAnnotations ?? {})
}
)

const testP = await client.declarePublisher({ stream: "testQ" })
const testP = await client.declarePublisher({ stream: streamName })
await testP.send(Buffer.from("Hello"), { messageAnnotations: annotations })

await eventually(async () => {
Expand Down
15 changes: 12 additions & 3 deletions test/e2e/filtering.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ describe("filtering", () => {
}).timeout(10000)

it("published messages are filtered on the server side keeping only the ones with filter value", async () => {
const filteredMsg: string[] = []
const expectedMessages: string[] = []
const notCorrectlyFilteredMessages: string[] = []
const publisher = await client.declarePublisher(
{ stream: streamName, publisherRef: `my-publisher-${randomUUID()}` },
(msg) => (msg.applicationProperties ? msg.applicationProperties["test"].toString() : undefined)
Expand All @@ -110,12 +111,20 @@ describe("filtering", () => {
},
},
(msg) => {
filteredMsg.push(msg.content.toString("utf-8"))
if (msg.applicationProperties?.test === "A" || msg.applicationProperties?.test === "B")
expectedMessages.push(msg.content.toString("utf-8"))
else notCorrectlyFilteredMessages.push(msg.content.toString("utf-8"))
}
)

//RabbitMQ uses a Bloom filter for server side filtering.
//A Bloom filter is very efficient in terms of storage and speed, but it is probabilistic: it can return false positives.
//Because of this, the broker can send messages it believes match the expected filter values whereas they do not. That's why some client-side filtering logic is necessary.
//For this reason some messages may not be correctly filtered, but we expect the number of them to be very low.
//For more information: https://www.rabbitmq.com/blog/2023/10/16/stream-filtering
await eventually(async () => {
expect(filteredMsg.length).eql(2000)
expect(expectedMessages.length).eql(2000)
expect(notCorrectlyFilteredMessages.length).below(150)
}, 10000)
}).timeout(15000)

Expand Down
10 changes: 8 additions & 2 deletions test/e2e/offset.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,9 @@ describe("offset", () => {
const receivedMessages: Message[] = []
const publisher = await client.declarePublisher({ stream: testStreamName })
const previousMessages = await sendANumberOfRandomMessages(publisher)
await wait(10)
await publisher.flush()
await wait(100)
const offset = new Date()
const laterMessages = await sendANumberOfRandomMessages(publisher, previousMessages.length)

await client.declareConsumer(
{ stream: testStreamName, consumerRef: "my consumer", offset: Offset.timestamp(offset) },
Expand All @@ -161,9 +161,15 @@ describe("offset", () => {
}
)

const laterMessages = await sendANumberOfRandomMessages(publisher, previousMessages.length)

await eventually(async () => {
expect(receivedMessages).to.have.length(laterMessages.length)
})

await always(async () => {
expect(receivedMessages).to.have.length(laterMessages.length)
})
})
})

Expand Down
6 changes: 5 additions & 1 deletion test/e2e/route_query.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ describe("RouteQuery command", () => {
const streamName = randomUUID()
await rabbit.createStream(streamName)

await expectToThrowAsync(() => client.routeQuery({ routingKey: "0", superStream: streamName }), Error)
try {
await expectToThrowAsync(() => client.routeQuery({ routingKey: "0", superStream: streamName }), Error)
} finally {
await rabbit.deleteStream(streamName)
}
})
})
1 change: 1 addition & 0 deletions test/support/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ export const sendANumberOfRandomMessages = async (publisher: Publisher, offset =
const noOfMessages = Math.floor(Math.random() * 10) + 1
const messages = Array.from(Array(noOfMessages).keys()).map((_, i) => `Message number ${i + offset + 1}`)
await Promise.all(messages.map((m) => publisher.send(Buffer.from(m))))
await publisher.flush()
return messages
}

Expand Down
Loading