Skip to content

Consumer deadlock in coordinator when heartbeat request timeout #1985

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
huangcuiyang opened this issue Jan 15, 2020 · 14 comments
Closed

Consumer deadlock in coordinator when heartbeat request timeout #1985

huangcuiyang opened this issue Jan 15, 2020 · 14 comments

Comments

@huangcuiyang
Copy link
Contributor

huangcuiyang commented Jan 15, 2020

All my consumers in group is disconnected, and i find that consumer process meet a deadlock in coordinator. i want to fix the deadlock by upgrade sdk version from 1.4.1 to 1.4.7, but it happed again...help me @dpkp
hung in coordinator/base.py:1006 _handle_heartbeat_failure().self.coordinator._lock

  • kafka-python version is 1.4.7
  • Show the gdb bt in thread 1
(gdb) thread 1
[Switching to thread 1 (Thread 0x7fe31915a740 (LWP 26))]
#0  0x00007fe31896a79b in futex_abstimed_wait (cancel=true, private=<optimized out>, abstime=0x0, expected=0, 
    futex=0x2662670) at ../nptl/sysdeps/unix/sysv/linux/sem_waitcommon.c:43
43	      err = lll_futex_wait (futex, expected, private);
(gdb) bt
#0  0x00007fe31896a79b in futex_abstimed_wait (cancel=true, private=<optimized out>, abstime=0x0, expected=0, 
    futex=0x2662670) at ../nptl/sysdeps/unix/sysv/linux/sem_waitcommon.c:43
#1  do_futex_wait (sem=sem@entry=0x2662670, abstime=0x0) at ../nptl/sysdeps/unix/sysv/linux/sem_waitcommon.c:223
#2  0x00007fe31896a82f in __new_sem_wait_slow (sem=0x2662670, abstime=0x0)
    at ../nptl/sysdeps/unix/sysv/linux/sem_waitcommon.c:292
#3  0x00007fe31896a8cb in __new_sem_wait (sem=<optimized out>) at ../nptl/sysdeps/unix/sysv/linux/sem_wait.c:28
#4  0x00007fe318c8a535 in PyThread_acquire_lock (lock=0x2662670, waitflag=1)
    at /usr/src/debug/Python-2.7.5/Python/thread_pthread.h:323
#5  0x00007fe318c8e1c2 in lock_PyThread_acquire_lock (self=0x24aa1b0, args=<optimized out>)
    at /usr/src/debug/Python-2.7.5/Modules/threadmodule.c:52
#6  0x00007fe318c5daf0 in call_function (oparg=<optimized out>, pp_stack=0x7ffd30399e30)
    at /usr/src/debug/Python-2.7.5/Python/ceval.c:4408
#7  PyEval_EvalFrameEx (
    f=f@entry=Frame 0x27b5c20, for file /usr/lib64/python2.7/threading.py, line 173, in acquire (self=<_RLock(_Verbose__verbose=False, _RLock__owner=140613085271808, _RLock__block=<thread.lock at remote 0x24aa1b0>, _RLock__count=1) at remote 0x24bef90>, blocking=1, me=140613355153216), throwflag=throwflag@entry=0)
    at /usr/src/debug/Python-2.7.5/Python/ceval.c:3040
#8  0x00007fe318c5fe3d in PyEval_EvalCodeEx (co=<optimized out>, globals=<optimized out>, locals=locals@entry=0x0, 
    args=<optimized out>, argcount=1, kws=0x129c058, kwcount=0, defs=0x7fe318fae7e8, defcount=1, 
    closure=closure@entry=0x0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:3640
#9  0x00007fe318c5d33c in fast_function (nk=<optimized out>, na=<optimized out>, n=<optimized out>, 
    pp_stack=0x7ffd3039a040, func=<optimized out>) at /usr/src/debug/Python-2.7.5/Python/ceval.c:4504
#10 call_function (oparg=<optimized out>, pp_stack=0x7ffd3039a040)
    at /usr/src/debug/Python-2.7.5/Python/ceval.c:4429
#11 PyEval_EvalFrameEx (
    f=f@entry=Frame 0x129bed0, for file /usr/lib64/python2.7/threading.py, line 285, in __enter__ (self=<_Condition(_Condition__lock=<_RLock(_Verbose__verbose=False, _RLock__owner=140613085271808, _RLock__block=<thread.lock at remote 0x24aa1b0>, _RLock__count=1) at remote 0x24bef90>, acquire=<instancemethod at remote 0x224bf50>, _is_owned=<instancemethod at remote 0x24407d0>, _release_save=<instancemethod at remote 0x23d45f0>, release=<instancemethod at remote 0x226c1e0>, _acquire_restore=<instancemethod at remote 0x226c190>, _Verbose__verbose=False, _Condition__waiters=[]) at remote 0x24bef50>), throwflag=throwflag@entry=0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:3040
#12 0x00007fe318c5fe3d in PyEval_EvalCodeEx (co=<optimized out>, globals=<optimized out>, locals=locals@entry=0x0, 
    args=args@entry=0x25922e8, argcount=1, kws=kws@entry=0x0, kwcount=kwcount@entry=0, defs=defs@entry=0x0, 
    defcount=defcount@entry=0, closure=0x0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:3640
#13 0x00007fe318be9798 in function_call (func=<function at remote 0x7fe318fc0b18>, 
    arg=(<_Condition(_Condition__lock=<_RLock(_Verbose__verbose=False, _RLock__owner=140613085271808, _RLock__block=<thread.lock at remote 0x24aa1b0>, _RLock__count=1) at remote 0x24bef90>, acquire=<instancemethod at remote 0x224bf50>, _is_owned=<instancemethod at remote 0x24407d0>, _release_save=<instancemethod at remote 0x23d45f0>, release=<instancemethod at remote 0x226c1e0>, _acquire_restore=<instancemethod at remote 0x226c190>, _Verbose__verbose=False, _Condition__waiters=[]) at remote 0x24bef50>,), kw=0x0) at /usr/src/debug/Python-2.7.5/Objects/funcobject.c:526
#14 0x00007fe318bc48e3 in PyObject_Call (func=func@entry=<function at remote 0x7fe318fc0b18>, 
    arg=arg@entry=(<_Condition(_Condition__lock=<_RLock(_Verbose__verbose=False, _RLock__owner=140613085271808, _RLo---Type <return> to continue, or q <return> to quit---
ck__block=<thread.lock at remote 0x24aa1b0>, _RLock__count=1) at remote 0x24bef90>, acquire=<instancemethod at remote 0x224bf50>, _is_owned=<instancemethod at remote 0x24407d0>, _release_save=<instancemethod at remote 0x23d45f0>, release=<instancemethod at remote 0x226c1e0>, _acquire_restore=<instancemethod at remote 0x226c190>, _Verbose__verbose=False, _Condition__waiters=[]) at remote 0x24bef50>,), kw=kw@entry=0x0)
    at /usr/src/debug/Python-2.7.5/Objects/abstract.c:2529
