Support AWS_MSK_IAM authentication #2255

Open: wants to merge 22 commits into base: master from feature/2232-AWS_MSK_IAM
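For context, the new mechanism would presumably be enabled through the existing SASL configuration options. A minimal usage sketch, assuming the mechanism string ``AWS_MSK_IAM`` from the PR title and a typical Amazon MSK IAM listener port (9098); none of these values are confirmed by the diff below:

.. code-block:: python

    # Hypothetical usage sketch: the mechanism name, broker host, and port
    # are assumptions based on the PR title and common MSK IAM setups.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        'my_topic',
        bootstrap_servers='b-1.mycluster.kafka.us-east-1.amazonaws.com:9098',
        security_protocol='SASL_SSL',
        sasl_mechanism='AWS_MSK_IAM',
    )
    for msg in consumer:
        print(msg)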
Commits (22)
cb18e67
Support AWS_MSK_IAM authentication
mattoberle Aug 18, 2021
ed99ae5
Use six to import urllib (python2 compatibility)
mattoberle Aug 18, 2021
7ff3237
Tests AWS_MSK_IAM signature and payload generation
mattoberle Aug 18, 2021
b95e46d
Rename project from kafka-python to kafka-python-ng (#1)
wbarnha Mar 7, 2024
78c74c0
Fix artifact downloads for release
wbarnha Mar 7, 2024
e796019
Fix badge links in README.rst
wbarnha Mar 8, 2024
38e159a
Reconfigure tests to complete in a more timely manner and skip some i…
wbarnha Mar 8, 2024
e762321
Test Kafka 0.8.2.2 using Python 3.10 in the meantime (#161)
wbarnha Mar 9, 2024
00750aa
Remove support for EOL'ed versions of Python (#160)
wbarnha Mar 9, 2024
5bd1323
Stop testing Python 3.13 in python-package.yml (#162)
wbarnha Mar 9, 2024
cda8f81
Avoid 100% CPU usage while socket is closed (#156)
wbarnha Mar 9, 2024
c02df08
Fix DescribeConfigsResponse_v1 config_source (#150)
wbarnha Mar 9, 2024
9793097
Merge branch 'master' into feature/2232-AWS_MSK_IAM
wbarnha Mar 9, 2024
65eacfb
Fix base class of DescribeClientQuotasResponse_v0 (#144)
wbarnha Mar 10, 2024
e0ebe5d
Update license_file to license_files (#131)
wbarnha Mar 10, 2024
26bb3eb
Update some RST documentation syntax (#130)
wbarnha Mar 10, 2024
88763da
Fix crc32c's __main__ for Python 3 (#142)
wbarnha Mar 10, 2024
b1a4c53
Strip trailing dot off hostname. (#133)
wbarnha Mar 10, 2024
18eaa2d
Handle OSError to properly recycle SSL connection, fix infinite loop …
wbarnha Mar 10, 2024
54cbd63
client_async: Allow throwing an exception upon socket error during (#…
wbarnha Mar 10, 2024
eb6fd9b
Log connection errors at ERROR level (#139)
wbarnha Mar 12, 2024
0a9f7a6
Merge branch 'master' into feature/2232-AWS_MSK_IAM
wbarnha Mar 17, 2024
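The mattoberle commits above add the mechanism and test AWS_MSK_IAM signature and payload generation. For a rough sense of the SigV4 signing involved, the sketch below uses botocore's generic signer; it is not the PR's implementation (which builds its own signature and payload), and the URL, service name, and region are illustrative assumptions:

.. code-block:: python

    # Illustrative only: generic AWS SigV4 signing with botocore.
    # Requires botocore and configured AWS credentials.
    from botocore.auth import SigV4Auth
    from botocore.awsrequest import AWSRequest
    from botocore.session import Session

    credentials = Session().get_credentials()
    request = AWSRequest(
        method='GET',
        url='https://example.kafka.us-east-1.amazonaws.com/?Action=kafka-cluster%3AConnect',
    )
    SigV4Auth(credentials, 'kafka-cluster', 'us-east-1').add_auth(request)
    print(dict(request.headers))  # includes the computed Authorization header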
34 changes: 21 additions & 13 deletions .github/workflows/python-package.yml
@@ -41,7 +41,7 @@ jobs:
- name: Build artifacts
run: python -m build
- name: Upload built artifacts for testing
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: ${{ env.sdist-artifact }}
# NOTE: Exact expected file names are specified here
@@ -66,12 +66,8 @@ jobs:
- "3.10"
- "3.11"
- "3.12"
- "pypy3.9"
experimental: [ false ]
include:
- python-version: "pypy3.9"
experimental: true
- python-version: "~3.13.0-0"
experimental: true
steps:
- name: Checkout the source code
uses: actions/checkout@v4
@@ -111,15 +107,15 @@ jobs:
KAFKA_VERSION: ${{ env.KAFKA_LATEST }}

test-kafka:
name: Tests for Kafka ${{ matrix.kafka-version }}
name: Tests for Kafka ${{ matrix.kafka-version }} (Python ${{ matrix.python-version }})
needs:
- build-sdist
runs-on: ubuntu-latest
timeout-minutes: 10
strategy:
fail-fast: false
matrix:
kafka-version:
- "0.8.2.2"
- "0.9.0.1"
- "0.10.2.2"
- "0.11.0.2"
@@ -128,6 +124,18 @@ jobs:
- "2.4.0"
- "2.5.0"
- "2.6.0"
python-version: ['3.12']
experimental: [false]
include:
- kafka-version: '0.8.2.2'
experimental: true
python-version: "3.12"
- kafka-version: '0.8.2.2'
experimental: false
python-version: "3.10"
env:
PYTHON_LATEST: ${{ matrix.python-version }}
continue-on-error: ${{ matrix.experimental }}
steps:
- name: Checkout the source code
uses: actions/checkout@v4
@@ -141,7 +149,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: ${{ env.PYTHON_LATEST }}
python-version: ${{ matrix.python-version }}
cache: pip
cache-dependency-path: |
requirements-dev.txt
@@ -187,11 +195,11 @@ jobs:
environment: pypi
if: github.event_name == 'release' && github.event.action == 'created'
steps:
- name: Download the sdist artifact
uses: actions/download-artifact@v3
- name: Download the artifacts
uses: actions/download-artifact@v4
with:
name: artifact
path: dist
name: ${{ env.sdist-artifact }}
path: dist/${{ env.sdist-name }}
- name: Publish package to PyPI
uses: pypa/gh-action-pypi-publish@release/v1
with:
2 changes: 1 addition & 1 deletion CHANGES.md
@@ -413,7 +413,7 @@ Some of the major changes include:
* SASL authentication is working (we think)
* Removed several circular references to improve gc on close()

Thanks to all contributors -- the state of the kafka-python community is strong!
Thanks to all contributors -- the state of the kafka-python-ng community is strong!

Detailed changelog are listed below:

2 changes: 1 addition & 1 deletion Makefile
@@ -33,7 +33,7 @@ cov-local: build-integration
@echo "open file://`pwd`/htmlcov/index.html"

# Check the readme for syntax errors, which can lead to invalid formatting on
# PyPi homepage (https://pypi.python.org/pypi/kafka-python)
# PyPi homepage (https://pypi.python.org/pypi/kafka-python-ng)
check-readme:
python setup.py check -rms

195 changes: 119 additions & 76 deletions README.rst
@@ -2,27 +2,27 @@ Kafka Python client
------------------------

.. image:: https://img.shields.io/badge/kafka-2.6%2C%202.5%2C%202.4%2C%202.3%2C%202.2%2C%202.1%2C%202.0%2C%201.1%2C%201.0%2C%200.11%2C%200.10%2C%200.9%2C%200.8-brightgreen.svg
:target: https://kafka-python.readthedocs.io/en/master/compatibility.html
.. image:: https://img.shields.io/pypi/pyversions/kafka-python.svg
:target: https://pypi.python.org/pypi/kafka-python
.. image:: https://coveralls.io/repos/dpkp/kafka-python/badge.svg?branch=master&service=github
:target: https://coveralls.io/github/dpkp/kafka-python?branch=master
:target: https://kafka-python-ng.readthedocs.io/en/master/compatibility.html
.. image:: https://img.shields.io/pypi/pyversions/kafka-python-ng.svg
:target: https://pypi.python.org/pypi/kafka-python-ng
.. image:: https://coveralls.io/repos/wbarnha/kafka-python-ng/badge.svg?branch=master&service=github
:target: https://coveralls.io/github/wbarnha/kafka-python-ng?branch=master
.. image:: https://img.shields.io/badge/license-Apache%202-blue.svg
:target: https://github.com/dpkp/kafka-python/blob/master/LICENSE
.. image:: https://img.shields.io/pypi/dw/kafka-python.svg
:target: https://pypistats.org/packages/kafka-python
:target: https://github.com/wbarnha/kafka-python-ng/blob/master/LICENSE
.. image:: https://img.shields.io/pypi/dw/kafka-python-ng.svg
:target: https://pypistats.org/packages/kafka-python-ng
.. image:: https://img.shields.io/pypi/v/kafka-python.svg
:target: https://pypi.org/project/kafka-python
.. image:: https://img.shields.io/pypi/implementation/kafka-python
:target: https://github.com/dpkp/kafka-python/blob/master/setup.py
:target: https://pypi.org/project/kafka-python-ng
.. image:: https://img.shields.io/pypi/implementation/kafka-python-ng
:target: https://github.com/wbarnha/kafka-python-ng/blob/master/setup.py



Python client for the Apache Kafka distributed stream processing system.
kafka-python is designed to function much like the official java client, with a
kafka-python-ng is designed to function much like the official java client, with a
sprinkling of pythonic interfaces (e.g., consumer iterators).

kafka-python is best used with newer brokers (0.9+), but is backwards-compatible with
kafka-python-ng is best used with newer brokers (0.9+), but is backwards-compatible with
older versions (to 0.8.0). Some features will only be enabled on newer brokers.
For example, fully coordinated consumer groups -- i.e., dynamic partition
assignment to multiple consumers in the same group -- requires use of 0.9+ kafka
@@ -32,13 +32,19 @@ check code (perhaps using zookeeper or consul). For older brokers, you can
achieve something similar by manually assigning different partitions to each
consumer instance with config management tools like chef, ansible, etc. This
approach will work fine, though it does not support rebalancing on failures.
See <https://kafka-python.readthedocs.io/en/master/compatibility.html>

See https://kafka-python.readthedocs.io/en/master/compatibility.html

for more details.

Please note that the master branch may contain unreleased features. For release
documentation, please see readthedocs and/or python's inline help.

>>> pip install kafka-python

.. code-block:: bash

    $ pip install kafka-python-ng



KafkaConsumer
@@ -48,89 +54,123 @@ KafkaConsumer is a high-level message consumer, intended to operate as similarly
as possible to the official java client. Full support for coordinated
consumer groups requires use of kafka brokers that support the Group APIs: kafka v0.9+.

See <https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html>

See https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html

for API and configuration details.

The consumer iterator returns ConsumerRecords, which are simple namedtuples
that expose basic message attributes: topic, partition, offset, key, and value:

>>> from kafka import KafkaConsumer
>>> consumer = KafkaConsumer('my_favorite_topic')
>>> for msg in consumer:
... print (msg)
.. code-block:: python

>>> # join a consumer group for dynamic partition assignment and offset commits
>>> from kafka import KafkaConsumer
>>> consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group')
>>> for msg in consumer:
... print (msg)
    from kafka import KafkaConsumer
    consumer = KafkaConsumer('my_favorite_topic')
    for msg in consumer:
        print (msg)

>>> # manually assign the partition list for the consumer
>>> from kafka import TopicPartition
>>> consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
>>> consumer.assign([TopicPartition('foobar', 2)])
>>> msg = next(consumer)
.. code-block:: python

>>> # Deserialize msgpack-encoded values
>>> consumer = KafkaConsumer(value_deserializer=msgpack.loads)
>>> consumer.subscribe(['msgpackfoo'])
>>> for msg in consumer:
... assert isinstance(msg.value, dict)
    # join a consumer group for dynamic partition assignment and offset commits
    from kafka import KafkaConsumer
    consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group')
    for msg in consumer:
        print (msg)

>>> # Access record headers. The returned value is a list of tuples
>>> # with str, bytes for key and value
>>> for msg in consumer:
... print (msg.headers)
.. code-block:: python

>>> # Get consumer metrics
>>> metrics = consumer.metrics()
    # manually assign the partition list for the consumer
    from kafka import TopicPartition
    consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
    consumer.assign([TopicPartition('foobar', 2)])
    msg = next(consumer)

.. code-block:: python

    # Deserialize msgpack-encoded values
    consumer = KafkaConsumer(value_deserializer=msgpack.loads)
    consumer.subscribe(['msgpackfoo'])
    for msg in consumer:
        assert isinstance(msg.value, dict)

.. code-block:: python

    # Access record headers. The returned value is a list of tuples
    # with str, bytes for key and value
    for msg in consumer:
        print (msg.headers)

.. code-block:: python

    # Get consumer metrics
    metrics = consumer.metrics()


KafkaProducer
*************

KafkaProducer is a high-level, asynchronous message producer. The class is
intended to operate as similarly as possible to the official java client.
See <https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html>

See https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html

for more details.

>>> from kafka import KafkaProducer
>>> producer = KafkaProducer(bootstrap_servers='localhost:1234')
>>> for _ in range(100):
... producer.send('foobar', b'some_message_bytes')
.. code-block:: python

    from kafka import KafkaProducer
    producer = KafkaProducer(bootstrap_servers='localhost:1234')
    for _ in range(100):
        producer.send('foobar', b'some_message_bytes')

.. code-block:: python

    # Block until a single message is sent (or timeout)
    future = producer.send('foobar', b'another_message')
    result = future.get(timeout=60)

.. code-block:: python

    # Block until all pending messages are at least put on the network
    # NOTE: This does not guarantee delivery or success! It is really
    # only useful if you configure internal batching using linger_ms
    producer.flush()

.. code-block:: python

>>> # Block until a single message is sent (or timeout)
>>> future = producer.send('foobar', b'another_message')
>>> result = future.get(timeout=60)
    # Use a key for hashed-partitioning
    producer.send('foobar', key=b'foo', value=b'bar')

>>> # Block until all pending messages are at least put on the network
>>> # NOTE: This does not guarantee delivery or success! It is really
>>> # only useful if you configure internal batching using linger_ms
>>> producer.flush()
.. code-block:: python

>>> # Use a key for hashed-partitioning
>>> producer.send('foobar', key=b'foo', value=b'bar')
    # Serialize json messages
    import json
    producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
    producer.send('fizzbuzz', {'foo': 'bar'})

>>> # Serialize json messages
>>> import json
>>> producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
>>> producer.send('fizzbuzz', {'foo': 'bar'})
.. code-block:: python

>>> # Serialize string keys
>>> producer = KafkaProducer(key_serializer=str.encode)
>>> producer.send('flipflap', key='ping', value=b'1234')
    # Serialize string keys
    producer = KafkaProducer(key_serializer=str.encode)
    producer.send('flipflap', key='ping', value=b'1234')

>>> # Compress messages
>>> producer = KafkaProducer(compression_type='gzip')
>>> for i in range(1000):
... producer.send('foobar', b'msg %d' % i)
.. code-block:: python

>>> # Include record headers. The format is list of tuples with string key
>>> # and bytes value.
>>> producer.send('foobar', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64')])
    # Compress messages
    producer = KafkaProducer(compression_type='gzip')
    for i in range(1000):
        producer.send('foobar', b'msg %d' % i)

>>> # Get producer performance metrics
>>> metrics = producer.metrics()
.. code-block:: python

    # Include record headers. The format is list of tuples with string key
    # and bytes value.
    producer.send('foobar', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64')])

.. code-block:: python

    # Get producer performance metrics
    metrics = producer.metrics()


Thread safety
@@ -146,31 +186,34 @@ multiprocessing is recommended.
Compression
***********

kafka-python supports the following compression formats:
kafka-python-ng supports the following compression formats:

- gzip
- LZ4
- Snappy
- Zstandard (zstd)

gzip is supported natively, the others require installing additional libraries.
See <https://kafka-python.readthedocs.io/en/master/install.html> for more information.

See https://kafka-python.readthedocs.io/en/master/install.html for more information.
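As a quick illustration (not part of this diff), switching codecs is a one-line config change once the matching library is installed, e.g. ``pip install lz4`` for LZ4:

.. code-block:: python

    # Sketch: assumes the lz4 package is installed and a broker on localhost.
    from kafka import KafkaProducer

    producer = KafkaProducer(
        bootstrap_servers='localhost:9092',
        compression_type='lz4',
    )
    producer.send('foobar', b'some_message_bytes')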



Optimized CRC32 Validation
**************************

Kafka uses CRC32 checksums to validate messages. kafka-python includes a pure
Kafka uses CRC32 checksums to validate messages. kafka-python-ng includes a pure
python implementation for compatibility. To improve performance for high-throughput
applications, kafka-python will use `crc32c` for optimized native code if installed.
See <https://kafka-python.readthedocs.io/en/master/install.html> for installation instructions.
See https://kafka-python.readthedocs.io/en/master/install.html for installation instructions.

See https://pypi.org/project/crc32c/ for details on the underlying crc32c lib.
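Installing the optional dependency is enough to activate the optimized path:

.. code-block:: bash

    $ pip install crc32c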


Protocol
********

A secondary goal of kafka-python is to provide an easy-to-use protocol layer
A secondary goal of kafka-python-ng is to provide an easy-to-use protocol layer
for interacting with kafka brokers via the python repl. This is useful for
testing, probing, and general experimentation. The protocol support is
leveraged to enable a KafkaClient.check_version() method that
probes a kafka broker and attempts to identify which version it is running (0.8.0 to 2.6+).
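A minimal sketch of that probe (not part of this diff; assumes a broker on localhost:9092):

.. code-block:: python

    # Sketch: ask a live broker which Kafka version it is running.
    from kafka import KafkaClient

    client = KafkaClient(bootstrap_servers='localhost:9092')
    print(client.check_version())  # e.g. (2, 6, 0)
    client.close()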