Skip to content

Commit

Permalink
sql: use SetRowsAffected instead of IncrementRowsAffected
Browse files Browse the repository at this point in the history
This commit changes the `RestrictedCommandResult` interface a bit to
make `SetRowsAffected` just set the number of rows affected, rather than
increment it. This method is supposed to be called only once, so this
change seems reasonable on its own, but it also has an additional
benefit for the follow-up commit. In particular, in the follow-up commit
we will preserve the ability to automatically retry statements of
non-ROWS stmt type by the internal executor.

Note that this commit on its own fixes the behavior for the "rows
affected" statements (the only known occurrence of the bug that is
generally fixed in the following commit).

Release note: None
  • Loading branch information
yuzefovich committed Apr 19, 2023
1 parent 0de74be commit b701617
Show file tree
Hide file tree
Showing 10 changed files with 108 additions and 49 deletions.
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/changefeed_dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,8 +492,8 @@ func (w *changefeedResultWriter) AddRow(ctx context.Context, row tree.Datums) er
return nil
}
}
func (w *changefeedResultWriter) IncrementRowsAffected(ctx context.Context, n int) {
w.rowsAffected += n
func (w *changefeedResultWriter) SetRowsAffected(ctx context.Context, n int) {
w.rowsAffected = n
}
func (w *changefeedResultWriter) SetError(err error) {
w.err = err
Expand Down
24 changes: 10 additions & 14 deletions pkg/sql/conn_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -791,12 +791,13 @@ type RestrictedCommandResult interface {
// AddBatch is undefined.
SupportsAddBatch() bool

// IncrementRowsAffected increments a counter by n. This is used for all
// SetRowsAffected sets RowsAffected counter to n. This is used for all
// result types other than tree.Rows.
IncrementRowsAffected(ctx context.Context, n int)
SetRowsAffected(ctx context.Context, n int)

// RowsAffected returns either the number of times AddRow was called, or the
// sum of all n passed into IncrementRowsAffected.
// RowsAffected returns either the number of times AddRow was called, total
// number of rows pushed via AddBatch, or the last value of n passed into
// SetRowsAffected.
RowsAffected() int

// DisableBuffering can be called during execution to ensure that
Expand Down Expand Up @@ -1026,7 +1027,7 @@ func (r *streamingCommandResult) ResetStmtType(stmt tree.Statement) {

// AddRow is part of the RestrictedCommandResult interface.
func (r *streamingCommandResult) AddRow(ctx context.Context, row tree.Datums) error {
// AddRow() and IncrementRowsAffected() are never called on the same command
// AddRow() and SetRowsAffected() are never called on the same command
// result, so we will not double count the affected rows by an increment
// here.
r.rowsAffected++
Expand Down Expand Up @@ -1069,13 +1070,13 @@ func (r *streamingCommandResult) Err() error {
return r.err
}

// IncrementRowsAffected is part of the RestrictedCommandResult interface.
func (r *streamingCommandResult) IncrementRowsAffected(ctx context.Context, n int) {
r.rowsAffected += n
// SetRowsAffected is part of the RestrictedCommandResult interface.
func (r *streamingCommandResult) SetRowsAffected(ctx context.Context, n int) {
r.rowsAffected = n
// streamingCommandResult might be used outside of the internal executor
// (i.e. not by rowsIterator) in which case the channel is not set.
if r.w != nil {
_ = r.w.addResult(ctx, ieIteratorResult{rowsAffectedIncrement: &n})
_ = r.w.addResult(ctx, ieIteratorResult{rowsAffected: &n})
}
}

Expand Down Expand Up @@ -1113,11 +1114,6 @@ func (r *streamingCommandResult) SetPortalOutput(
) {
}

// SetRowsAffected is part of the sql.CopyInResult interface.
func (r *streamingCommandResult) SetRowsAffected(ctx context.Context, rows int) {
r.rowsAffected = rows
}

// SendCopyOut is part of the sql.CopyOutResult interface.
func (r *streamingCommandResult) SendCopyOut(
ctx context.Context, cols colinfo.ResultColumns, format pgwirebase.FormatCode,
Expand Down
22 changes: 11 additions & 11 deletions pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -997,7 +997,7 @@ type rowResultWriter interface {
// AddRow writes a result row.
// Note that the caller owns the row slice and might reuse it.
AddRow(ctx context.Context, row tree.Datums) error
IncrementRowsAffected(ctx context.Context, n int)
SetRowsAffected(ctx context.Context, n int)
SetError(error)
Err() error
}
Expand Down Expand Up @@ -1088,8 +1088,8 @@ func (w *errOnlyResultWriter) AddBatch(ctx context.Context, batch coldata.Batch)
panic("AddBatch not supported by errOnlyResultWriter")
}

func (w *errOnlyResultWriter) IncrementRowsAffected(ctx context.Context, n int) {
panic("IncrementRowsAffected not supported by errOnlyResultWriter")
func (w *errOnlyResultWriter) SetRowsAffected(ctx context.Context, n int) {
panic("SetRowsAffected not supported by errOnlyResultWriter")
}

// RowResultWriter is a thin wrapper around a RowContainer.
Expand All @@ -1106,9 +1106,9 @@ func NewRowResultWriter(rowContainer *rowContainerHelper) *RowResultWriter {
return &RowResultWriter{rowContainer: rowContainer}
}

// IncrementRowsAffected implements the rowResultWriter interface.
func (b *RowResultWriter) IncrementRowsAffected(ctx context.Context, n int) {
b.rowsAffected += n
// SetRowsAffected implements the rowResultWriter interface.
func (b *RowResultWriter) SetRowsAffected(ctx context.Context, n int) {
b.rowsAffected = n
}

// AddRow implements the rowResultWriter interface.
Expand Down Expand Up @@ -1146,9 +1146,9 @@ func NewCallbackResultWriter(
return &CallbackResultWriter{fn: fn}
}

// IncrementRowsAffected is part of the rowResultWriter interface.
func (c *CallbackResultWriter) IncrementRowsAffected(ctx context.Context, n int) {
c.rowsAffected += n
// SetRowsAffected is part of the rowResultWriter interface.
func (c *CallbackResultWriter) SetRowsAffected(ctx context.Context, n int) {
c.rowsAffected = n
}

// AddRow is part of the rowResultWriter interface.
Expand Down Expand Up @@ -1432,7 +1432,7 @@ func (r *DistSQLReceiver) Push(
// We only need the row count. planNodeToRowSource is set up to handle
// ensuring that the last stage in the pipeline will return a single-column
// row with the row count in it, so just grab that and exit.
r.resultWriterMu.row.IncrementRowsAffected(r.ctx, n)
r.resultWriterMu.row.SetRowsAffected(r.ctx, n)
return r.status
}

Expand Down Expand Up @@ -1516,7 +1516,7 @@ func (r *DistSQLReceiver) PushBatch(
// We only need the row count. planNodeToRowSource is set up to handle
// ensuring that the last stage in the pipeline will return a single-column
// row with the row count in it, so just grab that and exit.
r.resultWriterMu.row.IncrementRowsAffected(r.ctx, int(batch.ColVec(0).Int64()[0]))
r.resultWriterMu.row.SetRowsAffected(r.ctx, int(batch.ColVec(0).Int64()[0]))
return r.status
}

Expand Down
28 changes: 19 additions & 9 deletions pkg/sql/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,10 +353,10 @@ func (ie *InternalExecutor) newConnExecutorWithTxn(

type ieIteratorResult struct {
// Exactly one of these 4 fields will be set.
row tree.Datums
rowsAffectedIncrement *int
cols colinfo.ResultColumns
err error
row tree.Datums
rowsAffected *int
cols colinfo.ResultColumns
err error
}

type rowsIterator struct {
Expand Down Expand Up @@ -429,8 +429,8 @@ func (r *rowsIterator) Next(ctx context.Context) (_ bool, retErr error) {
r.lastRow = data.row
return true, nil
}
if data.rowsAffectedIncrement != nil {
r.rowsAffected += *data.rowsAffectedIncrement
if data.rowsAffected != nil {
r.rowsAffected = *data.rowsAffected
return r.Next(ctx)
}
if data.cols != nil {
Expand Down Expand Up @@ -748,6 +748,8 @@ func applyOverrides(o sessiondata.InternalExecutorOverride, sd *sessiondata.Sess
if o.QualityOfService != nil {
sd.DefaultTxnQualityOfService = o.QualityOfService.ValidateInternal()
}
// We always override the injection knob based on the override struct.
sd.InjectRetryErrorsEnabled = o.InjectRetryErrorsEnabled
}

func (ie *InternalExecutor) maybeRootSessionDataOverride(
Expand Down Expand Up @@ -830,9 +832,10 @@ var rowsAffectedResultColumns = colinfo.ResultColumns{
// occurs.
//
// An additional responsibility of the internalClientComm is handling the retry
// errors. At the moment of writing, this is done incorrectly - namely, the
// internalClientComm implements the ClientLock interface in such a fashion as
// if any command can be transparently retried.
// errors. At the moment of writing, this is done incorrectly (except for stmts
// of "RowsAffected" type) - namely, the internalClientComm implements the
// ClientLock interface in such a fashion as if any command can be transparently
// retried.
// TODO(yuzefovich): fix this.
//
// Note that only implicit txns can be retried internally. If an explicit txn is
Expand All @@ -848,6 +851,13 @@ var rowsAffectedResultColumns = colinfo.ResultColumns{
// zeroth result - the error is sent on the ieResultChannel
// - the rowsIterator receives the error and returns it to the caller of
// execInternal.
//
// Retries for implicit txns and statements of "RowsAffected" type are achieved
// by overriding the "rows affected" number, stored in the rowsIterator, with
// the latest information. With such setup, even if the stmt execution before
// the retry communicated its incorrect "rows affected" information, that info
// is overridden accordingly after the connExecutor re-executes the
// corresponding command.

// execInternal executes a statement.
//
Expand Down
43 changes: 43 additions & 0 deletions pkg/sql/internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,49 @@ func TestInternalDBWithOverrides(t *testing.T) {
assert.Equal(t, "'off'", drow[0].String())
}

// TestInternalExecutorEncountersRetry verifies that if the internal executor
// encounters a retry error after some data (rows or metadata) have been
// communicated to the client, the query either results in a retry error (when
// rows have been sent) or correctly transparently retries (#98558).
func TestInternalExecutorEncountersRetry(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
params, _ := tests.CreateTestServerParams()
s, _, _ := serverutils.StartServer(t, params)
defer s.Stopper().Stop(ctx)

ie := s.InternalExecutor().(*sql.InternalExecutor)

// This test case verifies that if we execute the stmt of the RowsAffected
// type, it is transparently retried and the correct number of "rows
// affected" is reported.
t.Run("RowsAffected stmt", func(t *testing.T) {
// We will use PAUSE SCHEDULES statement which is of RowsAffected type.
//
// Notably, internally this statement will run some other queries via
// the "nested" internal executor, but those "nested" queries don't hit
// the injected retry error since this knob only applies to the "top"
// IE.
const stmt = `PAUSE SCHEDULES SELECT id FROM [SHOW SCHEDULES FOR SQL STATISTICS];`
paused, err := ie.ExecEx(
ctx, "pause schedule", nil, /* txn */
sessiondata.InternalExecutorOverride{
User: username.RootUserName(),
InjectRetryErrorsEnabled: true,
},
stmt,
)
if err != nil {
t.Fatal(err)
}
if paused != 1 {
t.Fatalf("expected 1 schedule to be paused, got %d", paused)
}
})
}

// TODO(andrei): Test that descriptor leases are released by the
// Executor, with and without a higher-level txn. When there is no
// higher-level txn, the leases are released normally by the txn finishing. When
Expand Down
12 changes: 3 additions & 9 deletions pkg/sql/pgwire/command_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,12 +331,6 @@ func (r *commandResult) SetPortalOutput(
_ /* err */ = r.conn.writeRowDescription(ctx, cols, formatCodes, &r.conn.writerState.buf)
}

// SetRowsAffected is part of the sql.CopyIn interface.
func (r *commandResult) SetRowsAffected(ctx context.Context, n int) {
r.assertNotReleased()
r.rowsAffected = n
}

// SendCopyOut is part of the sql.CopyOutResult interface.
func (r *commandResult) SendCopyOut(
ctx context.Context, cols colinfo.ResultColumns, format pgwirebase.FormatCode,
Expand Down Expand Up @@ -367,10 +361,10 @@ func (r *commandResult) SendCopyDone(ctx context.Context) error {
return r.conn.bufferCopyDone()
}

// IncrementRowsAffected is part of the sql.RestrictedCommandResult interface.
func (r *commandResult) IncrementRowsAffected(ctx context.Context, n int) {
// SetRowsAffected is part of the sql.RestrictedCommandResult interface.
func (r *commandResult) SetRowsAffected(ctx context.Context, n int) {
r.assertNotReleased()
r.rowsAffected += n
r.rowsAffected = n
}

// RowsAffected is part of the sql.RestrictedCommandResult interface.
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/recursive_cte.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,8 @@ func (n *recursiveCTENode) AddRow(ctx context.Context, row tree.Datums) error {
return n.workingRows.AddRow(ctx, row)
}

// IncrementRowsAffected is part of the rowResultWriter interface.
func (n *recursiveCTENode) IncrementRowsAffected(context.Context, int) {
// SetRowsAffected is part of the rowResultWriter interface.
func (n *recursiveCTENode) SetRowsAffected(context.Context, int) {
}

// SetError is part of the rowResultWriter interface.
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/routine.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,8 @@ func (d *droppingResultWriter) AddRow(ctx context.Context, row tree.Datums) erro
return nil
}

// IncrementRowsAffected is part of the rowResultWriter interface.
func (d *droppingResultWriter) IncrementRowsAffected(ctx context.Context, n int) {}
// SetRowsAffected is part of the rowResultWriter interface.
func (d *droppingResultWriter) SetRowsAffected(ctx context.Context, n int) {}

// SetError is part of the rowResultWriter interface.
func (d *droppingResultWriter) SetError(err error) {
Expand Down
13 changes: 13 additions & 0 deletions pkg/sql/sessiondata/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,19 @@ type InternalExecutorOverride struct {
// used as long as that value has a QoSLevel defined
// (see QoSLevel.ValidateInternal).
QualityOfService *sessiondatapb.QoSLevel
// InjectRetryErrorsEnabled, if true, injects a transaction retry error
// _after_ the statement has been processed by the execution engine and
// _before_ the control flow is returned to the connExecutor state machine.
//
// The error will be injected (roughly speaking) three times (determined by
// the numTxnRetryErrors constant in conn_executor_exec.go).
//
// For testing only.
//
// NB: this override applies only to the "top" internal executor, i.e. it
// does **not** propagate further to "nested" executors that are spawned up
// by the "top" executor.
InjectRetryErrorsEnabled bool
}

// NoSessionDataOverride is the empty InternalExecutorOverride which does not
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/sessiondatapb/local_only_session_data.proto
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,9 @@ message LocalOnlySessionData {
// InjectRetryErrorsEnabled causes statements inside an explicit
// transaction to return a transaction retry error. It is intended for
// developers to test their app's retry logic.
//
// Note that this session variable is **not** propagated to the internal
// executors - use InternalExecutorOverride for that.
bool inject_retry_errors_enabled = 54;
// NullOrderedLast controls whether NULL is ordered last. We default to
// NULLS FIRST for ascending order by default, whereas postgres defaults
Expand Down

0 comments on commit b701617

Please sign in to comment.