@@ -1864,6 +1864,17 @@ class Rdb_transaction {
18641864 bool m_is_delayed_snapshot = false ;
18651865 bool m_is_two_phase = false ;
18661866
1867+ private:
1868+ /* Number of RockDB savepoints taken */
1869+ int m_n_savepoints;
1870+ /*
1871+ Number of write operations this transaction had when we took the last
1872+ savepoint (the idea is not to take another savepoint if we haven't made
1873+ any changes)
1874+ */
1875+ ulonglong m_writes_at_last_savepoint;
1876+
1877+ protected:
18671878 THD *m_thd = nullptr ;
18681879
18691880 static std::multiset<Rdb_transaction *> s_tx_list;
@@ -1913,6 +1924,14 @@ class Rdb_transaction {
19131924 return s;
19141925 }
19151926
1927+ protected:
1928+ /*
1929+ The following two are helper functions to be overridden by child classes.
1930+ They should provide RocksDB's savepoint semantics.
1931+ */
1932+ virtual void do_set_savepoint () = 0;
1933+ virtual void do_rollback_to_savepoint () = 0;
1934+
19161935 public:
19171936 rocksdb::ReadOptions m_read_opts;
19181937 const char *m_mysql_log_file_name;
@@ -2399,6 +2418,50 @@ class Rdb_transaction {
23992418 virtual bool is_tx_started () const = 0;
24002419 virtual void start_tx () = 0;
24012420 virtual void start_stmt () = 0;
2421+
2422+ void set_initial_savepoint () {
2423+ /*
2424+ Set the initial savepoint. If the first statement in the transaction
2425+ fails, we need something to roll back to, without rolling back the
2426+ entire transaction.
2427+ */
2428+ do_set_savepoint ();
2429+ m_n_savepoints= 1 ;
2430+ m_writes_at_last_savepoint= m_write_count;
2431+ }
2432+
2433+ /*
2434+ Called when a "top-level" statement inside a transaction completes
2435+ successfully and its changes become part of the transaction's changes.
2436+ */
2437+ void make_stmt_savepoint_permanent () {
2438+
2439+ // Take another RocksDB savepoint only if we had changes since the last
2440+ // one. This is very important for long transactions doing lots of
2441+ // SELECTs.
2442+ if (m_writes_at_last_savepoint != m_write_count)
2443+ {
2444+ do_set_savepoint ();
2445+ m_writes_at_last_savepoint= m_write_count;
2446+ m_n_savepoints++;
2447+ }
2448+ }
2449+
2450+
2451+ /*
2452+ Rollback to the savepoint we've set before the last statement
2453+ */
2454+ void rollback_to_stmt_savepoint () {
2455+ if (m_writes_at_last_savepoint != m_write_count) {
2456+ do_rollback_to_savepoint ();
2457+ if (!--m_n_savepoints) {
2458+ do_set_savepoint ();
2459+ m_n_savepoints= 1 ;
2460+ }
2461+ m_writes_at_last_savepoint= m_write_count;
2462+ }
2463+ }
2464+
24022465 virtual void rollback_stmt () = 0;
24032466
24042467 void set_tx_failed (bool failed_arg) { m_is_tx_failed = failed_arg; }
@@ -2725,9 +2788,20 @@ class Rdb_transaction_impl : public Rdb_transaction {
27252788
27262789 m_read_opts = rocksdb::ReadOptions ();
27272790
2791+ set_initial_savepoint ();
2792+
27282793 m_ddl_transaction = false ;
27292794 }
27302795
2796+ /* Implementations of do_*savepoint based on rocksdB::Transaction savepoints */
2797+ void do_set_savepoint () override {
2798+ m_rocksdb_tx->SetSavePoint ();
2799+ }
2800+
2801+ void do_rollback_to_savepoint () override {
2802+ m_rocksdb_tx->RollbackToSavePoint ();
2803+ }
2804+
27312805 /*
27322806 Start a statement inside a multi-statement transaction.
27332807
@@ -2740,7 +2814,6 @@ class Rdb_transaction_impl : public Rdb_transaction {
27402814 void start_stmt () override {
27412815 // Set the snapshot to delayed acquisition (SetSnapshotOnNextOperation)
27422816 acquire_snapshot (false );
2743- m_rocksdb_tx->SetSavePoint ();
27442817 }
27452818
27462819 /*
@@ -2751,7 +2824,7 @@ class Rdb_transaction_impl : public Rdb_transaction {
27512824 /* TODO: here we must release the locks taken since the start_stmt() call */
27522825 if (m_rocksdb_tx) {
27532826 const rocksdb::Snapshot *const org_snapshot = m_rocksdb_tx->GetSnapshot ();
2754- m_rocksdb_tx-> RollbackToSavePoint ();
2827+ rollback_to_stmt_savepoint ();
27552828
27562829 const rocksdb::Snapshot *const cur_snapshot = m_rocksdb_tx->GetSnapshot ();
27572830 if (org_snapshot != cur_snapshot) {
@@ -2841,6 +2914,15 @@ class Rdb_writebatch_impl : public Rdb_transaction {
28412914 return res;
28422915 }
28432916
2917+ /* Implementations of do_*savepoint based on rocksdB::WriteBatch savepoints */
2918+ void do_set_savepoint () override {
2919+ m_batch->SetSavePoint ();
2920+ }
2921+
2922+ void do_rollback_to_savepoint () override {
2923+ m_batch->RollbackToSavePoint ();
2924+ }
2925+
28442926public:
28452927 bool is_writebatch_trx () const override { return true ; }
28462928
@@ -2946,13 +3028,15 @@ class Rdb_writebatch_impl : public Rdb_transaction {
29463028 write_opts.disableWAL = THDVAR (m_thd, write_disable_wal);
29473029 write_opts.ignore_missing_column_families =
29483030 THDVAR (m_thd, write_ignore_missing_column_families);
3031+
3032+ set_initial_savepoint ();
29493033 }
29503034
2951- void start_stmt () override { m_batch-> SetSavePoint (); }
3035+ void start_stmt () override {}
29523036
29533037 void rollback_stmt () override {
29543038 if (m_batch)
2955- m_batch-> RollbackToSavePoint ();
3039+ rollback_to_stmt_savepoint ();
29563040 }
29573041
29583042 explicit Rdb_writebatch_impl (THD *const thd)
@@ -3161,6 +3245,8 @@ static int rocksdb_prepare(handlerton *const hton, THD *const thd,
31613245
31623246 DEBUG_SYNC (thd, " rocksdb.prepared" );
31633247 }
3248+ else
3249+ tx->make_stmt_savepoint_permanent ();
31643250
31653251 return HA_EXIT_SUCCESS;
31663252}
@@ -3332,11 +3418,9 @@ static int rocksdb_commit(handlerton *const hton, THD *const thd,
33323418 } else {
33333419 /*
33343420 We get here when committing a statement within a transaction.
3335-
3336- We don't need to do anything here. tx->start_stmt() will notify
3337- Rdb_transaction_impl that another statement has started.
33383421 */
33393422 tx->set_tx_failed (false );
3423+ tx->make_stmt_savepoint_permanent ();
33403424 }
33413425
33423426 if (my_core::thd_tx_isolation (thd) <= ISO_READ_COMMITTED) {
@@ -10341,22 +10425,24 @@ int ha_rocksdb::external_lock(THD *const thd, int lock_type) {
1034110425 }
1034210426
1034310427 if (lock_type == F_UNLCK) {
10344- Rdb_transaction *const tx = get_or_create_tx (thd);
10428+ Rdb_transaction *const tx = get_tx_from_thd (thd);
1034510429
10346- tx->io_perf_end_and_record (&m_io_perf);
10347- tx->m_n_mysql_tables_in_use --;
10348- if (tx->m_n_mysql_tables_in_use == 0 &&
10349- !my_core::thd_test_options (thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) {
10350- /*
10351- Do like InnoDB: when we get here, it's time to commit a
10352- single-statement transaction.
10430+ if (tx) {
10431+ tx->io_perf_end_and_record (&m_io_perf);
10432+ tx->m_n_mysql_tables_in_use --;
10433+ if (tx->m_n_mysql_tables_in_use == 0 &&
10434+ !my_core::thd_test_options (thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) {
10435+ /*
10436+ Do like InnoDB: when we get here, it's time to commit a
10437+ single-statement transaction.
1035310438
10354- If the statement involved multiple tables, this code will be executed
10355- for each of them, but that's ok because non-first tx->commit() calls
10356- will be no-ops.
10357- */
10358- if (tx->commit_or_rollback ()) {
10359- res = HA_ERR_INTERNAL_ERROR;
10439+ If the statement involved multiple tables, this code will be executed
10440+ for each of them, but that's ok because non-first tx->commit() calls
10441+ will be no-ops.
10442+ */
10443+ if (tx->commit_or_rollback ()) {
10444+ res = HA_ERR_INTERNAL_ERROR;
10445+ }
1036010446 }
1036110447 }
1036210448 } else {
0 commit comments