Skip to content

Commit 3e4c8d4

Browse files
committed
Address feedback on CallState params and adding more tests
1 parent beab3c7 commit 3e4c8d4

File tree

8 files changed

+396
-62
lines changed

8 files changed

+396
-62
lines changed

src/confluent_kafka/src/Admin.c

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4717,7 +4717,7 @@ static void Admin_background_event_cb (rd_kafka_t *rk, rd_kafka_event_t *rkev,
47174717
PyGILState_STATE gstate;
47184718
PyObject *error, *method, *ret;
47194719
PyObject *result = NULL;
4720-
PyObject *exctype = NULL, *exc = NULL, *excargs = NULL;
4720+
PyObject *exc = NULL, *excargs = NULL;
47214721

47224722
/* Acquire GIL */
47234723
gstate = PyGILState_Ensure();
@@ -5093,7 +5093,7 @@ static void Admin_background_event_cb (rd_kafka_t *rk, rd_kafka_event_t *rkev,
50935093
PyObject *trace = NULL;
50945094

50955095
/* Fetch (and clear) currently raised exception */
5096-
cfl_exception_fetch(&exctype, &error, &trace);
5096+
cfl_exception_fetch(&exc);
50975097
Py_XDECREF(trace);
50985098
}
50995099

@@ -5124,22 +5124,17 @@ static void Admin_background_event_cb (rd_kafka_t *rk, rd_kafka_event_t *rkev,
51245124
* Pass an exception to future.set_exception().
51255125
*/
51265126

5127-
if (!exctype) {
5127+
if (!exc) {
51285128
/* No previous exception raised, use KafkaException */
5129-
exctype = KafkaException;
5130-
Py_INCREF(exctype);
5131-
}
5132-
5133-
/* Create a new exception based on exception type and error. */
5134-
excargs = PyTuple_New(1);
5135-
Py_INCREF(error); /* tuple's reference */
5136-
PyTuple_SET_ITEM(excargs, 0, error);
5137-
exc = ((PyTypeObject *)exctype)->tp_new(
5138-
(PyTypeObject *)exctype, NULL, NULL);
5139-
exc->ob_type->tp_init(exc, excargs, NULL);
5140-
Py_DECREF(excargs);
5141-
Py_XDECREF(exctype);
5142-
Py_XDECREF(error); /* from error source above */
5129+
excargs = PyTuple_New(1);
5130+
Py_INCREF(error); /* tuple's reference */
5131+
PyTuple_SET_ITEM(excargs, 0, error);
5132+
exc = ((PyTypeObject *)KafkaException)->tp_new(
5133+
(PyTypeObject *)KafkaException, NULL, NULL);
5134+
exc->ob_type->tp_init(exc, excargs, NULL);
5135+
Py_DECREF(excargs);
5136+
Py_XDECREF(error); /* from error source above */
5137+
}
51435138

51445139
/*
51455140
* Call future.set_exception(exc)

src/confluent_kafka/src/Consumer.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -424,6 +424,7 @@ static void Consumer_offset_commit_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
424424
if (result)
425425
Py_DECREF(result);
426426
else {
427+
CallState_fetch_exception(cs);
427428
CallState_crash(cs);
428429
rd_kafka_yield(rk);
429430
}
@@ -1586,7 +1587,6 @@ static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
15861587
if (result)
15871588
Py_DECREF(result);
15881589
else {
1589-
15901590
CallState_fetch_exception(cs);
15911591
CallState_crash(cs);
15921592
rd_kafka_yield(rk);

src/confluent_kafka/src/Producer.c

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,6 @@ static void dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkm,
174174
if (result)
175175
Py_DECREF(result);
176176
else {
177-
178177
CallState_fetch_exception(cs);
179178
CallState_crash(cs);
180179
rd_kafka_yield(rk);

src/confluent_kafka/src/confluent_kafka.c

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1766,8 +1766,7 @@ static void error_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque)
17661766
if (result)
17671767
Py_DECREF(result);
17681768
else {
1769-
1770-
CallState_fetch_exception(cs);
1769+
CallState_fetch_exception(cs);
17711770
crash:
17721771
CallState_crash(cs);
17731772
rd_kafka_yield(h->rk);
@@ -2589,9 +2588,7 @@ void CallState_begin (Handle *h, CallState *cs) {
25892588
cs->thread_state = PyEval_SaveThread();
25902589
assert(cs->thread_state != NULL);
25912590
cs->crashed = 0;
2592-
cs->exception_type = NULL;
25932591
cs->exception_value = NULL;
2594-
cs->exception_traceback = NULL;
25952592
#ifdef WITH_PY_TSS
25962593
PyThread_tss_set(&h->tlskey, cs);
25972594
#else
@@ -2617,7 +2614,7 @@ int CallState_end (Handle *h, CallState *cs) {
26172614

26182615
if (cs->crashed) {
26192616
/* Restore the saved exception if we have one */
2620-
if (cs->exception_type) {
2617+
if (cs->exception_value) {
26212618
CallState_restore_exception(cs);
26222619
}
26232620
return 0;

src/confluent_kafka/src/confluent_kafka.h

Lines changed: 11 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -270,9 +270,7 @@ int Handle_traverse (Handle *h, visitproc visit, void *arg);
270270
typedef struct {
271271
PyThreadState *thread_state;
272272
int crashed; /* Callback crashed */
273-
PyObject *exception_type; /* Stored exception type */
274273
PyObject *exception_value; /* Stored exception value */
275-
PyObject *exception_traceback; /* Stored exception traceback */
276274
} CallState;
277275

278276
/**
@@ -282,51 +280,42 @@ typedef struct {
282280
*/
283281

284282
static inline void
285-
cfl_exception_fetch(PyObject **exc_type, PyObject **exc_value, PyObject **exc_traceback) {
283+
cfl_exception_fetch(PyObject **exc_value) {
286284
#if PY_VERSION_HEX >= 0x030c0000
287285
/* Python 3.12+ - use new API */
288-
PyObject *exc = PyErr_GetRaisedException();
289-
if (exc) {
290-
*exc_type = (PyObject *)Py_TYPE(exc);
291-
Py_INCREF(*exc_type);
292-
*exc_value = exc;
293-
*exc_traceback = PyException_GetTraceback(exc);
294-
} else {
295-
*exc_type = *exc_value = *exc_traceback = NULL;
296-
}
286+
*exc_value = PyErr_GetRaisedException();
297287
#else
298288
/* Python < 3.12 - use legacy API */
299-
PyErr_Fetch(exc_type, exc_value, exc_traceback);
289+
PyObject *exc_type, *exc_traceback;
290+
PyErr_Fetch(&exc_type, exc_value, &exc_traceback);
291+
Py_XDECREF(exc_type);
292+
Py_XDECREF(exc_traceback);
300293
#endif
301294
}
302295

303296
static inline void
304-
cfl_exception_restore(PyObject *exc_type, PyObject *exc_value, PyObject *exc_traceback) {
297+
cfl_exception_restore(PyObject *exc_value) {
305298
#if PY_VERSION_HEX >= 0x030c0000
306299
/* Python 3.12+ - use new API */
307300
if (exc_value) {
308301
PyErr_SetRaisedException(exc_value);
309-
Py_XDECREF(exc_type);
310-
Py_XDECREF(exc_traceback);
311302
}
312303
#else
313304
/* Python < 3.12 - use legacy API */
314-
PyErr_Restore(exc_type, exc_value, exc_traceback);
305+
PyErr_SetObject(PyExceptionInstance_Class(exc_value), exc_value);
315306
#endif
316307
}
317308

318309
static inline void
319310
CallState_fetch_exception(CallState *cs) {
320-
cfl_exception_fetch(&cs->exception_type, &cs->exception_value, &cs->exception_traceback);
311+
cfl_exception_fetch(&cs->exception_value);
321312
}
322313

323314
static inline void
324315
CallState_restore_exception(CallState *cs) {
325-
if (cs->exception_type) {
326-
cfl_exception_restore(cs->exception_type, cs->exception_value, cs->exception_traceback);
327-
cs->exception_type = NULL;
316+
if (cs->exception_value) {
317+
cfl_exception_restore(cs->exception_value);
328318
cs->exception_value = NULL;
329-
cs->exception_traceback = NULL;
330319
}
331320
}
332321

tests/test_Admin.py

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1224,3 +1224,89 @@ def test_elect_leaders():
12241224
with pytest.raises(KafkaException):
12251225
a.elect_leaders(correct_election_type, [correct_partitions])\
12261226
.result(timeout=1)
1227+
1228+
1229+
@pytest.mark.skipif(libversion()[1] < 0x000b0500,
1230+
reason="AdminAPI requires librdkafka >= v0.11.5")
1231+
def test_admin_callback_exception_no_system_error():
1232+
"""Test AdminClient callbacks exception handling with different exception types"""
1233+
1234+
# Test error_cb with different exception types
1235+
def error_cb_kafka_exception(error):
1236+
raise KafkaException(KafkaError._FAIL, "KafkaException from error_cb")
1237+
1238+
def error_cb_value_error(error):
1239+
raise ValueError("ValueError from error_cb")
1240+
1241+
def error_cb_runtime_error(error):
1242+
raise RuntimeError("RuntimeError from error_cb")
1243+
1244+
# Test error_cb with KafkaException
1245+
admin = AdminClient({
1246+
'bootstrap.servers': 'nonexistent-broker:9092',
1247+
'socket.timeout.ms': 100,
1248+
'error_cb': error_cb_kafka_exception
1249+
})
1250+
1251+
with pytest.raises(KafkaException) as exc_info:
1252+
admin.poll(timeout=0.2)
1253+
assert "KafkaException from error_cb" in str(exc_info.value)
1254+
1255+
# Test error_cb with ValueError
1256+
admin = AdminClient({
1257+
'bootstrap.servers': 'nonexistent-broker:9092',
1258+
'socket.timeout.ms': 100,
1259+
'error_cb': error_cb_value_error
1260+
})
1261+
1262+
with pytest.raises(ValueError) as exc_info:
1263+
admin.poll(timeout=0.2)
1264+
assert "ValueError from error_cb" in str(exc_info.value)
1265+
1266+
# Test error_cb with RuntimeError
1267+
admin = AdminClient({
1268+
'bootstrap.servers': 'nonexistent-broker:9092',
1269+
'socket.timeout.ms': 100,
1270+
'error_cb': error_cb_runtime_error
1271+
})
1272+
1273+
with pytest.raises(RuntimeError) as exc_info:
1274+
admin.poll(timeout=0.2)
1275+
assert "RuntimeError from error_cb" in str(exc_info.value)
1276+
1277+
1278+
@pytest.mark.skipif(libversion()[1] < 0x000b0500,
1279+
reason="AdminAPI requires librdkafka >= v0.11.5")
1280+
def test_admin_multiple_callbacks_different_error_types():
1281+
"""Test AdminClient with multiple callbacks configured with different error types to see which one gets triggered"""
1282+
1283+
callbacks_called = []
1284+
1285+
def error_cb_that_raises_runtime(error):
1286+
callbacks_called.append('error_cb_runtime')
1287+
raise RuntimeError("RuntimeError from error_cb")
1288+
1289+
def stats_cb_that_raises_value(stats_json):
1290+
callbacks_called.append('stats_cb_value')
1291+
raise ValueError("ValueError from stats_cb")
1292+
1293+
def throttle_cb_that_raises_kafka(throttle_event):
1294+
callbacks_called.append('throttle_cb_kafka')
1295+
raise KafkaException(KafkaError._FAIL, "KafkaException from throttle_cb")
1296+
1297+
admin = AdminClient({
1298+
'bootstrap.servers': 'nonexistent-broker:9092',
1299+
'socket.timeout.ms': 100,
1300+
'statistics.interval.ms': 100, # Enable stats callback
1301+
'error_cb': error_cb_that_raises_runtime,
1302+
'stats_cb': stats_cb_that_raises_value,
1303+
'throttle_cb': throttle_cb_that_raises_kafka
1304+
})
1305+
1306+
# Test that error_cb callback raises an exception (it's triggered by connection failures)
1307+
with pytest.raises(RuntimeError) as exc_info:
1308+
admin.poll(timeout=0.2)
1309+
1310+
# Verify that error_cb was called
1311+
assert len(callbacks_called) > 0
1312+
assert 'error_cb_runtime' in callbacks_called

0 commit comments

Comments
 (0)