From f451ce72444b4b9fc84df13ac20c4b1834ac5721 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Tue, 14 Nov 2023 05:03:59 +0000 Subject: [PATCH] sql: introduce limit on number of retries in IntExec.Exec{Ex} methods This commit introduces a limit on the number of times the InternalExecutor machinery can retry errors in `Exec{Ex}` methods. That logic was introduced in c09860b656e31bbdf996350382a8000f3e37d370 in order to reduce the impact of fixes in other commits in #101477. However, there is no limit in the number of retries, and we hit the stack overflow twice in our own testing recently seemingly in this code path. Thus, this commit adds a limit, 5 by default. Note that I'm not sure exactly what bug, if any, can lead to the stack overflow. One seemingly-unlikely theory is that there is no bug, meaning that we were simply retrying forever because the stmts were pushed by higher priority txns every time. Still, this change seems beneficial on its own and should prevent stack overflows even if we don't fully understand the root cause. An additional improvement is that we now track the depth of recursion, and once it exceeds 1000, we'll return an error. This should prevent the stack overflows due to other reasons. There is no release note given we've only seen this twice in our own testing and it involved cluster-to-cluster streaming. Release note: None --- pkg/sql/internal.go | 68 +++++++++++++++++++++++++++++++++------- pkg/sql/internal_test.go | 24 ++++++++++++-- 2 files changed, 79 insertions(+), 13 deletions(-) diff --git a/pkg/sql/internal.go b/pkg/sql/internal.go index 72a945e9585f..b61307cd4124 100644 --- a/pkg/sql/internal.go +++ b/pkg/sql/internal.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catsessiondata" @@ -202,6 +203,15 @@ func (ie *InternalExecutor) SetSessionData(sessionData *sessiondata.SessionData) } } +var ieRowsAffectedRetryLimit = settings.RegisterIntSetting( + settings.ApplicationLevel, + "sql.internal_executor.rows_affected_retry_limit", + "limit on the number of retries that can be transparently performed "+ + "by the InternalExecutor's Exec{Ex} methods", + 5, + settings.NonNegativeInt, +) + func (ie *InternalExecutor) runWithEx( ctx context.Context, txn *kv.Txn, @@ -263,11 +273,12 @@ func (ie *InternalExecutor) initConnEx( w: w, mode: mode, sync: syncCallback, - resetRowsAffected: func() { - var zero int - _ = w.addResult(ctx, ieIteratorResult{rowsAffected: &zero}) - }, } + clientComm.rowsAffectedState.rewind = func() { + var zero int + _ = w.addResult(ctx, ieIteratorResult{rowsAffected: &zero}) + } + clientComm.rowsAffectedState.numRewindsLimit = ieRowsAffectedRetryLimit.Get(&ie.s.cfg.Settings.SV) applicationStats := ie.s.sqlStats.GetApplicationStats(sd.ApplicationName, true /* internal */) sds := sessiondata.NewStack(sd) @@ -426,6 +437,10 @@ type ieIteratorResult struct { type rowsIterator struct { r ieResultReader + // depth tracks the current depth of recursion in Next(). Once it exceeds + // iteratorDepthLimit, an error is returned to prevent stack overflow. + depth int64 + rowsAffected int resultCols colinfo.ResultColumns @@ -461,6 +476,13 @@ type rowsIterator struct { var _ isql.Rows = &rowsIterator{} var _ eval.InternalRows = &rowsIterator{} +// iteratorDepthLimit is maximum allowed depth of recursion in Next(). It is set +// to be sufficiently large to not matter under normal circumstances while +// preventing the possibility of the stack overflow (as we've seen in #109197). +const iteratorDepthLimit = 1000 + +var iteratorDepthLimitExceededErr = errors.New("rowsIterator exceeded recursion depth limit") + func (r *rowsIterator) Next(ctx context.Context) (_ bool, retErr error) { // Due to recursive calls to Next() below, this deferred function might get // executed multiple times, yet it is not a problem because Close() is @@ -478,8 +500,16 @@ func (r *rowsIterator) Next(ctx context.Context) (_ bool, retErr error) { r.errCallback = nil } retErr = r.lastErr + r.depth-- }() + r.depth++ + if r.depth > iteratorDepthLimit { + r.lastErr = iteratorDepthLimitExceededErr + r.done = true + return false, r.lastErr + } + if r.done { return false, r.lastErr } @@ -1305,11 +1335,20 @@ type internalClientComm struct { // mode determines how the results of the query execution are consumed. mode ieExecutionMode - // resetRowsAffected is a callback that sends a single ieIteratorResult - // object to w in order to set the number of rows affected to zero. Only - // used in rowsAffectedIEExecutionMode when discarding a result (indicating - // that a command will be retried). - resetRowsAffected func() + // rowsAffectedState is only used in rowsAffectedIEExecutionMode. + rowsAffectedState struct { + // rewind is a callback that sends a single ieIteratorResult object to w + // in order to set the number of rows affected to zero. Used when + // discarding a result (indicating that a command will be retried). + rewind func() + // numRewinds tracks the number of times rewind() has been called. + numRewinds int64 + // numRewindsLimit is the limit on the number of times we will perform + // the transparent retry. Once numRewinds reaches numRewindsLimit, the + // internal executor machinery will no longer retry and, instead, will + // return the error to the client. + numRewindsLimit int64 + } // sync, if set, is called whenever a Sync is executed with all accumulated // results since the last Sync. @@ -1349,7 +1388,8 @@ func (icc *internalClientComm) createRes(pos CmdPos) *streamingCommandResult { // "finalized"). icc.results = icc.results[:len(icc.results)-1] if icc.mode == rowsAffectedIEExecutionMode { - icc.resetRowsAffected() + icc.rowsAffectedState.numRewinds++ + icc.rowsAffectedState.rewind() } }, } @@ -1446,12 +1486,18 @@ func (icc *internalClientComm) Close() {} // ClientPos is part of the ClientLock interface. func (icc *internalClientComm) ClientPos() CmdPos { - if icc.mode == rowsAffectedIEExecutionMode { + if icc.mode == rowsAffectedIEExecutionMode && + icc.rowsAffectedState.numRewinds < icc.rowsAffectedState.numRewindsLimit { // With the "rows affected" mode, any command can be rewound since we // assume that only a single command results in actual "rows affected", // and in Discard we will reset the number to zero (if we were in // process of evaluation that command when we encountered the retry // error). + // + // However, to prevent stack overflow due to large (infinite?) number of + // retries we also need to check that we haven't reached the limit yet. + // If we have, then we fall back to the general logic below of + // determining whether we can retry. return -1 } // Find the latest result that cannot be rewound. diff --git a/pkg/sql/internal_test.go b/pkg/sql/internal_test.go index 72c502a27702..e6142e19eecf 100644 --- a/pkg/sql/internal_test.go +++ b/pkg/sql/internal_test.go @@ -726,8 +726,9 @@ func TestInternalExecutorEncountersRetry(t *testing.T) { ctx := context.Background() params, _ := createTestServerParams() - s, db, kvDB := serverutils.StartServer(t, params) - defer s.Stopper().Stop(ctx) + srv, db, kvDB := serverutils.StartServer(t, params) + defer srv.Stopper().Stop(ctx) + s := srv.ApplicationLayer() if _, err := db.Exec("CREATE DATABASE test; CREATE TABLE test.t (c) AS SELECT 1"); err != nil { t.Fatal(err) @@ -793,6 +794,25 @@ func TestInternalExecutorEncountersRetry(t *testing.T) { } }) + // This test case verifies that ExecEx stops retrying once the limit on the + // number of retries is reached. + t.Run("ExecEx retry limit reached in implicit txn", func(t *testing.T) { + // This number must be less than the number of errors injected (which is + // determined by sql.numTxnRetryErrors = 3). + if _, err := db.Exec("SET CLUSTER SETTING sql.internal_executor.rows_affected_retry_limit = 1;"); err != nil { + t.Fatal(err) + } + defer func() { + if _, err := db.Exec("RESET CLUSTER SETTING sql.internal_executor.rows_affected_retry_limit;"); err != nil { + t.Fatal(err) + } + }() + _, err := ie.ExecEx(ctx, "read rows", nil /* txn */, ieo, rowsStmt) + if err == nil { + t.Fatal("expected to get an injected retriable error") + } + }) + // TODO(yuzefovich): add a test for when a schema change is done in-between // the retries. }