Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Dynamic FindCoordinator selection #140

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,5 @@ docs/_build
integration-test/
tests-env/
.pytest_cache/
settings.json
.dccache
38 changes: 17 additions & 21 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest,
DeleteGroupsRequest, DescribeLogDirsRequest
)
from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest
from kafka.admin.coordinator_type import CoordinatorType
from kafka.protocol.commit import OffsetFetchRequest, FindCoordinatorRequest
from kafka.protocol.metadata import MetadataRequest
from kafka.protocol.types import Array
from kafka.structs import TopicPartition, OffsetAndMetadata, MemberInformation, GroupInformation
Expand Down Expand Up @@ -217,6 +218,7 @@ def __init__(self, **configs):

self._closed = False
self._refresh_controller_id()
self._cluster_metadata = self._get_cluster_metadata().to_object()
log.debug("KafkaAdminClient started.")

def close(self):
Expand Down Expand Up @@ -294,18 +296,14 @@ def _find_coordinator_id_send_request(self, group_id):
name as a string.
:return: A message future
"""
# TODO add support for dynamically picking version of
# GroupCoordinatorRequest which was renamed to FindCoordinatorRequest.
# When I experimented with this, the coordinator value returned in
# GroupCoordinatorResponse_v1 didn't match the value returned by
# GroupCoordinatorResponse_v0 and I couldn't figure out why.
version = 0
# version = self._matching_api_version(GroupCoordinatorRequest)
version = self._matching_api_version(FindCoordinatorRequest)
if version <= 0:
request = GroupCoordinatorRequest[version](group_id)
request = FindCoordinatorRequest[version](group_id)
elif version >= 1:
request = FindCoordinatorRequest[version](group_id, CoordinatorType.GROUP)
else:
raise NotImplementedError(
"Support for GroupCoordinatorRequest_v{} has not yet been added to KafkaAdminClient."
"Support for FindCoordinatorRequest_v{} has not yet been added to KafkaAdminClient."
.format(version))
return self._send_request_to_node(self._client.least_loaded_node(), request)

Expand All @@ -325,7 +323,7 @@ def _find_coordinator_id_process_response(self, response):
.format(response))
else:
raise NotImplementedError(
"Support for FindCoordinatorRequest_v{} has not yet been added to KafkaAdminClient."
"Support for FindCoordinatorResponse_v{} has not yet been added to KafkaAdminClient."
.format(response.API_VERSION))
return response.coordinator_id

Expand Down Expand Up @@ -512,20 +510,18 @@ def _get_cluster_metadata(self, topics=None, auto_topic_creation=False):
return future.value

def list_topics(self):
metadata = self._get_cluster_metadata(topics=None)
obj = metadata.to_object()
return [t['topic'] for t in obj['topics']]
metadata = copy.copy(self._cluster_metadata)
topics = metadata.pop('topics')
return [m['topic'] for m in topics]

def describe_topics(self, topics=None):
metadata = self._get_cluster_metadata(topics=topics)
obj = metadata.to_object()
return obj['topics']
metadata = copy.copy(self._cluster_metadata)
return metadata.pop('topics')

def describe_cluster(self):
metadata = self._get_cluster_metadata()
obj = metadata.to_object()
obj.pop('topics') # We have 'describe_topics' for this
return obj
metadata = copy.copy(self._cluster_metadata)
metadata.pop('topics') # describe_topics is for this
return metadata

@staticmethod
def _convert_describe_acls_response_to_acls(describe_response):
Expand Down
15 changes: 15 additions & 0 deletions kafka/admin/coordinator_type.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from __future__ import absolute_import

# enum in stdlib as of py3.4
try:
from enum import IntEnum # pylint: disable=import-error
except ImportError:
# vendored backport module
from kafka.vendor.enum34 import IntEnum


class CoordinatorType(IntEnum):
    """Coordinator type key sent in FindCoordinatorRequest (v1+).

    GROUP (0) asks the broker for a consumer-group coordinator;
    TRANSACTION (1) asks for a transaction coordinator.
    """

    GROUP = 0
    TRANSACTION = 1
8 changes: 4 additions & 4 deletions kafka/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class ClusterMetadata:
A class to manage kafka cluster metadata.

This class does not perform any IO. It simply updates internal state
given API responses (MetadataResponse, GroupCoordinatorResponse).
given API responses (MetadataResponse, FindCoordinatorResponse).

Keyword Arguments:
retry_backoff_ms (int): Milliseconds to backoff when retrying on
Expand Down Expand Up @@ -342,16 +342,16 @@ def add_group_coordinator(self, group, response):
"""Update with metadata for a group coordinator

Arguments:
group (str): name of group from GroupCoordinatorRequest
response (GroupCoordinatorResponse): broker response
group (str): name of group from FindCoordinatorRequest
response (FindCoordinatorResponse): broker response

