Skip to content

Commit 6fc0081

Browse files
author
Tincu Gabriel
authored
Add logic for inferring newer broker versions (#2038)
* Add logic for inferring newer broker versions - New Fetch / ListOffsets request / response objects - Add new test cases to inferr code based on mentioned objects - Add unit test to check inferred version against whatever resides in KAFKA_VERSION - Add new kafka broker versions to integration list - Add more kafka broker versions to travis task list - Add support for broker version 2.5 id * Implement PR change requests: fewer versions for travis testing, remove unused older versions for inference code, remove one minor version from known server list Do not use newly created ACL request / responses in allowed version lists, due to flexible versions enabling in kafka actually requiring a serialization protocol header update Revert admin client file change
1 parent f9e0264 commit 6fc0081

11 files changed

+315
-10
lines changed

.travis.yml

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ env:
1515
- KAFKA_VERSION=0.11.0.3
1616
- KAFKA_VERSION=1.1.1
1717
- KAFKA_VERSION=2.4.0
18+
- KAFKA_VERSION=2.5.0
1819

1920
addons:
2021
apt:

build_integration.sh

+4-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#!/bin/bash
22

3-
: ${ALL_RELEASES:="0.8.2.2 0.9.0.1 0.10.1.1 0.10.2.2 0.11.0.3 1.0.2 1.1.1 2.0.1 2.1.1"}
3+
: ${ALL_RELEASES:="0.8.2.2 0.9.0.1 0.10.1.1 0.10.2.2 0.11.0.3 1.0.2 1.1.1 2.0.1 2.1.1 2.2.1 2.3.0 2.4.0 2.5.0"}
44
: ${SCALA_VERSION:=2.11}
55
: ${DIST_BASE_URL:=https://archive.apache.org/dist/kafka/}
66
: ${KAFKA_SRC_GIT:=https://github.com/apache/kafka.git}
@@ -33,12 +33,14 @@ pushd servers
3333
echo "-------------------------------------"
3434
echo "Checking kafka binaries for ${kafka}"
3535
echo
36-
# kafka 0.8.0 is only available w/ scala 2.8.0
3736
if [ "$kafka" == "0.8.0" ]; then
3837
KAFKA_ARTIFACT="kafka_2.8.0-${kafka}.tar.gz"
38+
else if [ "$kafka" \> "2.4.0" ]; then
39+
KAFKA_ARTIFACT="kafka_2.12-${kafka}.tgz"
3940
else
4041
KAFKA_ARTIFACT="kafka_${SCALA_VERSION}-${kafka}.tgz"
4142
fi
43+
fi
4244
if [ ! -f "../$kafka/kafka-bin/bin/kafka-run-class.sh" ]; then
4345
if [ -f "${KAFKA_ARTIFACT}" ]; then
4446
echo "Using cached artifact: ${KAFKA_ARTIFACT}"

kafka/conn.py

+11-1
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,12 @@
2424
from kafka.future import Future
2525
from kafka.metrics.stats import Avg, Count, Max, Rate
2626
from kafka.oauth.abstract import AbstractTokenProvider
27-
from kafka.protocol.admin import SaslHandShakeRequest
27+
from kafka.protocol.admin import SaslHandShakeRequest, DescribeAclsRequest_v2
2828
from kafka.protocol.commit import OffsetFetchRequest
29+
from kafka.protocol.offset import OffsetRequest
30+
from kafka.protocol.produce import ProduceRequest
2931
from kafka.protocol.metadata import MetadataRequest
32+
from kafka.protocol.fetch import FetchRequest
3033
from kafka.protocol.parser import KafkaProtocol
3134
from kafka.protocol.types import Int32, Int8
3235
from kafka.scram import ScramClient
@@ -1166,6 +1169,13 @@ def _infer_broker_version_from_api_versions(self, api_versions):
11661169
# in reverse order. As soon as we find one that works, return it
11671170
test_cases = [
11681171
# format (<broker version>, <needed struct>)
1172+
((2, 5, 0), DescribeAclsRequest_v2),
1173+
((2, 4, 0), ProduceRequest[8]),
1174+
((2, 3, 0), FetchRequest[11]),
1175+
((2, 2, 0), OffsetRequest[5]),
1176+
((2, 1, 0), FetchRequest[10]),
1177+
((2, 0, 0), FetchRequest[8]),
1178+
((1, 1, 0), FetchRequest[7]),
11691179
((1, 0, 0), MetadataRequest[5]),
11701180
((0, 11, 0), MetadataRequest[4]),
11711181
((0, 10, 2), OffsetFetchRequest[2]),

kafka/protocol/admin.py

+20
Original file line numberDiff line numberDiff line change
@@ -477,6 +477,13 @@ class DescribeAclsResponse_v1(Response):
477477
('permission_type', Int8)))))
478478
)
479479

