Consumer close() function hangs and prevents app exit #1573

Closed
odemkovych opened this issue May 26, 2023 · 7 comments

@odemkovych

I'm trying to resolve an issue I've run into with consumer finalization.
A consumer had subscribed to a topic and was processing data from it. Then the topic was deleted.
After some time the consumer received an error about a non-existent topic, and my application tried to call consumer.close().
Unfortunately, close() hung.
I investigated a bit and saw that it is stuck waiting on a futex: "futex(0x7fc6f2ffd9d0, FUTEX_WAIT, 32, NULL"
I am not very familiar with the rdkafka or confluent-kafka-python projects, but I see one inconsistency:

Here rd_kafka_queue_destroy is called after the consumer is closed:

rd_kafka_queue_destroy(self->u.Consumer.rkqu);

but the documentation says that it should be called before:
https://github.com/confluentinc/librdkafka/blob/4b63c6c8881968f7185da33f09cbd69561bb612c/src/rdkafka.h#L3432
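
Until the root cause is known, an application that must exit can guard the call. The following is only a minimal sketch of one possible workaround, not a fix and not part of confluent-kafka-python: the helper name close_with_timeout and its timeout_s parameter are hypothetical, and it assumes the blocking close() releases the GIL while it waits inside librdkafka. It runs close() on a daemon thread and reports whether the call returned in time.

```python
import threading


def close_with_timeout(consumer, timeout_s=10.0):
    """Run consumer.close() on a helper thread.

    Returns True if close() returned within timeout_s seconds, False otherwise.
    `consumer` is any object with a close() method (hypothetical helper, not a
    confluent-kafka-python API).
    """
    done = threading.Event()
    errors = []

    def _close():
        try:
            consumer.close()
        except Exception as exc:
            # Remember the failure so the caller sees it instead of losing it.
            errors.append(exc)
        finally:
            done.set()

    # daemon=True: a close() that never returns must not keep the process alive.
    worker = threading.Thread(target=_close, name="consumer-close", daemon=True)
    worker.start()

    finished = done.wait(timeout_s)
    if errors:
        raise errors[0]
    return finished
```

If the helper returns False, the application can log the hang and exit anyway; the stuck thread is abandoned, so any offsets not yet committed at that point may be lost.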

@odemkovych
Author

versions:
confluent-kafka 2.1.1
python 3.8

@odemkovych
Author

Could somebody please reply? I need to understand whether this issue makes sense and whether some refactoring could be done within its scope.

@pranavrth
Member

Can you provide the code snippet that you are using? I will try to replicate the issue.

@pranavrth self-assigned this Jun 7, 2023

@odemkovych
Author

I will try to create a sample.

Here are the strace and gdb logs, just in case:

root@agent-2:~# cat /tmp/strace.txt
futex(0x7fc6f2ffd9d0, FUTEX_WAIT, 32, NULL

(gdb) info threads
  Id Target Id Frame
  1 Thread 0x7fc6fd83b740 (LWP 24) "python3" __pthread_clockjoin_ex (threadid=140492457105152, thread_return=thread_return@entry=0x7ffcccc5a490, clockid=clockid@entry=0, abstime=abstime@entry=0x0, block=block@entry=true) at pthread_join_common.c:145
  2 Thread 0x7fc6fa75e700 (LWP 26) "python3" 0x00007fc6fdb1999f in __GI___poll (fds=0x7fc6fa8842e0, nfds=1, timeout=500) at ../sysdeps/unix/sysv/linux/poll.c:29
  3 Thread 0x7fc6f9f5d700 (LWP 27) "rdk:main" futex_abstimed_wait_cancelable (private=, abstime=0x7fc6f9f5ccc0, clockid=, expected=0, futex_word=0x2ab2764) at ../sysdeps/nptl/futex-internal.h:320
* 4 Thread 0x7fc6f975c700 (LWP 28) "rdk:broker-1" futex_abstimed_wait_cancelable (private=, abstime=0x7fc6f975b8d0, clockid=, expected=0, futex_word=0x2bf95a0) at ../sysdeps/nptl/futex-internal.h:320
  5 Thread 0x7fc6f8f5b700 (LWP 29) "rdk:broker-1" 0x00007fc6fdb1999f in __GI___poll (fds=0x7fc6e80194c8, nfds=2, timeout=1000) at ../sysdeps/unix/sysv/linux/poll.c:29
  6 Thread 0x7fc6f3fff700 (LWP 30) "rdk:broker1001" 0x00007fc6fdb1999f in __GI___poll (fds=0x7fc6e4006028, nfds=2, timeout=1000) at ../sysdeps/unix/sysv/linux/poll.c:29
  7 Thread 0x7fc6f2ffd700 (LWP 32) "rdk:main" __pthread_clockjoin_ex (threadid=140492440319744, thread_return=thread_return@entry=0x7fc6f2ffcd00, clockid=clockid@entry=0, abstime=abstime@entry=0x0, block=block@entry=true) at pthread_join_common.c:145
  8 Thread 0x7fc6f1ffb700 (LWP 34) "rdk:broker1001" futex_abstimed_wait_cancelable (private=, abstime=0x7fc6f1ffa760, clockid=, expected=0, futex_word=0x2c10c14) at ../sysdeps/nptl/futex-internal.h:320
(gdb) thread apply all bt

Thread 8 (Thread 0x7fc6f1ffb700 (LWP 34)):
#0 futex_abstimed_wait_cancelable (private=, abstime=0x7fc6f1ffa760, clockid=, expected=0, futex_word=0x2c10c14) at ../sysdeps/nptl/futex-internal.h:320
#1 __pthread_cond_wait_common (abstime=0x7fc6f1ffa760, clockid=, mutex=0x2c10bc0, cond=0x2c10be8) at pthread_cond_wait.c:520
#2 __pthread_cond_timedwait (cond=0x2c10be8, mutex=0x2c10bc0, abstime=0x7fc6f1ffa760) at pthread_cond_wait.c:665
#3 0x00007fc6fd9f9f8d in cnd_timedwait (cond=, mutex=, time_point=) at cnd_timedwait.c:25
#4 0x00007fc6fbebfc1f in rd_kafka_q_pop_serve () from /opt/venvs/agent/lib/python3.8/site-packages/confluent_kafka/../confluent_kafka.libs/librdkafka-fe5c3862.so.1
#5 0x00007fc6fbea05ef in ?? () from /opt/venvs/agent/lib/python3.8/site-packages/confluent_kafka/../confluent_kafka.libs/librdkafka-fe5c3862.so.1
#6 0x00007fc6fbea0699 in ?? () from /opt/venvs/agent/lib/python3.8/site-packages/confluent_kafka/../confluent_kafka.libs/librdkafka-fe5c3862.so.1
#7 0x00007fc6fbea12b3 in ?? () from /opt/venvs/agent/lib/python3.8/site-packages/confluent_kafka/../confluent_kafka.libs/librdkafka-fe5c3862.so.1
#8 0x00007fc6fbea19ab in ?? () from /opt/venvs/agent/lib/python3.8/site-packages/confluent_kafka/../confluent_kafka.libs/librdkafka-fe5c3862.so.1
#9 0x00007fc6fbea21e9 in ?? () from /opt/venvs/agent/lib/python3.8/site-packages/confluent_kafka/../confluent_kafka.libs/librdkafka-fe5c3862.so.1
#10 0x00007fc6fd9ec6da in start_thread (arg=) at pthread_create.c:474
#11 0x00007fc6fdb26133 in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95

Thread 7 (Thread 0x7fc6f2ffd700 (LWP 32)):
#0 __pthread_clockjoin_ex (threadid=140492440319744, thread_return=thread_return@entry=0x7fc6f2ffcd00, clockid=clockid@entry=0, abstime=abstime@entry=0x0, block=block@entry=true) at pthread_join_common.c:145
#1 0x00007fc6fd9f9b7e in thrd_join (thr=, res=0x7fc6f2ffcd4c) at thrd_join.c:25
#2 0x00007fc6fbe8ffae in ?? () from /opt/venvs/agent/lib/python3.8/site-packages/confluent_kafka/../confluent_kafka.libs/librdkafka-fe5c3862.so.1
#3 0x00007fc6fbe90af0 in ?? () from /opt/venvs/agent/lib/python3.8/site-packages/confluent_kafka/../confluent_kafka.libs/librdkafka-fe5c3862.so.1
#4 0x00007fc6fd9ec6da in start_thread (arg=) at pthread_create.c:474
#5 0x00007fc6fdb26133 in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95

Thread 6 (Thread 0x7fc6f3fff700 (LWP 30)):
#0 0x00007fc6fdb1999f in __GI___poll (fds=0x7fc6e4006028, nfds=2, timeout=1000) at ../sysdeps/unix/sysv/linux/poll.c:29
#1 0x00007fc6fbeb8aae in ?? () from /opt/venvs/agent/lib/python3.8/site-packages/confluent_kafka/../confluent_kafka.libs/librdkafka-fe5c3862.so.1
#2 0x00007fc6fbeba7b0 in rd_kafka_transport_io_serve () from /opt/venvs/agent/lib/python3.8/site-packages/confluent_kafka/../confluent_kafka.libs/librdkafka-fe5c3862.so.1
#3 0x00007fc6fbea09cf in ?? () from /opt/venvs/agent/lib/python3.8/site-packages/confluent_kafka/../confluent_kafka.libs/librdkafka-fe5c3862.so.1
#4 0x00007fc6fbea1820 in ?? () from /opt/venvs/agent/lib/python3.8/site-packages/confluent_kafka/../confluent_kafka.libs/librdkafka-fe5c3862.so.1
#5 0x00007fc6fbea204d in ?? () from /opt/venvs/agent/lib/python3.8/site-packages/confluent_kafka/../confluent_kafka.libs/librdkafka-fe5c3862.so.1
#6 0x00007fc6fd9ec6da in start_thread (arg=) at pthread_create.c:474
#7 0x00007fc6fdb26133 in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95

Thread 5 (Thread 0x7fc6f8f5b700 (LWP 29)):
#0 0x00007fc6fdb1999f in __GI___poll (fds=0x7fc6e80194c8, nfds=2, timeout=1000) at ../sysdeps/unix/sysv/linux/poll.c:29
#1 0x00007fc6fbeb8aae in ?? () from /opt/venvs/agent/lib/python3.8/site-packages/confluent_kafka/../confluent_kafka.libs/librdkafka-fe5c3862.so.1
#2 0x00007fc6fbeba7b0 in rd_kafka_transport_io_serve () from /opt/venvs/agent/lib/python3.8/site-packages/confluent_kafka/../confluent_kafka.libs/librdkafka-fe5c3862.so.1
#3 0x00007fc6fbea09cf in ?? () from /opt/venvs/agent/lib/python3.8/site-packages/confluent_kafka/../confluent_kafka.libs/librdkafka-fe5c3862.so.1
#4 0x00007fc6fbea1820 in ?? () from /opt/venvs/agent/lib/python3.8/site-packages/confluent_kafka/../confluent_kafka.libs/librdkafka-fe5c3862.so.1
#5 0x00007fc6fbea204d in ?? () from /opt/venvs/agent/lib/python3.8/site-packages/confluent_kafka/../confluent_kafka.libs/librdkafka-fe5c3862.so.1
#6 0x00007fc6fd9ec6da in start_thread (arg=) at pthread_create.c:474
#7 0x00007fc6fdb26133 in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95

Thread 4 (Thread 0x7fc6f975c700 (LWP 28)):
#0 futex_abstimed_wait_cancelable (private=, abstime=0x7fc6f975b8d0, clockid=, expected=0, futex_word=0x2bf95a0) at ../sysdeps/nptl/futex-internal.h:320
#1 __pthread_cond_wait_common (abstime=0x7fc6f975b8d0, clockid=, mutex=0x2bf9550, cond=0x2bf9578) at pthread_cond_wait.c:520
#2 __pthread_cond_timedwait (cond=0x2bf9578, mutex=0x2bf9550, abstime=0x7fc6f975b8d0) at pthread_cond_wait.c:665
#3 0x00007fc6fd9f9f8d in cnd_timedwait (cond=, mutex=, time_point=) at cnd_timedwait.c:25
#4 0x00007fc6fbebfc1f in rd_kafka_q_pop_serve () from /opt/venvs/agent/lib/python3.8/site-packages/confluent_kafka/../confluent_kafka.libs/librdkafka-fe5c3862.so.1
#5 0x00007fc6fbea05ef in ?? () from /opt/venvs/agent/lib/python3.8/site-packages/confluent_kafka/../confluent_kafka.libs/librdkafka-fe5c3862.so.1
#6 0x00007fc6fbea0699 in ?? () from /opt/venvs/agent/lib/python3.8/site-packages/confluent_kafka/../confluent_kafka.libs/librdkafka-fe5c3862.so.1
#7 0x00007fc6fbea1b79 in ?? () from /opt/venvs/agent/lib/python3.8/site-packages/confluent_kafka/../confluent_kafka.libs/librdkafka-fe5c3862.so.1
#8 0x00007fc6fbea21e9 in ?? () from /opt/venvs/agent/lib/python3.8/site-packages/confluent_kafka/../confluent_kafka.libs/librdkafka-fe5c3862.so.1
#9 0x00007fc6fd9ec6da in start_thread (arg=) at pthread_create.c:474
#10 0x00007fc6fdb26133 in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95

Thread 3 (Thread 0x7fc6f9f5d700 (LWP 27)):
#0 futex_abstimed_wait_cancelable (private=, abstime=0x7fc6f9f5ccc0, clockid=, expected=0, futex_word=0x2ab2764) at ../sysdeps/nptl/futex-internal.h:320
#1 __pthread_cond_wait_common (abstime=0x7fc6f9f5ccc0, clockid=, mutex=0x2ab2710, cond=0x2ab2738) at pthread_cond_wait.c:520
#2 __pthread_cond_timedwait (cond=0x2ab2738, mutex=0x2ab2710, abstime=0x7fc6f9f5ccc0) at pthread_cond_wait.c:665
#3 0x00007fc6fd9f9f8d in cnd_timedwait (cond=, mutex=, time_point=) at cnd_timedwait.c:25
#4 0x00007fc6fbebfe4f in rd_kafka_q_serve () from /opt/venvs/agent/lib/python3.8/site-packages/confluent_kafka/../confluent_kafka.libs/librdkafka-fe5c3862.so.1
#5 0x00007fc6fbe90784 in ?? () from /opt/venvs/agent/lib/python3.8/site-packages/confluent_kafka/../confluent_kafka.libs/librdkafka-fe5c3862.so.1
#6 0x00007fc6fd9ec6da in start_thread (arg=) at pthread_create.c:474
#7 0x00007fc6fdb26133 in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95

Thread 2 (Thread 0x7fc6fa75e700 (LWP 26)):
#0 0x00007fc6fdb1999f in __GI___poll (fds=0x7fc6fa8842e0, nfds=1, timeout=500) at ../sysdeps/unix/sysv/linux/poll.c:29
#1 0x0000000000636af5 in ?? ()
#2 0x0000000000505163 in ?? ()
#3 0x000000000056bbe1 in _PyEval_EvalFrameDefault ()
#4 0x0000000000569d8a in _PyEval_EvalCodeWithName ()
#5 0x00000000005f60c3 in _PyFunction_Vectorcall ()
#6 0x000000000056bbe1 in _PyEval_EvalFrameDefault ()
#7 0x0000000000569d8a in _PyEval_EvalCodeWithName ()
#8 0x00000000005f60c3 in _PyFunction_Vectorcall ()
#9 0x000000000050b32c in ?? ()
#10 0x00000000005f52b2 in PyObject_Call ()
#11 0x000000000056d2bc in _PyEval_EvalFrameDefault ()
#12 0x00000000005f5ee6 in _PyFunction_Vectorcall ()
#13 0x000000000056bbe1 in _PyEval_EvalFrameDefault ()
#14 0x00000000005f5ee6 in _PyFunction_Vectorcall ()
#15 0x000000000056bbe1 in _PyEval_EvalFrameDefault ()
#16 0x00000000005f5ee6 in _PyFunction_Vectorcall ()
#17 0x000000000050b32c in ?? ()
#18 0x00000000005f52b2 in PyObject_Call ()
#19 0x00000000006568ec in ?? ()
#20 0x0000000000677e08 in ?? ()
#21 0x00007fc6fd9ec609 in start_thread (arg=) at pthread_create.c:477
#22 0x00007fc6fdb26133 in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95

Thread 1 (Thread 0x7fc6fd83b740 (LWP 24)):
#0 __pthread_clockjoin_ex (threadid=140492457105152, thread_return=thread_return@entry=0x7ffcccc5a490, clockid=clockid@entry=0, abstime=abstime@entry=0x0, block=block@entry=true) at pthread_join_common.c:145
#1 0x00007fc6fd9f9b7e in thrd_join (thr=, res=0x7ffcccc5a4bc) at thrd_join.c:25
#2 0x00007fc6fbe8eddc in ?? () from /opt/venvs/agent/lib/python3.8/site-packages/confluent_kafka/../confluent_kafka.libs/librdkafka-fe5c3862.so.1
#3 0x00007fc6fc786dca in ?? () from /opt/venvs/agent/lib/python3.8/site-packages/confluent_kafka/cimpl.cpython-38-x86_64-linux-gnu.so
#4 0x0000000000504b7b in ?? ()
#5 0x000000000056bbe1 in _PyEval_EvalFrameDefault ()
#6 0x00000000005f5ee6 in _PyFunction_Vectorcall ()
#7 0x000000000056bbe1 in _PyEval_EvalFrameDefault ()
#8 0x0000000000569d8a in _PyEval_EvalCodeWithName ()
#9 0x00000000005f60c3 in _PyFunction_Vectorcall ()
#10 0x000000000056bbe1 in _PyEval_EvalFrameDefault ()
#11 0x0000000000569d8a in _PyEval_EvalCodeWithName ()
#12 0x00000000005f60c3 in _PyFunction_Vectorcall ()
#13 0x00000000005f52b2 in PyObject_Call ()
#14 0x000000000056d2bc in _PyEval_EvalFrameDefault ()
#15 0x0000000000569d8a in _PyEval_EvalCodeWithName ()
#16 0x00000000005f60c3 in _PyFunction_Vectorcall ()
#17 0x00000000005f52b2 in PyObject_Call ()
#18 0x000000000056d2bc in _PyEval_EvalFrameDefault ()
#19 0x0000000000569d8a in _PyEval_EvalCodeWithName ()
#20 0x00000000005f60c3 in _PyFunction_Vectorcall ()
#21 0x000000000050b32c in ?? ()
#22 0x00000000005f52b2 in PyObject_Call ()
#23 0x000000000056d2bc in _PyEval_EvalFrameDefault ()
#24 0x00000000005f5ee6 in _PyFunction_Vectorcall ()
#25 0x000000000056bbe1 in _PyEval_EvalFrameDefault ()
#26 0x0000000000569d8a in _PyEval_EvalCodeWithName ()
#27 0x00000000005f60c3 in _PyFunction_Vectorcall ()
#28 0x000000000056bbe1 in _PyEval_EvalFrameDefault ()
#29 0x0000000000569d8a in _PyEval_EvalCodeWithName ()
#30 0x00000000005f60c3 in _PyFunction_Vectorcall ()
#31 0x00000000005f52b2 in PyObject_Call ()
#32 0x000000000056d2bc in _PyEval_EvalFrameDefault ()
#33 0x0000000000569d8a in _PyEval_EvalCodeWithName ()
#34 0x00000000005f60c3 in _PyFunction_Vectorcall ()
#35 0x000000000050b32c in ?? ()
#36 0x00000000005f52b2 in PyObject_Call ()
#37 0x000000000056d2bc in _PyEval_EvalFrameDefault ()
#38 0x0000000000569d8a in _PyEval_EvalCodeWithName ()
#39 0x00000000005f60c3 in _PyFunction_Vectorcall ()
#40 0x000000000059cbaf in ?? ()
#41 0x00000000005f54ae in PyObject_Call ()
#42 0x000000000056d2bc in _PyEval_EvalFrameDefault ()
#43 0x0000000000569d8a in _PyEval_EvalCodeWithName ()
#44 0x00000000005f60c3 in _PyFunction_Vectorcall ()
#45 0x000000000059cb2e in ?? ()
#46 0x00000000005f6706 in _PyObject_MakeTpCall ()
#47 0x0000000000570d34 in _PyEval_EvalFrameDefault ()
#48 0x0000000000569d8a in _PyEval_EvalCodeWithName ()
#49 0x000000000068e267 in PyEval_EvalCode ()
#50 0x0000000000600704 in ?? ()
#51 0x00000000005c4790 in ?? ()
#52 0x000000000056bab6 in _PyEval_EvalFrameDefault ()
#53 0x0000000000569d8a in _PyEval_EvalCodeWithName ()
#54 0x00000000005f60c3 in _PyFunction_Vectorcall ()
#55 0x000000000056bab6 in _PyEval_EvalFrameDefault ()
#56 0x0000000000569d8a in _PyEval_EvalCodeWithName ()
#57 0x00000000005f60c3 in _PyFunction_Vectorcall ()
#58 0x00000000005f52b2 in PyObject_Call ()
#59 0x00000000006b7de2 in ?? ()
#60 0x00000000006b81e9 in Py_RunMain ()
#61 0x00000000006b840d in Py_BytesMain ()
#62 0x00007fc6fda2b083 in __libc_start_main (main=0x4ef3b0, argc=6, argv=0x7ffcccc5d068, init=, fini=, rtld_fini=, stack_end=0x7ffcccc5d058) at ../csu/libc-start.c:308
#63 0x00000000005faa2e in _start ()
(gdb)
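
In the backtrace above, the Python-level frames show up only as interpreter internals (_PyEval_EvalFrameDefault and ??), so it is hard to tell which application code called close(). As a complementary diagnostic, the standard-library faulthandler module can print the Python stacks of all threads if a call takes too long. This is a rough sketch under stated assumptions: the wrapper name close_with_stack_dump and its parameters are hypothetical, and `consumer` is any object with a close() method.

```python
import faulthandler
import sys


def close_with_stack_dump(consumer, dump_after_s=30.0, out=None):
    """Call consumer.close(); if it is still running after dump_after_s
    seconds, write the Python traceback of every thread to `out`.

    `out` must be a real file object (it needs fileno()); defaults to stderr.
    Hypothetical helper, not part of confluent-kafka-python.
    """
    target = sys.stderr if out is None else out
    # Arms a watchdog that fires once; exit=False keeps the process running.
    faulthandler.dump_traceback_later(dump_after_s, exit=False, file=target)
    try:
        consumer.close()
    finally:
        # Disarm the watchdog if close() returned (or raised) in time.
        faulthandler.cancel_dump_traceback_later()
```

The dump does not unblock anything; it only records where each Python thread was when the timeout expired, which can be attached to a report alongside the gdb output.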

@odemkovych
Author

@pranavrth could you please have a look at the code I referenced in this issue's description? Do you think it is a bug, or is the documentation about the 'rd_kafka_queue_destroy' call order wrong?

@pranavrth
Member

The documentation is right and the code is also right. As far as I understand, the queue reference is created in the Python app, and hence it is cleared after the consumer is destroyed. This issue is not caused by the ordering.

Can you provide the code that is causing the issue?

@odemkovych
Author

I've tried to create a small sample but can't reproduce the issue quickly. Originally I reproduced it when the consumer was running and the topic was deleted. If the call order in the code is right, we can close this item. I will create a new issue with a code sample once I am able to reproduce it more or less reliably.
