diff --git a/CHANGELOG.md b/CHANGELOG.md index 11ea87380..0ee4040f3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,12 +4,14 @@ v2.3.0 is a feature release with the following features, fixes and enhancements: - * Add Python 3.12 wheels - * Add support for AdminAPI `DescribeCluster()` and `DescribeTopics()`. (@jainruchir, #1635) - * [KIP-430](https://cwiki.apache.org/confluence/display/KAFKA/KIP-430+-+Return+Authorized+Operations+in+Describe+Responses): + - Add Python 3.12 wheels + - Add support for AdminAPI `DescribeCluster()` and `DescribeTopics()`. (@jainruchir, #1635) + - [KIP-430](https://cwiki.apache.org/confluence/display/KAFKA/KIP-430+-+Return+Authorized+Operations+in+Describe+Responses): Return authorized operations in Describe Responses. (@jainruchir, #1635) - * Add `Rack` to the `Node` type, so AdminAPI calls can expose racks for brokers + - Add `Rack` to the `Node` type, so AdminAPI calls can expose racks for brokers (currently, all Describe Responses) (#1635, @jainruchir). + - Fix the Describe User Scram Credentials for Describe all users or empty users list. Please refer to + issue(https://github.com/confluentinc/confluent-kafka-python/issues/1616) for more details (#1630). confluent-kafka-python is based on librdkafka v2.3.0, see the [librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.3.0) diff --git a/examples/adminapi.py b/examples/adminapi.py index f5b7f867d..390aba030 100755 --- a/examples/adminapi.py +++ b/examples/adminapi.py @@ -688,19 +688,46 @@ def example_describe_user_scram_credentials(a, args): """ Describe User Scram Credentials """ - futmap = a.describe_user_scram_credentials(args) - - for username, fut in futmap.items(): - print("Username: {}".format(username)) + if len(args) == 0: + """ + Case: Describes all user scram credentials + Input: no argument passed or None + Gets a future which result will give a + dict[str, UserScramCredentialsDescription] + or will throw a KafkaException + """ + f = a.describe_user_scram_credentials() try: - response = fut.result() - for scram_credential_info in response.scram_credential_infos: - print(f" Mechanism: {scram_credential_info.mechanism} " + - f"Iterations: {scram_credential_info.iterations}") + results = f.result() + for username, response in results.items(): + print("Username : {}".format(username)) + for scram_credential_info in response.scram_credential_infos: + print(f" Mechanism: {scram_credential_info.mechanism} " + + f"Iterations: {scram_credential_info.iterations}") except KafkaException as e: - print(" Error: {}".format(e)) - except Exception as e: - print(f" Unexpected exception: {e}") + print("Failed to describe all user scram credentials : {}".format(e)) + except Exception: + raise + else: + """ + Case: Describe specified user scram credentials + Input: users is a list + Gets a dict[str, future] where the result() of + each future will give a UserScramCredentialsDescription + or a KafkaException + """ + futmap = a.describe_user_scram_credentials(args) + for username, fut in futmap.items(): + print("Username: {}".format(username)) + try: + response = fut.result() + for scram_credential_info in response.scram_credential_infos: + print(f" Mechanism: {scram_credential_info.mechanism} " + + f"Iterations: {scram_credential_info.iterations}") + except KafkaException as e: + print(" Error: {}".format(e)) + except Exception: + raise def example_alter_user_scram_credentials(a, args): diff --git a/src/confluent_kafka/admin/__init__.py b/src/confluent_kafka/admin/__init__.py index b9b756522..924361f2e 100644 --- a/src/confluent_kafka/admin/__init__.py +++ b/src/confluent_kafka/admin/__init__.py @@ -251,28 +251,6 @@ def _make_acls_result(f, futmap): for resource, fut in futmap.items(): fut.set_exception(e) - @staticmethod - def _make_user_scram_credentials_result(f, futmap): - try: - results = f.result() - len_results = len(results) - len_futures = len(futmap) - if len(results) != len_futures: - raise RuntimeError( - f"Results length {len_results} is different from future-map length {len_futures}") - for username, value in results.items(): - fut = futmap.get(username, None) - if fut is None: - raise RuntimeError( - f"username {username} not found in future-map: {futmap}") - if isinstance(value, KafkaError): - fut.set_exception(KafkaException(value)) - else: - fut.set_result(value) - except Exception as e: - for _, fut in futmap.items(): - fut.set_exception(e) - @staticmethod def _make_futmap_result_from_list(f, futmap): try: @@ -366,6 +344,30 @@ def _make_futures_v2(futmap_keys, class_check, make_result_fn): return f, futmap + @staticmethod + def _make_single_future_pair(): + """ + Create an pair of futures, one for internal usage and one + to use externally, the external one throws a KafkaException if + any of the values in the map returned by the first future is + a KafkaError. + """ + def single_future_result(internal_f, f): + try: + results = internal_f.result() + for _, value in results.items(): + if isinstance(value, KafkaError): + f.set_exception(KafkaException(value)) + return + f.set_result(results) + except Exception as e: + f.set_exception(e) + + f = AdminClient._create_future() + internal_f = AdminClient._create_future() + internal_f.add_done_callback(lambda internal_f: single_future_result(internal_f, f)) + return internal_f, f + @staticmethod def _has_duplicates(items): return len(set(items)) != len(items) @@ -449,9 +451,13 @@ def _check_alter_consumer_group_offsets_request(request): @staticmethod def _check_describe_user_scram_credentials_request(users): + if users is None: + return if not isinstance(users, list): raise TypeError("Expected input to be list of String") for user in users: + if user is None: + raise TypeError("'user' cannot be None") if not isinstance(user, string_type): raise TypeError("Each value should be a string") if not user: @@ -1094,34 +1100,41 @@ def set_sasl_credentials(self, username, password): """ super(AdminClient, self).set_sasl_credentials(username, password) - def describe_user_scram_credentials(self, users, **kwargs): + def describe_user_scram_credentials(self, users=None, **kwargs): """ Describe user SASL/SCRAM credentials. :param list(str) users: List of user names to describe. - Duplicate users aren't allowed. + Duplicate users aren't allowed. Can be None + to describe all user's credentials. :param float request_timeout: The overall request timeout in seconds, including broker lookup, request transmission, operation time on broker, and response. Default: `socket.timeout.ms*1000.0` - :returns: A dict of futures keyed by user name. - The future result() method returns the - :class:`UserScramCredentialsDescription` or - raises KafkaException + :returns: In case None is passed it returns a single future. + The future yields a dict[str, UserScramCredentialsDescription] + or raises a KafkaException - :rtype: dict[str, future] + In case a list of user names is passed, it returns + a dict[str, future[UserScramCredentialsDescription]]. + The futures yield a :class:`UserScramCredentialsDescription` + or raise a KafkaException + + :rtype: Union[future[dict[str, UserScramCredentialsDescription]], + dict[str, future[UserScramCredentialsDescription]]] :raises TypeError: Invalid input type. :raises ValueError: Invalid input value. """ AdminClient._check_describe_user_scram_credentials_request(users) - f, futmap = AdminClient._make_futures_v2(users, None, - AdminClient._make_user_scram_credentials_result) - - super(AdminClient, self).describe_user_scram_credentials(users, f, **kwargs) - - return futmap + if users is None: + internal_f, ret_fut = AdminClient._make_single_future_pair() + else: + internal_f, ret_fut = AdminClient._make_futures_v2(users, None, + AdminClient._make_futmap_result) + super(AdminClient, self).describe_user_scram_credentials(users, internal_f, **kwargs) + return ret_fut def alter_user_scram_credentials(self, alterations, **kwargs): """ @@ -1146,7 +1159,7 @@ def alter_user_scram_credentials(self, alterations, **kwargs): AdminClient._check_alter_user_scram_credentials_request(alterations) f, futmap = AdminClient._make_futures_v2(set([alteration.user for alteration in alterations]), None, - AdminClient._make_user_scram_credentials_result) + AdminClient._make_futmap_result) super(AdminClient, self).alter_user_scram_credentials(alterations, f, **kwargs) return futmap diff --git a/src/confluent_kafka/src/Admin.c b/src/confluent_kafka/src/Admin.c index 473c8f3c8..8515c2aa8 100644 --- a/src/confluent_kafka/src/Admin.c +++ b/src/confluent_kafka/src/Admin.c @@ -1809,7 +1809,7 @@ static PyObject *Admin_describe_user_scram_credentials(Handle *self, PyObject *a NULL }; struct Admin_options options = Admin_options_INITIALIZER; rd_kafka_AdminOptions_t *c_options = NULL; - int user_cnt, i; + int user_cnt = 0, i; const char **c_users = NULL; rd_kafka_queue_t *rkqu; CallState cs; @@ -1820,7 +1820,7 @@ static PyObject *Admin_describe_user_scram_credentials(Handle *self, PyObject *a &options.request_timeout)) return NULL; - if (!PyList_Check(users)) { + if (users != Py_None && !PyList_Check(users)) { PyErr_SetString(PyExc_ValueError, "Expected non-empty list of string " "objects in 'users' parameter"); @@ -1836,33 +1836,34 @@ static PyObject *Admin_describe_user_scram_credentials(Handle *self, PyObject *a * is finished, so we need to keep our own refcount. */ Py_INCREF(future); - user_cnt = (int)PyList_Size(users); + if (users != Py_None) { + user_cnt = (int)PyList_Size(users); + if (user_cnt > 0) + c_users = malloc(sizeof(char *) * user_cnt); + for (i = 0 ; i < user_cnt ; i++) { + PyObject *user = PyList_GET_ITEM(users, i); + PyObject *u_user; + PyObject *uo_user = NULL; - c_users = malloc(sizeof(char *) * user_cnt); - - for (i = 0 ; i < user_cnt ; i++) { - PyObject *user = PyList_GET_ITEM(users, i); - PyObject *u_user; - PyObject *uo_user = NULL; + if (user == Py_None) { + PyErr_Format(PyExc_TypeError, + "User %d in 'users' parameters must not " + "be None", i); + goto err; + } - if (user == Py_None) { - PyErr_Format(PyExc_TypeError, - "User %d in 'users' parameters must not " - "be None", i); - goto err; - } + if (!(u_user = cfl_PyObject_Unistr(user))) { + PyErr_Format(PyExc_ValueError, + "User %d in 'users' parameters must " + " be convertible to str", i); + goto err; + } - if (!(u_user = cfl_PyObject_Unistr(user))) { - PyErr_Format(PyExc_ValueError, - "User %d in 'users' parameters must " - " be convertible to str", i); - goto err; + c_users[i] = cfl_PyUnistr_AsUTF8(u_user, &uo_user); + Py_XDECREF(u_user); + Py_XDECREF(uo_user); } - - c_users[i] = cfl_PyUnistr_AsUTF8(u_user, &uo_user); - Py_XDECREF(u_user); - Py_XDECREF(uo_user); } /* Use librdkafka's background thread queue to automatically dispatch * Admin_background_event_cb() when the admin operation is finished. */ diff --git a/tests/integration/admin/test_basic_operations.py b/tests/integration/admin/test_basic_operations.py index 70595e131..820fd5228 100644 --- a/tests/integration/admin/test_basic_operations.py +++ b/tests/integration/admin/test_basic_operations.py @@ -16,15 +16,11 @@ import confluent_kafka import struct import time -import pytest from confluent_kafka import ConsumerGroupTopicPartitions, TopicPartition, ConsumerGroupState from confluent_kafka.admin import (NewPartitions, ConfigResource, AclBinding, AclBindingFilter, ResourceType, - ResourcePatternType, AclOperation, AclPermissionType, - UserScramCredentialsDescription, UserScramCredentialUpsertion, - UserScramCredentialDeletion, ScramCredentialInfo, - ScramMechanism) -from confluent_kafka.error import ConsumeError, KafkaException, KafkaError + ResourcePatternType, AclOperation, AclPermissionType) +from confluent_kafka.error import ConsumeError topic_prefix = "test-topic" @@ -196,83 +192,6 @@ def verify_consumer_group_offsets_operations(client, our_topic, group_id): assert topic_partition.offset == 0 -def verify_admin_scram(admin_client): - newuser = "non-existent" - newmechanism = ScramMechanism.SCRAM_SHA_256 - newiterations = 10000 - - futmap = admin_client.describe_user_scram_credentials([newuser]) - assert isinstance(futmap, dict) - assert len(futmap) == 1 - assert newuser in futmap - fut = futmap[newuser] - with pytest.raises(KafkaException) as ex: - result = fut.result() - assert ex.value.args[0] == KafkaError.RESOURCE_NOT_FOUND - - futmap = admin_client.alter_user_scram_credentials([UserScramCredentialUpsertion(newuser, - ScramCredentialInfo(newmechanism, newiterations), - b"password", b"salt")]) - fut = futmap[newuser] - result = fut.result() - assert result is None - - futmap = admin_client.alter_user_scram_credentials([UserScramCredentialUpsertion( - newuser, - ScramCredentialInfo( - ScramMechanism.SCRAM_SHA_256, 10000), - b"password", b"salt"), - UserScramCredentialUpsertion( - newuser, - ScramCredentialInfo( - ScramMechanism.SCRAM_SHA_512, 10000), - b"password") - ]) - fut = futmap[newuser] - result = fut.result() - assert result is None - - futmap = admin_client.alter_user_scram_credentials([UserScramCredentialUpsertion( - newuser, - ScramCredentialInfo( - ScramMechanism.SCRAM_SHA_256, 10000), - b"password", b"salt"), - UserScramCredentialDeletion( - newuser, - ScramMechanism.SCRAM_SHA_512) - ]) - fut = futmap[newuser] - result = fut.result() - assert result is None - - futmap = admin_client.describe_user_scram_credentials([newuser]) - assert isinstance(futmap, dict) - assert len(futmap) == 1 - assert newuser in futmap - description = futmap[newuser].result() - assert isinstance(description, UserScramCredentialsDescription) - for scram_credential_info in description.scram_credential_infos: - assert ((scram_credential_info.mechanism == newmechanism) and - (scram_credential_info.iterations == newiterations)) - - futmap = admin_client.alter_user_scram_credentials([UserScramCredentialDeletion(newuser, newmechanism)]) - assert isinstance(futmap, dict) - assert len(futmap) == 1 - assert newuser in futmap - fut = futmap[newuser] - result = fut.result() - assert result is None - - futmap = admin_client.describe_user_scram_credentials([newuser]) - assert isinstance(futmap, dict) - assert len(futmap) == 1 - assert newuser in futmap - fut = futmap[newuser] - with pytest.raises(KafkaException) as ex: - result = fut.result() - assert ex.value.args[0] == KafkaError.RESOURCE_NOT_FOUND - - def test_basic_operations(kafka_cluster): num_partitions = 2 topic_config = {"compression.type": "gzip"} @@ -452,5 +371,3 @@ def verify_config(expconfig, configs): # Verify ACL operations verify_admin_acls(admin_client, acls_topic, acls_group) - # Verify user SCRAM credentials API - verify_admin_scram(admin_client) diff --git a/tests/integration/admin/test_incremental_alter_configs.py b/tests/integration/admin/test_incremental_alter_configs.py index 111e6fc4b..ad19a1164 100644 --- a/tests/integration/admin/test_incremental_alter_configs.py +++ b/tests/integration/admin/test_incremental_alter_configs.py @@ -1,3 +1,18 @@ +# -*- coding: utf-8 -*- +# Copyright 2023 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + from confluent_kafka.admin import ConfigResource, \ ConfigEntry, ResourceType, \ AlterConfigOpType diff --git a/tests/integration/admin/test_list_offsets.py b/tests/integration/admin/test_list_offsets.py index c6983ae28..6a2e0a46a 100644 --- a/tests/integration/admin/test_list_offsets.py +++ b/tests/integration/admin/test_list_offsets.py @@ -1,3 +1,18 @@ +# -*- coding: utf-8 -*- +# Copyright 2023 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + from confluent_kafka.admin import ListOffsetsResultInfo, OffsetSpec from confluent_kafka import TopicPartition, IsolationLevel diff --git a/tests/integration/admin/test_user_scram_credentials.py b/tests/integration/admin/test_user_scram_credentials.py new file mode 100644 index 000000000..21c15bc07 --- /dev/null +++ b/tests/integration/admin/test_user_scram_credentials.py @@ -0,0 +1,119 @@ +# -*- coding: utf-8 -*- +# Copyright 2023 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest +import concurrent +from confluent_kafka.admin import UserScramCredentialsDescription, UserScramCredentialUpsertion, \ + UserScramCredentialDeletion, ScramCredentialInfo, \ + ScramMechanism +from confluent_kafka.error import KafkaException, KafkaError + + +def test_user_scram_credentials(kafka_cluster): + """ + Tests for the alter and describe SASL/SCRAM credential operations. + """ + + admin_client = kafka_cluster.admin() + + newuser = "non-existent" + mechanism = ScramMechanism.SCRAM_SHA_256 + iterations = 10000 + password = b"password" + salt = b"salt" + + futmap = admin_client.describe_user_scram_credentials([newuser]) + assert isinstance(futmap, dict) + assert len(futmap) == 1 + assert newuser in futmap + fut = futmap[newuser] + with pytest.raises(KafkaException) as ex: + result = fut.result() + assert ex.value.args[0] == KafkaError.RESOURCE_NOT_FOUND + + # Insert new user with SCRAM_SHA_256: 10000 + futmap = admin_client.alter_user_scram_credentials([UserScramCredentialUpsertion(newuser, + ScramCredentialInfo(mechanism, iterations), + password, salt)]) + fut = futmap[newuser] + result = fut.result() + assert result is None + + # Try upsertion for newuser,SCRAM_SHA_256 and add newuser,SCRAM_SHA_512 + futmap = admin_client.alter_user_scram_credentials([UserScramCredentialUpsertion( + newuser, + ScramCredentialInfo( + mechanism, iterations), + password, salt), + UserScramCredentialUpsertion( + newuser, + ScramCredentialInfo( + ScramMechanism.SCRAM_SHA_512, 10000), + password) + ]) + fut = futmap[newuser] + result = fut.result() + assert result is None + + # Delete newuser,SCRAM_SHA_512 + futmap = admin_client.alter_user_scram_credentials([UserScramCredentialDeletion( + newuser, + ScramMechanism.SCRAM_SHA_512) + ]) + fut = futmap[newuser] + result = fut.result() + assert result is None + + # Describe all users + for args in [[], [None]]: + f = admin_client.describe_user_scram_credentials(*args) + assert isinstance(f, concurrent.futures.Future) + results = f.result() + assert newuser in results + description = results[newuser] + assert isinstance(description, UserScramCredentialsDescription) + for scram_credential_info in description.scram_credential_infos: + assert ((scram_credential_info.mechanism == mechanism) and + (scram_credential_info.iterations == iterations)) + + # Describe specific user + futmap = admin_client.describe_user_scram_credentials([newuser]) + assert isinstance(futmap, dict) + assert len(futmap) == 1 + assert newuser in futmap + description = futmap[newuser].result() + assert isinstance(description, UserScramCredentialsDescription) + for scram_credential_info in description.scram_credential_infos: + assert ((scram_credential_info.mechanism == mechanism) and + (scram_credential_info.iterations == iterations)) + + # Delete newuser + futmap = admin_client.alter_user_scram_credentials([UserScramCredentialDeletion(newuser, mechanism)]) + assert isinstance(futmap, dict) + assert len(futmap) == 1 + assert newuser in futmap + fut = futmap[newuser] + result = fut.result() + assert result is None + + # newuser isn't found + futmap = admin_client.describe_user_scram_credentials([newuser]) + assert isinstance(futmap, dict) + assert len(futmap) == 1 + assert newuser in futmap + fut = futmap[newuser] + with pytest.raises(KafkaException) as ex: + result = fut.result() + assert ex.value.args[0] == KafkaError.RESOURCE_NOT_FOUND diff --git a/tests/test_Admin.py b/tests/test_Admin.py index e5d8dfc3a..b59cefb68 100644 --- a/tests/test_Admin.py +++ b/tests/test_Admin.py @@ -906,10 +906,14 @@ def test_describe_user_scram_credentials_api(): # Describe User Scram API a = AdminClient({"socket.timeout.ms": 10}) + f = a.describe_user_scram_credentials() + assert isinstance(f, concurrent.futures.Future) + + futmap = a.describe_user_scram_credentials(["user"]) + assert isinstance(futmap, dict) + with pytest.raises(TypeError): a.describe_user_scram_credentials(10) - with pytest.raises(TypeError): - a.describe_user_scram_credentials(None) with pytest.raises(TypeError): a.describe_user_scram_credentials([None]) with pytest.raises(ValueError):