Skip to content

Commit 9bfe49c

Browse files
authored
Fix error propagation rule for Python's C API (#2019)
1 parent f026e72 commit 9bfe49c

File tree

9 files changed

+540
-67
lines changed

9 files changed

+540
-67
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,9 @@ for a complete list of changes, enhancements, fixes and upgrade considerations.
7878

7979
v2.11.0 is a feature release with the following enhancements:
8080

81+
### Fixes
82+
- Fix error propagation rule for Python's C API to prevent SystemError when callbacks raise exceptions (#865)
83+
8184
confluent-kafka-python v2.11.0 is based on librdkafka v2.11.0, see the
8285
[librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.11.0)
8386
for a complete list of changes, enhancements, fixes and upgrade considerations.

src/confluent_kafka/src/Admin.c

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4717,7 +4717,7 @@ static void Admin_background_event_cb (rd_kafka_t *rk, rd_kafka_event_t *rkev,
47174717
PyGILState_STATE gstate;
47184718
PyObject *error, *method, *ret;
47194719
PyObject *result = NULL;
4720-
PyObject *exctype = NULL, *exc = NULL, *excargs = NULL;
4720+
PyObject *exc = NULL, *excargs = NULL;
47214721

47224722
/* Acquire GIL */
47234723
gstate = PyGILState_Ensure();
@@ -5093,7 +5093,7 @@ static void Admin_background_event_cb (rd_kafka_t *rk, rd_kafka_event_t *rkev,
50935093
PyObject *trace = NULL;
50945094

50955095
/* Fetch (and clear) currently raised exception */
5096-
PyErr_Fetch(&exctype, &error, &trace);
5096+
cfl_exception_fetch(&exc);
50975097
Py_XDECREF(trace);
50985098
}
50995099

@@ -5124,22 +5124,17 @@ static void Admin_background_event_cb (rd_kafka_t *rk, rd_kafka_event_t *rkev,
51245124
* Pass an exception to future.set_exception().
51255125
*/
51265126

5127-
if (!exctype) {
5127+
if (!exc) {
51285128
/* No previous exception raised, use KafkaException */
5129-
exctype = KafkaException;
5130-
Py_INCREF(exctype);
5131-
}
5132-
5133-
/* Create a new exception based on exception type and error. */
5134-
excargs = PyTuple_New(1);
5135-
Py_INCREF(error); /* tuple's reference */
5136-
PyTuple_SET_ITEM(excargs, 0, error);
5137-
exc = ((PyTypeObject *)exctype)->tp_new(
5138-
(PyTypeObject *)exctype, NULL, NULL);
5139-
exc->ob_type->tp_init(exc, excargs, NULL);
5140-
Py_DECREF(excargs);
5141-
Py_XDECREF(exctype);
5142-
Py_XDECREF(error); /* from error source above */
5129+
excargs = PyTuple_New(1);
5130+
Py_INCREF(error); /* tuple's reference */
5131+
PyTuple_SET_ITEM(excargs, 0, error);
5132+
exc = ((PyTypeObject *)KafkaException)->tp_new(
5133+
(PyTypeObject *)KafkaException, NULL, NULL);
5134+
exc->ob_type->tp_init(exc, excargs, NULL);
5135+
Py_DECREF(excargs);
5136+
Py_XDECREF(error); /* from error source above */
5137+
}
51435138

51445139
/*
51455140
* Call future.set_exception(exc)

src/confluent_kafka/src/Consumer.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -424,6 +424,7 @@ static void Consumer_offset_commit_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
424424
if (result)
425425
Py_DECREF(result);
426426
else {
427+
CallState_fetch_exception(cs);
427428
CallState_crash(cs);
428429
rd_kafka_yield(rk);
429430
}
@@ -1586,6 +1587,7 @@ static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
15861587
if (result)
15871588
Py_DECREF(result);
15881589
else {
1590+
CallState_fetch_exception(cs);
15891591
CallState_crash(cs);
15901592
rd_kafka_yield(rk);
15911593
}

src/confluent_kafka/src/Producer.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ static void dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkm,
174174
if (result)
175175
Py_DECREF(result);
176176
else {
177+
CallState_fetch_exception(cs);
177178
CallState_crash(cs);
178179
rd_kafka_yield(rk);
179180
}

src/confluent_kafka/src/confluent_kafka.c

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1766,6 +1766,7 @@ static void error_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque)
17661766
if (result)
17671767
Py_DECREF(result);
17681768
else {
1769+
CallState_fetch_exception(cs);
17691770
crash:
17701771
CallState_crash(cs);
17711772
rd_kafka_yield(h->rk);
@@ -1819,6 +1820,8 @@ static void throttle_cb (rd_kafka_t *rk, const char *broker_name, int32_t broker
18191820
/* throttle_cb executed successfully */
18201821
Py_DECREF(result);
18211822
goto done;
1823+
} else {
1824+
CallState_fetch_exception(cs);
18221825
}
18231826

18241827
/**
@@ -1850,6 +1853,7 @@ static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque) {
18501853
if (result)
18511854
Py_DECREF(result);
18521855
else {
1856+
CallState_fetch_exception(cs);
18531857
CallState_crash(cs);
18541858
rd_kafka_yield(h->rk);
18551859
}
@@ -1885,6 +1889,7 @@ static void log_cb (const rd_kafka_t *rk, int level,
18851889
if (result)
18861890
Py_DECREF(result);
18871891
else {
1892+
CallState_fetch_exception(cs);
18881893
CallState_crash(cs);
18891894
rd_kafka_yield(h->rk);
18901895
}
@@ -2583,6 +2588,7 @@ void CallState_begin (Handle *h, CallState *cs) {
25832588
cs->thread_state = PyEval_SaveThread();
25842589
assert(cs->thread_state != NULL);
25852590
cs->crashed = 0;
2591+
cs->exception_value = NULL;
25862592
#ifdef WITH_PY_TSS
25872593
PyThread_tss_set(&h->tlskey, cs);
25882594
#else
@@ -2603,9 +2609,17 @@ int CallState_end (Handle *h, CallState *cs) {
26032609

26042610
PyEval_RestoreThread(cs->thread_state);
26052611

2606-
if (PyErr_CheckSignals() == -1 || cs->crashed)
2612+
if (PyErr_CheckSignals() == -1)
26072613
return 0;
26082614

2615+
if (cs->crashed) {
2616+
/* Restore the saved exception if we have one */
2617+
if (cs->exception_value) {
2618+
CallState_restore_exception(cs);
2619+
}
2620+
return 0;
2621+
}
2622+
26092623
return 1;
26102624
}
26112625

