Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add logic for inferring newer broker versions #2038

Merged
merged 2 commits into from
May 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ env:
- KAFKA_VERSION=0.11.0.3
- KAFKA_VERSION=1.1.1
- KAFKA_VERSION=2.4.0
- KAFKA_VERSION=2.5.0

addons:
apt:
Expand Down
6 changes: 4 additions & 2 deletions build_integration.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/bin/bash

: ${ALL_RELEASES:="0.8.2.2 0.9.0.1 0.10.1.1 0.10.2.2 0.11.0.3 1.0.2 1.1.1 2.0.1 2.1.1"}
: ${ALL_RELEASES:="0.8.2.2 0.9.0.1 0.10.1.1 0.10.2.2 0.11.0.3 1.0.2 1.1.1 2.0.1 2.1.1 2.2.1 2.3.0 2.4.0 2.5.0"}
: ${SCALA_VERSION:=2.11}
: ${DIST_BASE_URL:=https://archive.apache.org/dist/kafka/}
: ${KAFKA_SRC_GIT:=https://github.com/apache/kafka.git}
Expand Down Expand Up @@ -33,12 +33,14 @@ pushd servers
echo "-------------------------------------"
echo "Checking kafka binaries for ${kafka}"
echo
# kafka 0.8.0 is only available w/ scala 2.8.0
if [ "$kafka" == "0.8.0" ]; then
KAFKA_ARTIFACT="kafka_2.8.0-${kafka}.tar.gz"
else if [ "$kafka" \> "2.4.0" ]; then
KAFKA_ARTIFACT="kafka_2.12-${kafka}.tgz"
else
KAFKA_ARTIFACT="kafka_${SCALA_VERSION}-${kafka}.tgz"
fi
fi
if [ ! -f "../$kafka/kafka-bin/bin/kafka-run-class.sh" ]; then
if [ -f "${KAFKA_ARTIFACT}" ]; then
echo "Using cached artifact: ${KAFKA_ARTIFACT}"
Expand Down
12 changes: 11 additions & 1 deletion kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,12 @@
from kafka.future import Future
from kafka.metrics.stats import Avg, Count, Max, Rate
from kafka.oauth.abstract import AbstractTokenProvider
from kafka.protocol.admin import SaslHandShakeRequest
from kafka.protocol.admin import SaslHandShakeRequest, DescribeAclsRequest_v2
Copy link
Contributor Author

@gabriel-tincu gabriel-tincu Apr 27, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This plus my last commit is in response to the broker not recognizing the new V2 request even tho the schema is unchanged per
https://github.com/apache/kafka/blob/2.5/clients/src/main/resources/common/message/DescribeAclsRequest.json
This leads me to believe that objects are serialized and deserialized differently when flexible versions are enabled , but i need some alone time with the java client and / or wireshark to figure out how that actually happens
UPDATE: According to https://cwiki.apache.org/confluence/display/KAFKA/KIP-482%3A+The+Kafka+Protocol+should+Support+Optional+Tagged+Fields, the request header needs to be updated to account for (0) tagged fields at the minimum, otherwise the server will not be able to parse that request
Specifically , a good place to start would be a Variant base type , on whitch the compact string data type and new header format depends

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gabriel-tincu Ohhhh, so they changed the protocol specification in version 2.4.0 basically. All new versions after that would require us to have support for Tagged fields on protocol level. For now, we can leave the auto-discovery up to 2.5, that's not a problem, but the change in Admin client above should be reverted if version 2 does not work.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gabriel-tincu As for the Compact versions of each type it should not be too hard, as we already have a varint implementation for length in protocol utils. It was used for message V2 parser.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reverted the admin code

from kafka.protocol.commit import OffsetFetchRequest
from kafka.protocol.offset import OffsetRequest
from kafka.protocol.produce import ProduceRequest
from kafka.protocol.metadata import MetadataRequest
from kafka.protocol.fetch import FetchRequest
from kafka.protocol.parser import KafkaProtocol
from kafka.protocol.types import Int32, Int8
from kafka.scram import ScramClient
Expand Down Expand Up @@ -1166,6 +1169,13 @@ def _infer_broker_version_from_api_versions(self, api_versions):
# in reverse order. As soon as we find one that works, return it
test_cases = [
# format (<broker version>, <needed struct>)
((2, 5, 0), DescribeAclsRequest_v2),
((2, 4, 0), ProduceRequest[8]),
((2, 3, 0), FetchRequest[11]),
((2, 2, 0), OffsetRequest[5]),
((2, 1, 0), FetchRequest[10]),
((2, 0, 0), FetchRequest[8]),
((1, 1, 0), FetchRequest[7]),
((1, 0, 0), MetadataRequest[5]),
((0, 11, 0), MetadataRequest[4]),
((0, 10, 2), OffsetFetchRequest[2]),
Expand Down
20 changes: 20 additions & 0 deletions kafka/protocol/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,13 @@ class DescribeAclsResponse_v1(Response):
('permission_type', Int8)))))
)


class DescribeAclsResponse_v2(Response):
API_KEY = 29
API_VERSION = 2
SCHEMA = DescribeAclsResponse_v1.SCHEMA


class DescribeAclsRequest_v0(Request):
API_KEY = 29
API_VERSION = 0
Expand All @@ -490,6 +497,7 @@ class DescribeAclsRequest_v0(Request):
('permission_type', Int8)
)


class DescribeAclsRequest_v1(Request):
API_KEY = 29
API_VERSION = 1
Expand All @@ -504,6 +512,17 @@ class DescribeAclsRequest_v1(Request):
('permission_type', Int8)
)


class DescribeAclsRequest_v2(Request):
"""
Enable flexible version
"""
API_KEY = 29
API_VERSION = 2
RESPONSE_TYPE = DescribeAclsResponse_v2
SCHEMA = DescribeAclsRequest_v1.SCHEMA


DescribeAclsRequest = [DescribeAclsRequest_v0, DescribeAclsRequest_v1]
DescribeAclsResponse = [DescribeAclsResponse_v0, DescribeAclsResponse_v1]

Expand Down Expand Up @@ -862,3 +881,4 @@ class CreatePartitionsRequest_v1(Request):
CreatePartitionsResponse = [
CreatePartitionsResponse_v0, CreatePartitionsResponse_v1,
]

182 changes: 180 additions & 2 deletions kafka/protocol/fetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,72 @@ class FetchResponse_v6(Response):
SCHEMA = FetchResponse_v5.SCHEMA


class FetchResponse_v7(Response):
"""
Add error_code and session_id to response
"""
API_KEY = 1
API_VERSION = 7
SCHEMA = Schema(
('throttle_time_ms', Int32),
('error_code', Int16),
('session_id', Int32),
('topics', Array(
('topics', String('utf-8')),
('partitions', Array(
('partition', Int32),
('error_code', Int16),
('highwater_offset', Int64),
('last_stable_offset', Int64),
('log_start_offset', Int64),
('aborted_transactions', Array(
('producer_id', Int64),
('first_offset', Int64))),
('message_set', Bytes)))))
)


class FetchResponse_v8(Response):
API_KEY = 1
API_VERSION = 8
SCHEMA = FetchResponse_v7.SCHEMA


class FetchResponse_v9(Response):
API_KEY = 1
API_VERSION = 9
SCHEMA = FetchResponse_v7.SCHEMA


class FetchResponse_v10(Response):
API_KEY = 1
API_VERSION = 10
SCHEMA = FetchResponse_v7.SCHEMA


class FetchResponse_v11(Response):
API_KEY = 1
API_VERSION = 11
SCHEMA = Schema(
('throttle_time_ms', Int32),
('error_code', Int16),
('session_id', Int32),
('topics', Array(
('topics', String('utf-8')),
('partitions', Array(
('partition', Int32),
('error_code', Int16),
('highwater_offset', Int64),
('last_stable_offset', Int64),
('log_start_offset', Int64),
('aborted_transactions', Array(
('producer_id', Int64),
('first_offset', Int64))),
('preferred_read_replica', Int32),
('message_set', Bytes)))))
)


class FetchRequest_v0(Request):
API_KEY = 1
API_VERSION = 0
Expand Down Expand Up @@ -196,13 +262,125 @@ class FetchRequest_v6(Request):
SCHEMA = FetchRequest_v5.SCHEMA


class FetchRequest_v7(Request):
"""
Add incremental fetch requests
"""
API_KEY = 1
API_VERSION = 7
RESPONSE_TYPE = FetchResponse_v7
SCHEMA = Schema(
('replica_id', Int32),
('max_wait_time', Int32),
('min_bytes', Int32),
('max_bytes', Int32),
('isolation_level', Int8),
('session_id', Int32),
('session_epoch', Int32),
('topics', Array(
('topic', String('utf-8')),
('partitions', Array(
('partition', Int32),
('fetch_offset', Int64),
('log_start_offset', Int64),
('max_bytes', Int32))))),
('forgotten_topics_data', Array(
('topic', String),
('partitions', Array(Int32))
)),
)


class FetchRequest_v8(Request):
"""
bump used to indicate that on quota violation brokers send out responses before throttling.
"""
API_KEY = 1
API_VERSION = 8
RESPONSE_TYPE = FetchResponse_v8
SCHEMA = FetchRequest_v7.SCHEMA


class FetchRequest_v9(Request):
"""
adds the current leader epoch (see KIP-320)
"""
API_KEY = 1
API_VERSION = 9
RESPONSE_TYPE = FetchResponse_v9
SCHEMA = Schema(
('replica_id', Int32),
('max_wait_time', Int32),
('min_bytes', Int32),
('max_bytes', Int32),
('isolation_level', Int8),
('session_id', Int32),
('session_epoch', Int32),
('topics', Array(
('topic', String('utf-8')),
('partitions', Array(
('partition', Int32),
('current_leader_epoch', Int32),
('fetch_offset', Int64),
('log_start_offset', Int64),
('max_bytes', Int32))))),
('forgotten_topics_data', Array(
('topic', String),
('partitions', Array(Int32)),
)),
)


class FetchRequest_v10(Request):
"""
bumped up to indicate ZStandard capability. (see KIP-110)
"""
API_KEY = 1
API_VERSION = 10
RESPONSE_TYPE = FetchResponse_v10
SCHEMA = FetchRequest_v9.SCHEMA


class FetchRequest_v11(Request):
"""
added rack ID to support read from followers (KIP-392)
"""
API_KEY = 1
API_VERSION = 11
RESPONSE_TYPE = FetchResponse_v11
SCHEMA = Schema(
('replica_id', Int32),
('max_wait_time', Int32),
('min_bytes', Int32),
('max_bytes', Int32),
('isolation_level', Int8),
('session_id', Int32),
('session_epoch', Int32),
('topics', Array(
('topic', String('utf-8')),
('partitions', Array(
('partition', Int32),
('current_leader_epoch', Int32),
('fetch_offset', Int64),
('log_start_offset', Int64),
('max_bytes', Int32))))),
('forgotten_topics_data', Array(
('topic', String),
('partitions', Array(Int32))
)),
('rack_id', String('utf-8')),
)


FetchRequest = [
FetchRequest_v0, FetchRequest_v1, FetchRequest_v2,
FetchRequest_v3, FetchRequest_v4, FetchRequest_v5,
FetchRequest_v6
FetchRequest_v6, FetchRequest_v7, FetchRequest_v8,
FetchRequest_v9, FetchRequest_v10, FetchRequest_v11,
]
FetchResponse = [
FetchResponse_v0, FetchResponse_v1, FetchResponse_v2,
FetchResponse_v3, FetchResponse_v4, FetchResponse_v5,
FetchResponse_v6
FetchResponse_v6, FetchResponse_v7, FetchResponse_v8,
FetchResponse_v9, FetchResponse_v10, FetchResponse_v11,
]
Loading