Skip to content

Commit

Permalink
Add Request/Response structs for kafka broker 1.0.0
Browse files Browse the repository at this point in the history
  • Loading branch information
dpkp authored and jeffwidman committed Feb 6, 2018
1 parent 441aeb8 commit c0df771
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 36 deletions.
1 change: 1 addition & 0 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -818,6 +818,7 @@ def _infer_broker_version_from_api_versions(self, api_versions):
# in reverse order. As soon as we find one that works, return it
test_cases = [
# format (<broker version>, <needed struct>)
((1, 0, 0), MetadataRequest[5]),
((0, 11, 0), MetadataRequest[4]),
((0, 10, 2), OffsetFetchRequest[2]),
((0, 10, 1), MetadataRequest[2]),
Expand Down
41 changes: 39 additions & 2 deletions kafka/protocol/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,12 @@ class SaslHandShakeResponse_v0(Response):
)


class SaslHandShakeResponse_v1(Response):
API_KEY = 17
API_VERSION = 1
SCHEMA = SaslHandShakeResponse_v0.SCHEMA


class SaslHandShakeRequest_v0(Request):
API_KEY = 17
API_VERSION = 0
Expand All @@ -294,5 +300,36 @@ class SaslHandShakeRequest_v0(Request):
('mechanism', String('utf-8'))
)

SaslHandShakeRequest = [SaslHandShakeRequest_v0]
SaslHandShakeResponse = [SaslHandShakeResponse_v0]

class SaslHandShakeRequest_v1(Request):
API_KEY = 17
API_VERSION = 1
RESPONSE_TYPE = SaslHandShakeResponse_v1
SCHEMA = SaslHandShakeRequest_v0.SCHEMA


SaslHandShakeRequest = [SaslHandShakeRequest_v0, SaslHandShakeRequest_v1]
SaslHandShakeResponse = [SaslHandShakeResponse_v0, SaslHandShakeResponse_v1]


class SaslAuthenticateResponse_v0(Request):
API_KEY = 36
API_VERSION = 0
SCHEMA = Schema(
('error_code', Int16),
('error_message', String('utf-8')),
('sasl_auth_bytes', Bytes)
)


class SaslAuthenticateRequest_v0(Request):
API_KEY = 36
API_VERSION = 0
RESPONSE_TYPE = SaslAuthenticateResponse_v0
SCHEMA = Schema(
('sasl_auth_bytes', Bytes)
)


SaslAuthenticateRequest = [SaslAuthenticateRequest_v0]
SaslAuthenticateResponse = [SaslAuthenticateResponse_v0]
28 changes: 26 additions & 2 deletions kafka/protocol/fetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,16 @@ class FetchResponse_v5(Response):
)


class FetchResponse_v6(Response):
"""
Same as FetchResponse_v5. The version number is bumped up to indicate that the client supports KafkaStorageException.
The KafkaStorageException will be translated to NotLeaderForPartitionException in the response if version <= 5
"""
API_KEY = 1
API_VERSION = 6
SCHEMA = FetchResponse_v5.SCHEMA


class FetchRequest_v0(Request):
API_KEY = 1
API_VERSION = 0
Expand Down Expand Up @@ -174,11 +184,25 @@ class FetchRequest_v5(Request):
)


class FetchRequest_v6(Request):
"""
The body of FETCH_REQUEST_V6 is the same as FETCH_REQUEST_V5.
The version number is bumped up to indicate that the client supports KafkaStorageException.
The KafkaStorageException will be translated to NotLeaderForPartitionException in the response if version <= 5
"""
API_KEY = 1
API_VERSION = 6
RESPONSE_TYPE = FetchResponse_v6
SCHEMA = FetchRequest_v5.SCHEMA


FetchRequest = [
FetchRequest_v0, FetchRequest_v1, FetchRequest_v2,
FetchRequest_v3, FetchRequest_v4, FetchRequest_v5
FetchRequest_v3, FetchRequest_v4, FetchRequest_v5,
FetchRequest_v6
]
FetchResponse = [
FetchResponse_v0, FetchResponse_v1, FetchResponse_v2,
FetchResponse_v3, FetchResponse_v4, FetchResponse_v5
FetchResponse_v3, FetchResponse_v4, FetchResponse_v5,
FetchResponse_v6
]
2 changes: 1 addition & 1 deletion kafka/protocol/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class JoinGroupRequest_v2(Request):
JoinGroupRequest_v0, JoinGroupRequest_v1, JoinGroupRequest_v2
]
JoinGroupResponse = [
JoinGroupResponse_v0, JoinGroupResponse_v1, JoinGroupResponse_v1
JoinGroupResponse_v0, JoinGroupResponse_v1, JoinGroupResponse_v2
]


Expand Down
43 changes: 41 additions & 2 deletions kafka/protocol/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,32 @@ class MetadataResponse_v4(Response):
SCHEMA = MetadataResponse_v3.SCHEMA


class MetadataResponse_v5(Response):
API_KEY = 3
API_VERSION = 5
SCHEMA = Schema(
('throttle_time_ms', Int32),
('brokers', Array(
('node_id', Int32),
('host', String('utf-8')),
('port', Int32),
('rack', String('utf-8')))),
('cluster_id', String('utf-8')),
('controller_id', Int32),
('topics', Array(
('error_code', Int16),
('topic', String('utf-8')),
('is_internal', Boolean),
('partitions', Array(
('error_code', Int16),
('partition', Int32),
('leader', Int32),
('replicas', Array(Int32)),
('isr', Array(Int32)),
('offline_replicas', Array(Int32))))))
)


