From 700ca5fe34c1ef989c73a7eae65ff46dfbdeb46e Mon Sep 17 00:00:00 2001 From: Zhang Hao Date: Fri, 5 Jan 2024 11:32:55 +0800 Subject: [PATCH] Fix bug: PLPY function causes master process reset. (#16856) ## Problem An error occurs in python lib when a plpython function is executed. After our analysis, in the user's cluster, a plpython UDF was running with the unstable network, and got a timeout error: `failed to acquire resources on one or more segments`. Then a plpython UDF was run in the same session, and the UDF failed with GC error. Here is the core dump: ``` 2023-11-24 10:15:18.945507 CST,,,p2705198,th2081832064,,,,0,,,seg-1,,,,,"LOG","00000","3rd party error log: #0 0x7f7c68b6d55b in frame_dealloc /home/cc/repo/cpython/Objects/frameobject.c:509:5 #1 0x7f7c68b5109d in gen_send_ex /home/cc/repo/cpython/Objects/genobject.c:108:9 #2 0x7f7c68af9ddd in PyIter_Next /home/cc/repo/cpython/Objects/abstract.c:3118:14 #3 0x7f7c78caa5c0 in PLy_exec_function /home/cc/repo/gpdb6/src/pl/plpython/plpy_exec.c:134:11 #4 0x7f7c78cb5ffb in plpython_call_handler /home/cc/repo/gpdb6/src/pl/plpython/plpy_main.c:387:13 #5 0x562f5e008bb5 in ExecMakeTableFunctionResult /home/cc/repo/gpdb6/src/backend/executor/execQual.c:2395:13 #6 0x562f5e0dddec in FunctionNext_guts /home/cc/repo/gpdb6/src/backend/executor/nodeFunctionscan.c:142:5 #7 0x562f5e0da094 in FunctionNext /home/cc/repo/gpdb6/src/backend/executor/nodeFunctionscan.c:350:11 #8 0x562f5e03d4b0 in ExecScanFetch /home/cc/repo/gpdb6/src/backend/executor/execScan.c:84:9 #9 0x562f5e03cd8f in ExecScan /home/cc/repo/gpdb6/src/backend/executor/execScan.c:154:10 #10 0x562f5e0da072 in ExecFunctionScan /home/cc/repo/gpdb6/src/backend/executor/nodeFunctionscan.c:380:9 #11 0x562f5e001a1c in ExecProcNode /home/cc/repo/gpdb6/src/backend/executor/execProcnode.c:1071:13 #12 0x562f5dfe6377 in ExecutePlan /home/cc/repo/gpdb6/src/backend/executor/execMain.c:3202:10 #13 0x562f5dfe5bf4 in standard_ExecutorRun /home/cc/repo/gpdb6/src/backend/executor/execMain.c:1171:5 #14 0x562f5dfe4877 in ExecutorRun /home/cc/repo/gpdb6/src/backend/executor/execMain.c:992:4 #15 0x562f5e857e69 in PortalRunSelect /home/cc/repo/gpdb6/src/backend/tcop/pquery.c:1164:4 #16 0x562f5e856d3f in PortalRun /home/cc/repo/gpdb6/src/backend/tcop/pquery.c:1005:18 #17 0x562f5e84607a in exec_simple_query /home/cc/repo/gpdb6/src/backend/tcop/postgres.c:1848:10 ``` ## Reproduce We can use a simple procedure to reproduce the above problem: - set timeout GUC: `gpconfig -c gp_segment_connect_timeout -v 5` and `gpstop -ari` - prepare function: ``` CREATE EXTENSION plpythonu; CREATE OR REPLACE FUNCTION test_func() RETURNS SETOF int AS $$ plpy.execute("select pg_backend_pid()") for i in range(0, 5): yield (i) $$ LANGUAGE plpythonu; ``` - exit from the current psql session. - stop the postmaster of segment: `gdb -p "the pid of segment postmaster"` - enter a psql session. - call `SELECT test_func();` and get error ``` gpadmin=# select test_func(); ERROR: function "test_func" error fetching next item from iterator (plpy_elog.c:121) DETAIL: Exception: failed to acquire resources on one or more segments CONTEXT: Traceback (most recent call last): PL/Python function "test_func" ``` - quit gdb and make postmaster runnable. - call `SELECT test_func();` again and get panic ``` gpadmin=# SELECT test_func(); server closed the connection unexpectedly This probably means the server terminated abnormally before or while processing the request. The connection to the server was lost. Attempting reset: Failed. !> ``` ## Analysis - There is an SPI call in test_func(): `plpy.execute()`. - Then coordinator will start a subtransaction by PLy_spi_subtransaction_begin(); - Meanwhile, if the segment cannot receive the instruction from the coordinator, the subtransaction beginning procedure return fails. - BUT! The Python processor does not know whether an error happened and does not clean its environment. - Then the next plpython UDF in the same session will fail due to the wrong Python environment. ## Solution - Use try-catch to catch the exception caused by PLy_spi_subtransaction_begin() - set the python error indicator by PLy_spi_exception_set() Co-authored-by: Chen Mulong --- src/backend/access/transam/xact.c | 1 + .../expected/plpython_subtransaction.out | 45 +++++++++++++++ src/pl/plpython/plpy_cursorobject.c | 12 ++-- src/pl/plpython/plpy_spi.c | 57 ++++++++++++++++--- src/pl/plpython/plpy_spi.h | 2 +- .../plpython/sql/plpython_subtransaction.sql | 21 +++++++ 6 files changed, 125 insertions(+), 13 deletions(-) diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 1b73917e40f..8566480fc6f 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -5560,6 +5560,7 @@ void BeginInternalSubTransaction(const char *name) { TransactionState s = CurrentTransactionState; + SIMPLE_FAULT_INJECTOR("begin_internal_sub_transaction"); if (Gp_role == GP_ROLE_DISPATCH) { diff --git a/src/pl/plpython/expected/plpython_subtransaction.out b/src/pl/plpython/expected/plpython_subtransaction.out index 2a56541917d..f043e27b444 100644 --- a/src/pl/plpython/expected/plpython_subtransaction.out +++ b/src/pl/plpython/expected/plpython_subtransaction.out @@ -399,3 +399,48 @@ CONTEXT: Traceback (most recent call last): PL/Python function "cursor_close_aborted_subxact", line 7, in cur.close() PL/Python function "cursor_close_aborted_subxact" +-- error report test in subtransaction begin +-- prepare function +CREATE OR REPLACE FUNCTION test_func() RETURNS SETOF int AS +$$ +plpy.execute("select pg_backend_pid()") + +for i in range(0, 5): + yield (i) + +$$ LANGUAGE plpython3u; +-- inject fault and wait for trigger +select gp_inject_fault_infinite('begin_internal_sub_transaction', 'error', 1); + gp_inject_fault_infinite +-------------------------- + Success: +(1 row) + +SELECT test_func(); +ERROR: function "test_func" error fetching next item from iterator +DETAIL: spiexceptions.FaultInject: fault triggered, fault name:'begin_internal_sub_transaction' fault type:'error' +CONTEXT: Traceback (most recent call last): +PL/Python function "test_func" +select gp_wait_until_triggered_fault('begin_internal_sub_transaction', 1, 1); + gp_wait_until_triggered_fault +------------------------------- + Success: +(1 row) + +select gp_inject_fault('begin_internal_sub_transaction', 'reset', 1); + gp_inject_fault +----------------- + Success: +(1 row) + +SELECT test_func(); + test_func +----------- + 0 + 1 + 2 + 3 + 4 +(5 rows) + +DROP FUNCTION test_func(); diff --git a/src/pl/plpython/plpy_cursorobject.c b/src/pl/plpython/plpy_cursorobject.c index 08d8b607e38..f0771018a17 100644 --- a/src/pl/plpython/plpy_cursorobject.c +++ b/src/pl/plpython/plpy_cursorobject.c @@ -98,7 +98,8 @@ PLy_cursor_query(const char *query) oldcontext = CurrentMemoryContext; oldowner = CurrentResourceOwner; - PLy_spi_subtransaction_begin(oldcontext, oldowner); + if(!PLy_spi_subtransaction_begin(oldcontext, oldowner)) + return NULL; PG_TRY(); { @@ -196,7 +197,8 @@ PLy_cursor_plan(PyObject *ob, PyObject *args) oldcontext = CurrentMemoryContext; oldowner = CurrentResourceOwner; - PLy_spi_subtransaction_begin(oldcontext, oldowner); + if(!PLy_spi_subtransaction_begin(oldcontext, oldowner)) + return NULL; PG_TRY(); { @@ -333,7 +335,8 @@ PLy_cursor_iternext(PyObject *self) oldcontext = CurrentMemoryContext; oldowner = CurrentResourceOwner; - PLy_spi_subtransaction_begin(oldcontext, oldowner); + if(!PLy_spi_subtransaction_begin(oldcontext, oldowner)) + return NULL; PG_TRY(); { @@ -403,7 +406,8 @@ PLy_cursor_fetch(PyObject *self, PyObject *args) oldcontext = CurrentMemoryContext; oldowner = CurrentResourceOwner; - PLy_spi_subtransaction_begin(oldcontext, oldowner); + if(!PLy_spi_subtransaction_begin(oldcontext, oldowner)) + return NULL; PG_TRY(); { diff --git a/src/pl/plpython/plpy_spi.c b/src/pl/plpython/plpy_spi.c index 9f105994c11..5c483592381 100644 --- a/src/pl/plpython/plpy_spi.c +++ b/src/pl/plpython/plpy_spi.c @@ -84,7 +84,8 @@ PLy_spi_prepare(PyObject *self, PyObject *args) oldcontext = CurrentMemoryContext; oldowner = CurrentResourceOwner; - PLy_spi_subtransaction_begin(oldcontext, oldowner); + if(!PLy_spi_subtransaction_begin(oldcontext, oldowner)) + return NULL; PG_TRY(); { @@ -238,7 +239,8 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, int64 limit) oldcontext = CurrentMemoryContext; oldowner = CurrentResourceOwner; - PLy_spi_subtransaction_begin(oldcontext, oldowner); + if(!PLy_spi_subtransaction_begin(oldcontext, oldowner)) + return NULL; PG_TRY(); { @@ -334,7 +336,8 @@ PLy_spi_execute_query(char *query, int64 limit) oldcontext = CurrentMemoryContext; oldowner = CurrentResourceOwner; - PLy_spi_subtransaction_begin(oldcontext, oldowner); + if(!PLy_spi_subtransaction_begin(oldcontext, oldowner)) + return NULL; PG_TRY(); { @@ -595,7 +598,9 @@ PLy_rollback(PyObject *self, PyObject *args) * MemoryContext oldcontext = CurrentMemoryContext; * ResourceOwner oldowner = CurrentResourceOwner; * - * PLy_spi_subtransaction_begin(oldcontext, oldowner); + * if(!PLy_spi_subtransaction_begin(oldcontext, oldowner)) + * return NULL; + * * PG_TRY(); * { * @@ -612,12 +617,48 @@ PLy_rollback(PyObject *self, PyObject *args) * These utilities take care of restoring connection to the SPI manager and * setting a Python exception in case of an abort. */ -void +bool PLy_spi_subtransaction_begin(MemoryContext oldcontext, ResourceOwner oldowner) { - BeginInternalSubTransaction(NULL); - /* Want to run inside function's memory context */ - MemoryContextSwitchTo(oldcontext); + PG_TRY(); + { + /* Start subtransaction (could fail) */ + BeginInternalSubTransaction(NULL); + /* Want to run inside function's memory context */ + MemoryContextSwitchTo(oldcontext); + } + PG_CATCH(); + { + ErrorData *edata; + PLyExceptionEntry *entry; + PyObject *exc; + + /* Ensure we restore original context and owner */ + MemoryContextSwitchTo(oldcontext); + CurrentResourceOwner = oldowner; + + /* Save error info */ + edata = CopyErrorData(); + FlushErrorState(); + + /* Look up the correct exception */ + entry = hash_search(PLy_spi_exceptions, &(edata->sqlerrcode), + HASH_FIND, NULL); + + /* + * This could be a custom error code, if that's the case fallback to + * SPIError + */ + exc = entry ? entry->exc : PLy_exc_spi_error; + /* Make Python raise the exception */ + PLy_spi_exception_set(exc, edata); + FreeErrorData(edata); + + return false; + } + PG_END_TRY(); + + return true; } void diff --git a/src/pl/plpython/plpy_spi.h b/src/pl/plpython/plpy_spi.h index 98ccd210931..a43d803928a 100644 --- a/src/pl/plpython/plpy_spi.h +++ b/src/pl/plpython/plpy_spi.h @@ -22,7 +22,7 @@ typedef struct PLyExceptionEntry } PLyExceptionEntry; /* handling of SPI operations inside subtransactions */ -extern void PLy_spi_subtransaction_begin(MemoryContext oldcontext, ResourceOwner oldowner); +extern bool PLy_spi_subtransaction_begin(MemoryContext oldcontext, ResourceOwner oldowner); extern void PLy_spi_subtransaction_commit(MemoryContext oldcontext, ResourceOwner oldowner); extern void PLy_spi_subtransaction_abort(MemoryContext oldcontext, ResourceOwner oldowner); diff --git a/src/pl/plpython/sql/plpython_subtransaction.sql b/src/pl/plpython/sql/plpython_subtransaction.sql index cc4b1ae102b..6c5229b3109 100644 --- a/src/pl/plpython/sql/plpython_subtransaction.sql +++ b/src/pl/plpython/sql/plpython_subtransaction.sql @@ -260,3 +260,24 @@ SELECT cursor_in_subxact(); SELECT cursor_aborted_subxact(); SELECT cursor_plan_aborted_subxact(); SELECT cursor_close_aborted_subxact(); + +-- error report test in subtransaction begin +-- prepare function +CREATE OR REPLACE FUNCTION test_func() RETURNS SETOF int AS +$$ +plpy.execute("select pg_backend_pid()") + +for i in range(0, 5): + yield (i) + +$$ LANGUAGE plpythonu; + +-- inject fault and wait for trigger +select gp_inject_fault_infinite('begin_internal_sub_transaction', 'error', 1); +SELECT test_func(); +select gp_wait_until_triggered_fault('begin_internal_sub_transaction', 1, 1); +select gp_inject_fault('begin_internal_sub_transaction', 'reset', 1); + +SELECT test_func(); + +DROP FUNCTION test_func();