Skip to content

Commit

Permalink
Fix describe user scram credentials (#1630)
Browse files Browse the repository at this point in the history
for describing all users

---------

Co-authored-by: Emanuele Sabellico <esabellico@confluent.io>
Co-authored-by: Pranav Rathi <4427674+pranavrth@users.noreply.github.com>
Co-authored-by: Milind L <milindl@users.noreply.github.com>
  • Loading branch information
4 people authored Oct 25, 2023
1 parent c391879 commit a150b5e
Show file tree
Hide file tree
Showing 9 changed files with 275 additions and 162 deletions.
10 changes: 6 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@

v2.3.0 is a feature release with the following features, fixes and enhancements:

* Add Python 3.12 wheels
* Add support for AdminAPI `DescribeCluster()` and `DescribeTopics()`. (@jainruchir, #1635)
* [KIP-430](https://cwiki.apache.org/confluence/display/KAFKA/KIP-430+-+Return+Authorized+Operations+in+Describe+Responses):
- Add Python 3.12 wheels
- Add support for AdminAPI `DescribeCluster()` and `DescribeTopics()`. (@jainruchir, #1635)
- [KIP-430](https://cwiki.apache.org/confluence/display/KAFKA/KIP-430+-+Return+Authorized+Operations+in+Describe+Responses):
Return authorized operations in Describe Responses. (@jainruchir, #1635)
* Add `Rack` to the `Node` type, so AdminAPI calls can expose racks for brokers
- Add `Rack` to the `Node` type, so AdminAPI calls can expose racks for brokers
(currently, all Describe Responses) (#1635, @jainruchir).
- Fix the Describe User Scram Credentials for Describe all users or empty users list. Please refer to
issue(https://github.com/confluentinc/confluent-kafka-python/issues/1616) for more details (#1630).

confluent-kafka-python is based on librdkafka v2.3.0, see the
[librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.3.0)
Expand Down
49 changes: 38 additions & 11 deletions examples/adminapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -688,19 +688,46 @@ def example_describe_user_scram_credentials(a, args):
"""
Describe User Scram Credentials
"""
futmap = a.describe_user_scram_credentials(args)

for username, fut in futmap.items():
print("Username: {}".format(username))
if len(args) == 0:
"""
Case: Describes all user scram credentials
Input: no argument passed or None
Gets a future which result will give a
dict[str, UserScramCredentialsDescription]
or will throw a KafkaException
"""
f = a.describe_user_scram_credentials()
try:
response = fut.result()
for scram_credential_info in response.scram_credential_infos:
print(f" Mechanism: {scram_credential_info.mechanism} " +
f"Iterations: {scram_credential_info.iterations}")
results = f.result()
for username, response in results.items():
print("Username : {}".format(username))
for scram_credential_info in response.scram_credential_infos:
print(f" Mechanism: {scram_credential_info.mechanism} " +
f"Iterations: {scram_credential_info.iterations}")
except KafkaException as e:
print(" Error: {}".format(e))
except Exception as e:
print(f" Unexpected exception: {e}")
print("Failed to describe all user scram credentials : {}".format(e))
except Exception:
raise
else:
"""
Case: Describe specified user scram credentials
Input: users is a list
Gets a dict[str, future] where the result() of
each future will give a UserScramCredentialsDescription
or a KafkaException
"""
futmap = a.describe_user_scram_credentials(args)
for username, fut in futmap.items():
print("Username: {}".format(username))
try:
response = fut.result()
for scram_credential_info in response.scram_credential_infos:
print(f" Mechanism: {scram_credential_info.mechanism} " +
f"Iterations: {scram_credential_info.iterations}")
except KafkaException as e:
print(" Error: {}".format(e))
except Exception:
raise


def example_alter_user_scram_credentials(a, args):
Expand Down
85 changes: 49 additions & 36 deletions src/confluent_kafka/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,28 +251,6 @@ def _make_acls_result(f, futmap):
for resource, fut in futmap.items():
fut.set_exception(e)

@staticmethod
def _make_user_scram_credentials_result(f, futmap):
try:
results = f.result()
len_results = len(results)
len_futures = len(futmap)
if len(results) != len_futures:
raise RuntimeError(
f"Results length {len_results} is different from future-map length {len_futures}")
for username, value in results.items():
fut = futmap.get(username, None)
if fut is None:
raise RuntimeError(
f"username {username} not found in future-map: {futmap}")
if isinstance(value, KafkaError):
fut.set_exception(KafkaException(value))
else:
fut.set_result(value)
except Exception as e:
for _, fut in futmap.items():
fut.set_exception(e)

@staticmethod
def _make_futmap_result_from_list(f, futmap):
try:
Expand Down Expand Up @@ -366,6 +344,30 @@ def _make_futures_v2(futmap_keys, class_check, make_result_fn):

return f, futmap

@staticmethod
def _make_single_future_pair():
"""
Create an pair of futures, one for internal usage and one
to use externally, the external one throws a KafkaException if
any of the values in the map returned by the first future is
a KafkaError.
"""
def single_future_result(internal_f, f):
try:
results = internal_f.result()
for _, value in results.items():
if isinstance(value, KafkaError):
f.set_exception(KafkaException(value))
return
f.set_result(results)
except Exception as e:
f.set_exception(e)

f = AdminClient._create_future()
internal_f = AdminClient._create_future()
internal_f.add_done_callback(lambda internal_f: single_future_result(internal_f, f))
return internal_f, f

@staticmethod
def _has_duplicates(items):
return len(set(items)) != len(items)
Expand Down Expand Up @@ -449,9 +451,13 @@ def _check_alter_consumer_group_offsets_request(request):

@staticmethod
def _check_describe_user_scram_credentials_request(users):
if users is None:
return
if not isinstance(users, list):
raise TypeError("Expected input to be list of String")
for user in users:
if user is None:
raise TypeError("'user' cannot be None")
if not isinstance(user, string_type):
raise TypeError("Each value should be a string")
if not user:
Expand Down Expand Up @@ -1094,34 +1100,41 @@ def set_sasl_credentials(self, username, password):
"""
super(AdminClient, self).set_sasl_credentials(username, password)

def describe_user_scram_credentials(self, users, **kwargs):
def describe_user_scram_credentials(self, users=None, **kwargs):
"""
Describe user SASL/SCRAM credentials.
:param list(str) users: List of user names to describe.
Duplicate users aren't allowed.
Duplicate users aren't allowed. Can be None
to describe all user's credentials.
:param float request_timeout: The overall request timeout in seconds,
including broker lookup, request transmission, operation time
on broker, and response. Default: `socket.timeout.ms*1000.0`
:returns: A dict of futures keyed by user name.
The future result() method returns the
:class:`UserScramCredentialsDescription` or
raises KafkaException
:returns: In case None is passed it returns a single future.
The future yields a dict[str, UserScramCredentialsDescription]
or raises a KafkaException
:rtype: dict[str, future]
In case a list of user names is passed, it returns
a dict[str, future[UserScramCredentialsDescription]].
The futures yield a :class:`UserScramCredentialsDescription`
or raise a KafkaException
:rtype: Union[future[dict[str, UserScramCredentialsDescription]],
dict[str, future[UserScramCredentialsDescription]]]
:raises TypeError: Invalid input type.
:raises ValueError: Invalid input value.
"""
AdminClient._check_describe_user_scram_credentials_request(users)

f, futmap = AdminClient._make_futures_v2(users, None,
AdminClient._make_user_scram_credentials_result)

super(AdminClient, self).describe_user_scram_credentials(users, f, **kwargs)

return futmap
if users is None:
internal_f, ret_fut = AdminClient._make_single_future_pair()
else:
internal_f, ret_fut = AdminClient._make_futures_v2(users, None,
AdminClient._make_futmap_result)
super(AdminClient, self).describe_user_scram_credentials(users, internal_f, **kwargs)
return ret_fut

def alter_user_scram_credentials(self, alterations, **kwargs):
"""
Expand All @@ -1146,7 +1159,7 @@ def alter_user_scram_credentials(self, alterations, **kwargs):
AdminClient._check_alter_user_scram_credentials_request(alterations)

f, futmap = AdminClient._make_futures_v2(set([alteration.user for alteration in alterations]), None,
AdminClient._make_user_scram_credentials_result)
AdminClient._make_futmap_result)

super(AdminClient, self).alter_user_scram_credentials(alterations, f, **kwargs)
return futmap
Expand Down
49 changes: 25 additions & 24 deletions src/confluent_kafka/src/Admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -1809,7 +1809,7 @@ static PyObject *Admin_describe_user_scram_credentials(Handle *self, PyObject *a
NULL };
struct Admin_options options = Admin_options_INITIALIZER;
rd_kafka_AdminOptions_t *c_options = NULL;
int user_cnt, i;
int user_cnt = 0, i;
const char **c_users = NULL;
rd_kafka_queue_t *rkqu;
CallState cs;
Expand All @@ -1820,7 +1820,7 @@ static PyObject *Admin_describe_user_scram_credentials(Handle *self, PyObject *a
&options.request_timeout))
return NULL;

if (!PyList_Check(users)) {
if (users != Py_None && !PyList_Check(users)) {
PyErr_SetString(PyExc_ValueError,
"Expected non-empty list of string "
"objects in 'users' parameter");
Expand All @@ -1836,33 +1836,34 @@ static PyObject *Admin_describe_user_scram_credentials(Handle *self, PyObject *a
* is finished, so we need to keep our own refcount. */
Py_INCREF(future);

user_cnt = (int)PyList_Size(users);
if (users != Py_None) {
user_cnt = (int)PyList_Size(users);
if (user_cnt > 0)
c_users = malloc(sizeof(char *) * user_cnt);

for (i = 0 ; i < user_cnt ; i++) {
PyObject *user = PyList_GET_ITEM(users, i);
PyObject *u_user;
PyObject *uo_user = NULL;

c_users = malloc(sizeof(char *) * user_cnt);

for (i = 0 ; i < user_cnt ; i++) {
PyObject *user = PyList_GET_ITEM(users, i);
PyObject *u_user;
PyObject *uo_user = NULL;
if (user == Py_None) {
PyErr_Format(PyExc_TypeError,
"User %d in 'users' parameters must not "
"be None", i);
goto err;
}

if (user == Py_None) {
PyErr_Format(PyExc_TypeError,
"User %d in 'users' parameters must not "
"be None", i);
goto err;
}
if (!(u_user = cfl_PyObject_Unistr(user))) {
PyErr_Format(PyExc_ValueError,
"User %d in 'users' parameters must "
" be convertible to str", i);
goto err;
}

if (!(u_user = cfl_PyObject_Unistr(user))) {
PyErr_Format(PyExc_ValueError,
"User %d in 'users' parameters must "
" be convertible to str", i);
goto err;
c_users[i] = cfl_PyUnistr_AsUTF8(u_user, &uo_user);
Py_XDECREF(u_user);
Py_XDECREF(uo_user);
}

c_users[i] = cfl_PyUnistr_AsUTF8(u_user, &uo_user);
Py_XDECREF(u_user);
Py_XDECREF(uo_user);
}
/* Use librdkafka's background thread queue to automatically dispatch
* Admin_background_event_cb() when the admin operation is finished. */
Expand Down
87 changes: 2 additions & 85 deletions tests/integration/admin/test_basic_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,11 @@
import confluent_kafka
import struct
import time
import pytest
from confluent_kafka import ConsumerGroupTopicPartitions, TopicPartition, ConsumerGroupState
from confluent_kafka.admin import (NewPartitions, ConfigResource,
AclBinding, AclBindingFilter, ResourceType,
ResourcePatternType, AclOperation, AclPermissionType,
UserScramCredentialsDescription, UserScramCredentialUpsertion,
UserScramCredentialDeletion, ScramCredentialInfo,
ScramMechanism)
from confluent_kafka.error import ConsumeError, KafkaException, KafkaError
ResourcePatternType, AclOperation, AclPermissionType)
from confluent_kafka.error import ConsumeError

topic_prefix = "test-topic"

Expand Down Expand Up @@ -196,83 +192,6 @@ def verify_consumer_group_offsets_operations(client, our_topic, group_id):
assert topic_partition.offset == 0


def verify_admin_scram(admin_client):
newuser = "non-existent"
newmechanism = ScramMechanism.SCRAM_SHA_256
newiterations = 10000

futmap = admin_client.describe_user_scram_credentials([newuser])
assert isinstance(futmap, dict)
assert len(futmap) == 1
assert newuser in futmap
fut = futmap[newuser]
with pytest.raises(KafkaException) as ex:
result = fut.result()
assert ex.value.args[0] == KafkaError.RESOURCE_NOT_FOUND

futmap = admin_client.alter_user_scram_credentials([UserScramCredentialUpsertion(newuser,
ScramCredentialInfo(newmechanism, newiterations),
b"password", b"salt")])
fut = futmap[newuser]
result = fut.result()
assert result is None

futmap = admin_client.alter_user_scram_credentials([UserScramCredentialUpsertion(
newuser,
ScramCredentialInfo(
ScramMechanism.SCRAM_SHA_256, 10000),
b"password", b"salt"),
UserScramCredentialUpsertion(
newuser,
ScramCredentialInfo(
ScramMechanism.SCRAM_SHA_512, 10000),
b"password")
])
fut = futmap[newuser]
result = fut.result()
assert result is None

futmap = admin_client.alter_user_scram_credentials([UserScramCredentialUpsertion(
newuser,
ScramCredentialInfo(
ScramMechanism.SCRAM_SHA_256, 10000),
b"password", b"salt"),
UserScramCredentialDeletion(
newuser,
ScramMechanism.SCRAM_SHA_512)
])
fut = futmap[newuser]
result = fut.result()
assert result is None

futmap = admin_client.describe_user_scram_credentials([newuser])
assert isinstance(futmap, dict)
assert len(futmap) == 1
assert newuser in futmap
description = futmap[newuser].result()
assert isinstance(description, UserScramCredentialsDescription)
for scram_credential_info in description.scram_credential_infos:
assert ((scram_credential_info.mechanism == newmechanism) and
(scram_credential_info.iterations == newiterations))

futmap = admin_client.alter_user_scram_credentials([UserScramCredentialDeletion(newuser, newmechanism)])
assert isinstance(futmap, dict)
assert len(futmap) == 1
assert newuser in futmap
fut = futmap[newuser]
result = fut.result()
assert result is None

futmap = admin_client.describe_user_scram_credentials([newuser])
assert isinstance(futmap, dict)
assert len(futmap) == 1
assert newuser in futmap
fut = futmap[newuser]
with pytest.raises(KafkaException) as ex:
result = fut.result()
assert ex.value.args[0] == KafkaError.RESOURCE_NOT_FOUND


def test_basic_operations(kafka_cluster):
num_partitions = 2
topic_config = {"compression.type": "gzip"}
Expand Down Expand Up @@ -452,5 +371,3 @@ def verify_config(expconfig, configs):

# Verify ACL operations
verify_admin_acls(admin_client, acls_topic, acls_group)
# Verify user SCRAM credentials API
verify_admin_scram(admin_client)
Loading

0 comments on commit a150b5e

Please sign in to comment.