
KAFKA-3888 Use background thread to process consumer heartbeats #1266

Merged: 13 commits merged into master from the KAFKA_3888_heartbeat_thread branch on Dec 21, 2017

Conversation

@dpkp (Owner) commented Oct 18, 2017

This PR implements KAFKA-3888 (including related KAFKA-4431). It adds significant complexity and relies on managing shared mutable state, which I really don't feel great about. Nonetheless, I wanted to put this up so that folks can see where it's at and possibly test it out independently.

One remaining issue: address default configuration changes when the broker does not support a rebalance timeout (only a session timeout).

ref #948 / includes #1258
See also KAFKA-4160

@jeffwidman (Collaborator) left a comment

Thanks for all the hard work on this.

Left a few comments. I need to re-read the KIP to understand the config parameters a little better...

@@ -512,46 +527,40 @@ def poll(self, timeout_ms=None, future=None, delayed_tasks=True):
Returns:
list: responses received (can be empty)
"""
if timeout_ms is None:
if future is not None:
timeout_ms = 100
Collaborator

I don't understand what this does. What are the side effects of hardcoding this value here?

Collaborator

@dpkp This looks more like a hack actually. Skipped it in the parent PR, but yea...

@dpkp (Owner Author) commented Oct 21, 2017

Yes, deserves a comment. The issue is that we now have to account for the future being resolved in a different thread.

Prior to this change we would block on network IO and then check the future's completion after processing the network IO and any corresponding futures. But now that we have separated future handling from network IO, and we have (intentionally) moved the future handling into a separate section that is not locked, we can have a situation where one thread is called with a simple timeout and another is called with a future (block until resolved).

The first thread acquires the lock and receives the response that will resolve the future in its _poll() call. The response is put on the internal pending_completions queue and the lock is released. Now the second thread takes the lock and checks whether the future is resolved. It is not, so it calls _poll with a full request timeout, which is now 5 minutes. The first thread continues, without the lock, and begins processing pending_completions. The future is resolved with the queued response, and the first thread finishes. But the second thread is now waiting for network IO that it thinks is necessary to complete its future. Except that the future is already done and so there is no network IO coming. It will simply wait for 5 minutes and then timeout before rechecking the future and then finally returning.

My solution here is to simply reduce the network IO timeout when called with a future, to reduce the unneeded block time when this occurs. Another solution might be to register any future that is "blocking" and call _wake_up() when such a future is resolved. I decided against that approach because I think it is too complex and likely to add overhead to every response, not just the ones for which some thread is blocking on a future.
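To make the fix concrete, here is a minimal, self-contained sketch of the timeout selection described above (illustrative only, not the actual kafka-python code; the constant and helper name are made up for the example):

```python
REQUEST_TIMEOUT_MS = 305000  # stand-in for the configured request timeout


def choose_io_timeout_ms(timeout_ms=None, future=None):
    """Pick how long a single poll() pass may block on network IO."""
    if timeout_ms is not None:
        return timeout_ms
    if future is not None:
        # The future may already have been resolved by another thread
        # draining pending_completions, in which case no further network
        # IO is coming for this caller to observe. Block only briefly so
        # the caller re-checks the future soon.
        return 100
    # Nothing to wait on besides network IO: the full timeout is fine.
    return REQUEST_TIMEOUT_MS
```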

@jeffwidman (Collaborator) commented Oct 22, 2017

I am glad I asked. I never would have realized that. Adding this explanation as a code comment would be much appreciated.

Collaborator

Same =)

@@ -721,6 +739,7 @@ def _maybe_refresh_metadata(self):
Returns:
int: milliseconds until next refresh
"""
# This should be locked when running multi-threaded
Collaborator

This comment is slightly ambiguous.

What does "This" refer to? I assume self.cluster.ttl(), but possibly you meant a broader scope than that.

And who is responsible for enforcing the locking? The caller of set_topics() or the implementation of self.cluster.ttl()?

Owner Author

Comment relates to entire function, which is currently only called inside a locked section. Will update.

task (callable): task to be unscheduled
"""
self._delayed_tasks.remove(task)

Collaborator

Why does adding the background heartbeat enable task scheduling to be removed? Aren't there still tasks that need to be scheduled for execution at some point in the future, such as a metadata refresh? Or, in the case of metadata refresh, is that already checked every time we call poll() and hence doesn't need to be scheduled?

Owner Author

The other "tasks" are metadata refresh and offset commits. These are now handled inline during client.poll and coordinator.poll, respectively.

kafka/conn.py Outdated
@@ -704,7 +716,7 @@ def can_send_more(self):
def recv(self):
"""Non-blocking network receive.

Return response if available
Return list of (response, future)
Collaborator

adding the word "tuples" improves readability: Return list of (response, future) tuples

class's monitor. Generally this means acquiring the lock before reading or
writing the state of the group (e.g. generation, member_id) and holding the
lock when sending a request that affects the state of the group
(e.g. JoinGroup, LeaveGroup).
Collaborator

Fantastic job with the docs, as always.

# metadata is fresh, any metadata update that changes the topic
# subscriptions and arrives with a rebalance in progress will
# essentially be ignored. See KAFKA-3949 for the complete
# description of the problem.
Collaborator

Does this PR also fix #1241?

Owner Author

No, that will need a separate PR

"""
Return the time to the next needed invocation of {@link #poll(long)}.
@param now current time in milliseconds
@return the maximum time in milliseconds the caller should wait before the next invocation of poll()
Collaborator

The docstring will need to be updated from Java to Python.

Owner Author

+1

callback, offsets, exception = self.completed_offset_commits.popleft()
callback(offsets, exception)
except IndexError:
pass
Collaborator

Is the try/except really needed? Why not while self.completed_offset_commits:?

Owner Author

yes, that seems cleaner
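For reference, the suggested shape looks like this (a sketch rather than the exact diff):

```python
from collections import deque

completed_offset_commits = deque()

# Drain the completed commit callbacks; no try/except IndexError needed,
# since popleft() is only reached while the deque is non-empty.
while completed_offset_commits:
    callback, offsets, exception = completed_offset_commits.popleft()
    callback(offsets, exception)
```

One caveat: the check-then-pop is equivalent only if a single thread drains the deque; if several threads could consume from it concurrently, catching IndexError is the safer pattern.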

kafka/errors.py Outdated
typically implies that the poll loop is spending too much
time message processing. You can address this either by
increasing the session timeout or by reducing the maximum
size of batches returned in poll() with max.poll.records.
Collaborator

This error message will need to be tweaked slightly to handle brokers that do/don't support heartbeating so that end users know what knobs to adjust. This will in turn depend on how max_poll_interval_ms is interpreted for brokers that don't support heartbeating... whether it is ignored or used in conjunction with session_timeout_ms somehow...

Also, the param names need to be switched from Java to Python.

@dpkp (Owner Author) commented Oct 21, 2017

Good call on updating the text. But the CommitFailed error is only raised when using group-coordinated consumers. It will not be seen when using zookeeper offsets from 0.8.2. That said, it can be seen by users that do not have max_poll_interval_ms support (0.9 <= broker < 0.10.1), and so it may need some tweaking when we decide how to manage max_poll_interval_ms in that context.

#(OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 7), (1, 234, b'', 7)])]),
# Errors.RequestTimedOutError, True, False),
#(OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 27), (1, 234, b'', 27)])]),
# Errors.RebalanceInProgressError, False, True),
Collaborator

What was the initial purpose of these? Why were they commented out?

Owner Author

I'm not sure and so I deleted them.

@tvoinarovskyi (Collaborator)

Will try to do a review tomorrow. Sorry for the holdup.

@dpkp force-pushed the KAFKA_3888_heartbeat_thread branch from 50640a3 to ea6de4c on October 21, 2017 16:31
@tvoinarovskyi (Collaborator) left a comment

I did go through it, but it's a bit too big to reason about at full scale. We should give it some time to just sit there after the merge; something is bound to pop up.
Any ideas on how to actually test the feature? I didn't find any integration test for it.


if self._heartbeat_thread is None:
log.debug('Starting new heartbeat thread')
self._heartbeat_thread = HeartbeatThread(weakref.proxy(self))
Collaborator

Why do you use the weakref.proxy here? Do we even need it as a weak reference?

Owner Author

There is a circular reference between coordinator <-> heartbeat_thread. The weakref here means that the existence of the heartbeat_thread will not prevent the coordinator from being garbage collected.
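For readers unfamiliar with weakref.proxy, here is a small self-contained sketch of the pattern (hypothetical class shapes, not the library's actual code): the thread holds only a weak proxy, so once all strong references to the coordinator are gone it can be garbage collected, and the thread observes that as a ReferenceError and exits.

```python
import threading
import time
import weakref


class HeartbeatThread(threading.Thread):
    def __init__(self, coordinator):
        super().__init__(daemon=True)
        # A weakref.proxy: holding it does not keep the coordinator alive.
        self.coordinator = coordinator

    def run(self):
        try:
            while not self.coordinator.closed:
                self.coordinator.maybe_heartbeat()
                time.sleep(1.0)
        except ReferenceError:
            # The coordinator was garbage collected out from under us.
            pass


class Coordinator:
    def __init__(self):
        self.closed = False
        self._heartbeat_thread = None

    def ensure_heartbeat_thread(self):
        if self._heartbeat_thread is None:
            # Passing a proxy breaks the strong coordinator <-> thread
            # reference cycle described above.
            self._heartbeat_thread = HeartbeatThread(weakref.proxy(self))
            self._heartbeat_thread.start()

    def maybe_heartbeat(self):
        pass  # send a HeartbeatRequest when due (omitted in this sketch)
```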

self.disable()
return

# When consumer.wakeup() is implemented, we need to
Collaborator

Add XXX or FIXME here, so we don't lose it.

self.coordinator.heartbeat.sent_heartbeat()
future = self.coordinator._send_heartbeat_request()
future.add_callback(self._handle_heartbeat_success)
future.add_errback(self._handle_heartbeat_failure)
Collaborator

Yeah, you can get that from reading the Java code. It never shrinks; parts are only added on top. Probably because a lot of people are working on it, but it really is hard to follow sometimes.

@tvoinarovskyi (Collaborator)

Great job on this!

@dpkp (Owner Author) commented Oct 22, 2017

So this is a very large PR. I think it would be possible to break it into a few smaller parts (locks, MemberState/Generation, coordinator.poll, heartbeat_thread). The benefit of keeping it together is that it is easier to cross-reference against the Java PR that implements the same changes.

Re testing, I think the existing test suite covers a fair amount of the raw functionality. But we should add some specific tests for heartbeating while not polling. Separately, I am very interested in refactoring to a sans-io core that can be more easily unit tested without requiring fixtures or mocking. But I'll leave that for another day (I've been working on a simple state-machine representation of the group coordinator in a separate branch).

@tvoinarovskyi (Collaborator)

@dpkp - is the configuration change backward compatible? If I have a consumer configured with a custom 'session_timeout_ms', will it break after the update? As far as I can see, it will require adding 'max_poll_ms'...

@tvoinarovskyi (Collaborator)

As for the merge, I think it's ok as one big PR. It's not something that is meaningful to merge separately.

@dpkp (Owner Author) commented Oct 23, 2017

@tvoinarovskyi - I put the compatibility logic into KafkaConsumer and left the Coordinator classes to simply enforce constraints. KafkaConsumer will check the api_version after KafkaClient bootstrap, and if < 0.10.1 (max_poll_interval_ms not supported in JoinGroupRequest) then it will set max_poll_interval_ms = session_timeout_ms. In addition, if the user provides max_poll_interval_ms but not session_timeout_ms, we use the same value for session_timeout_ms. If the user provides neither, we use 30000/30000 defaults instead of the 10000/300000 used when max_poll_interval_ms is supported.
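A hedged sketch of those fallback rules (the function below is an illustration drawn from this comment, not the actual KafkaConsumer code):

```python
def resolve_group_timeouts(api_version, session_timeout_ms=None,
                           max_poll_interval_ms=None):
    """Return the (session_timeout_ms, max_poll_interval_ms) pair to use."""
    if api_version < (0, 10, 1):
        # The broker's JoinGroupRequest has no separate rebalance timeout,
        # so both knobs collapse onto a single value.
        if session_timeout_ms is not None:
            return session_timeout_ms, session_timeout_ms
        if max_poll_interval_ms is not None:
            return max_poll_interval_ms, max_poll_interval_ms
        # Neither provided: fall back to 30000/30000.
        return 30000, 30000

    # max_poll_interval_ms is supported: defaults are 10000/300000.
    if session_timeout_ms is None:
        session_timeout_ms = 10000
    if max_poll_interval_ms is None:
        max_poll_interval_ms = 300000
    return session_timeout_ms, max_poll_interval_ms
```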

@jeffwidman (Collaborator)

What are the next steps here? @dpkp, are there still features/tests you're planning to add when you get time, or is this ready to merge to master?

@dpkp (Owner Author) commented Nov 7, 2017

I had hoped to deploy this build to production in some isolated services and verify that it performs as expected. I haven't had a chance to do that yet. Other than that I think it's ready to land. (sans merge conflict)

@jeffwidman (Collaborator)

Sounds good. I've been hoping to do a few impromptu tests myself, particularly in a couple of services we have that use gevent, but haven't had a chance either...

@tvoinarovskyi (Collaborator)

@dpkp Hey there, this PR will only get harder to rebase later. Is there any reason besides testing not to merge it? If it's merged we can at least get some feedback from others on whether it works as expected.

@dpkp (Owner Author) commented Dec 17, 2017 via email

@dpkp force-pushed the KAFKA_3888_heartbeat_thread branch from 9d2087e to 276c2a2 on December 20, 2017 18:28
@dpkp (Owner Author) commented Dec 20, 2017

I rebased and fixed the conflicts. I'm planning to merge after the Travis tests pass.

@dpkp force-pushed the KAFKA_3888_heartbeat_thread branch from 276c2a2 to a709cc5 on December 21, 2017 19:39
@dpkp merged commit ad024d1 into master on Dec 21, 2017
@jeffwidman deleted the KAFKA_3888_heartbeat_thread branch on December 21, 2017 22:58
@jeffwidman (Collaborator)

Thanks again for all the hard work on this massive PR.

@everpcpc (Contributor)

thanks for the hard work 👍

@sanfilippopablo

  1. Is this already out in the wild in some kafka-python version?
  2. From what I understand, this will allow me to have long processing times without Kafka removing the consumer from the consumer group, and without having to set the session timeout to the worst-case processing time. Is that right?

@tvoinarovskyi (Collaborator)

@sanfilippopablo

  1. Yes, as of 1.4.0
  2. Yes, your assumption is correct

@vkjv commented Feb 13, 2019

It may have been quite a while, but I am running into the same issue with the 1.4.4 client. I am using the iterator interface to receive messages in a separate thread, and the processing is done in another thread. All the configuration parameters are set to their defaults except for request_timeout, which is set to 60 seconds.
Every 5 minutes I get the error that the heartbeat expired and the consumer is dead, and it seems messages have been lost (I am not sure about that, but it appears so).
How do I get past this?

@jeffwidman (Collaborator)

@vkjv this is a closed issue, you are better off opening a new one.
