Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KIP-345 Static membership implementation #2333

Open
wants to merge 40 commits into
base: master
Choose a base branch
from

Conversation

KazakovDenis
Copy link

@KazakovDenis KazakovDenis commented Oct 23, 2022

Hello!

I understand that the project is not actively maintained due to this issue nowadays but I hope this changes may help if new maintainer would be found.

I tried to implement stable static consumer group membership as it was declared in KIP-345, was released in 2.3.0 and mentioned in #2147.

This behaviour now would be reached by setting the next two values:

'group_instance_id': 'name',
'leave_group_on_close': False,

After that group rebalancing won't be triggered on consumer restarts or new deployments with the same group_instance_id, but developers should set the value of session_timeout_ms (and other related) big enough.

The second param leave_group_on_close is required because of this issue.

So, now you can see in a broker's log something like that:

# On group join
  
INFO [GroupCoordinator 1001]: Static Member with unknown member id joins group group-id in Empty state. Created a new member id instance-id-9e63faa7-a270-456d-93fb-7236a0d47a34 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
INFO [GroupCoordinator 1001]: Adding new static member Some(instance-id) to group group-id with member id instance-id-9e63faa7-a270-456d-93fb-7236a0d47a34. (kafka.coordinator.group.GroupCoordinator)
INFO [GroupCoordinator 1001]: Preparing to rebalance group group-id in state PreparingRebalance with old generation 17 (__consumer_offsets-47) (reason: Adding new member instance-id-9e63faa7-a270-456d-93fb-7236a0d47a34 with group instance id Some(instance-id)) (kafka.coordinator.group.GroupCoordinator)
INFO [GroupCoordinator 1001]: Stabilized group group-id generation 18 (__consumer_offsets-47) (kafka.coordinator.group.GroupCoordinator)
INFO [GroupCoordinator 1001]: Assignment received from leader for group group-id for generation 18 (kafka.coordinator.group.GroupCoordinator)
  
 # After rejoining the group
  
INFO [GroupCoordinator 1001]: Static member Some(instance-id) of group group-id with unknown member id rejoins, assigning new member id instance-id-3a11b72d-b9c4-4919-adc2-0ebaf5adcfdf, while old member id instance-id-9e63faa7-a270-456d-93fb-7236a0d47a34 will be removed. (kafka.coordinator.group.GroupCoordinator)
INFO [GroupCoordinator 1001]: Static member which joins during Stable stage and doesnt affect selectProtocol will not trigger rebalance. (kafka.coordinator.group.GroupCoordinator)

Also I didn't touch functionality that this KIP does not implement, as you can see in JoinGroupRequest / SyncGroupRequest / LeaveGroupRequest.

I would be glad to hear what should I do else to make this PR merged.


This change is Reviewable

