Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KIP-554] User SCRAM credentials API #1575

Merged
merged 36 commits into from
Jul 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
d4e28f3
Initial Changes
mahajanadhitya May 18, 2023
3aca974
Changes
mahajanadhitya May 18, 2023
f59cc4d
Changes
mahajanadhitya May 18, 2023
d0facf2
Changes
mahajanadhitya May 18, 2023
814f19d
Changes
mahajanadhitya May 18, 2023
e7209a1
Changes
mahajanadhitya May 19, 2023
961e5a1
Changes
mahajanadhitya May 23, 2023
1fe51f9
Changes
mahajanadhitya May 23, 2023
8b249f6
changes
mahajanadhitya May 23, 2023
7b28acf
Changes
mahajanadhitya May 24, 2023
652feff
Changes
mahajanadhitya May 24, 2023
4fd0ebe
Changes
mahajanadhitya May 24, 2023
9a7717e
Changes
mahajanadhitya May 27, 2023
4420a74
Changes
mahajanadhitya May 27, 2023
8a8ba5f
Changes
mahajanadhitya May 27, 2023
fcc37ff
Changes
mahajanadhitya May 29, 2023
b69de54
Changes
mahajanadhitya May 29, 2023
7478085
flake changes
mahajanadhitya May 30, 2023
53673eb
Reflect librdkafka changes
emasab Jun 26, 2023
a38cf30
Move scram related classes to a private package
emasab Jun 26, 2023
9cea327
Reflect librdkafka changes about duplicate users
emasab Jun 26, 2023
cf70165
Merge branch 'master' into feature/AdminClient-ScramAPI
emasab Jun 29, 2023
8b285e2
Update changelog
emasab Jun 29, 2023
28d6c62
Fix memory leak
emasab Jun 29, 2023
1a45a4f
Return a different future for each
emasab Jun 29, 2023
77d46d9
Merge branch 'master' into feature/AdminClient-ScramAPI
emasab Jul 4, 2023
6604afb
Address comments
emasab Jul 7, 2023
5eac807
Address comment about alteration index
emasab Jul 10, 2023
b851a01
Added more unit tests and fixed uninitialized value access
pranavrth Jul 10, 2023
7e14f6a
Remove memory leaks
emasab Jul 10, 2023
c9a27ec
Move C array extractions to the event switch case
emasab Jul 10, 2023
3f08e00
Address remaining comments
emasab Jul 10, 2023
be33f70
Better naming for C variables
emasab Jul 10, 2023
a9a69b2
Merge branch 'master' into feature/AdminClient-ScramAPI
emasab Jul 11, 2023
d3207b0
Merge branch 'master' into feature/AdminClient-ScramAPI
emasab Jul 11, 2023
33bcdc0
Use make_futures_v2
emasab Jul 11, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ v2.2.0 is a feature release with the following features, fixes and enhancements:

