Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add set_sasl_credentials binding across clients #1511

Merged
merged 3 commits into from
Feb 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Confluent's Python client for Apache Kafka

## vNext

- Added `set_sasl_credentials`. This new method (on the Producer, Consumer, and AdminClient) allows modifying the stored
SASL PLAIN/SCRAM credentials that will be used for subsequent (new) connections to a broker (#1511).


## v2.0.2

Expand Down
19 changes: 19 additions & 0 deletions src/confluent_kafka/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -787,3 +787,22 @@ def alter_consumer_group_offsets(self, alter_consumer_group_offsets_request, **k
super(AdminClient, self).alter_consumer_group_offsets(alter_consumer_group_offsets_request, f, **kwargs)

return futmap

def set_sasl_credentials(self, username, password):
"""
Sets the SASL credentials used for this client.
These credentials will overwrite the old ones, and will be used the
next time the client needs to authenticate.
This method will not disconnect existing broker connections that
have been established with the old credentials.
This method is applicable only to SASL PLAIN and SCRAM mechanisms.

:param str username: The username to set.
:param str password: The password to set.

:rtype: None

:raises KafkaException: Operation failed locally or on broker.
:raises TypeException: Invalid input.
"""
super(AdminClient, self).set_sasl_credentials(username, password)
4 changes: 4 additions & 0 deletions src/confluent_kafka/src/Admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -2152,6 +2152,10 @@ static PyMethodDef Admin_methods[] = {
Admin_delete_acls_doc
},

{ "set_sasl_credentials", (PyCFunction)set_sasl_credentials, METH_VARARGS|METH_KEYWORDS,
set_sasl_credentials_doc
},

{ NULL }
};

Expand Down
3 changes: 3 additions & 0 deletions src/confluent_kafka/src/Consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -1477,6 +1477,9 @@ static PyMethodDef Consumer_methods[] = {
"send_offsets_to_transaction() API.\n"
"\n"
},
{ "set_sasl_credentials", (PyCFunction)set_sasl_credentials, METH_VARARGS|METH_KEYWORDS,
set_sasl_credentials_doc
},


{ NULL }
Expand Down
3 changes: 3 additions & 0 deletions src/confluent_kafka/src/Producer.c
Original file line number Diff line number Diff line change
Expand Up @@ -811,6 +811,9 @@ static PyMethodDef Producer_methods[] = {
" Treat any other error as a fatal error.\n"
"\n"
},
{ "set_sasl_credentials", (PyCFunction)set_sasl_credentials, METH_VARARGS|METH_KEYWORDS,
set_sasl_credentials_doc
},
{ NULL }
};

Expand Down
49 changes: 49 additions & 0 deletions src/confluent_kafka/src/confluent_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -2578,6 +2578,55 @@ PyObject *cfl_int32_array_to_py_list (const int32_t *arr, size_t cnt) {
}


/****************************************************************************
*
*
* Methods common across all types of clients.
*
*
*
*
****************************************************************************/

const char set_sasl_credentials_doc[] = PyDoc_STR(
".. py:function:: set_sasl_credentials(username, password)\n"
"\n"
" Sets the SASL credentials used for this client.\n"
" These credentials will overwrite the old ones, and will be used the next time the client needs to authenticate.\n"
" This method will not disconnect existing broker connections that have been established with the old credentials.\n"
" This method is applicable only to SASL PLAIN and SCRAM mechanisms.\n");


PyObject *set_sasl_credentials(Handle *self, PyObject *args, PyObject *kwargs) {
const char *username = NULL;
const char *password = NULL;
rd_kafka_error_t* error;
CallState cs;
static char *kws[] = {"username", "password", NULL};

if (!PyArg_ParseTupleAndKeywords(args, kwargs, "ss", kws,
&username, &password)) {
return NULL;
}

CallState_begin(self, &cs);
error = rd_kafka_sasl_set_credentials(self->rk, username, password);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check if we need to release GIL.

Copy link
Contributor Author

@milindl milindl Feb 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are actually doing some operations except just copying of credentials under the sasl.lock that librdkafka has.
I'm not sure how time consuming they are, so I released the GIL

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think release the GIL is correct approach here.

@emasab - What is your thoughts?

Copy link
Contributor

@emasab emasab Feb 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, better to use CallState_begin and CallState_end because it acquires the mutex rk->rk_conf.sasl.lock and other locks in rd_kafka_all_brokers_wakeup that are blocking operations

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great, merging this now since there's agreement to release the GIL


if (!CallState_end(self, &cs)) {
if (error) /* Ignore error in favour of callstate exception */
rd_kafka_error_destroy(error);
return NULL;
}

if (error) {
cfl_PyErr_from_error_destroy(error);
return NULL;
}

Py_RETURN_NONE;
}


/****************************************************************************
*
*
Expand Down
2 changes: 2 additions & 0 deletions src/confluent_kafka/src/confluent_kafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -386,10 +386,12 @@ PyObject *c_Node_to_py(const rd_kafka_Node_t *c_node);
rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist);
PyObject *list_topics (Handle *self, PyObject *args, PyObject *kwargs);
PyObject *list_groups (Handle *self, PyObject *args, PyObject *kwargs);
PyObject *set_sasl_credentials(Handle *self, PyObject *args, PyObject *kwargs);

milindl marked this conversation as resolved.
Show resolved Hide resolved

extern const char list_topics_doc[];
extern const char list_groups_doc[];
extern const char set_sasl_credentials_doc[];


#ifdef RD_KAFKA_V_HEADERS
Expand Down
18 changes: 18 additions & 0 deletions tests/test_misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,3 +262,21 @@ def on_delivery(err, msg):
if "CI" in os.environ:
pytest.xfail("Timeout exceeded")
pytest.fail("Timeout exceeded")


def test_set_sasl_credentials_api():
clients = [
AdminClient({}),
confluent_kafka.Consumer({"group.id": "dummy"}),
confluent_kafka.Producer({})]

for c in clients:
c.set_sasl_credentials('username', 'password')

c.set_sasl_credentials('override', 'override')

with pytest.raises(TypeError):
c.set_sasl_credentials(None, 'password')

with pytest.raises(TypeError):
c.set_sasl_credentials('username', None)