Skip to content

Commit

Permalink
Add private map of api key -> min/max versions to BrokerConnection (d…
Browse files Browse the repository at this point in the history
  • Loading branch information
dpkp authored and 88manpreet committed Jul 16, 2018
1 parent 203b64b commit 0461419
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 11 deletions.
32 changes: 21 additions & 11 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from kafka.metrics.stats import Avg, Count, Max, Rate
from kafka.protocol.api import RequestHeader
from kafka.protocol.admin import SaslHandShakeRequest
from kafka.protocol.commit import GroupCoordinatorResponse
from kafka.protocol.commit import GroupCoordinatorResponse, OffsetFetchRequest
from kafka.protocol.metadata import MetadataRequest
from kafka.protocol.types import Int32
from kafka.version import __version__
Expand Down Expand Up @@ -192,6 +192,7 @@ def __init__(self, host, port, afi, **configs):
self.port = port
self.afi = afi
self.in_flight_requests = collections.deque()
self._api_versions = None

self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
Expand Down Expand Up @@ -882,23 +883,31 @@ def _next_correlation_id(self):
self._correlation_id = (self._correlation_id + 1) % 2**31
return self._correlation_id

def _check_api_version_response(self, response):
def _handle_api_version_response(self, response):
error_type = Errors.for_code(response.error_code)
assert error_type is Errors.NoError, "API version check failed"
self._api_versions = dict([
(api_key, (min_version, max_version))
for api_key, min_version, max_version in response.api_versions
])
return self._api_versions

def _infer_broker_version_from_api_versions(self, api_versions):
# The logic here is to check the list of supported request versions
# in descending order. As soon as we find one that works, return it
test_cases = [
# format (<broker verion>, <needed struct>)
((0, 10, 1), MetadataRequest[2])
((0, 11, 0), MetadataRequest[4]),
((0, 10, 2), OffsetFetchRequest[2]),
((0, 10, 1), MetadataRequest[2]),
]

error_type = Errors.for_code(response.error_code)
assert error_type is Errors.NoError, "API version check failed"
max_versions = dict([
(api_key, max_version)
for api_key, _, max_version in response.api_versions
])
# Get the best match of test cases
for broker_version, struct in sorted(test_cases, reverse=True):
if max_versions.get(struct.API_KEY, -1) >= struct.API_VERSION:
if struct.API_KEY not in api_versions:
continue
min_version, max_version = api_versions[struct.API_KEY]
if min_version <= struct.API_VERSION <= max_version:
return broker_version

# We know that ApiVersionResponse is only supported in 0.10+
Expand Down Expand Up @@ -986,7 +995,8 @@ def connect():
if isinstance(request, ApiVersionRequest[0]):
# Starting from 0.10 kafka broker we determine version
# by looking at ApiVersionResponse
version = self._check_api_version_response(f.value)
api_versions = self._handle_api_version_response(f.value)
version = self._infer_broker_version_from_api_versions(api_versions)
log.info('Broker version identifed as %s', '.'.join(map(str, version)))
log.info('Set configuration api_version=%s to skip auto'
' check_version requests on startup', version)
Expand Down
37 changes: 37 additions & 0 deletions kafka/protocol/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,40 @@
CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, ALL_CODECS,
ATTRIBUTE_CODEC_MASK, KafkaProtocol,
)

API_KEYS = {
0: 'Produce',
1: 'Fetch',
2: 'ListOffsets',
3: 'Metadata',
4: 'LeaderAndIsr',
5: 'StopReplica',
6: 'UpdateMetadata',
7: 'ControlledShutdown',
8: 'OffsetCommit',
9: 'OffsetFetch',
10: 'FindCoordinator',
11: 'JoinGroup',
12: 'Heartbeat',
13: 'LeaveGroup',
14: 'SyncGroup',
15: 'DescribeGroups',
16: 'ListGroups',
17: 'SaslHandshake',
18: 'ApiVersions',
19: 'CreateTopics',
20: 'DeleteTopics',
21: 'DeleteRecords',
22: 'InitProducerId',
23: 'OffsetForLeaderEpoch',
24: 'AddPartitionsToTxn',
25: 'AddOffsetsToTxn',
26: 'EndTxn',
27: 'WriteTxnMarkers',
28: 'TxnOffsetCommit',
29: 'DescribeAcls',
30: 'CreateAcls',
31: 'DeleteAcls',
32: 'DescribeConfigs',
33: 'AlterConfigs',
}

0 comments on commit 0461419

Please sign in to comment.