#15 0x00007fe318bd38d5 in instancemethod_call (func=<function at remote 0x7fe318fc0b18>, 
    arg=(<_Condition(_Condition__lock=<_RLock(_Verbose__verbose=False, _RLock__owner=140613085271808, _RLock__block=<thread.lock at remote 0x24aa1b0>, _RLock__count=1) at remote 0x24bef90>, acquire=<instancemethod at remote 0x224bf50>, _is_owned=<instancemethod at remote 0x24407d0>, _release_save=<instancemethod at remote 0x23d45f0>, release=<instancemethod at remote 0x226c1e0>, _acquire_restore=<instancemethod at remote 0x226c190>, _Verbose__verbose=False, _Condition__waiters=[]) at remote 0x24bef50>,), kw=0x0) at /usr/src/debug/Python-2.7.5/Objects/classobject.c:2602
#16 0x00007fe318bc48e3 in PyObject_Call (func=func@entry=<instancemethod at remote 0x25ca6e0>, arg=arg@entry=(), 
    kw=kw@entry=0x0) at /usr/src/debug/Python-2.7.5/Objects/abstract.c:2529
#17 0x00007fe318bc51dc in PyObject_CallFunctionObjArgs (
    callable=callable@entry=<instancemethod at remote 0x25ca6e0>)
    at /usr/src/debug/Python-2.7.5/Objects/abstract.c:2760
#18 0x00007fe318c5825b in PyEval_EvalFrameEx (
    f=f@entry=Frame 0x2b9bff0, for file /matrix/soapa-platform/platform-work/platform-sdk/third-party-lib/python/kafka/coordinator/base.py, line 1006, in _handle_heartbeat_failure (self=<HeartbeatThread(coordinator=<weakproxy at remote 0x24b57e0>, _Thread__ident=140613085271808, failed=None, _Thread__block=<_Condition(_Verbose__verbose=False, _Condition__lock=<thread.lock at remote 0x24aa330>, acquire=<built-in method acquire of thread.lock object at remote 0x---Type <return> to continue, or q <return> to quit---
24aa330>, _Condition__waiters=[], release=<built-in method release of thread.lock object at remote 0x24aa330>) at remote 0x2592d10>, _Thread__name='collect-aflogs-parser-heartbeat', _Thread__daemonic=True, enabled=True, _Thread__started=<_Event(_Verbose__verbose=False, _Event__flag=True, _Event__cond=<_Condition(_Verbose__verbose=False, _Condition__lock=<thread.lock at remote 0x24aa310>, acquire=<built-in method acquire of thread.lock object at remote 0x24aa310>, _Condition__waiters=[], release=<built-in method release of thread.lock object at remote 0x24aa310>) at re...(truncated), throwflag=throwflag@entry=0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:2928
#19 0x00007fe318c5fe3d in PyEval_EvalCodeEx (co=<optimized out>, globals=<optimized out>, locals=locals@entry=0x0, 
    args=<optimized out>, argcount=2, kws=0x7fe304004430, kwcount=0, defs=0x0, defcount=0, 
    closure=closure@entry=0x0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:3640
#20 0x00007fe318c5d33c in fast_function (nk=<optimized out>, na=<optimized out>, n=<optimized out>, 
    pp_stack=0x7ffd3039a8a0, func=<optimized out>) at /usr/src/debug/Python-2.7.5/Python/ceval.c:4504
#21 call_function (oparg=<optimized out>, pp_stack=0x7ffd3039a8a0)
    at /usr/src/debug/Python-2.7.5/Python/ceval.c:4429
#22 PyEval_EvalFrameEx (
    f=f@entry=Frame 0x7fe304004270, for file /matrix/soapa-platform/platform-work/platform-sdk/third-party-lib/python/kafka/future.py, line 79, in _call_backs (self=<Future(exception=<RequestTimedOutError at remote 0x25ca5a0>, is_done=True, _errbacks=[<instancemethod at remote 0x2495910>], value=None, _callbacks=[<instancemethod at remote 0x21ba0f0>]) at remote 0x7fe30c08a690>, back_type='errback', backs=[...], value=<...>, f=<instancemethod at remote 0x2495910>), throwflag=throwflag@entry=0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:3040
#23 0x00007fe318c5fe3d in PyEval_EvalCodeEx (co=<optimized out>, globals=<optimized out>, locals=locals@entry=0x0, 
    args=<optimized out>, argcount=4, kws=0x284f048, kwcount=0, defs=0x0, defcount=0, closure=closure@entry=0x0)
    at /usr/src/debug/Python-2.7.5/Python/ceval.c:3640
#24 0x00007fe318c5d33c in fast_function (nk=<optimized out>, na=<optimized out>, n=<optimized out>, 
    pp_stack=0x7ffd3039aab0, func=<optimized out>) at /usr/src/debug/Python-2.7.5/Python/ceval.c:4504
#25 call_function (oparg=<optimized out>, pp_stack=0x7ffd3039aab0)
    at /usr/src/debug/Python-2.7.5/Python/ceval.c:4429
#26 PyEval_EvalFrameEx (
    f=f@entry=Frame 0x284eea0, for file /matrix/soapa-platform/platform-work/platform-sdk/third-party-lib/python/kafka/future.py, line 45, in failure (self=<Future(exception=<RequestTimedOutError at remote 0x25ca5a0>, is_done=True, _errbacks=[<instancemethod at remote 0x2495910>], value=None, _callbacks=[<instancemethod at remote 0x21ba0f0>]) at remote 0x7fe30c08a690>, e=<...>), throwflag=throwflag@entry=0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:3040
#27 0x00007fe318c5fe3d in PyEval_EvalCodeEx (co=<optimized out>, globals=<optimized out>, locals=locals@entry=0x0, 
    args=<optimized out>, argcount=2, kws=0x138fe70, kwcount=0, defs=0x0, defcount=0, closure=closure@entry=0x0)
    at /usr/src/debug/Python-2.7.5/Python/ceval.c:3640
#28 0x00007fe318c5d33c in fast_function (nk=<optimized out>, na=<optimized out>, n=<optimized out>, 
    pp_stack=0x7ffd3039acc0, func=<optimized out>) at /usr/src/debug/Python-2.7.5/Python/ceval.c:4504
#29 call_function (oparg=<optimized out>, pp_stack=0x7ffd3039acc0)
    at /usr/src/debug/Python-2.7.5/Python/ceval.c:4429