- [KIP-339](https://cwiki.apache.org/confluence/display/KAFKA/KIP-339%3A+Create+a+new+IncrementalAlterConfigs+API)
IncrementalAlterConfigs API (#1517).
- [KIP-554](https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API):
User SASL/SCRAM credentials alteration and description (#1575).
pranavrth marked this conversation as resolved.
Show resolved Hide resolved

confluent-kafka-python is based on librdkafka v2.2.0, see the
[librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.2.0)
Expand Down
59 changes: 59 additions & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ Supporting classes
- :ref:`AclPermissionType <pythonclient_acl_permission_type>`
- :ref:`AclBinding <pythonclient_acl_binding>`
- :ref:`AclBindingFilter <pythonclient_acl_binding_filter>`
- :ref:`ScramCredentialInfo <pythonclient_scram_credential_info>`
- :ref:`UserScramCredentialsDescription <pythonclient_user_scram_credentials_description>`
- :ref:`UserScramCredentialAlteration <pythonclient_user_scram_credential_alteration>`
- :ref:`UserScramCredentialUpsertion <pythonclient_user_scram_credential_upsertion>`
- :ref:`UserScramCredentialDeletion <pythonclient_user_scram_credential_deletion>`

Experimental
These classes are experimental and are likely to be removed, or subject to incompatible
Expand Down Expand Up @@ -188,6 +193,60 @@ AclBindingFilter
.. autoclass:: confluent_kafka.admin.AclBindingFilter
:members:

.. _pythonclient_scram_mechanism:

**************
ScramMechanism
**************

.. autoclass:: confluent_kafka.admin.ScramMechanism
:members:

.. _pythonclient_scram_credential_info:

*******************
ScramCredentialInfo
*******************

.. autoclass:: confluent_kafka.admin.ScramCredentialInfo
:members:

.. _pythonclient_user_scram_credentials_description:

*******************************
UserScramCredentialsDescription
*******************************

.. autoclass:: confluent_kafka.admin.UserScramCredentialsDescription
:members:

.. _pythonclient_user_scram_credential_alteration:

*****************************
UserScramCredentialAlteration
*****************************

.. autoclass:: confluent_kafka.admin.UserScramCredentialAlteration
:members:

.. _pythonclient_user_scram_credential_upsertion:

****************************
UserScramCredentialUpsertion
****************************

.. autoclass:: confluent_kafka.admin.UserScramCredentialUpsertion
:members:

.. _pythonclient_user_scram_credential_deletion:

***************************
UserScramCredentialDeletion
***************************

.. autoclass:: confluent_kafka.admin.UserScramCredentialDeletion
:members:

.. _pythonclient_consumer:

********
Expand Down
97 changes: 94 additions & 3 deletions examples/adminapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
from confluent_kafka.admin import (AdminClient, NewTopic, NewPartitions, ConfigResource,
ConfigEntry, ConfigSource, AclBinding,
AclBindingFilter, ResourceType, ResourcePatternType,
AclOperation, AclPermissionType, AlterConfigOpType)
AclOperation, AclPermissionType, AlterConfigOpType,
ScramMechanism, ScramCredentialInfo,
UserScramCredentialUpsertion, UserScramCredentialDeletion)
import sys
import threading
import logging
Expand Down Expand Up @@ -596,6 +598,89 @@ def example_alter_consumer_group_offsets(a, args):
raise


def example_describe_user_scram_credentials(a, args):
"""
Describe User Scram Credentials
"""
futmap = a.describe_user_scram_credentials(args)

for username, fut in futmap.items():
print("Username: {}".format(username))
try:
response = fut.result()
for scram_credential_info in response.scram_credential_infos:
print(f" Mechanism: {scram_credential_info.mechanism} " +
f"Iterations: {scram_credential_info.iterations}")
except KafkaException as e:
emasab marked this conversation as resolved.
Show resolved Hide resolved
print(" Error: {}".format(e))
emasab marked this conversation as resolved.
Show resolved Hide resolved
except Exception as e:
print(f" Unexpected exception: {e}")


def example_alter_user_scram_credentials(a, args):
"""
AlterUserScramCredentials
"""
alterations_args = []
alterations = []
i = 0
op_cnt = 0

while i < len(args):
emasab marked this conversation as resolved.
Show resolved Hide resolved
op = args[i]
if op == "UPSERT":
if i + 5 >= len(args):
emasab marked this conversation as resolved.
Show resolved Hide resolved
raise ValueError(
f"Invalid number of arguments for alteration {op_cnt}, expected 5, got {len(args) - i - 1}")
user = args[i + 1]
mechanism = ScramMechanism[args[i + 2]]
iterations = int(args[i + 3])
password = bytes(args[i + 4], 'utf8')
# if salt is an empty string,
# set it to None to generate it randomly.
salt = args[i + 5]
if not salt:
salt = None
else:
salt = bytes(salt, 'utf8')
alterations_args.append([op, user, mechanism, iterations,
iterations, password, salt])
i += 6
elif op == "DELETE":
if i + 2 >= len(args):
raise ValueError(
f"Invalid number of arguments for alteration {op_cnt}, expected 2, got {len(args) - i - 1}")
user = args[i + 1]
mechanism = ScramMechanism[args[i + 2]]
alterations_args.append([op, user, mechanism])
i += 3
else:
raise ValueError(f"Invalid alteration {op}, must be UPSERT or DELETE")
op_cnt += 1

for alteration_arg in alterations_args:
op = alteration_arg[0]
if op == "UPSERT":
[_, user, mechanism, iterations,
iterations, password, salt] = alteration_arg
scram_credential_info = ScramCredentialInfo(mechanism, iterations)
upsertion = UserScramCredentialUpsertion(user, scram_credential_info,
password, salt)
alterations.append(upsertion)
elif op == "DELETE":
[_, user, mechanism] = alteration_arg
deletion = UserScramCredentialDeletion(user, mechanism)
alterations.append(deletion)

futmap = a.alter_user_scram_credentials(alterations)
for username, fut in futmap.items():
try:
fut.result()
print("{}: Success".format(username))
except KafkaException as e:
print("{}: Error: {}".format(username, e))


if __name__ == '__main__':
if len(sys.argv) < 3:
sys.stderr.write('Usage: %s <bootstrap-brokers> <operation> <args..>\n\n' % sys.argv[0])
Expand Down Expand Up @@ -625,7 +710,11 @@ def example_alter_consumer_group_offsets(a, args):
sys.stderr.write(
' alter_consumer_group_offsets <group> <topic1> <partition1> <offset1> ' +
'<topic2> <partition2> <offset2> ..\n')

sys.stderr.write(' describe_user_scram_credentials [<user1> <user2> ..]\n')
sys.stderr.write(' alter_user_scram_credentials UPSERT <user1> <mechanism1> ' +
'<iterations1> <password1> <salt1> ' +
pranavrth marked this conversation as resolved.
Show resolved Hide resolved
'[UPSERT <user2> <mechanism2> <iterations2> ' +
' <password2> <salt2> DELETE <user3> <mechanism3> ..]\n')
sys.exit(1)

broker = sys.argv[1]
Expand All @@ -650,7 +739,9 @@ def example_alter_consumer_group_offsets(a, args):
'describe_consumer_groups': example_describe_consumer_groups,
'delete_consumer_groups': example_delete_consumer_groups,
'list_consumer_group_offsets': example_list_consumer_group_offsets,
'alter_consumer_group_offsets': example_alter_consumer_group_offsets}
'alter_consumer_group_offsets': example_alter_consumer_group_offsets,
'describe_user_scram_credentials': example_describe_user_scram_credentials,
'alter_user_scram_credentials': example_alter_user_scram_credentials}