480+
481+
class DescribeAclsResponse_v2(Response):
482+
API_KEY = 29
483+
API_VERSION = 2
484+
SCHEMA = DescribeAclsResponse_v1.SCHEMA
485+
486+
480487
class DescribeAclsRequest_v0(Request):
481488
API_KEY = 29
482489
API_VERSION = 0
@@ -490,6 +497,7 @@ class DescribeAclsRequest_v0(Request):
490497
('permission_type', Int8)
491498
)
492499

500+
493501
class DescribeAclsRequest_v1(Request):
494502
API_KEY = 29
495503
API_VERSION = 1
@@ -504,6 +512,17 @@ class DescribeAclsRequest_v1(Request):
504512
('permission_type', Int8)
505513
)
506514

515+
516+
class DescribeAclsRequest_v2(Request):
517+
"""
518+
Enable flexible version
519+
"""
520+
API_KEY = 29
521+
API_VERSION = 2
522+
RESPONSE_TYPE = DescribeAclsResponse_v2
523+
SCHEMA = DescribeAclsRequest_v1.SCHEMA
524+
525+
507526
DescribeAclsRequest = [DescribeAclsRequest_v0, DescribeAclsRequest_v1]
508527
DescribeAclsResponse = [DescribeAclsResponse_v0, DescribeAclsResponse_v1]
509528

@@ -862,3 +881,4 @@ class CreatePartitionsRequest_v1(Request):
862881
CreatePartitionsResponse = [
863882
CreatePartitionsResponse_v0, CreatePartitionsResponse_v1,
864883
]
884+

kafka/protocol/fetch.py

+180-2
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,72 @@ class FetchResponse_v6(Response):
9494
SCHEMA = FetchResponse_v5.SCHEMA
9595

9696

97+
class FetchResponse_v7(Response):
98+
"""
99+
Add error_code and session_id to response
100+
"""
101+
API_KEY = 1
102+
API_VERSION = 7
103+
SCHEMA = Schema(
104+
('throttle_time_ms', Int32),
105+
('error_code', Int16),
106+
('session_id', Int32),
107+
('topics', Array(
108+
('topics', String('utf-8')),
109+
('partitions', Array(
110+
('partition', Int32),
111+
('error_code', Int16),
112+
('highwater_offset', Int64),
113+
('last_stable_offset', Int64),
114+
('log_start_offset', Int64),
115+
('aborted_transactions', Array(
116+
('producer_id', Int64),
117+
('first_offset', Int64))),
118+
('message_set', Bytes)))))
119+
)
120+
121+
122+
class FetchResponse_v8(Response):
123+
API_KEY = 1
124+
API_VERSION = 8
125+
SCHEMA = FetchResponse_v7.SCHEMA
126+
127+
128+
class FetchResponse_v9(Response):
129+
API_KEY = 1
130+
API_VERSION = 9
131+
SCHEMA = FetchResponse_v7.SCHEMA
132+
133+
134+
class FetchResponse_v10(Response):
135+
API_KEY = 1
136+
API_VERSION = 10
137+
SCHEMA = FetchResponse_v7.SCHEMA
138+
139+
140+
class FetchResponse_v11(Response):
141+
API_KEY = 1
142+
API_VERSION = 11
143+
SCHEMA = Schema(
144+
('throttle_time_ms', Int32),
145+
('error_code', Int16),
146+
('session_id', Int32),
147+
('topics', Array(
148+
('topics', String('utf-8')),
149+
('partitions', Array(
150+
('partition', Int32),
151+
('error_code', Int16),
152+
('highwater_offset', Int64),
153+
('last_stable_offset', Int64),
154+
('log_start_offset', Int64),
155+
('aborted_transactions', Array(
156+
('producer_id', Int64),
157+
('first_offset', Int64))),
158+
('preferred_read_replica', Int32),
159+
('message_set', Bytes)))))
160+
)
161+
162+
97163
class FetchRequest_v0(Request):
98164
API_KEY = 1
99165
API_VERSION = 0
@@ -196,13 +262,125 @@ class FetchRequest_v6(Request):
196262
SCHEMA = FetchRequest_v5.SCHEMA
197263

198264

