Skip to content

Commit

Permalink
Merge #69479
Browse files Browse the repository at this point in the history
69479: sql: fix some issues around the portal state management r=yuzefovich a=yuzefovich

This commit fixes some issues around the handling of portals in the
prepared statements namespace. Namely:
- it fixes the reference counting that was broken in cf0d5e4 (where we
switched to storing the portal by value instead of by pointer, but the
refcounts need to be shared between all references which wasn't true
in that commit) by entirely removing the ref counting
- it fixes the missing memory accounting for a copy of the portal
- it deletes all of the portals from `prepStmtsNamespaceAtTxnRewindPos`
in `resetExtraTxnState` whenever a txn commits or rolls back
- it adjusts a comment on the method deleting a prepared statement to
match the implementation.

Release note: None.

Release justification: fix to a long-standing bug.

Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
  • Loading branch information
craig[bot] and yuzefovich committed Sep 10, 2021
2 parents 0e66ea6 + 7b976b1 commit 812219e
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 59 deletions.
68 changes: 47 additions & 21 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1020,11 +1020,11 @@ func (ex *connExecutor) close(ctx context.Context, closeType closeType) {

if closeType != panicClose {
// Close all statements and prepared portals.
ex.extraTxnState.prepStmtsNamespace.resetTo(
ctx, prepStmtNamespace{}, &ex.extraTxnState.prepStmtsNamespaceMemAcc,
ex.extraTxnState.prepStmtsNamespace.resetToEmpty(
ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc,
)
ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.resetTo(
ctx, prepStmtNamespace{}, &ex.extraTxnState.prepStmtsNamespaceMemAcc,
ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.resetToEmpty(
ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc,
)
ex.extraTxnState.prepStmtsNamespaceMemAcc.Close(ctx)
}
Expand Down Expand Up @@ -1414,7 +1414,9 @@ type prepStmtNamespace struct {
// prepStmts contains the prepared statements currently available on the
// session.
prepStmts map[string]*PreparedStatement
// portals contains the portals currently available on the session.
// portals contains the portals currently available on the session. Note
// that PreparedPortal.accountForCopy needs to be called if a copy of a
// PreparedPortal is retained.
portals map[string]PreparedPortal
}

Expand All @@ -1437,19 +1439,30 @@ func (ns prepStmtNamespace) String() string {
return sb.String()
}

// resetToEmpty deallocates the prepStmtNamespace.
func (ns *prepStmtNamespace) resetToEmpty(
ctx context.Context, prepStmtsNamespaceMemAcc *mon.BoundAccount,
) {
// No errors could occur since we're releasing the resources.
_ = ns.resetTo(ctx, prepStmtNamespace{}, prepStmtsNamespaceMemAcc)
}

// resetTo resets a namespace to equate another one (`to`). All the receiver's
// references are release and all the to's references are duplicated.
// references are released and all the to's references are duplicated.
//
// An empty `to` can be passed in to deallocate everything.
//
// It can only return an error if we've reached the memory limit and had to make
// a copy of portals.
func (ns *prepStmtNamespace) resetTo(
ctx context.Context, to prepStmtNamespace, prepStmtsNamespaceMemAcc *mon.BoundAccount,
) {
) error {
for name, p := range ns.prepStmts {
p.decRef(ctx)
delete(ns.prepStmts, name)
}
for name, p := range ns.portals {
p.decRef(ctx, prepStmtsNamespaceMemAcc, name)
p.close(ctx, prepStmtsNamespaceMemAcc, name)
delete(ns.portals, name)
}

Expand All @@ -1458,9 +1471,12 @@ func (ns *prepStmtNamespace) resetTo(
ns.prepStmts[name] = ps
}
for name, p := range to.portals {
p.incRef(ctx)
if err := p.accountForCopy(ctx, prepStmtsNamespaceMemAcc, name); err != nil {
return err
}
ns.portals[name] = p
}
return nil
}

// resetExtraTxnState resets the fields of ex.extraTxnState when a transaction
Expand All @@ -1480,12 +1496,16 @@ func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent) err

// Close all portals.
for name, p := range ex.extraTxnState.prepStmtsNamespace.portals {
p.decRef(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc, name)
p.close(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc, name)
delete(ex.extraTxnState.prepStmtsNamespace.portals, name)
}

switch ev {
case txnCommit, txnRollback:
for name, p := range ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.portals {
p.close(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc, name)
delete(ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.portals, name)
}
ex.extraTxnState.savepoints.clear()
// After txn is finished, we need to call onTxnFinish (if it's non-nil).
if ex.extraTxnState.onTxnFinish != nil {
Expand Down Expand Up @@ -1900,7 +1920,9 @@ func (ex *connExecutor) execCmd(ctx context.Context) error {
return err
}
case rewind:
ex.rewindPrepStmtNamespace(ctx)
if err := ex.rewindPrepStmtNamespace(ctx); err != nil {
return err
}
ex.extraTxnState.savepoints = ex.extraTxnState.rewindPosSnapshot.savepoints
// Note we use the Replace function instead of reassigning, as there are
// copies of the ex.sessionDataStack in the iterators and extendedEvalContext.
Expand Down Expand Up @@ -1975,7 +1997,9 @@ func (ex *connExecutor) updateTxnRewindPosMaybe(
"unexpected advance code when starting a txn: %s",
errors.Safe(advInfo.code))
}
ex.setTxnRewindPos(ctx, nextPos)
if err := ex.setTxnRewindPos(ctx, nextPos); err != nil {
return err
}
} else {
// See if we can advance the rewind point even if this is not the point
// where the transaction started. We can do that after running a special
Expand Down Expand Up @@ -2027,7 +2051,9 @@ func (ex *connExecutor) updateTxnRewindPosMaybe(
panic(errors.AssertionFailedf("unsupported cmd: %T", cmd))
}
if canAdvance {
ex.setTxnRewindPos(ctx, pos+1)
if err := ex.setTxnRewindPos(ctx, pos+1); err != nil {
return err
}
}
}
}
Expand All @@ -2038,16 +2064,16 @@ func (ex *connExecutor) updateTxnRewindPosMaybe(
//
// All statements with lower position in stmtBuf (if any) are removed, as we
// won't ever need them again.
func (ex *connExecutor) setTxnRewindPos(ctx context.Context, pos CmdPos) {
func (ex *connExecutor) setTxnRewindPos(ctx context.Context, pos CmdPos) error {
if pos <= ex.extraTxnState.txnRewindPos {
panic(errors.AssertionFailedf("can only move the txnRewindPos forward. "+
"Was: %d; new value: %d", ex.extraTxnState.txnRewindPos, pos))
}
ex.extraTxnState.txnRewindPos = pos
ex.stmtBuf.Ltrim(ctx, pos)
ex.commitPrepStmtNamespace(ctx)
ex.extraTxnState.rewindPosSnapshot.savepoints = ex.extraTxnState.savepoints.clone()
ex.extraTxnState.rewindPosSnapshot.sessionDataStack = ex.sessionDataStack.Clone()
return ex.commitPrepStmtNamespace(ctx)
}

// stmtDoesntNeedRetry returns true if the given statement does not need to be
Expand Down Expand Up @@ -2196,16 +2222,16 @@ func (ex *connExecutor) generateID() ClusterWideID {

// commitPrepStmtNamespace deallocates everything in
// prepStmtsNamespaceAtTxnRewindPos that's not part of prepStmtsNamespace.
func (ex *connExecutor) commitPrepStmtNamespace(ctx context.Context) {
ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.resetTo(
func (ex *connExecutor) commitPrepStmtNamespace(ctx context.Context) error {
return ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.resetTo(
ctx, ex.extraTxnState.prepStmtsNamespace, &ex.extraTxnState.prepStmtsNamespaceMemAcc,
)
}

// commitPrepStmtNamespace deallocates everything in prepStmtsNamespace that's
// not part of prepStmtsNamespaceAtTxnRewindPos.
func (ex *connExecutor) rewindPrepStmtNamespace(ctx context.Context) {
ex.extraTxnState.prepStmtsNamespace.resetTo(
func (ex *connExecutor) rewindPrepStmtNamespace(ctx context.Context) error {
return ex.extraTxnState.prepStmtsNamespace.resetTo(
ctx, ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos, &ex.extraTxnState.prepStmtsNamespaceMemAcc,
)
}
Expand Down Expand Up @@ -3173,8 +3199,8 @@ func (ps connExPrepStmtsAccessor) Delete(ctx context.Context, name string) bool

// DeleteAll is part of the preparedStatementsAccessor interface.
func (ps connExPrepStmtsAccessor) DeleteAll(ctx context.Context) {
ps.ex.extraTxnState.prepStmtsNamespace.resetTo(
ctx, prepStmtNamespace{}, &ps.ex.extraTxnState.prepStmtsNamespaceMemAcc,
ps.ex.extraTxnState.prepStmtsNamespace.resetToEmpty(
ctx, &ps.ex.extraTxnState.prepStmtsNamespaceMemAcc,
)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/conn_executor_prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ func (ex *connExecutor) deletePortal(ctx context.Context, name string) {
if !ok {
return
}
portal.decRef(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc, name)
portal.close(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc, name)
delete(ex.extraTxnState.prepStmtsNamespace.portals, name)
}

Expand Down
52 changes: 15 additions & 37 deletions pkg/sql/prepared_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,24 +115,15 @@ type preparedStatementsAccessor interface {
DeleteAll(ctx context.Context)
}

// PreparedPortal is a PreparedStatement that has been bound with query arguments.
//
// Note that PreparedPortals maintain a reference counter internally.
// References need to be registered with incRef() and de-registered with
// decRef().
// PreparedPortal is a PreparedStatement that has been bound with query
// arguments.
type PreparedPortal struct {
Stmt *PreparedStatement
Qargs tree.QueryArguments

// OutFormats contains the requested formats for the output columns.
OutFormats []pgwirebase.FormatCode

// refCount keeps track of the number of references to this PreparedStatement.
// New references are registered through incRef().
// Most references are being held by portals created from this prepared
// statement.
refCount int

// exhausted tracks whether this portal has already been fully exhausted,
// meaning that any additional attempts to execute it should return no
// rows.
Expand All @@ -141,8 +132,7 @@ type PreparedPortal struct {

// makePreparedPortal creates a new PreparedPortal.
//
// incRef() doesn't need to be called on the result.
// When no longer in use, the PreparedPortal needs to be decRef()d.
// accountForCopy() doesn't need to be called on the prepared statement.
func (ex *connExecutor) makePreparedPortal(
ctx context.Context,
name string,
Expand All @@ -154,37 +144,25 @@ func (ex *connExecutor) makePreparedPortal(
Stmt: stmt,
Qargs: qargs,
OutFormats: outFormats,
refCount: 1,
}
if err := ex.extraTxnState.prepStmtsNamespaceMemAcc.Grow(ctx, portal.size(name)); err != nil {
return PreparedPortal{}, err
}
// The portal keeps a reference to the PreparedStatement, so register it.
stmt.incRef(ctx)
return portal, nil
return portal, portal.accountForCopy(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc, name)
}

func (p *PreparedPortal) incRef(ctx context.Context) {
if p.refCount <= 0 {
log.Fatal(ctx, "corrupt PreparedPortal refcount")
}
p.refCount++
// accountForCopy updates the state to account for the copy of the
// PreparedPortal (p is the copy).
func (p PreparedPortal) accountForCopy(
ctx context.Context, prepStmtsNamespaceMemAcc *mon.BoundAccount, portalName string,
) error {
p.Stmt.incRef(ctx)
return prepStmtsNamespaceMemAcc.Grow(ctx, p.size(portalName))
}

// decRef decrements the number of references to this portal. If the refCount
// reaches 0, then the memory account is shrunk accordingly.
func (p *PreparedPortal) decRef(
// close closes this portal.
func (p PreparedPortal) close(
ctx context.Context, prepStmtsNamespaceMemAcc *mon.BoundAccount, portalName string,
) {
if p.refCount <= 0 {
log.Fatal(ctx, "corrupt PreparedPortal refcount")
}
p.refCount--

if p.refCount == 0 {
prepStmtsNamespaceMemAcc.Shrink(ctx, p.size(portalName))
p.Stmt.decRef(ctx)
}
prepStmtsNamespaceMemAcc.Shrink(ctx, p.size(portalName))
p.Stmt.decRef(ctx)
}

func (p PreparedPortal) size(portalName string) int64 {
Expand Down

0 comments on commit 812219e

Please sign in to comment.