Reduce client poll timeout when no IFRs #1823

Merged 1 commit into master from no_ifrs_short_poll on May 30, 2019
Conversation

dpkp (Owner) commented May 29, 2019

When there are no in-flight requests (IFRs), client.poll can potentially block for up to the full request timeout (request_timeout_ms). For consumers, this also prevents the heartbeat thread from acquiring the client lock, and therefore prevents the heartbeat loop from proceeding. By reducing the poll timeout to the retry backoff timeout (typically 100ms), we help avoid lock contention and let the heartbeat thread loop more freely. Note that when there is an IFR, we may still block for the full request timeout, so this isn't a general solution.
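
For context, a simplified sketch of what the change does inside KafkaClient.poll (not the literal diff; in_flight_request_count() and the retry_backoff_ms config are real kafka-python names, but the surrounding poll logic is abridged):

# timeout here is the poll timeout in milliseconds, normally bounded by
# request_timeout_ms and the metadata/idle-connection timeouts
if self.in_flight_request_count() == 0:
    # nothing is awaiting a response, so there is no reason to sit in
    # select() for the full request timeout; wake up after the retry
    # backoff (typically 100ms) instead, so callers and the heartbeat
    # thread can re-acquire the client lock promptly
    timeout = min(timeout, self.config['retry_backoff_ms'])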



jeffwidman (Contributor) left a comment


The idea makes sense to me

Review thread on kafka/client_async.py (resolved)
jeffwidman merged commit 9f0b518 into master on May 30, 2019
jeffwidman deleted the no_ifrs_short_poll branch on May 30, 2019 at 04:46
jeffwidman (Contributor) commented Jun 20, 2019

WOW!

I have a script that fetches all consumer offsets and calculates consumer lag for every consumer group on a cluster. On one cluster that script was consistently taking ~242 seconds, so the metrics agent was constantly timing it out.

Part of the problem was that my I/O code was two nested for loops:

for broker in kafka_admin_client._client.cluster.brokers():
    for consumer_group, group_type in kafka_admin_client.list_consumer_groups(broker_ids=[broker.nodeId]):
        # consumer groups from Kafka < 0.9 that store their offset
        # in Kafka don't use Kafka for group-coordination so
        # group_type is empty
        if group_type in ('consumer', ''):
            # Typically the consumer group offset fetch sequence is:
            # 1. For each broker in the cluster, send a ListGroupsRequest
            # 2. For each consumer group, send a FindGroupCoordinatorRequest
            # 3. Query the group coordinator for the consumer's offsets.
            # However, since Kafka brokers only include consumer
            # groups in their ListGroupsResponse when they are the
            # coordinator for that group, we can skip the
            # FindGroupCoordinatorRequest.
            this_group_offsets = kafka_admin_client.list_consumer_group_offsets(
                group_id=consumer_group, group_coordinator_id=broker.nodeId)

In other words, under the covers the code was making each request sequentially and waiting for the response before continuing. So I spent some time converting that to async I/O using callbacks (see #1845).
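
The pattern, roughly, is to fire off all the requests first with callbacks attached, then let the client's poll loop drive them to completion. A hedged sketch of that idea (not the actual code from #1845; it leans on kafka-python internals such as _send_request_to_node, which are not public API, and handle_list_groups_response is a hypothetical handler):

from kafka.protocol.admin import ListGroupsRequest

def handle_list_groups_response(response):
    # hypothetical handler: in the real script this would go on to issue
    # the per-group offset-fetch requests for each group in the response
    for group_id, protocol_type in response.groups:
        pass

futures = []
for broker in kafka_admin_client._client.cluster.brokers():
    # send one ListGroupsRequest per broker without waiting for the reply
    future = kafka_admin_client._send_request_to_node(broker.nodeId, ListGroupsRequest[0]())
    future.add_callback(handle_list_groups_response)
    futures.append(future)

# drive the I/O loop until every outstanding request has completed
while not all(f.is_done for f in futures):
    kafka_admin_client._client.poll()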

I immediately saw the fetch time drop from 242 seconds to 2 seconds, and considered myself a minor genius.

However, I was curious, so I also ran the old sequential code against master and saw that it took only ~3 seconds. I eventually tracked the improvement down to this PR...

When I use the callbacks approach without the change in this PR, collection takes either 31 or 61 seconds, depending on the run. So the callbacks did make a difference. But as soon as I added this change, the callbacks only save ~0.5-1 seconds.

@dpkp You mentioned:

Note that when there is an IFR, we may still block for the full request timeout, so this isn't a general solution.

Given the dramatic improvement I saw from this PR, I'm wondering whether we might see similar results from that possible "more general solution"?

PS: @Lou-Cipher you may also want to update your code to incorporate this change; I suspect it will provide the significant speedups you were working toward in #1807.

dpkp (Owner, Author) commented Sep 30, 2019

I think the more general solution here is to make the list of connections tracked by the selector more dynamic so that we only "select" against connections that have pending sends or recvs.
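
As a rough illustration of that idea (purely a sketch, not code that exists in kafka-python today; the connection accessors are hypothetical names), the selector registration would track pending work per connection:

import selectors

sel = selectors.DefaultSelector()
registered = set()

def update_registration(conn):
    # conn stands in for a broker connection; has_pending_send() and
    # in_flight_requests() are hypothetical accessors for illustration
    wants_io = conn.has_pending_send() or conn.in_flight_requests()
    if wants_io and conn not in registered:
        # only connections with work to do participate in select(), so an
        # all-idle client can safely use a short poll timeout
        sel.register(conn.sock, selectors.EVENT_READ | selectors.EVENT_WRITE, conn)
        registered.add(conn)
    elif not wants_io and conn in registered:
        sel.unregister(conn.sock)
        registered.discard(conn)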
