threads not getting cleaned up #1803

Closed
3 of 7 tasks
vk-coder opened this issue May 11, 2018 · 11 comments

Comments

@vk-coder

Read the FAQ first: https://github.com/edenhill/librdkafka/wiki/FAQ

Description

If communication with the broker is never established, the librdkafka threads are not cleaned up.

How to reproduce

  1. Create a producer with the IP of a broker that has not been started yet.
  2. Similarly, create a consumer with the IP of a broker that has not been started yet.
  3. Fetch metadata using consumer->metadata, which returns an error.
  4. Delete the producer and the consumer.
  5. Repeat from the beginning.

IMPORTANT: Always try to reproduce the issue on the latest released version (see https://github.com/edenhill/librdkafka/releases), if it can't be reproduced on the latest version the issue has been fixed.

Checklist

IMPORTANT: We will close issues where the checklist has not been completed.

Please provide the following information:

  • librdkafka version (release number or git tag): 0.11.0
  • Apache Kafka version: <REPLACE with e.g., 0.10.2.3>
  • librdkafka client configuration:
    Producer config
    conf->set("metadata.broker.list", brokers, errstr);
    conf->set("dr_cb", delivery_cb, errstr);
    conf->set("event_cb", event_cb, errstr);
    conf->set("log_level", "0", errstr);
    conf->set("client.id", m_kafka_client_id, errstr);

    Consumer config
    conf->set("metadata.broker.list", brokers, errstr);
    conf->set("bootstrap.servers", brokers, errstr);
    conf->set("topic.metadata.refresh.interval.ms","3000", errstr);
    conf->set("default_topic_conf", tconf.get(), errstr);
    conf->set("log_level", "0", errstr);
    conf->set("group.id", group_id, errstr);
    conf->set("client.id", m_kafka_client_id, errstr);
    conf->set("auto.offset.reset", "earliest", errstr);
    conf->set("rebalance_cb", rebalance_cb, errstr);
    conf->set("statistics.interval.ms", "3000", errstr);
    conf->set("event_cb", event_cb, errstr);

  • Operating system: CentOS Linux release 7.4.1708 (Core)

  • Provide logs (with debug=.. as necessary) from librdkafka
    LOG-7-WAKEUPFD: [thrd:app]: 10.60.11.3:9092/bootstrap: Enabled low-latency partition queue wake-ups
    LOG-7-BRKMAIN: [thrd::0/internal]: :0/internal: Enter main broker thread
    LOG-7-WAKEUPFD: [thrd:app]: 10.60.11.3:9092/bootstrap: Enabled low-latency ops queue wake-ups
    LOG-7-STATE: [thrd::0/internal]: :0/internal: Broker changed state INIT -> UP
    LOG-7-BROADCAST: [thrd::0/internal]: Broadcasting state change
    LOG-7-BROKER: [thrd:app]: 10.60.11.3:9092/bootstrap: Added new broker with NodeId -1
    LOG-7-TOPIC: [thrd:app]: New local topic: test1-b
    LOG-7-BRKMAIN: [thrd:10.60.11.3:9092/bootstrap]: 10.60.11.3:9092/bootstrap: Enter main broker thread
    LOG-7-TOPPARNEW: [thrd:app]: NEW test1-b [-1] 0x1eb6e10 (at rd_kafka_topic_new0:282)
    LOG-7-CONNECT: [thrd:10.60.11.3:9092/bootstrap]: 10.60.11.3:9092/bootstrap: broker in state INIT connecting
    LOG-7-METADATA: [thrd:app]: Skipping metadata refresh of 1 topic(s): no usable brokers
    LOG-7-TOPIC: [thrd:app]: New local topic: test1-u
    LOG-7-TOPPARNEW: [thrd:app]: NEW test1-u [-1] 0x1eb29e0 (at rd_kafka_topic_new0:282)
    LOG-7-METADATA: [thrd:app]: Skipping metadata refresh of 1 topic(s): no usable brokers
    LOG-7-CONNECT: [thrd:10.60.11.3:9092/bootstrap]: 10.60.11.3:9092/bootstrap: Connecting to ipv4#10.60.11.3:9092 (plaintext) with socket 11
    LOG-7-STATE: [thrd:10.60.11.3:9092/bootstrap]: 10.60.11.3:9092/bootstrap: Broker changed state INIT -> CONNECT
    LOG-7-BROADCAST: [thrd:10.60.11.3:9092/bootstrap]: Broadcasting state change
    LOG-7-NOINFO: [thrd:main]: Topic test1-b partition count is zero: should refresh metadata
    LOG-7-NOINFO: [thrd:main]: Topic test1-u partition count is zero: should refresh metadata
    LOG-7-METADATA: [thrd:main]: Skipping metadata refresh of 2 topic(s): no usable brokers
    LOG-7-NOINFO: [thrd:main]: Topic test1-b partition count is zero: should refresh metadata
    LOG-7-NOINFO: [thrd:main]: Topic test1-u partition count is zero: should refresh metadata
    LOG-7-METADATA: [thrd:main]: Skipping metadata refresh of 2 topic(s): no usable brokers
    LOG-7-NOINFO: [thrd:main]: Topic test1-b partition count is zero: should refresh metadata
    LOG-7-NOINFO: [thrd:main]: Topic test1-u partition count is zero: should refresh metadata
    LOG-7-METADATA: [thrd:main]: Skipping metadata refresh of 2 topic(s): no usable brokers
    LOG-7-NOINFO: [thrd:main]: Topic test1-b partition count is zero: should refresh metadata
    LOG-7-NOINFO: [thrd:main]: Topic test1-u partition count is zero: should refresh metadata
    LOG-7-METADATA: [thrd:main]: Skipping metadata refresh of 2 topic(s): no usable brokers
    LOG-7-NOINFO: [thrd:main]: Topic test1-b partition count is zero: should refresh metadata
    LOG-7-NOINFO: [thrd:main]: Topic test1-u partition count is zero: should refresh metadata
    LOG-7-METADATA: [thrd:main]: Skipping metadata refresh of 2 topic(s): no usable brokers
    LOG-7-DESTROY: [thrd:app]: Terminating instance
    LOG-7-TERMINATE: [thrd:app]: Interrupting timers
    LOG-7-TERMINATE: [thrd:app]: Sending TERMINATE to main background thread
    LOG-7-BROADCAST: [thrd:app]: Broadcasting state change
    LOG-7-TERMINATE: [thrd:app]: Joining main background thread
    LOG-7-DESTROY: [thrd:main]: Destroy internal
    LOG-7-DESTROY: [thrd:main]: Removing all topics
    LOG-7-TOPPARREMOVE: [thrd:main]: Removing toppar test1-b [-1] 0x1eb6e10
    LOG-7-DESTROY: [thrd:main]: test1-b [-1]: 0x1eb6e10 DESTROY_FINAL
    LOG-7-TOPPARREMOVE: [thrd:main]: Removing toppar test1-u [-1] 0x1eb29e0
    LOG-7-DESTROY: [thrd:main]: test1-u [-1]: 0x1eb29e0 DESTROY_FINAL
    LOG-7-TERMINATE: [thrd:main]: Purging reply queue
    LOG-7-TERMINATE: [thrd:main]: Decommissioning internal broker
    LOG-7-TERMINATE: [thrd:main]: Join 2 broker thread(s)
    LOG-7-TERMINATE: [thrd:10.60.11.3:9092/bootstrap]: 10.60.11.3:9092/bootstrap: Handle is terminating: failed 0 request(s) in retry+outbuf
    LOG-7-BROKERFAIL: [thrd:10.60.11.3:9092/bootstrap]: 10.60.11.3:9092/bootstrap: failed: err: Local: Broker handle destroyed: (errno: Operation now in progress)
    LOG-7-STATE: [thrd:10.60.11.3:9092/bootstrap]: 10.60.11.3:9092/bootstrap: Broker changed state CONNECT -> DOWN
    LOG-7-BROADCAST: [thrd:10.60.11.3:9092/bootstrap]: Broadcasting state change
    LOG-7-BUFQ: [thrd:10.60.11.3:9092/bootstrap]: 10.60.11.3:9092/bootstrap: Purging bufq with 0 buffers
    LOG-7-BUFQ: [thrd:10.60.11.3:9092/bootstrap]: 10.60.11.3:9092/bootstrap: Updating 0 buffers on connection reset
    LOG-7-TERMINATE: [thrd::0/internal]: :0/internal: Handle is terminating: failed 0 request(s) in retry+outbuf
    LOG-7-BROKERFAIL: [thrd::0/internal]: :0/internal: failed: err: Local: Broker handle destroyed: (errno: Success)
    LOG-7-STATE: [thrd::0/internal]: :0/internal: Broker changed state UP -> DOWN
    LOG-7-BROADCAST: [thrd::0/internal]: Broadcasting state change
    LOG-7-BUFQ: [thrd::0/internal]: :0/internal: Purging bufq with 0 buffers
    LOG-7-BUFQ: [thrd::0/internal]: :0/internal: Updating 0 buffers on connection reset
    LOG-7-TERMINATE: [thrd:main]: Main background thread exiting
    LOG-7-TERMINATE: [thrd:app]: Destroying op queues
    LOG-7-TERMINATE: [thrd:app]: Termination done: freeing resources

  • Provide broker log excerpts

  • Critical issue

@edenhill
Contributor

Please try to reproduce on latest release v0.11.4

@vk-coder
Author

Oh, I tried it on 0.11.4 as well, but mentioned the version that is running in our production. Sorry about that.

The logs above are from production; the logs below are from version 0.11.4.

LOG-7-WAKEUPFD: [thrd:app]: 10.60.11.3:9092/bootstrap: Enabled low-latency partition queue wake-ups
LOG-7-WAKEUPFD: [thrd:app]: 10.60.11.3:9092/bootstrap: Enabled low-latency ops queue wake-ups
LOG-7-BRKMAIN: [thrd::0/internal]: :0/internal: Enter main broker thread
LOG-7-STATE: [thrd::0/internal]: :0/internal: Broker changed state INIT -> UP
LOG-7-BROADCAST: [thrd::0/internal]: Broadcasting state change
LOG-7-BROKER: [thrd:app]: 10.60.11.3:9092/bootstrap: Added new broker with NodeId -1
LOG-7-TOPIC: [thrd:app]: New local topic: test1-b
LOG-7-BRKMAIN: [thrd:10.60.11.3:9092/bootstrap]: 10.60.11.3:9092/bootstrap: Enter main broker thread
LOG-7-TOPPARNEW: [thrd:app]: NEW test1-b [-1] 0xf85e10 (at rd_kafka_topic_new0:325)
LOG-7-CONNECT: [thrd:10.60.11.3:9092/bootstrap]: 10.60.11.3:9092/bootstrap: broker in state INIT connecting
LOG-7-METADATA: [thrd:app]: Skipping metadata refresh of 1 topic(s): no usable brokers
LOG-7-TOPIC: [thrd:app]: New local topic: test1-u
LOG-7-TOPPARNEW: [thrd:app]: NEW test1-u [-1] 0xf82720 (at rd_kafka_topic_new0:325)
LOG-7-METADATA: [thrd:app]: Skipping metadata refresh of 1 topic(s): no usable brokers
LOG-7-CONNECT: [thrd:10.60.11.3:9092/bootstrap]: 10.60.11.3:9092/bootstrap: Connecting to ipv4#10.60.11.3:9092 (plaintext) with socket 11
LOG-7-STATE: [thrd:10.60.11.3:9092/bootstrap]: 10.60.11.3:9092/bootstrap: Broker changed state INIT -> CONNECT
LOG-7-BROADCAST: [thrd:10.60.11.3:9092/bootstrap]: Broadcasting state change
LOG-7-NOINFO: [thrd:main]: Topic test1-b partition count is zero: should refresh metadata
LOG-7-NOINFO: [thrd:main]: Topic test1-u partition count is zero: should refresh metadata
LOG-7-METADATA: [thrd:main]: Skipping metadata refresh of 2 topic(s): no usable brokers
LOG-7-NOINFO: [thrd:main]: Topic test1-b partition count is zero: should refresh metadata
LOG-7-NOINFO: [thrd:main]: Topic test1-u partition count is zero: should refresh metadata
LOG-7-METADATA: [thrd:main]: Skipping metadata refresh of 2 topic(s): no usable brokers
LOG-7-NOINFO: [thrd:main]: Topic test1-b partition count is zero: should refresh metadata
LOG-7-NOINFO: [thrd:main]: Topic test1-u partition count is zero: should refresh metadata
LOG-7-METADATA: [thrd:main]: Skipping metadata refresh of 2 topic(s): no usable brokers
LOG-7-NOINFO: [thrd:main]: Topic test1-b partition count is zero: should refresh metadata
LOG-7-NOINFO: [thrd:main]: Topic test1-u partition count is zero: should refresh metadata
LOG-7-METADATA: [thrd:main]: Skipping metadata refresh of 2 topic(s): no usable brokers
LOG-7-NOINFO: [thrd:main]: Topic test1-b partition count is zero: should refresh metadata
LOG-7-NOINFO: [thrd:main]: Topic test1-u partition count is zero: should refresh metadata
LOG-7-METADATA: [thrd:main]: Skipping metadata refresh of 2 topic(s): no usable brokers
LOG-7-NOINFO: [thrd:main]: Topic test1-b partition count is zero: should refresh metadata
LOG-7-NOINFO: [thrd:main]: Topic test1-u partition count is zero: should refresh metadata
LOG-7-METADATA: [thrd:main]: Skipping metadata refresh of 2 topic(s): no usable brokers
LOG-7-DESTROY: [thrd:app]: Terminating instance
LOG-7-TERMINATE: [thrd:app]: Interrupting timers
LOG-7-TERMINATE: [thrd:app]: Sending TERMINATE to main background thread
LOG-7-BROADCAST: [thrd:app]: Broadcasting state change
LOG-7-TERMINATE: [thrd:app]: Joining main background thread
LOG-7-DESTROY: [thrd:main]: Destroy internal
LOG-7-DESTROY: [thrd:main]: Removing all topics
LOG-7-TOPPARREMOVE: [thrd:main]: Removing toppar test1-b [-1] 0xf85e10
LOG-7-DESTROY: [thrd:main]: test1-b [-1]: 0xf85e10 DESTROY_FINAL
LOG-7-TOPPARREMOVE: [thrd:main]: Removing toppar test1-u [-1] 0xf82720
LOG-7-DESTROY: [thrd:main]: test1-u [-1]: 0xf82720 DESTROY_FINAL
LOG-7-TERMINATE: [thrd:main]: Purging reply queue
LOG-7-TERMINATE: [thrd:main]: Decommissioning internal broker
LOG-7-TERMINATE: [thrd:main]: Join 2 broker thread(s)
LOG-7-TERMINATE: [thrd:10.60.11.3:9092/bootstrap]: 10.60.11.3:9092/bootstrap: Handle is terminating: failed 0 request(s) in retry+outbuf
LOG-7-BROKERFAIL: [thrd:10.60.11.3:9092/bootstrap]: 10.60.11.3:9092/bootstrap: failed: err: Local: Broker handle destroyed: (errno: Operation now in progress)
LOG-7-TERM: [thrd::0/internal]: :0/internal: Received TERMINATE op in state UP: 1 refcnts, 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs
LOG-7-STATE: [thrd:10.60.11.3:9092/bootstrap]: 10.60.11.3:9092/bootstrap: Broker changed state CONNECT -> DOWN
LOG-7-BROADCAST: [thrd:10.60.11.3:9092/bootstrap]: Broadcasting state change
LOG-7-BUFQ: [thrd:10.60.11.3:9092/bootstrap]: 10.60.11.3:9092/bootstrap: Purging bufq with 0 buffers
LOG-7-BUFQ: [thrd:10.60.11.3:9092/bootstrap]: 10.60.11.3:9092/bootstrap: Updating 0 buffers on connection reset
LOG-7-TERM: [thrd:10.60.11.3:9092/bootstrap]: 10.60.11.3:9092/bootstrap: Received TERMINATE op in state DOWN: 1 refcnts, 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs
LOG-7-TERMINATE: [thrd::0/internal]: :0/internal: Handle is terminating: failed 0 request(s) in retry+outbuf
LOG-7-BROKERFAIL: [thrd::0/internal]: :0/internal: failed: err: Local: Broker handle destroyed: (errno: Success)
LOG-7-STATE: [thrd::0/internal]: :0/internal: Broker changed state UP -> DOWN
LOG-7-BROADCAST: [thrd::0/internal]: Broadcasting state change
LOG-7-BUFQ: [thrd::0/internal]: :0/internal: Purging bufq with 0 buffers
LOG-7-BUFQ: [thrd::0/internal]: :0/internal: Updating 0 buffers on connection reset
LOG-7-TERMINATE: [thrd:main]: Main background thread exiting
LOG-7-TERMINATE: [thrd:app]: Destroying op queues
LOG-7-TERMINATE: [thrd:app]: Termination done: freeing resources

@vk-coder
Author

Is any more information required? Thanks!

@edenhill
Contributor

It looks like that log indicates that all threads are indeed exiting.
How are you measuring threads left behind?

@vk-coder
Author

ps -eLf | grep
All reported threads are stuck in librdkafka function calls:

#0 0x00007f6acc031cf2 in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1 0x00000000004b8e65 in cnd_timedwait_ms (cnd=cnd@entry=0xab41b8, mtx=mtx@entry=0xab4190, timeout_ms=) at tinycthread.c:501
#2 0x00000000004853aa in rd_kafka_q_serve (rkq=0xab4190, timeout_ms=, max_cnt=max_cnt@entry=0, cb_type=cb_type@entry=RD_KAFKA_Q_CB_CALLBACK,
callback=callback@entry=0x0, opaque=opaque@entry=0x0) at rdkafka_queue.c:440
#3 0x000000000045a43c in rd_kafka_thread_main (arg=arg@entry=0xabf760) at rdkafka.c:1227
#4 0x00000000004b8c07 in _thrd_wrapper_function (aArg=) at tinycthread.c:624
#5 0x00007f6acc02de25 in start_thread () from /lib64/libpthread.so.0
#6 0x00007f6acaa7834d in clone () from /lib64/libc.so.6

@edenhill
Contributor

In gdb, can you go to stack frame 3 and run p rk.rk_name to see which instance this is?
Do you see just this one thread hanging, or more?

@vk-coder
Author

#0 0x00007faa91a76cf2 in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
Missing separate debuginfos, use: debuginfo-install glibc-2.17-196.el7_4.2.x86_64 libevent-2.0.21-4.el7.x86_64 libgcc-4.8.5-16.el7_4.1.x86_64 libstdc++-4.8.5-16.el7_4.1.x86_64 openssl-libs-1.0.2k-8.el7.x86_64 protobuf-3.3.1-2.el7.centos.x86_64 zlib-1.2.7-17.el7.x86_64
(gdb) bt
#0 0x00007faa91a76cf2 in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1 0x00000000004bb825 in cnd_timedwait_ms (cnd=cnd@entry=0x1b2e2c8, mtx=mtx@entry=0x1b2e2a0, timeout_ms=) at tinycthread.c:501
#2 0x0000000000487a9a in rd_kafka_q_serve (rkq=0x1b2e2a0, timeout_ms=, max_cnt=max_cnt@entry=0, cb_type=cb_type@entry=RD_KAFKA_Q_CB_CALLBACK,
callback=callback@entry=0x0, opaque=opaque@entry=0x0) at rdkafka_queue.c:447
#3 0x000000000045a9bc in rd_kafka_thread_main (arg=arg@entry=0x1b39100) at rdkafka.c:1266
#4 0x00000000004bb5c7 in _thrd_wrapper_function (aArg=) at tinycthread.c:624
#5 0x00007faa91a72e25 in start_thread () from /lib64/libpthread.so.0
#6 0x00007faa904bd34d in clone () from /lib64/libc.so.6
(gdb) frame 3
#3 0x000000000045a9bc in rd_kafka_thread_main (arg=arg@entry=0x1b39100) at rdkafka.c:1266
1266 rd_kafka_q_serve(rk->rk_ops, (int)(sleeptime / 1000), 0,
(gdb) p rk.rk_name
$1 = "test1#consumer-2", '\000' <repeats 111 times>
(gdb)

@edenhill
Contributor

Make sure that all outstanding librdkafka objects are destroyed before destroying the instance:
https://github.com/edenhill/librdkafka/wiki/Proper-termination-sequence
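
One way to keep a shutdown order from being skipped is to tie it to object lifetime. The sketch below is only an illustration under stated assumptions, not code from librdkafka or its wiki: CloseThenDelete and FakeConsumer are hypothetical names, and the only thing assumed about the real handle type is that it exposes a close() method, as RdKafka::KafkaConsumer does.

```cpp
#include <memory>
#include <string>
#include <vector>

// Deleter for std::unique_ptr that always calls close() before delete.
// Handle can be any type with a close() member (assumption).
template <typename Handle>
struct CloseThenDelete {
    void operator()(Handle* handle) const {
        if (handle == nullptr) {
            return;
        }
        handle->close();  // stop the client first
        delete handle;    // then release the object
    }
};

// Hypothetical stand-in for a consumer handle; it only records call order.
struct FakeConsumer {
    explicit FakeConsumer(std::vector<std::string>& log) : log_(log) {}
    ~FakeConsumer() { log_.push_back("delete"); }
    void close() { log_.push_back("close"); }
    std::vector<std::string>& log_;
};

// Creates a handle, lets it go out of scope, and returns the recorded order.
std::vector<std::string> run_shutdown() {
    std::vector<std::string> log;
    {
        std::unique_ptr<FakeConsumer, CloseThenDelete<FakeConsumer>> consumer(
            new FakeConsumer(log));
    }  // the deleter runs here
    return log;
}
```

With the deleter in place, every exit path out of the scope goes through close() before the object is freed, including early returns and exceptions.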

@vk-coder
Author

So this is what I have to do:

struct rd_kafka_s *rk = _consumer->c_ptr();
delete _consumer;
rd_kafka_destroy(rk);

This stops all the threads. Unfortunately, this is not done explicitly in any of the examples, as most of them simply call exit.

The call above is for anyone who wants a graceful shutdown of the application, or who shuts down and then starts over in a loop.

Thanks for your time.

@edenhill edenhill reopened this May 16, 2018
@edenhill
Contributor

Calling rd_kafka_destroy(rk) should not be done from the C++ API.

@vk-coder
Author

RdKafka::KafkaConsumer was wrapped inside another container, and "close()" was not being forwarded to RdKafka::KafkaConsumer::close.

After fixing this issue in my application, things are working fine so far.
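
For readers who hit the same symptom, here is a toy model of this kind of wrapper bug. Nothing in it is librdkafka code: FakeConsumer, BrokenWrapper, FixedWrapper and live_threads_after_cycle are hypothetical names, and the model simply assumes a handle whose background thread stops only when close() is called.

```cpp
#include <atomic>
#include <chrono>
#include <memory>
#include <thread>

// Number of background threads still running in this toy model.
static std::atomic<int> g_live_threads{0};

// Hypothetical handle: starts a worker thread that only close() stops.
class FakeConsumer {
public:
    FakeConsumer() : stop_(std::make_shared<std::atomic<bool>>(false)) {
        std::shared_ptr<std::atomic<bool>> stop = stop_;  // thread keeps its own reference
        ++g_live_threads;
        worker_ = std::thread([stop] {
            while (!stop->load()) {
                std::this_thread::sleep_for(std::chrono::milliseconds(1));
            }
            --g_live_threads;
        });
    }
    // Signals the worker to stop and waits for it to exit.
    void close() {
        if (worker_.joinable()) {
            stop_->store(true);
            worker_.join();
        }
    }
    // Destroying without close() leaves the worker running (detached).
    ~FakeConsumer() {
        if (worker_.joinable()) {
            worker_.detach();
        }
    }
private:
    std::shared_ptr<std::atomic<bool>> stop_;
    std::thread worker_;
};

// Wrapper whose close() never reaches the wrapped handle.
class BrokenWrapper {
public:
    void close() { /* call is not forwarded */ }
private:
    FakeConsumer inner_;
};

// Wrapper that forwards close() to the wrapped handle.
class FixedWrapper {
public:
    void close() { inner_.close(); }
private:
    FakeConsumer inner_;
};

// Runs one create/close/destroy cycle and reports the threads left running.
template <typename Wrapper>
int live_threads_after_cycle() {
    {
        Wrapper wrapper;
        wrapper.close();
    }
    return g_live_threads.load();
}
```

In this model each cycle through BrokenWrapper leaves one more thread behind, while FixedWrapper leaves none, which matches the pattern of a thread count that grows with every create/delete iteration.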

@edenhill, thanks for your time and help.
