Skip to content

Commit

Permalink
sql: implement XA transaction SQL statements
Browse files Browse the repository at this point in the history
Informs #22329.

This commit implements the statement handling for `PREPARE TRANSACTION
<gid>`, `COMMIT PREPARED <gid>`, and `ROLLBACK PREPARED <gid>`, for
parity with Postgres support. The implementation is based on the
Postgres documentation and the Postgres source code.

Release note (sql change): XA transaction support allows CockroachDB to
participate in distributed transaction with other resources (e.g.
databases, message queues, etc) using a two phase commit protocol.
  • Loading branch information
nvanbenschoten committed Dec 18, 2024
1 parent 3d95da9 commit 503d914
Show file tree
Hide file tree
Showing 32 changed files with 999 additions and 23 deletions.
12 changes: 12 additions & 0 deletions docs/generated/metrics/metrics.html
Original file line number Diff line number Diff line change
Expand Up @@ -1835,14 +1835,26 @@
<tr><td>APPLICATION</td><td>sql.txn.commit.count.internal</td><td>Number of SQL transaction COMMIT statements successfully executed (internal queries)</td><td>SQL Internal Statements</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.txn.commit.started.count</td><td>Number of SQL transaction COMMIT statements started</td><td>SQL Statements</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.txn.commit.started.count.internal</td><td>Number of SQL transaction COMMIT statements started (internal queries)</td><td>SQL Internal Statements</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.txn.commit_prepared.count</td><td>Number of SQL COMMIT PREPARED statements successfully executed</td><td>SQL Statements</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.txn.commit_prepared.count.internal</td><td>Number of SQL COMMIT PREPARED statements successfully executed (internal queries)</td><td>SQL Internal Statements</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.txn.commit_prepared.started.count</td><td>Number of SQL COMMIT PREPARED statements started</td><td>SQL Statements</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.txn.commit_prepared.started.count.internal</td><td>Number of SQL COMMIT PREPARED statements started (internal queries)</td><td>SQL Internal Statements</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.txn.contended.count</td><td>Number of SQL transactions experienced contention</td><td>Contention</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.txn.contended.count.internal</td><td>Number of SQL transactions experienced contention (internal queries)</td><td>SQL Internal Statements</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.txn.latency</td><td>Latency of SQL transactions</td><td>Latency</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>sql.txn.latency.internal</td><td>Latency of SQL transactions (internal queries)</td><td>SQL Internal Statements</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>sql.txn.prepare.count</td><td>Number of SQL PREPARE TRANSACTION statements successfully executed</td><td>SQL Statements</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.txn.prepare.count.internal</td><td>Number of SQL PREPARE TRANSACTION statements successfully executed (internal queries)</td><td>SQL Internal Statements</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.txn.prepare.started.count</td><td>Number of SQL PREPARE TRANSACTION statements started</td><td>SQL Statements</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.txn.prepare.started.count.internal</td><td>Number of SQL PREPARE TRANSACTION statements started (internal queries)</td><td>SQL Internal Statements</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.txn.rollback.count</td><td>Number of SQL transaction ROLLBACK statements successfully executed</td><td>SQL Statements</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.txn.rollback.count.internal</td><td>Number of SQL transaction ROLLBACK statements successfully executed (internal queries)</td><td>SQL Internal Statements</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.txn.rollback.started.count</td><td>Number of SQL transaction ROLLBACK statements started</td><td>SQL Statements</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.txn.rollback.started.count.internal</td><td>Number of SQL transaction ROLLBACK statements started (internal queries)</td><td>SQL Internal Statements</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.txn.rollback_prepared.count</td><td>Number of SQL ROLLBACK PREPARED statements successfully executed</td><td>SQL Statements</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.txn.rollback_prepared.count.internal</td><td>Number of SQL ROLLBACK PREPARED statements successfully executed (internal queries)</td><td>SQL Internal Statements</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.txn.rollback_prepared.started.count</td><td>Number of SQL ROLLBACK PREPARED statements started</td><td>SQL Statements</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.txn.rollback_prepared.started.count.internal</td><td>Number of SQL ROLLBACK PREPARED statements started (internal queries)</td><td>SQL Internal Statements</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.txn.upgraded_iso_level.count</td><td>Number of times a weak isolation level was automatically upgraded to a stronger one</td><td>SQL Statements</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.txn.upgraded_iso_level.count.internal</td><td>Number of times a weak isolation level was automatically upgraded to a stronger one (internal queries)</td><td>SQL Internal Statements</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>sql.txns.open</td><td>Number of currently open user SQL transactions</td><td>Open SQL Transactions</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
Expand Down
7 changes: 7 additions & 0 deletions pkg/ccl/logictestccl/tests/3node-tenant/generated_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/internal/sqlsmith/sqlsmith.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ var DisableDDLs = simpleOption("disable DDLs", func(s *Smither) {
{2, makeRollbackToSavepoint},
{2, makeCommit},
{2, makeRollback},
// TODO(nvanbenschoten): add two-phase commit statements.
}
})

Expand Down
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ go_library(
"testutils.go",
"topk.go",
"truncate.go",
"two_phase_commit.go",
"txn_fingerprint_id_cache.go",
"txn_state.go",
"type_change.go",
Expand Down
31 changes: 27 additions & 4 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2161,7 +2161,7 @@ func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent, pay
}

switch ev.eventType {
case txnCommit, txnRollback:
case txnCommit, txnRollback, txnPrepare:
ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.closeAllPortals(
ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc,
)
Expand Down Expand Up @@ -4080,10 +4080,10 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(
}

fallthrough
case txnRollback:
case txnRollback, txnPrepare:
ex.resetExtraTxnState(ex.Ctx(), advInfo.txnEvent, payloadErr)
// Since we're doing a complete rollback, there's no need to keep the
// prepared stmts for a txn rewind.
// Since we're finalizing the SQL transaction (commit, rollback, prepare),
// there's no need to keep the prepared stmts for a txn rewind.
ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.closeAllPortals(
ex.Ctx(), &ex.extraTxnState.prepStmtsNamespaceMemAcc,
)
Expand Down Expand Up @@ -4494,6 +4494,11 @@ type StatementCounters struct {
TxnRollbackCount telemetry.CounterWithMetric
TxnUpgradedCount *metric.Counter

// Transaction XA two-phase commit operations.
TxnPrepareCount telemetry.CounterWithMetric
TxnCommitPreparedCount telemetry.CounterWithMetric
TxnRollbackPreparedCount telemetry.CounterWithMetric

// Savepoint operations. SavepointCount is for real SQL savepoints;
// the RestartSavepoint variants are for the
// cockroach-specific client-side retry protocol.
Expand Down Expand Up @@ -4531,6 +4536,12 @@ func makeStartedStatementCounters(internal bool) StatementCounters {
getMetricMeta(MetaTxnRollbackStarted, internal)),
TxnUpgradedCount: metric.NewCounter(
getMetricMeta(MetaTxnUpgradedFromWeakIsolation, internal)),
TxnPrepareCount: telemetry.NewCounterWithMetric(
getMetricMeta(MetaTxnPrepareStarted, internal)),
TxnCommitPreparedCount: telemetry.NewCounterWithMetric(
getMetricMeta(MetaTxnCommitPreparedStarted, internal)),
TxnRollbackPreparedCount: telemetry.NewCounterWithMetric(
getMetricMeta(MetaTxnRollbackPreparedStarted, internal)),
RestartSavepointCount: telemetry.NewCounterWithMetric(
getMetricMeta(MetaRestartSavepointStarted, internal)),
ReleaseRestartSavepointCount: telemetry.NewCounterWithMetric(
Expand Down Expand Up @@ -4576,6 +4587,12 @@ func makeExecutedStatementCounters(internal bool) StatementCounters {
getMetricMeta(MetaTxnRollbackExecuted, internal)),
TxnUpgradedCount: metric.NewCounter(
getMetricMeta(MetaTxnUpgradedFromWeakIsolation, internal)),
TxnPrepareCount: telemetry.NewCounterWithMetric(
getMetricMeta(MetaTxnPrepareExecuted, internal)),
TxnCommitPreparedCount: telemetry.NewCounterWithMetric(
getMetricMeta(MetaTxnCommitPreparedExecuted, internal)),
TxnRollbackPreparedCount: telemetry.NewCounterWithMetric(
getMetricMeta(MetaTxnRollbackPreparedExecuted, internal)),
RestartSavepointCount: telemetry.NewCounterWithMetric(
getMetricMeta(MetaRestartSavepointExecuted, internal)),
ReleaseRestartSavepointCount: telemetry.NewCounterWithMetric(
Expand Down Expand Up @@ -4638,6 +4655,12 @@ func (sc *StatementCounters) incrementCount(ex *connExecutor, stmt tree.Statemen
} else {
sc.TxnRollbackCount.Inc()
}
case *tree.PrepareTransaction:
sc.TxnPrepareCount.Inc()
case *tree.CommitPrepared:
sc.TxnCommitPreparedCount.Inc()
case *tree.RollbackPrepared:
sc.TxnRollbackPreparedCount.Inc()
case *tree.Savepoint:
if ex.isCommitOnReleaseSavepoint(t.Name) {
sc.RestartSavepointCount.Inc()
Expand Down
21 changes: 15 additions & 6 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -780,6 +780,10 @@ func (ex *connExecutor) execStmtInOpenState(
ev, payload := ex.execRollbackToSavepointInOpenState(ctx, s, res)
return ev, payload, nil

case *tree.PrepareTransaction:
ev, payload := ex.execPrepareTransactionInOpenState(ctx, s)
return ev, payload, nil

case *tree.ShowCommitTimestamp:
ev, payload := ex.execShowCommitTimestampInOpenState(ctx, s, res, canAutoCommit)
return ev, payload, nil
Expand Down Expand Up @@ -1799,6 +1803,10 @@ func (ex *connExecutor) execStmtInOpenStateWithPausablePortal(
ev, payload := ex.execRollbackToSavepointInOpenState(ctx, s, res)
return ev, payload, nil

case *tree.PrepareTransaction:
ev, payload := ex.execPrepareTransactionInOpenState(ctx, s)
return ev, payload, nil

case *tree.ShowCommitTimestamp:
ev, payload := ex.execShowCommitTimestampInOpenState(ctx, s, res, canAutoCommit)
return ev, payload, nil
Expand Down Expand Up @@ -3450,8 +3458,8 @@ func (ex *connExecutor) execStmtInNoTxnState(
)
case *tree.ShowCommitTimestamp:
return ex.execShowCommitTimestampInNoTxnState(ctx, s, res)
case *tree.CommitTransaction, *tree.ReleaseSavepoint,
*tree.RollbackTransaction, *tree.SetTransaction, *tree.Savepoint:
case *tree.CommitTransaction, *tree.RollbackTransaction, *tree.PrepareTransaction,
*tree.SetTransaction, *tree.Savepoint, *tree.ReleaseSavepoint:
if ex.sessionData().AutoCommitBeforeDDL {
// If autocommit_before_ddl is set, we allow these statements to be
// executed, and send a warning rather than an error.
Expand Down Expand Up @@ -3523,7 +3531,7 @@ func (ex *connExecutor) beginImplicitTxn(

// execStmtInAbortedState executes a statement in a txn that's in state
// Aborted or RestartWait. All statements result in error events except:
// - COMMIT / ROLLBACK: aborts the current transaction.
// - COMMIT / ROLLBACK / PREPARE TRANSACTION: aborts the current transaction.
// - ROLLBACK TO SAVEPOINT / SAVEPOINT: reopens the current transaction,
// allowing it to be retried.
func (ex *connExecutor) execStmtInAbortedState(
Expand All @@ -3545,9 +3553,10 @@ func (ex *connExecutor) execStmtInAbortedState(
}

switch s := ast.(type) {
case *tree.CommitTransaction, *tree.RollbackTransaction:
if _, ok := s.(*tree.CommitTransaction); ok {
// Note: Postgres replies to COMMIT of failed txn with "ROLLBACK" too.
case *tree.CommitTransaction, *tree.RollbackTransaction, *tree.PrepareTransaction:
if _, ok := s.(*tree.RollbackTransaction); !ok {
// Note: Postgres replies to COMMIT and PREPARE TRANSACTION of failed
// transactions with "ROLLBACK" too.
res.ResetStmtType((*tree.RollbackTransaction)(nil))
}
return ex.rollbackSQLTransaction(ctx, s)
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/conn_executor_prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,8 @@ func (ex *connExecutor) execDescribe(
// prepared and executed inside of an aborted transaction.
func (ex *connExecutor) isAllowedInAbortedTxn(ast tree.Statement) bool {
switch s := ast.(type) {
case *tree.CommitTransaction, *tree.RollbackTransaction, *tree.RollbackToSavepoint:
case *tree.CommitTransaction, *tree.PrepareTransaction,
*tree.RollbackTransaction, *tree.RollbackToSavepoint:
return true
case *tree.Savepoint:
if ex.isCommitOnReleaseSavepoint(s.Name) {
Expand Down
30 changes: 30 additions & 0 deletions pkg/sql/conn_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,23 @@ type eventTxnFinishAbortedPLpgSQL struct{}
// that case no event is necessary.
type eventSavepointRollback struct{}

// eventTxnFinishPrepared is generated when a PREPARE TRANSACTION statement is
// executed. The current transaction is dissociated with the current session.
type eventTxnFinishPrepared struct{}

// eventTxnFinishPreparedErrPayload represents the payload for
// eventTxnFinishPrepared, if the PREPARE TRANSACTION statement encountered an
// error. No payload is used if the statement was successful.
type eventTxnFinishPreparedErrPayload struct {
// err is the error encountered during the prepare.
err error
}

// errorCause implements the payloadWithError interface.
func (p eventTxnFinishPreparedErrPayload) errorCause() error {
return p.err
}

type eventNonRetriableErr struct {
IsCommit fsm.Bool
}
Expand Down Expand Up @@ -230,6 +247,7 @@ type payloadWithError interface {
func (eventTxnStart) Event() {}
func (eventTxnFinishCommitted) Event() {}
func (eventTxnFinishAborted) Event() {}
func (eventTxnFinishPrepared) Event() {}
func (eventTxnFinishCommittedPLpgSQL) Event() {}
func (eventTxnFinishAbortedPLpgSQL) Event() {}
func (eventSavepointRollback) Event() {}
Expand Down Expand Up @@ -374,6 +392,18 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{
},
},
stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.Var("wasUpgraded")}: {
// Handle prepared transactions. Note that this statement is only valid in
// the context of an explicit transaction, so we don't need to handle
// implicit transactions.
eventTxnFinishPrepared{}: {
Description: "PREPARE TRANSACTION",
Next: stateNoTxn{},
Action: func(args fsm.Args) error {
// Note that the KV txn has been prepared by the statement execution by
// this point and is being dissociated from the session.
return args.Extended.(*txnState).finishTxn(txnPrepare, advanceOne)
},
},
// Handle the errors in explicit txns.
eventNonRetriableErr{IsCommit: fsm.False}: {
Next: stateAborted{WasUpgraded: fsm.Var("wasUpgraded")},
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/control_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ var jobCommandToDesiredStatus = map[tree.JobCommand]jobs.Status{
tree.PauseJob: jobs.StatusPaused,
}

// FastPathResults implements the planNodeFastPath inteface.
// FastPathResults implements the planNodeFastPath interface.
func (n *controlJobsNode) FastPathResults() (int, bool) {
return n.numRows, true
}
Expand Down
38 changes: 38 additions & 0 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -881,6 +881,24 @@ var (
Measurement: "SQL Statements",
Unit: metric.Unit_COUNT,
}
MetaTxnPrepareStarted = metric.Metadata{
Name: "sql.txn.prepare.started.count",
Help: "Number of SQL PREPARE TRANSACTION statements started",
Measurement: "SQL Statements",
Unit: metric.Unit_COUNT,
}
MetaTxnCommitPreparedStarted = metric.Metadata{
Name: "sql.txn.commit_prepared.started.count",
Help: "Number of SQL COMMIT PREPARED statements started",
Measurement: "SQL Statements",
Unit: metric.Unit_COUNT,
}
MetaTxnRollbackPreparedStarted = metric.Metadata{
Name: "sql.txn.rollback_prepared.started.count",
Help: "Number of SQL ROLLBACK PREPARED statements started",
Measurement: "SQL Statements",
Unit: metric.Unit_COUNT,
}
MetaSQLTxnContended = metric.Metadata{
Name: "sql.txn.contended.count",
Help: "Number of SQL transactions experienced contention",
Expand Down Expand Up @@ -1009,6 +1027,24 @@ var (
Measurement: "SQL Statements",
Unit: metric.Unit_COUNT,
}
MetaTxnPrepareExecuted = metric.Metadata{
Name: "sql.txn.prepare.count",
Help: "Number of SQL PREPARE TRANSACTION statements successfully executed",
Measurement: "SQL Statements",
Unit: metric.Unit_COUNT,
}
MetaTxnCommitPreparedExecuted = metric.Metadata{
Name: "sql.txn.commit_prepared.count",
Help: "Number of SQL COMMIT PREPARED statements successfully executed",
Measurement: "SQL Statements",
Unit: metric.Unit_COUNT,
}
MetaTxnRollbackPreparedExecuted = metric.Metadata{
Name: "sql.txn.rollback_prepared.count",
Help: "Number of SQL ROLLBACK PREPARED statements successfully executed",
Measurement: "SQL Statements",
Unit: metric.Unit_COUNT,
}
MetaSelectExecuted = metric.Metadata{
Name: "sql.select.count",
Help: "Number of SQL SELECT statements successfully executed",
Expand Down Expand Up @@ -2024,6 +2060,8 @@ func golangFillQueryArguments(args ...interface{}) (tree.Datums, error) {
d = dd
case username.SQLUsername:
d = tree.NewDString(t.Normalized())
case uuid.UUID:
d = tree.NewDUuid(tree.DUuid{UUID: t})
}
if d == nil {
// Handle all types which have an underlying type that can be stored in the
Expand Down
Loading

0 comments on commit 503d914

Please sign in to comment.