#30 PyEval_EvalFrameEx (
    f=f@entry=Frame 0x138fcc0, for file /matrix/soapa-platform/platform-work/platform-sdk/third-party-lib/python/kafka/coordinator/base.py, line 499, in _failed_request (self=<ConsumerCoordinator(completed_offset_commits=<collections.deque at remote 0x24e3520>, _is_leader=False, rejoining=False, rejoin_needed=False, join_future=None, coordinator_---Type <return> to continue, or q <return> to quit---
id=None, _assignment_snapshot=None, _lock=<_Condition(_Condition__lock=<_RLock(_Verbose__verbose=False, _RLock__owner=140613085271808, _RLock__block=<thread.lock at remote 0x24aa1b0>, _RLock__count=1) at remote 0x24bef90>, acquire=<instancemethod at remote 0x224bf50>, _is_owned=<instancemethod at remote 0x24407d0>, _release_save=<instancemethod at remote 0x23d45f0>, release=<instancemethod at remote 0x226c1e0>, _acquire_restore=<instancemethod at remote 0x226c190>, _Verbose__verbose=False, _Condition__waiters=[]) at remote 0x24bef50>, _generation=<Generation(member_id=u'soapa_collect_proxy-57ffdc60-08f7-4bcd-9860-4b4b82422fc9', protocol=u'range', generation_id=663) at remote 0x7fe3...(truncated), throwflag=throwflag@entry=0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:3040
#31 0x00007fe318c5fe3d in PyEval_EvalCodeEx (co=<optimized out>, globals=<optimized out>, locals=locals@entry=0x0, 
    args=args@entry=0x25311e8, argcount=5, kws=kws@entry=0x7fe31911a068, kwcount=kwcount@entry=0, 
    defs=defs@entry=0x0, defcount=defcount@entry=0, closure=0x0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:3640
#32 0x00007fe318be988d in function_call (func=<function at remote 0x19897d0>, 
    arg=(<ConsumerCoordinator(completed_offset_commits=<collections.deque at remote 0x24e3520>, _is_leader=False, rejoining=False, rejoin_needed=False, join_future=None, coordinator_id=None, _assignment_snapshot=None, _lock=<_Condition(_Condition__lock=<_RLock(_Verbose__verbose=False, _RLock__owner=140613085271808, _RLock__block=<thread.lock at remote 0x24aa1b0>, _RLock__count=1) at remote 0x24bef90>, acquire=<instancemethod at remote 0x224bf50>, _is_owned=<instancemethod at remote 0x24407d0>, _release_save=<instancemethod at remote 0x23d45f0>, release=<instancemethod at remote 0x226c1e0>, _acquire_restore=<instancemethod at remote 0x226c190>, _Verbose__verbose=False, _Condition__waiters=[]) at remote 0x24bef50>, _generation=<Generation(member_id=u'soapa_collect_proxy-57ffdc60-08f7-4bcd-9860-4b4b82422fc9', protocol=u'range', generation_id=663) at remote 0x7fe30c056850>, _cluster=<ClusterMetadata(_groups={'collect-aflogs-parser': 'coordinator-1'}, _coordinator_brokers={'coordinator-1': <BrokerMetadata at remote 0x2531...(truncated), kw={}) at /usr/src/debug/Python-2.7.5/Objects/funcobject.c:526
#33 0x00007fe318bc48e3 in PyObject_Call (func=func@entry=<function at remote 0x19897d0>, 
    arg=arg@entry=(<ConsumerCoordinator(completed_offset_commits=<collections.deque at remote 0x24e3520>, _is_leader=False, rejoining=False, rejoin_needed=False, join_future=None, coordinator_id=None, _assignment_snapshot=None, _lock=<_Condition(_Condition__lock=<_RLock(_Verbose__verbose=False, _RLock__owner=140613085271808, _RLock__block=<thread.lock at remote 0x24aa1b0>, _RLock__count=1) at remote 0x24bef90>, acquire=<instancemethod at remote 0x224bf50>, _is_owned=<instancemethod at remote 0x24407d0>, _release_save=<instancemethod at remote 0x23d45f0>, release=<instancemethod at remote 0x226c1e0>, _acquire_restore=<instancemethod at remote 0x226c190>, _Verbose__verbose=False, _Condition__waiters=[]) at remote 0x24bef50>, _generation=<Generation(member_id=u'soapa_collect_proxy-57ffdc60-08f7-4bcd-9860-4b4b82422fc9', protocol=u'range', generation_id=663) at remote 0x7fe30c056850>, _cluster=<ClusterMetadata(_groups={'collect-aflogs-parser': 'coordinator-1'}, _coordinator_brokers={'coordinator-1': <BrokerMetadata at remote 0x2531...(truncated), kw=kw@entry={}) at /usr/src/debug/Python-2.7.5/Objects/abstract.c:2529
#34 0x00007fe318bd38d5 in instancemethod_call (func=<function at remote 0x19897d0>, 
    arg=(<ConsumerCoordinator(completed_offset_commits=<collections.deque at remote 0x24e3520>, _is_leader=False, rejoining=False, rejoin_needed=False, join_future=None, coordinator_id=None, _assignment_snapshot=None, _lock=<_Condition(_Condition__lock=<_RLock(_Verbose__verbose=False, _RLock__owner=140613085271808, _RLock__block=<thread.lock at remote 0x24aa1b0>, _RLock__count=1) at remote 0x24bef90>, acquire=<instancemethod at remote 0x224bf50>, _is_owned=<instancemethod at remote 0x24407d0>, _release_save=<instancemethod at remote 0x23d45f0>, release=<instancemethod at remote 0x226c1e0>, _acquire_restore=<instancemethod at remote 0x226c190>, _Verbose__verbose=False, _Condition__waiters=[]) at remote 0x24bef50>, _generation=<Generation(member_id=u'soapa_collect_proxy-57ffdc60-08f7-4bcd-9860-4b4b82422fc9', protocol=u'range', generation_id=663) at remote 0x7fe30c056850>, _cluster=<ClusterMetadata(_groups={'collect-aflogs-parser': 'coordinator-1'}, _coordinator_brokers={'coordinator-1': <BrokerMetadata at remote 0x2531...(truncate---Type <return> to continue, or q <return> to quit---
d), kw={}) at /usr/src/debug/Python-2.7.5/Objects/classobject.c:2602
#35 0x00007fe318bc48e3 in PyObject_Call (func=<instancemethod at remote 0x2468aa0>, 
    arg=arg@entry=('coordinator-1', <HeartbeatRequest_v1(encode=<WeakMethod(_target_id=140613136197072, _method_id=22846832, target=<weakref at remote 0x24c1a48>, method=<weakref at remote 0x24c17e0>) at remote 0x7fe30c08ae50>, member_id=u'soapa_collect_proxy-57ffdc60-08f7-4bcd-9860-4b4b82422fc9', group='collect-aflogs-parser', generation_id=663) at remote 0x7fe30c08a5d0>, <Future(exception=<RequestTimedOutError at remote 0x25ca5a0>, is_done=True, _errbacks=[<instancemethod at remote 0x2495910>], value=None, _callbacks=[<instancemethod at remote 0x21ba0f0>]) at remote 0x7fe30c08a690>, <...>), kw=kw@entry={}) at /usr/src/debug/Python-2.7.5/Objects/abstract.c:2529
#36 0x00007fe31673e8f1 in partial_call (pto=0x24c16d8, args=<optimized out>, kw=0x0)
    at /usr/src/debug/Python-2.7.5/Modules/_functoolsmodule.c:205
#37 0x00007fe318bc48e3 in PyObject_Call (func=func@entry=<functools.partial at remote 0x24c16d8>, 
    arg=arg@entry=(<RequestTimedOutError at remote 0x25ca5a0>,), kw=kw@entry=0x0)
    at /usr/src/debug/Python-2.7.5/Objects/abstract.c:2529
#38 0x00007fe318c59036 in do_call (nk=<optimized out>, na=<optimized out>, pp_stack=0x7ffd3039b290, 
    func=<optimized out>) at /usr/src/debug/Python-2.7.5/Python/ceval.c:4626
#39 call_function (oparg=<optimized out>, pp_stack=0x7ffd3039b290)
    at /usr/src/debug/Python-2.7.5/Python/ceval.c:4431
#40 PyEval_EvalFrameEx (
    f=f@entry=Frame 0x2b98ca0, for file /matrix/soapa-platform/platform-work/platform-sdk/third-party-lib/python/kafka/future.py, line 79, in _call_backs (self=<Future(exception=<RequestTimedOutError at remote 0x25ca5a0>, is_done=True, _errbacks=[<functools.partial at remote 0x24c16d8>], value=None, _callbacks=[<functools.partial at remote 0x24c1---Type <return> to continue, or q <return> to quit---
520>]) at remote 0x24cb610>, back_type='errback', backs=[...], value=<...>, f=<functools.partial at remote 0x24c16d8>), throwflag=throwflag@entry=0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:3040
#41 0x00007fe318c5fe3d in PyEval_EvalCodeEx (co=<optimized out>, globals=<optimized out>, locals=locals@entry=0x0, 
    args=<optimized out>, argcount=4, kws=0x2738118, kwcount=0, defs=0x0, defcount=0, closure=closure@entry=0x0)
    at /usr/src/debug/Python-2.7.5/Python/ceval.c:3640
#42 0x00007fe318c5d33c in fast_function (nk=<optimized out>, na=<optimized out>, n=<optimized out>, 
    pp_stack=0x7ffd3039b4a0, func=<optimized out>) at /usr/src/debug/Python-2.7.5/Python/ceval.c:4504
#43 call_function (oparg=<optimized out>, pp_stack=0x7ffd3039b4a0)
    at /usr/src/debug/Python-2.7.5/Python/ceval.c:4429
#44 PyEval_EvalFrameEx (
    f=f@entry=Frame 0x2737f70, for file /matrix/soapa-platform/platform-work/platform-sdk/third-party-lib/python/kafka/future.py, line 45, in failure (self=<Future(exception=<RequestTimedOutError at remote 0x25ca5a0>, is_done=True, _errbacks=[<functools.partial at remote 0x24c16d8>], value=None, _callbacks=[<functools.partial at remote 0x24c1520>]) at remote 0x24cb610>, e=<...>), throwflag=throwflag@entry=0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:3040
#45 0x00007fe318c5fe3d in PyEval_EvalCodeEx (co=<optimized out>, globals=<optimized out>, locals=locals@entry=0x0, 
    args=<optimized out>, argcount=2, kws=0x26914e8, kwcount=0, defs=0x0, defcount=0, closure=closure@entry=0x0)
    at /usr/src/debug/Python-2.7.5/Python/ceval.c:3640
#46 0x00007fe318c5d33c in fast_function (nk=<optimized out>, na=<optimized out>, n=<optimized out>, 
    pp_stack=0x7ffd3039b6b0, func=<optimized out>) at /usr/src/debug/Python-2.7.5/Python/ceval.c:4504
#47 call_function (oparg=<optimized out>, pp_stack=0x7ffd3039b6b0)
    at /usr/src/debug/Python-2.7.5/Python/ceval.c:4429
#48 PyEval_EvalFrameEx (
    f=f@entry=Frame 0x2691320, for file /matrix/soapa-platform/platform-work/platform-sdk/third-party-lib/python/kafka/conn.py, line 887, in close (self=<BrokerConnection(_gai=[], _failures=1, afi=0, state='<disconnected>', _sensors=<BrokerConnectionMetrics(metrics=<Metrics(_config=<MetricConfig(event_window=9223372036854775807, _samples=2, quota=None, tags={'client-id': 'soapa_collect_proxy'}, time_window_ms=30000) at remote 0x255fa50>, _children_sensors={<Sensor(_metrics=[<KafkaMetric(_config=<...>, _measurable=<Rate(_stat=<Count(_initial_value=<float at remote 0x157c908>, _samples=[<Sample(event_count=228, last_window_ms=<float at remote 0x7fe30c2a2890>, value=<float at remote 0x19e8cf28>, initial_value=<float at remote 0x157c908>) at remote 0x24be510>, <Sample(event_count=294, last_window_ms=<float at remote 0x7fe30c21a068>, value=<float at remote 0x7fe30c21a1a0>, initial_value=<float at remote 0x157c908>) at remote 0x24cb750>], _current=0) at remote 0x24b7950>, _unit=3) at remote 0x24b7990>, _metric_name=<MetricName(_...(truncated), throwflag=throwflag@entry=0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:3040
#49 0x00007fe318c5fe3d in PyEval_EvalCodeEx (co=<optimized out>, globals=<optimized out>, locals=locals@entry=0x0, 
    args=<optimized out>, argcount=1, kws=0x2568328, kwcount=1, defs=0x17b10e8, defcount=1, 
    closure=closure@entry=0x0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:3640
#50 0x00007fe318c5d33c in fast_function (nk=<optimized out>, na=<optimized out>, n=<optimized out>, 
    pp_stack=0x7ffd3039b8c0, func=<optimized out>) at /usr/src/debug/Python-2.7.5/Python/ceval.c:4504
#51 call_function (oparg=<optimized out>, pp_stack=0x7ffd3039b8c0)
    at /usr/src/debug/Python-2.7.5/Python/ceval.c:4429
#52 PyEval_EvalFrameEx (
    f=f@entry=Frame 0x2568150, for file /matrix/soapa-platform/platform-work/platform-sdk/third-party-lib/python/kafka/client_async.py, line 698, in _poll (self=<KafkaClient(_conns=<Dict at remote 0x265a9e0>, _sending=set([]), _idle---Type <return> to continue, or q <return> to quit---
_expiry_manager=<IdleConnectionManager(connections_max_idle=<float at remote 0x2641b68>, lru_connections=<OrderedDict(_OrderedDict__root=[[[[...], [...], 4], [...], 2], [...], None], _OrderedDict__map={2: [...], 4: [...]}) at remote 0x265b2a0>, next_idle_close_check_time=<float at remote 0x7fe30c11fd38>) at remote 0x255fd10>, _closed=False, _pending_completion=<collections.deque at remote 0x218cc90>, _wake_r=<_socket.socket at remote 0x21aab40>, _lock=<_RLock(_Verbose__verbose=False, _RLock__owner=140613355153216, _RLock__block=<thread.lock at remote 0x1ac1730>, _RLock__count=1) at remote 0x255fcd0>, _connecting=set([]), _sensors=<KafkaClientMetrics(metric_group_name='consumer-metrics', select_time=<Sensor(_metrics=[<KafkaMetric(_config=<MetricConfig(event_window=9223372036854775807, _samples=2,...(truncated), throwflag=throwflag@entry=0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:3040
#53 0x00007fe318c5fe3d in PyEval_EvalCodeEx (co=<optimized out>, globals=<optimized out>, locals=locals@entry=0x0, 
    args=<optimized out>, argcount=2, kws=0x7fe30c0c4da0, kwcount=0, defs=0x0, defcount=0, 
    closure=closure@entry=0x0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:3640
#54 0x00007fe318c5d33c in fast_function (nk=<optimized out>, na=<optimized out>, n=<optimized out>, 
    pp_stack=0x7ffd3039bad0, func=<optimized out>) at /usr/src/debug/Python-2.7.5/Python/ceval.c:4504
#55 call_function (oparg=<optimized out>, pp_stack=0x7ffd3039bad0)
    at /usr/src/debug/Python-2.7.5/Python/ceval.c:4429
#56 PyEval_EvalFrameEx (
    f=f@entry=Frame 0x7fe30c0c4bd0, for file /matrix/soapa-platform/platform-work/platform-sdk/third-party-lib/python/kafka/client_async.py, line 598, in poll (self=<KafkaClient(_conns=<Dict at remote 0x265a9e0>, _sending=set([]), _idle_expiry_manager=<IdleConnectionManager(connections_max_idle=<float at remote 0x2641b68>, lru_connections=<OrderedDict(_OrderedDict__root=[[[[...], [...], 4], [...], 2], [...], None], _OrderedDict__map={2: [...], 4: [...]}) at remote 0x265b2a0>, next_idle_close_check_time=<float at remote 0x7fe30c11fd38>) at remote 0x255fd10>, _closed=False, _---Type <return> to continue, or q <return> to quit---
pending_completion=<collections.deque at remote 0x218cc90>, _wake_r=<_socket.socket at remote 0x21aab40>, _lock=<_RLock(_Verbose__verbose=False, _RLock__owner=140613355153216, _RLock__block=<thread.lock at remote 0x1ac1730>, _RLock__count=1) at remote 0x255fcd0>, _connecting=set([]), _sensors=<KafkaClientMetrics(metric_group_name='consumer-metrics', select_time=<Sensor(_metrics=[<KafkaMetric(_config=<MetricConfig(event_window=9223372036854775807, _sample...(truncated), throwflag=throwflag@entry=0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:3040
#57 0x00007fe318c5fe3d in PyEval_EvalCodeEx (co=<optimized out>, globals=<optimized out>, locals=locals@entry=0x0, 
    args=<optimized out>, argcount=1, kws=0x2683af0, kwcount=1, defs=0x14ae890, defcount=2, 
    closure=closure@entry=0x0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:3640
#58 0x00007fe318c5d33c in fast_function (nk=<optimized out>, na=<optimized out>, n=<optimized out>, 
    pp_stack=0x7ffd3039bce0, func=<optimized out>) at /usr/src/debug/Python-2.7.5/Python/ceval.c:4504
#59 call_function (oparg=<optimized out>, pp_stack=0x7ffd3039bce0)
    at /usr/src/debug/Python-2.7.5/Python/ceval.c:4429
#60 PyEval_EvalFrameEx (
    f=f@entry=Frame 0x2683930, for file /matrix/soapa-platform/platform-work/platform-sdk/third-party-lib/python/kafka/consumer/group.py, line 692, in _poll_once (self=<KafkaConsumer(_metrics=<Metrics(_config=<MetricConfig(event_window=9223372036854775807, _samples=2, quota=None, tags={'client-id': 'soapa_collect_proxy'}, time_window_ms=30000) at remote 0x255fa50>, _children_sensors={<Sensor(_metrics=[<KafkaMetric(_config=<...>, _measurable=<Rate(_stat=<Count(_initial_value=<float at remote 0x157c908>, _samples=[<Sample(event_count=228, last_window_ms=<float at remote 0x7fe30c2a2890>, value=<float at remote 0x19e8cf28>, initial_value=<float at remote 0x157c908>) at remote 0x24be510>, <Sample(event_count=294, last_window_ms=<float at remote 0x7fe30c21a068>, value=<float at remote 0x7fe30c21a1a0>, initial_value=<float at remote 0x157c908>) at remote 0x24cb750>], _current=0) at remote 0x24b7950>, _unit=3) at remote 0x---Type <return> to continue, or q <return> to quit---
24b7990>, _metric_name=<MetricName(_group='consumer-metrics', _description='The average number of network oper...(truncated), throwflag=throwflag@entry=0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:3040
#61 0x00007fe318c5fe3d in PyEval_EvalCodeEx (co=<optimized out>, globals=<optimized out>, locals=locals@entry=0x0, 
    args=<optimized out>, argcount=3, kws=0x26838f0, kwcount=1, defs=0x18e6428, defcount=1, 
    closure=closure@entry=0x0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:3640
#62 0x00007fe318c5d33c in fast_function (nk=<optimized out>, na=<optimized out>, n=<optimized out>, 
    pp_stack=0x7ffd3039bef0, func=<optimized out>) at /usr/src/debug/Python-2.7.5/Python/ceval.c:4504
#63 call_function (oparg=<optimized out>, pp_stack=0x7ffd3039bef0)
    at /usr/src/debug/Python-2.7.5/Python/ceval.c:4429
#64 PyEval_EvalFrameEx (
    f=f@entry=Frame 0x2683720, for file /matrix/soapa-platform/platform-work/platform-sdk/third-party-lib/python/kafka/consumer/group.py, line 645, in poll (self=<KafkaConsumer(_metrics=<Metrics(_config=<MetricConfig(event_window=9223372036854775807, _samples=2, quota=None, tags={'client-id': 'soapa_collect_proxy'}, time_window_ms=30000) at remote 0x255fa50>, _children_sensors={<Sensor(_metrics=[<KafkaMetric(_config=<...>, _measurable=<Rate(_stat=<Count(_initial_value=<float at remote 0x157c908>, _samples=[<Sample(event_count=228, last_window_ms=<float at remote 0x7fe30c2a2890>, value=<float at remote 0x19e8cf28>, initial_value=<float at remote 0x157c908>) at remote 0x24be510>, <Sample(event_count=294, last_window_ms=<float at remote 0x7fe30c21a068>, value=<float at remote 0x7fe30c21a1a0>, initial_value=<float at remote 0x157c908>) at remote 0x24cb750>], _current=0) at remote 0x24b7950>, _unit=3) at remote 0x24b7990>, _metric_name=<MetricName(_group='consumer-metrics', _description='The average number of network operations...(truncated), throwflag=throwflag@entry=0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:3040
#65 0x00007fe318c5fe3d in PyEval_EvalCodeEx (co=<optimized out>, globals=<optimized out>, locals=locals@entry=0x0, 
    args=<optimized out>, argcount=3, kws=0x2648ef0, kwcount=0, defs=0x196f568, defcount=3, 
    closure=closure@entry=0x0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:3640
#66 0x00007fe318c5d33c in fast_function (nk=<optimized out>, na=<optimized out>, n=<optimized out>, 
    pp_stack=0x7ffd3039c100, func=<optimized out>) at /usr/src/debug/Python-2.7.5/Python/ceval.c:4504
#67 call_function (oparg=<optimized out>, pp_stack=0x7ffd3039c100)
    at /usr/src/debug/Python-2.7.5/Python/ceval.c:4429
#68 PyEval_EvalFrameEx (
    f=f@entry=Frame 0x2648d50, for file /matrix/soapa-app/app-work/app-sdk/soapa-xxx-lib/python/utils/kafka_client.py, line 73, in poll (timeout=1000, max_count=1), throwflag=throwflag@entry=0)
    at /usr/src/debug/Python-2.7.5/Python/ceval.c:3040
#69 0x00007fe318c5d4bd in fast_function (nk=<optimized out>, na=<optimized out>, n=2, pp_stack=0x7ffd3039c270, 
    func=<optimized out>) at /usr/src/debug/Python-2.7.5/Python/ceval.c:4494
#70 call_function (oparg=<optimized out>, pp_stack=0x7ffd3039c270)
    at /usr/src/debug/Python-2.7.5/Python/ceval.c:4429
#71 PyEval_EvalFrameEx (
    f=f@entry=Frame 0x21d1de0, for file /matrix/soapa-app/app-work/app-sdk/soapa-xxx-lib/python/file_parser/base_parser.py, line 138, in start (self=<Parser(kafka_topic='aflogs-1', s3_client=<S3Client(conn=<S3Connection(request_hook=None, num_retries=6, ca_certificates_file='/matrix/soapa-platform/platform-work/platform-sdk/third-party-lib/python/boto/cacerts/cacerts.txt', _provider_type='aws', http_unretryable_exceptions=[<type at remote 0x250f250>], https_connection_factory=None, is_secure=False, protocol='http', port=12001, _connection=('eds-1.novalocal', 12001, False), _auth_handler=<HmacAuthV1Handler(_hmac_256=None, host='eds-1.novalocal', _hmac=<HMAC(digest_cons=<built-in funct---Type <return> to continue, or q <return> to quit---
ion openssl_sha1>, outer=<_hashlib.HASH at remote 0x24e2490>, inner=<_hashlib.HASH at remote 0x24e24e0>, digest_size=20L) at remote 0x24b6e18>, _provider=<Provider(acl_header='x-amz-acl', server_side_encryption_header='x-amz-server-side-encryption', copy_source_version_id='x-amz-copy-source-version-id', profile_name=None, mfa_header='x-amz-...(truncated), throwflag=throwflag@entry=0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:3040
#72 0x00007fe318c5d4bd in fast_function (nk=<optimized out>, na=<optimized out>, n=1, pp_stack=0x7ffd3039c3e0, 
    func=<optimized out>) at /usr/src/debug/Python-2.7.5/Python/ceval.c:4494
#73 call_function (oparg=<optimized out>, pp_stack=0x7ffd3039c3e0)
    at /usr/src/debug/Python-2.7.5/Python/ceval.c:4429
#74 PyEval_EvalFrameEx (
    f=f@entry=Frame 0x21c9e50, for file /matrix/soapa-app/app-work/app-sdk/soapa-xxx-lib/python/file_parser/main.py, line 38, in each_parse (setting={'kafka_topic': 'aflogs-1', 'MQ': {'group_id': 'collect-aflogs-parser', 'type': 'kafka', 'keyword': 'NGAF_SECEVENT', 'key': 'collect-receive-file-eds'}, 'parser_module': 'file_parser.parser_af_secevent.parser'}, class_module=<module at remote 0x2141d00>, terminate=<function at remote 0x21bda28>), 
    throwflag=throwflag@entry=0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:3040
#75 0x00007fe318c5fe3d in PyEval_EvalCodeEx (co=<optimized out>, globals=<optimized out>, locals=locals@entry=0x0, 
    args=args@entry=0x7fe31911a068, argcount=0, kws=kws@entry=0x2128548, kwcount=kwcount@entry=3, 
    defs=defs@entry=0x0, defcount=defcount@entry=0, closure=0x0) at /usr/src/debug/Python-2.7.5/Python/ceval.c:3640
#76 0x00007fe318be988d in function_call (func=<function at remote 0x213a5f0>, arg=(), 
    kw={'kafka_topic': 'aflogs-1', 'MQ': {'group_id': 'collect-aflogs-parser', 'type': 'kafka', 'keyword': 'NGAF_SECEVENT', 'key': 'collect-receive-file-eds'}, 'parser_module': 'file_parser.parser_af_secevent.parser'})
    at /usr/src/debug/Python-2.7.5/Objects/funcobject.c:526
#77 0x00007fe318bc48e3 in PyObject_Call (func=func@entry=<function at remote 0x213a5f0>, arg=arg@entry=(), 
    kw=kw@entry={'kafka_topic': 'aflogs-1', 'MQ': {'group_id': 'collect-aflogs-parser', 'type': 'kafka', 'keyword': 'NGAF_SECEVENT', 'key': 'collect-receive-file-eds'}, 'parser_module': 'file_parser.parser_af_secevent.parser'})
    at /usr/src/debug/Python-2.7.5/Objects/abstract.c:2529
#78 0x00007fe318c584fd in ext_do_call (nk=<optimized out>, na=<optimized out>, flags=<optimized out>, 
    pp_stack=0x7ffd3039c6b8, func=<function at remote 0x213a5f0>)
    at /usr/src/debug/Python-2.7.5/Python/ceval.c:4721
#79 PyEval_EvalFrameEx (
    f=f@entry=Frame 0x21c9c80, for file /usr/lib64/python2.7/multiprocessing/process.py, line 114, in run (self=<Process(_counter=<itertools.count at remote 0x213b998>, _daemonic=False, _target=<function at remote 0x213a5f0>, _args=(), _children=set([]), _tempdir=None, _name='Process-1', _authkey=<AuthenticationString at remote 0x17a1de0>, _parent_pid=8, _kwargs={'kafka_topic': 'aflogs-1', 'MQ': {'group_id': 'collect-aflogs-parser', 'type': 'kafka', 'keyword': 'NGAF_SECEVENT', 'key': 'collect-receive-file-eds'}, 'parser_module': 'file_parser.parser_af_secevent.parser'}, _identity=(1,), _popen=None) at remote 0x2137a10>), throwflag=throwflag@entry=0)
    at /usr/src/debug/Python-2.7.5/Python/ceval.c:3079
#80 0x00007fe318c5d4bd in fast_function (nk=<optimized out>, na=<optimized out>, n=1, pp_stack=0x7ffd3039c820, 
    func=<optimized out>) at /usr/src/debug/Python-2.7.5/Python/ceval.c:4494
#81 call_function (oparg=<optimized out>, pp_stack=0x7ffd3039c820)
    at /usr/src/debug/Python-2.7.5/Python/ceval.c:4429
#82 PyEval_EvalFrameEx (
    f=f@entry=Frame 0x21c9470, for file /usr/lib64/python2.7/multiprocessing/process.py, line 258, in _bootstrap 
  • And gdb bt in thread 2
(gdb) thread 2
[Switching to thread 2 (Thread 0x7fe308ff9700 (LWP 28))]
#0  0x00007fe31896a79b in futex_abstimed_wait (cancel=true, private=<optimized out>, abstime=0x0, expected=0, 
    futex=0x265b040) at ../nptl/sysdeps/unix/sysv/linux/sem_waitcommon.c:43
43	      err = lll_futex_wait (futex, expected, private);
(gdb) py-bt
#4 Waiting for a lock (e.g. GIL)
#5 Waiting for a lock (e.g. GIL)
#7 Frame 0x7fe30c01c130, for file /usr/lib64/python2.7/threading.py, line 173, in acquire (self=<_RLock(_Verbose__verbose=False, _RLock__owner=140613355153216, _RLock__block=<thread.lock at remote 0x1ac1730>, _RLock__count=1) at remote 0x255fcd0>, blocking=1, me=140613085271808)
    rc = self.__block.acquire(blocking)
#14 Frame 0x7fe304007740, for file /matrix/soapa-platform/platform-work/platform-sdk/third-party-lib/python/kafka/client_async.py, line 572, in poll (self=<KafkaClient(_conns=<Dict at remote 0x265a9e0>, _sending=set([]), _idle_expiry_manager=<IdleConnectionManager(connections_max_idle=<float at remote 0x2641b68>, lru_connections=<OrderedDict(_OrderedDict__root=[[[[...], [...], 4], [...], 2], [...], None], _OrderedDict__map={2: [...], 4: [...]}) at remote 0x265b2a0>, next_idle_close_check_time=<float at remote 0x7fe30c11fd38>) at remote 0x255fd10>, _closed=False, _pending_completion=<collections.deque at remote 0x218cc90>, _wake_r=<_socket.socket at remote 0x21aab40>, _lock=<_RLock(_Verbose__verbose=False, _RLock__owner=140613355153216, _RLock__block=<thread.lock at remote 0x1ac1730>, _RLock__count=1) at remote 0x255fcd0>, _connecting=set([]), _sensors=<KafkaClientMetrics(metric_group_name='consumer-metrics', select_time=<Sensor(_metrics=[<KafkaMetric(_config=<MetricConfig(event_window=9223372036854775807, _sample...(truncated)
    with self._lock:
#18 Frame 0x7fe304001690, for file /matrix/soapa-platform/platform-work/platform-sdk/third-party-lib/python/kafka/coordinator/base.py, line 965, in _run_once (self=<HeartbeatThread(coordinator=<weakproxy at remote 0x24b57e0>, _Thread__ident=140613085271808, failed=None, _Thread__block=<_Condition(_Verbose__verbose=False, _Condition__lock=<thread.lock at remote 0x24aa330>, acquire=<built-in method acquire of thread.lock object at remote 0x24aa330>, _Condition__waiters=[], release=<built-in method release of thread.lock object at remote 0x24aa330>) at remote 0x2592d10>, _Thre---Type <return> to continue, or q <return> to quit---
ad__name='collect-aflogs-parser-heartbeat', _Thread__daemonic=True, enabled=True, _Thread__started=<_Event(_Verbose__verbose=False, _Event__flag=True, _Event__cond=<_Condition(_Verbose__verbose=False, _Condition__lock=<thread.lock at remote 0x24aa310>, acquire=<built-in method acquire of thread.lock object at remote 0x24aa310>, _Condition__waiters=[], release=<built-in method release of thread.lock object at remote 0x24aa310>) at remote 0x2592c...(truncated)
    self.coordinator._client.poll(timeout_ms=0)
#22 Frame 0x7fe3040014b0, for file /matrix/soapa-platform/platform-work/platform-sdk/third-party-lib/python/kafka/coordinator/base.py, line 933, in run (self=<HeartbeatThread(coordinator=<weakproxy at remote 0x24b57e0>, _Thread__ident=140613085271808, failed=None, _Thread__block=<_Condition(_Verbose__verbose=False, _Condition__lock=<thread.lock at remote 0x24aa330>, acquire=<built-in method acquire of thread.lock object at remote 0x24aa330>, _Condition__waiters=[], release=<built-in method release of thread.lock object at remote 0x24aa330>) at remote 0x2592d10>, _Thread__name='collect-aflogs-parser-heartbeat', _Thread__daemonic=True, enabled=True, _Thread__started=<_Event(_Verbose__verbose=False, _Event__flag=True, _Event__cond=<_Condition(_Verbose__verbose=False, _Condition__lock=<thread.lock at remote 0x24aa310>, acquire=<built-in method acquire of thread.lock object at remote 0x24aa310>, _Condition__waiters=[], release=<built-in method release of thread.lock object at remote 0x24aa310>) at remote 0x2592cd0>) a...(truncated)
    self._run_once()
#26 Frame 0x7fe304001290, for file /usr/lib64/python2.7/threading.py, line 811, in __bootstrap_inner (self=<HeartbeatThread(coordinator=<weakproxy at remote 0x24b57e0>, _Thread__ident=140613085271808, failed=None, _Thread__block=<_Condition(_Verbose__verbose=False, _Condition__lock=<thread.lock at remote 0x24aa330>, acquire=<built-in method acquire of thread.lock object at remote 0x24aa330>, _Condition__waiters=[], release=<built-in method release of thread.lock object at remote 0x24aa330>) at remote 0x2592d10>, _Thread__name='collect-aflogs-parser-heartbeat', _Thread__daemonic=True, enabled=True, _Thread__started=<_Event(_Verbose__verbose=False, _Event__flag=True, _Event__cond=<_Conditi---Type <return> to continue, or q <return> to quit---
on(_Verbose__verbose=False, _Condition__lock=<thread.lock at remote 0x24aa310>, acquire=<built-in method acquire of thread.lock object at remote 0x24aa310>, _Condition__waiters=[], release=<built-in method release of thread.lock object at remote 0x24aa310>) at remote 0x2592cd0>) at remote 0x25920d0>, _Thread__stderr=<file at remot...(truncated)
    self.run()
#29 Frame 0x7fe3040010c0, for file /usr/lib64/python2.7/threading.py, line 784, in __bootstrap (self=<HeartbeatThread(coordinator=<weakproxy at remote 0x24b57e0>, _Thread__ident=140613085271808, failed=None, _Thread__block=<_Condition(_Verbose__verbose=False, _Condition__lock=<thread.lock at remote 0x24aa330>, acquire=<built-in method acquire of thread.lock object at remote 0x24aa330>, _Condition__waiters=[], release=<built-in method release of thread.lock object at remote 0x24aa330>) at remote 0x2592d10>, _Thread__name='collect-aflogs-parser-heartbeat', _Thread__daemonic=True, enabled=True, _Thread__started=<_Event(_Verbose__verbose=False, _Event__flag=True, _Event__cond=<_Condition(_Verbose__verbose=False, _Condition__lock=<thread.lock at remote 0x24aa310>, acquire=<built-in method acquire of thread.lock object at remote 0x24aa310>, _Condition__waiters=[], release=<built-in method release of thread.lock object at remote 0x24aa310>) at remote 0x2592cd0>) at remote 0x25920d0>, _Thread__stderr=<file at remote 0x7f...(truncated)
    self.__bootstrap_inner()
@huangcuiyang huangcuiyang changed the title deadlock in coordinator when heartbeat request timeout Consumer deadlock in coordinator when heartbeat request timeout Jan 16, 2020
@rosekin
Copy link

rosekin commented Mar 4, 2020

any resolution?

@huangcuiyang
Copy link
Contributor Author

this problem almost happend once a day in k8s container enviornment.

@huangcuiyang
Copy link
Contributor Author

any resolution?

you meeting this also ??

@PabloL007
Copy link

PabloL007 commented May 11, 2020

Experiencing this problem as well with version 2.0.1 of the lib, any update?

@huangcuiyang
Copy link
Contributor Author

huangcuiyang commented May 14, 2020

it works well with the down fix ways:

delete self.coordinator._client.poll(timeout_ms=0) code at HeartbeatThread._run_once() function in coordinator/base.py file.

OR

add client._lock before get coordinator._lock at BaseCoordinator.ensure_active_group() and HeartbeatThread._run_once() in coordinator/base.py file.

@gablank
Copy link

gablank commented May 20, 2020

We are also seeing this issue using kafka-python version 2.0.1.

Our heartbeat thread is stuck at client_async.py:574, while our MainThread is stuck at base.py:1009.

@dpkp
Copy link
Owner

dpkp commented May 21, 2020

I think I understand what is happening now. Thanks for all the notes and pings, and apologies that it is still not fixed! Though, as always, PRs and contributions are welcome!

Here is what I see:

The heartbeat thread sends a HeartbeatRequest. The request times out / does not receive a response / encounters a connection problem. Meanwhile, the Heartbeat thread continues to process its loop, holding the coordinator lock. The Main thread grabs the client lock and begins processing responses / connections. It sees that there was a networking problem on the connection that is waiting for a HeartbeatResponse. The networking issue prompts the main thread to close the connection. This continues while the Main thread is still holding the client lock, as networking and socket connections cannot be made concurrently -- the lock is required.

At this point BrokerConnection.close() is called and the connection attempts to fail all pending in-flight-requests. In this case, one of these is the HeartbeatRequest. The failure is processed immediately, and the heartbeat_failure callback is executed. But this callback requires acquiring the coordinator lock, which the Heartbeat thread is holding. So the main thread waits.

But now the Heartbeat thread attempts to trigger networking via a call to client.poll() . This blocks because it cannot acquire the client lock. So we have deadlock.

There are a few ways to attack this problem. The first would be to attempt to defer future failure processing in the main thread / client until after the client lock is released. Our primary defense against deadlock right now is to enforce a strict lock acquisition order: coordinator lock -> client lock. (see #1821). We currently drop the client lock to process future.success callbacks for this reason. Unfortunately, connection errors are harder to isolate. There are some obvious places within client._poll where we call conn.close, and we could handle these directly. But there are other places within the BrokerConnection class that call self.close() during various other points in the connection lifecycle. To really stamp out this issue we would also need to have a way to collect failed ifr futures from all of these other places. Perhaps we need to add an interface to BrokerConnection that holds a dequeue of failed ifrs? Will need to think on this.

A second way to attack this problem is to give the HeartbeatThread its own client and stop sharing the client code between the two threads. This is closer to what the java client does and would more effectively prevent deadlock because we would be eliminating the need for the threads to synchronize on the client lock (They would still need to synchronize on the coordinator lock).

@gablank
Copy link

gablank commented Jun 4, 2020

it works well with the down fix ways:

delete self.coordinator._client.poll(timeout_ms=0) code at HeartbeatThread._run_once() function in coordinator/base.py file.

OR

add client._lock before get coordinator._lock at BaseCoordinator.ensure_active_group() and HeartbeatThread._run_once() in coordinator/base.py file.

I would advice against using the first of these fixes (haven't tested the second one). Removing that line makes consumers in consumer groups fail with a "CommitFailedError". I assume this is because that line is essential in communicating that the consumer is alive.

@jeffwidman
Copy link
Contributor

Adding some notes from my conversation with Dana on this:

[In addition to the two approaches mentioned here]... There is a third approach that changes the lock order back to client->coordinator here: #2064 It also possible that that approach works, but I haven't tested it myself. I do know that we originally used that lock order and I decided to change it at some point, also trying to solve deadlock issues. But things have been cleaned up a lot since then, especially with the totally async send code (sends used to go straight to the network, now they are buffered and are only sent during client.poll). So I wouldn't be surprised if changing lock order back would work now.

@BakanovKirill
Copy link

BakanovKirill commented Jul 29, 2020

@huangcuiyang Any luck with getting this pull request #2064 to be merged?

@jeffwidman
Copy link
Contributor

@dpkp we really ought to get this solved, it's clearly affecting quite a number of folks... at the very least we ought to try #2064 as it does fix the issue... not clear if it creates other issues, but IMO fixing known breakage is probably better here than fearing unknown breakage.

@dpkp dpkp closed this as completed in 91daea3 Sep 7, 2020
@mjattiot
Copy link

mjattiot commented Sep 9, 2020

@huangcuiyang while waiting for your fix to be merged and released, did you find a workaround ?
We have a k8s environment as well and we'd like to kill pods as soon as this issue arises. Any tips ?

@takwas
Copy link

takwas commented Sep 29, 2020

@mjattiot: Do you surmise that wrapping the code block in a try/except and and calling sys.exit() would be a way to workaround the K8s situation?

@Sp1derMAN
Copy link

What are the alternatives now for this problem?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

10 participants