Confluent vs KafkaJS performance feedback #42
Thanks a lot for filing this issue! I took a look and found where the problem is. These are results on my machine (Ubuntu on WSL) with identical code, running against a single local broker. I replaced log.info with console.log and defined an until helper that polls a condition function every 100ms. With the unmodified code I could reproduce the horrible performance.
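For reference, a helper along these lines works (a minimal sketch; the exact helper used in these tests isn't shown in the thread):

```typescript
// Minimal polling helper: checks `condition` every 100ms until it returns
// true, throwing if `timeout` ms elapse first. (Sketch only — not the
// exact code used in the tests above.)
async function until(
  condition: () => Promise<boolean>,
  {timeout = 5000}: {timeout?: number} = {}
): Promise<void> {
  const deadline = Date.now() + timeout;
  while (Date.now() < deadline) {
    if (await condition()) return;
    await new Promise((resolve) => setTimeout(resolve, 100));
  }
  throw new Error(`condition not met within ${timeout}ms`);
}
```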
I'll go into the workaround first, and then dive into the reason in a follow-up comment. The actual workaround is a couple of lines:

```diff
- await producer.send({
+ producer.send({
      topic,
      messages: [{value: "one"}]
  });
  if (++sentCount % 100 === 0) {
+     await producer.flush({ timeout: 5000 });
      console.log(`Sent ${String(sentCount).padStart(4, " ")} : ${Date.now() - start}ms`);
  }
```
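Put together, the test's send loop looks roughly like this (a sketch; it assumes a connected producer and an existing `topic`, mirroring the 1000-message test):

```typescript
// Sketch of the full worked-around loop (assumes `producer` is a connected
// producer from @confluentinc/kafka-javascript and `topic` exists).
const start = Date.now();
let sentCount = 0;
for (let i = 0; i < 1000; i++) {
  // Fire-and-forget: queue the message without awaiting its delivery report.
  producer.send({topic, messages: [{value: "one"}]});
  if (++sentCount % 100 === 0) {
    // Await delivery of everything queued so far in one go.
    await producer.flush({timeout: 5000});
    console.log(`Sent ${String(sentCount).padStart(4, " ")} : ${Date.now() - start}ms`);
  }
}
```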
However, this makes the comparison a bit unfair, as librdkafka will internally use batching. I added these two options to the producer config to turn off batching and to make sure that no more than one produce request is in flight at a time. This isn't required by the workaround, but it is required for the fairness of the test:

```diff
- const producer = kafka.producer();
+ const producer = kafka.producer({'linger.ms': 0, 'max.in.flight': 1});
```

Results after that for 1000 messages: Confluent is still slower, but within the same order of magnitude:
Results for 10K messages (logging per 1K rather than per 100): Confluent becomes faster. The more messages you send, the larger this difference grows.
There were cases with the Confluent client where the producer started producing before the consumer had been assigned partitions (rebalance still ongoing), so I added this before producer.connect() to make sure that doesn't happen:

```typescript
await until(async () => consumer.assignment().length > 0, {timeout: 5000});
```
Details of the issue: once a produce request is sent, librdkafka delivers the delivery report on a poll basis. Internally, the Confluent library calls this polling function with setInterval. When producer.send() is awaited, it blocks at least until the next poll. The interval between polls is 500ms, so if one is sending messages continuously and awaiting each send, every message can wait up to 500ms for its delivery report. At 1000 messages that is roughly 1000 × 500ms ≈ 500s, which lines up with the ~502s result in the original test. We're discussing a few solutions internally. I'll keep this issue open while we do.
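To make the mechanism concrete, here is a small self-contained illustration (not the library's actual code) of how a poll timer caps the throughput of awaited sends:

```typescript
// Illustration only — NOT the library's actual code. It mimics delivery
// reports that are only handed out when a 500ms poll timer fires.
const POLL_INTERVAL_MS = 500;
const pendingReports: Array<() => void> = [];

// Simulated poller: resolves all outstanding "delivery reports" each tick.
const poller = setInterval(() => {
  while (pendingReports.length > 0) pendingReports.shift()!();
}, POLL_INTERVAL_MS);

// Simulated send: the broker may ack almost instantly, but the returned
// promise only resolves on the next poll tick.
function send(): Promise<void> {
  return new Promise((resolve) => pendingReports.push(resolve));
}

async function main() {
  const start = Date.now();
  for (let i = 0; i < 10; i++) {
    await send(); // each await costs up to one full poll interval
  }
  console.log(`10 awaited sends took ${Date.now() - start}ms`); // ≈ 5000ms
  clearInterval(poller);
}
main();
```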
I've merged a change to fix this flow, and added guidelines in the MIGRATION.md file as well. For parity with librdkafka, the 'linger.ms' parameter defaults to 5ms, and as a consequence, awaiting a send waits for 5ms plus the send time. Additional details are in the MIGRATION.md file.
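In other words, a latency-sensitive producer can override that default (a sketch, mirroring the configuration used elsewhere in this thread):

```typescript
import {KafkaJS as Confluent} from "@confluentinc/kafka-javascript";

const kafka = new Confluent.Kafka({kafkaJS: {brokers: ["localhost:9092"]}});
// Override the 5ms default so an awaited send() doesn't pay the linger
// delay (at the cost of losing batching).
const producer = kafka.producer({"linger.ms": 0});
await producer.connect();
```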
Here are the perf results; the only change I've made to the files is the addition of linger.ms as described above (no workaround/flush() calls needed).

1000 msgs:

10K msgs, printing every 1000:
With that, I'm marking the issue fixed, but please reopen this, or file another issue, for any further perf problems. Thanks again for opening this.
Can you confirm that an awaited send surfaces a delivery failure? In our scenario, our app has tracking that must be updated only after the messages are successfully delivered (with the default configuration).
Re-running with lingerMs: 0 definitely puts message production on the fast track. I'm wondering if a similar optimization can be made on the consumer side. With KafkaJS, messages are picked up by the consumer and processed nearly immediately after they are sent.
With Confluent, the send is now fast, but there is clearly a near-1-second processing interval at play on the consumer side:
In our application, a "message" is sent through multiple topics/consumers for processing. In the end, our most important performance metric is the delay between message origination and reception in the final consumer. One of the responsibilities of this flow is cache eviction, so timeliness matters. I put together an example to illustrate.

KafkaJS

```typescript
import {Kafka, Producer, Consumer} from "kafkajs";
// `log` and `until` are the helpers described earlier in this thread
// (log.info ≈ console.log; until polls a condition every 100ms).
const producers: Producer[] = [];
const consumers: Consumer[] = [];
const topic = "test-kafkajs-topic";
const total = 5;
const kafka = new Kafka({brokers: ["localhost:9092"]});
let lastReceived: number | undefined;
for (let i = 0; i < total; i++) {
const consumer = kafka.consumer({groupId: `${topic}-${i}-group`});
await consumer.connect();
await consumer.subscribe({topic: `${topic}-${i}`});
await consumer.run({
eachMessage: async ({message}) => {
log.info(`Rec'd @${i} : ${Date.now() - start}ms`);
if (i < total - 1) {
await producers[i].send({
topic: `${topic}-${i + 1}`,
messages: [{value: "one"}]
});
}
lastReceived = i;
}
});
consumers.push(consumer);
}
for (let i = 0; i < total; i++) {
const producer = kafka.producer();
await producer.connect();
producers.push(producer);
}
const start = Date.now();
await producers[0].send({
topic: `${topic}-0`,
messages: [{value: "one"}]
});
log.info(`Sent 1 : ${Date.now() - start}ms`);
await until(async () => lastReceived == total - 1);
for (let i = 0; i < total; i++) {
await producers[i].disconnect();
await consumers[i].disconnect();
}
```
Confluent

```typescript
import {KafkaJS as Confluent} from "@confluentinc/kafka-javascript";
// Same `log` and `until` helpers as above.
const producers: Confluent.Producer[] = [];
const consumers: Confluent.Consumer[] = [];
const topic = "test-confluent-topic";
const total = 5;
const kafka = new Confluent.Kafka({kafkaJS: {brokers: ["localhost:9092"]}});
let lastReceived: number | undefined;
for (let i = 0; i < total; i++) {
const consumer = kafka.consumer({kafkaJS: {groupId: `${topic}-${i}-group`}});
await consumer.connect();
await consumer.subscribe({topic: `${topic}-${i}`});
await consumer.run({
eachMessage: async ({message}) => {
log.info(`Rec'd @${i} : ${Date.now() - start}ms`);
if (i < total - 1) {
await producers[i].send({
topic: `${topic}-${i + 1}`,
messages: [{value: "one"}]
});
}
lastReceived = i;
}
});
consumers.push(consumer);
}
for (let i = 0; i < total; i++) {
const producer = kafka.producer({"linger.ms": 0});
await producer.connect();
producers.push(producer);
}
const start = Date.now();
await producers[0].send({
topic: `${topic}-0`,
messages: [{value: "one"}]
});
log.info(`Sent 1 : ${Date.now() - start}ms`);
await until(async () => lastReceived == total - 1);
for (let i = 0; i < total; i++) {
await producers[i].disconnect();
await consumers[i].disconnect();
}
```
Regarding this: yes, you will get a thrown error when awaiting send if the produce fails.
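For example (a sketch assuming the `producer` and `topic` from the examples above; `markDelivered` is a hypothetical stand-in for the application-side tracking update):

```typescript
// Hypothetical application-side tracking hook (stand-in for your own logic).
const markDelivered = () => { /* update delivery tracking here */ };

try {
  await producer.send({topic, messages: [{value: "one"}]});
  // Reached only once delivery is confirmed, so tracking stays accurate.
  markDelivered();
} catch (err) {
  // The awaited send rejects if the produce fails: don't update tracking.
  console.error("Produce failed:", err);
}
```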
Regarding your second message, there are two things; the before/after results below show the difference.

Before: [results not preserved]

After: [results not preserved]

Thanks for attaching the use case example, it's very helpful.
Regarding the second question above.
Understood. It appears that KafkaJS handles this by loading metadata for all topics at the time the producer is connected, not on demand when a message is sent. For our case, this is preferable because it occurs during the startup phase, before the app is "ready" and considered online, so there is no hit/delay when the first message is sent.
I understand the requirement, but at the same time we avoid caching metadata for all topics up front: it incurs a penalty in memory (small) and in subsequent metadata requests (large), since we make follow-up metadata requests for all the topics we have already cached, so the size of a metadata request can grow very large. There is a workaround for this within the underlying library; I will work to make it accessible from the JavaScript binding, and discuss with my team whether there's anything I can do to make the process seamless.
Hi @milindl,
Circling back to this: is the workaround accessible, and if so, can you provide more details?
It isn't accessible yet, but I discussed this with the team and we finalized an approach. There's a non-trivial internal code change required to make it accessible; I'll keep this thread updated when I start working on it.
We've added the workaround in v0.5.1. An example of how to use it is given here: https://github.com/confluentinc/confluent-kafka-javascript/blob/master/examples/kafkajs/admin/dependent-admin.js#L72
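Paraphrasing the linked example (see the file above for the authoritative version; the `dependentAdmin()` and `fetchTopicMetadata` calls below follow that example), the idea is to pre-warm the producer's metadata cache during startup:

```typescript
import {KafkaJS as Confluent} from "@confluentinc/kafka-javascript";

const kafka = new Confluent.Kafka({kafkaJS: {brokers: ["localhost:9092"]}});
const producer = kafka.producer();
await producer.connect();

// A dependent admin client shares the producer's underlying connection,
// so metadata fetched through it lands in the producer's metadata cache
// (API per the linked example, available from v0.5.1).
const admin = producer.dependentAdmin();
await admin.connect();
// Pre-fetch metadata for the topics this producer will use, during the
// startup phase, so the first send() doesn't pay the metadata round-trip.
await admin.fetchTopicMetadata({topics: ["my-topic"]});
await admin.disconnect();
```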
I ran a test sending and receiving 1000 messages individually (no batching) using the KafkaJS library, and then ran the same test using the Confluent library (following the migration instructions).
KafkaJS: 455ms
Confluent: 501951ms
That's not a typo. In this case, the Confluent test took roughly 1000x as long to complete.
I'm presuming there is some tuning that can be done via configuration, but this was an "out of the box" conversion, and my attempts at tuning the configuration did not yield any noticeable differences.
Notes

KafkaJS: [test source attached, not preserved]

Confluent: [test source attached, not preserved]