sql: fix internal executor when it encounters a retry error
This commit fixes a bug in the internal executor (IE) when it encounters
an internal retry. Previously, the implementation of the "rewind
capability" assumed that we could rewind the StmtBuf for the IE at any
point; however, that is not generally true. In particular, if we have
communicated anything to the client of the IE's connExecutor
(`rowsIterator` in this context), then we cannot rewind the command
we're currently evaluating. In theory, we could have pushed some rows
through the iterator only to encounter the internal retry later, which
would then lead to re-executing the command from the start (in other
words, the rows pushed before the retry would be double-pushed); in
practice, however, we haven't seen this (at least not yet). What we
have seen is the number of rows affected being double counted. That
particular reproduction was already fixed in the previous commit, but
this commit fixes the problem more generally.

For every `streamingCommandResult`, this commit tracks whether it has
already communicated something to the client such that the result can
no longer be rewound, and then uses that tracking to correctly
implement the rewind capability. There are three possible types of data
that we communicate:
- rows
- number of rows affected
- column schema.

If the retry happens after some rows have been communicated, we're out
of luck: there is no way we can retry the stmt internally, so from now
on we will return a retry error to the client.

If the retry happens after "rows affected", then given the adjustment in
the previous commit we can proceed transparently.

To avoid propagating the retry error up when it occurs after the column
schema has been received but before any rows have been pushed out, this
commit adjusts the behavior to always keep the latest column schema, so
we can still proceed transparently in this case.
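
The three cases above can be sketched as a small standalone Go program.
This is only an illustration under simplifying assumptions, not the code
in this commit: `result`, `errRetry`, and `runWithRetry` are hypothetical
names, and the real retry handling lives in the connExecutor.

```go
package main

import (
	"errors"
	"fmt"
)

// errRetry is a hypothetical stand-in for an internal retry error.
var errRetry = errors.New("retry")

// result is a simplified, hypothetical stand-in for streamingCommandResult.
type result struct {
	cols         []string // latest column schema; overwritten on each attempt
	rowsAffected int      // latest count; overwritten, never accumulated
	rows         []string // rows already handed to the consumer
	cannotRewind bool     // set once a row has been handed to the consumer
}

// Column schema and "rows affected" are overwritten, so they do not block
// a rewind.
func (r *result) setColumns(cols []string) { r.cols = cols }
func (r *result) setRowsAffected(n int)     { r.rowsAffected = n }

// A row may be consumed immediately, so it makes the result non-rewindable.
func (r *result) addRow(row string) {
	r.rows = append(r.rows, row)
	r.cannotRewind = true
}

// runWithRetry re-executes stmt on a retry error only while the result can
// still be rewound; otherwise the retry error is returned to the caller.
func runWithRetry(r *result, stmt func(attempt int, r *result) error) error {
	for attempt := 0; ; attempt++ {
		err := stmt(attempt, r)
		if !errors.Is(err, errRetry) || r.cannotRewind {
			return err
		}
	}
}

func main() {
	// Retry after column schema and "rows affected": handled transparently.
	r1 := &result{}
	err := runWithRetry(r1, func(attempt int, r *result) error {
		r.setColumns([]string{"k", "v"})
		r.setRowsAffected(3)
		if attempt == 0 {
			return errRetry
		}
		return nil
	})
	fmt.Println(err, r1.rowsAffected)

	// Retry after a row was pushed: the retry error reaches the caller.
	r2 := &result{}
	err = runWithRetry(r2, func(attempt int, r *result) error {
		r.addRow("a")
		return errRetry
	})
	fmt.Println(err, len(r2.rows))
}
```

In this sketch only `addRow` flips `cannotRewind`, mirroring the idea that
schema and "rows affected" can be replaced by a later attempt while a
pushed row cannot be taken back.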

This bug has been present since at least 21.1 when
`streamingCommandResult` was introduced. However, since we now might
return a retry error in some cases, this could lead to test failures or
flakes, or even to errors in some internal CRDB operations that execute
statements of ROWS type (if there is no appropriate retry logic), so
I intend to backport this only to 23.1. There is also no release note
since the only failure we've seen is a double-counted "rows affected"
number, the likelihood of which has significantly increased due to the
jobs system refactor (i.e. mostly 23.1 is affected AFAIK).

Additionally, this commit makes the `execInternal` call correctly block
until the first actual, non-metadata result is seen (this behavior is
needed to correctly synchronize access to the txn before the stmt is
given to the execution engine).
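
A minimal sketch of that blocking behavior, assuming that "metadata" here
means column-schema messages and that results arrive on a channel.
`iterResult` and `waitForFirstResult` are hypothetical names used only
for illustration; the actual logic in this commit may differ.

```go
package main

import "fmt"

// iterResult is a simplified, hypothetical stand-in for ieIteratorResult;
// exactly one field is set per message.
type iterResult struct {
	cols         []string
	row          []string
	rowsAffected *int
}

// waitForFirstResult blocks until a message that is not a column schema
// arrives, or until the channel is closed. Column schemas seen along the
// way overwrite each other, so only the latest one is kept.
func waitForFirstResult(ch <-chan iterResult) (cols []string, first *iterResult) {
	for res := range ch {
		if res.cols != nil {
			cols = res.cols
			continue
		}
		return cols, &res
	}
	return cols, nil
}

func main() {
	ch := make(chan iterResult, 3)
	ch <- iterResult{cols: []string{"stale"}}  // schema from an attempt that was retried
	ch <- iterResult{cols: []string{"k", "v"}} // schema from the attempt that succeeded
	ch <- iterResult{row: []string{"1", "a"}}  // first non-metadata result
	close(ch)

	cols, first := waitForFirstResult(ch)
	fmt.Println(cols, first.row)
}
```

The sketch relies on a schema message always carrying a non-nil slice,
which matches `SetColumns` replacing nil columns with an empty value.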

Release note: None
yuzefovich committed Apr 19, 2023
1 parent b701617 commit 0316935
Showing 3 changed files with 150 additions and 111 deletions.
28 changes: 18 additions & 10 deletions pkg/sql/conn_io.go
@@ -965,11 +965,6 @@ func (rc *rewindCapability) close() {
rc.cl.Close()
}

-type resCloseType bool
-
-const closed resCloseType = true
-const discarded resCloseType = false
-
// streamingCommandResult is a CommandResult that streams rows on the channel
// and can call a provided callback when closed.
type streamingCommandResult struct {
@@ -980,11 +975,17 @@ type streamingCommandResult struct {
// on the synchronization strategy.
w ieResultWriter

+// cannotRewind indicates whether this result has communicated some data
+// (rows or metadata) such that the corresponding command cannot be rewound.
+cannotRewind bool
+
err error
rowsAffected int

-// closeCallback, if set, is called when Close()/Discard() is called.
-closeCallback func(resCloseType)
+// closeCallback, if set, is called when Close() is called.
+closeCallback func()
+// discardCallback, if set, is called when Discard() is called.
+discardCallback func()
}

var _ RestrictedCommandResult = &streamingCommandResult{}
@@ -1007,6 +1008,8 @@ func (r *streamingCommandResult) SetColumns(ctx context.Context, cols colinfo.Re
if cols == nil {
cols = colinfo.ResultColumns{}
}
+// NB: we do not set r.cannotRewind here because the correct columns will be
+// set in rowsIterator.Next.
_ = r.w.addResult(ctx, ieIteratorResult{cols: cols})
}

@@ -1033,6 +1036,9 @@ func (r *streamingCommandResult) AddRow(ctx context.Context, row tree.Datums) er
r.rowsAffected++
rowCopy := make(tree.Datums, len(row))
copy(rowCopy, row)
+// Once we add this row to the writer, it can be immediately consumed by the
+// reader, so this result can no longer be rewound.
+r.cannotRewind = true
return r.w.addResult(ctx, ieIteratorResult{row: rowCopy})
}

@@ -1076,6 +1082,8 @@ func (r *streamingCommandResult) SetRowsAffected(ctx context.Context, n int) {
// streamingCommandResult might be used outside of the internal executor
// (i.e. not by rowsIterator) in which case the channel is not set.
if r.w != nil {
+// NB: we do not set r.cannotRewind here because rowsAffected value will
+// be overwritten in rowsIterator.Next correctly if necessary.
_ = r.w.addResult(ctx, ieIteratorResult{rowsAffected: &n})
}
}
@@ -1088,14 +1096,14 @@ func (r *streamingCommandResult) RowsAffected() int {
// Close is part of the CommandResultClose interface.
func (r *streamingCommandResult) Close(context.Context, TransactionStatusIndicator) {
if r.closeCallback != nil {
-r.closeCallback(closed)
+r.closeCallback()
}
}

// Discard is part of the CommandResult interface.
func (r *streamingCommandResult) Discard() {
-if r.closeCallback != nil {
-r.closeCallback(discarded)
+if r.discardCallback != nil {
+r.discardCallback()
}
}
