
KafkaConsumer.subscribe(pattern='x') sometimes picks up topic but not partitions #1237

Closed
jeffwidman opened this issue Oct 5, 2017 · 4 comments

@jeffwidman
Collaborator

jeffwidman commented Oct 5, 2017

There appears to be a race condition bug of some kind in KafkaConsumer.subscribe(pattern='some pattern').

Normally the call works fine: the consumer picks up matching topics, assigns partitions to group members, etc.

However, once in a blue moon I've observed that the consumer finds the matching topic, but never successfully assigns the topic partitions to the group members. Once it's in this state, it will call poll() for hours without returning messages because the consumer thinks it has no assigned partitions, and because the consumer's subscription already contains the topic, there's never a change that triggers a rebalance.
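
For context, the consumer usage is essentially the standard pattern-subscribe loop. Here's a minimal sketch (the broker address, group name, and pattern are hypothetical placeholders, not the production service):

```python
from kafka import KafkaConsumer

# Hypothetical connection details, for illustration only.
consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    group_id='my-group',
    api_version=(0, 10),
)
consumer.subscribe(pattern='my_topic_.*')

while True:
    records = consumer.poll(timeout_ms=1000)
    # In the failure mode, subscription() contains the matched topic
    # but assignment() stays empty, so poll() never returns messages.
    print(consumer.subscription(), consumer.assignment(),
          sum(len(msgs) for msgs in records.values()))
```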

I'm embarrassed to say that I've spent 40+ hours over the past two weeks trying to figure this out as we hit it in production, but all I've managed to do is isolate a semi-consistently reproducible example. Unfortunately, that requires running a service that has a KafkaConsumer instance and a bunch of associated docker containers, so I can't make this setup public. The wrapper service does use gevent, which I'm not very familiar with, but I disabled all of the service's other greenlets, so I don't think that should affect this at all.

Every time I try to reduce it to a simple kafka-python script, I cannot reproduce it. But after spending hours stepping through the code, I'm reasonably certain the race condition is in kafka-python and not the wrapper service.

Here's what I know:

  1. The issue doesn't show up the first time I run the service. If I kill the service (without calling KafkaConsumer.close()) and then restart it before the group coordinator evicts the consumer from the group, then I trigger the issue. If I then kill it, wait until I know the group coordinator has evicted all consumers, and then re-run it, it will work fine. Unfortunately, I have no idea if this behavior is related to the root cause, or just a trigger that makes the docker kafka container busy enough that it slows down its response times.

  2. In the failure case, calling KafkaConsumer.subscription() returns the expected topic name, but calling KafkaConsumer.assignment() returns an empty set.

  3. In the failure case, I can see that the cluster metadata object has both the topic and the list of partitions, so the cluster metadata is getting correctly updated, it's just not making it into the group assignments.

  4. SubscriptionState.change_subscription() has a check that short circuits the group rebalance if the previous/current topic subscriptions are equal. If I comment out the return in that short-circuit check, the group rebalances properly and the problem disappears. (See the sketch after this list.)

  5. Tracing the TCP calls in Wireshark, I see the following:

    Success case:

    1. Metadata v1 Request
    2. Metadata v2 Response
    3. GroupCoordinator v0 Request
    4. GroupCoordinator v0 Response
    5. JoinGroup v0 Request - protocol member metadata is all 0's
    6. JoinGroup v0 Response - protocol member metadata is all 0's
    7. SyncGroup v0 Request - member assignment is all 0's
    8. SyncGroup v0 Response - member assignment is all 0's
      (note this is a second generation of the group)
    9. JoinGroup v0 Request - protocol member metadata has data
    10. JoinGroup v0 Response - protocol member metadata has data
    11. SyncGroup v0 Request - member assignment has data
    12. SyncGroup v0 Response - member assignment has data
    13. From here on it's the expected behavior of polling the assigned partitions, with the occasional Metadata Request/Response when the metadata refresh timeout kicks in

    Failure case:
    1. Metadata v1 Request
    2. Metadata v2 Response
    3. GroupCoordinator v0 Request
    4. GroupCoordinator v0 Response
    5. JoinGroup v0 Request - protocol member metadata is all 0's
    6. JoinGroup v0 Response - protocol member metadata is all 0's
    7. SyncGroup v0 Request - member assignment is all 0's
    (Here is the problem, we never trigger a second JoinGroup v0 Request that contains the partition data)
    8. From here on there are no requests other than the Metadata Request/Response when the metadata refresh timeout kicks in
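
For reference, the short circuit in item 4 looks roughly like the sketch below. This is a paraphrase modeled on SubscriptionState in kafka/consumer/subscription_state.py, trimmed to the relevant logic; it is not a verbatim copy of the library code:

```python
import logging

log = logging.getLogger(__name__)

class SubscriptionState(object):
    def __init__(self):
        self.subscription = set()
        self.needs_partition_assignment = False

    def change_subscription(self, topics):
        if isinstance(topics, str):
            topics = [topics]

        # The short circuit: if the topic set is unchanged, return early
        # and never flag the group for reassignment. Commenting out this
        # return makes the rebalance proceed and the problem disappear.
        if self.subscription == set(topics):
            log.warning("subscription unchanged by change_subscription(%s)",
                        topics)
            return

        self.subscription = set(topics)
        self.needs_partition_assignment = True
```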

Setup:

  • Single Kafka broker, version 0.10.2.1, running on docker.
  • Single instance of the consumer, so it always elects itself as the leader and consumes all partitions for the topic.
  • To keep things simple, my topic has only one partition. However, this race condition might be partition-count agnostic: a consumer could be working perfectly, and then after we expand the number of partitions it might never pick up that the partitions changed.

After spending a lot of time poking through this code, I understand why the consumer is stuck once this happens, but I don't understand how it gets into this state in the first place.

@tvoinarovskyi
Collaborator

tvoinarovskyi commented Oct 5, 2017

I did see something similar in aiokafka (aio-libs/aiokafka#118). The reason was a race condition between sending a MetadataRequest and a JoinGroup, but that can only happen if we have 2 sockets...

I tracked down the problem parts of the Coordinator I was dealing with at the time; it looks like KAFKA-3949.

@jeffwidman
Collaborator Author

jeffwidman commented Oct 5, 2017

Thanks @tvoinarovskyi, I think you're on to something; KAFKA-3949 matches what I'm seeing. I'm still double-checking, which is slow because I'm a little hazy on which log lines are executed in which thread context, and because the issue only consistently appears within this test case that runs under gevent, I can't just print the thread id.

Here are the logs. Note that I added extra logging of the various flags to see when state was changing:

Success case when I first run the script:

2017-10-05 21:14:31,177	INFO    	consumer            	get_instance:191	2505  	140667947242832	Get kafka consumer instance FileUpdateWorker.kafka-dc1-1:9092.0
2017-10-05 21:14:31,178	INFO    	consumer            	__init__:105	2505  	140667947242832	Initializing Kafka Consumer with {'metadata_max_age_ms': 1000, 'session_timeout_ms': 60000, 'request_timeout_ms': 300500, 'enable_auto_commit': False, 'client_id': 'consumer-0b2cf0d35ac5-2505', 'auto_offset_reset': 'earliest', 'value_deserializer': <function <lambda> at 0x7fefd040b668>, 'bootstrap_servers': ['kafka-dc1-1:9092'], 'group_id': 'FileUpdateWorker', 'api_version': (0, 10), 'key_deserializer': <function <lambda> at 0x7fefd040b5f0>}
2017-10-05 21:14:31,179	INFO    	client_async        	_bootstrap:213	2505  	140667947242832	Bootstrapping cluster metadata from [('kafka-dc1-1', 9092, 0)]
2017-10-05 21:14:31,180	INFO    	client_async        	_bootstrap:230	2505  	140667947242832	Attempting to bootstrap via node at kafka-dc1-1:9092
2017-10-05 21:14:31,232	INFO    	conn                	connect:272	2505  	140667947242832	<BrokerConnection node_id=bootstrap host=kafka-dc1-1/172.18.0.4 port=9092>: connecting to 172.18.0.4:9092
2017-10-05 21:14:31,232	INFO    	client_async        	_conn_state_change:283	2505  	140667947242832	Node bootstrap connected
2017-10-05 21:14:31,236	INFO    	cluster             	update_metadata:289	2505  	140667947242832	Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 13, groups: 0)
2017-10-05 21:14:31,237	INFO    	client_async        	_bootstrap:250	2505  	140667947242832	Bootstrap succeeded: found 1 brokers and 12 topics.
2017-10-05 21:14:31,238	INFO    	conn                	close:508	2505  	140667947242832	<BrokerConnection node_id=bootstrap host=kafka-dc1-1/172.18.0.4 port=9092>: Closing connection.
2017-10-05 21:14:31,251	INFO    	subscription_state  	subscribe:114	2505  	140667947242832	Subscribing to pattern: /configdist2_update/
2017-10-05 21:14:31,255	INFO    	group               	subscribe:816	2505  	140667947242832	Subscribed to topic pattern: configdist2_update
2017-10-05 21:14:31,256	INFO    	file_update_worker  	_create_consumer_with_retry:104	2505  	140667947242832	Created kafka consumer successfully, topic configdist2_update, group_name FileUpdateWorker
2017-10-05 21:14:31,256	INFO    	client_async        	_maybe_connect:323	2505  	140667947242832	Initiating connection to node 1 at kafka-dc1-1:9092
2017-10-05 21:14:31,262	INFO    	conn                	connect:272	2505  	140667947242832	<BrokerConnection node_id=1 host=kafka-dc1-1/172.18.0.4 port=9092>: connecting to 172.18.0.4:9092
2017-10-05 21:14:31,264	INFO    	client_async        	_conn_state_change:283	2505  	140667947242832	Node 1 connected
2017-10-05 21:14:31,270	INFO    	base                	_send_group_coordinator_request:495	2505  	140667947242832	Sending group coordinator request for group FileUpdateWorker to broker 1
2017-10-05 21:14:31,272	INFO    	base                	_handle_group_coordinator_response:504	2505  	140667947242832	Received group coordinator response GroupCoordinatorResponse_v0(error_code=0, coordinator_id=1, host=u'kafka-dc1-1', port=9092)
2017-10-05 21:14:31,273	INFO    	cluster             	add_group_coordinator:312	2505  	140667947242832	Updating coordinator for FileUpdateWorker: GroupCoordinatorResponse_v0(error_code=0, coordinator_id=1, host=u'kafka-dc1-1', port=9092)
2017-10-05 21:14:31,274	INFO    	cluster             	add_group_coordinator:342	2505  	140667947242832	Group coordinator for FileUpdateWorker is BrokerMetadata(nodeId=1, host=u'kafka-dc1-1', port=9092, rack=None)
2017-10-05 21:14:31,274	INFO    	base                	_handle_group_coordinator_response:522	2505  	140667947242832	Discovered coordinator 1 for group FileUpdateWorker
2017-10-05 21:14:31,275	INFO    	base                	ensure_active_group:236	2505  	140667947242832	checking self.need_rejoin(), if false, no need to rebalalnce, so return immediately
2017-10-05 21:14:31,275	INFO    	base                	need_rejoin:231	2505  	140667947242832	rejoin_needed is True
2017-10-05 21:14:31,275	INFO    	base                	need_rejoin:231	2505  	140667947242832	rejoin_needed is True
2017-10-05 21:14:31,276	INFO    	consumer            	need_rejoin:302	2505  	140667947242832	self.need_rejoin() is True, super.need_rejoin() is True, needs_partition_assignment is False
2017-10-05 21:14:31,276	INFO    	base                	need_rejoin:231	2505  	140667947242832	rejoin_needed is True
2017-10-05 21:14:31,276	INFO    	base                	ensure_active_group:240	2505  	140667947242832	is a rebalance already in progress? value of self.rejoining: False
2017-10-05 21:14:31,277	INFO    	consumer            	_on_join_prepare:278	2505  	140667947242832	Revoking previously assigned partitions set([]) for group FileUpdateWorker
2017-10-05 21:14:31,278	INFO    	subscription_state  	mark_for_reassignment:176	2505  	140667947242832	current value of needs_partition_assignment: False will be set to True
2017-10-05 21:14:31,278	INFO    	base                	ensure_active_group:245	2505  	140667947242832	beginning group rebalance of `while self.need_rejoin()` loop
2017-10-05 21:14:31,279	INFO    	base                	need_rejoin:231	2505  	140667947242832	rejoin_needed is True
2017-10-05 21:14:31,279	INFO    	base                	need_rejoin:231	2505  	140667947242832	rejoin_needed is True
2017-10-05 21:14:31,279	INFO    	consumer            	need_rejoin:302	2505  	140667947242832	self.need_rejoin() is True, super.need_rejoin() is True, needs_partition_assignment is True
2017-10-05 21:14:31,280	INFO    	base                	need_rejoin:231	2505  	140667947242832	rejoin_needed is True
2017-10-05 21:14:31,280	INFO    	base                	ensure_active_group:247	2505  	140667947242832	looping through while loop because self.need_rejoin() was (still) true
2017-10-05 21:14:31,281	INFO    	base                	ensure_active_group:254	2505  	140667947242832	self._client.in_flight_request_count(self.coordinator_id) is 0
2017-10-05 21:14:31,281	INFO    	base                	_send_join_group_request:301	2505  	140667947242832	(Re-)joining group FileUpdateWorker
2017-10-05 21:14:31,282	INFO    	base                	_send_join_group_request:314	2505  	140667947242832	Sending JoinGroup (JoinGroupRequest_v0(group='FileUpdateWorker', session_timeout=60000, member_id='', protocol_type='consumer', group_protocols=[(protocol_name='range', protocol_metadata='\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'), (protocol_name='roundrobin', protocol_metadata='\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')])) to coordinator 1
2017-10-05 21:14:31,285	INFO    	base                	_handle_join_group_response:336	2505  	140667947242832	Received successful JoinGroup response for group FileUpdateWorker: JoinGroupResponse_v0(error_code=0, generation_id=1, group_protocol=u'range', leader_id=u'consumer-0b2cf0d35ac5-2505-dd52175d-fd04-46ce-a643-7c5ecdee626c', member_id=u'consumer-0b2cf0d35ac5-2505-dd52175d-fd04-46ce-a643-7c5ecdee626c', members=[(member_id=u'consumer-0b2cf0d35ac5-2505-dd52175d-fd04-46ce-a643-7c5ecdee626c', member_metadata='\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')])
2017-10-05 21:14:31,285	INFO    	base                	_handle_join_group_response:342	2505  	140667947242832	Joined group 'FileUpdateWorker' (generation 1) with member_id consumer-0b2cf0d35ac5-2505-dd52175d-fd04-46ce-a643-7c5ecdee626c
2017-10-05 21:14:31,286	INFO    	base                	_handle_join_group_response:346	2505  	140667947242832	Elected group leader -- performing partition assignments using range
2017-10-05 21:14:31,290	INFO    	consumer            	_perform_assignment:261	2505  	140667947242832	Performing assignment for group FileUpdateWorker using strategy range with subscriptions {u'consumer-0b2cf0d35ac5-2505-dd52175d-fd04-46ce-a643-7c5ecdee626c': ConsumerProtocolMemberMetadata(version=0, subscription=[], user_data='')}
2017-10-05 21:14:31,291	INFO    	consumer            	_perform_assignment:265	2505  	140667947242832	Finished assignment for group FileUpdateWorker: {u'consumer-0b2cf0d35ac5-2505-dd52175d-fd04-46ce-a643-7c5ecdee626c': ConsumerProtocolMemberAssignment(version=0, assignment=[], user_data='')}
2017-10-05 21:14:31,291	INFO    	base                	_on_join_leader:425	2505  	140667947242832	Sending leader SyncGroup for group FileUpdateWorker to coordinator 1: SyncGroupRequest_v0(group='FileUpdateWorker', generation_id=1, member_id=u'consumer-0b2cf0d35ac5-2505-dd52175d-fd04-46ce-a643-7c5ecdee626c', group_assignment=[(member_id=u'consumer-0b2cf0d35ac5-2505-dd52175d-fd04-46ce-a643-7c5ecdee626c', member_metadata='\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')])
2017-10-05 21:14:31,294	INFO    	base                	_handle_sync_group_response:450	2505  	140667947242832	Successfully joined group FileUpdateWorker with generation 1
2017-10-05 21:14:31,295	INFO    	subscription_state  	assign_from_subscribed:234	2505  	140667947242832	current value of needs_partition_assignment: True will be set to False
2017-10-05 21:14:31,295	INFO    	subscription_state  	assign_from_subscribed:236	2505  	140667947242832	Updated partition assignment: []
2017-10-05 21:14:31,295	INFO    	consumer            	_on_join_complete:224	2505  	140667947242832	Setting newly assigned partitions set([]) for group FileUpdateWorker
2017-10-05 21:14:31,296	INFO    	base                	need_rejoin:231	2505  	140667947242832	rejoin_needed is False
2017-10-05 21:14:31,296	INFO    	base                	need_rejoin:231	2505  	140667947242832	rejoin_needed is False
2017-10-05 21:14:31,297	INFO    	consumer            	need_rejoin:302	2505  	140667947242832	self.need_rejoin() is False, super.need_rejoin() is False, needs_partition_assignment is False
2017-10-05 21:14:31,297	INFO    	base                	need_rejoin:231	2505  	140667947242832	rejoin_needed is False
2017-10-05 21:14:31,298	INFO    	fetcher             	fetched_records:321	2505  	140667947242832	starting fetched_records()
2017-10-05 21:14:31,299	INFO    	base                	need_rejoin:231	2505  	140667947242832	rejoin_needed is False
2017-10-05 21:14:31,299	INFO    	base                	need_rejoin:231	2505  	140667947242832	rejoin_needed is False
2017-10-05 21:14:31,300	INFO    	consumer            	need_rejoin:302	2505  	140667947242832	self.need_rejoin() is False, super.need_rejoin() is False, needs_partition_assignment is False
2017-10-05 21:14:31,301	INFO    	base                	need_rejoin:231	2505  	140667947242832	rejoin_needed is False
2017-10-05 21:14:31,301	INFO    	base                	__call__:684	2505  	140667947242832	Heartbeat task unneeded now, retrying in 2.99428415298
2017-10-05 21:14:31,304	INFO    	fetcher             	fetched_records:321	2505  	140667947242832	starting fetched_records()
2017-10-05 21:14:31,405	INFO    	base                	ensure_active_group:236	2505  	140667947242832	checking self.need_rejoin(), if false, no need to rebalalnce, so return immediately
2017-10-05 21:14:31,405	INFO    	base                	need_rejoin:231	2505  	140667947242832	rejoin_needed is False
2017-10-05 21:14:31,405	INFO    	base                	need_rejoin:231	2505  	140667947242832	rejoin_needed is False
2017-10-05 21:14:31,407	INFO    	consumer            	need_rejoin:302	2505  	140667947242832	self.need_rejoin() is False, super.need_rejoin() is False, needs_partition_assignment is False
2017-10-05 21:14:31,408	INFO    	base                	need_rejoin:231	2505  	140667947242832	rejoin_needed is False
2017-10-05 21:14:31,409	INFO    	fetcher             	fetched_records:321	2505  	140667947242832	starting fetched_records()
2017-10-05 21:14:31,411	INFO    	client_async        	_maybe_refresh_metadata:731	2505  	140667947242832	Sending metadata request MetadataRequest_v1(topics=NULL) to node 1
2017-10-05 21:14:31,414	INFO    	cluster             	update_metadata:289	2505  	140667947242832	Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 13, groups: 1)
2017-10-05 21:14:31,414	INFO    	consumer            	_handle_metadata_update:142	2505  	140667947242832	_handle_metadata_update() appended configdist2_update
2017-10-05 21:14:31,415	INFO    	subscription_state  	change_subscription:148	2505  	140667947242832	Updating subscribed topics to: [u'configdist2_update']
2017-10-05 21:14:31,415	INFO    	subscription_state  	change_subscription:151	2505  	140667947242832	current value of needs_partition_assignment: False will be set to True
2017-10-05 21:14:31,416	INFO    	consumer            	_subscription_metadata_changed:177	2505  	140667947242832	group_subscription: set([u'configdist2_update'])
2017-10-05 21:14:31,416	INFO    	consumer            	_handle_metadata_update:155	2505  	140667947242832	current metadata snapshot for configdist2_update: set([0])
2017-10-05 21:14:31,417	INFO    	subscription_state  	mark_for_reassignment:176	2505  	140667947242832	current value of needs_partition_assignment: True will be set to True
2017-10-05 21:14:31,417	INFO    	fetcher             	fetched_records:321	2505  	140667947242832	starting fetched_records()
2017-10-05 21:14:31,418	INFO    	fetcher             	fetched_records:327	2505  	140667947242832	returning no records because needs_partition_assignment is True
2017-10-05 21:14:31,418	INFO    	file_update_worker  	_retrieve_message_with_retry:128	2505  	140667947242832	2a:
                        subscription: set([u'configdist2_update'])
                        assignment: set([])
2017-10-05 21:14:31,519	INFO    	base                	ensure_active_group:236	2505  	140667947242832	checking self.need_rejoin(), if false, no need to rebalalnce, so return immediately
2017-10-05 21:14:31,520	INFO    	base                	need_rejoin:231	2505  	140667947242832	rejoin_needed is False
2017-10-05 21:14:31,522	INFO    	base                	need_rejoin:231	2505  	140667947242832	rejoin_needed is False
2017-10-05 21:14:31,529	INFO    	consumer            	need_rejoin:302	2505  	140667947242832	self.need_rejoin() is True, super.need_rejoin() is False, needs_partition_assignment is True
2017-10-05 21:14:31,530	INFO    	base                	need_rejoin:231	2505  	140667947242832	rejoin_needed is False
2017-10-05 21:14:31,530	INFO    	base                	ensure_active_group:240	2505  	140667947242832	is a rebalance already in progress? value of self.rejoining: False
2017-10-05 21:14:31,531	INFO    	consumer            	_on_join_prepare:278	2505  	140667947242832	Revoking previously assigned partitions set([]) for group FileUpdateWorker
2017-10-05 21:14:31,531	INFO    	subscription_state  	mark_for_reassignment:176	2505  	140667947242832	current value of needs_partition_assignment: True will be set to True
2017-10-05 21:14:31,531	INFO    	base                	ensure_active_group:245	2505  	140667947242832	beginning group rebalance of `while self.need_rejoin()` loop
2017-10-05 21:14:31,532	INFO    	base                	need_rejoin:231	2505  	140667947242832	rejoin_needed is False
2017-10-05 21:14:31,532	INFO    	base                	need_rejoin:231	2505  	140667947242832	rejoin_needed is False
2017-10-05 21:14:31,532	INFO    	consumer            	need_rejoin:302	2505  	140667947242832	self.need_rejoin() is True, super.need_rejoin() is False, needs_partition_assignment is True
2017-10-05 21:14:31,533	INFO    	base                	need_rejoin:231	2505  	140667947242832	rejoin_needed is False
2017-10-05 21:14:31,533	INFO    	base                	ensure_active_group:247	2505  	140667947242832	looping through while loop because self.need_rejoin() was (still) true
2017-10-05 21:14:31,534	INFO    	base                	ensure_active_group:254	2505  	140667947242832	self._client.in_flight_request_count(self.coordinator_id) is 0
2017-10-05 21:14:31,534	INFO    	base                	_send_join_group_request:301	2505  	140667947242832	(Re-)joining group FileUpdateWorker
2017-10-05 21:14:31,535	INFO    	base                	_send_join_group_request:314	2505  	140667947242832	Sending JoinGroup (JoinGroupRequest_v0(group='FileUpdateWorker', session_timeout=60000, member_id=u'consumer-0b2cf0d35ac5-2505-dd52175d-fd04-46ce-a643-7c5ecdee626c', protocol_type='consumer', group_protocols=[(protocol_name='range', protocol_metadata='\x00\x00\x00\x00\x00\x01\x00\x12configdist2_update\x00\x00\x00\x00'), (protocol_name='roundrobin', protocol_metadata='\x00\x00\x00\x00\x00\x01\x00\x12configdist2_update\x00\x00\x00\x00')])) to coordinator 1
2017-10-05 21:14:31,536	INFO    	client_async        	_maybe_refresh_metadata:731	2505  	140667947242832	Sending metadata request MetadataRequest_v1(topics=NULL) to node 1
2017-10-05 21:14:31,553	INFO    	base                	_handle_join_group_response:336	2505  	140667947242832	Received successful JoinGroup response for group FileUpdateWorker: JoinGroupResponse_v0(error_code=0, generation_id=2, group_protocol=u'range', leader_id=u'consumer-0b2cf0d35ac5-2505-dd52175d-fd04-46ce-a643-7c5ecdee626c', member_id=u'consumer-0b2cf0d35ac5-2505-dd52175d-fd04-46ce-a643-7c5ecdee626c', members=[(member_id=u'consumer-0b2cf0d35ac5-2505-dd52175d-fd04-46ce-a643-7c5ecdee626c', member_metadata='\x00\x00\x00\x00\x00\x01\x00\x12configdist2_update\x00\x00\x00\x00')])
2017-10-05 21:14:31,554	INFO    	base                	_handle_join_group_response:342	2505  	140667947242832	Joined group 'FileUpdateWorker' (generation 2) with member_id consumer-0b2cf0d35ac5-2505-dd52175d-fd04-46ce-a643-7c5ecdee626c
2017-10-05 21:14:31,554	INFO    	base                	_handle_join_group_response:346	2505  	140667947242832	Elected group leader -- performing partition assignments using range
2017-10-05 21:14:31,555	INFO    	consumer            	_perform_assignment:261	2505  	140667947242832	Performing assignment for group FileUpdateWorker using strategy range with subscriptions {u'consumer-0b2cf0d35ac5-2505-dd52175d-fd04-46ce-a643-7c5ecdee626c': ConsumerProtocolMemberMetadata(version=0, subscription=[u'configdist2_update'], user_data='')}
2017-10-05 21:14:31,556	INFO    	consumer            	_perform_assignment:265	2505  	140667947242832	Finished assignment for group FileUpdateWorker: {u'consumer-0b2cf0d35ac5-2505-dd52175d-fd04-46ce-a643-7c5ecdee626c': ConsumerProtocolMemberAssignment(version=0, assignment=[(topic=u'configdist2_update', partitions=[0])], user_data='')}
2017-10-05 21:14:31,557	INFO    	base                	_on_join_leader:425	2505  	140667947242832	Sending leader SyncGroup for group FileUpdateWorker to coordinator 1: SyncGroupRequest_v0(group='FileUpdateWorker', generation_id=2, member_id=u'consumer-0b2cf0d35ac5-2505-dd52175d-fd04-46ce-a643-7c5ecdee626c', group_assignment=[(member_id=u'consumer-0b2cf0d35ac5-2505-dd52175d-fd04-46ce-a643-7c5ecdee626c', member_metadata='\x00\x00\x00\x00\x00\x01\x00\x12configdist2_update\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00')])
2017-10-05 21:14:31,562	INFO    	cluster             	update_metadata:289	2505  	140667947242832	Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 13, groups: 1)
2017-10-05 21:14:31,562	INFO    	consumer            	_handle_metadata_update:142	2505  	140667947242832	_handle_metadata_update() appended configdist2_update
2017-10-05 21:14:31,563	WARNING 	subscription_state  	change_subscription:142	2505  	140667947242832	subscription unchanged by change_subscription([u'configdist2_update'])
2017-10-05 21:14:31,563	INFO    	consumer            	_subscription_metadata_changed:177	2505  	140667947242832	group_subscription: set([u'configdist2_update'])
2017-10-05 21:14:31,563	INFO    	base                	_handle_sync_group_response:450	2505  	140667947242832	Successfully joined group FileUpdateWorker with generation 2
2017-10-05 21:14:31,564	INFO    	subscription_state  	assign_from_subscribed:234	2505  	140667947242832	current value of needs_partition_assignment: True will be set to False
2017-10-05 21:14:31,564	INFO    	subscription_state  	assign_from_subscribed:236	2505  	140667947242832	Updated partition assignment: [TopicPartition(topic=u'configdist2_update', partition=0)]
2017-10-05 21:14:31,564	INFO    	subscription_state  	assign_from_subscribed:240	2505  	140667947242832	if we got here it's not broken, exiting

Failure case, which happens when I immediately re-run the script:

2017-10-05 21:14:50,108	INFO    	consumer            	__init__:105	2516  	140602010883408	Initializing Kafka Consumer with {'metadata_max_age_ms': 1000, 'session_timeout_ms': 60000, 'request_timeout_ms': 300500, 'enable_auto_commit': False, 'client_id': 'consumer-0b2cf0d35ac5-2516', 'auto_offset_reset': 'earliest', 'value_deserializer': <function <lambda> at 0x7fe07623b668>, 'bootstrap_servers': ['kafka-dc1-1:9092'], 'group_id': 'FileUpdateWorker', 'api_version': (0, 10), 'key_deserializer': <function <lambda> at 0x7fe07623b5f0>}
2017-10-05 21:14:50,110	INFO    	client_async        	_bootstrap:213	2516  	140602010883408	Bootstrapping cluster metadata from [('kafka-dc1-1', 9092, 0)]
2017-10-05 21:14:50,110	INFO    	client_async        	_bootstrap:230	2516  	140602010883408	Attempting to bootstrap via node at kafka-dc1-1:9092
2017-10-05 21:14:50,165	INFO    	conn                	connect:272	2516  	140602010883408	<BrokerConnection node_id=bootstrap host=kafka-dc1-1/172.18.0.4 port=9092>: connecting to 172.18.0.4:9092
2017-10-05 21:14:50,166	INFO    	client_async        	_conn_state_change:283	2516  	140602010883408	Node bootstrap connected
2017-10-05 21:14:50,169	INFO    	cluster             	update_metadata:289	2516  	140602010883408	Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 13, groups: 0)
2017-10-05 21:14:50,170	INFO    	client_async        	_bootstrap:250	2516  	140602010883408	Bootstrap succeeded: found 1 brokers and 12 topics.
2017-10-05 21:14:50,170	INFO    	conn                	close:508	2516  	140602010883408	<BrokerConnection node_id=bootstrap host=kafka-dc1-1/172.18.0.4 port=9092>: Closing connection.
2017-10-05 21:14:50,172	INFO    	subscription_state  	subscribe:114	2516  	140602010883408	Subscribing to pattern: /configdist2_update/
2017-10-05 21:14:50,173	INFO    	group               	subscribe:816	2516  	140602010883408	Subscribed to topic pattern: configdist2_update
2017-10-05 21:14:50,174	INFO    	file_update_worker  	_create_consumer_with_retry:104	2516  	140602010883408	Created kafka consumer successfully, topic configdist2_update, group_name FileUpdateWorker
2017-10-05 21:14:50,174	INFO    	client_async        	_maybe_connect:323	2516  	140602010883408	Initiating connection to node 1 at kafka-dc1-1:9092
2017-10-05 21:14:50,178	INFO    	conn                	connect:272	2516  	140602010883408	<BrokerConnection node_id=1 host=kafka-dc1-1/172.18.0.4 port=9092>: connecting to 172.18.0.4:9092
2017-10-05 21:14:50,179	INFO    	client_async        	_conn_state_change:283	2516  	140602010883408	Node 1 connected
2017-10-05 21:14:50,182	INFO    	base                	_send_group_coordinator_request:495	2516  	140602010883408	Sending group coordinator request for group FileUpdateWorker to broker 1
2017-10-05 21:14:50,185	INFO    	base                	_handle_group_coordinator_response:504	2516  	140602010883408	Received group coordinator response GroupCoordinatorResponse_v0(error_code=0, coordinator_id=1, host=u'kafka-dc1-1', port=9092)
2017-10-05 21:14:50,186	INFO    	cluster             	add_group_coordinator:312	2516  	140602010883408	Updating coordinator for FileUpdateWorker: GroupCoordinatorResponse_v0(error_code=0, coordinator_id=1, host=u'kafka-dc1-1', port=9092)
2017-10-05 21:14:50,186	INFO    	cluster             	add_group_coordinator:342	2516  	140602010883408	Group coordinator for FileUpdateWorker is BrokerMetadata(nodeId=1, host=u'kafka-dc1-1', port=9092, rack=None)
2017-10-05 21:14:50,186	INFO    	base                	_handle_group_coordinator_response:522	2516  	140602010883408	Discovered coordinator 1 for group FileUpdateWorker
2017-10-05 21:14:50,187	INFO    	base                	ensure_active_group:236	2516  	140602010883408	checking self.need_rejoin(), if false, no need to rebalalnce, so return immediately
2017-10-05 21:14:50,187	INFO    	base                	need_rejoin:231	2516  	140602010883408	rejoin_needed is True
2017-10-05 21:14:50,187	INFO    	base                	need_rejoin:231	2516  	140602010883408	rejoin_needed is True
2017-10-05 21:14:50,188	INFO    	consumer            	need_rejoin:302	2516  	140602010883408	self.need_rejoin() is True, super.need_rejoin() is True, needs_partition_assignment is False
2017-10-05 21:14:50,188	INFO    	base                	need_rejoin:231	2516  	140602010883408	rejoin_needed is True
2017-10-05 21:14:50,188	INFO    	base                	ensure_active_group:240	2516  	140602010883408	is a rebalance already in progress? value of self.rejoining: False
2017-10-05 21:14:50,189	INFO    	consumer            	_on_join_prepare:278	2516  	140602010883408	Revoking previously assigned partitions set([]) for group FileUpdateWorker
2017-10-05 21:14:50,189	INFO    	subscription_state  	mark_for_reassignment:176	2516  	140602010883408	current value of needs_partition_assignment: False will be set to True
2017-10-05 21:14:50,190	INFO    	base                	ensure_active_group:245	2516  	140602010883408	beginning group rebalance of `while self.need_rejoin()` loop
2017-10-05 21:14:50,190	INFO    	base                	need_rejoin:231	2516  	140602010883408	rejoin_needed is True
2017-10-05 21:14:50,191	INFO    	base                	need_rejoin:231	2516  	140602010883408	rejoin_needed is True
2017-10-05 21:14:50,191	INFO    	consumer            	need_rejoin:302	2516  	140602010883408	self.need_rejoin() is True, super.need_rejoin() is True, needs_partition_assignment is True
2017-10-05 21:14:50,191	INFO    	base                	need_rejoin:231	2516  	140602010883408	rejoin_needed is True
2017-10-05 21:14:50,191	INFO    	base                	ensure_active_group:247	2516  	140602010883408	looping through while loop because self.need_rejoin() was (still) true
2017-10-05 21:14:50,193	INFO    	base                	ensure_active_group:254	2516  	140602010883408	self._client.in_flight_request_count(self.coordinator_id) is 0
2017-10-05 21:14:50,194	INFO    	base                	_send_join_group_request:301	2516  	140602010883408	(Re-)joining group FileUpdateWorker
2017-10-05 21:14:50,194	INFO    	base                	_send_join_group_request:314	2516  	140602010883408	Sending JoinGroup (JoinGroupRequest_v0(group='FileUpdateWorker', session_timeout=60000, member_id='', protocol_type='consumer', group_protocols=[(protocol_name='range', protocol_metadata='\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'), (protocol_name='roundrobin', protocol_metadata='\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')])) to coordinator 1
2017-10-05 21:14:50,271	INFO    	client_async        	_maybe_refresh_metadata:731	2516  	140602010883408	Sending metadata request MetadataRequest_v1(topics=NULL) to node 1
2017-10-05 21:15:31,571	INFO    	base                	_handle_join_group_response:336	2516  	140602010883408	Received successful JoinGroup response for group FileUpdateWorker: JoinGroupResponse_v0(error_code=0, generation_id=3, group_protocol=u'range', leader_id=u'consumer-0b2cf0d35ac5-2516-cd3fa9e1-6fdd-469b-957e-3bf42e2456ad', member_id=u'consumer-0b2cf0d35ac5-2516-cd3fa9e1-6fdd-469b-957e-3bf42e2456ad', members=[(member_id=u'consumer-0b2cf0d35ac5-2516-cd3fa9e1-6fdd-469b-957e-3bf42e2456ad', member_metadata='\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')])
2017-10-05 21:15:31,572	INFO    	base                	_handle_join_group_response:342	2516  	140602010883408	Joined group 'FileUpdateWorker' (generation 3) with member_id consumer-0b2cf0d35ac5-2516-cd3fa9e1-6fdd-469b-957e-3bf42e2456ad
2017-10-05 21:15:31,572	INFO    	base                	_handle_join_group_response:346	2516  	140602010883408	Elected group leader -- performing partition assignments using range
2017-10-05 21:15:31,573	INFO    	consumer            	_perform_assignment:261	2516  	140602010883408	Performing assignment for group FileUpdateWorker using strategy range with subscriptions {u'consumer-0b2cf0d35ac5-2516-cd3fa9e1-6fdd-469b-957e-3bf42e2456ad': ConsumerProtocolMemberMetadata(version=0, subscription=[], user_data='')}
2017-10-05 21:15:31,574	INFO    	consumer            	_perform_assignment:265	2516  	140602010883408	Finished assignment for group FileUpdateWorker: {u'consumer-0b2cf0d35ac5-2516-cd3fa9e1-6fdd-469b-957e-3bf42e2456ad': ConsumerProtocolMemberAssignment(version=0, assignment=[], user_data='')}
2017-10-05 21:15:31,574	INFO    	base                	_on_join_leader:425	2516  	140602010883408	Sending leader SyncGroup for group FileUpdateWorker to coordinator 1: SyncGroupRequest_v0(group='FileUpdateWorker', generation_id=3, member_id=u'consumer-0b2cf0d35ac5-2516-cd3fa9e1-6fdd-469b-957e-3bf42e2456ad', group_assignment=[(member_id=u'consumer-0b2cf0d35ac5-2516-cd3fa9e1-6fdd-469b-957e-3bf42e2456ad', member_metadata='\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')])
2017-10-05 21:15:31,577	INFO    	cluster             	update_metadata:289	2516  	140602010883408	Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 13, groups: 1)
2017-10-05 21:15:31,578	INFO    	consumer            	_handle_metadata_update:142	2516  	140602010883408	_handle_metadata_update() appended configdist2_update
2017-10-05 21:15:31,578	INFO    	subscription_state  	change_subscription:148	2516  	140602010883408	Updating subscribed topics to: [u'configdist2_update']
2017-10-05 21:15:31,578	INFO    	subscription_state  	change_subscription:151	2516  	140602010883408	current value of needs_partition_assignment: True will be set to True
2017-10-05 21:15:31,579	INFO    	consumer            	_subscription_metadata_changed:177	2516  	140602010883408	group_subscription: set([u'configdist2_update'])
2017-10-05 21:15:31,579	INFO    	consumer            	_handle_metadata_update:155	2516  	140602010883408	current metadata snapshot for configdist2_update: set([0])
2017-10-05 21:15:31,580	INFO    	subscription_state  	mark_for_reassignment:176	2516  	140602010883408	current value of needs_partition_assignment: True will be set to True
2017-10-05 21:15:31,581	INFO    	base                	_handle_sync_group_response:450	2516  	140602010883408	Successfully joined group FileUpdateWorker with generation 3
2017-10-05 21:15:31,582	INFO    	subscription_state  	assign_from_subscribed:234	2516  	140602010883408	current value of needs_partition_assignment: True will be set to False
2017-10-05 21:15:31,582	INFO    	subscription_state  	assign_from_subscribed:236	2516  	140602010883408	Updated partition assignment: []
2017-10-05 21:15:31,582	INFO    	consumer            	_on_join_complete:224	2516  	140602010883408	Setting newly assigned partitions set([]) for group FileUpdateWorker
2017-10-05 21:15:31,582	INFO    	base                	need_rejoin:231	2516  	140602010883408	rejoin_needed is False
2017-10-05 21:15:31,583	INFO    	base                	need_rejoin:231	2516  	140602010883408	rejoin_needed is False
2017-10-05 21:15:31,583	INFO    	consumer            	need_rejoin:302	2516  	140602010883408	self.need_rejoin() is False, super.need_rejoin() is False, needs_partition_assignment is False
2017-10-05 21:15:31,583	INFO    	base                	need_rejoin:231	2516  	140602010883408	rejoin_needed is False
2017-10-05 21:15:31,584	INFO    	fetcher             	fetched_records:321	2516  	140602010883408	starting fetched_records()
2017-10-05 21:15:31,584	INFO    	base                	need_rejoin:231	2516  	140602010883408	rejoin_needed is False
2017-10-05 21:15:31,584	INFO    	base                	need_rejoin:231	2516  	140602010883408	rejoin_needed is False
2017-10-05 21:15:31,585	INFO    	consumer            	need_rejoin:302	2516  	140602010883408	self.need_rejoin() is False, super.need_rejoin() is False, needs_partition_assignment is False
2017-10-05 21:15:31,585	INFO    	base                	need_rejoin:231	2516  	140602010883408	rejoin_needed is False
2017-10-05 21:15:31,585	INFO    	base                	__call__:684	2516  	140602010883408	Heartbeat task unneeded now, retrying in 2.99699807167
2017-10-05 21:15:31,588	INFO    	fetcher             	fetched_records:321	2516  	140602010883408	starting fetched_records()
2017-10-05 21:15:31,588	INFO    	file_update_worker  	_retrieve_message_with_retry:128	2516  	140602010883408	2a:
                        subscription: set([u'configdist2_update'])
                        assignment: set([])
2017-10-05 21:15:31,689	INFO    	base                	ensure_active_group:236	2516  	140602010883408	checking self.need_rejoin(), if false, no need to rebalalnce, so return immediately
2017-10-05 21:15:31,690	INFO    	base                	need_rejoin:231	2516  	140602010883408	rejoin_needed is False
2017-10-05 21:15:31,690	INFO    	base                	need_rejoin:231	2516  	140602010883408	rejoin_needed is False
2017-10-05 21:15:31,690	INFO    	consumer            	need_rejoin:302	2516  	140602010883408	self.need_rejoin() is False, super.need_rejoin() is False, needs_partition_assignment is False
2017-10-05 21:15:31,691	INFO    	base                	need_rejoin:231	2516  	140602010883408	rejoin_needed is False
2017-10-05 21:15:31,691	INFO    	fetcher             	fetched_records:321	2516  	140602010883408	starting fetched_records()
2017-10-05 21:15:31,692	INFO    	client_async        	_maybe_refresh_metadata:731	2516  	140602010883408	Sending metadata request MetadataRequest_v1(topics=NULL) to node 1
2017-10-05 21:15:31,694	INFO    	cluster             	update_metadata:289	2516  	140602010883408	Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 13, groups: 1)
2017-10-05 21:15:31,694	INFO    	consumer            	_handle_metadata_update:142	2516  	140602010883408	_handle_metadata_update() appended configdist2_update
2017-10-05 21:15:31,695	WARNING 	subscription_state  	change_subscription:142	2516  	140602010883408	subscription unchanged by change_subscription([u'configdist2_update'])
2017-10-05 21:15:31,695	INFO    	consumer            	_subscription_metadata_changed:177	2516  	140602010883408	group_subscription: set([u'configdist2_update'])
2017-10-05 21:15:31,695	INFO    	fetcher             	fetched_records:321	2516  	140602010883408	starting fetched_records()
2017-10-05 21:15:31,696	INFO    	file_update_worker  	_retrieve_message_with_retry:128	2516  	140602010883408	2a:
                        subscription: set([u'configdist2_update'])
                        assignment: set([])
# Keeps looping, printing the above lines between calls from my script (file_update_worker)

jeffwidman added a commit that referenced this issue Oct 6, 2017
If the group leader somehow gets into a state where it has an empty partition assignment, then `self._assignment_snapshot` will be `{}`, which evaluates to `False`. So `self._subscription.mark_for_reassignment()` will never be triggered, even if `self._assignment_snapshot != self._metadata_snapshot`.

Fixes the symptoms of #1237, although I suspect there's an additional bug in that case that triggers the condition of the group leader getting an empty partition assignment.
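
The pitfall the commit message describes is plain Python truthiness. Here is a minimal illustration with hypothetical snapshot values; trigger_reassignment() is a stand-in for the coordinator's actual reassignment call, not the real API:

```python
# Stand-in for the real reassignment hook, purely for illustration.
def trigger_reassignment():
    print("marking subscription for reassignment")

metadata_snapshot = {'configdist2_update': set([0])}

# Leader somehow ended up with an empty assignment snapshot:
assignment_snapshot = {}

# Buggy form: an empty dict is falsy, so the whole check is skipped
# even though the snapshots differ.
if assignment_snapshot and assignment_snapshot != metadata_snapshot:
    trigger_reassignment()  # never reached when the snapshot is {}

# Fixed form: check identity against None so {} still gets compared.
if assignment_snapshot is not None and assignment_snapshot != metadata_snapshot:
    trigger_reassignment()  # runs: {} differs from the metadata snapshot
```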
@jeffwidman
Collaborator Author

jeffwidman commented Oct 6, 2017

After implementing the fix in #1240, I cannot reproduce this issue.

This fix does not incorporate the changes from KAFKA-3949 at all, so I'm a bit confused about whether it fixes a symptom of the KAFKA-3949 race condition or a different issue altogether. If the change in #1240 is all that's needed to (effectively) fix KAFKA-3949, why did the Java crew do a much more extensive refactor? I suspect I'm just overlooking something obvious here...

Related: #1241 / #1242

@jeffwidman
Collaborator Author

Fixed by #1240. There may be an additional race condition due to KAFKA-3949, but that's tracked in #1241, so closing this.

jeffwidman added a commit that referenced this issue Oct 19, 2017
Be pedantic about checking for identity rather than equality to avoid issues like #1237 / 411bc08
88manpreet pushed a commit to Yelp/kafka-python that referenced this issue Jul 16, 2018