rdkafka mock using C++ API end up with 'Required feature not supported by broker' (Error code: -165) #2693

Closed
4 of 7 tasks
quentingodeau opened this issue Jan 20, 2020 · 13 comments

Comments

@quentingodeau

Description

I'm trying to create an integration test in C++ using the high-level API. I end up with the following error from RdKafka:
Required feature not supported by broker
Error code: -165

I can see in the mock code that it does not support consumer groups yet.
Is the error I'm facing related to this limitation?
If so, how can I use the high-level KafkaConsumer API, which makes group.id mandatory?

How to reproduce

Here is the code used:

  // Create kafka mock cluster
  char             errstr[256];
  rd_kafka_conf_t *conf = rd_kafka_conf_new();
  rd_kafka_t *     rk   = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
  ASSERT_TRUE(nullptr != rk) << "Failed to create mock cluster rd_kafka_t: " << errstr;

  rd_kafka_mock_cluster_t *mcluster = rd_kafka_mock_cluster_new(rk, 3 /* broker_cnt */);
  ASSERT_TRUE(nullptr != mcluster) << "Failed to acquire mock cluster";

  const char *bootstraps = rd_kafka_mock_cluster_bootstraps(mcluster);

  // Set env KAFKA_BROKER variable with the addresses of the mock cluster
  setenv("KAFKA_BROKER", bootstraps, 1);

  // For clarity I do not detail the creation code
  // Creation of RdKafka::Producer
  // Creation of RdKafka::KafkaConsumer


  // Destroy mock cluster
  rd_kafka_mock_cluster_destroy(mcluster);
  rd_kafka_destroy(rk);

Configuration:

kafka:
  configuration:
    metadata.broker.list: ${KAFKA_BROKER:""}
    client.id: MOCK
    # Think this is not supported and failed on consume...
    group.id: test-integration
    #enable.auto.commit: false
    #enable.partition.assignment: false
    # broker.version.fallback: '0.9.0'
    api.version.request: true
    max.poll.interval.ms: 10000
    # test.mock.num.brokers: 3
  topics:
    my-topic-name:
      name: real-name-of-the-topic

IMPORTANT: Always try to reproduce the issue on the latest released version (see https://github.com/edenhill/librdkafka/releases), if it can't be reproduced on the latest version the issue has been fixed.

Checklist

  • librdkafka version (release number or git tag): 1.3.0
  • Apache Kafka version: N/A (mock)
  • librdkafka client configuration: provide in section 'How to reproduce'
  • Operating system: Centos 7 (x64) but sample code is compiled in 32 bits
  • Provide logs (with debug=.. as necessary) from librdkafka
  • Provide broker log excerpts
  • Critical issue
@edenhill
Contributor

Correct, the mock broker does not yet support balanced consumer groups, which is what subscribe() requires to operate.

You can use the non-balanced consumer instead by calling assign() with the specific partitions to consume rather than subscribe().

@edenhill
Contributor

edenhill commented Feb 5, 2020

There's now initial naive support for consumer groups in the mock broker on the sessmtout branch (soon to be merged).

@quentingodeau
Author

Thanks, I will try to take some time to test this this week :)

@quentingodeau
Author

Hi @edenhill,

So I tested the new mock feature and my error (Required feature not supported by broker) disappeared, but I do not receive the message on the consumer side. I just ran my test and did not take the time to look into it; I will try to do so next week :)

@edenhill
Contributor

Did you seed the topic first by producing messages to it?

@quentingodeau
Author

quentingodeau commented Feb 21, 2020

> Did you seed the topic first by producing messages to it?

Yes, I tried this too, but no improvement :(
Can the fact that I use the C++ API change something?
Can I create the topic without having to send a message to it?

@edenhill
Contributor

Try setting debug=mock config on the mock rd_kafka_t instance.

@quentingodeau
Author

quentingodeau commented Feb 21, 2020

Here is the code that allows me to create the mock cluster:

class KafkaClusterMock {
 public:
  struct TopicDescription {
    std::string topic_name;
    int         partition_cnt;
    int         replication_factor;
  };

  KafkaClusterMock(int broker_cnt, const std::vector<TopicDescription> &topics) : rk_(nullptr), mcluster_(nullptr) {
    char             errstr[256];
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    rk_                   = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    if (nullptr == rk_) {
      // rd_kafka_new() only takes ownership of conf on success
      rd_kafka_conf_destroy(conf);
      throw std::runtime_error(std::string("Failed to create mock cluster rd_kafka_t: ") + errstr);
    }

    mcluster_ = rd_kafka_mock_cluster_new(rk_, broker_cnt);
    if (nullptr == mcluster_) {
      // The destructor does not run when the constructor throws, so clean up here
      rd_kafka_destroy(rk_);
      throw std::runtime_error("Failed to acquire mock cluster");
    }

    for (const auto &topic : topics) {
      rd_kafka_resp_err_t topic_err =
          rd_kafka_mock_topic_create(mcluster_, topic.topic_name.c_str(), topic.partition_cnt, topic.replication_factor);
      if (RD_KAFKA_RESP_ERR_NO_ERROR != topic_err) {
        rd_kafka_mock_cluster_destroy(mcluster_);
        rd_kafka_destroy(rk_);
        throw std::runtime_error("Failed to create the mock topic (" + topic.topic_name + "): " + rd_kafka_err2str(topic_err));
      }
    }
  }

  ~KafkaClusterMock() {
    rd_kafka_mock_cluster_destroy(mcluster_);
    rd_kafka_destroy(rk_);
  }

  inline std::string bootstraps() const { return rd_kafka_mock_cluster_bootstraps(mcluster_); }

 private:
  rd_kafka_t *             rk_;
  rd_kafka_mock_cluster_t *mcluster_;
};

Instantiate like this:

KafkaClusterMock kafka_cluster_mock(broker_cnt, {{.topic_name = "real-name-of-the-topic", .partition_cnt = 1, .replication_factor = 1}});

And the traces:

%7|1582301639.800|INIT|MOCK-p#producer-2| [thrd:app]: librdkafka v1.4.0-pre5 (0x1040005) MOCK-p#producer-2 initialized (builtin.features snappy,sasl,regex,lz4,sasl_plain,plugins, GCC GXX PKGCONFIG INSTALL GNULD LDS LIBDL PLUGINS HDRHISTOGRAM SNAPPY SOCKEM, debug 0x10000)
%7|1582301639.801|INIT|MOCK-c#consumer-3| [thrd:app]: librdkafka v1.4.0-pre5 (0x1040005) MOCK-c#consumer-3 initialized (builtin.features snappy,sasl,regex,lz4,sasl_plain,plugins, GCC GXX PKGCONFIG INSTALL GNULD LDS LIBDL PLUGINS HDRHISTOGRAM SNAPPY SOCKEM, debug 0x10000)
%7|1582301654.303|DESTROY|MOCK-c#consumer-3| [thrd:app]: Terminating instance (destroy flags none (0x0))
%3|1582301654.303|ERROR|MOCK-c#consumer-3| [thrd:GroupCoordinator]: 1/1 brokers are down
%7|1582301654.303|DESTROY|MOCK-c#consumer-3| [thrd:main]: Destroy internal
%7|1582301654.303|DESTROY|MOCK-c#consumer-3| [thrd:main]: Removing all topics
%7|1582301654.304|DESTROY|MOCK-p#producer-2| [thrd:app]: Terminating instance (destroy flags none (0x0))
%7|1582301654.304|DESTROY|MOCK-p#producer-2| [thrd:main]: Destroy internal
%7|1582301654.304|DESTROY|MOCK-p#producer-2| [thrd:main]: Removing all topics

@edenhill
Contributor

The consumer debug log implies that it is unable to connect to the mock cluster. Check the bootstrap servers, and enable debug=broker on the consumer and debug=mock on the mock cluster.

@quentingodeau
Author

Sorry, I missed your point the first time, thanks :)
Here is the new log file: kafka-mock.log

@edenhill
Contributor

You are destroying the consumer:
%7|1582304892.552|DESTROY|MOCK-c#consumer-3| [thrd:app]: Terminating instance (destroy flags none (0x0))

@quentingodeau
Author

Ah OK, maybe I'm doing it too soon... Because I have threads, I was assuming that waiting for one second was enough, but now I realize that it might not be. I will change the way I perform my test and tell you the results tomorrow. Thanks again for your help.
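
One way to avoid a fixed one-second sleep is to wait on a condition with a deadline and only destroy the consumer once the condition holds or the deadline passes. The following is a minimal sketch using only the standard library; wait_until is a hypothetical helper, not part of librdkafka, and the condition it polls (for example a flag set by the consumer thread when the expected message arrives) is an assumption about how the test is structured.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <thread>

// Hypothetical helper: poll `done` until it returns true or `timeout` elapses.
// Returns true if the condition was met before the deadline, false otherwise.
bool wait_until(const std::function<bool()> &done, std::chrono::milliseconds timeout,
                std::chrono::milliseconds interval = std::chrono::milliseconds(10)) {
  const auto deadline = std::chrono::steady_clock::now() + timeout;
  while (!done()) {
    if (std::chrono::steady_clock::now() >= deadline) {
      return false;
    }
    std::this_thread::sleep_for(interval);
  }
  return true;
}
```

A test could then call wait_until with a lambda that reads a std::atomic<bool> written by the consumer thread, and fail with a clear message when it returns false instead of silently tearing the consumer down.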

@quentingodeau
Author

quentingodeau commented Feb 22, 2020

Hi @edenhill,

I finally found my issue. The issue was that I start the consumer and the producer at almost the same time: first the producer, and the consumer just after. Because of what you said in a previous message:

> Did you seed the topic first by producing messages to it?

I assumed that my issue was a missing first message to really initialize the topic, so I added the following lines to my mock class:

void KafkaClusterMock::createTopics(const std::vector<KafkaClusterMock::TopicDescription> &topics) {
  for (const auto &topic : topics) {
    rd_kafka_resp_err_t topic_err =
        rd_kafka_mock_topic_create(mcluster_, topic.topic_name.data(), topic.partition_cnt, topic.replication_factor);
    if (RD_KAFKA_RESP_ERR_NO_ERROR != topic_err) {
      throw std::runtime_error("Failed to create the mock topic (" + topic.topic_name + "): " + rd_kafka_err2str(topic_err));
    }
    seedTopic(topic.topic_name);
  }
}

void KafkaClusterMock::seedTopic(const std::string &topic_name) {
  char        errstr[256]        = {'\0'};
  std::string bootstraps_servers = bootstraps();

  rd_kafka_conf_t *conf = rd_kafka_conf_new();
  rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
  // rd_kafka_conf_set(conf, "debug", "all", errstr, sizeof(errstr));

  if (rd_kafka_conf_set(conf, "bootstrap.servers", bootstraps_servers.data(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
    throw std::runtime_error("Failed to configure 'bootstrap.servers' to seed the topic " + topic_name + " error: " + errstr);
  }

  rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
  if (nullptr == rk) {
    throw std::runtime_error("Failed to create RdKafka producer to seed the topic " + topic_name + " error: " + errstr);
  }

  rd_kafka_topic_conf_t *topic_conf = rd_kafka_topic_conf_new();
  // Make sure all replicas are in sync after producing so that the consume test won't fail.
  rd_kafka_conf_res_t conf_result = rd_kafka_topic_conf_set(topic_conf, "request.required.acks", "-1", errstr, sizeof(errstr));
  if (conf_result != RD_KAFKA_CONF_OK) {
    throw std::runtime_error(std::string("Invalid configuration request.required.acks error: ") + errstr);
  }

  rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, topic_name.data(), topic_conf);
  if (nullptr == rkt) {
    throw std::runtime_error("Failed to create RdKafka topic " + topic_name);
  }

  int remains = 1;
  if (rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, nullptr, 0, nullptr, 0, &remains) == -1) {
    throw std::runtime_error("Failed to produce a message on " + topic_name + " to seed it");
  }

  while (remains > 0 && rd_kafka_outq_len(rk) > 0) {
    rd_kafka_poll(rk, 1000);
  }

  rd_kafka_topic_destroy(rkt);
  rd_kafka_destroy(rk);
  if (remains != 0) {
    throw std::runtime_error("Failed to deliver a message on " + topic_name + " to seed it");
  }
}

This way I'm sure that the brokers are running and the topic exists. Then, as I said before, I start a new producer and consumer. At this point the newly created producer and consumer have almost the default configuration and run in two different threads.

Here is my hypothesis on what happens:
I was assuming that the consumer would, through the call to subscribe(topics)*, start at offset 1 and so read the next message published by the producer. But maybe subscribing takes some time and the producer sends its message too fast, so by the time the consumer has really subscribed the message has already been published on the broker. By changing the consumer configuration to auto.offset.reset = beginning I get my messages.

Is my hypothesis correct, or am I still missing something?

* The subscribe is performed at the beginning of the consumer thread, so in pseudocode:

producer->start();
consumer->start();
producer->publish(object);
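
The race described above can be illustrated with a toy model that uses only the standard library. This is not librdkafka code and all type names are hypothetical. It assumes a consumer with no committed offset whose start position is resolved from the reset policy at the moment it actually joins, which is how auto.offset.reset is documented to behave, with the latest position as the default.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Toy model of a single partition: an append-only log of messages.
struct ToyPartition {
  std::vector<std::string> log;
  void produce(const std::string &msg) { log.push_back(msg); }
};

enum class OffsetReset { Earliest, Latest };

// Toy consumer without a committed offset: its start position is fixed by
// the reset policy when it joins, not when the test asked it to start.
struct ToyConsumer {
  std::size_t next_offset;

  ToyConsumer(const ToyPartition &p, OffsetReset reset)
      : next_offset(reset == OffsetReset::Earliest ? 0 : p.log.size()) {}

  // Return every message from the current position to the end of the log.
  std::vector<std::string> poll(const ToyPartition &p) {
    std::vector<std::string> out(p.log.begin() + static_cast<std::ptrdiff_t>(next_offset), p.log.end());
    next_offset = p.log.size();
    return out;
  }
};
```

If the producer appends its message before the consumer joins, a ToyConsumer built with OffsetReset::Latest polls nothing, while one built with OffsetReset::Earliest also sees the seed message and the published object. That matches the observation that switching auto.offset.reset to beginning made the messages appear.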

@edenhill edenhill closed this as completed Apr 6, 2020