diff --git a/pkg/sql/internal.go b/pkg/sql/internal.go index 72a945e9585f..b61307cd4124 100644 --- a/pkg/sql/internal.go +++ b/pkg/sql/internal.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catsessiondata" @@ -202,6 +203,15 @@ func (ie *InternalExecutor) SetSessionData(sessionData *sessiondata.SessionData) } } +var ieRowsAffectedRetryLimit = settings.RegisterIntSetting( + settings.ApplicationLevel, + "sql.internal_executor.rows_affected_retry_limit", + "limit on the number of retries that can be transparently performed "+ + "by the InternalExecutor's Exec{Ex} methods", + 5, + settings.NonNegativeInt, +) + func (ie *InternalExecutor) runWithEx( ctx context.Context, txn *kv.Txn, @@ -263,11 +273,12 @@ func (ie *InternalExecutor) initConnEx( w: w, mode: mode, sync: syncCallback, - resetRowsAffected: func() { - var zero int - _ = w.addResult(ctx, ieIteratorResult{rowsAffected: &zero}) - }, } + clientComm.rowsAffectedState.rewind = func() { + var zero int + _ = w.addResult(ctx, ieIteratorResult{rowsAffected: &zero}) + } + clientComm.rowsAffectedState.numRewindsLimit = ieRowsAffectedRetryLimit.Get(&ie.s.cfg.Settings.SV) applicationStats := ie.s.sqlStats.GetApplicationStats(sd.ApplicationName, true /* internal */) sds := sessiondata.NewStack(sd) @@ -426,6 +437,10 @@ type ieIteratorResult struct { type rowsIterator struct { r ieResultReader + // depth tracks the current depth of recursion in Next(). Once it exceeds + // iteratorDepthLimit, an error is returned to prevent stack overflow. + depth int64 + rowsAffected int resultCols colinfo.ResultColumns @@ -461,6 +476,13 @@ type rowsIterator struct { var _ isql.Rows = &rowsIterator{} var _ eval.InternalRows = &rowsIterator{} +// iteratorDepthLimit is maximum allowed depth of recursion in Next(). It is set +// to be sufficiently large to not matter under normal circumstances while +// preventing the possibility of the stack overflow (as we've seen in #109197). +const iteratorDepthLimit = 1000 + +var iteratorDepthLimitExceededErr = errors.New("rowsIterator exceeded recursion depth limit") + func (r *rowsIterator) Next(ctx context.Context) (_ bool, retErr error) { // Due to recursive calls to Next() below, this deferred function might get // executed multiple times, yet it is not a problem because Close() is @@ -478,8 +500,16 @@ func (r *rowsIterator) Next(ctx context.Context) (_ bool, retErr error) { r.errCallback = nil } retErr = r.lastErr + r.depth-- }() + r.depth++ + if r.depth > iteratorDepthLimit { + r.lastErr = iteratorDepthLimitExceededErr + r.done = true + return false, r.lastErr + } + if r.done { return false, r.lastErr } @@ -1305,11 +1335,20 @@ type internalClientComm struct { // mode determines how the results of the query execution are consumed. mode ieExecutionMode - // resetRowsAffected is a callback that sends a single ieIteratorResult - // object to w in order to set the number of rows affected to zero. Only - // used in rowsAffectedIEExecutionMode when discarding a result (indicating - // that a command will be retried). - resetRowsAffected func() + // rowsAffectedState is only used in rowsAffectedIEExecutionMode. + rowsAffectedState struct { + // rewind is a callback that sends a single ieIteratorResult object to w + // in order to set the number of rows affected to zero. Used when + // discarding a result (indicating that a command will be retried). + rewind func() + // numRewinds tracks the number of times rewind() has been called. + numRewinds int64 + // numRewindsLimit is the limit on the number of times we will perform + // the transparent retry. Once numRewinds reaches numRewindsLimit, the + // internal executor machinery will no longer retry and, instead, will + // return the error to the client. + numRewindsLimit int64 + } // sync, if set, is called whenever a Sync is executed with all accumulated // results since the last Sync. @@ -1349,7 +1388,8 @@ func (icc *internalClientComm) createRes(pos CmdPos) *streamingCommandResult { // "finalized"). icc.results = icc.results[:len(icc.results)-1] if icc.mode == rowsAffectedIEExecutionMode { - icc.resetRowsAffected() + icc.rowsAffectedState.numRewinds++ + icc.rowsAffectedState.rewind() } }, } @@ -1446,12 +1486,18 @@ func (icc *internalClientComm) Close() {} // ClientPos is part of the ClientLock interface. func (icc *internalClientComm) ClientPos() CmdPos { - if icc.mode == rowsAffectedIEExecutionMode { + if icc.mode == rowsAffectedIEExecutionMode && + icc.rowsAffectedState.numRewinds < icc.rowsAffectedState.numRewindsLimit { // With the "rows affected" mode, any command can be rewound since we // assume that only a single command results in actual "rows affected", // and in Discard we will reset the number to zero (if we were in // process of evaluation that command when we encountered the retry // error). + // + // However, to prevent stack overflow due to large (infinite?) number of + // retries we also need to check that we haven't reached the limit yet. + // If we have, then we fall back to the general logic below of + // determining whether we can retry. return -1 } // Find the latest result that cannot be rewound. diff --git a/pkg/sql/internal_test.go b/pkg/sql/internal_test.go index 72c502a27702..e6142e19eecf 100644 --- a/pkg/sql/internal_test.go +++ b/pkg/sql/internal_test.go @@ -726,8 +726,9 @@ func TestInternalExecutorEncountersRetry(t *testing.T) { ctx := context.Background() params, _ := createTestServerParams() - s, db, kvDB := serverutils.StartServer(t, params) - defer s.Stopper().Stop(ctx) + srv, db, kvDB := serverutils.StartServer(t, params) + defer srv.Stopper().Stop(ctx) + s := srv.ApplicationLayer() if _, err := db.Exec("CREATE DATABASE test; CREATE TABLE test.t (c) AS SELECT 1"); err != nil { t.Fatal(err) @@ -793,6 +794,25 @@ func TestInternalExecutorEncountersRetry(t *testing.T) { } }) + // This test case verifies that ExecEx stops retrying once the limit on the + // number of retries is reached. + t.Run("ExecEx retry limit reached in implicit txn", func(t *testing.T) { + // This number must be less than the number of errors injected (which is + // determined by sql.numTxnRetryErrors = 3). + if _, err := db.Exec("SET CLUSTER SETTING sql.internal_executor.rows_affected_retry_limit = 1;"); err != nil { + t.Fatal(err) + } + defer func() { + if _, err := db.Exec("RESET CLUSTER SETTING sql.internal_executor.rows_affected_retry_limit;"); err != nil { + t.Fatal(err) + } + }() + _, err := ie.ExecEx(ctx, "read rows", nil /* txn */, ieo, rowsStmt) + if err == nil { + t.Fatal("expected to get an injected retriable error") + } + }) + // TODO(yuzefovich): add a test for when a schema change is done in-between // the retries. }