
rd_kafka_buf_destroy() always aborts - causing an immediate crash of the consumer app #353

Closed
chriscrenshaw opened this issue Aug 14, 2015 · 26 comments


@chriscrenshaw

The client uses the latest librdkafka (C) from the master branch, connecting to Kafka 0.8.2.
OS is CentOS 6.6.

As soon as data is put into Kafka, the library consumes it and immediately crashes.
Unfortunately, when run under valgrind (with both memcheck and helgrind), the problem does
not recur. When run directly, it occurs every time, and immediately.

Also, the consumer ran perfectly through a couple of weeks of heavy testing, then suddenly stopped
working. My suspicion is that a configuration change (or some other change) in Kafka and/or ZooKeeper
may be triggering a new code path in the consumer/library, although this is just intuition. There
were no changes/upgrades to Kafka or ZK (in short, I'm not sure what changed).

Here is the stack trace from gdb:
#0 0x00007f50f6fd8625 in raise () from /lib64/libc.so.6
#1 0x00007f50f6fd9e05 in abort () from /lib64/libc.so.6
#2 0x00007f50f7016537 in __libc_message () from /lib64/libc.so.6
#3 0x00007f50f701be66 in malloc_printerr () from /lib64/libc.so.6
#4 0x00007f50f701e9b3 in _int_free () from /lib64/libc.so.6
#5 0x0000000000444ea1 in rd_kafka_buf_destroy (rkbuf=0x7f50e800b030) at rdkafka_broker.c:166
#6 0x000000000043f8c5 in rd_kafka_op_destroy (rko=0x7f50e800b190) at rdkafka.c:198
#7 0x000000000040a6bf in run_kafka_thread (arg=0x2673a70) at src/run_kafka_thr.c:179
#8 0x00007f50f79e39d1 in start_thread () from /lib64/libpthread.so.0
#9 0x00007f50f708e8fd in clone () from /lib64/libc.so.6

@edenhill
Contributor

That doesn't sound good.
Could you do this in gdb for me:

frame 6
p *rko
p *rko->rko_rkbuf

@chriscrenshaw
Author

(gdb) frame 6
#6  0x000000000043f8c5 in rd_kafka_op_destroy (rko=0x7f50e800b190) at rdkafka.c:198
198         rd_kafka_buf_destroy(rko->rko_rkbuf);
(gdb) p *rko
$1 = {rko_link = {tqe_next = 0x0, tqe_prev = 0x7f50d800d338}, rko_type = RD_KAFKA_OP_FETCH, rko_flags = 0, 
  rko_msgq = {rkmq_msgs = {tqh_first = 0x0, tqh_last = 0x0}, rkmq_msg_cnt = 0, rkmq_msg_bytes = 0}, 
  rko_replyq = 0x0, rko_intarg = 0, rko_rkm = 0x0, rko_rkmessage = {err = RD_KAFKA_RESP_ERR_NO_ERROR, 
    rkt = 0x2671f90, partition = 15, payload = 0x7f50e800c396, len = 882, key = 0x7f50e800c38d, key_len = 5, 
    offset = 56643, _private = 0x7f50e800b190}, rko_rkbuf = 0x7f50e800b030, rko_metadata = 0x0, 
  rko_rktp = 0x7f50d800d1f0}
(gdb) p *rko->rko_rkbuf
$2 = {rkbuf_link = {tqe_next = 0x0, tqe_prev = 0x0}, rkbuf_corrid = 0, rkbuf_ts_retry = 0, rkbuf_flags = 0, 
  rkbuf_msg = {msg_name = 0x0, msg_namelen = 0, msg_iov = 0x7f50e800b160, msg_iovlen = 2, msg_control = 0x0, 
    msg_controllen = 0, msg_flags = 0}, rkbuf_iov = 0x7f50e800b160, rkbuf_iovcnt = 2, rkbuf_of = 2032, 
  rkbuf_len = 2024, rkbuf_size = 0, rkbuf_buf = 0x7f50e800b180 "в", rkbuf_buf2 = 0x7f50e800bf20 "", 
  rkbuf_wbuf = 0x7f50e800b180 "в", rkbuf_wof = 0, rkbuf_reqhdr = {Size = 0, ApiKey = 0, ApiVersion = 0, 
    CorrId = 0}, rkbuf_reshdr = {Size = -335085568, CorrId = 386}, rkbuf_expected_size = 0, rkbuf_cb = 0, 
  rkbuf_hndcb = 0, rkbuf_hndopaque = 0x0, rkbuf_refcnt = 0, rkbuf_opaque = 0x0, rkbuf_retries = 0, 
  rkbuf_ts_enq = 0, rkbuf_ts_sent = 0, rkbuf_ts_timeout = 0, rkbuf_offset = 0, rkbuf_msgq = {rkmq_msgs = {
      tqh_first = 0x0, tqh_last = 0x7f50e800b140}, rkmq_msg_cnt = 0, rkmq_msg_bytes = 0}}
(gdb) 

@chriscrenshaw
Author

Also have large helgrind output that I can send (won't fit in comment window)

@edenhill
Contributor

Can you mail me the helgrind output to info@edenhill.se?

@edenhill
Contributor

When it crashes, does glibc give you any information about why it crashed?
Typically when things crash in free() it prints something like "double free" or "invalid pointer".
You could also try dereferencing the memory in gdb to see if it is mapped:
x/2024c rko->rko_rkbuf->rkbuf_buf2

@chriscrenshaw
Author

*** glibc detected *** /tmp/pirate_debug: double free or corruption (!prev): 0x00007ffee80239a0 ***

@edenhill
Contributor

Ah, thanks. Which rd_kafka_consume*() API are you using to retrieve messages?

@chriscrenshaw
Author

ssize_t message_count = rd_kafka_consume_batch(rkt, partition, timeout_millis, messages, max_messages);

@edenhill
Contributor

Okay, and where does it crash?
When calling rd_kafka_message_destroy()?
On which message in your messages[] array?
Can you print the rd_kafka_message_t pointer for each message in array to make sure there aren't any duplicates?

@chriscrenshaw
Author

Here is the processing loop - without the uninteresting guts in the middle:

for (ssize_t a = 0; a < messages_read; a++) {
    rd_kafka_message_t *message = messages[a];

    if (message->err != RD_KAFKA_RESP_ERR__PARTITION_EOF) {
        ...
    }
    rd_kafka_message_destroy(message);
    messages[a] = 0;
}

@chriscrenshaw
Author

And yes, it seems to be when calling rd_kafka_message_destroy()

@edenhill
Contributor

Looks good.
Can you double-check there aren't any duplicates returned in the array?
Are you using any form of compression? If so, which?

@chriscrenshaw
Author

No compression.
Checking for dups now.

@edenhill
Contributor

Out of curiosity, what if you do either of these, does the problem persist?:

  • move away any processing code from your loop, just do consume_batch() followed by the destroys.
  • replace consume_batch() with consume_callback() or consume()

@edenhill
Contributor

Also, if you try consuming the same topic+partition and offset with kafkacat (or rdkafka_example), does that crash too?

@chriscrenshaw
Author

I haven't tried any other clients (what is kafkacat?) I can try it.
Also, I can try your other suggestions regarding code changes to the client - would you like me to try?

@edenhill
Contributor

kafkacat is a generic consumer & producer built on top of librdkafka, so it uses the same library code as your program:
https://github.com/edenhill/kafkacat

Yes, please try the modifications.

@chriscrenshaw
Author

Took out the code in the middle (now just consume batch, then free messages).
Problem no longer occurring. Interesting.

But I also know that when I run under valgrind, the problem never occurs. So I'm not sure whether the
problem was the processing code, or the fact that the timings changed.

@chriscrenshaw
Author

Is it worth taking the time to change the batch consume to individual consumes (and putting the processing code back in)?

@edenhill
Contributor

Interesting.
And you're sure you are not freeing the rkmessage->payload pointer in your code or in any code to which you pass that pointer?

@chriscrenshaw
Author

FYI when running valgrind with mem tool, everything comes up very clean.

@chriscrenshaw
Author

Let me double check

@edenhill
Contributor

Try adding a usleep(500000); in your processing block (instead of your ordinary code) to see if you can trigger the race condition, if any.

@chriscrenshaw
Author

Definitely not freeing the payload; let me try the timer.

@chriscrenshaw
Author

OK, I think the problem is solved. Here again is the processing loop (with slightly more detail).
Notice the line that writes a NUL terminator to the end of the payload.
When I comment out that line, it works fine. When I put it in, it fails. Consistently.

Aside from the fact that I should be writing a 0 at [len-1] instead of [len], is there any other
problem with doing this? I suspect I'm just stepping one past the end of the buffer and
corrupting the next value. If I use [len-1], would you see any problems with this, regarding the
shared buffer approach being used?

for (ssize_t a = 0; a < messages_read; a++) {
    rd_kafka_message_t *message = messages[a];

    if (message->err != RD_KAFKA_RESP_ERR__PARTITION_EOF) {
        offset = message->offset;
        if (message->len > 0 && message->payload != NULL) {
            char *content = (char *) message->payload;
            content[message->len] = 0;

            // Do stuff
        }
    }
    rd_kafka_message_destroy(message);
    messages[a] = 0;
}

@edenhill
Contributor

Good catch.

There is currently no problem doing so, but I cannot guarantee that it will be future-proof,
i.e., if memory mappings are moved to disk or are otherwise shared or constrained in any way.
