-
Notifications
You must be signed in to change notification settings - Fork 23
Closed
Labels
bugSomething isn't workingSomething isn't workingfixed-present-in-next-releaseBug or improvement that's done, it is in the development branch but yet unreleasedBug or improvement that's done, it is in the development branch but yet unreleased
Description
Environment Information
- OS: Mac M3 Sonoma 14.6.1
- Node Version: 22.12.0
- NPM Version: 10.9.0
- confluent-kafka-javascript version: 1.2.0
Steps to Reproduce
Setting up a consumer to process a list of offsets using the pause/seek/resume pattern identified a 5-second delay that occurs per iteration. This delay is not present performing the same activity with KafkaJS.
I'm hopeful there is some related librdkafka configuration that can be used to bring the behavior inline with KafkaJS. If not, please treat this as a bug, as this delay makes seek a non-viable option in any sort of production environment.
The following code demonstrates with both KafkaJS and Confluent.
import {KafkaJS as Confluent, RdKafka, TopicPartitionOffset} from "@confluentinc/kafka-javascript";
import {Admin, Consumer, EachMessagePayload, Kafka, logLevel, Producer} from "kafkajs";
const topicFn = () => `test-offset-topic-${Date.now()}`;
const standardGroupIdFn = () => `test-standard-group-${Date.now()}`;
const offsetGroupIdFn = () => `test-offset-group-${Date.now()}`;
describe("Seek offsets", async () => {
let kafka: Kafka | Confluent.Kafka;
let admin: Admin | Confluent.Admin | undefined;
let producer: Producer | Confluent.Producer | undefined;
let standardConsumer: Consumer | Confluent.Consumer | undefined;
let offsetConsumer: Consumer | Confluent.Consumer | undefined;
let standardReady: boolean;
let offsetReady: boolean;
const total = 10;
let received: number;
let replayed: number;
let offsets: (TopicPartitionOffset | Confluent.TopicPartitionOffset)[];
beforeEach(() => {
standardReady = false;
offsetReady = false;
received = 0;
replayed = 0;
offsets = [];
});
afterEach(async () => {
await offsetConsumer?.disconnect();
await standardConsumer?.disconnect();
await producer?.disconnect();
await admin?.disconnect();
});
it("supports seeking consumer offsets with KafkaJS", async () => {
kafka = new Kafka({brokers: ["localhost:9092"], logLevel: logLevel.NOTHING});
admin = kafka.admin();
await admin.connect();
const topic = topicFn();
await admin.createTopics({topics: [{topic}]});
producer = kafka.producer();
await producer.connect();
await sendMessages(topic);
standardConsumer = kafka.consumer({groupId: standardGroupIdFn()});
standardConsumer.on(standardConsumer.events.GROUP_JOIN, (event: any) => {
standardReady = true;
});
await standardConsumer.connect();
await standardConsumer.subscribe({topic, fromBeginning: true});
await standardConsumer.run({eachMessage: doStandardConsumer});
offsetConsumer = kafka.consumer({groupId: offsetGroupIdFn()});
offsetConsumer.on(offsetConsumer.events.GROUP_JOIN, (event: any) => {
offsetReady = true;
});
await offsetConsumer.connect();
await offsetConsumer.subscribe({topic: topic, fromBeginning: true});
await offsetConsumer.run({eachMessage: doOffsetConsumer});
await doTest(topic);
});
it("supports seeking consumer offsets with Confluent", async () => {
kafka = new Confluent.Kafka({kafkaJS: {brokers: ["localhost:9092"], logLevel: Confluent.logLevel.NOTHING}});
admin = kafka.admin();
await admin.connect();
const topic = topicFn();
await admin.createTopics({topics: [{topic}]});
producer = kafka.producer();
await producer.connect();
await sendMessages(topic);
standardConsumer = kafka.consumer({
kafkaJS: {groupId: standardGroupIdFn(), fromBeginning: true},
rebalance_cb: (err: any, assignment: any, consumer: any) => {
if (err.code !== RdKafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) return;
if (!standardReady) standardReady = true;
}
});
await standardConsumer.connect();
await standardConsumer.subscribe({topic});
await standardConsumer.run({eachMessage: doStandardConsumer});
offsetConsumer = kafka.consumer({
kafkaJS: {groupId: offsetGroupIdFn(), fromBeginning: true},
rebalance_cb: (err: any, assignment: any, consumer: any) => {
if (err.code !== RdKafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) return;
if (!offsetReady) offsetReady = true;
}
});
await offsetConsumer.connect();
await offsetConsumer.subscribe({topic});
await offsetConsumer.run({eachMessage: doOffsetConsumer});
await doTest(topic);
});
async function sendMessages(topic: string) {
for (let i = 0; i < total; i++) {
await producer!.send({topic: topic, messages: [{value: "data"}]});
}
}
async function doStandardConsumer(payload: EachMessagePayload | Confluent.EachMessagePayload) {
const offset =
typeof payload.message.offset === "string" ? parseInt(payload.message.offset) : payload.message.offset;
console.log(
`[${new Date().toISOString()}] Standard consumer received => partition: ${payload.partition}; offset: ${offset}`
);
received++;
if (offset % 3 === 0) {
offsets.push({topic: payload.topic, partition: payload.partition, offset});
}
}
async function doOffsetConsumer(payload: EachMessagePayload | Confluent.EachMessagePayload) {
console.log(
`[${new Date().toISOString()}] Offset consumer received => partition: ${payload.partition}; offset: ${payload.message.offset}`
);
replayed++;
offsetConsumer!.pause([{topic: payload.topic}]);
seekNextOffset(payload.topic);
}
async function doTest(topic: string) {
await until(() => standardReady && offsetReady);
offsetConsumer!.pause([{topic}]);
await until(() => received === total);
seekNextOffset(topic);
await until(() => replayed === Math.ceil(total / 3));
}
function seekNextOffset(topic: string) {
if (offsets.length) {
const offset = offsets.shift();
offsetConsumer!.seek(offset! as any);
console.log(`[${new Date().toISOString()}] Seek to offset: ${offset!.offset}`);
offsetConsumer!.resume([{topic: offset!.topic}]);
}
}
async function until(condition: () => boolean) {
const timeout = 60000;
const finish = Date.now() + timeout;
while (Date.now() <= finish) {
const result = condition();
if (result) return;
await new Promise(resolve => setTimeout(resolve, 50));
}
throw new Error(`Failed within ${timeout!}ms`);
}
});The results with KafkaJS take roughly 3-4 ms per seek iteration:
[2025-04-09T15:27:12.946Z] Standard consumer received => partition: 0; offset: 0
[2025-04-09T15:27:12.947Z] Standard consumer received => partition: 0; offset: 1
[2025-04-09T15:27:12.947Z] Standard consumer received => partition: 0; offset: 2
[2025-04-09T15:27:12.947Z] Standard consumer received => partition: 0; offset: 3
[2025-04-09T15:27:12.947Z] Standard consumer received => partition: 0; offset: 4
[2025-04-09T15:27:12.947Z] Standard consumer received => partition: 0; offset: 5
[2025-04-09T15:27:12.947Z] Standard consumer received => partition: 0; offset: 6
[2025-04-09T15:27:12.947Z] Standard consumer received => partition: 0; offset: 7
[2025-04-09T15:27:12.947Z] Standard consumer received => partition: 0; offset: 8
[2025-04-09T15:27:12.947Z] Standard consumer received => partition: 0; offset: 9
[2025-04-09T15:27:12.951Z] Seek to offset: 0
[2025-04-09T15:27:12.955Z] Offset consumer received => partition: 0; offset: 0
[2025-04-09T15:27:12.955Z] Seek to offset: 3
[2025-04-09T15:27:12.959Z] Offset consumer received => partition: 0; offset: 3
[2025-04-09T15:27:12.959Z] Seek to offset: 6
[2025-04-09T15:27:12.962Z] Offset consumer received => partition: 0; offset: 6
[2025-04-09T15:27:12.962Z] Seek to offset: 9
[2025-04-09T15:27:12.966Z] Offset consumer received => partition: 0; offset: 9
The results with Confluent take roughly 5 seconds per seek iteration:
[2025-04-09T15:27:24.206Z] Standard consumer received => partition: 0; offset: 0
[2025-04-09T15:27:24.207Z] Standard consumer received => partition: 0; offset: 1
[2025-04-09T15:27:24.207Z] Standard consumer received => partition: 0; offset: 2
[2025-04-09T15:27:24.207Z] Standard consumer received => partition: 0; offset: 3
[2025-04-09T15:27:24.207Z] Standard consumer received => partition: 0; offset: 4
[2025-04-09T15:27:24.207Z] Standard consumer received => partition: 0; offset: 5
[2025-04-09T15:27:24.208Z] Standard consumer received => partition: 0; offset: 6
[2025-04-09T15:27:24.208Z] Standard consumer received => partition: 0; offset: 7
[2025-04-09T15:27:24.208Z] Standard consumer received => partition: 0; offset: 8
[2025-04-09T15:27:24.208Z] Standard consumer received => partition: 0; offset: 9
[2025-04-09T15:27:24.260Z] Seek to offset: 0
[2025-04-09T15:27:29.216Z] Offset consumer received => partition: 0; offset: 0
[2025-04-09T15:27:29.216Z] Seek to offset: 3
[2025-04-09T15:27:34.219Z] Offset consumer received => partition: 0; offset: 3
[2025-04-09T15:27:34.220Z] Seek to offset: 6
[2025-04-09T15:27:39.224Z] Offset consumer received => partition: 0; offset: 6
[2025-04-09T15:27:39.224Z] Seek to offset: 9
[2025-04-09T15:27:44.226Z] Offset consumer received => partition: 0; offset: 9
Metadata
Metadata
Assignees
Labels
bugSomething isn't workingSomething isn't workingfixed-present-in-next-releaseBug or improvement that's done, it is in the development branch but yet unreleasedBug or improvement that's done, it is in the development branch but yet unreleased