[Help Wanted]Crash on rd_kafka_broker_destroy_final #3608

Open

morningman opened this issue Nov 5, 2021 · 32 comments

@morningman

Description

My program crashes with the following stack:

*** Aborted at 1632307784 (unix time) try "date -d @1632307784" if you are using GNU date ***
PC: @ 0x7f2e6145a387 __GI_raise
*** SIGABRT (@0x27de9) received by PID 163305 (TID 0x7f2d8a2f0700) from PID 163305; stack trace: ***
@ 0x1fd12e2 google::(anonymous namespace)::FailureSignalHandler()
@ 0x7f2e61b03630 (unknown)
@ 0x7f2e6145a387 __GI_raise
@ 0x7f2e6145ba78 __GI_abort
@ 0x7f2e614531a6 __assert_fail_base
@ 0x7f2e61453252 __GI___assert_fail
@ 0x1d7c767 rd_kafka_broker_destroy_final
@ 0x1dfccb8 rd_kafka_metadata_refresh_topics
@ 0x1dfd04a rd_kafka_metadata_refresh_known_topics
@ 0x1d774c2 rd_kafka_broker_fail
@ 0x1d87bd4 rd_kafka_broker_op_serve
@ 0x1d89463 rd_kafka_broker_ops_io_serve
@ 0x1d89968 rd_kafka_broker_consumer_serve
@ 0x1d8b485 rd_kafka_broker_thread_main
@ 0x1df43a7 _thrd_wrapper_function
@ 0x7f2e61afbea5 start_thread
@ 0x7f2e615229fd __clone

How to reproduce

I don't know how to reproduce it; my program is fairly complicated, and everything works fine in most cases. This error only happens occasionally.

I just want to know under what circumstances this problem can occur, so that I can continue troubleshooting my program.


Checklist


Please provide the following information:

  • librdkafka version (release number or git tag): 1.8.0
  • Apache Kafka version: 0.11
  • librdkafka client configuration (see the sketch after this checklist):
    enable.partition.eof=false, enable.auto.offset.store=false, statistics.interval.ms=0, auto.offset.reset=error, api.version.request=true, api.version.fallback.ms=0
  • Operating system: centos7 x86_64
  • Provide logs (with debug=.. as necessary) from librdkafka: No logs, just crash...
  • Provide broker log excerpts
  • Critical issue
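
For reference, a minimal sketch of applying the reported configuration through the librdkafka C API; the bootstrap.servers value and the error handling are illustrative assumptions, not part of the original report:

#include <stdio.h>
#include <librdkafka/rdkafka.h>

int main(void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();
        /* The settings listed in this issue's checklist. */
        const char *kv[][2] = {
                {"enable.partition.eof", "false"},
                {"enable.auto.offset.store", "false"},
                {"statistics.interval.ms", "0"},
                {"auto.offset.reset", "error"},
                {"api.version.request", "true"},
                {"api.version.fallback.ms", "0"},
                {"bootstrap.servers", "localhost:9092"}, /* assumption */
        };

        for (size_t i = 0; i < sizeof(kv) / sizeof(kv[0]); i++) {
                if (rd_kafka_conf_set(conf, kv[i][0], kv[i][1], errstr,
                                      sizeof(errstr)) != RD_KAFKA_CONF_OK) {
                        fprintf(stderr, "%s: %s\n", kv[i][0], errstr);
                        return 1;
                }
        }

        /* rd_kafka_new() takes ownership of conf on success. */
        rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr,
                                      sizeof(errstr));
        if (!rk) {
                fprintf(stderr, "rd_kafka_new: %s\n", errstr);
                return 1;
        }
        rd_kafka_destroy(rk);
        return 0;
}
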
@morningman morningman changed the title [Bug]Crash on rd_kafka_broker_destroy_final [Help Wanted]Crash on rd_kafka_broker_destroy_final Nov 5, 2021
@morningman
Author

By the way, in my program the RdKafka::KafkaConsumer is reused by many threads, but only one thread uses it at a time. Hope this info helps. Thank you!

@WindyGao

WindyGao commented Nov 7, 2021

The same error occurs under Kafka v2.5.

@edenhill
Contributor

edenhill commented Nov 7, 2021

Please provide a reproducible test program, thanks.

@morningman
Author

Please provide a reproducible test program, thanks.

Hi @edenhill, sorry, it is hard to reproduce... it just happens occasionally.
So I am hoping someone can suggest some possible causes of this problem, so that we can investigate further.

@edenhill
Contributor

edenhill commented Nov 8, 2021

There is not enough information to go on.

Please provide:

  • the assert reason itself (should be printed to stderr or stdout)
  • when is this happening? Are there any other events (disconnects, rebalances, etc) at the same time?
  • logs
  • check your object usage: make sure you are not re-creating Topic objects and don't use destroyed/deleted objects.

@morningman
Author

There is not enough information to go on. […]

Thanks a lot! I will check my program to see if more info can be provided!

@morningman
Author

Hi @edenhill, I traced the code and found that when calling
https://github.com/edenhill/librdkafka/blob/c0cfc24cce8b77a1e42d9668dd1cfd015bbf0d6b/src/rdkafka_metadata.c#L863

and execution enters this branch:
https://github.com/edenhill/librdkafka/blob/c0cfc24cce8b77a1e42d9668dd1cfd015bbf0d6b/src/rdkafka_metadata.c#L880-L899

the rkb is obtained from rd_kafka_broker_any_usable(), and destroy_rkb is set to 1.
And finally:

https://github.com/edenhill/librdkafka/blob/c0cfc24cce8b77a1e42d9668dd1cfd015bbf0d6b/src/rdkafka_metadata.c#L940-L941

I am not sure whether the rkb obtained from rd_kafka_broker_any_usable() is necessarily one that was created by the current thread.

Because when rd_kafka_broker_destroy_final() is called:

https://github.com/edenhill/librdkafka/blob/c0cfc24cce8b77a1e42d9668dd1cfd015bbf0d6b/src/rdkafka_broker.c#L5389-L5396

the application crashed at rd_assert(thrd_is_current(rkb->rkb_thread));

Could this be the cause of the problem?
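
For illustration, here is a simplified standalone model of the suspected race; this is an assumption about the mechanism, not librdkafka's actual internals (broker_t, broker_unref() and broker_destroy_final() are names made up for the sketch):

#include <assert.h>
#include <stdatomic.h>
#include <threads.h>

typedef struct broker {
        atomic_int refcnt;  /* shared reference count */
        thrd_t     owner;   /* the broker's own serving thread */
} broker_t;

static void broker_destroy_final(broker_t *rkb) {
        /* Counterpart of rd_assert(thrd_is_current(rkb->rkb_thread)):
         * final destruction is asserted to run on the owner thread. */
        assert(thrd_equal(thrd_current(), rkb->owner));
        /* ... free resources ... */
}

static void broker_unref(broker_t *rkb) {
        /* Whichever thread drops the LAST reference runs the final
         * destructor. A metadata refresh that picked this broker via
         * rd_kafka_broker_any_usable() may drop that last reference
         * from a different broker's thread, tripping the assert. */
        if (atomic_fetch_sub(&rkb->refcnt, 1) == 1)
                broker_destroy_final(rkb);
}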

@yueyingyehua

Hi @edenhill, same here as well. It happened when restarting the program after the process had not consumed data for a period of time.
My program crashed with the following stack; I hope it can help you:

#6  <signal handler called>
#7  0x0000ffffa02d3200 in raise () from /lib64/libc.so.6
#8  0x0000ffffa02d45ac in abort () from /lib64/libc.so.6
#9  0x0000ffffa02cc60c in __assert_fail_base () from /lib64/libc.so.6
#10 0x0000ffffa02cc68c in __assert_fail () from /lib64/libc.so.6
#11 0x0000ffff9b4ccd30 in rd_kafka_broker_destroy_final (rkb=0xaaaadee22e60) at rdkafka_broker.c:5412
#12 0x0000ffff9b5529ac in rd_kafka_metadata_refresh_topics (rk=rk@entry=0xaaaadeea2140, rkb=0xaaaadee22e60, rkb@entry=0x0,
    topics=topics@entry=0xffff18ca9ae0, force=force@entry=1 '\001', allow_auto_create=0 '\000',
    cgrp_update=cgrp_update@entry=0 '\000', reason=reason@entry=0xffff9b600840 "broker down") at rdkafka_metadata.c:954
#13 0x0000ffff9b552d90 in rd_kafka_metadata_refresh_known_topics (rk=0xaaaadeea2140, rkb=rkb@entry=0x0, force=force@entry=1 '\001',
    reason=reason@entry=0xffff9b600840 "broker down") at rdkafka_metadata.c:994
#14 0x0000ffff9b4c86e4 in rd_kafka_broker_fail (rkb=rkb@entry=0xaaaadeea3d90, level=level@entry=7,
    err=err@entry=RD_KAFKA_RESP_ERR__TRANSPORT, fmt=fmt@entry=0xffff9b6022d8 "Closing connection due to nodename change")
    at rdkafka_broker.c:625
#15 0x0000ffff9b4d6040 in rd_kafka_broker_op_serve (rkb=rkb@entry=0xaaaadeea3d90, rko=0xaaaadf5ac450) at rdkafka_broker.c:3342
#16 0x0000ffff9b4d6e0c in rd_kafka_broker_ops_serve (rkb=rkb@entry=0xaaaadeea3d90, timeout_us=<optimized out>, timeout_us@entry=0)
    at rdkafka_broker.c:3378
#17 0x0000ffff9b4d6ef4 in rd_kafka_broker_ops_io_serve (rkb=rkb@entry=0xaaaadeea3d90, abs_timeout=<optimized out>)
    at rdkafka_broker.c:3433
#18 0x0000ffff9b4d76f0 in rd_kafka_broker_consumer_serve (rkb=rkb@entry=0xaaaadeea3d90, abs_timeout=abs_timeout@entry=281471097680320)
    at rdkafka_broker.c:5023
#19 0x0000ffff9b4d8924 in rd_kafka_broker_serve (rkb=rkb@entry=0xaaaadeea3d90, timeout_ms=<optimized out>, timeout_ms@entry=1000)
    at rdkafka_broker.c:5166
#20 0x0000ffff9b4d8fd4 in rd_kafka_broker_thread_main (arg=0xaaaadeea3d90) at rdkafka_broker.c:5328
#21 0x0000ffffa0a7e7dc in start_thread () from /lib64/libpthread.so.0
#22 0x0000ffffa03759fc in thread_start () from /lib64/libc.so.6


(gdb) p *(rd_kafka_broker_t *)0xaaaadee22e60
$3 = {rkb_link = {tqe_next = 0xaaaadeea3d90, tqe_prev = 0xaaaadee27a30}, rkb_nodeid = 0, rkb_rsal = 0xaaaadf4029a0, rkb_ts_rsal_last = 188244515693, rkb_addr_last = 0xaaaadf4029a8, rkb_transport = 0x0, rkb_corrid = 3965, 
  rkb_connid = 1, rkb_ops = 0xaaaadee23ab0, rkb_lock = {__size = '\000' <repeats 17 times>, "\002", '\000' <repeats 29 times>, __align = 0}, rkb_blocking_max_ms = 0, rkb_toppars = {tqh_first = 0x0, tqh_last = 0xaaaadee22ee0}, 
  rkb_toppar_cnt = 0, rkb_active_toppars = {cqh_first = 0xaaaadee22ef8, cqh_last = 0xaaaadee22ef8}, rkb_active_toppar_cnt = 0, rkb_active_toppar_next = 0x0, rkb_cgrp = 0x0, rkb_ts_fetch_backoff = 0, rkb_fetching = 1, 
  rkb_state = RD_KAFKA_BROKER_STATE_DOWN, rkb_ts_state = 190226149453, rkb_timeout_scan_intvl = {ri_ts_last = 190225875214, ri_fixed = 0, ri_backoff = 0}, rkb_blocking_request_cnt = {val = 0}, rkb_features = 8191, 
  rkb_ApiVersions = 0xaaaadf41fac0, rkb_ApiVersions_cnt = 48, rkb_ApiVersion_fail_intvl = {ri_ts_last = 188244522436, ri_fixed = 0, ri_backoff = 0}, rkb_source = RD_KAFKA_CONFIGURED, rkb_c = {tx_bytes = {val = 499041}, tx = {
      val = 3965}, tx_err = {val = 0}, tx_retries = {val = 0}, req_timeouts = {val = 0}, rx_bytes = {val = 475700}, rx = {val = 3964}, rx_err = {val = 0}, rx_corrid_err = {val = 0}, rx_partial = {val = 0}, zbuf_grow = {
      val = 0}, buf_grow = {val = 0}, wakeups = {val = 7998}, connects = {val = 1}, disconnects = {val = 1}, reqtype = {{val = 0}, {val = 3958}, {val = 2}, {val = 1}, {val = 0}, {val = 0}, {val = 0}, {val = 0}, {val = 0}, {
        val = 0}, {val = 1}, {val = 0}, {val = 0}, {val = 0}, {val = 0}, {val = 0}, {val = 0}, {val = 1}, {val = 1}, {val = 0} <repeats 17 times>, {val = 1}, {val = 0} <repeats 22 times>}, ts_send = {val = 0}, ts_recv = {
      val = 0}}, rkb_req_timeouts = 0, rkb_thread = 281471072502208, rkb_refcnt = {val = 0}, rkb_rk = 0xaaaadeea2140, rkb_recv_buf = 0x0, rkb_max_inflight = 1000000, rkb_outbufs = {rkbq_bufs = {tqh_first = 0x0, 
      tqh_last = 0xaaaadee23210}, rkbq_cnt = {val = 0}, rkbq_msg_cnt = {val = 0}}, rkb_waitresps = {rkbq_bufs = {tqh_first = 0x0, tqh_last = 0xaaaadee23228}, rkbq_cnt = {val = 0}, rkbq_msg_cnt = {val = 0}}, rkb_retrybufs = {
    rkbq_bufs = {tqh_first = 0x0, tqh_last = 0xaaaadee23240}, rkbq_cnt = {val = 0}, rkbq_msg_cnt = {val = 0}}, rkb_avg_int_latency = {ra_v = {maxv = 0, minv = 0, avg = 0, sum = 0, cnt = 0, start = 0}, ra_lock = {
      __size = '\000' <repeats 17 times>, "\002", '\000' <repeats 29 times>, __align = 0}, ra_enabled = 0, ra_type = RD_AVG_GAUGE, ra_hdr = 0x0, ra_hist = {p50 = 0, p75 = 0, p90 = 0, p95 = 0, p99 = 0, p99_99 = 0, oor = 0, 
      hdrsize = 0, stddev = 0, mean = 0}}, rkb_avg_outbuf_latency = {ra_v = {maxv = 0, minv = 0, avg = 0, sum = 0, cnt = 0, start = 0}, ra_lock = {__size = '\000' <repeats 17 times>, "\002", '\000' <repeats 29 times>, 
      __align = 0}, ra_enabled = 0, ra_type = RD_AVG_GAUGE, ra_hdr = 0x0, ra_hist = {p50 = 0, p75 = 0, p90 = 0, p95 = 0, p99 = 0, p99_99 = 0, oor = 0, hdrsize = 0, stddev = 0, mean = 0}}, rkb_avg_rtt = {ra_v = {maxv = 0, 
      minv = 0, avg = 0, sum = 0, cnt = 0, start = 0}, ra_lock = {__size = '\000' <repeats 17 times>, "\002", '\000' <repeats 29 times>, __align = 0}, ra_enabled = 0, ra_type = RD_AVG_GAUGE, ra_hdr = 0x0, ra_hist = {p50 = 0, 
      p75 = 0, p90 = 0, p95 = 0, p99 = 0, p99_99 = 0, oor = 0, hdrsize = 0, stddev = 0, mean = 0}}, rkb_avg_throttle = {ra_v = {maxv = 0, minv = 0, avg = 0, sum = 0, cnt = 0, start = 0}, ra_lock = {
      __size = '\000' <repeats 17 times>, "\002", '\000' <repeats 29 times>, __align = 0}, ra_enabled = 0, ra_type = RD_AVG_GAUGE, ra_hdr = 0x0, ra_hist = {p50 = 0, p75 = 0, p90 = 0, p95 = 0, p99 = 0, p99_99 = 0, oor = 0, 
      hdrsize = 0, stddev = 0, mean = 0}}, rkb_name = "sasl_ssl://193.1.93.77:9092/0\000otstrap", '\000' <repeats 218 times>, rkb_nodename = "193.1.93.77:9092", '\000' <repeats 239 times>, rkb_port = 9092, 
  rkb_origname = 0xaaaadeea7fb0 "193.1.93.77", rkb_nodename_epoch = 0, rkb_connect_epoch = 0, rkb_logname = 0xaaaadeea5b00 "sasl_ssl://193.1.93.77:9092/0", rkb_logname_lock = {
    __size = '\000' <repeats 17 times>, "\002", '\000' <repeats 29 times>, __align = 0}, rkb_wakeup_fd = {100, 101}, rkb_toppar_wakeup_fd = -1, rkb_reconnect_backoff_ms = 10000, rkb_ts_reconnect = 188251240592, 
  rkb_persistconn = {internal = 0, coord = {val = 0}}, rkb_monitors = {tqh_first = 0x0, tqh_last = 0xaaaadee237c8}, rkb_coord_monitor = {rkbmon_link = {tqe_next = 0x0, tqe_prev = 0xaaaadee237c8}, rkbmon_rkb = 0x0, 
    rkbmon_q = 0xaaaadeea3160, rkbmon_cb = 0xffff9b58caa0 <rd_kafka_coord_rkb_monitor_cb>}, rkb_proto = RD_KAFKA_PROTO_SASL_SSL, rkb_down_reported = 1, rkb_sasl_kinit_refresh_tmr = {rtmr_link = {tqe_next = 0x0, 
      tqe_prev = 0x0}, rtmr_next = 0, rtmr_interval = 0, rtmr_oneshot = 0 '\000', rtmr_callback = 0x0, rtmr_arg = 0x0}, rkb_suppress = {unsupported_compression = {ri_ts_last = 0, ri_fixed = 0, ri_backoff = 0}, 
    unsupported_kip62 = {ri_ts_last = 0, ri_fixed = 0, ri_backoff = 0}, unsupported_kip345 = {ri_ts_last = 0, ri_fixed = 0, ri_backoff = 0}, fail_error = {ri_ts_last = 0, ri_fixed = 0, ri_backoff = 0}}, rkb_last_err = {
    errstr = "Broker handle is terminating", '\000' <repeats 483 times>, err = RD_KAFKA_RESP_ERR__DESTROY, cnt = 1}}
(gdb) info threads
  Id   Target Id                         Frame 
* 1    Thread 0xffff18cab1c0 (LWP 16597) 0x0000ffff9b5529ac in rd_kafka_metadata_refresh_topics (rk=rk@entry=0xaaaadeea2140, rkb=0xaaaadee22e60, rkb@entry=0x0, topics=topics@entry=0xffff18ca9ae0, force=force@entry=1 '\001', 
    allow_auto_create=0 '\000', cgrp_update=cgrp_update@entry=0 '\000', reason=reason@entry=0xffff9b600840 "broker down") at rdkafka_metadata.c:954
  2    Thread 0xffff9df761c0 (LWP 15599) 0x0000ffffa0a88ec0 in nanosleep () from /lib64/libpthread.so.0
  3    Thread 0xffff9e7771c0 (LWP 15598) 0x0000ffffa0a88ec0 in nanosleep () from /lib64/libpthread.so.0
  4    Thread 0xffff922a71c0 (LWP 15621) 0x0000ffffa0a88ec0 in nanosleep () from /lib64/libpthread.so.0
  5    Thread 0xffff9536f1c0 (LWP 15614) 0x0000ffffa036e76c in select () from /lib64/libc.so.6
  6    Thread 0xffff93b2b1c0 (LWP 15617) 0x0000ffffa0a88ec0 in nanosleep () from /lib64/libpthread.so.0
  7    Thread 0xffff9332a1c0 (LWP 15618) 0x0000ffffa0a877bc in do_futex_wait () from /lib64/libpthread.so.0
  8    Thread 0xffff94b2d1c0 (LWP 15615) 0x0000ffffa0a87540 in do_futex_wait.constprop () from /lib64/libpthread.so.0
  9    Thread 0xffff9432c1c0 (LWP 15616) 0x0000ffffa036e76c in select () from /lib64/libc.so.6
  10   Thread 0xffff8e29f1c0 (LWP 15629) 0x0000ffffa0a88ec0 in nanosleep () from /lib64/libpthread.so.0
  11   Thread 0xffff91aa61c0 (LWP 15622) 0x0000ffffa0a87540 in do_futex_wait.constprop () from /lib64/libpthread.so.0
  12   Thread 0xffff90aa41c0 (LWP 15624) 0x0000ffffa0a88ec0 in nanosleep () from /lib64/libpthread.so.0
  13   Thread 0xffff8eaa01c0 (LWP 15628) 0x0000ffffa0375b24 in epoll_pwait () from /lib64/libc.so.6
  14   Thread 0xffff8da9e1c0 (LWP 15630) 0x0000ffffa0a7fb2c in __pthread_timedjoin_ex () from /lib64/libpthread.so.0
  15   Thread 0xffff932a91c0 (LWP 15619) 0x0000ffffa0a87540 in do_futex_wait.constprop () from /lib64/libpthread.so.0
  16   Thread 0xffffa0ad6010 (LWP 15577) 0x0000ffffa0346538 in nanosleep () from /lib64/libc.so.6
  17   Thread 0xffff8ba9a1c0 (LWP 15634) 0x0000ffffa0375b24 in epoll_pwait () from /lib64/libc.so.6
  18   Thread 0xffff8f2a11c0 (LWP 15627) 0x0000ffffa0a87540 in do_futex_wait.constprop () from /lib64/libpthread.so.0
  19   Thread 0xffff95ebf1c0 (LWP 15613) 0x0000ffffa0346538 in nanosleep () from /lib64/libc.so.6
  20   Thread 0xffff8c29b1c0 (LWP 15633) 0x0000ffffa0375b24 in epoll_pwait () from /lib64/libc.so.6
  21   Thread 0xffff8ca9c1c0 (LWP 15632) 0x0000ffffa0375b24 in epoll_pwait () from /lib64/libc.so.6
  22   Thread 0xffff844681c0 (LWP 15659) 0x0000ffffa0346538 in nanosleep () from /lib64/libc.so.6
  23   Thread 0xffff8a2971c0 (LWP 15637) 0x0000ffffa0375b24 in epoll_pwait () from /lib64/libc.so.6
  24   Thread 0xffff8b2991c0 (LWP 15635) 0x0000ffffa0375b24 in epoll_pwait () from /lib64/libc.so.6
  25   Thread 0xffff8d29d1c0 (LWP 15631) 0x0000ffffa0375b24 in epoll_pwait () from /lib64/libc.so.6
  26   Thread 0xffff912a51c0 (LWP 15623) 0x0000ffffa0a87540 in do_futex_wait.constprop () from /lib64/libpthread.so.0
  27   Thread 0xffff8aa981c0 (LWP 15636) 0x0000ffffa0375b24 in epoll_pwait () from /lib64/libc.so.6
  28   Thread 0xffff88a941c0 (LWP 15651) 0x0000ffffa0346538 in nanosleep () from /lib64/libc.so.6
  29   Thread 0xffff80c611c0 (LWP 16063) 0x0000ffffa0a84c38 in pthread_cond_wait@@GLIBC_2.17 () from /lib64/libpthread.so.0
  30   Thread 0xffff87a921c0 (LWP 15653) 0x0000ffffa0a7fb2c in __pthread_timedjoin_ex () from /lib64/libpthread.so.0
  31   Thread 0xffff89a961c0 (LWP 15638) 0x0000ffffa0375b24 in epoll_pwait () from /lib64/libc.so.6
  32   Thread 0xffff892951c0 (LWP 15639) 0x0000ffffa0375b24 in epoll_pwait () from /lib64/libc.so.6
  33   Thread 0xffff804601c0 (LWP 16064) 0x0000ffffa0a84c38 in pthread_cond_wait@@GLIBC_2.17 () from /lib64/libpthread.so.0
  34   Thread 0xffff814621c0 (LWP 16062) 0x0000ffffa0346538 in nanosleep () from /lib64/libc.so.6
  35   Thread 0xffff7e45c1c0 (LWP 16068) 0x0000ffffa0346538 in nanosleep () from /lib64/libc.so.6
  36   Thread 0xffff92aa81c0 (LWP 15620) 0x0000ffffa0375b24 in epoll_pwait () from /lib64/libc.so.6
  37   Thread 0xffff8528d1c0 (LWP 15658) 0x0000ffffa036e76c in select () from /lib64/libc.so.6
  38   Thread 0xffff7ec5d1c0 (LWP 16067) 0x0000ffffa0346538 in nanosleep () from /lib64/libc.so.6
  39   Thread 0xffff902a31c0 (LWP 15625) 0x0000ffffa0a87540 in do_futex_wait.constprop () from /lib64/libpthread.so.0
  40   Thread 0xffff81c631c0 (LWP 16061) 0x0000ffffa0346538 in nanosleep () from /lib64/libc.so.6
  41   Thread 0xffff7f45e1c0 (LWP 16066) 0x0000ffffa0346538 in nanosleep () from /lib64/libc.so.6
  42   Thread 0xffff7dc5b1c0 (LWP 16069) 0x0000ffffa0346538 in nanosleep () from /lib64/libc.so.6
  43   Thread 0xffff2de3b1c0 (LWP 16569) 0x0000ffffa0a88ec0 in nanosleep () from /lib64/libpthread.so.0

@trueeyu

trueeyu commented Dec 23, 2021

following stack, I hop

I'm trying to solve this problem. Can you tell me how to reproduce it?

@yueyingyehua

following stack, I hop

I'm trying to solve this problem. Can you tell me how to reproduce it?

I'm sorry, I have no idea how to reproduce it. It happens occasionally.

@yangzhg

yangzhg commented Feb 23, 2022

@edenhill I am also troubled by this problem. Can you provide some help?

@edenhill
Contributor

I'm guessing this only happens on client termination, rd_kafka_destroy(), correct?

@mensfeld

I've seen it happen once as well. I cannot reproduce it though, and all I got is:

 rdkafka_broker.c:4633: rd_kafka_broker_destroy_final: Assertion `thrd_is_current(rkb->rkb_thread)' failed.

@mensfeld

It happens on 2.2.0 via rdkafka-ruby in the context of a short-lived consumer without a consumer group assigned. I use it to read data in one shot.

@emasab
Contributor

emasab commented Aug 14, 2023

I need to reproduce it with a test, but I think the problem here comes from the assert that checks that the thread calling destroy-final for a broker is that broker's own thread.
Given that the struct is ref-counted, the count can reach zero on another thread, after the broker thread has already exited.

In the previous stack trace, destroy is called from rd_kafka_broker_thread_main, but since rd_kafka_metadata_refresh_topics calls rd_kafka_broker_any_usable(), it can be a different broker's thread.

@emasab emasab added the bug label Aug 14, 2023
@mensfeld

mensfeld commented Aug 14, 2023

@emasab I was not able to reproduce it, but I wonder whether running rd_kafka_unsubscribe before closing would mitigate the case of a metadata refresh running on the brokers at the same time?

@emasab
Contributor

emasab commented Aug 14, 2023

@mensfeld probably not, because the metadata refresh can happen independently of the subscription, with a producer too.

As for the fix, it could be either removing the assert completely, or setting rkb_thread to NULL when the thread exits and not failing the assert in that case.
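
A sketch of the second option, in the same simplified model as the earlier comment's sketch; this is an assumed shape of the fix, not an actual patch:

#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <threads.h>

typedef struct broker {
        atomic_int  refcnt;
        thrd_t      owner;
        atomic_bool owner_exited;  /* set by the broker thread on exit */
} broker_t;

/* Called by the broker thread just before it terminates. */
static void broker_thread_exiting(broker_t *rkb) {
        atomic_store(&rkb->owner_exited, true);
}

static void broker_destroy_final(broker_t *rkb) {
        /* Enforce thread affinity only while the broker thread is
         * still alive; once it has exited, the last reference may
         * legitimately be dropped from any thread. */
        assert(atomic_load(&rkb->owner_exited) ||
               thrd_equal(thrd_current(), rkb->owner));
        /* ... free resources ... */
}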

@mensfeld

probably not, because the metadata refresh can happen independently from subscription, with a producer too.

Thanks. Two more questions as a follow-up:

  1. If I unsubscribe, wouldn't that limit the number of brokers I am connected to, and hence minimize the number of brokers that can be refreshed during shutdown?
  2. Can a refresh be timed so that it does not run during shutdown?

I would love to at least partially mitigate this before there is a fix (or even a repro).

I will try to reproduce it as well.

@mensfeld

@emasab can I mitigate it (at least partially) by checking the metadata age for the brokers in the statistics and "waiting out" the refresh if the close would land too near it? If I know the refresh frequency (which is user-controlled) and the age (if that is what the statistics publish), then, edge cases aside (cluster changes that would trigger a refresh), the periodic one could be "waited out", right?
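
A hedged sketch of that idea, assuming the per-topic metadata_age field in librdkafka's statistics JSON: track the age in the stats callback and poll past the next periodic refresh before closing. The crude strstr parsing, the 2-second window, and the close_when_safe() helper are illustrative assumptions, not a vetted recipe:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <librdkafka/rdkafka.h>

static long last_metadata_age_ms = -1;

/* Registered via rd_kafka_conf_set_stats_cb() with a non-zero
 * statistics.interval.ms. Crudely extracts the first per-topic
 * "metadata_age" value from the statistics JSON. */
static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len,
                    void *opaque) {
        (void)rk; (void)json_len; (void)opaque;
        const char *p = strstr(json, "\"metadata_age\":");
        if (p)
                last_metadata_age_ms =
                        strtol(p + strlen("\"metadata_age\":"), NULL, 10);
        return 0;  /* 0 => librdkafka frees the JSON buffer */
}

/* Hypothetical helper: if the next periodic refresh (every
 * refresh_interval_ms, e.g. topic.metadata.refresh.interval.ms)
 * is less than ~2 s away, poll past it before closing. */
static void close_when_safe(rd_kafka_t *rk, long refresh_interval_ms) {
        long until_refresh = refresh_interval_ms - last_metadata_age_ms;
        if (last_metadata_age_ms >= 0 && until_refresh < 2000)
                rd_kafka_poll(rk, (int)(until_refresh + 1000));
        rd_kafka_consumer_close(rk);
}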

@atul-raghuwanshi

@edenhill @mensfeld Any update here? We faced a similar assert failure while calling consumer close after unsubscribe. It resulted in a critical application going down. It would also be helpful if someone could suggest a quick workaround until it is fixed.

@mensfeld

@atul-raghuwanshi you can use the statistics to figure out the metadata refresh time (cluster changes aside, of course) and make sure you do not close around it.

@Long-Wu-code

I have the same problem. Has anyone solved it?

@JSoet

JSoet commented May 24, 2024

We have a customer who has also run into this problem a few times when using a newer version of our software, which uses librdkafka 1.9.2. Unfortunately we haven't been able to reproduce the issue ourselves in our tests. However, the customer had previously been using an older version of our software built against librdkafka 1.3.0 without running into it, so we gave them a newer version of our software with that older library and they haven't been able to reproduce it. So it seems the issue was introduced somewhere between versions 1.3.0 and 1.9.2.

@atul-raghuwanshi

@Long-Wu-code @JSoet we used a forced metadata fetch before closing the consumer to bypass the issue, and that seems to work in our case; see the sketch below. Since then we have not encountered the issue.
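
For anyone who wants to try it, a minimal sketch of the described workaround; the 5-second timeout and the error handling are assumptions:

#include <stdio.h>
#include <librdkafka/rdkafka.h>

static void shutdown_consumer(rd_kafka_t *rk) {
        const struct rd_kafka_metadata *md = NULL;

        /* Force a full metadata refresh before tearing down. */
        rd_kafka_resp_err_t err =
                rd_kafka_metadata(rk, 1 /* all_topics */, NULL, &md, 5000);
        if (err)
                fprintf(stderr, "metadata fetch failed: %s\n",
                        rd_kafka_err2str(err));
        else
                rd_kafka_metadata_destroy(md);

        rd_kafka_consumer_close(rk);
        rd_kafka_destroy(rk);
}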

@JSoet

JSoet commented May 24, 2024

@atul-raghuwanshi
Thanks, so you just do the metadata fetch and are then able to close immediately? You don't need to do anything with the statistics and calculate exactly when to close, as I think was suggested above by @mensfeld?

@JSoet

JSoet commented Jun 24, 2024

@atul-raghuwanshi
Just wanted to check whether you saw my question above? Just curious if that's all you had to do...

@lazern

lazern commented Jan 24, 2025

We have a customer who has also run into this problem a few times when using a newer version of our software, which uses librdkafka 2.4; earlier we were using librdkafka 0.11, which didn't have this issue. Unfortunately we have not been able to reproduce the issue yet. Is there a fix for this issue yet?

@lazern

lazern commented Jan 24, 2025

@JSoet were you able to apply the workaround mentioned by @atul-raghuwanshi? Is it working?

@JSoet

JSoet commented Jan 28, 2025

@lazern
We weren't able to test the workaround. It was a customer of ours who ran into this issue, and we were never able to reproduce it locally despite repeated attempts. We ended up shipping the customer a version of our software with librdkafka 1.3.0 before finding out about the workaround, so we haven't been able to test it yet.

@lazern

lazern commented Jan 29, 2025

@atul-raghuwanshi Just want to reiterate the question @JSoet asked about the workaround: do you just do the metadata fetch and then immediately close, without doing anything with the statistics and calculating exactly when to close?

@lazern

lazern commented Feb 7, 2025

No luck with a forced metadata fetch before closing the consumer; the crash still occurs. @edenhill @mensfeld @emasab has any solution been identified for this issue?
