Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

I meet the core ,use librdkafka-0.8.6.zip and Linux version 2.6.32, gcc version 4.8.2 #415

Closed
yangkai04 opened this issue Nov 9, 2015 · 12 comments
Labels

Comments

@yangkai04
Copy link

Program terminated with signal 11, Segmentation fault.
#0 0x00000000004c61d9 in RdKafka::log_cb_trampoline (rk=, level=3, fac=,

buf=0x7ff58f39a620 "nmg01-crm-hina59.nmg01:8092/59: Receive failed: Receive error: Connection timed out") at HandleImpl.cpp:55

55 handle->event_cb_->event_cb(event);
(gdb) bt
#0 0x00000000004c61d9 in RdKafka::log_cb_trampoline (rk=, level=3, fac=,

buf=0x7ff58f39a620 "nmg01-crm-hina59.nmg01.baidu.com:8092/59: Receive failed: Receive error: Connection timed out") at HandleImpl.cpp:55

#1 0x00000000004c7f69 in rd_kafka_log0 (rk=0x22be000, extra=extra@entry=0x0, level=level@entry=3, fac=fac@entry=0x6950ac "FAIL", fmt=fmt@entry=0x6951d5 "%s")

at rdkafka.c:141

#2 0x00000000004d58ce in rd_kafka_broker_fail (rkb=rkb@entry=0x2391800, err=RD_KAFKA_RESP_ERR__TRANSPORT, fmt=fmt@entry=0x69511d "Receive failed: %s")

at rdkafka_broker.c:398

#3 0x00000000004d5fe7 in rd_kafka_recv (rkb=rkb@entry=0x2391800) at rdkafka_broker.c:1519
#4 0x00000000004d6888 in rd_kafka_broker_io_serve (rkb=rkb@entry=0x2391800) at rdkafka_broker.c:2452
#5 0x00000000004d7103 in rd_kafka_broker_consumer_serve (rkb=0x2391800) at rdkafka_broker.c:4105
#6 rd_kafka_broker_thread_main (arg=) at rdkafka_broker.c:4154
#7 0x00007ff5a07b71c3 in ?? ()
#8 0x0000000000000000 in ?? ()

@edenhill
Copy link
Contributor

edenhill commented Nov 9, 2015

Are you getting this after destroying a Kafka handle?

@edenhill
Copy link
Contributor

edenhill commented Nov 9, 2015

Can do this in gdb?

 frame 0
 p *handle
 p *handle->event_cb_
 p *event

@yangkai04
Copy link
Author

not destroy , when i get the core , it is running, ok, wait for a moment

@yangkai04
Copy link
Author

Program terminated with signal 11, Segmentation fault.
#0 0x00000000004c61d9 in RdKafka::log_cb_trampoline (rk=, level=3, fac=,
buf=0x7ff58f39a620 "nmg01-crm-hina59.nmg01:8092/59: Receive failed: Receive error: Connection timed out") at HandleImpl.cpp:55
55 handle->event_cb_->event_cb(event);
(gdb) p handle
$1 = {RdKafka::Handle = {vptr.Handle = 0x98d6e0 <vtable for RdKafka::ConsumerImpl+96>}, rk = 0x22be000, event_cb
= 0x7fff3bd57db0, socket_cb_ = 0x0,
open_cb_ = 0x0, dr_cb_ = 0x0, partitioner_cb_ = 0x0}
(gdb) p *handle->event_cb_
$2 = {vptr.EventCb = 0x9}
(gdb) p *event
No symbol "operator
" in current context.

@yangkai04
Copy link
Author

when init, I did follows:
_consumer = RdKafka::Consumer::create(_conf, errstr);
_tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
_topic = RdKafka::Topic::create(_consumer, _topicname, _tconf, errstr);

// because there is 12 partition, so I start 12 partid
for (int partid = 0; partid < _partition; ++partid) {
RdKafka::ErrorCode resp = _consumer->start(_topic, partid, start_offset);
EXPECT_EQ_OR_RETURN_FAIL_WITH_MSG(resp, RdKafka::ERR_NO_ERROR, RdKafka::err2str(resp));
}

when run, i did:
for (int partid = 0; partid < _partition; ++partid) {
RdKafka::Message *msg = _consumer->consume(_topic, partid, 1000);
...
}

if I use with some wrong?

@yangkai04
Copy link
Author

I can recieve msg, but when run for a while, I meet the problem.

it seems a kafka broker down or some other problems caused.

@edenhill
Copy link
Contributor

edenhill commented Nov 9, 2015

How did you set up _conf?

@yangkai04
Copy link
Author

set up _conf as follows:
_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
res = _conf->set("metadata.broker.list", _brokers, errstr);
ExampleEventCb ex_event_cb;
res = _conf->set("event_cb", &ex_event_cb, errstr);

@yangkai04
Copy link
Author

class ExampleEventCb : public RdKafka::EventCb {
public:
void event_cb (RdKafka::Event &event) {
switch (event.type()) {
case RdKafka::Event::EVENT_ERROR:
FATAL_LOG("ERROR ( %s ): %s",
RdKafka::err2str(event.err()).c_str(), event.str().c_str());
if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN) {
FATAL_LOG("ERROR ( %s ): %s",
RdKafka::err2str(event.err()).c_str(), event.str().c_str());
}
break;
case RdKafka::Event::EVENT_STATS:
NOTICE_LOG("starts %s", event.str().c_str());
break;
case RdKafka::Event::EVENT_LOG:
FATAL_LOG("LOG-%i-%s: %s", event.severity(), event.fac().c_str(), event.str().c_str());
break;
default:
FATAL_LOG("EVENT type(%s) ( %s ): %s",
event.type(), RdKafka::err2str(event.err()).c_str(), event.str().c_str());
break;
}
}
};

@yangkai04
Copy link
Author

by the way, when meet net error or other error, need i restart the consumer? what error need to restart and how?

thank you~!

@edenhill
Copy link
Contributor

I'm guessing your EventCb is stack allocated and goes out of scope.

@edenhill
Copy link
Contributor

You do not need to take any action when broker connection goes down, librdkafka will reconnect automatically.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants