diff --git a/CHANGELOG.md b/CHANGELOG.md index 6554d4ce7..45d6c5104 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # Confluent's Python client for Apache Kafka +## vNext + +- Added `set_sasl_credentials`. This new method (on the Producer, Consumer, and AdminClient) allows modifying the stored + SASL PLAIN/SCRAM credentials that will be used for subsequent (new) connections to a broker (#1511). + ## v2.0.2 diff --git a/src/confluent_kafka/admin/__init__.py b/src/confluent_kafka/admin/__init__.py index c15bf0e25..8b12358ee 100644 --- a/src/confluent_kafka/admin/__init__.py +++ b/src/confluent_kafka/admin/__init__.py @@ -787,3 +787,22 @@ def alter_consumer_group_offsets(self, alter_consumer_group_offsets_request, **k super(AdminClient, self).alter_consumer_group_offsets(alter_consumer_group_offsets_request, f, **kwargs) return futmap + + def set_sasl_credentials(self, username, password): + """ + Sets the SASL credentials used for this client. + These credentials will overwrite the old ones, and will be used the + next time the client needs to authenticate. + This method will not disconnect existing broker connections that + have been established with the old credentials. + This method is applicable only to SASL PLAIN and SCRAM mechanisms. + + :param str username: The username to set. + :param str password: The password to set. + + :rtype: None + + :raises KafkaException: Operation failed locally or on broker. + :raises TypeException: Invalid input. + """ + super(AdminClient, self).set_sasl_credentials(username, password) diff --git a/src/confluent_kafka/src/Admin.c b/src/confluent_kafka/src/Admin.c index 210a011c0..27f209143 100644 --- a/src/confluent_kafka/src/Admin.c +++ b/src/confluent_kafka/src/Admin.c @@ -2152,6 +2152,10 @@ static PyMethodDef Admin_methods[] = { Admin_delete_acls_doc }, + { "set_sasl_credentials", (PyCFunction)set_sasl_credentials, METH_VARARGS|METH_KEYWORDS, + set_sasl_credentials_doc + }, + { NULL } }; diff --git a/src/confluent_kafka/src/Consumer.c b/src/confluent_kafka/src/Consumer.c index 66f5b7540..6bbadb9ca 100644 --- a/src/confluent_kafka/src/Consumer.c +++ b/src/confluent_kafka/src/Consumer.c @@ -1477,6 +1477,9 @@ static PyMethodDef Consumer_methods[] = { "send_offsets_to_transaction() API.\n" "\n" }, + { "set_sasl_credentials", (PyCFunction)set_sasl_credentials, METH_VARARGS|METH_KEYWORDS, + set_sasl_credentials_doc + }, { NULL } diff --git a/src/confluent_kafka/src/Producer.c b/src/confluent_kafka/src/Producer.c index caa0c6084..b6a51f510 100644 --- a/src/confluent_kafka/src/Producer.c +++ b/src/confluent_kafka/src/Producer.c @@ -811,6 +811,9 @@ static PyMethodDef Producer_methods[] = { " Treat any other error as a fatal error.\n" "\n" }, + { "set_sasl_credentials", (PyCFunction)set_sasl_credentials, METH_VARARGS|METH_KEYWORDS, + set_sasl_credentials_doc + }, { NULL } }; diff --git a/src/confluent_kafka/src/confluent_kafka.c b/src/confluent_kafka/src/confluent_kafka.c index 16ff496a3..2fab961b0 100644 --- a/src/confluent_kafka/src/confluent_kafka.c +++ b/src/confluent_kafka/src/confluent_kafka.c @@ -2578,6 +2578,55 @@ PyObject *cfl_int32_array_to_py_list (const int32_t *arr, size_t cnt) { } +/**************************************************************************** + * + * + * Methods common across all types of clients. + * + * + * + * + ****************************************************************************/ + +const char set_sasl_credentials_doc[] = PyDoc_STR( + ".. py:function:: set_sasl_credentials(username, password)\n" + "\n" + " Sets the SASL credentials used for this client.\n" + " These credentials will overwrite the old ones, and will be used the next time the client needs to authenticate.\n" + " This method will not disconnect existing broker connections that have been established with the old credentials.\n" + " This method is applicable only to SASL PLAIN and SCRAM mechanisms.\n"); + + +PyObject *set_sasl_credentials(Handle *self, PyObject *args, PyObject *kwargs) { + const char *username = NULL; + const char *password = NULL; + rd_kafka_error_t* error; + CallState cs; + static char *kws[] = {"username", "password", NULL}; + + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "ss", kws, + &username, &password)) { + return NULL; + } + + CallState_begin(self, &cs); + error = rd_kafka_sasl_set_credentials(self->rk, username, password); + + if (!CallState_end(self, &cs)) { + if (error) /* Ignore error in favour of callstate exception */ + rd_kafka_error_destroy(error); + return NULL; + } + + if (error) { + cfl_PyErr_from_error_destroy(error); + return NULL; + } + + Py_RETURN_NONE; +} + + /**************************************************************************** * * diff --git a/src/confluent_kafka/src/confluent_kafka.h b/src/confluent_kafka/src/confluent_kafka.h index f2768c3f3..23fbdb0c1 100644 --- a/src/confluent_kafka/src/confluent_kafka.h +++ b/src/confluent_kafka/src/confluent_kafka.h @@ -386,10 +386,12 @@ PyObject *c_Node_to_py(const rd_kafka_Node_t *c_node); rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist); PyObject *list_topics (Handle *self, PyObject *args, PyObject *kwargs); PyObject *list_groups (Handle *self, PyObject *args, PyObject *kwargs); +PyObject *set_sasl_credentials(Handle *self, PyObject *args, PyObject *kwargs); extern const char list_topics_doc[]; extern const char list_groups_doc[]; +extern const char set_sasl_credentials_doc[]; #ifdef RD_KAFKA_V_HEADERS diff --git a/tests/test_misc.py b/tests/test_misc.py index ae016a3a9..aca7b5a4f 100644 --- a/tests/test_misc.py +++ b/tests/test_misc.py @@ -262,3 +262,21 @@ def on_delivery(err, msg): if "CI" in os.environ: pytest.xfail("Timeout exceeded") pytest.fail("Timeout exceeded") + + +def test_set_sasl_credentials_api(): + clients = [ + AdminClient({}), + confluent_kafka.Consumer({"group.id": "dummy"}), + confluent_kafka.Producer({})] + + for c in clients: + c.set_sasl_credentials('username', 'password') + + c.set_sasl_credentials('override', 'override') + + with pytest.raises(TypeError): + c.set_sasl_credentials(None, 'password') + + with pytest.raises(TypeError): + c.set_sasl_credentials('username', None)