src/confluent_kafka/src/confluent_kafka.h

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,8 +270,55 @@ int Handle_traverse (Handle *h, visitproc visit, void *arg);
270270
typedef struct {
271271
PyThreadState *thread_state;
272272
int crashed; /* Callback crashed */
273+
PyObject *exception_value; /* Stored exception value */
273274
} CallState;
274275

276+
/**
277+
* @brief Compatibility layer for Python exception handling API changes.
278+
* PyErr_Fetch/PyErr_Restore were deprecated in Python 3.12 in favor of
279+
* PyErr_GetRaisedException/PyErr_SetRaisedException.
280+
*/
281+
282+
static inline void
283+
cfl_exception_fetch(PyObject **exc_value) {
284+
#if PY_VERSION_HEX >= 0x030c0000
285+
/* Python 3.12+ - use new API */
286+
*exc_value = PyErr_GetRaisedException();
287+
#else
288+
/* Python < 3.12 - use legacy API */
289+
PyObject *exc_type, *exc_traceback;
290+
PyErr_Fetch(&exc_type, exc_value, &exc_traceback);
291+
Py_XDECREF(exc_type);
292+
Py_XDECREF(exc_traceback);
293+
#endif
294+
}
295+
296+
static inline void
297+
cfl_exception_restore(PyObject *exc_value) {
298+
#if PY_VERSION_HEX >= 0x030c0000
299+
/* Python 3.12+ - use new API */
300+
if (exc_value) {
301+
PyErr_SetRaisedException(exc_value);
302+
}
303+
#else
304+
/* Python < 3.12 - use legacy API */
305+
PyErr_SetObject(PyExceptionInstance_Class(exc_value), exc_value);
306+
#endif
307+
}
308+
309+
static inline void
310+
CallState_fetch_exception(CallState *cs) {
311+
cfl_exception_fetch(&cs->exception_value);
312+
}
313+
314+
static inline void
315+
CallState_restore_exception(CallState *cs) {
316+
if (cs->exception_value) {
317+
cfl_exception_restore(cs->exception_value);
318+
cs->exception_value = NULL;
319+
}
320+
}
321+
275322
/**
276323
* @brief Initialiase a CallState and unlock the GIL prior to a
277324
* possibly blocking external call.

0 commit comments

Comments
 (0)