Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Seg fault with multi brokers #224

Closed
affranchi opened this issue Mar 25, 2015 · 11 comments
Closed

Seg fault with multi brokers #224

affranchi opened this issue Mar 25, 2015 · 11 comments

Comments

@affranchi
Copy link

Hello,

I'm using rdkafka library. I got seg fault when I set multi brokers.
FYI, seg fault didn't provide any traces and I generated multi broker topic using the below command:

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic mytopic --partitions 1 -replication-factor 3

My code to open connection for producer is on the below (broker list is provided from parameters with the form "IP:PORT,IP2:PORT2,IP3:PORT3") :

void KafkaRdHandler::openConnection(std::string topicName)
{

conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

if (conf->set("queue.buffering.max.messages","50000",errstr) != RdKafka::Conf::CONF_OK) {
    logWarn(mName + " : " + errstr, 0);
}

if (conf->set("batch.num.messages","500",errstr) != RdKafka::Conf::CONF_OK) {
    logWarn(mName + " : " + errstr, 0);
}

logDebug(mName + " : " + mBrokerUrl + " : " + topicName,0);
if (conf->set("metadata.broker.list", mBrokerUrl, errstr) != RdKafka::Conf::CONF_OK) {
    logWarn(mName + " : " + errstr,0);        
}

if (conf->set("message.send.max.retries", "0", errstr) !=
RdKafka::Conf::CONF_OK) {
logWarn(mName + " : " + errstr,0);
}

ex_event_cb = new ExampleProducerEventCb();
conf->set("event_cb", ex_event_cb, errstr);

/*
 * Producer mode
 */
ex_dr_cb = new ExampleDeliveryReportCb();

/* Set delivery report callback */
conf->set("dr_cb", ex_dr_cb, errstr);   

/*
 * Create producer using accumulated global configuration.
 */
mProducer = RdKafka::Producer::create(conf, errstr);
if (!mProducer) {
  logErr(mName + " : Failed to create producer",0);
}

logDebug(mName + " : Created producer " + mProducer->name() + " and topic : " + topicName,0);


/*
 * Create topic handle.
 */
mUpTopic = RdKafka::Topic::create(mProducer, topicName, //mUpTopicName,
                                               tconf, errstr);
if (!mUpTopic) {
  logErr(mName + " : Failed to create topic.",0);
}

}

Is there something I missed? It works well with single broker.

In addition to the above question, does rdkafka not support "bootstrap.server" and "block.on.buffer.full" in new producer configs (http://kafka.apache.org/documentation.html#newproducerconfigs)?

Thank you in advance.

Regards,
K

@edenhill
Copy link
Contributor

Can you reproduce this by providing the semi list of brokers to rdkafka_example or rdkafka_example_cpp with the -b option?

I will add support (aliasing) for the new producer configs.

@edenhill
Copy link
Contributor

Could you also try running your program inside gdb?

$ gdb ./your-program
...
(gdb)  run
<wait for crash>
(gdb) bt

@affranchi
Copy link
Author

I'm sorry for late response.
I didn't try with rdkaka_example_cpp yet. (I'll do soon.)

#0 0x00007fffe4071ff0 in ?? ()
#1 0x00007ffff31038bf in RdKafka::log_cb_trampoline (rk=, level=5, fac=, buf=0x7fffe97f8590 "Topic responseAcknowledgement [0] migrated to unknown broker 1: requesting metadata update")
at HandleImpl.cpp:55
#2 0x00007ffff2ed69e6 in rd_kafka_log0 (rk=0x7fffe40725f0, extra=, level=5, fac=0x7ffff2ef2847 "TOPICBRK", fmt=) at rdkafka.c:141
#3 0x00007ffff2ee9ef9 in rd_kafka_topic_leader_update (rkb=, mdp=, rkt=0x7fffe40724d0) at rdkafka_topic.c:688
#4 rd_kafka_topic_metadata_update (rkb=0x7fffe4072920, mdt=0x475d513a) at rdkafka_topic.c:1032
#5 0x00007ffff2ee6f77 in rd_kafka_metadata_handle (size=1243, buf=, rko=0x4e373b70, rkb=0x7fffe4072920) at rdkafka_broker.c:976
#6 rd_kafka_broker_metadata_reply (rkb=0x7fffe4072920, err=0, reply=0x4765fc20, request=0x4765fab0, opaque=0x4e373b70) at rdkafka_broker.c:1027
#7 0x00007ffff2ee2907 in rd_kafka_req_response (rkbuf=0x4765fc20, rkb=0x7fffe4072920) at rdkafka_broker.c:1321
#8 rd_kafka_recv (rkb=0x7fffe4072920) at rdkafka_broker.c:1513
#9 0x00007ffff2ee32b0 in rd_kafka_broker_io_serve (rkb=0x7fffe4072920) at rdkafka_broker.c:2452
#10 0x00007ffff2ee545a in rd_kafka_broker_ua_idle (rkb=0x7fffe4072920) at rdkafka_broker.c:2475
#11 rd_kafka_broker_thread_main (arg=0x7fffe4072920) at rdkafka_broker.c:4150
#12 0x00007ffff68ecb50 in start_thread (arg=) at pthread_create.c:304
#13 0x00007ffff2483e6d in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:112
#14 0x0000000000000000 in ?? ()

The GDB trace is shown on the above.
It's running over 3 brokers now.

Regards,
K

@edenhill
Copy link
Contributor

Looks like your event/log callback is crashing or erroneously set up.
Did you set an EventCb class?

@affranchi
Copy link
Author

Yes, I did.

On Header file:

    ExampleProducerEventCb* ex_event_cb;
    ExampleDeliveryReportCb* ex_dr_cb;

on the function in CPP:

ex_event_cb = new ExampleProducerEventCb();
conf->set("event_cb", ex_event_cb, errstr);

/*

  • Producer mode
    */
    ex_dr_cb = new ExampleDeliveryReportCb();

/* Set delivery report callback */
conf->set("dr_cb", ex_dr_cb, errstr);

Did I do something wrong?
FYI, it works with commenting out those event sets.

@edenhill
Copy link
Contributor

What does your ExampleProducerEventCb class look like?

@affranchi
Copy link
Author

I copied from the example.

class ExampleProducerEventCb : public RdKafka::EventCb {
 public:
  void event_cb (RdKafka::Event &event) {
    switch (event.type())
    {
      case RdKafka::Event::EVENT_ERROR:
        Log_TRACE( "Producer ERROR (" + RdKafka::err2str(event.err()) + "): " + event.str(), 0);
        if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)
          run = false;
        break;

      case RdKafka::Event::EVENT_STATS:
        Log_TRACE( "Producer STATS " + event.str(), 0 );
        break;

      case RdKafka::Event::EVENT_LOG:
        Log_TRACE( "Producer LOG-" + std::to_string(event.severity()) + "-" + event.fac() + ": " + event.str(),0 );
        break;

      default:
        Log_TRACE( "Producer EVENT " + std::to_string(event.type()) + " (" + RdKafka::err2str(event.err()) + "): " + event.str(), 0);
        break;
    }
  }
};

@edenhill
Copy link
Contributor

Maybe your instance of ex_event_cb is freed/deleted?

@edenhill
Copy link
Contributor

Try running your program with valgrind:
valgrind ./your-program

It should report what the problem is

@affranchi
Copy link
Author

I'm sorry for late response.
I found the place which deletes ex_evenet_cb. That's my bad.

Thank you so much for your help!

@edenhill
Copy link
Contributor

Glad you found it!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants