Update fork #1

Merged 15 commits on Sep 17, 2020

3 changes: 3 additions & 0 deletions .travis.yml
@@ -6,6 +6,7 @@ python:
- 2.7
- 3.4
- 3.7
- 3.8
- pypy2.7-6.0

env:
@@ -15,11 +16,13 @@ env:
- KAFKA_VERSION=0.11.0.3
- KAFKA_VERSION=1.1.1
- KAFKA_VERSION=2.4.0
- KAFKA_VERSION=2.5.0

addons:
apt:
packages:
- libsnappy-dev
- libzstd-dev
- openjdk-8-jdk

cache:
2 changes: 1 addition & 1 deletion README.rst
@@ -2,7 +2,7 @@ Kafka Python client
------------------------

.. image:: https://img.shields.io/badge/kafka-2.4%2C%202.3%2C%202.2%2C%202.1%2C%202.0%2C%201.1%2C%201.0%2C%200.11%2C%200.10%2C%200.9%2C%200.8-brightgreen.svg
:target: https://kafka-python.readthedocs.io/compatibility.html
:target: https://kafka-python.readthedocs.io/en/master/compatibility.html
.. image:: https://img.shields.io/pypi/pyversions/kafka-python.svg
:target: https://pypi.python.org/pypi/kafka-python
.. image:: https://coveralls.io/repos/dpkp/kafka-python/badge.svg?branch=master&service=github
File renamed without changes.
6 changes: 4 additions & 2 deletions build_integration.sh
@@ -1,6 +1,6 @@
#!/bin/bash

: ${ALL_RELEASES:="0.8.2.2 0.9.0.1 0.10.1.1 0.10.2.2 0.11.0.3 1.0.2 1.1.1 2.0.1 2.1.1"}
: ${ALL_RELEASES:="0.8.2.2 0.9.0.1 0.10.1.1 0.10.2.2 0.11.0.3 1.0.2 1.1.1 2.0.1 2.1.1 2.2.1 2.3.0 2.4.0 2.5.0"}
: ${SCALA_VERSION:=2.11}
: ${DIST_BASE_URL:=https://archive.apache.org/dist/kafka/}
: ${KAFKA_SRC_GIT:=https://github.com/apache/kafka.git}
@@ -33,12 +33,14 @@ pushd servers
echo "-------------------------------------"
echo "Checking kafka binaries for ${kafka}"
echo
# kafka 0.8.0 is only available w/ scala 2.8.0
if [ "$kafka" == "0.8.0" ]; then
KAFKA_ARTIFACT="kafka_2.8.0-${kafka}.tar.gz"
else if [ "$kafka" \> "2.4.0" ]; then
KAFKA_ARTIFACT="kafka_2.12-${kafka}.tgz"
else
KAFKA_ARTIFACT="kafka_${SCALA_VERSION}-${kafka}.tgz"
fi
fi
if [ ! -f "../$kafka/kafka-bin/bin/kafka-run-class.sh" ]; then
if [ -f "${KAFKA_ARTIFACT}" ]; then
echo "Using cached artifact: ${KAFKA_ARTIFACT}"
2 changes: 1 addition & 1 deletion docs/compatibility.rst
@@ -16,6 +16,6 @@ Although kafka-python is tested and expected to work on recent broker versions,
not all features are supported. Specifically, authentication, codecs, and
transactional producer/consumer support are not fully implemented. PRs welcome!

kafka-python is tested on python 2.7, 3.4, 3.7, and pypy2.7.
kafka-python is tested on python 2.7, 3.4, 3.7, 3.8 and pypy2.7.

Builds and tests via Travis-CI. See https://travis-ci.org/dpkp/kafka-python
1 change: 0 additions & 1 deletion docs/conf.py
@@ -12,7 +12,6 @@
# All configuration values have a default; values that are commented out
# serve to show the default.

import sys
import os

# If extensions (or modules to document with autodoc) are in another directory,
9 changes: 5 additions & 4 deletions docs/index.rst
@@ -122,11 +122,12 @@ multiprocessing is recommended.
Compression
***********

kafka-python supports gzip compression/decompression natively. To produce or
consume lz4 compressed messages, you should install python-lz4 (pip install lz4).
To enable snappy, install python-snappy (also requires snappy library).
See `Installation <install.html#optional-snappy-install>`_ for more information.
kafka-python supports multiple compression types:

- gzip : supported natively
- lz4 : requires `python-lz4 <https://pypi.org/project/lz4/>`_ installed
- snappy : requires the `python-snappy <https://pypi.org/project/python-snappy/>`_ package (which requires the snappy C library)
- zstd : requires the `python-zstandard <https://github.com/indygreg/python-zstandard>`_ package installed

Protocol
********
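
For reference, the codecs listed above are selected through the producer's compression_type option. A minimal sketch, assuming a broker at localhost:9092, an existing 'my-topic' topic, and the relevant optional package (python-zstandard here) installed:

    from kafka import KafkaProducer

    # 'zstd' also requires brokers >= 2.1.0; 'gzip', 'snappy', and 'lz4'
    # work the same way once their packages are available.
    producer = KafkaProducer(bootstrap_servers='localhost:9092',
                             compression_type='zstd')
    producer.send('my-topic', b'compressed message')
    producer.flush()
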
46 changes: 28 additions & 18 deletions example.py
@@ -1,15 +1,15 @@
#!/usr/bin/env python
import threading, logging, time
import multiprocessing
import threading, time

from kafka import KafkaConsumer, KafkaProducer
from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer
from kafka.admin import NewTopic


class Producer(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.stop_event = threading.Event()

def stop(self):
self.stop_event.set()

@@ -23,14 +23,15 @@ def run(self):

producer.close()

class Consumer(multiprocessing.Process):

class Consumer(threading.Thread):
def __init__(self):
multiprocessing.Process.__init__(self)
self.stop_event = multiprocessing.Event()
threading.Thread.__init__(self)
self.stop_event = threading.Event()

def stop(self):
self.stop_event.set()

def run(self):
consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
auto_offset_reset='earliest',
@@ -44,29 +45,38 @@ def run(self):
break

consumer.close()


def main():
# Create 'my-topic' Kafka topic
try:
admin = KafkaAdminClient(bootstrap_servers='localhost:9092')

topic = NewTopic(name='my-topic',
num_partitions=1,
replication_factor=1)
admin.create_topics([topic])
except Exception:
pass

tasks = [
Producer(),
Consumer()
]

# Start the producer and consumer threads for the 'my-topic' Kafka topic
for t in tasks:
t.start()

time.sleep(10)


# Stop threads
for task in tasks:
task.stop()

for task in tasks:
task.join()


if __name__ == "__main__":
logging.basicConfig(
format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s',
level=logging.INFO
)
main()
59 changes: 43 additions & 16 deletions kafka/admin/client.py
@@ -1,14 +1,17 @@
from __future__ import absolute_import

from collections import defaultdict
from collections import defaultdict, namedtuple
import copy
import logging
import socket

from . import ConfigResourceType
from kafka.vendor import six

from kafka.admin.acl_resource import ACLOperation, ACLPermissionType, ACLFilter, ACL, ResourcePattern, ResourceType, \
ACLResourcePatternType
from kafka.client_async import KafkaClient, selectors
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment, ConsumerProtocol
import kafka.errors as Errors
from kafka.errors import (
IncompatibleBrokerVersion, KafkaConfigurationError, NotControllerError,
@@ -19,9 +22,8 @@
ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest)
from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest
from kafka.protocol.metadata import MetadataRequest
from kafka.structs import TopicPartition, OffsetAndMetadata
from kafka.admin.acl_resource import ACLOperation, ACLPermissionType, ACLFilter, ACL, ResourcePattern, ResourceType, \
ACLResourcePatternType
from kafka.protocol.types import Array
from kafka.structs import TopicPartition, OffsetAndMetadata, MemberInformation, GroupInformation
from kafka.version import __version__


@@ -204,7 +206,7 @@ def __init__(self, **configs):
self._client = KafkaClient(metrics=self._metrics,
metric_group_prefix='admin',
**self.config)
self._client.check_version()
self._client.check_version(timeout=(self.config['api_version_auto_timeout_ms'] / 1000))

# Get auto-discovered version from client if necessary
if self.config['api_version'] is None:
@@ -271,7 +273,7 @@ def _refresh_controller_id(self):
response = future.value
controller_id = response.controller_id
# verify the controller is new enough to support our requests
controller_version = self._client.check_version(controller_id)
controller_version = self._client.check_version(controller_id, timeout=(self.config['api_version_auto_timeout_ms'] / 1000))
if controller_version < (0, 10, 0):
raise IncompatibleBrokerVersion(
"The controller appears to be running Kafka {}. KafkaAdminClient requires brokers >= 0.10.0.0."
@@ -1000,22 +1002,47 @@ def _describe_consumer_groups_process_response(self, response):
"""Process a DescribeGroupsResponse into a group description."""
if response.API_VERSION <= 3:
assert len(response.groups) == 1
# TODO need to implement converting the response tuple into
# a more accessible interface like a namedtuple and then stop
# hardcoding tuple indices here. Several Java examples,
# including KafkaAdminClient.java
group_description = response.groups[0]
error_code = group_description[0]
for response_field, response_name in zip(response.SCHEMA.fields, response.SCHEMA.names):
if isinstance(response_field, Array):
described_groups = response.__dict__[response_name]
described_groups_field_schema = response_field.array_of
described_group = response.__dict__[response_name][0]
described_group_information_list = []
protocol_type_is_consumer = False
for (described_group_information, group_information_name, group_information_field) in zip(described_group, described_groups_field_schema.names, described_groups_field_schema.fields):
if group_information_name == 'protocol_type':
protocol_type = described_group_information
protocol_type_is_consumer = (protocol_type == ConsumerProtocol.PROTOCOL_TYPE or not protocol_type)
if isinstance(group_information_field, Array):
member_information_list = []
member_schema = group_information_field.array_of
for members in described_group_information:
member_information = []
for (member, member_field, member_name) in zip(members, member_schema.fields, member_schema.names):
if protocol_type_is_consumer:
if member_name == 'member_metadata' and member:
member_information.append(ConsumerProtocolMemberMetadata.decode(member))
elif member_name == 'member_assignment' and member:
member_information.append(ConsumerProtocolMemberAssignment.decode(member))
else:
member_information.append(member)
member_info_tuple = MemberInformation._make(member_information)
member_information_list.append(member_info_tuple)
described_group_information_list.append(member_information_list)
else:
described_group_information_list.append(described_group_information)
# Version 3 of the DescribeGroups API introduced the "authorized_operations" field.
# Older responses lack it, which would make the GroupInformation namedtuple fail,
# so append a placeholder of None for those versions.
if response.API_VERSION <= 2:
described_group_information_list.append(None)
group_description = GroupInformation._make(described_group_information_list)
error_code = group_description.error_code
error_type = Errors.for_code(error_code)
# Java has the note: KAFKA-6789, we can retry based on the error code
if error_type is not Errors.NoError:
raise error_type(
"DescribeGroupsResponse failed with response '{}'."
.format(response))
# TODO Java checks the group protocol type, and if consumer
# (ConsumerProtocol.PROTOCOL_TYPE) or empty string, it decodes
# the members' partition assignments... that hasn't yet been
# implemented here so just return the raw struct results
else:
raise NotImplementedError(
"Support for DescribeGroupsResponse_v{} has not yet been added to KafkaAdminClient."
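
A hedged sketch of how the namedtuple-based result of describe_consumer_groups might now be consumed; the broker address and the 'my-group' group id are illustrative, and the field names follow the MemberInformation/GroupInformation structs referenced above:

    from kafka import KafkaAdminClient

    admin = KafkaAdminClient(bootstrap_servers='localhost:9092')
    for group in admin.describe_consumer_groups(['my-group']):
        # Fields are accessed by name instead of hardcoded tuple indices.
        print(group.group, group.state, group.protocol_type)
        for member in group.members:
            # For consumer-protocol groups, member_assignment is decoded
            # into a ConsumerProtocolMemberAssignment instance.
            print(member.member_id, member.client_host, member.member_assignment)
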
10 changes: 6 additions & 4 deletions kafka/client_async.py
@@ -2,7 +2,6 @@

import collections
import copy
import functools
import logging
import random
import socket
@@ -202,18 +201,22 @@ def __init__(self, **configs):
if key in configs:
self.config[key] = configs[key]

# these properties need to be set on top of the initialization pipeline
# because they are used when __del__ method is called
self._closed = False
self._wake_r, self._wake_w = socket.socketpair()
self._selector = self.config['selector']()

self.cluster = ClusterMetadata(**self.config)
self._topics = set() # empty set will fetch all topic metadata
self._metadata_refresh_in_progress = False
self._selector = self.config['selector']()
self._conns = Dict() # object to support weakrefs
self._api_versions = None
self._connecting = set()
self._sending = set()
self._refresh_on_disconnects = True
self._last_bootstrap = 0
self._bootstrap_fails = 0
self._wake_r, self._wake_w = socket.socketpair()
self._wake_r.setblocking(False)
self._wake_w.settimeout(self.config['wakeup_timeout_ms'] / 1000.0)
self._wake_lock = threading.Lock()
@@ -227,7 +230,6 @@ def __init__(self, **configs):

self._selector.register(self._wake_r, selectors.EVENT_READ)
self._idle_expiry_manager = IdleConnectionManager(self.config['connections_max_idle_ms'])
self._closed = False
self._sensors = None
if self.config['metrics']:
self._sensors = KafkaClientMetrics(self.config['metrics'],
25 changes: 25 additions & 0 deletions kafka/codec.py
@@ -10,12 +10,18 @@

_XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1)
_XERIAL_V1_FORMAT = 'bccccccBii'
ZSTD_MAX_OUTPUT_SIZE = 1024 * 1024

try:
import snappy
except ImportError:
snappy = None

try:
import zstandard as zstd
except ImportError:
zstd = None

try:
import lz4.frame as lz4

@@ -58,6 +64,10 @@ def has_snappy():
return snappy is not None


def has_zstd():
return zstd is not None


def has_lz4():
if lz4 is not None:
return True
@@ -299,3 +309,18 @@ def lz4_decode_old_kafka(payload):
payload[header_size:]
])
return lz4_decode(munged_payload)


def zstd_encode(payload):
if not zstd:
raise NotImplementedError("Zstd codec is not available")
return zstd.ZstdCompressor().compress(payload)


def zstd_decode(payload):
if not zstd:
raise NotImplementedError("Zstd codec is not available")
try:
return zstd.ZstdDecompressor().decompress(payload)
except zstd.ZstdError:
return zstd.ZstdDecompressor().decompress(payload, max_output_size=ZSTD_MAX_OUTPUT_SIZE)
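
A minimal round-trip sketch for the new zstd helpers, assuming the python-zstandard package is installed:

    from kafka.codec import has_zstd, zstd_encode, zstd_decode

    payload = b'kafka-python' * 100
    if has_zstd():
        compressed = zstd_encode(payload)
        # Frames produced without a content-size header fall back to the
        # bounded ZSTD_MAX_OUTPUT_SIZE path in zstd_decode above.
        assert zstd_decode(compressed) == payload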