Hi edenhill. Whenever I build my consumer with jemalloc, it crashes. If I remove jemalloc, the consumer runs without any problems. The program crashes at the same place every time:
string msg = string(static_cast<const char *>(imsg->payload()),0,static_cast(imsg->len()));
imsg is a pointer to an RdKafka::Message. The gdb output is as follows:
(gdb) bt
#0 0x00007f6ccfd725b2 in __strlen_sse42 () from /lib64/libc.so.6
#1 0x00007f6cd0509f20 in std::basic_string<char, std::char_traits, std::allocator >::basic_string(char const*, std::allocator const&) ()
from /usr/lib64/libstdc++.so.6
#2 0x00000000005c0aa6 in KafkaConsumer::CmdMsgProcess (this=Unhandled dwarf expression opcode 0xf3
) at ./main/rdkafka_client.cpp:88
#3 0x00000000005c286e in KafkaConsumer::CmdConsume (this=0x7f6c6e43a000, res=0x7f69dd7ea130) at ./main/rdkafka_client.cpp:47
#4 0x00000000005b258a in ConsumeThread (args=0x7f6bf2acf460) at ./main/RtConsumeClient.cpp:131
#5 0x00007f6cd0fad9d1 in start_thread () from /lib64/libpthread.so.0
#6 0x00007f6ccfd278fd in clone () from /lib64/libc.so.6
(gdb) f 2
#2 0x00000000005c0aa6 in KafkaConsumer::CmdMsgProcess (this=Unhandled dwarf expression opcode 0xf3
) at ./main/rdkafka_client.cpp:88
88 ./main/rdkafka_client.cpp: No such file or directory.
in ./main/rdkafka_client.cpp
(gdb) p *(imsg)
$1 = {RdKafka::Message = {vptr.Message = 0xb48390}, topic = 0x7f6c6e439000, rkmessage_ = 0x7f6c74c3db90, free_rkmessage_ = true, rkmessage_err_ = {
err = RD_KAFKA_RESP_ERR__TIMED_OUT, rkt = 0x0, partition = 0, payload = 0x0, len = 0, key = 0x0, key_len = 0, offset = 0, private = 0x0}, key = 0x7f6c6e43c030}
(gdb) p *(imsg->rkmessage_)
$2 = {err = RD_KAFKA_RESP_ERR_NO_ERROR, rkt = 0x7f6c6e43b100, partition = 37, payload = 0x7f6c74dffc58, len = 936, key = 0x7f6c74dffc47, key_len = 13,
offset = 640602, private = 0x7f6c74c3db40}
I noticed that rkmessage_->err is RD_KAFKA_RESP_ERR_NO_ERROR
but rkmessage_err_.err is RD_KAFKA_RESP_ERR__TIMED_OUT. Is that abnormal?
I have another question, about the number of threads. Suppose there are 3 brokers in the Kafka cluster and I create one RdKafka::Consumer to consume messages. From issue #284, can I conclude that the process will create 6 threads (two per broker) + 1 thread (rd_kafka_t)? Or has this been optimized in the newest version?
PS: I am using the newest version of the master branch as of today.
If Message->err() is set, the Message is actually an error or event propagation, not an actual message, and in this case you should not access ->payload() or ->key().
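As a minimal sketch of that check: the backtrace ends in strlen, which is consistent with string(ptr, 0, len) first building a temporary from the const char* (scanning for a NUL terminator the payload is not guaranteed to have) before taking the substring. The helper name payload_to_string below is hypothetical and not part of librdkafka; it only shows the (pointer, length) constructor, which copies exactly len bytes. The commented usage assumes imsg is an RdKafka::Message* as in the report.

```cpp
#include <cstddef>
#include <string>

// Hypothetical helper (not part of librdkafka): copies exactly `len` bytes
// from a payload buffer that is not guaranteed to be NUL-terminated.
std::string payload_to_string(const void* payload, std::size_t len) {
    if (payload == nullptr || len == 0) {
        return std::string();
    }
    // The (pointer, count) constructor copies `len` bytes and never calls strlen.
    return std::string(static_cast<const char*>(payload), len);
}

// Intended use in the consume loop (sketch, assumes imsg is an RdKafka::Message*):
//   if (imsg->err() == RdKafka::ERR_NO_ERROR) {
//       std::string msg = payload_to_string(imsg->payload(), imsg->len());
//   } else {
//       // error or event propagation: do not touch payload() or key()
//   }
```

Checking err() first and only then reading payload() and len() keeps the error and message paths separate, so an event such as a timeout never reaches the string construction.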
rdkafka will create one thread per bootstrap broker and one thread per real broker learnt from the cluster.
So if you have three brokers and you supply all three as bootstrap brokers then there will be 3*2 threads.
The latest master will try to migrate a bootstrap broker handle to a proper broker handle, thus not wasting any threads on bootstrap brokers. But this requires the hostname and port of the bootstrap broker and the real broker to be identical.
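The arithmetic above can be sketched as a small model. This is only an illustration of the counting rule described here, not librdkafka code: the function name estimate_broker_threads and the "host:port" string representation are assumptions, and it counts broker threads only, not any other threads the library may create.

```cpp
#include <cstddef>
#include <set>
#include <string>

// Hypothetical model of the rule described above: one thread per broker
// learnt from the cluster, plus one per bootstrap broker. When migration is
// enabled, a bootstrap entry whose "host:port" is identical to a learnt
// broker is assumed to reuse that broker's thread.
std::size_t estimate_broker_threads(const std::set<std::string>& bootstrap,
                                    const std::set<std::string>& learnt,
                                    bool migrate_matching) {
    std::size_t threads = learnt.size();
    for (const std::string& addr : bootstrap) {
        if (migrate_matching && learnt.count(addr) > 0) {
            continue;  // identical host:port, no extra bootstrap thread
        }
        ++threads;
    }
    return threads;
}
```

With three brokers all listed as bootstrap brokers, this model gives 6 without migration and 3 with migration when every bootstrap address matches a learnt broker exactly. A bootstrap entry that differs in hostname or port (for example an alias) would still cost one extra thread.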