@@ -1772,6 +1772,17 @@ class Rdb_transaction {
17721772 bool m_is_delayed_snapshot = false ;
17731773 bool m_is_two_phase = false ;
17741774
1775+ private:
1776+ /* Number of RocksDB savepoints taken */
1777+ int m_n_savepoints;
1778+ /*
1779+ Number of write operations this transaction had when we took the last
1780+ savepoint (the idea is not to take another savepoint if we haven't made
1781+ any changes)
1782+ */
1783+ ulonglong m_writes_at_last_savepoint;
1784+
1785+ protected:
17751786 THD *m_thd = nullptr ;
17761787
17771788 rocksdb::ReadOptions m_read_opts;
@@ -1799,6 +1810,14 @@ class Rdb_transaction {
17991810 virtual rocksdb::Iterator *
18001811 get_iterator (const rocksdb::ReadOptions &options,
18011812 rocksdb::ColumnFamilyHandle *column_family) = 0 ;
1813+
1814+ protected:
1815+ /*
1816+ The following two are helper functions to be overridden by child classes.
1817+ They should provide RocksDB's savepoint semantics.
1818+ */
1819+ virtual void do_set_savepoint () = 0;
1820+ virtual void do_rollback_to_savepoint () = 0;
18021821
18031822public:
18041823 const char *m_mysql_log_file_name;
@@ -2173,6 +2192,50 @@ class Rdb_transaction {
21732192 virtual bool is_tx_started () const = 0;
21742193 virtual void start_tx () = 0;
21752194 virtual void start_stmt () = 0;
2195+
2196+ void set_initial_savepoint () {
2197+ /*
2198+ Set the initial savepoint. If the first statement in the transaction
2199+ fails, we need something to roll back to, without rolling back the
2200+ entire transaction.
2201+ */
2202+ do_set_savepoint ();
2203+ m_n_savepoints= 1 ;
2204+ m_writes_at_last_savepoint= m_write_count;
2205+ }
2206+
2207+ /*
2208+ Called when a "top-level" statement inside a transaction completes
2209+ successfully and its changes become part of the transaction's changes.
2210+ */
2211+ void make_stmt_savepoint_permanent () {
2212+
2213+ // Take another RocksDB savepoint only if we had changes since the last
2214+ // one. This is very important for long transactions doing lots of
2215+ // SELECTs.
2216+ if (m_writes_at_last_savepoint != m_write_count)
2217+ {
2218+ do_set_savepoint ();
2219+ m_writes_at_last_savepoint= m_write_count;
2220+ m_n_savepoints++;
2221+ }
2222+ }
2223+
2224+
2225+ /*
2226+ Roll back to the savepoint we've set before the last statement
2227+ */
2228+ void rollback_to_stmt_savepoint () {
2229+ if (m_writes_at_last_savepoint != m_write_count) {
2230+ do_rollback_to_savepoint ();
2231+ if (!--m_n_savepoints) {
2232+ do_set_savepoint ();
2233+ m_n_savepoints= 1 ;
2234+ }
2235+ m_writes_at_last_savepoint= m_write_count;
2236+ }
2237+ }
2238+
21762239 virtual void rollback_stmt () = 0;
21772240
21782241 void set_tx_failed (bool failed_arg) { m_is_tx_failed = failed_arg; }
@@ -2462,9 +2525,20 @@ class Rdb_transaction_impl : public Rdb_transaction {
24622525
24632526 m_read_opts = rocksdb::ReadOptions ();
24642527
2528+ set_initial_savepoint ();
2529+
24652530 m_ddl_transaction = false ;
24662531 }
24672532
2533+ /* Implementations of do_*savepoint based on rocksdb::Transaction savepoints */
2534+ void do_set_savepoint () override {
2535+ m_rocksdb_tx->SetSavePoint ();
2536+ }
2537+
2538+ void do_rollback_to_savepoint () override {
2539+ m_rocksdb_tx->RollbackToSavePoint ();
2540+ }
2541+
24682542 /*
24692543 Start a statement inside a multi-statement transaction.
24702544
@@ -2477,7 +2551,6 @@ class Rdb_transaction_impl : public Rdb_transaction {
24772551 void start_stmt () override {
24782552 // Set the snapshot to delayed acquisition (SetSnapshotOnNextOperation)
24792553 acquire_snapshot (false );
2480- m_rocksdb_tx->SetSavePoint ();
24812554 }
24822555
24832556 /*
@@ -2488,7 +2561,7 @@ class Rdb_transaction_impl : public Rdb_transaction {
24882561 /* TODO: here we must release the locks taken since the start_stmt() call */
24892562 if (m_rocksdb_tx) {
24902563 const rocksdb::Snapshot *const org_snapshot = m_rocksdb_tx->GetSnapshot ();
2491- m_rocksdb_tx-> RollbackToSavePoint ();
2564+ rollback_to_stmt_savepoint ();
24922565
24932566 const rocksdb::Snapshot *const cur_snapshot = m_rocksdb_tx->GetSnapshot ();
24942567 if (org_snapshot != cur_snapshot) {
@@ -2565,6 +2638,16 @@ class Rdb_writebatch_impl : public Rdb_transaction {
25652638 return res;
25662639 }
25672640
2641+ protected:
2642+ /* Implementations of do_*savepoint based on rocksdb::WriteBatch savepoints */
2643+ void do_set_savepoint () override {
2644+ m_batch->SetSavePoint ();
2645+ }
2646+
2647+ void do_rollback_to_savepoint () override {
2648+ m_batch->RollbackToSavePoint ();
2649+ }
2650+
25682651public:
25692652 bool is_writebatch_trx () const override { return true ; }
25702653
@@ -2670,13 +2753,15 @@ class Rdb_writebatch_impl : public Rdb_transaction {
26702753 write_opts.disableWAL = THDVAR (m_thd, write_disable_wal);
26712754 write_opts.ignore_missing_column_families =
26722755 THDVAR (m_thd, write_ignore_missing_column_families);
2756+
2757+ set_initial_savepoint ();
26732758 }
26742759
2675- void start_stmt () override { m_batch-> SetSavePoint (); }
2760+ void start_stmt () override {}
26762761
26772762 void rollback_stmt () override {
26782763 if (m_batch)
2679- m_batch-> RollbackToSavePoint ();
2764+ rollback_to_stmt_savepoint ();
26802765 }
26812766
26822767 explicit Rdb_writebatch_impl (THD *const thd)
@@ -2922,6 +3007,8 @@ static int rocksdb_prepare(handlerton* hton, THD* thd, bool prepare_tx)
29223007
29233008 DEBUG_SYNC (thd, " rocksdb.prepared" );
29243009 }
3010+ else
3011+ tx->make_stmt_savepoint_permanent ();
29253012 return HA_EXIT_SUCCESS;
29263013}
29273014
@@ -3172,11 +3259,8 @@ static int rocksdb_commit(handlerton* hton, THD* thd, bool commit_tx)
31723259 } else {
31733260 /*
31743261 We get here when committing a statement within a transaction.
3175-
3176- We don't need to do anything here. tx->start_stmt() will notify
3177- Rdb_transaction_impl that another statement has started.
31783262 */
3179- tx->set_tx_failed ( false );
3263+ tx->make_stmt_savepoint_permanent ( );
31803264 }
31813265
31823266 if (my_core::thd_tx_isolation (thd) <= ISO_READ_COMMITTED) {
@@ -10063,22 +10147,24 @@ int ha_rocksdb::external_lock(THD *const thd, int lock_type) {
1006310147 }
1006410148
1006510149 if (lock_type == F_UNLCK) {
10066- Rdb_transaction *const tx = get_or_create_tx (thd);
10150+ Rdb_transaction *const tx = get_tx_from_thd (thd);
1006710151
10068- tx->io_perf_end_and_record (&m_io_perf);
10069- tx->m_n_mysql_tables_in_use --;
10070- if (tx->m_n_mysql_tables_in_use == 0 &&
10071- !my_core::thd_test_options (thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) {
10072- /*
10073- Do like InnoDB: when we get here, it's time to commit a
10074- single-statement transaction.
10152+ if (tx) {
10153+ tx->io_perf_end_and_record (&m_io_perf);
10154+ tx->m_n_mysql_tables_in_use --;
10155+ if (tx->m_n_mysql_tables_in_use == 0 &&
10156+ !my_core::thd_test_options (thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) {
10157+ /*
10158+ Do like InnoDB: when we get here, it's time to commit a
10159+ single-statement transaction.
1007510160
10076- If the statement involved multiple tables, this code will be executed
10077- for each of them, but that's ok because non-first tx->commit() calls
10078- will be no-ops.
10079- */
10080- if (tx->commit_or_rollback ()) {
10081- res = HA_ERR_INTERNAL_ERROR;
10161+ If the statement involved multiple tables, this code will be executed
10162+ for each of them, but that's ok because non-first tx->commit() calls
10163+ will be no-ops.
10164+ */
10165+ if (tx->commit_or_rollback ()) {
10166+ res = HA_ERR_INTERNAL_ERROR;
10167+ }
1008210168 }
1008310169 }
1008410170 } else {
0 commit comments