@@ -1864,6 +1864,17 @@ class Rdb_transaction {
1864
1864
bool m_is_delayed_snapshot = false ;
1865
1865
bool m_is_two_phase = false ;
1866
1866
1867
+ private:
1868
+ /* Number of RockDB savepoints taken */
1869
+ int m_n_savepoints;
1870
+ /*
1871
+ Number of write operations this transaction had when we took the last
1872
+ savepoint (the idea is not to take another savepoint if we haven't made
1873
+ any changes)
1874
+ */
1875
+ ulonglong m_writes_at_last_savepoint;
1876
+
1877
+ protected:
1867
1878
THD *m_thd = nullptr ;
1868
1879
1869
1880
static std::multiset<Rdb_transaction *> s_tx_list;
@@ -1913,6 +1924,14 @@ class Rdb_transaction {
1913
1924
return s;
1914
1925
}
1915
1926
1927
+ protected:
1928
+ /*
1929
+ The following two are helper functions to be overloaded by child classes.
1930
+ They should provide RocksDB's savepoint semantics.
1931
+ */
1932
+ virtual void do_set_savepoint () = 0;
1933
+ virtual void do_rollback_to_savepoint () = 0;
1934
+
1916
1935
public:
1917
1936
rocksdb::ReadOptions m_read_opts;
1918
1937
const char *m_mysql_log_file_name;
@@ -2399,6 +2418,50 @@ class Rdb_transaction {
2399
2418
virtual bool is_tx_started () const = 0;
2400
2419
virtual void start_tx () = 0;
2401
2420
virtual void start_stmt () = 0;
2421
+
2422
+ void set_initial_savepoint () {
2423
+ /*
2424
+ Set the initial savepoint. If the first statement in the transaction
2425
+ fails, we need something to roll back to, without rolling back the
2426
+ entire transaction.
2427
+ */
2428
+ do_set_savepoint ();
2429
+ m_n_savepoints= 1 ;
2430
+ m_writes_at_last_savepoint= m_write_count;
2431
+ }
2432
+
2433
+ /*
2434
+ Called when a "top-level" statement inside a transaction completes
2435
+ successfully and its changes become part of the transaction's changes.
2436
+ */
2437
+ void make_stmt_savepoint_permanent () {
2438
+
2439
+ // Take another RocksDB savepoint only if we had changes since the last
2440
+ // one. This is very important for long transactions doing lots of
2441
+ // SELECTs.
2442
+ if (m_writes_at_last_savepoint != m_write_count)
2443
+ {
2444
+ do_set_savepoint ();
2445
+ m_writes_at_last_savepoint= m_write_count;
2446
+ m_n_savepoints++;
2447
+ }
2448
+ }
2449
+
2450
+
2451
+ /*
2452
+ Rollback to the savepoint we've set before the last statement
2453
+ */
2454
+ void rollback_to_stmt_savepoint () {
2455
+ if (m_writes_at_last_savepoint != m_write_count) {
2456
+ do_rollback_to_savepoint ();
2457
+ if (!--m_n_savepoints) {
2458
+ do_set_savepoint ();
2459
+ m_n_savepoints= 1 ;
2460
+ }
2461
+ m_writes_at_last_savepoint= m_write_count;
2462
+ }
2463
+ }
2464
+
2402
2465
virtual void rollback_stmt () = 0;
2403
2466
2404
2467
void set_tx_failed (bool failed_arg) { m_is_tx_failed = failed_arg; }
@@ -2725,9 +2788,20 @@ class Rdb_transaction_impl : public Rdb_transaction {
2725
2788
2726
2789
m_read_opts = rocksdb::ReadOptions ();
2727
2790
2791
+ set_initial_savepoint ();
2792
+
2728
2793
m_ddl_transaction = false ;
2729
2794
}
2730
2795
2796
+ /* Implementations of do_*savepoint based on rocksdB::Transaction savepoints */
2797
+ void do_set_savepoint () override {
2798
+ m_rocksdb_tx->SetSavePoint ();
2799
+ }
2800
+
2801
+ void do_rollback_to_savepoint () override {
2802
+ m_rocksdb_tx->RollbackToSavePoint ();
2803
+ }
2804
+
2731
2805
/*
2732
2806
Start a statement inside a multi-statement transaction.
2733
2807
@@ -2740,7 +2814,6 @@ class Rdb_transaction_impl : public Rdb_transaction {
2740
2814
void start_stmt () override {
2741
2815
// Set the snapshot to delayed acquisition (SetSnapshotOnNextOperation)
2742
2816
acquire_snapshot (false );
2743
- m_rocksdb_tx->SetSavePoint ();
2744
2817
}
2745
2818
2746
2819
/*
@@ -2751,7 +2824,7 @@ class Rdb_transaction_impl : public Rdb_transaction {
2751
2824
/* TODO: here we must release the locks taken since the start_stmt() call */
2752
2825
if (m_rocksdb_tx) {
2753
2826
const rocksdb::Snapshot *const org_snapshot = m_rocksdb_tx->GetSnapshot ();
2754
- m_rocksdb_tx-> RollbackToSavePoint ();
2827
+ rollback_to_stmt_savepoint ();
2755
2828
2756
2829
const rocksdb::Snapshot *const cur_snapshot = m_rocksdb_tx->GetSnapshot ();
2757
2830
if (org_snapshot != cur_snapshot) {
@@ -2841,6 +2914,15 @@ class Rdb_writebatch_impl : public Rdb_transaction {
2841
2914
return res;
2842
2915
}
2843
2916
2917
+ /* Implementations of do_*savepoint based on rocksdB::WriteBatch savepoints */
2918
+ void do_set_savepoint () override {
2919
+ m_batch->SetSavePoint ();
2920
+ }
2921
+
2922
+ void do_rollback_to_savepoint () override {
2923
+ m_batch->RollbackToSavePoint ();
2924
+ }
2925
+
2844
2926
public:
2845
2927
bool is_writebatch_trx () const override { return true ; }
2846
2928
@@ -2946,13 +3028,15 @@ class Rdb_writebatch_impl : public Rdb_transaction {
2946
3028
write_opts.disableWAL = THDVAR (m_thd, write_disable_wal);
2947
3029
write_opts.ignore_missing_column_families =
2948
3030
THDVAR (m_thd, write_ignore_missing_column_families);
3031
+
3032
+ set_initial_savepoint ();
2949
3033
}
2950
3034
2951
- void start_stmt () override { m_batch-> SetSavePoint (); }
3035
+ void start_stmt () override {}
2952
3036
2953
3037
void rollback_stmt () override {
2954
3038
if (m_batch)
2955
- m_batch-> RollbackToSavePoint ();
3039
+ rollback_to_stmt_savepoint ();
2956
3040
}
2957
3041
2958
3042
explicit Rdb_writebatch_impl (THD *const thd)
@@ -3161,6 +3245,8 @@ static int rocksdb_prepare(handlerton *const hton, THD *const thd,
3161
3245
3162
3246
DEBUG_SYNC (thd, " rocksdb.prepared" );
3163
3247
}
3248
+ else
3249
+ tx->make_stmt_savepoint_permanent ();
3164
3250
3165
3251
return HA_EXIT_SUCCESS;
3166
3252
}
@@ -3332,11 +3418,9 @@ static int rocksdb_commit(handlerton *const hton, THD *const thd,
3332
3418
} else {
3333
3419
/*
3334
3420
We get here when committing a statement within a transaction.
3335
-
3336
- We don't need to do anything here. tx->start_stmt() will notify
3337
- Rdb_transaction_impl that another statement has started.
3338
3421
*/
3339
3422
tx->set_tx_failed (false );
3423
+ tx->make_stmt_savepoint_permanent ();
3340
3424
}
3341
3425
3342
3426
if (my_core::thd_tx_isolation (thd) <= ISO_READ_COMMITTED) {
@@ -10341,22 +10425,24 @@ int ha_rocksdb::external_lock(THD *const thd, int lock_type) {
10341
10425
}
10342
10426
10343
10427
if (lock_type == F_UNLCK) {
10344
- Rdb_transaction *const tx = get_or_create_tx (thd);
10428
+ Rdb_transaction *const tx = get_tx_from_thd (thd);
10345
10429
10346
- tx->io_perf_end_and_record (&m_io_perf);
10347
- tx->m_n_mysql_tables_in_use --;
10348
- if (tx->m_n_mysql_tables_in_use == 0 &&
10349
- !my_core::thd_test_options (thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) {
10350
- /*
10351
- Do like InnoDB: when we get here, it's time to commit a
10352
- single-statement transaction.
10430
+ if (tx) {
10431
+ tx->io_perf_end_and_record (&m_io_perf);
10432
+ tx->m_n_mysql_tables_in_use --;
10433
+ if (tx->m_n_mysql_tables_in_use == 0 &&
10434
+ !my_core::thd_test_options (thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) {
10435
+ /*
10436
+ Do like InnoDB: when we get here, it's time to commit a
10437
+ single-statement transaction.
10353
10438
10354
- If the statement involved multiple tables, this code will be executed
10355
- for each of them, but that's ok because non-first tx->commit() calls
10356
- will be no-ops.
10357
- */
10358
- if (tx->commit_or_rollback ()) {
10359
- res = HA_ERR_INTERNAL_ERROR;
10439
+ If the statement involved multiple tables, this code will be executed
10440
+ for each of them, but that's ok because non-first tx->commit() calls
10441
+ will be no-ops.
10442
+ */
10443
+ if (tx->commit_or_rollback ()) {
10444
+ res = HA_ERR_INTERNAL_ERROR;
10445
+ }
10360
10446
}
10361
10447
}
10362
10448
} else {
0 commit comments