Always buffer all publish frames together #123
Conversation
Even if the user doesn't await basicPublish, all frames belonging to one publish should be published together. So instead of waiting for a potentially blocked socket to drain, enqueue all the data and only await the last sent frame. Fixes #49
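A minimal sketch of the idea (not the actual library code), assuming a Node `net.Socket`-style writer and a hypothetical `sendFrames` helper: every frame of one publish is queued back-to-back, and only the write of the last frame is awaited, so a slow socket never gets frames from another publish interleaved in between.

```ts
import type { Socket } from "node:net"

// Hypothetical helper illustrating the buffering strategy in this PR.
// Every frame of a single basicPublish is written immediately (Node
// queues the bytes internally); only the final write is awaited.
function sendFrames(socket: Socket, frames: Buffer[]): Promise<void> {
  return new Promise((resolve, reject) => {
    if (frames.length === 0) return resolve()
    // Queue all but the last frame without waiting for the socket to drain.
    for (let i = 0; i < frames.length - 1; i++) {
      socket.write(frames[i])
    }
    // Resolve (or reject) only once the last frame has been handed off.
    socket.write(frames[frames.length - 1], (err) =>
      err ? reject(err) : resolve()
    )
  })
}
```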
I'm using Node 22 to test this out:
I cloned the repo, did
The test code:

```ts
import { AMQPClient } from "./amqp-client.js/lib/cjs/index.js"

const setup = async (opts: {
  endpoint: string
  exchange: string
  queue: string
}) => {
  const client = new AMQPClient(opts.endpoint)
  const connection = await client.connect()
  connection.onerror = (e: unknown) => {
    console.error("AMQP Error", e)
  }
  const channel = await client.channel()
  await channel.confirmSelect()
  await channel.exchangeDeclare(
    opts.exchange,
    "topic",
    {
      durable: true,
      autoDelete: false,
    },
    { defaultRoutingKey: "" }
  )
  await channel.queue(
    opts.queue,
    {
      durable: true,
      exclusive: false,
      autoDelete: false,
    },
    {
      "x-max-priority": 10,
    }
  )
  return channel
}

const benchmarkMqClient = async (opts: {
  messageCount: number
  payloadSizeInKb: number
}) => {
  const channel = await setup({
    endpoint: "amqp://localhost:5672",
    exchange: "test_exchange",
    queue: "test_queue",
  })
  const largeString = "a".repeat(opts.payloadSizeInKb * 1024)
  const message = JSON.stringify({
    payload: largeString,
  })
  console.log(`Starting benchmark for ${opts.messageCount} messages...`)
  // Start time
  const startTime = performance.now()
  const promises = []
  for (let i = 0; i < opts.messageCount; i++) {
    console.log(`Message Id=${i}`)
    promises.push(
      channel
        .basicPublish("test_exchange", "/", message, {
          deliveryMode: 2,
        })
        .then((reply: number) =>
          console.log(`Publish ${i} successful. Reply ${reply}`)
        )
    )
  }
  await Promise.all(promises)
  const endTime = performance.now()
  const durationInSeconds = (endTime - startTime) / 1000
  const messagesPerSecond = opts.messageCount / durationInSeconds
  console.log(`Benchmark completed: ${opts.messageCount} messages published.`)
  console.log(`Time taken: ${durationInSeconds.toFixed(2)} seconds.`)
  console.log(`Messages per second: ${messagesPerSecond.toFixed(2)}.`)
}

// Run the benchmark
await benchmarkMqClient({ messageCount: 400, payloadSizeInKb: 10 })
```

The run command is as follows:
It is failing in this configuration with:
The result when run on
Also, I won't get any acks back.
I personally feel the issue is with the promises - ideally
Yeah, that would kill the performance, but if it's only for large messages (body size > frame max size) maybe it's ok.
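For context on why large messages matter here: a body bigger than the negotiated frame size has to be split into several body frames, and it is exactly between those frames that another publish's frames can sneak in. A rough back-of-the-envelope illustration (the 4096-byte frameMax is an example value, not necessarily the library's default):

```ts
// AMQP 0-9-1 splits a published message into one method frame, one header
// frame, and as many body frames as needed to stay within frameMax.
const frameMax = 4096            // example negotiated frame size
const frameOverhead = 8          // 7-byte frame header + 1-byte frame-end octet
const bodySize = 10 * 1024       // 10 KB payload, as in the benchmark above

const bodyPerFrame = frameMax - frameOverhead
const bodyFrames = Math.ceil(bodySize / bodyPerFrame)
console.log(`1 method + 1 header + ${bodyFrames} body frames per publish`)
// If frames from two publishes on the same channel get interleaved,
// the broker receives an invalid frame sequence.
```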
What I found during testing is that all the frames are sent on the channel, but they never receive the ACK, so the publish promises never resolve. This might be due to the channel being overloaded - I need to look into it more. Since channels are not thread-safe, by extension they are not meant to be used for concurrent writes either. Wrapping each publish in a lock would serialize the writes, but that comes at the cost of hindering performance. Using multiple channels for publishing and load balancing across them would be helpful.
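A rough sketch of that multi-channel idea, assuming the published `@cloudamqp/amqp-client` package (the benchmark above imports from a local build instead); the channel count and round-robin helper are made up for illustration:

```ts
import { AMQPClient } from "@cloudamqp/amqp-client"

const client = new AMQPClient("amqp://localhost:5672")
const conn = await client.connect()

// Open a small pool of channels and put each one into confirm mode.
const channelCount = 4
const channels = await Promise.all(
  Array.from({ length: channelCount }, () => conn.channel())
)
await Promise.all(channels.map((ch) => ch.confirmSelect()))

// Round-robin publishes across the pool so no single channel
// carries all of the concurrent writes.
let next = 0
const publish = (body: string) => {
  const ch = channels[next]
  next = (next + 1) % channels.length
  return ch.basicPublish("test_exchange", "/", body, { deliveryMode: 2 })
}
```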
I would really like to understand how JS can intermingle sequential writes to the socket. I get it when we awaited the drain between frames, but Javascript/node doesn't have a native mutex structure, so we can't really solve it that way either. I'm now expanding the internal buffer if the buffer size isn't enough for all the publish frames together. A side effect of that is that the memory usage will be at least 2x the body size, but maybe that's ok. We can increase the default frameMax to 128KB too, and then it will probably be less of a problem.
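Not the library's real internals, just a sketch of the buffer-growing approach described above; the doubling strategy and names are illustrative:

```ts
// Grow the outgoing buffer when the frames of one publish don't fit,
// so that they can still be enqueued together in a single pass.
// Holding the body here as well as in the caller is what makes the
// memory usage roughly 2x the body size, as noted above.
let sendBuffer = new Uint8Array(4096)

function ensureCapacity(bytesUsed: number, bytesNeeded: number): void {
  if (bytesUsed + bytesNeeded <= sendBuffer.length) return
  let newSize = sendBuffer.length * 2
  while (bytesUsed + bytesNeeded > newSize) newSize *= 2
  const grown = new Uint8Array(newSize)
  grown.set(sendBuffer.subarray(0, bytesUsed)) // keep frames already queued
  sendBuffer = grown
}
```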
Please give the latest commit a go @vinayakakv
I've found success with
I will surely try out the latest commit and will let you know.
Hey @carlhoerberg, I just tested the fix and it worked fine. Shall we add the load testing under