wbarnha and others added 23 commits March 7, 2024 10:31
…terations for Kafka 0.8.2 and Python 3.12 (dpkp#159)

* skip failing tests for PyPy since they work locally

* Reconfigure tests for PyPy and 3.12

* Skip partitioner tests in test_partitioner.py if 3.12 and 0.8.2

* Update test_partitioner.py

* Update test_producer.py

* Timeout tests after ten minutes

* Set 0.8.2.2 to be experimental from hereon

* Formally support PyPy 3.9
* Test Kafka 0.8.2.2 using Python 3.11 in the meantime

* Override PYTHON_LATEST conditionally in python-package.yml

* Update python-package.yml

* add python annotation to kafka version test matrix

* Update python-package.yml

* try python 3.10
* Remove support for EOL'ed versions of Python

* Update setup.py
Too many MRs to review... so little time.
After stop/start kafka service, kafka-python may use 100% CPU caused by
busy-retry while the socket was closed. This fix the issue by unregister
the socket if the fd is negative.

Co-authored-by: Orange Kao <orange@aiven.io>
Co-authored-by: Ryar Nyah <ryarnyah@gmail.com>
Co-authored-by: Denis Otkidach <denis.otkidach@gmail.com>
The former has been deprecated since setuptools 56

Co-authored-by: micwoj92 <45581170+micwoj92@users.noreply.github.com>
* docs: Update syntax in README.rst

* docs: Update code block syntax in docs/index.rst

---------

Co-authored-by: HalfSweet <60973476+HalfSweet@users.noreply.github.com>
* Fix crc32c's __main__ for Python 3

* Remove TODO from _crc32c.py

---------

Co-authored-by: Yonatan Goldschmidt <yon.goldschmidt@gmail.com>
Co-authored-by: Dave Voutila <voutilad@gmail.com>
…pkp#155)

* handling OSError

* better error output

* removed traceback logging

---------

Co-authored-by: Alexander Sibiryakov <sixty-one@yandex.ru>
…pkp#134)

wakeup

When wakeup() is called, we sometime notice that we get
an endless prints:
"Unable to send to wakeup socket!".

Those prints are spamming the logs.
This commit aims to address it by allowing restating the
application via an intentional exception raise.
This behavior is configurable and its default is backward compatible.

Signed-off-by: shimon-armis <shimon.turjeman@armis.com>
Co-authored-by: shimon-armis <shimon.turjeman@armis.com>
Co-authored-by: drewdogg <drewdogg@users.noreply.github.com>
* Support custom SASL mechanisms

There is some interest in supporting various SASL mechanisms not
currently included in the library:

* dpkp#2110 (DMS)
* dpkp#2204 (SSPI)
* dpkp#2232 (AWS_MSK_IAM)

Adding these mechanisms in the core library may be undesirable due to:

* Increased maintenance burden.
* Unavailable testing environments.
* Vendor specificity.

This commit provides a quick prototype for a pluggable SASL system.

---

**Example**

To define a custom SASL mechanism a module must implement two methods:

```py

def validate_config(conn):
    # Check configuration values, available libraries, etc.
    assert conn.config['vendor_specific_setting'] is not None, (
        'vendor_specific_setting required when sasl_mechanism=MY_SASL'
    )

def try_authenticate(conn, future):
    # Do authentication routine and return resolved Future with failed
    # or succeeded state.
```

And then the custom mechanism should be registered before initializing
a KafkaAdminClient, KafkaConsumer, or KafkaProducer:

```py

import kafka.sasl
from kafka import KafkaProducer

import my_sasl

kafka.sasl.register_mechanism('MY_SASL', my_sasl)

producer = KafkaProducer(sasl_mechanism='MY_SASL')
```

---

**Notes**

**ABCs**

This prototype does not implement an ABC for custom SASL mechanisms.
Using an ABC would reduce a few of the explicit assertions involved with
registering a mechanism and is a viable option. Due to differing feature
sets between py2/py3 this option was not explored, but shouldn't be
difficult.

**Private Methods**

This prototype relies on some methods that are currently marked as
**private** in `BrokerConnection`.

* `._can_send_recv`
* `._lock`
* `._recv_bytes_blocking`
* `._send_bytes_blocking`

A pluggable system would require stable interfaces for these actions.

**Alternative Approach**

If the module-scoped dict modification in `register_mechanism` feels too
clunky maybe the addtional mechanisms can be specified via an argument
when initializing one of the `Kafka*` classes?

* Add test_msk.py by @mattoberle

* add msk to __init__ and check for extension in conn.py

* rename try_authenticate in msk.py

* fix imports

* fix imports

* add botocore to requirements-dev.txt

* add boto3 to requirements-dev.txt

* add awscli to requirements-dev.txt

* add awscli to workflow since it takes too long to install normally

* just install botocore i guess

* just install boto3 i guess

* force reinstall awscli

* try something weird

* ok now the dang tests should work and if they don't i'll cry

* skip the msk test for now...

* Revert "skip the msk test for now..."

This reverts commit 1c29667.

* skip the msk test for now...

* nvm just needed to update tox lol

* Update kafka/sasl/gssapi.py

Co-authored-by: code-review-doctor[bot] <72320148+code-review-doctor[bot]@users.noreply.github.com>

* Update kafka/sasl/oauthbearer.py

Co-authored-by: code-review-doctor[bot] <72320148+code-review-doctor[bot]@users.noreply.github.com>

* Update kafka/sasl/plain.py

Co-authored-by: code-review-doctor[bot] <72320148+code-review-doctor[bot]@users.noreply.github.com>

* Update kafka/sasl/scram.py

Co-authored-by: code-review-doctor[bot] <72320148+code-review-doctor[bot]@users.noreply.github.com>

* Update kafka/sasl/msk.py

Co-authored-by: code-review-doctor[bot] <72320148+code-review-doctor[bot]@users.noreply.github.com>

---------

Co-authored-by: Matt Oberle <mattoberle@users.noreply.github.com>
Co-authored-by: code-review-doctor[bot] <72320148+code-review-doctor[bot]@users.noreply.github.com>
wbarnha and others added 12 commits March 18, 2024 14:07
Now that the codebase has been modernised by using pyupgrade, we can
also remove all backported vendor modules, and all uses of them.
I implemented API KEY 35 from the official Apache Kafka documentation. This functionality is requested in issue # 2163 and this is an implementation proposal.

Co-authored-by: chopatate <florian.courouge@outlook.fr>
… topic naming (dpkp#172)

* Update conftest.py to use request.node.originalname instead for legal topic naming

Otherwise parametrization doesn't work.

* Update test/conftest.py

Co-authored-by: code-review-doctor[bot] <72320148+code-review-doctor[bot]@users.noreply.github.com>

---------

Co-authored-by: code-review-doctor[bot] <72320148+code-review-doctor[bot]@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants