diff --git a/docs/generated/metrics/metrics.html b/docs/generated/metrics/metrics.html
index 0b34a2309d6f..e20a34e4add8 100644
--- a/docs/generated/metrics/metrics.html
+++ b/docs/generated/metrics/metrics.html
@@ -1835,14 +1835,26 @@
APPLICATION | sql.txn.commit.count.internal | Number of SQL transaction COMMIT statements successfully executed (internal queries) | SQL Internal Statements | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
APPLICATION | sql.txn.commit.started.count | Number of SQL transaction COMMIT statements started | SQL Statements | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
APPLICATION | sql.txn.commit.started.count.internal | Number of SQL transaction COMMIT statements started (internal queries) | SQL Internal Statements | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
+APPLICATION | sql.txn.commit_prepared.count | Number of SQL COMMIT PREPARED statements successfully executed | SQL Statements | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
+APPLICATION | sql.txn.commit_prepared.count.internal | Number of SQL COMMIT PREPARED statements successfully executed (internal queries) | SQL Internal Statements | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
+APPLICATION | sql.txn.commit_prepared.started.count | Number of SQL COMMIT PREPARED statements started | SQL Statements | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
+APPLICATION | sql.txn.commit_prepared.started.count.internal | Number of SQL COMMIT PREPARED statements started (internal queries) | SQL Internal Statements | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
APPLICATION | sql.txn.contended.count | Number of SQL transactions experienced contention | Contention | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
APPLICATION | sql.txn.contended.count.internal | Number of SQL transactions experienced contention (internal queries) | SQL Internal Statements | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
APPLICATION | sql.txn.latency | Latency of SQL transactions | Latency | HISTOGRAM | NANOSECONDS | AVG | NONE |
APPLICATION | sql.txn.latency.internal | Latency of SQL transactions (internal queries) | SQL Internal Statements | HISTOGRAM | NANOSECONDS | AVG | NONE |
+APPLICATION | sql.txn.prepare.count | Number of SQL PREPARE TRANSACTION statements successfully executed | SQL Statements | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
+APPLICATION | sql.txn.prepare.count.internal | Number of SQL PREPARE TRANSACTION statements successfully executed (internal queries) | SQL Internal Statements | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
+APPLICATION | sql.txn.prepare.started.count | Number of SQL PREPARE TRANSACTION statements started | SQL Statements | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
+APPLICATION | sql.txn.prepare.started.count.internal | Number of SQL PREPARE TRANSACTION statements started (internal queries) | SQL Internal Statements | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
APPLICATION | sql.txn.rollback.count | Number of SQL transaction ROLLBACK statements successfully executed | SQL Statements | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
APPLICATION | sql.txn.rollback.count.internal | Number of SQL transaction ROLLBACK statements successfully executed (internal queries) | SQL Internal Statements | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
APPLICATION | sql.txn.rollback.started.count | Number of SQL transaction ROLLBACK statements started | SQL Statements | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
APPLICATION | sql.txn.rollback.started.count.internal | Number of SQL transaction ROLLBACK statements started (internal queries) | SQL Internal Statements | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
+APPLICATION | sql.txn.rollback_prepared.count | Number of SQL ROLLBACK PREPARED statements successfully executed | SQL Statements | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
+APPLICATION | sql.txn.rollback_prepared.count.internal | Number of SQL ROLLBACK PREPARED statements successfully executed (internal queries) | SQL Internal Statements | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
+APPLICATION | sql.txn.rollback_prepared.started.count | Number of SQL ROLLBACK PREPARED statements started | SQL Statements | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
+APPLICATION | sql.txn.rollback_prepared.started.count.internal | Number of SQL ROLLBACK PREPARED statements started (internal queries) | SQL Internal Statements | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
APPLICATION | sql.txn.upgraded_iso_level.count | Number of times a weak isolation level was automatically upgraded to a stronger one | SQL Statements | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
APPLICATION | sql.txn.upgraded_iso_level.count.internal | Number of times a weak isolation level was automatically upgraded to a stronger one (internal queries) | SQL Internal Statements | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
APPLICATION | sql.txns.open | Number of currently open user SQL transactions | Open SQL Transactions | GAUGE | COUNT | AVG | NONE |
diff --git a/pkg/ccl/logictestccl/tests/3node-tenant/generated_test.go b/pkg/ccl/logictestccl/tests/3node-tenant/generated_test.go
index 4a0fabfbddfd..d534e65b3f12 100644
--- a/pkg/ccl/logictestccl/tests/3node-tenant/generated_test.go
+++ b/pkg/ccl/logictestccl/tests/3node-tenant/generated_test.go
@@ -2206,6 +2206,13 @@ func TestTenantLogic_tuple(
runLogicTest(t, "tuple")
}
+func TestTenantLogic_two_phase_commit(
+ t *testing.T,
+) {
+ defer leaktest.AfterTest(t)()
+ runLogicTest(t, "two_phase_commit")
+}
+
func TestTenantLogic_txn(
t *testing.T,
) {
diff --git a/pkg/ccl/logictestccl/tests/local-read-committed/generated_test.go b/pkg/ccl/logictestccl/tests/local-read-committed/generated_test.go
index 805a51ed6041..5406ec46ec9d 100644
--- a/pkg/ccl/logictestccl/tests/local-read-committed/generated_test.go
+++ b/pkg/ccl/logictestccl/tests/local-read-committed/generated_test.go
@@ -2190,6 +2190,13 @@ func TestReadCommittedLogic_tuple_local(
runLogicTest(t, "tuple_local")
}
+func TestReadCommittedLogic_two_phase_commit(
+ t *testing.T,
+) {
+ defer leaktest.AfterTest(t)()
+ runLogicTest(t, "two_phase_commit")
+}
+
func TestReadCommittedLogic_txn(
t *testing.T,
) {
diff --git a/pkg/ccl/logictestccl/tests/local-repeatable-read/generated_test.go b/pkg/ccl/logictestccl/tests/local-repeatable-read/generated_test.go
index 173ea62943ac..74d19d79e9b2 100644
--- a/pkg/ccl/logictestccl/tests/local-repeatable-read/generated_test.go
+++ b/pkg/ccl/logictestccl/tests/local-repeatable-read/generated_test.go
@@ -2183,6 +2183,13 @@ func TestRepeatableReadLogic_tuple_local(
runLogicTest(t, "tuple_local")
}
+func TestRepeatableReadLogic_two_phase_commit(
+ t *testing.T,
+) {
+ defer leaktest.AfterTest(t)()
+ runLogicTest(t, "two_phase_commit")
+}
+
func TestRepeatableReadLogic_txn(
t *testing.T,
) {
diff --git a/pkg/cli/clisqlshell/testdata/describe b/pkg/cli/clisqlshell/testdata/describe
index afb2dbef5c3f..ad4d4186a119 100644
--- a/pkg/cli/clisqlshell/testdata/describe
+++ b/pkg/cli/clisqlshell/testdata/describe
@@ -563,7 +563,7 @@ pg_catalog,pg_policies,table,node,permanent,prefix,pg_policies was created for c
pg_catalog,pg_policy,table,node,permanent,prefix,pg_policy was created for compatibility and is currently unimplemented
pg_catalog,pg_prepared_statements,table,node,permanent,prefix,"prepared statements
https://www.postgresql.org/docs/9.6/view-pg-prepared-statements.html"
-pg_catalog,pg_prepared_xacts,table,node,permanent,prefix,"prepared transactions (empty - feature does not exist)
+pg_catalog,pg_prepared_xacts,table,node,permanent,prefix,"prepared transactions
https://www.postgresql.org/docs/9.6/view-pg-prepared-xacts.html"
pg_catalog,pg_proc,table,node,permanent,prefix,"built-in functions (incomplete)
https://www.postgresql.org/docs/16/catalog-pg-proc.html"
diff --git a/pkg/cmd/roachtest/tests/pgjdbc_blocklist.go b/pkg/cmd/roachtest/tests/pgjdbc_blocklist.go
index f7d4c69d78ad..44764a9ea3a4 100644
--- a/pkg/cmd/roachtest/tests/pgjdbc_blocklist.go
+++ b/pkg/cmd/roachtest/tests/pgjdbc_blocklist.go
@@ -621,23 +621,7 @@ var pgjdbcBlockList = blocklist{
`org.postgresql.test.util.PasswordUtilTest.encryptionTypeValueOfOn()`: "73337",
`org.postgresql.test.util.PasswordUtilTest.mD5()`: "73337",
`org.postgresql.test.util.PasswordUtilTest.scramSha256()`: "unknown",
- `org.postgresql.test.xa.XADataSourceTest.autoCommit()`: "unknown",
- `org.postgresql.test.xa.XADataSourceTest.commitByDifferentConnection()`: "unknown",
- `org.postgresql.test.xa.XADataSourceTest.commitUnknownXid()`: "unknown",
- `org.postgresql.test.xa.XADataSourceTest.committingCommittedXid()`: "unknown",
- `org.postgresql.test.xa.XADataSourceTest.databaseRemovesPreparedBeforeCommit()`: "unknown",
- `org.postgresql.test.xa.XADataSourceTest.databaseRemovesPreparedBeforeRollback()`: "unknown",
`org.postgresql.test.xa.XADataSourceTest.mappingOfConstraintViolations()`: "unknown",
- `org.postgresql.test.xa.XADataSourceTest.networkIssueOnCommit()`: "unknown",
- `org.postgresql.test.xa.XADataSourceTest.networkIssueOnRollback()`: "unknown",
- `org.postgresql.test.xa.XADataSourceTest.onePhaseCommitOfPrepared()`: "unknown",
- `org.postgresql.test.xa.XADataSourceTest.preparingPreparedXid()`: "unknown",
- `org.postgresql.test.xa.XADataSourceTest.recover()`: "unknown",
- `org.postgresql.test.xa.XADataSourceTest.repeatedRolledBack()`: "unknown",
- `org.postgresql.test.xa.XADataSourceTest.rollback()`: "unknown",
- `org.postgresql.test.xa.XADataSourceTest.rollbackByDifferentConnection()`: "unknown",
- `org.postgresql.test.xa.XADataSourceTest.rollbackUnknownXid()`: "unknown",
- `org.postgresql.test.xa.XADataSourceTest.twoPhaseCommit()`: "unknown",
}
var pgjdbcIgnoreList = blocklist{
diff --git a/pkg/internal/sqlsmith/sqlsmith.go b/pkg/internal/sqlsmith/sqlsmith.go
index 3ba8093cff33..93810d2d13e2 100644
--- a/pkg/internal/sqlsmith/sqlsmith.go
+++ b/pkg/internal/sqlsmith/sqlsmith.go
@@ -368,6 +368,7 @@ var DisableDDLs = simpleOption("disable DDLs", func(s *Smither) {
{2, makeRollbackToSavepoint},
{2, makeCommit},
{2, makeRollback},
+ // TODO(nvanbenschoten): add two-phase commit statements.
}
})
diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go
index e910214a1717..b1318201806f 100644
--- a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go
+++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go
@@ -1323,6 +1323,13 @@ func (tc *TxnCoordSender) IsSerializablePushAndRefreshNotPossible() bool {
return isTxnSerializable && isTxnPushed && refreshAttemptNotPossible
}
+// Key is part of the kv.TxnSender interface.
+func (tc *TxnCoordSender) Key() roachpb.Key {
+ tc.mu.Lock()
+ defer tc.mu.Unlock()
+ return tc.mu.txn.Key
+}
+
// Epoch is part of the kv.TxnSender interface.
func (tc *TxnCoordSender) Epoch() enginepb.TxnEpoch {
tc.mu.Lock()
diff --git a/pkg/kv/mock_transactional_sender.go b/pkg/kv/mock_transactional_sender.go
index 53f4bdfcefeb..bd743cd83f40 100644
--- a/pkg/kv/mock_transactional_sender.go
+++ b/pkg/kv/mock_transactional_sender.go
@@ -211,6 +211,9 @@ func (m *MockTransactionalSender) CanUseSavepoint(context.Context, SavepointToke
panic("unimplemented")
}
+// Key is part of the TxnSender interface.
+func (m *MockTransactionalSender) Key() roachpb.Key { panic("unimplemented") }
+
// Epoch is part of the TxnSender interface.
func (m *MockTransactionalSender) Epoch() enginepb.TxnEpoch { panic("unimplemented") }
diff --git a/pkg/kv/sender.go b/pkg/kv/sender.go
index e0ce101a1404..f4aaddb97eb0 100644
--- a/pkg/kv/sender.go
+++ b/pkg/kv/sender.go
@@ -289,6 +289,10 @@ type TxnSender interface {
// https://github.com/cockroachdb/cockroach/issues/15012
Active() bool
+ // Key returns the current "anchor" key of the transaction, or nil if no such
+ // key has been set because the transaction has not yet acquired any locks.
+ Key() roachpb.Key
+
// Epoch returns the txn's epoch.
Epoch() enginepb.TxnEpoch
diff --git a/pkg/kv/txn.go b/pkg/kv/txn.go
index 032b25675749..c10f9e7eb3da 100644
--- a/pkg/kv/txn.go
+++ b/pkg/kv/txn.go
@@ -262,6 +262,14 @@ func (txn *Txn) ID() uuid.UUID {
return txn.mu.ID
}
+// Key returns the current "anchor" key of the transaction, or nil if no such
+// key has been set because the transaction has not yet acquired any locks.
+func (txn *Txn) Key() roachpb.Key {
+ txn.mu.Lock()
+ defer txn.mu.Unlock()
+ return txn.mu.sender.Key()
+}
+
// Epoch exports the txn's epoch.
func (txn *Txn) Epoch() enginepb.TxnEpoch {
txn.mu.Lock()
diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel
index f5b8feb4547b..bf625c90202b 100644
--- a/pkg/sql/BUILD.bazel
+++ b/pkg/sql/BUILD.bazel
@@ -274,6 +274,7 @@ go_library(
"testutils.go",
"topk.go",
"truncate.go",
+ "two_phase_commit.go",
"txn_fingerprint_id_cache.go",
"txn_state.go",
"type_change.go",
diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go
index 18a30a9405ed..7d40fdf54cd3 100644
--- a/pkg/sql/conn_executor.go
+++ b/pkg/sql/conn_executor.go
@@ -2161,7 +2161,7 @@ func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent, pay
}
switch ev.eventType {
- case txnCommit, txnRollback:
+ case txnCommit, txnRollback, txnPrepare:
ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.closeAllPortals(
ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc,
)
@@ -4080,10 +4080,10 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(
}
fallthrough
- case txnRollback:
+ case txnRollback, txnPrepare:
ex.resetExtraTxnState(ex.Ctx(), advInfo.txnEvent, payloadErr)
- // Since we're doing a complete rollback, there's no need to keep the
- // prepared stmts for a txn rewind.
+ // Since we're finalizing the SQL transaction (commit, rollback, prepare),
+ // there's no need to keep the prepared stmts for a txn rewind.
ex.extraTxnState.prepStmtsNamespaceAtTxnRewindPos.closeAllPortals(
ex.Ctx(), &ex.extraTxnState.prepStmtsNamespaceMemAcc,
)
@@ -4494,6 +4494,11 @@ type StatementCounters struct {
TxnRollbackCount telemetry.CounterWithMetric
TxnUpgradedCount *metric.Counter
+ // Transaction XA two-phase commit operations.
+ TxnPrepareCount telemetry.CounterWithMetric
+ TxnCommitPreparedCount telemetry.CounterWithMetric
+ TxnRollbackPreparedCount telemetry.CounterWithMetric
+
// Savepoint operations. SavepointCount is for real SQL savepoints;
// the RestartSavepoint variants are for the
// cockroach-specific client-side retry protocol.
@@ -4531,6 +4536,12 @@ func makeStartedStatementCounters(internal bool) StatementCounters {
getMetricMeta(MetaTxnRollbackStarted, internal)),
TxnUpgradedCount: metric.NewCounter(
getMetricMeta(MetaTxnUpgradedFromWeakIsolation, internal)),
+ TxnPrepareCount: telemetry.NewCounterWithMetric(
+ getMetricMeta(MetaTxnPrepareStarted, internal)),
+ TxnCommitPreparedCount: telemetry.NewCounterWithMetric(
+ getMetricMeta(MetaTxnCommitPreparedStarted, internal)),
+ TxnRollbackPreparedCount: telemetry.NewCounterWithMetric(
+ getMetricMeta(MetaTxnRollbackPreparedStarted, internal)),
RestartSavepointCount: telemetry.NewCounterWithMetric(
getMetricMeta(MetaRestartSavepointStarted, internal)),
ReleaseRestartSavepointCount: telemetry.NewCounterWithMetric(
@@ -4576,6 +4587,12 @@ func makeExecutedStatementCounters(internal bool) StatementCounters {
getMetricMeta(MetaTxnRollbackExecuted, internal)),
TxnUpgradedCount: metric.NewCounter(
getMetricMeta(MetaTxnUpgradedFromWeakIsolation, internal)),
+ TxnPrepareCount: telemetry.NewCounterWithMetric(
+ getMetricMeta(MetaTxnPrepareExecuted, internal)),
+ TxnCommitPreparedCount: telemetry.NewCounterWithMetric(
+ getMetricMeta(MetaTxnCommitPreparedExecuted, internal)),
+ TxnRollbackPreparedCount: telemetry.NewCounterWithMetric(
+ getMetricMeta(MetaTxnRollbackPreparedExecuted, internal)),
RestartSavepointCount: telemetry.NewCounterWithMetric(
getMetricMeta(MetaRestartSavepointExecuted, internal)),
ReleaseRestartSavepointCount: telemetry.NewCounterWithMetric(
@@ -4638,6 +4655,12 @@ func (sc *StatementCounters) incrementCount(ex *connExecutor, stmt tree.Statemen
} else {
sc.TxnRollbackCount.Inc()
}
+ case *tree.PrepareTransaction:
+ sc.TxnPrepareCount.Inc()
+ case *tree.CommitPrepared:
+ sc.TxnCommitPreparedCount.Inc()
+ case *tree.RollbackPrepared:
+ sc.TxnRollbackPreparedCount.Inc()
case *tree.Savepoint:
if ex.isCommitOnReleaseSavepoint(t.Name) {
sc.RestartSavepointCount.Inc()
diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go
index 973b59c3b8bc..b681ec78e03a 100644
--- a/pkg/sql/conn_executor_exec.go
+++ b/pkg/sql/conn_executor_exec.go
@@ -780,6 +780,10 @@ func (ex *connExecutor) execStmtInOpenState(
ev, payload := ex.execRollbackToSavepointInOpenState(ctx, s, res)
return ev, payload, nil
+ case *tree.PrepareTransaction:
+ ev, payload := ex.execPrepareTransactionInOpenState(ctx, s)
+ return ev, payload, nil
+
case *tree.ShowCommitTimestamp:
ev, payload := ex.execShowCommitTimestampInOpenState(ctx, s, res, canAutoCommit)
return ev, payload, nil
@@ -1799,6 +1803,10 @@ func (ex *connExecutor) execStmtInOpenStateWithPausablePortal(
ev, payload := ex.execRollbackToSavepointInOpenState(ctx, s, res)
return ev, payload, nil
+ case *tree.PrepareTransaction:
+ ev, payload := ex.execPrepareTransactionInOpenState(ctx, s)
+ return ev, payload, nil
+
case *tree.ShowCommitTimestamp:
ev, payload := ex.execShowCommitTimestampInOpenState(ctx, s, res, canAutoCommit)
return ev, payload, nil
@@ -3450,8 +3458,8 @@ func (ex *connExecutor) execStmtInNoTxnState(
)
case *tree.ShowCommitTimestamp:
return ex.execShowCommitTimestampInNoTxnState(ctx, s, res)
- case *tree.CommitTransaction, *tree.ReleaseSavepoint,
- *tree.RollbackTransaction, *tree.SetTransaction, *tree.Savepoint:
+ case *tree.CommitTransaction, *tree.RollbackTransaction, *tree.PrepareTransaction,
+ *tree.SetTransaction, *tree.Savepoint, *tree.ReleaseSavepoint:
if ex.sessionData().AutoCommitBeforeDDL {
// If autocommit_before_ddl is set, we allow these statements to be
// executed, and send a warning rather than an error.
@@ -3523,7 +3531,7 @@ func (ex *connExecutor) beginImplicitTxn(
// execStmtInAbortedState executes a statement in a txn that's in state
// Aborted or RestartWait. All statements result in error events except:
-// - COMMIT / ROLLBACK: aborts the current transaction.
+// - COMMIT / ROLLBACK / PREPARE TRANSACTION: aborts the current transaction.
// - ROLLBACK TO SAVEPOINT / SAVEPOINT: reopens the current transaction,
// allowing it to be retried.
func (ex *connExecutor) execStmtInAbortedState(
@@ -3545,9 +3553,10 @@ func (ex *connExecutor) execStmtInAbortedState(
}
switch s := ast.(type) {
- case *tree.CommitTransaction, *tree.RollbackTransaction:
- if _, ok := s.(*tree.CommitTransaction); ok {
- // Note: Postgres replies to COMMIT of failed txn with "ROLLBACK" too.
+ case *tree.CommitTransaction, *tree.RollbackTransaction, *tree.PrepareTransaction:
+ if _, ok := s.(*tree.RollbackTransaction); !ok {
+ // Note: Postgres replies to COMMIT and PREPARE TRANSACTION of failed
+ // transactions with "ROLLBACK" too.
res.ResetStmtType((*tree.RollbackTransaction)(nil))
}
return ex.rollbackSQLTransaction(ctx, s)
diff --git a/pkg/sql/conn_executor_prepare.go b/pkg/sql/conn_executor_prepare.go
index 1045f49e2519..ad6466751148 100644
--- a/pkg/sql/conn_executor_prepare.go
+++ b/pkg/sql/conn_executor_prepare.go
@@ -711,7 +711,8 @@ func (ex *connExecutor) execDescribe(
// prepared and executed inside of an aborted transaction.
func (ex *connExecutor) isAllowedInAbortedTxn(ast tree.Statement) bool {
switch s := ast.(type) {
- case *tree.CommitTransaction, *tree.RollbackTransaction, *tree.RollbackToSavepoint:
+ case *tree.CommitTransaction, *tree.PrepareTransaction,
+ *tree.RollbackTransaction, *tree.RollbackToSavepoint:
return true
case *tree.Savepoint:
if ex.isCommitOnReleaseSavepoint(s.Name) {
diff --git a/pkg/sql/conn_fsm.go b/pkg/sql/conn_fsm.go
index a128e8f7da9d..702952a566bc 100644
--- a/pkg/sql/conn_fsm.go
+++ b/pkg/sql/conn_fsm.go
@@ -163,6 +163,23 @@ type eventTxnFinishAbortedPLpgSQL struct{}
// that case no event is necessary.
type eventSavepointRollback struct{}
+// eventTxnFinishPrepared is generated when a PREPARE TRANSACTION statement is
+// executed. The current transaction is dissociated with the current session.
+type eventTxnFinishPrepared struct{}
+
+// eventTxnFinishPreparedErrPayload represents the payload for
+// eventTxnFinishPrepared, if the PREPARE TRANSACTION statement encountered an
+// error. No payload is used if the statement was successful.
+type eventTxnFinishPreparedErrPayload struct {
+ // err is the error encountered during the prepare.
+ err error
+}
+
+// errorCause implements the payloadWithError interface.
+func (p eventTxnFinishPreparedErrPayload) errorCause() error {
+ return p.err
+}
+
type eventNonRetriableErr struct {
IsCommit fsm.Bool
}
@@ -230,6 +247,7 @@ type payloadWithError interface {
func (eventTxnStart) Event() {}
func (eventTxnFinishCommitted) Event() {}
func (eventTxnFinishAborted) Event() {}
+func (eventTxnFinishPrepared) Event() {}
func (eventTxnFinishCommittedPLpgSQL) Event() {}
func (eventTxnFinishAbortedPLpgSQL) Event() {}
func (eventSavepointRollback) Event() {}
@@ -374,6 +392,18 @@ var TxnStateTransitions = fsm.Compile(fsm.Pattern{
},
},
stateOpen{ImplicitTxn: fsm.False, WasUpgraded: fsm.Var("wasUpgraded")}: {
+ // Handle prepared transactions. Note that this statement is only valid in
+ // the context of an explicit transaction, so we don't need to handle
+ // implicit transactions.
+ eventTxnFinishPrepared{}: {
+ Description: "PREPARE TRANSACTION",
+ Next: stateNoTxn{},
+ Action: func(args fsm.Args) error {
+ // Note that the KV txn has been prepared by the statement execution by
+ // this point and is being dissociated from the session.
+ return args.Extended.(*txnState).finishTxn(txnPrepare, advanceOne)
+ },
+ },
// Handle the errors in explicit txns.
eventNonRetriableErr{IsCommit: fsm.False}: {
Next: stateAborted{WasUpgraded: fsm.Var("wasUpgraded")},
diff --git a/pkg/sql/control_jobs.go b/pkg/sql/control_jobs.go
index 203a897914ed..077f41c139b3 100644
--- a/pkg/sql/control_jobs.go
+++ b/pkg/sql/control_jobs.go
@@ -31,7 +31,7 @@ var jobCommandToDesiredStatus = map[tree.JobCommand]jobs.Status{
tree.PauseJob: jobs.StatusPaused,
}
-// FastPathResults implements the planNodeFastPath inteface.
+// FastPathResults implements the planNodeFastPath interface.
func (n *controlJobsNode) FastPathResults() (int, bool) {
return n.numRows, true
}
diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go
index 47a4946fa4bc..1ed8334fa433 100644
--- a/pkg/sql/exec_util.go
+++ b/pkg/sql/exec_util.go
@@ -881,6 +881,24 @@ var (
Measurement: "SQL Statements",
Unit: metric.Unit_COUNT,
}
+ MetaTxnPrepareStarted = metric.Metadata{
+ Name: "sql.txn.prepare.started.count",
+ Help: "Number of SQL PREPARE TRANSACTION statements started",
+ Measurement: "SQL Statements",
+ Unit: metric.Unit_COUNT,
+ }
+ MetaTxnCommitPreparedStarted = metric.Metadata{
+ Name: "sql.txn.commit_prepared.started.count",
+ Help: "Number of SQL COMMIT PREPARED statements started",
+ Measurement: "SQL Statements",
+ Unit: metric.Unit_COUNT,
+ }
+ MetaTxnRollbackPreparedStarted = metric.Metadata{
+ Name: "sql.txn.rollback_prepared.started.count",
+ Help: "Number of SQL ROLLBACK PREPARED statements started",
+ Measurement: "SQL Statements",
+ Unit: metric.Unit_COUNT,
+ }
MetaSQLTxnContended = metric.Metadata{
Name: "sql.txn.contended.count",
Help: "Number of SQL transactions experienced contention",
@@ -1009,6 +1027,24 @@ var (
Measurement: "SQL Statements",
Unit: metric.Unit_COUNT,
}
+ MetaTxnPrepareExecuted = metric.Metadata{
+ Name: "sql.txn.prepare.count",
+ Help: "Number of SQL PREPARE TRANSACTION statements successfully executed",
+ Measurement: "SQL Statements",
+ Unit: metric.Unit_COUNT,
+ }
+ MetaTxnCommitPreparedExecuted = metric.Metadata{
+ Name: "sql.txn.commit_prepared.count",
+ Help: "Number of SQL COMMIT PREPARED statements successfully executed",
+ Measurement: "SQL Statements",
+ Unit: metric.Unit_COUNT,
+ }
+ MetaTxnRollbackPreparedExecuted = metric.Metadata{
+ Name: "sql.txn.rollback_prepared.count",
+ Help: "Number of SQL ROLLBACK PREPARED statements successfully executed",
+ Measurement: "SQL Statements",
+ Unit: metric.Unit_COUNT,
+ }
MetaSelectExecuted = metric.Metadata{
Name: "sql.select.count",
Help: "Number of SQL SELECT statements successfully executed",
@@ -2024,6 +2060,8 @@ func golangFillQueryArguments(args ...interface{}) (tree.Datums, error) {
d = dd
case username.SQLUsername:
d = tree.NewDString(t.Normalized())
+ case uuid.UUID:
+ d = tree.NewDUuid(tree.DUuid{UUID: t})
}
if d == nil {
// Handle all types which have an underlying type that can be stored in the
diff --git a/pkg/sql/logictest/testdata/logic_test/crdb_internal b/pkg/sql/logictest/testdata/logic_test/crdb_internal
index 3badea94370c..8acaddb5415e 100644
--- a/pkg/sql/logictest/testdata/logic_test/crdb_internal
+++ b/pkg/sql/logictest/testdata/logic_test/crdb_internal
@@ -126,7 +126,7 @@ pg_partitioned_table true
pg_policies true
pg_policy true
pg_prepared_statements false
-pg_prepared_xacts true
+pg_prepared_xacts false
pg_proc false
pg_publication true
pg_publication_rel true
diff --git a/pkg/sql/logictest/testdata/logic_test/two_phase_commit b/pkg/sql/logictest/testdata/logic_test/two_phase_commit
new file mode 100644
index 000000000000..507b819b3cde
--- /dev/null
+++ b/pkg/sql/logictest/testdata/logic_test/two_phase_commit
@@ -0,0 +1,296 @@
+# LogicTest: !local-mixed-24.3
+
+query T
+SHOW max_prepared_transactions
+----
+2147483647
+
+query TTTTTTT
+SELECT * FROM system.prepared_transactions
+----
+
+query ITTTT
+SELECT * FROM pg_catalog.pg_prepared_xacts
+----
+
+statement ok
+CREATE TABLE t (a INT)
+
+statement ok
+GRANT ALL on t TO testuser
+
+
+# Prepare a read-only transaction.
+statement ok
+BEGIN
+
+query I
+SELECT * FROM t
+----
+
+statement ok
+PREPARE TRANSACTION 'read-only'
+
+# Verify the prepared transaction was added to the system table.
+query TTT
+SELECT global_id, owner, database FROM system.prepared_transactions
+----
+read-only root test
+
+# Verify the prepared transaction is visible in the pg_catalog.pg_prepared_xacts
+# table.
+query ITTT
+SELECT transaction, gid, owner, database FROM pg_catalog.pg_prepared_xacts
+----
+0 read-only root test
+
+# Commit the read-only prepared transaction.
+statement ok
+COMMIT PREPARED 'read-only'
+
+# Verify the prepared transaction is gone.
+query T
+SELECT global_id FROM system.prepared_transactions
+----
+
+
+# Prepare another read-only transaction for abort.
+statement ok
+BEGIN
+
+query I
+SELECT * FROM t
+----
+
+statement ok
+PREPARE TRANSACTION 'read-only'
+
+# Commit the read-only prepared transaction.
+statement ok
+ROLLBACK PREPARED 'read-only'
+
+
+# Prepare a read-write transaction.
+statement ok
+BEGIN
+
+statement ok
+INSERT INTO t(a) VALUES (1)
+
+statement ok
+PREPARE TRANSACTION 'read-write'
+
+# Verify the prepared transaction is visible in the system table.
+query T
+SELECT global_id FROM system.prepared_transactions
+----
+read-write
+
+# Commit the prepared transaction
+statement ok
+COMMIT PREPARED 'read-write'
+
+# Verify prepared insert is visible.
+query I
+SELECT * FROM t
+----
+1
+
+
+# Prepare a read-write transaction for rollback.
+statement ok
+BEGIN
+
+statement ok
+INSERT INTO t(a) VALUES (2)
+
+statement ok
+PREPARE TRANSACTION 'read-write'
+
+statement ok
+ROLLBACK PREPARED 'read-write'
+
+query I
+SELECT * FROM t
+----
+1
+
+query T
+SELECT global_id FROM system.prepared_transactions
+----
+
+
+# Verify a transaction cannot be prepared with a duplicate global id.
+statement ok
+BEGIN
+
+statement ok
+INSERT INTO t(a) VALUES (2)
+
+statement ok
+PREPARE TRANSACTION 'duplicate'
+
+statement ok
+BEGIN
+
+statement ok
+INSERT INTO t(a) VALUES (3)
+
+statement error pgcode 42710 transaction identifier "duplicate" is already in use
+PREPARE TRANSACTION 'duplicate'
+
+query T
+SHOW transaction_status
+----
+NoTxn
+
+# Commit concurrent prepared transaction.
+statement ok
+COMMIT PREPARED 'duplicate'
+
+# Verify the intent for a=2 has been commited and the intent for a=3 has been
+# rolled back.
+query I
+SELECT * FROM t ORDER BY a
+----
+1
+2
+
+
+# Verify the global id has a maximum length of 200 characters.
+statement ok
+BEGIN
+
+statement error pgcode 22023 transaction identifier "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" is too long
+PREPARE TRANSACTION 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
+
+query T
+SHOW transaction_status
+----
+NoTxn
+
+# Prepare a transaction with one fewer character.
+statement ok
+BEGIN
+
+statement ok
+PREPARE TRANSACTION 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
+
+query T
+SELECT global_id FROM system.prepared_transactions
+----
+aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+
+statement ok
+ROLLBACK PREPARED 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa';
+
+
+# Prepare in an aborted transaction rolls the transaction back without inserting
+# into the system table.
+statement error pgcode 22012 division by zero
+BEGIN; SELECT 1/0
+
+statement ok
+PREPARE TRANSACTION 'aborted-txn'
+
+query T
+SHOW transaction_status
+----
+NoTxn
+
+query TTT
+SELECT global_id, owner, database FROM system.prepared_transactions
+----
+
+
+# Verify prepare must be executed inside a transaction block.
+statement error pgcode 25P01 there is no transaction in progress
+PREPARE TRANSACTION 'implicit-txn'
+
+
+# Verify commit/rollback cannot be executed inside a transaction block.
+statement ok
+BEGIN
+
+statement error pgcode 25001 COMMIT PREPARED cannot run inside a transaction block
+COMMIT PREPARED 'txn'
+
+statement ok
+ROLLBACK; BEGIN
+
+statement error pgcode 25001 ROLLBACK PREPARED cannot run inside a transaction block
+ROLLBACK PREPARED 'txn'
+
+statement ok
+ROLLBACK
+
+
+# Verify an error is throw for unknown prepared transactions.
+statement error pgcode 42704 prepared transaction with identifier "unknown" does not exist
+COMMIT PREPARED 'unknown'
+
+statement error pgcode 42704 prepared transaction with identifier "unknown" does not exist
+ROLLBACK PREPARED 'unknown'
+
+
+# Verify permissions.
+statement ok
+BEGIN
+
+statement ok
+INSERT INTO t(a) VALUES (3)
+
+statement ok
+PREPARE TRANSACTION 'read-write-root'
+
+query TTT
+SELECT global_id, owner, database FROM system.prepared_transactions
+----
+read-write-root root test
+
+user testuser
+
+statement error pgcode 42501 permission denied to finish prepared transaction
+COMMIT PREPARED 'read-write-root'
+
+# Prepare a transaction as testuser and verify that root can roll it back.
+statement ok
+BEGIN
+
+statement ok
+INSERT INTO t(a) VALUES (4)
+
+statement ok
+PREPARE TRANSACTION 'read-write-testuser'
+
+# Before we switch back, verify that non-root/admin users cannot read from
+# or write to system.prepared_transactions.
+statement error pgcode 42501 user testuser does not have SELECT privilege on relation prepared_transactions
+SELECT global_id, owner, database FROM system.prepared_transactions
+
+statement error pgcode 42501 user testuser does not have DELETE privilege on relation prepared_transactions
+DELETE FROM system.prepared_transactions
+
+user root
+
+query TTT rowsort
+SELECT global_id, owner, database FROM system.prepared_transactions
+----
+read-write-root root test
+read-write-testuser testuser test
+
+query ITTT rowsort
+SELECT transaction, gid, owner, database FROM pg_catalog.pg_prepared_xacts
+----
+0 read-write-root root test
+0 read-write-testuser testuser test
+
+statement ok
+ROLLBACK PREPARED 'read-write-root'
+
+statement ok
+ROLLBACK PREPARED 'read-write-testuser'
+
+query TTT
+SELECT global_id, owner, database FROM system.prepared_transactions
+----
diff --git a/pkg/sql/logictest/testdata/logic_test/two_phase_commit_mixed_version b/pkg/sql/logictest/testdata/logic_test/two_phase_commit_mixed_version
new file mode 100644
index 000000000000..26522ab2f3fe
--- /dev/null
+++ b/pkg/sql/logictest/testdata/logic_test/two_phase_commit_mixed_version
@@ -0,0 +1,17 @@
+# LogicTest: local-mixed-24.3
+
+statement ok
+BEGIN
+
+statement error pgcode 0A000 PREPARE TRANSACTION unsupported in mixed-version cluster
+PREPARE TRANSACTION 'txn'
+
+statement error pgcode 0A000 COMMIT PREPARED unsupported in mixed-version cluster
+COMMIT PREPARED 'txn'
+
+statement error pgcode 0A000 ROLLBACK PREPARED unsupported in mixed-version cluster
+ROLLBACK PREPARED 'txn'
+
+query ITTTT
+SELECT * FROM pg_catalog.pg_prepared_xacts
+----
diff --git a/pkg/sql/logictest/tests/fakedist-disk/generated_test.go b/pkg/sql/logictest/tests/fakedist-disk/generated_test.go
index 8d4e83341d49..3fab80652e0e 100644
--- a/pkg/sql/logictest/tests/fakedist-disk/generated_test.go
+++ b/pkg/sql/logictest/tests/fakedist-disk/generated_test.go
@@ -2159,6 +2159,13 @@ func TestLogic_tuple(
runLogicTest(t, "tuple")
}
+func TestLogic_two_phase_commit(
+ t *testing.T,
+) {
+ defer leaktest.AfterTest(t)()
+ runLogicTest(t, "two_phase_commit")
+}
+
func TestLogic_txn(
t *testing.T,
) {
diff --git a/pkg/sql/logictest/tests/fakedist-vec-off/generated_test.go b/pkg/sql/logictest/tests/fakedist-vec-off/generated_test.go
index d41883e1da75..d4aefe8b1090 100644
--- a/pkg/sql/logictest/tests/fakedist-vec-off/generated_test.go
+++ b/pkg/sql/logictest/tests/fakedist-vec-off/generated_test.go
@@ -2166,6 +2166,13 @@ func TestLogic_tuple(
runLogicTest(t, "tuple")
}
+func TestLogic_two_phase_commit(
+ t *testing.T,
+) {
+ defer leaktest.AfterTest(t)()
+ runLogicTest(t, "two_phase_commit")
+}
+
func TestLogic_txn(
t *testing.T,
) {
diff --git a/pkg/sql/logictest/tests/fakedist/generated_test.go b/pkg/sql/logictest/tests/fakedist/generated_test.go
index a8ef6791772a..f8af5190b483 100644
--- a/pkg/sql/logictest/tests/fakedist/generated_test.go
+++ b/pkg/sql/logictest/tests/fakedist/generated_test.go
@@ -2180,6 +2180,13 @@ func TestLogic_tuple(
runLogicTest(t, "tuple")
}
+func TestLogic_two_phase_commit(
+ t *testing.T,
+) {
+ defer leaktest.AfterTest(t)()
+ runLogicTest(t, "two_phase_commit")
+}
+
func TestLogic_txn(
t *testing.T,
) {
diff --git a/pkg/sql/logictest/tests/local-legacy-schema-changer/generated_test.go b/pkg/sql/logictest/tests/local-legacy-schema-changer/generated_test.go
index be9963884d0f..808c55191f07 100644
--- a/pkg/sql/logictest/tests/local-legacy-schema-changer/generated_test.go
+++ b/pkg/sql/logictest/tests/local-legacy-schema-changer/generated_test.go
@@ -2159,6 +2159,13 @@ func TestLogic_tuple_local(
runLogicTest(t, "tuple_local")
}
+func TestLogic_two_phase_commit(
+ t *testing.T,
+) {
+ defer leaktest.AfterTest(t)()
+ runLogicTest(t, "two_phase_commit")
+}
+
func TestLogic_txn(
t *testing.T,
) {
diff --git a/pkg/sql/logictest/tests/local-mixed-24.3/generated_test.go b/pkg/sql/logictest/tests/local-mixed-24.3/generated_test.go
index 7987a50e8abe..787f0c3265c6 100644
--- a/pkg/sql/logictest/tests/local-mixed-24.3/generated_test.go
+++ b/pkg/sql/logictest/tests/local-mixed-24.3/generated_test.go
@@ -2180,6 +2180,13 @@ func TestLogic_tuple_local(
runLogicTest(t, "tuple_local")
}
+func TestLogic_two_phase_commit_mixed_version(
+ t *testing.T,
+) {
+ defer leaktest.AfterTest(t)()
+ runLogicTest(t, "two_phase_commit_mixed_version")
+}
+
func TestLogic_txn(
t *testing.T,
) {
diff --git a/pkg/sql/logictest/tests/local-vec-off/generated_test.go b/pkg/sql/logictest/tests/local-vec-off/generated_test.go
index eb3984738b5f..9662423f1acb 100644
--- a/pkg/sql/logictest/tests/local-vec-off/generated_test.go
+++ b/pkg/sql/logictest/tests/local-vec-off/generated_test.go
@@ -2194,6 +2194,13 @@ func TestLogic_tuple_local(
runLogicTest(t, "tuple_local")
}
+func TestLogic_two_phase_commit(
+ t *testing.T,
+) {
+ defer leaktest.AfterTest(t)()
+ runLogicTest(t, "two_phase_commit")
+}
+
func TestLogic_txn(
t *testing.T,
) {
diff --git a/pkg/sql/logictest/tests/local/generated_test.go b/pkg/sql/logictest/tests/local/generated_test.go
index 76c854b2f5f0..915ee3d4211e 100644
--- a/pkg/sql/logictest/tests/local/generated_test.go
+++ b/pkg/sql/logictest/tests/local/generated_test.go
@@ -2411,6 +2411,13 @@ func TestLogic_tuple_local(
runLogicTest(t, "tuple_local")
}
+func TestLogic_two_phase_commit(
+ t *testing.T,
+) {
+ defer leaktest.AfterTest(t)()
+ runLogicTest(t, "two_phase_commit")
+}
+
func TestLogic_txn(
t *testing.T,
) {
diff --git a/pkg/sql/opaque.go b/pkg/sql/opaque.go
index 779b850fd926..3f403087f34d 100644
--- a/pkg/sql/opaque.go
+++ b/pkg/sql/opaque.go
@@ -150,14 +150,16 @@ func planOpaque(ctx context.Context, p *planner, stmt tree.Statement) (planNode,
return p.CommentOnConstraint(ctx, n)
case *tree.CommentOnDatabase:
return p.CommentOnDatabase(ctx, n)
- case *tree.CommentOnSchema:
- return p.CommentOnSchema(ctx, n)
case *tree.CommentOnIndex:
return p.CommentOnIndex(ctx, n)
+ case *tree.CommentOnSchema:
+ return p.CommentOnSchema(ctx, n)
case *tree.CommentOnTable:
return p.CommentOnTable(ctx, n)
case *tree.CommentOnType:
return p.CommentOnType(ctx, n)
+ case *tree.CommitPrepared:
+ return p.CommitPrepared(ctx, n)
case *tree.CopyTo:
// COPY TO does not actually get prepared in any meaningful way. This means
// it can't have placeholder arguments, and the execution can use the same
@@ -245,6 +247,8 @@ func planOpaque(ctx context.Context, p *planner, stmt tree.Statement) (planNode,
return p.Revoke(ctx, n)
case *tree.RevokeRole:
return p.RevokeRole(ctx, n)
+ case *tree.RollbackPrepared:
+ return p.RollbackPrepared(ctx, n)
case *tree.Scatter:
return p.Scatter(ctx, n)
case *tree.Scrub:
@@ -350,12 +354,13 @@ func init() {
&tree.AlterRoleSet{},
&tree.CloseCursor{},
&tree.CommentOnColumn{},
+ &tree.CommentOnConstraint{},
&tree.CommentOnDatabase{},
- &tree.CommentOnType{},
- &tree.CommentOnSchema{},
&tree.CommentOnIndex{},
- &tree.CommentOnConstraint{},
+ &tree.CommentOnSchema{},
&tree.CommentOnTable{},
+ &tree.CommentOnType{},
+ &tree.CommitPrepared{},
&tree.CopyTo{},
&tree.CreateDatabase{},
&tree.CreateExtension{},
@@ -398,6 +403,7 @@ func init() {
&tree.ReparentDatabase{},
&tree.Revoke{},
&tree.RevokeRole{},
+ &tree.RollbackPrepared{},
&tree.Scatter{},
&tree.Scrub{},
&tree.SetClusterSetting{},
diff --git a/pkg/sql/pg_catalog.go b/pkg/sql/pg_catalog.go
index 31a515fbe350..67476994eb7d 100644
--- a/pkg/sql/pg_catalog.go
+++ b/pkg/sql/pg_catalog.go
@@ -16,6 +16,7 @@ import (
"time"
"unicode"
+ "github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
@@ -2419,13 +2420,46 @@ https://www.postgresql.org/docs/9.5/catalog-pg-operator.html`,
}
var pgCatalogPreparedXactsTable = virtualSchemaTable{
- comment: `prepared transactions (empty - feature does not exist)
+ comment: `prepared transactions
https://www.postgresql.org/docs/9.6/view-pg-prepared-xacts.html`,
schema: vtable.PGCatalogPreparedXacts,
populate: func(ctx context.Context, p *planner, dbContext catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) error {
+ if !p.IsActive(ctx, clusterversion.V25_1_PreparedTransactionsTable) {
+ // TODO(nvanbenschoten): Remove this logic when mixed-version support
+ // with v24.3 is no longer necessary.
+ return nil
+ }
+ rows, err := p.InternalSQLTxn().QueryBufferedEx(
+ ctx,
+ "select-prepared-transactions",
+ p.Txn(),
+ sessiondata.NodeUserSessionDataOverride,
+ `SELECT global_id, prepared, owner, database FROM system.prepared_transactions`,
+ )
+ if err != nil {
+ return err
+ }
+ for _, row := range rows {
+ // NOTE: we can't map a 128-bit CockroachDB transaction ID to a 32-bit
+ // Postgres xid, so we just return zero for each transaction's xid. This
+ // is acceptable, as the gid is the important part of pg_prepared_xacts.
+ transaction := zeroVal
+ globalID := row[0]
+ prepared := row[1]
+ owner := tree.NewDName(string(tree.MustBeDString(row[2])))
+ database := tree.NewDName(string(tree.MustBeDString(row[3])))
+ if err := addRow(
+ transaction, // transaction
+ globalID, // gid
+ prepared, // prepared
+ owner, // owner
+ database, // database
+ ); err != nil {
+ return err
+ }
+ }
return nil
},
- unimplemented: true,
}
// pgCatalogPreparedStatementsTable implements the pg_prepared_statements table.
diff --git a/pkg/sql/plan.go b/pkg/sql/plan.go
index 928c6811f914..30f179506a8c 100644
--- a/pkg/sql/plan.go
+++ b/pkg/sql/plan.go
@@ -179,6 +179,7 @@ var _ planNode = &dropViewNode{}
var _ planNode = &errorIfRowsNode{}
var _ planNode = &explainVecNode{}
var _ planNode = &filterNode{}
+var _ planNode = &endPreparedTxnNode{}
var _ planNode = &GrantRoleNode{}
var _ planNode = &groupNode{}
var _ planNode = &hookFnNode{}
diff --git a/pkg/sql/plan_opt.go b/pkg/sql/plan_opt.go
index a18fe5ee5895..cd46ae06ccb9 100644
--- a/pkg/sql/plan_opt.go
+++ b/pkg/sql/plan_opt.go
@@ -67,17 +67,17 @@ func (p *planner) prepareUsingOptimizer(
*tree.Analyze,
*tree.BeginTransaction,
*tree.CommentOnColumn, *tree.CommentOnConstraint, *tree.CommentOnDatabase, *tree.CommentOnIndex, *tree.CommentOnTable, *tree.CommentOnSchema,
- *tree.CommitTransaction,
+ *tree.CommitPrepared, *tree.CommitTransaction,
*tree.CopyFrom, *tree.CopyTo, *tree.CreateDatabase, *tree.CreateIndex, *tree.CreateView,
*tree.CreateSequence,
*tree.CreateStats,
*tree.Deallocate, *tree.Discard, *tree.DropDatabase, *tree.DropIndex,
*tree.DropTable, *tree.DropView, *tree.DropSequence, *tree.DropType,
*tree.Grant, *tree.GrantRole,
- *tree.Prepare,
+ *tree.Prepare, *tree.PrepareTransaction,
*tree.ReleaseSavepoint, *tree.RenameColumn, *tree.RenameDatabase,
*tree.RenameIndex, *tree.RenameTable, *tree.Revoke, *tree.RevokeRole,
- *tree.RollbackToSavepoint, *tree.RollbackTransaction,
+ *tree.RollbackPrepared, *tree.RollbackToSavepoint, *tree.RollbackTransaction,
*tree.Savepoint, *tree.SetTransaction, *tree.SetTracing, *tree.SetSessionAuthorizationDefault,
*tree.SetSessionCharacteristics:
// These statements do not have result columns and do not support placeholders
diff --git a/pkg/sql/two_phase_commit.go b/pkg/sql/two_phase_commit.go
new file mode 100644
index 000000000000..15800c677c8e
--- /dev/null
+++ b/pkg/sql/two_phase_commit.go
@@ -0,0 +1,430 @@
+// Copyright 2024 The Cockroach Authors.
+//
+// Use of this software is governed by the CockroachDB Software License
+// included in the /LICENSE file.
+
+package sql
+
+import (
+ "context"
+
+ "github.com/cockroachdb/cockroach/pkg/clusterversion"
+ "github.com/cockroachdb/cockroach/pkg/kv"
+ "github.com/cockroachdb/cockroach/pkg/kv/kvpb"
+ "github.com/cockroachdb/cockroach/pkg/roachpb"
+ "github.com/cockroachdb/cockroach/pkg/sql/isql"
+ "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
+ "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
+ "github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
+ "github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
+ "github.com/cockroachdb/cockroach/pkg/storage/enginepb"
+ "github.com/cockroachdb/cockroach/pkg/util/fsm"
+ "github.com/cockroachdb/cockroach/pkg/util/log"
+ "github.com/cockroachdb/cockroach/pkg/util/tracing"
+ "github.com/cockroachdb/cockroach/pkg/util/uuid"
+ "github.com/cockroachdb/errors"
+)
+
+// maxPreparedTxnGlobalIDLen is the maximum length of a prepared transaction's
+// global ID. Taken from Postgres, see GIDSIZE.
+const maxPreparedTxnGlobalIDLen = 200
+
+// execPrepareTransactionInOpenState runs a PREPARE TRANSACTION statement inside
+// an open txn.
+func (ex *connExecutor) execPrepareTransactionInOpenState(
+ ctx context.Context, s *tree.PrepareTransaction,
+) (fsm.Event, fsm.EventPayload) {
+ ctx, sp := tracing.EnsureChildSpan(ctx, ex.server.cfg.AmbientCtx.Tracer, "prepare sql txn")
+ defer sp.Finish()
+
+ // Insert into the system table and prepare the transaction in the KV layer.
+ prepareErr := ex.execPrepareTransactionInOpenStateInternal(ctx, s)
+
+ // Roll back the transaction if we encounter an error.
+ //
+ // From https://www.postgresql.org/docs/16/sql-prepare-transaction.html:
+ // > If the PREPARE TRANSACTION command fails for any reason, it becomes a
+ // > ROLLBACK: the current transaction is canceled.
+ if prepareErr != nil {
+ if rbErr := ex.state.mu.txn.Rollback(ctx); rbErr != nil {
+ log.Warningf(ctx, "txn rollback failed: err=%s", rbErr)
+ }
+ }
+
+ // Dissociate the prepared transaction from the current session.
+ var p fsm.EventPayload
+ if prepareErr != nil {
+ p = eventTxnFinishPreparedErrPayload{err: prepareErr}
+ }
+ return eventTxnFinishPrepared{}, p
+}
+
+func (ex *connExecutor) execPrepareTransactionInOpenStateInternal(
+ ctx context.Context, s *tree.PrepareTransaction,
+) error {
+ // TODO(nvanbenschoten): Remove this logic when mixed-version support with
+ // v24.3 is no longer necessary.
+ if !ex.planner.EvalContext().Settings.Version.IsActive(ctx, clusterversion.V25_1_PreparedTransactionsTable) {
+ return pgerror.Newf(pgcode.FeatureNotSupported, "PREPARE TRANSACTION unsupported in mixed-version cluster")
+ }
+
+ // TODO(nvanbenschoten): why are these needed here (and in the equivalent
+ // functions for commit and rollback)? Shouldn't they be handled by
+ // connExecutor.resetExtraTxnState?
+ if err := ex.extraTxnState.sqlCursors.closeAll(cursorCloseForTxnCommit); err != nil {
+ return err
+ }
+ ex.extraTxnState.prepStmtsNamespace.closeAllPortals(ctx, &ex.extraTxnState.prepStmtsNamespaceMemAcc)
+
+ // Validate the global ID.
+ globalID := s.Transaction.RawString()
+ if len(globalID) >= maxPreparedTxnGlobalIDLen {
+ return pgerror.Newf(pgcode.InvalidParameterValue, "transaction identifier %q is too long", globalID)
+ }
+
+ // Validate that the transaction has not performed any incompatible operations
+ // which would prevent it from being prepared.
+ if ex.extraTxnState.descCollection.HasUncommittedDescriptors() {
+ return pgerror.Newf(pgcode.InvalidTransactionState,
+ "cannot prepare a transaction that has already performed schema changes")
+ }
+
+ txn := ex.state.mu.txn
+ txnID := txn.ID()
+ txnKey := txn.Key()
+ if !txn.IsOpen() {
+ return errors.AssertionFailedf("cannot prepare a transaction that is not open")
+ }
+
+ // Insert the prepared transaction's row into the system table. We do this
+ // before preparing the transaction in the KV layer so that we can track the
+ // existence of the prepared transaction in the event of a crash. We do this
+ // non-transactionally so that the system row is committed and readable while
+ // the transaction that it references remains in the PREPARED state.
+ err := ex.server.cfg.InternalDB.Txn(ctx, func(ctx context.Context, sqlTxn isql.Txn) error {
+ return insertPreparedTransaction(
+ ctx,
+ sqlTxn,
+ globalID,
+ txnID,
+ txnKey,
+ ex.sessionData().User().Normalized(),
+ ex.sessionData().Database,
+ )
+ })
+ if err != nil {
+ if pgerror.GetPGCode(err) == pgcode.UniqueViolation {
+ return pgerror.Newf(pgcode.DuplicateObject, "transaction identifier %q is already in use", globalID)
+ }
+ return err
+ }
+
+ // Move the transaction into the PREPARED state in the KV layer.
+ if err := ex.state.mu.txn.Prepare(ctx); err != nil {
+ // The transaction prepare failed. Try to roll it back. If we succeed, we
+ // can delete the row from system.prepared_transactions. If we fail, we log
+ // a warning and leave the row in place. Either way, we return the original
+ // error.
+ ex.cleanupAfterFailedPrepareTransaction(ctx, globalID)
+ return err
+ }
+
+ // TODO(nvanbenschoten): why is these needed here (and in the equivalent
+ // functions for commit and rollback)? Shouldn't it be handled by
+ // connExecutor.resetExtraTxnState?
+ if err := ex.reportSessionDataChanges(func() error {
+ ex.sessionDataStack.PopAll()
+ return nil
+ }); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (ex *connExecutor) cleanupAfterFailedPrepareTransaction(ctx context.Context, globalID string) {
+ // Try to rollback. We only want to cleanup the system.prepared_transactions
+ // row if we successfully rollback the transaction.
+ err := ex.state.mu.txn.Rollback(ctx)
+ if err != nil {
+ log.Warningf(ctx, "txn rollback failed: err=%s", err)
+ return
+ }
+ if ctx.Err() != nil {
+ // If the context has been canceled, the rollback may have moved to running
+ // async, so the absence of an error is not a guarantee that the rollback
+ // succeeded. Stop here.
+ return
+ }
+
+ // We believe we've rolled back the transaction successfully. Query the
+ // transaction record again to confirm, to be extra safe.
+ txn := ex.state.mu.txn
+ txnID := txn.ID()
+ txnKey := txn.Key()
+ txnRecord, err := queryPreparedTransactionRecord(ctx, ex.server.cfg.DB, txnID, txnKey)
+ if err != nil {
+ log.Warningf(ctx, "query prepared transaction record after rollback failed: %s", err)
+ return
+ }
+ if txnRecord.Status != roachpb.ABORTED {
+ log.Errorf(ctx, "prepared transaction %s not aborted after rollback: %v", globalID, txnRecord)
+ return
+ }
+
+ // We're certain that the transaction has been rolled back and that its record
+ // is not in the PREPARED state. Clean up the system.prepared_transactions row,
+ // non-transactionally.
+ err = ex.server.cfg.InternalDB.Txn(ctx, func(ctx context.Context, sqlTxn isql.Txn) error {
+ return deletePreparedTransaction(ctx, sqlTxn, globalID)
+ })
+ if err != nil {
+ log.Warningf(ctx, "cleanup prepared transaction row failed: %s", err)
+ }
+}
+
+// CommitPrepared commits a previously prepared transaction and deletes its
+// associated entry from the system.prepared_transactions table. This is called
+// from COMMIT PREPARED.
+func (p *planner) CommitPrepared(ctx context.Context, n *tree.CommitPrepared) (planNode, error) {
+ return p.endPreparedTxnNode(n.Transaction, true /* commit */), nil
+}
+
+// RollbackPrepared aborts a previously prepared transaction and deletes its
+// associated entry from the system.prepared_transactions table. This is called
+// from ROLLBACK PREPARED.
+func (p *planner) RollbackPrepared(
+ ctx context.Context, n *tree.RollbackPrepared,
+) (planNode, error) {
+ return p.endPreparedTxnNode(n.Transaction, false /* commit */), nil
+}
+
+type endPreparedTxnNode struct {
+ globalID string
+ commit bool
+}
+
+func (p *planner) endPreparedTxnNode(globalID *tree.StrVal, commit bool) *endPreparedTxnNode {
+ return &endPreparedTxnNode{
+ globalID: globalID.RawString(),
+ commit: commit,
+ }
+}
+
+func (f *endPreparedTxnNode) startExec(params runParams) error {
+ // TODO(nvanbenschoten): Remove this logic when mixed-version support with
+ // v24.3 is no longer necessary.
+ if !params.EvalContext().Settings.Version.IsActive(params.ctx, clusterversion.V25_1_PreparedTransactionsTable) {
+ return pgerror.Newf(pgcode.FeatureNotSupported, "%s unsupported in mixed-version cluster", f.stmtName())
+ }
+
+ if err := f.checkNoActiveTxn(params); err != nil {
+ return err
+ }
+
+ // Retrieve the prepared transaction's row from the system table.
+ txnID, txnKey, owner, err := f.selectPreparedTxn(params)
+ if err != nil {
+ return err
+ }
+
+ // Check privileges.
+ //
+ // From https://www.postgresql.org/docs/16/sql-commit-prepared.html and
+ // https://www.postgresql.org/docs/16/sql-rollback-prepared.html:
+ // > To commit / roll back a prepared transaction, you must be either the same
+ // > user that executed the transaction originally, or a superuser.
+ if params.SessionData().User().Normalized() != owner && !params.SessionData().IsSuperuser {
+ return errors.WithHint(pgerror.Newf(pgcode.InsufficientPrivilege,
+ "permission denied to finish prepared transaction"),
+ "Must be superuser or the user that prepared the transaction.")
+ }
+
+ // End (commit or roll back) the prepared transaction in the KV layer.
+ if err := f.endPreparedTxn(params, txnID, txnKey); err != nil {
+ return err
+ }
+
+ // Delete the prepared transaction's row from the system table.
+ //
+ // It is essential that we only delete the row after the transaction has been
+ // moved out of the PREPARED state in the KV layer. This is because the system
+ // table row is the only way to track the existence of a prepared transaction
+ // so that it can be moved out of the PREPARED state. If we were to delete the
+ // row before the transaction was moved out of the PREPARED state, we might
+ // lose track of the PREPARED transaction and leave it dangling indefinitely.
+ return f.deletePreparedTxn(params)
+}
+
+func (f *endPreparedTxnNode) stmtName() string {
+ if f.commit {
+ return "COMMIT PREPARED"
+ }
+ return "ROLLBACK PREPARED"
+}
+
+// checkNoActiveTxn checks that there is no active transaction in the current
+// session. If there is, it returns an error.
+func (f *endPreparedTxnNode) checkNoActiveTxn(params runParams) error {
+ if params.p.autoCommit {
+ return nil
+ }
+ return pgerror.Newf(pgcode.ActiveSQLTransaction,
+ "%s cannot run inside a transaction block", f.stmtName())
+}
+
+// selectPreparedTxn queries the prepared transaction from the system table and,
+// if found, returns the transaction ID, key, and owner. If the transaction is
+// not found, it returns an error.
+func (f *endPreparedTxnNode) selectPreparedTxn(
+ params runParams,
+) (txnID uuid.UUID, txnKey roachpb.Key, owner string, err error) {
+ row, err := selectPreparedTransaction(params.ctx, params.p.InternalSQLTxn(), f.globalID)
+ if err != nil {
+ return uuid.UUID{}, nil, "", err
+ }
+ if row == nil {
+ return uuid.UUID{}, nil, "", pgerror.Newf(pgcode.UndefinedObject,
+ "prepared transaction with identifier %q does not exist", f.globalID)
+ }
+
+ txnID = tree.MustBeDUuid(row[0]).UUID
+ if row[1] != tree.DNull {
+ txnKey = roachpb.Key(tree.MustBeDBytes(row[1]))
+ }
+ owner = string(tree.MustBeDString(row[2]))
+ return txnID, txnKey, owner, nil
+}
+
+// endPreparedTxn ends the prepared transaction by either committing or rolling
+// back the transaction in the KV layer.
+func (f *endPreparedTxnNode) endPreparedTxn(
+ params runParams, txnID uuid.UUID, txnKey roachpb.Key,
+) error {
+ // If the transaction had no key, then it was read-only and never wrote a
+ // transaction record. In this case, we don't need to do anything besides
+ // clean up the system.prepared_transactions row.
+ if txnKey == nil {
+ return nil
+ }
+
+ // Query the prepared transaction's record to determine its current status and
+ // to retrieve enough of its metadata to commit or rollback.
+ db := params.ExecCfg().DB
+ txnRecord, err := queryPreparedTransactionRecord(params.ctx, db, txnID, txnKey)
+ if err != nil {
+ return err
+ }
+ if txnRecord.Status.IsFinalized() {
+ // The transaction record has already been finalized. Just clean up the
+ // system.prepared_transactions row.
+ return nil
+ }
+ if txnRecord.Status != roachpb.PREPARED {
+ // The prepared transaction was never moved into the PREPARED state. This
+ // can happen if there was a crash after the system.prepared_transactions
+ // row was inserted but before the transaction record was PREPARED to
+ // commit. In this case, we can't commit the transaction, but we can still
+ // roll it back.
+ if f.commit {
+ return pgerror.Newf(pgcode.ObjectNotInPrerequisiteState,
+ "prepared transaction with identifier %q not in PREPARED state, cannot COMMIT", f.globalID)
+ }
+ return nil
+ }
+
+ // Set the transaction's read timestamp to its write timestamp. This is a bit
+ // of a hack which falls out of the transaction record only storing the write
+ // timestamp and not the read timestamp (see Transaction.AsRecord). To issue a
+ // CommitPrepared or RollbackPrepared request, we need to have a read
+ // timestamp set. Since the transaction is successfully prepared, the read
+ // timestamp can safely be assumed to be equal to the write timestamp.
+ txnRecord.ReadTimestamp = txnRecord.WriteTimestamp
+
+ if f.commit {
+ err = db.CommitPrepared(params.ctx, txnRecord)
+ } else {
+ err = db.RollbackPrepared(params.ctx, txnRecord)
+ }
+ return err
+}
+
+// deletePreparedTxn deletes the prepared transaction from the system table.
+func (f *endPreparedTxnNode) deletePreparedTxn(params runParams) error {
+ return deletePreparedTransaction(params.ctx, params.p.InternalSQLTxn(), f.globalID)
+}
+
+func (f *endPreparedTxnNode) Next(params runParams) (bool, error) { return false, nil }
+func (f *endPreparedTxnNode) Values() tree.Datums { return tree.Datums{} }
+func (f *endPreparedTxnNode) Close(ctx context.Context) {}
+
+func insertPreparedTransaction(
+ ctx context.Context,
+ sqlTxn isql.Txn,
+ globalID string,
+ txnID uuid.UUID,
+ txnKey roachpb.Key,
+ owner, database string,
+) error {
+ _, err := sqlTxn.ExecEx(
+ ctx,
+ "insert-prepared-transaction",
+ sqlTxn.KV(),
+ sessiondata.NodeUserSessionDataOverride,
+ `INSERT INTO system.prepared_transactions
+ (global_id, transaction_id, transaction_key, owner, database)
+ VALUES ($1, $2, $3, $4, $5)`,
+ globalID,
+ txnID,
+ txnKey,
+ owner,
+ database,
+ )
+ return err
+}
+
+func deletePreparedTransaction(ctx context.Context, sqlTxn isql.Txn, globalID string) error {
+ _, err := sqlTxn.ExecEx(
+ ctx,
+ "delete-prepared-transaction",
+ sqlTxn.KV(),
+ sessiondata.NodeUserSessionDataOverride,
+ `DELETE FROM system.prepared_transactions WHERE global_id = $1`,
+ globalID,
+ )
+ return err
+}
+
+func selectPreparedTransaction(
+ ctx context.Context, sqlTxn isql.Txn, globalID string,
+) (tree.Datums, error) {
+ return sqlTxn.QueryRowEx(
+ ctx,
+ "select-prepared-txn",
+ sqlTxn.KV(),
+ sessiondata.NodeUserSessionDataOverride,
+ `SELECT transaction_id, transaction_key, owner
+ FROM system.prepared_transactions WHERE global_id = $1 FOR UPDATE`,
+ globalID,
+ )
+}
+
+func queryPreparedTransactionRecord(
+ ctx context.Context, db *kv.DB, txnID uuid.UUID, txnKey roachpb.Key,
+) (*roachpb.Transaction, error) {
+ ba := &kvpb.BatchRequest{}
+ ba.Add(&kvpb.QueryTxnRequest{
+ RequestHeader: kvpb.RequestHeader{
+ Key: txnKey,
+ },
+ Txn: enginepb.TxnMeta{
+ ID: txnID,
+ Key: txnKey,
+ },
+ })
+ br, pErr := db.NonTransactionalSender().Send(ctx, ba)
+ if pErr != nil {
+ return nil, pErr.GoError()
+ }
+ return &br.Responses[0].GetQueryTxn().QueriedTxn, nil
+}
diff --git a/pkg/sql/txn_restart_test.go b/pkg/sql/txn_restart_test.go
index 123656530f32..85f3cfc1fa65 100644
--- a/pkg/sql/txn_restart_test.go
+++ b/pkg/sql/txn_restart_test.go
@@ -669,7 +669,7 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v TEXT);
// rollbackStrategy is the type of statement which a client can use to
// rollback aborted txns from retryable errors. We accept two statements
// for rolling back to the cockroach_restart savepoint. See
-// *Executor.execStmtInAbortedTxn for more about transaction retries.
+// *connExecutor.execStmtInAbortedState for more about transaction retries.
type rollbackStrategy int
const (
diff --git a/pkg/sql/txn_state.go b/pkg/sql/txn_state.go
index 41acbefd29da..26dbe2fc5d0e 100644
--- a/pkg/sql/txn_state.go
+++ b/pkg/sql/txn_state.go
@@ -470,6 +470,10 @@ const (
// rolled back, not to a savepoint). It is generated when an implicit
// transaction fails and when an explicit transaction runs a ROLLBACK.
txnRollback
+ // txnPrepare means that the SQL transaction has been prepared and is now
+ // being dissociated from the session. It is generated when an explicit
+ // transaction runs a PREPARE TRANSACTION statement.
+ txnPrepare
// txnRestart means that the transaction is restarting. The iteration of the
// txn just finished will not commit. It is generated when we're about to
// auto-retry a txn and after a rollback to a savepoint placed at the start of
diff --git a/pkg/sql/txn_state_test.go b/pkg/sql/txn_state_test.go
index 75b788e43017..1e11d02f5928 100644
--- a/pkg/sql/txn_state_test.go
+++ b/pkg/sql/txn_state_test.go
@@ -692,6 +692,36 @@ func TestTransitions(t *testing.T) {
// performed any operations, but it's not easy to do the test.
expTxn: &expKVTxn{},
},
+ {
+ // PREPARE TRANSACTION.
+ name: "Open + prepare",
+ init: func() (fsm.State, *txnState, uuid.UUID, error) {
+ s, ts := testCon.createOpenState(explicitTxn)
+ return s, ts, ts.mu.txn.ID(), nil
+ },
+ ev: eventTxnFinishPrepared{},
+ expState: stateNoTxn{},
+ expAdv: expAdvance{
+ expCode: advanceOne,
+ expEv: txnPrepare,
+ },
+ expTxn: nil,
+ },
+ {
+ // PREPARE TRANSACTION on an upgraded txn.
+ name: "Open (upgraded) + prepare",
+ init: func() (fsm.State, *txnState, uuid.UUID, error) {
+ s, ts := testCon.createOpenState(upgradedExplicitTxn)
+ return s, ts, ts.mu.txn.ID(), nil
+ },
+ ev: eventTxnFinishPrepared{},
+ expState: stateNoTxn{},
+ expAdv: expAdvance{
+ expCode: advanceOne,
+ expEv: txnPrepare,
+ },
+ expTxn: nil,
+ },
//
// Tests starting from the Aborted state.
//
diff --git a/pkg/sql/txneventtype_string.go b/pkg/sql/txneventtype_string.go
index 278e3bf994db..3a2eeaad5682 100644
--- a/pkg/sql/txneventtype_string.go
+++ b/pkg/sql/txneventtype_string.go
@@ -17,8 +17,9 @@ func _() {
_ = x[txnStart-1]
_ = x[txnCommit-2]
_ = x[txnRollback-3]
- _ = x[txnRestart-4]
- _ = x[txnUpgradeToExplicit-5]
+ _ = x[txnPrepare-4]
+ _ = x[txnRestart-5]
+ _ = x[txnUpgradeToExplicit-6]
}
func (i txnEventType) String() string {
@@ -31,6 +32,8 @@ func (i txnEventType) String() string {
return "txnCommit"
case txnRollback:
return "txnRollback"
+ case txnPrepare:
+ return "txnPrepare"
case txnRestart:
return "txnRestart"
case txnUpgradeToExplicit:
diff --git a/pkg/sql/txnstatetransitions_diagram.gv b/pkg/sql/txnstatetransitions_diagram.gv
index e106c0eaa532..78ca4eb92e25 100644
--- a/pkg/sql/txnstatetransitions_diagram.gv
+++ b/pkg/sql/txnstatetransitions_diagram.gv
@@ -48,6 +48,7 @@ digraph finite_state_machine {
"Open{ImplicitTxn:false, WasUpgraded:false}" -> "CommitWait{}" [label = SHOW COMMIT TIMESTAMP>]
"Open{ImplicitTxn:false, WasUpgraded:false}" -> "NoTxn{}" [label = ROLLBACK, or after a statement running as an implicit txn fails>]
"Open{ImplicitTxn:false, WasUpgraded:false}" -> "NoTxn{}" [label = COMMIT, or after a statement running as an implicit txn>]
+ "Open{ImplicitTxn:false, WasUpgraded:false}" -> "NoTxn{}" [label = PREPARE TRANSACTION>]
"Open{ImplicitTxn:false, WasUpgraded:false}" -> "CommitWait{}" [label = RELEASE SAVEPOINT cockroach_restart>]
"Open{ImplicitTxn:false, WasUpgraded:false}" -> "Open{ImplicitTxn:false, WasUpgraded:false}" [label = ROLLBACK TO SAVEPOINT cockroach_restart>]
"Open{ImplicitTxn:false, WasUpgraded:true}" -> "Aborted{WasUpgraded:true}" [label = "NonRetriableErr{IsCommit:false}"]
@@ -60,6 +61,7 @@ digraph finite_state_machine {
"Open{ImplicitTxn:false, WasUpgraded:true}" -> "CommitWait{}" [label = SHOW COMMIT TIMESTAMP>]
"Open{ImplicitTxn:false, WasUpgraded:true}" -> "NoTxn{}" [label = ROLLBACK, or after a statement running as an implicit txn fails>]
"Open{ImplicitTxn:false, WasUpgraded:true}" -> "NoTxn{}" [label = COMMIT, or after a statement running as an implicit txn>]
+ "Open{ImplicitTxn:false, WasUpgraded:true}" -> "NoTxn{}" [label = PREPARE TRANSACTION>]
"Open{ImplicitTxn:false, WasUpgraded:true}" -> "CommitWait{}" [label = RELEASE SAVEPOINT cockroach_restart>]
"Open{ImplicitTxn:false, WasUpgraded:true}" -> "Open{ImplicitTxn:false, WasUpgraded:true}" [label = ROLLBACK TO SAVEPOINT cockroach_restart>]
"Open{ImplicitTxn:true, WasUpgraded:false}" -> "NoTxn{}" [label = "NonRetriableErr{IsCommit:false}"]
diff --git a/pkg/sql/txnstatetransitions_report.txt b/pkg/sql/txnstatetransitions_report.txt
index d3675a670476..cb6fd012b5d0 100644
--- a/pkg/sql/txnstatetransitions_report.txt
+++ b/pkg/sql/txnstatetransitions_report.txt
@@ -17,6 +17,7 @@ Aborted{WasUpgraded:false}
TxnFinishAbortedPLpgSQL{}
TxnFinishCommittedPLpgSQL{}
TxnFinishCommitted{}
+ TxnFinishPrepared{}
TxnReleased{}
TxnStart{ImplicitTxn:false}
TxnStart{ImplicitTxn:true}
@@ -38,6 +39,7 @@ Aborted{WasUpgraded:true}
TxnFinishAbortedPLpgSQL{}
TxnFinishCommittedPLpgSQL{}
TxnFinishCommitted{}
+ TxnFinishPrepared{}
TxnReleased{}
TxnStart{ImplicitTxn:false}
TxnStart{ImplicitTxn:true}
@@ -58,6 +60,7 @@ CommitWait{}
TxnFinishAbortedPLpgSQL{}
TxnFinishAborted{}
TxnFinishCommittedPLpgSQL{}
+ TxnFinishPrepared{}
TxnReleased{}
TxnRestart{}
TxnStart{ImplicitTxn:false}
@@ -81,6 +84,7 @@ NoTxn{}
TxnFinishAborted{}
TxnFinishCommittedPLpgSQL{}
TxnFinishCommitted{}
+ TxnFinishPrepared{}
TxnReleased{}
TxnRestart{}
TxnUpgradeToExplicit{}
@@ -96,6 +100,7 @@ Open{ImplicitTxn:false, WasUpgraded:false}
TxnCommittedWithShowCommitTimestamp{}
TxnFinishAborted{}
TxnFinishCommitted{}
+ TxnFinishPrepared{}
TxnReleased{}
TxnRestart{}
missing events:
@@ -117,6 +122,7 @@ Open{ImplicitTxn:false, WasUpgraded:true}
TxnCommittedWithShowCommitTimestamp{}
TxnFinishAborted{}
TxnFinishCommitted{}
+ TxnFinishPrepared{}
TxnReleased{}
TxnRestart{}
missing events:
@@ -143,6 +149,7 @@ Open{ImplicitTxn:true, WasUpgraded:false}
missing events:
SavepointRollback{}
TxnCommittedWithShowCommitTimestamp{}
+ TxnFinishPrepared{}
TxnReleased{}
TxnRestart{}
TxnStart{ImplicitTxn:false}
@@ -164,6 +171,7 @@ Open{ImplicitTxn:true, WasUpgraded:true}
TxnCommittedWithShowCommitTimestamp{}
TxnFinishAbortedPLpgSQL{}
TxnFinishCommittedPLpgSQL{}
+ TxnFinishPrepared{}
TxnReleased{}
TxnRestart{}
TxnStart{ImplicitTxn:false}
diff --git a/pkg/sql/walk.go b/pkg/sql/walk.go
index c26c43d01a44..c265786768f5 100644
--- a/pkg/sql/walk.go
+++ b/pkg/sql/walk.go
@@ -416,6 +416,7 @@ var planNodeNames = map[reflect.Type]string{
reflect.TypeOf(&exportNode{}): "export",
reflect.TypeOf(&fetchNode{}): "fetch",
reflect.TypeOf(&filterNode{}): "filter",
+ reflect.TypeOf(&endPreparedTxnNode{}): "commit/rollback prepared",
reflect.TypeOf(&GrantRoleNode{}): "grant role",
reflect.TypeOf(&groupNode{}): "group",
reflect.TypeOf(&hookFnNode{}): "plugin",