265+
class FetchRequest_v7(Request):
266+
"""
267+
Add incremental fetch requests
268+
"""
269+
API_KEY = 1
270+
API_VERSION = 7
271+
RESPONSE_TYPE = FetchResponse_v7
272+
SCHEMA = Schema(
273+
('replica_id', Int32),
274+
('max_wait_time', Int32),
275+
('min_bytes', Int32),
276+
('max_bytes', Int32),
277+
('isolation_level', Int8),
278+
('session_id', Int32),
279+
('session_epoch', Int32),
280+
('topics', Array(
281+
('topic', String('utf-8')),
282+
('partitions', Array(
283+
('partition', Int32),
284+
('fetch_offset', Int64),
285+
('log_start_offset', Int64),
286+
('max_bytes', Int32))))),
287+
('forgotten_topics_data', Array(
288+
('topic', String),
289+
('partitions', Array(Int32))
290+
)),
291+
)
292+
293+
294+
class FetchRequest_v8(Request):
295+
"""
296+
bump used to indicate that on quota violation brokers send out responses before throttling.
297+
"""
298+
API_KEY = 1
299+
API_VERSION = 8
300+
RESPONSE_TYPE = FetchResponse_v8
301+
SCHEMA = FetchRequest_v7.SCHEMA
302+
303+
304+
class FetchRequest_v9(Request):
305+
"""
306+
adds the current leader epoch (see KIP-320)
307+
"""
308+
API_KEY = 1
309+
API_VERSION = 9
310+
RESPONSE_TYPE = FetchResponse_v9
311+
SCHEMA = Schema(
312+
('replica_id', Int32),
313+
('max_wait_time', Int32),
314+
('min_bytes', Int32),
315+
('max_bytes', Int32),
316+
('isolation_level', Int8),
317+
('session_id', Int32),
318+
('session_epoch', Int32),
319+
('topics', Array(
320+
('topic', String('utf-8')),
321+
('partitions', Array(
322+
('partition', Int32),
323+
('current_leader_epoch', Int32),
324+
('fetch_offset', Int64),
325+
('log_start_offset', Int64),
326+
('max_bytes', Int32))))),
327+
('forgotten_topics_data', Array(
328+
('topic', String),
329+
('partitions', Array(Int32)),
330+
)),
331+
)
332+
333+
334+
class FetchRequest_v10(Request):
335+
"""
336+
bumped up to indicate ZStandard capability. (see KIP-110)
337+
"""
338+
API_KEY = 1
339+
API_VERSION = 10
340+
RESPONSE_TYPE = FetchResponse_v10
341+
SCHEMA = FetchRequest_v9.SCHEMA
342+
343+
344+
class FetchRequest_v11(Request):
345+
"""
346+
added rack ID to support read from followers (KIP-392)
347+
"""
348+
API_KEY = 1
349+
API_VERSION = 11
350+
RESPONSE_TYPE = FetchResponse_v11
351+
SCHEMA = Schema(
352+
('replica_id', Int32),
353+
('max_wait_time', Int32),
354+
('min_bytes', Int32),
355+
('max_bytes', Int32),
356+
('isolation_level', Int8),
357+
('session_id', Int32),
358+
('session_epoch', Int32),
359+
('topics', Array(
360+
('topic', String('utf-8')),
361+
('partitions', Array(
362+
('partition', Int32),
363+
('current_leader_epoch', Int32),
364+
('fetch_offset', Int64),
365+
('log_start_offset', Int64),
366+
('max_bytes', Int32))))),
367+
('forgotten_topics_data', Array(
368+
('topic', String),
369+
('partitions', Array(Int32))
370+
)),
371+
('rack_id', String('utf-8')),
372+
)
373+
374+
199375
FetchRequest = [
200376
FetchRequest_v0, FetchRequest_v1, FetchRequest_v2,
201377
FetchRequest_v3, FetchRequest_v4, FetchRequest_v5,
202-
FetchRequest_v6
378+
FetchRequest_v6, FetchRequest_v7, FetchRequest_v8,
379+
FetchRequest_v9, FetchRequest_v10, FetchRequest_v11,
203380
]
204381
FetchResponse = [
205382
FetchResponse_v0, FetchResponse_v1, FetchResponse_v2,
206383
FetchResponse_v3, FetchResponse_v4, FetchResponse_v5,
207-
FetchResponse_v6
384+
FetchResponse_v6, FetchResponse_v7, FetchResponse_v8,
385+
FetchResponse_v9, FetchResponse_v10, FetchResponse_v11,
208386
]

0 commit comments

Comments
 (0)