Returns:
string: coordinator node_id if metadata is updated, None on error
"""
log.debug("Updating coordinator for %s: %s", group, response)
error_type = Errors.for_code(response.error_code)
if error_type is not Errors.NoError:
log.error("GroupCoordinatorResponse error: %s", error_type)
log.error("FindCoordinatorResponse error: %s", error_type)
self._groups[group] = -1
return

Expand Down
4 changes: 2 additions & 2 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -975,13 +975,13 @@ def reset_override_configs():
# request, both will be failed with a ConnectionError that wraps
# socket.error (32, 54, or 104)
from kafka.protocol.admin import ApiVersionRequest, ListGroupsRequest
from kafka.protocol.commit import OffsetFetchRequest, GroupCoordinatorRequest
from kafka.protocol.commit import OffsetFetchRequest, FindCoordinatorRequest

test_cases = [
# All cases starting from 0.10 will be based on ApiVersionResponse
((0, 10), ApiVersionRequest[0]()),
((0, 9), ListGroupsRequest[0]()),
((0, 8, 2), GroupCoordinatorRequest[0]('kafka-python-default-group')),
((0, 8, 2), FindCoordinatorRequest[0]('kafka-python-default-group')),
((0, 8, 1), OffsetFetchRequest[0]('kafka-python-default-group', [])),
((0, 8, 0), MetadataRequest[0](topics)),
]
Expand Down
4 changes: 2 additions & 2 deletions kafka/coordinator/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from kafka.future import Future
from kafka.metrics import AnonMeasurable
from kafka.metrics.stats import Avg, Count, Max, Rate
from kafka.protocol.commit import GroupCoordinatorRequest, OffsetCommitRequest
from kafka.protocol.commit import FindCoordinatorRequest, OffsetCommitRequest
from kafka.protocol.group import (HeartbeatRequest, JoinGroupRequest,
LeaveGroupRequest, SyncGroupRequest)

Expand Down Expand Up @@ -724,7 +724,7 @@ def _send_group_coordinator_request(self):

log.debug("Sending group coordinator request for group %s to broker %s",
self.group_id, node_id)
request = GroupCoordinatorRequest[0](self.group_id)
request = FindCoordinatorRequest[0](self.group_id)
future = Future()
_f = self._client.send(node_id, request)
_f.add_callback(self._handle_group_coordinator_response, future)
Expand Down
27 changes: 13 additions & 14 deletions kafka/protocol/commit.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from kafka.protocol.api import Request, Response
from kafka.protocol.types import Array, Int8, Int16, Int32, Int64, Schema, String
from kafka.protocol.types import Array, Int8, Int16, Int32, Int64, Schema, String, CompactBytes


class OffsetCommitResponse_v0(Response):
Expand Down Expand Up @@ -207,47 +207,46 @@ class OffsetFetchRequest_v3(Request):
]


class GroupCoordinatorResponse_v0(Response):
class FindCoordinatorResponse_v0(Response):
API_KEY = 10
API_VERSION = 0
SCHEMA = Schema(
('error_code', Int16),
('coordinator_id', Int32),
('host', String('utf-8')),
('host', CompactBytes),
('port', Int32)
)


class GroupCoordinatorResponse_v1(Response):
class FindCoordinatorResponse_v1(Response):
API_KEY = 10
API_VERSION = 1
API_VERSION = 0
SCHEMA = Schema(
('error_code', Int16),
('error_message', String('utf-8')),
('coordinator_id', Int32),
('host', String('utf-8')),
('host', CompactBytes),
('port', Int32)
)


class GroupCoordinatorRequest_v0(Request):
class FindCoordinatorRequest_v0(Request):
API_KEY = 10
API_VERSION = 0
RESPONSE_TYPE = GroupCoordinatorResponse_v0
RESPONSE_TYPE = FindCoordinatorResponse_v0
SCHEMA = Schema(
('consumer_group', String('utf-8'))
('group_id', String('utf-8'))
)


class GroupCoordinatorRequest_v1(Request):
class FindCoordinatorRequest_v1(Request):
API_KEY = 10
API_VERSION = 1
RESPONSE_TYPE = GroupCoordinatorResponse_v1
RESPONSE_TYPE = FindCoordinatorResponse_v1
SCHEMA = Schema(
('coordinator_key', String('utf-8')),
('coordinator_type', Int8)
)


GroupCoordinatorRequest = [GroupCoordinatorRequest_v0, GroupCoordinatorRequest_v1]
GroupCoordinatorResponse = [GroupCoordinatorResponse_v0, GroupCoordinatorResponse_v1]
FindCoordinatorRequest = [FindCoordinatorRequest_v0, FindCoordinatorRequest_v1]
FindCoordinatorResponse = [FindCoordinatorResponse_v0, FindCoordinatorResponse_v1]
8 changes: 4 additions & 4 deletions kafka/protocol/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
import logging

import kafka.errors as Errors
from kafka.protocol.commit import GroupCoordinatorResponse
from kafka.protocol.commit import FindCoordinatorResponse
from kafka.protocol.frame import KafkaBytes
from kafka.protocol.types import Int32, TaggedFields
from kafka.protocol.types import Int32
from kafka.version import __version__

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -140,9 +140,9 @@ def _process_response(self, read_buffer):
# 0.8.2 quirk
if (recv_correlation_id == 0 and
correlation_id != 0 and
request.RESPONSE_TYPE is GroupCoordinatorResponse[0] and
request.RESPONSE_TYPE is FindCoordinatorResponse[0] and
(self._api_version == (0, 8, 2) or self._api_version is None)):
log.warning('Kafka 0.8.2 quirk -- GroupCoordinatorResponse'
log.warning('Kafka 0.8.2 quirk -- FindCoordinatorResponse'
' Correlation ID does not match request. This'
' should go away once at least one topic has been'
' initialized on the broker.')
Expand Down
4 changes: 2 additions & 2 deletions test/test_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import pytest

from kafka.protocol.api import RequestHeader
from kafka.protocol.commit import GroupCoordinatorRequest
from kafka.protocol.commit import FindCoordinatorRequest
from kafka.protocol.fetch import FetchRequest, FetchResponse
from kafka.protocol.message import Message, MessageSet, PartialMessage
from kafka.protocol.metadata import MetadataRequest
Expand Down Expand Up @@ -168,7 +168,7 @@ def test_encode_message_header():
b'client3', # ClientId
])

req = GroupCoordinatorRequest[0]('foo')
req = FindCoordinatorRequest[0]('foo')
header = RequestHeader(req, correlation_id=4, client_id='client3')
assert header.encode() == expect

Expand Down
Loading