Skip to content

Commit

Permalink
Add set_sasl_credentials binding across clients (#1511)
Browse files Browse the repository at this point in the history
This binding calls internally rd_kafka_sasl_set_credentials.
This binding is needed across consumers, producers, and
adminclient.
  • Loading branch information
milindl authored Feb 2, 2023
1 parent 91904a5 commit 1d1449f
Show file tree
Hide file tree
Showing 8 changed files with 103 additions and 0 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Confluent's Python client for Apache Kafka

## vNext

- Added `set_sasl_credentials`. This new method (on the Producer, Consumer, and AdminClient) allows modifying the stored
SASL PLAIN/SCRAM credentials that will be used for subsequent (new) connections to a broker (#1511).


## v2.0.2

Expand Down
19 changes: 19 additions & 0 deletions src/confluent_kafka/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -787,3 +787,22 @@ def alter_consumer_group_offsets(self, alter_consumer_group_offsets_request, **k
super(AdminClient, self).alter_consumer_group_offsets(alter_consumer_group_offsets_request, f, **kwargs)

return futmap

def set_sasl_credentials(self, username, password):
"""
Sets the SASL credentials used for this client.
These credentials will overwrite the old ones, and will be used the
next time the client needs to authenticate.
This method will not disconnect existing broker connections that
have been established with the old credentials.
This method is applicable only to SASL PLAIN and SCRAM mechanisms.
:param str username: The username to set.
:param str password: The password to set.
:rtype: None
:raises KafkaException: Operation failed locally or on broker.
:raises TypeException: Invalid input.
"""
super(AdminClient, self).set_sasl_credentials(username, password)
4 changes: 4 additions & 0 deletions src/confluent_kafka/src/Admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -2152,6 +2152,10 @@ static PyMethodDef Admin_methods[] = {
Admin_delete_acls_doc
},

{ "set_sasl_credentials", (PyCFunction)set_sasl_credentials, METH_VARARGS|METH_KEYWORDS,
set_sasl_credentials_doc
},

{ NULL }
};

Expand Down
3 changes: 3 additions & 0 deletions src/confluent_kafka/src/Consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -1477,6 +1477,9 @@ static PyMethodDef Consumer_methods[] = {
"send_offsets_to_transaction() API.\n"
"\n"
},
{ "set_sasl_credentials", (PyCFunction)set_sasl_credentials, METH_VARARGS|METH_KEYWORDS,
set_sasl_credentials_doc
},


{ NULL }
Expand Down
3 changes: 3 additions & 0 deletions src/confluent_kafka/src/Producer.c
Original file line number Diff line number Diff line change
Expand Up @@ -811,6 +811,9 @@ static PyMethodDef Producer_methods[] = {
" Treat any other error as a fatal error.\n"
"\n"
},
{ "set_sasl_credentials", (PyCFunction)set_sasl_credentials, METH_VARARGS|METH_KEYWORDS,
set_sasl_credentials_doc
},
{ NULL }
};

Expand Down
49 changes: 49 additions & 0 deletions src/confluent_kafka/src/confluent_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -2578,6 +2578,55 @@ PyObject *cfl_int32_array_to_py_list (const int32_t *arr, size_t cnt) {
}


/****************************************************************************
*
*
* Methods common across all types of clients.
*
*
*
*
****************************************************************************/

const char set_sasl_credentials_doc[] = PyDoc_STR(
".. py:function:: set_sasl_credentials(username, password)\n"
"\n"
" Sets the SASL credentials used for this client.\n"
" These credentials will overwrite the old ones, and will be used the next time the client needs to authenticate.\n"
" This method will not disconnect existing broker connections that have been established with the old credentials.\n"
" This method is applicable only to SASL PLAIN and SCRAM mechanisms.\n");


PyObject *set_sasl_credentials(Handle *self, PyObject *args, PyObject *kwargs) {
const char *username = NULL;
const char *password = NULL;
rd_kafka_error_t* error;
CallState cs;
static char *kws[] = {"username", "password", NULL};

if (!PyArg_ParseTupleAndKeywords(args, kwargs, "ss", kws,
&username, &password)) {
return NULL;
}

CallState_begin(self, &cs);
error = rd_kafka_sasl_set_credentials(self->rk, username, password);

if (!CallState_end(self, &cs)) {
if (error) /* Ignore error in favour of callstate exception */
rd_kafka_error_destroy(error);
return NULL;
}

if (error) {
cfl_PyErr_from_error_destroy(error);
return NULL;
}

Py_RETURN_NONE;
}


/****************************************************************************
*
*
Expand Down
2 changes: 2 additions & 0 deletions src/confluent_kafka/src/confluent_kafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -386,10 +386,12 @@ PyObject *c_Node_to_py(const rd_kafka_Node_t *c_node);
rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist);
PyObject *list_topics (Handle *self, PyObject *args, PyObject *kwargs);
PyObject *list_groups (Handle *self, PyObject *args, PyObject *kwargs);
PyObject *set_sasl_credentials(Handle *self, PyObject *args, PyObject *kwargs);


extern const char list_topics_doc[];
extern const char list_groups_doc[];
extern const char set_sasl_credentials_doc[];


#ifdef RD_KAFKA_V_HEADERS
Expand Down
18 changes: 18 additions & 0 deletions tests/test_misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,3 +262,21 @@ def on_delivery(err, msg):
if "CI" in os.environ:
pytest.xfail("Timeout exceeded")
pytest.fail("Timeout exceeded")


def test_set_sasl_credentials_api():
clients = [
AdminClient({}),
confluent_kafka.Consumer({"group.id": "dummy"}),
confluent_kafka.Producer({})]

for c in clients:
c.set_sasl_credentials('username', 'password')

c.set_sasl_credentials('override', 'override')

with pytest.raises(TypeError):
c.set_sasl_credentials(None, 'password')

with pytest.raises(TypeError):
c.set_sasl_credentials('username', None)

0 comments on commit 1d1449f

Please sign in to comment.