Skip to content

Commit

Permalink
Fix calls with empty lists (#1662)
Browse files Browse the repository at this point in the history
* Empty topic collection allowed

* Empty topic names disallowed

* Test with different options

* New _make_futmap_result_from_list method
  • Loading branch information
emasab authored Oct 25, 2023
1 parent aaeca9a commit c391879
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 59 deletions.
54 changes: 24 additions & 30 deletions src/confluent_kafka/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,31 +199,6 @@ def _make_consumer_groups_result(f, futmap):
for _, fut in futmap.items():
fut.set_exception(e)

@staticmethod
def _make_describe_topics_result(f, futmap):
"""
Map per-topic results to per-topic futures in futmap.
"""
try:

results = f.result()
futmap_values = list(futmap.values())
len_results = len(results)
len_futures = len(futmap_values)
if len_results != len_futures:
raise RuntimeError(
"Results length {} is different from future-map length {}".format(len_results, len_futures))
for i, result in enumerate(results):
fut = futmap_values[i]
if isinstance(result, KafkaError):
fut.set_exception(KafkaException(result))
else:
fut.set_result(result)
except Exception as e:
# Request-level exception, raise the same for all topics
for _, fut in futmap.items():
fut.set_exception(e)

@staticmethod
def _make_consumer_group_offsets_result(f, futmap):
"""
Expand Down Expand Up @@ -298,6 +273,28 @@ def _make_user_scram_credentials_result(f, futmap):
for _, fut in futmap.items():
fut.set_exception(e)

@staticmethod
def _make_futmap_result_from_list(f, futmap):
try:

results = f.result()
futmap_values = list(futmap.values())
len_results = len(results)
len_futures = len(futmap_values)
if len_results != len_futures:
raise RuntimeError(
"Results length {} is different from future-map length {}".format(len_results, len_futures))
for i, result in enumerate(results):
fut = futmap_values[i]
if isinstance(result, KafkaError):
fut.set_exception(KafkaException(result))
else:
fut.set_result(result)
except Exception as e:
# Request-level exception, raise the same for all topics
for _, fut in futmap.items():
fut.set_exception(e)

@staticmethod
def _make_futmap_result(f, futmap):
try:
Expand Down Expand Up @@ -948,11 +945,8 @@ def describe_topics(self, topics, **kwargs):
if not isinstance(topic_names, list):
raise TypeError("Expected list of topic names to be described")

if len(topic_names) == 0:
raise ValueError("Expected at least one topic to be described")

f, futmap = AdminClient._make_futures(topic_names, None,
AdminClient._make_describe_topics_result)
f, futmap = AdminClient._make_futures_v2(topic_names, None,
AdminClient._make_futmap_result_from_list)

super(AdminClient, self).describe_topics(topic_names, f, **kwargs)

Expand Down
49 changes: 28 additions & 21 deletions src/confluent_kafka/src/Admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -2326,36 +2326,43 @@ PyObject *Admin_describe_topics (Handle *self, PyObject *args, PyObject *kwargs)
&options.include_authorized_operations))
goto err;

if (!PyList_Check(topics) || (topics_cnt = (int)PyList_Size(topics)) < 1) {
PyErr_SetString(PyExc_ValueError,
"Expected non-empty list of topics");
if (!PyList_Check(topics)) {
PyErr_SetString(PyExc_TypeError,
"Expected a list of topics");
goto err;
}

c_topics = malloc(sizeof(char *) * topics_cnt);
topics_cnt = PyList_Size(topics);

for (i = 0 ; i < topics_cnt ; i++) {
PyObject *topic = PyList_GET_ITEM(topics, i);
PyObject *utopic;
PyObject *uotopic = NULL;
if (topics_cnt) {
c_topics = malloc(sizeof(char *) * topics_cnt);
for (i = 0 ; i < topics_cnt ; i++) {
PyObject *topic = PyList_GET_ITEM(topics, i);
PyObject *uotopic = NULL;

if (topic == Py_None ||
!(utopic = cfl_PyObject_Unistr(topic))) {
PyErr_Format(PyExc_ValueError,
"Expected list of topics strings, "
"not %s",
((PyTypeObject *)PyObject_Type(topic))->
tp_name);
goto err;
}
if (topic == Py_None ||
!PyUnicode_Check(topic)) {
PyErr_Format(PyExc_TypeError,
"Expected list of topics strings, "
"not %s",
((PyTypeObject *)PyObject_Type(topic))->
tp_name);
goto err;
}

c_topics[i] = cfl_PyUnistr_AsUTF8(utopic, &uotopic);
c_topics[i] = cfl_PyUnistr_AsUTF8(topic, &uotopic);
Py_XDECREF(uotopic);

Py_XDECREF(utopic);
Py_XDECREF(uotopic);
if (!c_topics[i][0]) {
PyErr_Format(PyExc_ValueError,
"Empty topic name at index %d isn't "
"allowed", i);
goto err;
}
}
}
c_topic_collection = rd_kafka_TopicCollection_of_topic_names(c_topics, topics_cnt);

c_topic_collection = rd_kafka_TopicCollection_of_topic_names(c_topics, topics_cnt);
c_options = Admin_options_to_c(self, RD_KAFKA_ADMIN_OP_DESCRIBETOPICS,
&options, future);
if (!c_options) {
Expand Down
53 changes: 45 additions & 8 deletions tests/test_Admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -637,18 +637,55 @@ def test_describe_consumer_groups_api():
def test_describe_topics_api():
a = AdminClient({"socket.timeout.ms": 10})

topic_names = ["test-topic-1", "test-topic-2"]
# Wrong option types
for kwargs in [{"include_authorized_operations": "wrong_type"},
{"request_timeout": "wrong_type"}]:
with pytest.raises(TypeError):
a.describe_topics(TopicCollection([]), **kwargs)

a.describe_topics(TopicCollection(topic_names))
# Wrong option values
for kwargs in [{"request_timeout": -1}]:
with pytest.raises(ValueError):
a.describe_topics(TopicCollection([]), **kwargs)

with pytest.raises(TypeError):
a.describe_topics(topic_names)
# Test with different options
for kwargs in [{},
{"include_authorized_operations": True},
{"request_timeout": 0.01},
{"include_authorized_operations": False,
"request_timeout": 0.01}]:

with pytest.raises(TypeError):
a.describe_topics("test-topic-1")
topic_names = ["test-topic-1", "test-topic-2"]

with pytest.raises(ValueError):
a.describe_topics(TopicCollection([]))
# Empty TopicCollection returns empty futures
fs = a.describe_topics(TopicCollection([]), **kwargs)
assert len(fs) == 0

# Normal call
fs = a.describe_topics(TopicCollection(topic_names), **kwargs)
for f in concurrent.futures.as_completed(iter(fs.values())):
e = f.exception(timeout=1)
assert isinstance(e, KafkaException)
assert e.args[0].code() == KafkaError._TIMED_OUT

# Wrong argument type
for args in [
[topic_names],
["test-topic-1"],
[TopicCollection([3])],
[TopicCollection(["correct", 3])],
[TopicCollection([None])]
]:
with pytest.raises(TypeError):
a.describe_topics(*args, **kwargs)

# Wrong argument value
for args in [
[TopicCollection([""])],
[TopicCollection(["correct", ""])]
]:
with pytest.raises(ValueError):
a.describe_topics(*args, **kwargs)


def test_describe_cluster():
Expand Down

0 comments on commit c391879

Please sign in to comment.