class MetadataRequest_v0(Request):
API_KEY = 3
API_VERSION = 0
Expand Down Expand Up @@ -151,11 +177,24 @@ class MetadataRequest_v4(Request):
NO_TOPICS = None # Empty array (len 0) for topics returns no topics


class MetadataRequest_v5(Request):
"""
The v5 metadata request is the same as v4.
An additional field for offline_replicas has been added to the v5 metadata response
"""
API_KEY = 3
API_VERSION = 5
RESPONSE_TYPE = MetadataResponse_v5
SCHEMA = MetadataRequest_v4.SCHEMA
ALL_TOPICS = -1 # Null Array (len -1) for topics returns all topics
NO_TOPICS = None # Empty array (len 0) for topics returns no topics


MetadataRequest = [
MetadataRequest_v0, MetadataRequest_v1, MetadataRequest_v2,
MetadataRequest_v3, MetadataRequest_v4
MetadataRequest_v3, MetadataRequest_v4, MetadataRequest_v5
]
MetadataResponse = [
MetadataResponse_v0, MetadataResponse_v1, MetadataResponse_v2,
MetadataResponse_v3, MetadataResponse_v4
MetadataResponse_v3, MetadataResponse_v4, MetadataResponse_v5
]
88 changes: 59 additions & 29 deletions kafka/protocol/produce.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,52 +52,67 @@ class ProduceResponse_v3(Response):
SCHEMA = ProduceResponse_v2.SCHEMA


class ProduceRequest_v0(Request):
class ProduceResponse_v4(Response):
"""
The version number is bumped up to indicate that the client supports KafkaStorageException.
The KafkaStorageException will be translated to NotLeaderForPartitionException in the response if version <= 3
"""
API_KEY = 0
API_VERSION = 0
RESPONSE_TYPE = ProduceResponse_v0
API_VERSION = 4
SCHEMA = ProduceResponse_v3.SCHEMA


class ProduceResponse_v5(Response):
API_KEY = 0
API_VERSION = 5
SCHEMA = Schema(
('required_acks', Int16),
('timeout', Int32),
('topics', Array(
('topic', String('utf-8')),
('partitions', Array(
('partition', Int32),
('messages', Bytes)))))
('error_code', Int16),
('offset', Int64),
('timestamp', Int64),
('log_start_offset', Int64))))),
('throttle_time_ms', Int32)
)


class ProduceRequest(Request):
API_KEY = 0

def expect_response(self):
if self.required_acks == 0: # pylint: disable=no-member
return False
return True


class ProduceRequest_v1(Request):
API_KEY = 0
class ProduceRequest_v0(ProduceRequest):
API_VERSION = 0
RESPONSE_TYPE = ProduceResponse_v0
SCHEMA = Schema(
('required_acks', Int16),
('timeout', Int32),
('topics', Array(
('topic', String('utf-8')),
('partitions', Array(
('partition', Int32),
('messages', Bytes)))))
)


class ProduceRequest_v1(ProduceRequest):
API_VERSION = 1
RESPONSE_TYPE = ProduceResponse_v1
SCHEMA = ProduceRequest_v0.SCHEMA

def expect_response(self):
if self.required_acks == 0: # pylint: disable=no-member
return False
return True


class ProduceRequest_v2(Request):
API_KEY = 0
class ProduceRequest_v2(ProduceRequest):
API_VERSION = 2
RESPONSE_TYPE = ProduceResponse_v2
SCHEMA = ProduceRequest_v1.SCHEMA

def expect_response(self):
if self.required_acks == 0: # pylint: disable=no-member
return False
return True


class ProduceRequest_v3(Request):
API_KEY = 0
class ProduceRequest_v3(ProduceRequest):
API_VERSION = 3
RESPONSE_TYPE = ProduceResponse_v3
SCHEMA = Schema(
Expand All @@ -111,17 +126,32 @@ class ProduceRequest_v3(Request):
('messages', Bytes)))))
)

def expect_response(self):
if self.required_acks == 0: # pylint: disable=no-member
return False
return True

class ProduceRequest_v4(ProduceRequest):
"""
The version number is bumped up to indicate that the client supports KafkaStorageException.
The KafkaStorageException will be translated to NotLeaderForPartitionException in the response if version <= 3
"""
API_VERSION = 4
RESPONSE_TYPE = ProduceResponse_v4
SCHEMA = ProduceRequest_v3.SCHEMA


class ProduceRequest_v5(ProduceRequest):
"""
Same as v4. The version number is bumped since the v5 response includes an additional
partition level field: the log_start_offset.
"""
API_VERSION = 5
RESPONSE_TYPE = ProduceResponse_v5
SCHEMA = ProduceRequest_v4.SCHEMA


ProduceRequest = [
ProduceRequest_v0, ProduceRequest_v1, ProduceRequest_v2,
ProduceRequest_v3
ProduceRequest_v3, ProduceRequest_v4, ProduceRequest_v5
]
ProduceResponse = [
ProduceResponse_v0, ProduceResponse_v1, ProduceResponse_v2,
ProduceResponse_v2
ProduceResponse_v3, ProduceResponse_v4, ProduceResponse_v5
]

0 comments on commit c0df771

Please sign in to comment.