This is an odd one.
We're attempting to phase in use of the Confluent library in our app for new functionality. The app has existing functionality built on KafkaJS, and that functionality is in production. Given the "early access" nature of the Confluent library, it makes sense to adopt it for the new functionality but retain KafkaJS for the existing functionality for now.
The new functionality was added and integration tests showed everything working, but as soon as we deployed to an environment using external Kafka cluster(s), the KafkaJS connections immediately started failing with:
Connection error: Client network socket disconnected before secure TLS connection was established {"broker":"XXXX.westus2.azure.confluent.cloud:9092","clientId":"kafkajs"}: Error: Client network socket disconnected before secure TLS connection was established
I narrowed it down to starting multiple consumers concurrently with both KafkaJS and Confluent, over SSL connections.
I created a test that replicates the scenario by creating consumers for 5 topics each with KafkaJS and Confluent concurrently on a Confluent Cloud cluster. There are also tests that run just 5 KafkaJS consumers or just 5 Confluent consumers to prove there is no issue when run independently.
Notes:
- Topics must already exist.
- This connects both Confluent and KafkaJS to the same cluster, but I see the same behavior when Confluent and KafkaJS connect to different clusters.
- 5 seems to be a magic number, in that I can reproduce consistently with 5. If I run the test with fewer than 5, results are inconsistent (sometimes there is no issue). Our app has tens of consumers on both sides.
- XXXX is used in place of identifying details/credentials.
- Tests should be run with an extended timeout (I was using 60s).
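For reference, the extended timeout mentioned above can be set globally. This is a sketch that assumes the tests run under Jest (the runner isn't stated in the issue); the Mocha equivalent is shown in a comment:

```typescript
// Assumption: Jest is the test runner. Extends the per-test timeout to 60s
// so the consumers have time to connect and join their groups before the
// polling in `until` gives up.
jest.setTimeout(60000);

// Mocha equivalent, set inside the suite (requires a `function` callback,
// not an arrow function, so `this` is the suite context):
// describe("Supports KafkaJS and Confluent consumers", function () {
//   this.timeout(60000);
//   ...
// });
```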
import {KafkaJS as Confluent} from "@confluentinc/kafka-javascript";
import {Logger} from "@confluentinc/kafka-javascript/types/kafkajs.js";
import {fail} from "assert";
import {Consumer, ConsumerGroupJoinEvent, Kafka, logLevel} from "kafkajs";
const KAFKA_JS_TOPICS: string[] = [
  "test-kafkajs-topic",
  "test-kafkajs-topic-2",
  "test-kafkajs-topic-3",
  "test-kafkajs-topic-4",
  "test-kafkajs-topic-5"
];
const CONFLUENT_TOPICS: string[] = [
  "test-confluent-topic",
  "test-confluent-topic-2",
  "test-confluent-topic-3",
  "test-confluent-topic-4",
  "test-confluent-topic-5"
];
describe("Supports KafkaJS and Confluent consumers", () => {
  let confluentConsumers: Confluent.Consumer[] = [];
  let kafkaJSConsumers: Consumer[] = [];
  afterEach(async () => {
    const promises: Promise<void>[] = [];
    for (const consumer of kafkaJSConsumers) {
      promises.push(consumer.disconnect());
    }
    for (const consumer of confluentConsumers) {
      promises.push(consumer.disconnect());
    }
    await Promise.all(promises);
    confluentConsumers = [];
    kafkaJSConsumers = [];
  });
  it("Handles concurrent startup of multiple KafkaJS consumers", async () => {
    await doTest(KAFKA_JS_TOPICS, []);
  });
  it("Handles concurrent startup of multiple Confluent consumers", async () => {
    await doTest([], CONFLUENT_TOPICS);
  });
  it("Handles concurrent startup of multiple KafkaJS and Confluent consumers", async () => {
    await doTest(KAFKA_JS_TOPICS, CONFLUENT_TOPICS);
  });
  async function doTest(kafkaJSTopics: string[], confluentTopics: string[]) {
    const kafkaJSKafka = new Kafka({
      brokers: ["XXXX.westus2.azure.confluent.cloud:9092"],
      ssl: true,
      sasl: {
        mechanism: "plain",
        username: "XXXX",
        password: "XXXX"
      },
      logLevel: logLevel.INFO,
      logCreator: kafkaLevel => {
        return entry => {
          const {timestamp, logger, message, stack, ...others} = entry.log;
          console.log(`[KafkaJS:${entry.namespace}] ${message} ${JSON.stringify(others)}${stack ? `: ${stack}` : ""}`);
        };
      }
    });
    const confluentKafka = new Confluent.Kafka({
      kafkaJS: {
        brokers: ["XXXX.westus2.azure.confluent.cloud:9092"],
        ssl: true,
        sasl: {
          mechanism: "plain",
          username: "XXXX",
          password: "XXXX"
        },
        logLevel: Confluent.logLevel.INFO,
        logger: new ConfluentLogger()
      }
    });
    kafkaJSConsumers = [];
    let kafkaJSConnected: number = 0;
    setImmediate(async () => {
      for (const topic of kafkaJSTopics) {
        const kafkaJSConsumer = kafkaJSKafka.consumer({groupId: `${topic}-group`});
        kafkaJSConsumer.on(kafkaJSConsumer.events.GROUP_JOIN, (event: ConsumerGroupJoinEvent) => {
          kafkaJSConnected++;
        });
        await kafkaJSConsumer.connect();
        await kafkaJSConsumer.subscribe({topic});
        await kafkaJSConsumer.run({
          eachMessage: async ({message}) => {}
        });
        kafkaJSConsumers.push(kafkaJSConsumer);
      }
    });
    confluentConsumers = [];
    let confluentConnected: number = 0;
    setImmediate(async () => {
      for (const topic of confluentTopics) {
        const confluentConsumer = confluentKafka.consumer({kafkaJS: {groupId: `${topic}-group`}});
        await confluentConsumer.connect();
        confluentConnected++;
        await confluentConsumer.subscribe({topic});
        await confluentConsumer.run({
          eachMessage: async ({message}) => {}
        });
        confluentConsumers.push(confluentConsumer);
      }
    });
    await until(async () => confluentTopics.length === confluentConnected);
    for (const consumer of confluentConsumers) {
      await until(async () => consumer.assignment().length > 0);
    }
    await until(async () => kafkaJSTopics.length === kafkaJSConnected);
  }
});
class ConfluentLogger implements Logger {
  private logLevel: Confluent.logLevel;
  constructor() {
    this.logLevel = Confluent.logLevel.INFO;
  }
  setLogLevel(logLevel: Confluent.logLevel) {
    this.logLevel = logLevel;
  }
  info = (message: string, extra?: object) => this.doLog(Confluent.logLevel.INFO, message, extra);
  error = (message: string, extra?: object) => this.doLog(Confluent.logLevel.ERROR, message, extra);
  warn = (message: string, extra?: object) => this.doLog(Confluent.logLevel.WARN, message, extra);
  debug = (message: string, extra?: object) => this.doLog(Confluent.logLevel.DEBUG, message, extra);
  namespace() {
    return this;
  }
  private doLog(level: Confluent.logLevel, message: string, extra?: object) {
    if (this.logLevel >= level) {
      console.log(`[ConfluentKafka] ${message}${extra ? ` ${JSON.stringify(extra)}` : ""}`);
    }
  }
}
async function until(condition: () => Promise<boolean>) {
  const timeout = 30000;
  const finish = Date.now() + timeout;
  while (Date.now() <= finish) {
    const result = await condition();
    if (result) return;
    await new Promise(resolve => setTimeout(resolve, 500));
  }
  fail(`Failed within ${timeout}ms`);
}
The test for both ultimately fails to connect all the consumers, and on the KafkaJS side it produces many occurrences of this error (which is not present when running KafkaJS alone):
[KafkaJS:Connection] Connection error: Client network socket disconnected before secure TLS connection was established {"broker":"XXXX.westus2.azure.confluent.cloud:9092","clientId":"kafkajs"}: Error: Client network socket disconnected before secure TLS connection was established
    at connResetException (node:internal/errors:787:14)
    at TLSSocket.onConnectEnd (node:_tls_wrap:1727:19)
    at TLSSocket.emit (node:events:530:35)
    at endReadableNT (node:internal/streams/readable:1696:12)
    at process.processTicksAndRejections (node:internal/process/task_queues:82:21)
It would be helpful to understand what is conflicting here, whether it can be prevented on the Confluent side, and whether there is a way to work around it.
I have confirmed that if I start the KafkaJS consumers before the Confluent consumers, the KafkaJS connections succeed. This is not viable in a real-world scenario, however, because if a connection is later dropped and the consumer tries to reconnect, it will hit this same issue again.
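For what it's worth, the startup ordering that works can be made explicit as a stopgap. This is a minimal sketch under my own assumptions (the `startInOrder` helper and the phase grouping are hypothetical, not part of either library): tasks within a phase run concurrently, but a phase does not begin until the previous one has finished, so all KafkaJS consumers could connect before any Confluent consumer starts. As noted above, this does not address the reconnect case.

```typescript
// Hypothetical helper (not from either library): run groups of async
// startup tasks so that one group fully completes before the next begins.
type StartupTask = () => Promise<void>;

async function startInOrder(phases: StartupTask[][]): Promise<void> {
  for (const phase of phases) {
    // Tasks within a phase run concurrently; phases run sequentially.
    await Promise.all(phase.map(task => task()));
  }
}

// Sketch of intended use in the test above: phase 1 would hold the
// per-topic KafkaJS connect/subscribe/run logic from doTest, and phase 2
// the Confluent equivalent, e.g.:
//
// await startInOrder([
//   kafkaJSTopics.map(topic => () => startKafkaJSConsumer(topic)),
//   confluentTopics.map(topic => () => startConfluentConsumer(topic)),
// ]);
```

This only masks the conflict at initial startup; a reconnect triggered later would still race against already-running Confluent consumers.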