
rd_kafka_topic_destroy0 will crash ?? @ 0.8.6 #360

Closed
ylgeeker opened this issue Aug 28, 2015 · 12 comments

@ylgeeker

I downloaded the newest tagged version, 0.8.6.
It crashes after running for a long time; it sends data to thousands of topics.
The stack trace:
#0 0x00007f59acf2f885 in raise () from /lib64/libc.so.6

Missing separate debuginfos, use: debuginfo-install glibc-2.12-1.49.tl1.x86_64 libgcc-4.4.6-3.el6.x86_64 libstdc++-4.4.6-3.el6.x86_64
(gdb) bt
#0 0x00007f59acf2f885 in raise () from /lib64/libc.so.6
#1 0x00007f59acf31065 in abort () from /lib64/libc.so.6
#2 0x00000000005a253b in rd_kafka_crash (file=, line=, function=, rk=0x1fe3f90, reason=) at rdkafka.c:1877
#3 0x00000000005b23d8 in rd_kafka_topic_destroy0 (rkt=0x7f562800fec0) at rdkafka_topic.c:410
#4 0x00000000005b3891 in rd_kafka_topic_metadata_update (rkb=0x1fe9d60, mdt=) at rdkafka_topic.c:1056
#5 0x00000000005b0bf7 in rd_kafka_metadata_handle (rkb=0x1fe9d60, err=0, reply=0x7f56f0000940, request=0x7f56f0000aa0, opaque=0x7f5694001440) at rdkafka_broker.c:976
#6 rd_kafka_broker_metadata_reply (rkb=0x1fe9d60, err=0, reply=0x7f56f0000940, request=0x7f56f0000aa0, opaque=0x7f5694001440) at rdkafka_broker.c:1027
#7 0x00000000005ac587 in rd_kafka_req_response (rkb=0x1fe9d60) at rdkafka_broker.c:1321
#8 rd_kafka_recv (rkb=0x1fe9d60) at rdkafka_broker.c:1513
#9 0x00000000005acf30 in rd_kafka_broker_io_serve (rkb=0x1fe9d60) at rdkafka_broker.c:2452
#10 0x00000000005af0da in rd_kafka_broker_ua_idle (arg=0x1fe9d60) at rdkafka_broker.c:2475
#11 rd_kafka_broker_thread_main (arg=0x1fe9d60) at rdkafka_broker.c:4150
#12 0x00007f59adc2a7f1 in start_thread () from /lib64/libpthread.so.0
#13 0x00007f59acfe2ccd in clone () from /lib64/libc.so.6

(gdb)

@edenhill

In gdb, can you run this for me please?

fr 3
p *rkt

How many topics is your application using?
How long does the program typically run before crashing?
Have you observed the program's memory usage? Is it running out of memory?

Thanks

@edenhill edenhill added the bug label Aug 28, 2015
@ylgeeker

@edenhill
In gdb I entered fr 3; it stops here:

void rd_kafka_topic_destroy0 (rd_kafka_topic_t *rkt) {

    // ...
    rd_kafka_assert(rkt->rkt_rk, rkt->rkt_refcnt == 0);   // CRASH HERE
    // ...
}

So I think the topic's refcount is being maintained incorrectly.
Let me show you how I use it.
1. I create a new topic with rd_kafka_topic_new every time I produce a message with rd_kafka_produce. Is that OK? I am not sure about this.

Q: How many topics is your application using?
A: There are more than 3000 topics in my application, and the number is increasing.

Q: How long does the program typically run before crashing?
A: It is random: 1 day, 3 days, 5 days, and so on.

Q: Have you observed the program's memory usage, is it running out of memory?
A: I am sure it is not; there is enough memory available.

Do you understand me? I am not good at English.

Thanks

@ylgeeker

@edenhill
Sorry, I cannot run "p *rkt" in my application, because the core file is lost.
I use it on my server, and I am afraid it will crash again at some point.
Please help!

@edenhill

Can you elaborate more on what your code looks like?
Are you using one single rd_kafka_t producer handle and then creating topics with topic_new() prior to calling produce()?
The number of topic_new() and topic_destroy() calls needs to be symmetrical, i.e., for each topic_new() you will eventually need to call topic_destroy(). But it is important that you don't call topic_destroy() one time too many; that could be a reason for the refcount going below zero.
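A toy model of that pairing rule (this is my own illustration, not librdkafka code; the real refcount lives inside rd_kafka_topic_t and is managed internally by rd_kafka_topic_new() and rd_kafka_topic_destroy()):

```cpp
#include <cstdint>

// Toy refcount model of the contract described above. NOT real
// librdkafka code; it only illustrates why new/destroy calls must
// be symmetrical.
struct Topic {
    int32_t refcnt = 0;
};

// Each "new" takes one reference...
void topic_new(Topic &t) { ++t.refcnt; }

// ...and each "destroy" releases exactly one. One destroy too many
// drives the count negative, tripping the assertion in the trace.
void topic_destroy(Topic &t) { --t.refcnt; }
```

Never calling destroy makes the count grow without bound; one destroy too many makes it negative. Either way the refcount eventually fails the refcnt == 0 assertion.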

@ylgeeker

@edenhill
I use rdkafka like this:

int KP::create(string &broker)
{
    //
    // CREATE A PRODUCER
    //
    s_brokers = broker;

    conf = rd_kafka_conf_new();

    // rk is a member of the KafkaProducer class
    rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    if (rk == NULL)
    {
        return -1;
    }

    if (rd_kafka_brokers_add(rk, s_brokers.c_str()) == 0)
    {
        return -1;
    }

    //
    // SUCCESS
    //
    return 0;
}

int KP::produce(string &topic, int partition, string ctx)
{
    //
    // WRITE ctx INTO the Kafka server by topic
    //
    int res;
    rd_kafka_topic_t *rkt;

    rkt = rd_kafka_topic_new(rk, topic.c_str(), NULL);
    res = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY,
                           const_cast<char *>(ctx.c_str()), ctx.size(),
                           NULL, 0, NULL);
    if (res == -1)
    {
        return -1;
    }
    //
    // SUCCESS
    //
    return 0;
}

I am sure that I never call topic_destroy myself. Is that the error?

@ylgeeker

@edenhill
I am using the function KafkaProducer::produce(string &topic, int partition, string ctx) in a single, standalone thread.

KafkaProducer::create(string &broker) is only called once, at the beginning.

@ylgeeker

May I have your Skype?

@ylgeeker

I use KP like below:

// at application startup
KP kp;
kp.create(/* brokers ip list */);

// in a single, standalone thread function
int thread_func(void *param)
{
    while (need_to_read)
    {
        target_data = get_one_data_from_the_data_queue();

        kp.produce(some_topic, partition_num, target_data);
    }

    return 0;
}

@edenhill

Not calling destroy() eventually is definitely a problem: it means topic refcounts will keep leaking and eventually wrap around.
Refcounts are 32-bit signed integers though, meaning the counter will wrap at about 2.1 billion (2^31), which is quite a lot.
You say that the application crashes within a couple of days. Let's say 1 day: that means you need a produce rate of about 25000 msgs/s before wrapping. Does that sound reasonable?
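That wrap-around estimate can be checked directly (assuming a signed 32-bit counter, i.e. a wrap threshold of 2^31, and one leaked reference per produced message):

```cpp
#include <cstdint>

// A signed 32-bit refcount overflows once it passes INT32_MAX.
constexpr int64_t wrap_at      = int64_t(INT32_MAX) + 1;  // 2147483648
constexpr int64_t secs_per_day = 24 * 60 * 60;            // 86400

// Sustained produce rate needed to wrap the counter in one day,
// leaking one topic reference per message.
constexpr int64_t msgs_per_sec_to_wrap = wrap_at / secs_per_day;  // ~24855
```

About 24,855 msgs/s, which matches the "about 25000 msgs/s" figure above.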

Anyway, to fix your problem you need to call rd_kafka_topic_destroy(rkt) after you're done with the topic.
But since you have so many topics, and rdkafka isn't optimized for fast lookups of topic names, my strong suggestion is that you add a caching layer in your app that lets you efficiently look up the rd_kafka_topic_t handle and use it for the produce() call.
In this case you will only call topic_new() once (when inserting the topic in the cache) and topic_destroy() once (when removing the topic from the cache, or on program shutdown).
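One possible shape for such a cache (a sketch only: the create/destroy callbacks stand in for rd_kafka_topic_new() and rd_kafka_topic_destroy(), and the TopicCache type is hypothetical, not part of rdkafka):

```cpp
#include <functional>
#include <map>
#include <string>

// Hypothetical topic-handle cache: maps a topic name to an opaque
// handle, creating it on first use and destroying each cached handle
// exactly once at shutdown. In the real application Handle would be
// rd_kafka_topic_t*, create would call rd_kafka_topic_new(), and
// destroy would call rd_kafka_topic_destroy().
template <typename Handle>
class TopicCache {
public:
    TopicCache(std::function<Handle(const std::string &)> create,
               std::function<void(Handle)> destroy)
        : create_(std::move(create)), destroy_(std::move(destroy)) {}

    // Look up the handle for `name`, creating it on first use.
    Handle get(const std::string &name) {
        auto it = cache_.find(name);
        if (it == cache_.end())
            it = cache_.emplace(name, create_(name)).first;
        return it->second;
    }

    std::size_t size() const { return cache_.size(); }

    // Destroy every cached handle exactly once.
    ~TopicCache() {
        for (auto &kv : cache_)
            destroy_(kv.second);
    }

private:
    std::function<Handle(const std::string &)> create_;
    std::function<void(Handle)> destroy_;
    std::map<std::string, Handle> cache_;
};
```

Since produce() here runs in a single thread, the sketch omits locking; a multi-threaded producer would need a mutex around the map.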

@ylgeeker

You mean I should create a topic, produce, then destroy the topic, in that sequence? But rdkafka maintains a topics array internally, and when the response arrives it destroys the topic from that array. I am not sure it is okay to destroy it myself. By the way, I do produce at a rate of 25000 msgs/s.

@edenhill

Yes, that is the sequence you need to use:

  • create topic: rd_kafka_topic_new(). Only do this once, the first time you see the topic; use a cache to look up the topic_t handle.
  • produce messages, preferably as many as possible: produce() x N
  • destroy topic when you're done with it, preferably at program exit: rd_kafka_topic_destroy()

@edenhill edenhill added misuse and removed bug labels Aug 28, 2015
@ylgeeker

Oh, thanks for your suggestion.
