Skip to content

Commit

Permalink
Merge pull request #114541 from cockroachdb/blathers/backport-release…
Browse files Browse the repository at this point in the history
…-23.2-114398
  • Loading branch information
yuzefovich authored Nov 16, 2023
2 parents a1e15bb + f451ce7 commit 03fbc08
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 13 deletions.
68 changes: 57 additions & 11 deletions pkg/sql/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catsessiondata"
Expand Down Expand Up @@ -202,6 +203,15 @@ func (ie *InternalExecutor) SetSessionData(sessionData *sessiondata.SessionData)
}
}

var ieRowsAffectedRetryLimit = settings.RegisterIntSetting(
settings.ApplicationLevel,
"sql.internal_executor.rows_affected_retry_limit",
"limit on the number of retries that can be transparently performed "+
"by the InternalExecutor's Exec{Ex} methods",
5,
settings.NonNegativeInt,
)

func (ie *InternalExecutor) runWithEx(
ctx context.Context,
txn *kv.Txn,
Expand Down Expand Up @@ -263,11 +273,12 @@ func (ie *InternalExecutor) initConnEx(
w: w,
mode: mode,
sync: syncCallback,
resetRowsAffected: func() {
var zero int
_ = w.addResult(ctx, ieIteratorResult{rowsAffected: &zero})
},
}
clientComm.rowsAffectedState.rewind = func() {
var zero int
_ = w.addResult(ctx, ieIteratorResult{rowsAffected: &zero})
}
clientComm.rowsAffectedState.numRewindsLimit = ieRowsAffectedRetryLimit.Get(&ie.s.cfg.Settings.SV)

applicationStats := ie.s.sqlStats.GetApplicationStats(sd.ApplicationName, true /* internal */)
sds := sessiondata.NewStack(sd)
Expand Down Expand Up @@ -426,6 +437,10 @@ type ieIteratorResult struct {
type rowsIterator struct {
r ieResultReader

// depth tracks the current depth of recursion in Next(). Once it exceeds
// iteratorDepthLimit, an error is returned to prevent stack overflow.
depth int64

rowsAffected int
resultCols colinfo.ResultColumns

Expand Down Expand Up @@ -461,6 +476,13 @@ type rowsIterator struct {
var _ isql.Rows = &rowsIterator{}
var _ eval.InternalRows = &rowsIterator{}

// iteratorDepthLimit is maximum allowed depth of recursion in Next(). It is set
// to be sufficiently large to not matter under normal circumstances while
// preventing the possibility of the stack overflow (as we've seen in #109197).
const iteratorDepthLimit = 1000

var iteratorDepthLimitExceededErr = errors.New("rowsIterator exceeded recursion depth limit")

func (r *rowsIterator) Next(ctx context.Context) (_ bool, retErr error) {
// Due to recursive calls to Next() below, this deferred function might get
// executed multiple times, yet it is not a problem because Close() is
Expand All @@ -478,8 +500,16 @@ func (r *rowsIterator) Next(ctx context.Context) (_ bool, retErr error) {
r.errCallback = nil
}
retErr = r.lastErr
r.depth--
}()

r.depth++
if r.depth > iteratorDepthLimit {
r.lastErr = iteratorDepthLimitExceededErr
r.done = true
return false, r.lastErr
}

if r.done {
return false, r.lastErr
}
Expand Down Expand Up @@ -1305,11 +1335,20 @@ type internalClientComm struct {
// mode determines how the results of the query execution are consumed.
mode ieExecutionMode

// resetRowsAffected is a callback that sends a single ieIteratorResult
// object to w in order to set the number of rows affected to zero. Only
// used in rowsAffectedIEExecutionMode when discarding a result (indicating
// that a command will be retried).
resetRowsAffected func()
// rowsAffectedState is only used in rowsAffectedIEExecutionMode.
rowsAffectedState struct {
// rewind is a callback that sends a single ieIteratorResult object to w
// in order to set the number of rows affected to zero. Used when
// discarding a result (indicating that a command will be retried).
rewind func()
// numRewinds tracks the number of times rewind() has been called.
numRewinds int64
// numRewindsLimit is the limit on the number of times we will perform
// the transparent retry. Once numRewinds reaches numRewindsLimit, the
// internal executor machinery will no longer retry and, instead, will
// return the error to the client.
numRewindsLimit int64
}

// sync, if set, is called whenever a Sync is executed with all accumulated
// results since the last Sync.
Expand Down Expand Up @@ -1349,7 +1388,8 @@ func (icc *internalClientComm) createRes(pos CmdPos) *streamingCommandResult {
// "finalized").
icc.results = icc.results[:len(icc.results)-1]
if icc.mode == rowsAffectedIEExecutionMode {
icc.resetRowsAffected()
icc.rowsAffectedState.numRewinds++
icc.rowsAffectedState.rewind()
}
},
}
Expand Down Expand Up @@ -1446,12 +1486,18 @@ func (icc *internalClientComm) Close() {}

// ClientPos is part of the ClientLock interface.
func (icc *internalClientComm) ClientPos() CmdPos {
if icc.mode == rowsAffectedIEExecutionMode {
if icc.mode == rowsAffectedIEExecutionMode &&
icc.rowsAffectedState.numRewinds < icc.rowsAffectedState.numRewindsLimit {
// With the "rows affected" mode, any command can be rewound since we
// assume that only a single command results in actual "rows affected",
// and in Discard we will reset the number to zero (if we were in
// process of evaluation that command when we encountered the retry
// error).
//
// However, to prevent stack overflow due to large (infinite?) number of
// retries we also need to check that we haven't reached the limit yet.
// If we have, then we fall back to the general logic below of
// determining whether we can retry.
return -1
}
// Find the latest result that cannot be rewound.
Expand Down
24 changes: 22 additions & 2 deletions pkg/sql/internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -726,8 +726,9 @@ func TestInternalExecutorEncountersRetry(t *testing.T) {

ctx := context.Background()
params, _ := createTestServerParams()
s, db, kvDB := serverutils.StartServer(t, params)
defer s.Stopper().Stop(ctx)
srv, db, kvDB := serverutils.StartServer(t, params)
defer srv.Stopper().Stop(ctx)
s := srv.ApplicationLayer()

if _, err := db.Exec("CREATE DATABASE test; CREATE TABLE test.t (c) AS SELECT 1"); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -793,6 +794,25 @@ func TestInternalExecutorEncountersRetry(t *testing.T) {
}
})

// This test case verifies that ExecEx stops retrying once the limit on the
// number of retries is reached.
t.Run("ExecEx retry limit reached in implicit txn", func(t *testing.T) {
// This number must be less than the number of errors injected (which is
// determined by sql.numTxnRetryErrors = 3).
if _, err := db.Exec("SET CLUSTER SETTING sql.internal_executor.rows_affected_retry_limit = 1;"); err != nil {
t.Fatal(err)
}
defer func() {
if _, err := db.Exec("RESET CLUSTER SETTING sql.internal_executor.rows_affected_retry_limit;"); err != nil {
t.Fatal(err)
}
}()
_, err := ie.ExecEx(ctx, "read rows", nil /* txn */, ieo, rowsStmt)
if err == nil {
t.Fatal("expected to get an injected retriable error")
}
})

// TODO(yuzefovich): add a test for when a schema change is done in-between
// the retries.
}
Expand Down

0 comments on commit 03fbc08

Please sign in to comment.