Client doesn't send heartbeat request, then marks coordinator as dead #2149

Closed
DenisKuplyakov opened this issue Oct 23, 2020 · 4 comments
DenisKuplyakov commented Oct 23, 2020

Logs (some details, like IPs, are hidden or replaced with foobar):

Oct 23 15:24:57 somehost 4e92484f7f82[1175]: 2020-10-23T12:24:57.626142 DEBUG kafka.coordinator Heartbeat: FOOBAR[7240] FOOBAR_3-fa7e3d3c-5be1-489f-be03-ee9f7a016d04
Oct 23 15:24:57 somehost 4e92484f7f82[1175]: 2020-10-23T12:24:57.626734 DEBUG kafka.protocol.parser Sending request HeartbeatRequest_v1(group='FOOBAR', generation_id=7240, member_id='FOOBAR_3-fa7e3d3c-5be1-489f-be03-ee9f7a016d04')
Oct 23 15:24:57 somehost 4e92484f7f82[1175]: 2020-10-23T12:24:57.627174 DEBUG kafka.conn <BrokerConnection node_id=coordinator-2 host=foobar:9303 <connected> [IPv4 ('ip-hidden', 9303)]> Request 19: HeartbeatRequest_v1(group='FOOBAR', generation_id=7240, member_id='FOOBAR_3-fa7e3d3c-5be1-489f-be03-ee9f7a016d04')
Oct 23 15:24:57 somehost 4e92484f7f82[1175]: 2020-10-23T12:24:57.728863 DEBUG kafka.protocol.parser Received correlation id: 19
Oct 23 15:24:57 somehost 4e92484f7f82[1175]: 2020-10-23T12:24:57.729032 DEBUG kafka.protocol.parser Processing response HeartbeatResponse_v1
Oct 23 15:24:57 somehost 4e92484f7f82[1175]: 2020-10-23T12:24:57.729243 DEBUG kafka.conn <BrokerConnection node_id=coordinator-2 host=foobar:9303 <connected> [IPv4 ('ip-hidden', 9303)]> Response 19 (101.51934623718262 ms): HeartbeatResponse_v1(throttle_time_ms=0, error_code=0)
Oct 23 15:24:57 somehost 4e92484f7f82[1175]: 2020-10-23T12:24:57.729583 DEBUG kafka.coordinator Received successful heartbeat response for group FOOBAR

Oct 23 15:25:00 somehost 4e92484f7f82[1175]: 2020-10-23T12:25:00.648066 DEBUG kafka.coordinator Heartbeat: FOOBAR[7240] FOOBAR_3-fa7e3d3c-5be1-489f-be03-ee9f7a016d04
Oct 23 15:25:00 somehost 4e92484f7f82[1175]: 2020-10-23T12:25:00.648336 DEBUG kafka.protocol.parser Sending request HeartbeatRequest_v1(group='FOOBAR', generation_id=7240, member_id='FOOBAR_3-fa7e3d3c-5be1-489f-be03-ee9f7a016d04')
Oct 23 15:25:00 somehost 4e92484f7f82[1175]: 2020-10-23T12:25:00.648620 DEBUG kafka.conn <BrokerConnection node_id=coordinator-2 host=foobar:9303 <connected> [IPv4 ('ip-hidden', 9303)]> Request 20: HeartbeatRequest_v1(group='FOOBAR', generation_id=7240, member_id='FOOBAR_3-fa7e3d3c-5be1-489f-be03-ee9f7a016d04')
Oct 23 15:25:00 somehost 4e92484f7f82[1175]: 2020-10-23T12:25:00.749656 DEBUG kafka.protocol.parser Received correlation id: 20
Oct 23 15:25:00 somehost 4e92484f7f82[1175]: 2020-10-23T12:25:00.749793 DEBUG kafka.protocol.parser Processing response HeartbeatResponse_v1
Oct 23 15:25:00 somehost 4e92484f7f82[1175]: 2020-10-23T12:25:00.749955 DEBUG kafka.conn <BrokerConnection node_id=coordinator-2 host=foobar:9303 <connected> [IPv4 ('ip-hidden', 9303)]> Response 20 (101.14645957946777 ms): HeartbeatResponse_v1(throttle_time_ms=0, error_code=0)
Oct 23 15:25:00 somehost 4e92484f7f82[1175]: 2020-10-23T12:25:00.750142 DEBUG kafka.coordinator Received successful heartbeat response for group FOOBAR

Oct 23 15:25:17 somehost 4e92484f7f82[1175]: 2020-10-23T12:25:17.056094 WARNING kafka.coordinator Heartbeat session expired, marking coordinator dead
Oct 23 15:25:17 somehost 4e92484f7f82[1175]: 2020-10-23T12:25:17.060982 WARNING kafka.coordinator Marking the coordinator dead (node coordinator-2) for group FOOBAR: Heartbeat session expired.
Oct 23 15:25:17 somehost 4e92484f7f82[1175]: 2020-10-23T12:25:17.156602 DEBUG kafka.coordinator Sending group coordinator request for group FOOBAR to broker 0
Oct 23 15:25:17 somehost 4e92484f7f82[1175]: 2020-10-23T12:25:17.157879 DEBUG kafka.protocol.parser Sending request GroupCoordinatorRequest_v0(consumer_group='FOOBAR')
Oct 23 15:25:17 somehost 4e92484f7f82[1175]: 2020-10-23T12:25:17.158614 DEBUG kafka.conn <BrokerConnection node_id=0 host=foobar:9301 <connected> [IPv4 ('ip-hidden', 9301)]> Request 2: GroupCoordinatorRequest_v0(consumer_group='FOOBAR')
Oct 23 15:25:17 somehost 4e92484f7f82[1175]: 2020-10-23T12:25:17.368393 DEBUG kafka.protocol.parser Received correlation id: 2
Oct 23 15:25:17 somehost 4e92484f7f82[1175]: 2020-10-23T12:25:17.368560 DEBUG kafka.protocol.parser Processing response GroupCoordinatorResponse_v0
Oct 23 15:25:17 somehost 4e92484f7f82[1175]: 2020-10-23T12:25:17.368820 DEBUG kafka.conn <BrokerConnection node_id=0 host=foobar:9301 <connected> [IPv4 ('ip-hidden', 9301)]> Response 2 (210.0355625152588 ms): GroupCoordinatorResponse_v0(error_code=0, coordinator_id=2, host='foobar', port=9303)
Oct 23 15:25:17 somehost 4e92484f7f82[1175]: 2020-10-23T12:25:17.369029 DEBUG kafka.coordinator Received group coordinator response GroupCoordinatorResponse_v0(error_code=0, coordinator_id=2, host='foobar', port=9303)
Oct 23 15:25:17 somehost 4e92484f7f82[1175]: 2020-10-23T12:25:17.369175 DEBUG kafka.cluster Updating coordinator for FOOBAR: GroupCoordinatorResponse_v0(error_code=0, coordinator_id=2, host='foobar', port=9303)
Oct 23 15:25:17 somehost 4e92484f7f82[1175]: 2020-10-23T12:25:17.369349 INFO kafka.cluster Group coordinator for FOOBAR is BrokerMetadata(nodeId='coordinator-2', host='foobar', port=9303, rack=None)
Oct 23 15:25:17 somehost 4e92484f7f82[1175]: 2020-10-23T12:25:17.369510 INFO kafka.coordinator Discovered coordinator coordinator-2 for group FOOBAR

Oct 23 15:25:20 somehost 4e92484f7f82[1175]: 2020-10-23T12:25:20.419661 DEBUG kafka.coordinator Heartbeat: FOOBAR[7240] FOOBAR_3-fa7e3d3c-5be1-489f-be03-ee9f7a016d04
Oct 23 15:25:20 somehost 4e92484f7f82[1175]: 2020-10-23T12:25:20.433918 DEBUG kafka.protocol.parser Sending request HeartbeatRequest_v1(group='FOOBAR', generation_id=7240, member_id='FOOBAR_3-fa7e3d3c-5be1-489f-be03-ee9f7a016d04')
Oct 23 15:25:20 somehost 4e92484f7f82[1175]: 2020-10-23T12:25:20.434712 DEBUG kafka.conn <BrokerConnection node_id=coordinator-2 host=foobar:9303 <connected> [IPv4 ('ip-hidden', 9303)]> Request 21: HeartbeatRequest_v1(group='FOOBAR', generation_id=7240, member_id='FOOBAR_3-fa7e3d3c-5be1-489f-be03-ee9f7a016d04')
Oct 23 15:25:20 somehost 4e92484f7f82[1175]: 2020-10-23T12:25:20.542339 DEBUG kafka.protocol.parser Received correlation id: 21
Oct 23 15:25:20 somehost 4e92484f7f82[1175]: 2020-10-23T12:25:20.542508 DEBUG kafka.protocol.parser Processing response HeartbeatResponse_v1
Oct 23 15:25:20 somehost 4e92484f7f82[1175]: 2020-10-23T12:25:20.542689 DEBUG kafka.conn <BrokerConnection node_id=coordinator-2 host=foobar:9303 <connected> [IPv4 ('ip-hidden', 9303)]> Response 21 (107.81598091125488 ms): HeartbeatResponse_v1(throttle_time_ms=0, error_code=25)
Oct 23 15:25:20 somehost 4e92484f7f82[1175]: 2020-10-23T12:25:20.542945 WARNING kafka.coordinator Heartbeat: local member_id was not recognized; this consumer needs to re-join

Oct 23 15:25:20 somehost 4e92484f7f82[1175]: 2020-10-23T12:25:20.544498 DEBUG kafka.coordinator Heartbeat: FOOBAR[-1]
Oct 23 15:25:20 somehost 4e92484f7f82[1175]: 2020-10-23T12:25:20.544619 DEBUG kafka.protocol.parser Sending request HeartbeatRequest_v1(group='FOOBAR', generation_id=-1, member_id='')
Oct 23 15:25:20 somehost 4e92484f7f82[1175]: 2020-10-23T12:25:20.544811 DEBUG kafka.conn <BrokerConnection node_id=coordinator-2 host=foobar:9303 <connected> [IPv4 ('ip-hidden', 9303)]> Request 22: HeartbeatRequest_v1(group='FOOBAR', generation_id=-1, member_id='')
Oct 23 15:25:20 somehost 4e92484f7f82[1175]: 2020-10-23T12:25:20.569289 DEBUG kafka.coordinator Group state is not stable, disabling heartbeats
Oct 23 15:25:20 somehost 4e92484f7f82[1175]: 2020-10-23T12:25:20.569578 DEBUG kafka.coordinator Heartbeat disabled. Waiting

In the logs above I see:

  1. The client successfully sends heartbeat requests every 3 seconds.
  2. Then there is no further log line showing that a heartbeat request was sent, i.e. nothing like
     `Sending request HeartbeatRequest_v1(group='FOOBAR', generation_id=7240, member_id='FOOBAR_3-fa7e3d3c-5be1-489f-be03-ee9f7a016d04')`
  3. After about 17 seconds the client gets "Heartbeat session expired" (this delay is always around 15-17 seconds).
  4. The client marks the coordinator as dead.
  5. Coordinator discovery then resolves to the same broker.
  6. The next heartbeat fails with an unrecognized member id, so the consumer has to re-join.

This situation repeats even if I keep a single consumer in the group. I should also mention that Kafka is accessed over the internet.

Consumer settings (a config sketch in kafka-python terms is included below):

  • max.poll.interval is 5 minutes (we have heavy processing)
  • max.poll.records is 1
  • auto_commit disabled
  • session timeout is 10 seconds
  • heartbeat interval is 3 seconds

kafka-python version is 2.0.1
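
For reference, the settings above correspond to roughly the following kafka-python configuration (just a sketch; the topic name, bootstrap address and the `process()` function are placeholders, not taken from our setup):

```python
from kafka import KafkaConsumer

def process(message):
    # Placeholder for the heavy, long-running work done per record.
    pass

consumer = KafkaConsumer(
    "some-topic",                       # placeholder topic
    bootstrap_servers=["broker:9301"],  # placeholder address
    group_id="FOOBAR",
    max_poll_interval_ms=300_000,       # 5 minutes, we have heavy processing
    max_poll_records=1,
    enable_auto_commit=False,
    session_timeout_ms=10_000,          # 10 seconds
    heartbeat_interval_ms=3_000,        # 3 seconds
)

for message in consumer:
    process(message)
    consumer.commit()
```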

jeffwidman (Contributor) commented Oct 26, 2020

Can you try 2.0.2?

I haven't looked in detail, but there's a chance this was solved by #2064...

DenisKuplyakov (Author)

We have migrated to confluent-kafka-python, which solved the problem. As the problem reproduces only in the production environment and Kafka is not under our maintenance, we have no way to retry with 2.0.2. Sorry...

We have heavy processing with neural networks and mxnet inside message handling. I thought that maybe mxnet acquires the GIL and prevents the heartbeat thread from running. Is there a chance that GIL-blocking code interferes with kafka-python's internal processing?
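
One way to test this theory (just a sketch, not something we actually ran) is a watchdog thread that measures how late its own wake-ups are; if a native call holds the GIL, this thread is starved in the same way as kafka-python's heartbeat thread:

```python
import logging
import threading
import time

log = logging.getLogger("gil-watchdog")

def gil_watchdog(interval=1.0, warn_after=5.0):
    # Sleeps in short intervals and warns whenever it could not run for a
    # long stretch. A long gap here means no Python thread (including the
    # heartbeat thread) was able to get the GIL during that time.
    last = time.monotonic()
    while True:
        time.sleep(interval)
        now = time.monotonic()
        gap = now - last
        if gap > warn_after:
            log.warning("watchdog thread was starved for %.1f seconds", gap)
        last = now

threading.Thread(target=gil_watchdog, name="gil-watchdog", daemon=True).start()
```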

And our problem doesn't look similar to #2064: there it is a complete deadlock, while in our case we have interruptions in the heartbeat sending process.

dpkp (Owner) commented Nov 18, 2020

We have migrated to confluent-kafka-python, which solved the problem. As the problem reproduces only in the production environment and Kafka is not under our maintenance, we have no way to retry with 2.0.2. Sorry...

No problem, confluent-kafka-python wraps librdkafka and I believe does not hold the GIL while processing background tasks like heartbeats etc.

We have heavy processing with neural networks and mxnet inside message handling. I thought that maybe mxnet acquires the GIL and prevents the heartbeat thread from running. Is there a chance that GIL-blocking code interferes with kafka-python's internal processing?

Yes, if you have something that holds the GIL for longer than 15 seconds, the heartbeat thread would never get scheduled, and that would cause this type of behavior. BUT, holding the GIL for that long seems a bit strange. Threads automatically release the GIL on IO, and CPU-bound threads running pure-Python code are also forced to release it periodically by the interpreter. I haven't looked closely at mxnet, but maybe mxnet holds the GIL while pushing the CPU load into an external library, so the interpreter never gets a chance to force that release?
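
A small illustration of that point (assuming CPython 3, where pure-Python threads are preempted on a time basis, `sys.getswitchinterval()`, 5 ms by default): a ticker thread keeps running while another thread does pure-Python CPU-bound work, because the interpreter keeps taking the GIL back; a native call that grinds away without releasing the GIL would silence the ticker instead, which is the heartbeat-thread situation described above:

```python
import sys
import threading
import time

print("switch interval:", sys.getswitchinterval())  # 0.005 s by default

def ticker(stop):
    # Stands in for the heartbeat thread: it only needs brief, regular
    # slices of the GIL.
    while not stop.is_set():
        print("tick", time.monotonic())
        time.sleep(1.0)

def busy():
    # Pure-Python CPU-bound loop: the interpreter still preempts it, so the
    # ticker keeps printing. A C extension that holds the GIL for the whole
    # computation would not be preempted and the ticks would stop.
    x = 0
    for _ in range(30_000_000):
        x += 1

stop = threading.Event()
threading.Thread(target=ticker, args=(stop,), daemon=True).start()
busy()
stop.set()
```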

And our problem doesn't look similar to #2064: there it is a complete deadlock, while in our case we have interruptions in the heartbeat sending process.

If mxnet is holding the GIL for an extended period of time, you would not expect to see any logs from any other component during that time. Is that what your logs show?
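
One quick way to check that (a throwaway script; the timestamp pattern matches the log lines pasted above) is to scan the log for silent gaps longer than a few seconds:

```python
import re
import sys
from datetime import datetime

# Matches timestamps like 2020-10-23T12:25:17.056094 in the log lines above.
TS = re.compile(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+")

def find_gaps(path, threshold=5.0):
    prev = None
    with open(path) as f:
        for line in f:
            m = TS.search(line)
            if not m:
                continue
            ts = datetime.fromisoformat(m.group(0))
            if prev is not None and (ts - prev).total_seconds() > threshold:
                print(f"{(ts - prev).total_seconds():.1f}s gap ending at {ts.isoformat()}")
            prev = ts

if __name__ == "__main__":
    find_gaps(sys.argv[1])
```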

dpkp closed this as completed Nov 18, 2020
dpkp (Owner) commented Nov 18, 2020

Also worth noting that the multiprocessing module can be used to parallelize CPU-bound work across multiple CPUs using processes instead of threads, which avoids GIL contention (though it adds overhead).
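
For example (a sketch, not actual code from this issue; topic and broker names are placeholders): keep the kafka-python consumer, and therefore its heartbeat thread, in the main process, and push each record's heavy work into a worker process so that nothing in the consumer process can hold the GIL for long:

```python
from concurrent.futures import ProcessPoolExecutor
from kafka import KafkaConsumer

def heavy_processing(value):
    # CPU-bound work (e.g. model inference) runs in a separate process,
    # so it cannot hold the consumer process's GIL away from the heartbeat thread.
    return value

def main():
    consumer = KafkaConsumer(
        "some-topic",                       # placeholder topic
        bootstrap_servers=["broker:9301"],  # placeholder address
        group_id="FOOBAR",
        enable_auto_commit=False,
        max_poll_records=1,
    )
    with ProcessPoolExecutor(max_workers=1) as pool:
        for message in consumer:
            # Waiting on the future releases the GIL, so heartbeats keep
            # flowing; max_poll_interval_ms still bounds per-record time.
            pool.submit(heavy_processing, message.value).result()
            consumer.commit()

if __name__ == "__main__":
    main()
```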
