
Consumer client running into a deadlock #2077

Closed
@vsrini-ns

Description


Problem Details

We are seeing a deadlock in the Kafka consumer, particularly when the group coordinator is unreachable for longer than the request timeout. The heartbeat thread and the AsyncClient each wait for the other to release a lock.

Based on our analysis, the deadlock appears to come from a race condition between the heartbeat thread and the AsyncClient, which surfaces when the consumer group coordinator stays unreachable beyond the maximum request timeout.

The combination of a heartbeat request executing in parallel, a broker connection timeout, and the cancellation of any pending in-flight requests while the group coordinator is unreachable can lead to this situation.
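The cancellation step matters because failing an in-flight request usually means invoking its callback synchronously, on whichever thread closes the connection, while that thread still holds its own locks. The following is a minimal sketch of that behavior. It uses hypothetical classes (`InFlightRequest`, `Connection`, `client_lock`), not kafka-python internals, and assumes callbacks are invoked inline, as the call chain in the Code Analysis section describes.

```python
import threading


class InFlightRequest:
    """Hypothetical stand-in for a request still awaiting a broker response."""

    def __init__(self, errback):
        self.errback = errback


class Connection:
    """Hypothetical connection whose close() fails every in-flight request
    by calling its errback synchronously on the closing thread."""

    def __init__(self):
        self.in_flight = []

    def close(self, error):
        pending, self.in_flight = self.in_flight, []
        for request in pending:
            request.errback(error)  # runs inline; the caller's locks are still held


client_lock = threading.Lock()  # hypothetical stand-in for the AsyncClient's lock
observed = []


def on_heartbeat_failure(error):
    # Record which thread ran the callback and whether the client lock was held.
    observed.append((threading.current_thread().name, client_lock.locked(), str(error)))


conn = Connection()
conn.in_flight.append(InFlightRequest(on_heartbeat_failure))

with client_lock:  # the closing thread holds the client lock...
    conn.close(TimeoutError("request timed out"))  # ...while the callback executes

print(observed)
```

If the callback itself needs a lock that another thread is holding, the closing thread blocks inside `close()` with the client lock still taken. That is the precondition for the deadlock reported here.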

Branch: Kafka-Python 1.4.7

Consumer Properties

{'bootstrap_servers': ['kafka01:9092', 'kafka02:9092', 'kafka03:9092', 'kafka04.:9092', 'kafka05:9092', 'kafka06:9092'], 'enable_auto_commit': True, 'session_timeout_ms': 60000, 'max_poll_records': 100, 'group_id': 'deadlock-test'}

From the Logs

1) The consumer is polling continuously
2) Heartbeats are failing (unable to reach the group coordinator); the coordinator is marked dead
3) Heartbeat session expired, marking coordinator dead
   - 00:45:55.498 -0700 to 00:49:56.374 -0700
4) Auto offset commit fails continuously (NodeNotReadyError), while poll(..) is still able to fetch records
   - 00:45:10.520 -0700 to 00:50:03.251 -0700
5) The AsyncClient connection is closed once the request timeout elapses (timed out after 305000 ms. Closing connection)
   - 00:50:03.253 -0700
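`request_timeout_ms` is not set in the consumer properties, and the 305000 ms in the log matches kafka-python's consumer default for that setting. The following is a quick worked check using plain `datetime` arithmetic on the logged timestamps. It shows when the request that eventually timed out must have been sent, and how much longer the request timeout is than the configured 60 s session timeout. The `-0700` offset is dropped because every timestamp shares it.

```python
from datetime import datetime, timedelta

FMT = "%H:%M:%S.%f"  # all log timestamps share the -0700 offset, so it is omitted

# Values copied from the log summary and consumer properties.
connection_closed = datetime.strptime("00:50:03.253", FMT)
request_timeout = timedelta(milliseconds=305000)  # "timed out after 305000 ms"
session_timeout = timedelta(milliseconds=60000)   # session_timeout_ms

# A request that timed out after 305000 ms was sent at or before this instant.
latest_send = connection_closed - request_timeout
print("timed-out request sent at or before:", latest_send.strftime(FMT)[:-3])

# How many session timeouts fit inside one request timeout.
print("request timeout / session timeout:", request_timeout / session_timeout)
```

The request was therefore in flight from no later than about 00:44:58.253 until the connection was closed. That window spans the entire period in which heartbeats and auto-commits were failing.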

Code Analysis

Consumer (main) thread:

ConsumerGroup.Poll()
	-> Consumer.Poll()
		-> Activates heartbeat thread to send heartbeats
		-> CommitOffsetAsync
			-> AsyncClient.poll()
				-> lock()
					-> _poll()
						-> conn.close(error) [closes all connections whose requests have timed out]
							-> Iterates in-flight requests and associated callbacks
								-> Executes callbacks in a loop
									-> HandleHeartBeatFailure()
										-> lock() (waiting for the heartbeat thread to release the lock)

Heartbeat thread:

HeartBeat Thread
	-> lock()
		-> AsyncClient.poll()
			-> monitor.lock() (waiting for CommitOffsetAsync to release the lock)
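The two call chains form a lock-ordering inversion. The heartbeat thread takes its lock and then asks for the client lock. The consumer thread holds the client lock and then, through the close callback, asks for the heartbeat thread's lock. The following is a minimal standard-library sketch of that interleaving. The lock names are hypothetical, and mapping them to specific kafka-python locks is an assumption based on the trace. Barriers force the bad ordering deterministically, and `Lock.acquire(timeout=...)` is used only so the demo terminates instead of hanging.

```python
import threading

# Hypothetical stand-ins named after the call chains in the Code Analysis;
# which real kafka-python lock each corresponds to is an assumption.
coordinator_lock = threading.Lock()  # the first lock() taken by the heartbeat thread
client_lock = threading.Lock()       # the lock() taken inside AsyncClient.poll()
both_locked = threading.Barrier(2)   # force the problematic interleaving
both_tried = threading.Barrier(2)    # keep both locks held until both attempts finish
WAIT_S = 0.5                         # timeout only so the demo terminates
results = {}


def handle_heartbeat_failure():
    # Callback fired from conn.close(error); it needs the coordinator lock.
    acquired = coordinator_lock.acquire(timeout=WAIT_S)
    if acquired:
        coordinator_lock.release()
    return acquired


def consumer_thread():
    with client_lock:  # CommitOffsetAsync -> AsyncClient.poll() -> lock()
        both_locked.wait()
        # _poll() -> conn.close(error) -> in-flight callbacks
        results["consumer"] = handle_heartbeat_failure()
        both_tried.wait()


def heartbeat_thread():
    with coordinator_lock:  # HeartBeat Thread -> lock()
        both_locked.wait()
        # AsyncClient.poll() -> monitor.lock() on the client
        acquired = client_lock.acquire(timeout=WAIT_S)
        if acquired:
            client_lock.release()
        results["heartbeat"] = acquired
        both_tried.wait()


threads = [
    threading.Thread(target=consumer_thread, name="consumer"),
    threading.Thread(target=heartbeat_thread, name="heartbeat"),
]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Neither thread could take the lock held by the other; with blocking
# acquires (no timeout), both would wait forever.
print(results)
```

Both attempts fail, and this reproduces the mutual wait described in the trace. With plain blocking `acquire()` calls, as in the real call chain, the two threads would never make progress.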
