Question: Why is prefetching topic metadata not handled internally? #189

Open
apeloquin-agilysys opened this issue Nov 26, 2024 · 0 comments

apeloquin-agilysys commented Nov 26, 2024

The workaround dependent-admin.js was provided in #42 as a means to optimize the time required to send the first message to each topic after startup.
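
For reference, here is a minimal sketch of that startup-prefetch approach, using only the producer's dependent admin client and fetchTopicMetadata; the topic names are placeholders, and it assumes the full topic list is known at initialization:

import {KafkaJS as Confluent} from "@confluentinc/kafka-javascript";

// Sketch of the #42-style workaround: prefetch metadata for all known topics
// at startup, before the application is considered "ready".
const kafka = new Confluent.Kafka({kafkaJS: {brokers: ["localhost:9092"]}});
const producer = kafka.producer({"linger.ms": 0});
await producer.connect();

// The dependent admin client shares the producer's underlying client, so the
// fetched metadata is available to the producer for subsequent sends.
const admin = producer.dependentAdmin();
await admin.connect();
await admin.fetchTopicMetadata({topics: ["topic-a", "topic-b", "topic-c"]});
await admin.disconnect();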

It works wonderfully. However, the approach is predicated on knowing, at initialization time, all the topics a producer will send messages to. Due to the structure of our applications using this library, that is not always the case, and the topic list is not easily obtained without some non-trivial refactoring. So I experimented with the idea of a "lazy-load" wrapper around the producer that tracks topic usage and calls fetchTopicMetadata when a new topic is encountered.

I didn't expect it to yield much of a performance gain, since the whole point of this exercise was to move the metadata fetching to the startup phase, before the app is considered "ready", and doing it on demand is counter to that. But when I saw the results, I immediately wondered: why is this not the default, internal behavior of the Confluent producer in the first place?

In the example below, we do not load any topic metadata in advance of the call to the send method. The wrapper tracks the last metadata load or send time for each topic and calls fetchTopicMetadata prior to send whenever a topic is uninitialized or its metadata has expired according to the metadata.max.age.ms value. (The expiration path is not exercised in the test.)

import {KafkaJS as Confluent} from "@confluentinc/kafka-javascript";

class SmartProducer {
  private producer: Confluent.Producer;
  private admin: Confluent.Admin;
  private metadataMaxAgeMs: number;
  private latestByTopic: Map<string, number> = new Map();

  // Default matches the librdkafka default for metadata.max.age.ms (15 minutes).
  constructor(producer: Confluent.Producer, metadataMaxAgeMs: number = 900000) {
    this.producer = producer;
    this.admin = producer.dependentAdmin();
    this.metadataMaxAgeMs = metadataMaxAgeMs;
  }

  async connect(): Promise<void> {
    await this.producer.connect();
    return this.admin.connect();
  }

  async disconnect(): Promise<void> {
    await this.admin.disconnect();
    return this.producer.disconnect();
  }

  async send(record: Confluent.ProducerRecord): Promise<Confluent.RecordMetadata[]> {
    await this.prefetchMetadata([record.topic]);
    return this.producer.send(record);
  }

  async sendBatch(batch: Confluent.ProducerBatch): Promise<Confluent.RecordMetadata[]> {
    if (batch.topicMessages) {
      const topics = new Set<string>();
      for (const {topic} of batch.topicMessages) topics.add(topic);
      await this.prefetchMetadata(topics);
    }
    return this.producer.sendBatch(batch);
  }

  // Fetch metadata only for topics that are new or whose last load/send is
  // older than metadata.max.age.ms; record the current time for every topic.
  private async prefetchMetadata(topics: Iterable<string>): Promise<void> {
    const topicsToFetch: string[] = [];
    const now = Date.now();
    for (const topic of topics) {
      const latest = this.latestByTopic.get(topic);
      if (!latest || now - latest >= this.metadataMaxAgeMs) topicsToFetch.push(topic);
      this.latestByTopic.set(topic, now);
    }
    if (topicsToFetch.length) await this.admin.fetchTopicMetadata({topics: topicsToFetch});
  }
}

const iterations = 10;

describe("Compare Confluent Producer and 'Smart' Producer", () => {
  let kafka: Confluent.Kafka;
  let admin: Confluent.Admin;
  const topics: Confluent.ITopicConfig[] = [];

  before(async () => {
    kafka = new Confluent.Kafka({kafkaJS: {brokers: ["localhost:9092"]}});
    admin = kafka.admin();
    await admin.connect();

    for (let i = 0; i < iterations; i++) {
      topics.push({topic: `test-topic-${i}`});
    }
    await admin.createTopics({topics});
  });

  after(async () => {
    await admin.disconnect();
  });

  it("'Smart' Producer", async () => {
    await sendMessageToEachTopic(new SmartProducer(kafka.producer({"linger.ms": 0})));
  });

  it("Confluent Producer", async () => {
    await sendMessageToEachTopic(kafka.producer({"linger.ms": 0}));
  });

  async function sendMessageToEachTopic(producer: Pick<Confluent.Producer, "connect" | "send" | "disconnect">) {
    const start = Date.now();
    await producer.connect();
    console.log(`Connected @ ${Date.now() - start}ms`);
    for (let i = 0; i < iterations; i++) {
      await producer.send({topic: topics[i].topic, messages: messages()});
      console.log(`Message #${i} sent @ ${Date.now() - start}ms`);
    }
    await producer.disconnect();
  }

  function messages(): Confluent.Message[] {
    return [{value: `sent at ${new Date().toISOString()}`}];
  }
});

Output from the 'Smart' Producer test:

Connected @ 4ms
Message #0 sent @ 6ms
Message #1 sent @ 7ms
Message #2 sent @ 8ms
Message #3 sent @ 8ms
Message #4 sent @ 9ms
Message #5 sent @ 9ms
Message #6 sent @ 10ms
Message #7 sent @ 10ms
Message #8 sent @ 11ms
Message #9 sent @ 11ms

Output from the Confluent Producer test:

Connected @ 4ms
Message #0 sent @ 1008ms
Message #1 sent @ 2015ms
Message #2 sent @ 3017ms
Message #3 sent @ 4021ms
Message #4 sent @ 5024ms
Message #5 sent @ 6026ms
Message #6 sent @ 7029ms
Message #7 sent @ 8033ms
Message #8 sent @ 9033ms
Message #9 sent @ 10037ms