if operation not in opsmap:
sys.stderr.write('Unknown operation: %s\n' % operation)
Expand Down
141 changes: 141 additions & 0 deletions src/confluent_kafka/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@
ConsumerGroupDescription,
MemberAssignment,
MemberDescription)
from ._scram import (UserScramCredentialAlteration, # noqa: F401
UserScramCredentialUpsertion,
UserScramCredentialDeletion,
ScramCredentialInfo,
ScramMechanism,
UserScramCredentialsDescription)

emasab marked this conversation as resolved.
Show resolved Hide resolved
from ..cimpl import (KafkaException, # noqa: F401
KafkaError,
_AdminClientImpl,
Expand All @@ -65,6 +72,7 @@
from confluent_kafka import ConsumerGroupState \
as _ConsumerGroupState


try:
string_type = basestring
except NameError:
Expand Down Expand Up @@ -235,6 +243,28 @@ def _make_acls_result(f, futmap):
for resource, fut in futmap.items():
fut.set_exception(e)

@staticmethod
def _make_user_scram_credentials_result(f, futmap):
try:
results = f.result()
len_results = len(results)
len_futures = len(futmap)
if len(results) != len_futures:
raise RuntimeError(
f"Results length {len_results} is different from future-map length {len_futures}")
for username, value in results.items():
fut = futmap.get(username, None)
if fut is None:
raise RuntimeError(
f"username {username} not found in future-map: {futmap}")
if isinstance(value, KafkaError):
fut.set_exception(KafkaException(value))
else:
fut.set_result(value)
except Exception as e:
for _, fut in futmap.items():
fut.set_exception(e)

@staticmethod
def _create_future():
f = concurrent.futures.Future()
Expand Down Expand Up @@ -366,6 +396,59 @@ def _check_alter_consumer_group_offsets_request(request):
raise ValueError(
"Element of 'topic_partitions' must not have negative value for 'offset' field")

@staticmethod
def _check_describe_user_scram_credentials_request(users):
if not isinstance(users, list):
raise TypeError("Expected input to be list of String")
for user in users:
if not isinstance(user, string_type):
raise TypeError("Each value should be a string")
if not user:
raise ValueError("'user' cannot be empty")

@staticmethod
def _check_alter_user_scram_credentials_request(alterations):
if not isinstance(alterations, list):
raise TypeError("Expected input to be list")
if len(alterations) == 0:
raise ValueError("Expected at least one alteration")
for alteration in alterations:
if not isinstance(alteration, UserScramCredentialAlteration):
raise TypeError("Expected each element of list to be subclass of UserScramCredentialAlteration")
if alteration.user is None:
raise TypeError("'user' cannot be None")
if not isinstance(alteration.user, string_type):
raise TypeError("'user' must be a string")
if not alteration.user:
raise ValueError("'user' cannot be empty")

if isinstance(alteration, UserScramCredentialUpsertion):
if alteration.password is None:
raise TypeError("'password' cannot be None")
if not isinstance(alteration.password, bytes):
raise TypeError("'password' must be bytes")
if not alteration.password:
raise ValueError("'password' cannot be empty")

if alteration.salt is not None and not alteration.salt:
raise ValueError("'salt' can be None but cannot be empty")
if alteration.salt and not isinstance(alteration.salt, bytes):
raise TypeError("'salt' must be bytes")

if not isinstance(alteration.scram_credential_info, ScramCredentialInfo):
raise TypeError("Expected credential_info to be ScramCredentialInfo Type")
if alteration.scram_credential_info.iterations < 1:
raise ValueError("Iterations should be positive")
if not isinstance(alteration.scram_credential_info.mechanism, ScramMechanism):
raise TypeError("Expected the mechanism to be ScramMechanism Type")
elif isinstance(alteration, UserScramCredentialDeletion):
if not isinstance(alteration.mechanism, ScramMechanism):
raise TypeError("Expected the mechanism to be ScramMechanism Type")
else:
raise TypeError("Expected each element of list 'alterations' " +
"to be either a UserScramCredentialUpsertion or a " +
"UserScramCredentialDeletion")

def create_topics(self, new_topics, **kwargs):
"""
Create one or more new topics.
Expand Down Expand Up @@ -871,3 +954,61 @@ def set_sasl_credentials(self, username, password):
:raises TypeException: Invalid input.
"""
super(AdminClient, self).set_sasl_credentials(username, password)

def describe_user_scram_credentials(self, users, **kwargs):
"""
Describe user SASL/SCRAM credentials.

:param list(str) users: List of user names to describe.
emasab marked this conversation as resolved.
Show resolved Hide resolved
Duplicate users aren't allowed.
:param float request_timeout: The overall request timeout in seconds,
including broker lookup, request transmission, operation time
on broker, and response. Default: `socket.timeout.ms*1000.0`

:returns: A dict of futures keyed by user name.
The future result() method returns the
:class:`UserScramCredentialsDescription` or
raises KafkaException

:rtype: dict[str, future]

:raises TypeError: Invalid input type.
:raises ValueError: Invalid input value.
"""
AdminClient._check_describe_user_scram_credentials_request(users)

f, futmap = AdminClient._make_futures_v2(users, None,
AdminClient._make_user_scram_credentials_result)

super(AdminClient, self).describe_user_scram_credentials(users, f, **kwargs)

return futmap

def alter_user_scram_credentials(self, alterations, **kwargs):
"""
Alter user SASL/SCRAM credentials.

:param list(UserScramCredentialAlteration) alterations: List of
:class:`UserScramCredentialAlteration` to apply.
The pair (user, mechanism) must be unique among alterations.
:param float request_timeout: The overall request timeout in seconds,
including broker lookup, request transmission, operation time
on broker, and response. Default: `socket.timeout.ms*1000.0`

:returns: A dict of futures keyed by user name.
The future result() method returns None or
raises KafkaException

:rtype: dict[str, future]

:raises TypeError: Invalid input type.
:raises ValueError: Invalid input value.
"""
AdminClient._check_alter_user_scram_credentials_request(alterations)

f, futmap = AdminClient._make_futures_v2(set([alteration.user for alteration in alterations]), None,
AdminClient._make_user_scram_credentials_result)

super(AdminClient, self).alter_user_scram_credentials(alterations, f, **kwargs)

return futmap
Loading