Skip to content

Commit

Permalink
=?UTF-8?q?Issue=20#802:=20MyRocks:=20Statement=20rollback?= =?UTF-8?q…
Browse files Browse the repository at this point in the history
…?=20doesnt=20work=20correctly=20for=20nes=E2=80=A6=20(facebook#804)?= (facebook#804)

Summary:
…ted statements

Variant #1: When the statement fails, we should roll back to the latest
savepoint taken at the top level.
Closes facebook#804

Differential Revision: D7509380

Pulled By: hermanlee
  • Loading branch information
spetrunia authored and inikep committed Mar 28, 2023
1 parent 3627877 commit 04036a0
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 21 deletions.
24 changes: 24 additions & 0 deletions mysql-test/suite/rocksdb/r/transaction.result
Original file line number Diff line number Diff line change
Expand Up @@ -934,3 +934,27 @@ value
3
rollback;
drop table t1;
#
# #802: MyRocks: Statement rollback doesnt work correctly for nested statements
#
create table t1 (a varchar(100)) engine=rocksdb;
create table t2(a int) engine=rocksdb;
insert into t2 values (1), (2);
create table t3(a varchar(100)) engine=rocksdb;
create function func() returns varchar(100) deterministic
begin
insert into t3 values ('func-called');
set @a= (select a from t2);
return 'func-returned';
end;//
begin;
insert into t1 values (func());
ERROR 21000: Subquery returns more than 1 row
select * from t1;
a
# The following must not produce 'func-called':
select * from t3;
a
rollback;
drop function func;
drop table t1,t2,t3;
30 changes: 30 additions & 0 deletions mysql-test/suite/rocksdb/t/transaction.test
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,33 @@ update t1 set id=115 where id=3;
rollback;

drop table t1;

--echo #
--echo # #802: MyRocks: Statement rollback doesnt work correctly for nested statements
--echo #
create table t1 (a varchar(100)) engine=rocksdb;
create table t2(a int) engine=rocksdb;
insert into t2 values (1), (2);

create table t3(a varchar(100)) engine=rocksdb;

delimiter //;
create function func() returns varchar(100) deterministic
begin
insert into t3 values ('func-called');
set @a= (select a from t2);
return 'func-returned';
end;//
delimiter ;//

begin;
--error ER_SUBQUERY_NO_1_ROW
insert into t1 values (func());
select * from t1;
--echo # The following must not produce 'func-called':
select * from t3;

rollback;
drop function func;
drop table t1,t2,t3;

128 changes: 107 additions & 21 deletions storage/rocksdb/ha_rocksdb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1864,6 +1864,17 @@ class Rdb_transaction {
bool m_is_delayed_snapshot = false;
bool m_is_two_phase = false;

private:
/* Number of RockDB savepoints taken */
int m_n_savepoints;
/*
Number of write operations this transaction had when we took the last
savepoint (the idea is not to take another savepoint if we haven't made
any changes)
*/
ulonglong m_writes_at_last_savepoint;

protected:
THD *m_thd = nullptr;

static std::multiset<Rdb_transaction *> s_tx_list;
Expand Down Expand Up @@ -1913,6 +1924,14 @@ class Rdb_transaction {
return s;
}

protected:
/*
The following two are helper functions to be overloaded by child classes.
They should provide RocksDB's savepoint semantics.
*/
virtual void do_set_savepoint() = 0;
virtual void do_rollback_to_savepoint() = 0;

public:
rocksdb::ReadOptions m_read_opts;
const char *m_mysql_log_file_name;
Expand Down Expand Up @@ -2399,6 +2418,50 @@ class Rdb_transaction {
virtual bool is_tx_started() const = 0;
virtual void start_tx() = 0;
virtual void start_stmt() = 0;

void set_initial_savepoint() {
/*
Set the initial savepoint. If the first statement in the transaction
fails, we need something to roll back to, without rolling back the
entire transaction.
*/
do_set_savepoint();
m_n_savepoints= 1;
m_writes_at_last_savepoint= m_write_count;
}

/*
Called when a "top-level" statement inside a transaction completes
successfully and its changes become part of the transaction's changes.
*/
void make_stmt_savepoint_permanent() {

// Take another RocksDB savepoint only if we had changes since the last
// one. This is very important for long transactions doing lots of
// SELECTs.
if (m_writes_at_last_savepoint != m_write_count)
{
do_set_savepoint();
m_writes_at_last_savepoint= m_write_count;
m_n_savepoints++;
}
}


/*
Rollback to the savepoint we've set before the last statement
*/
void rollback_to_stmt_savepoint() {
if (m_writes_at_last_savepoint != m_write_count) {
do_rollback_to_savepoint();
if (!--m_n_savepoints) {
do_set_savepoint();
m_n_savepoints= 1;
}
m_writes_at_last_savepoint= m_write_count;
}
}

virtual void rollback_stmt() = 0;

void set_tx_failed(bool failed_arg) { m_is_tx_failed = failed_arg; }
Expand Down Expand Up @@ -2725,9 +2788,20 @@ class Rdb_transaction_impl : public Rdb_transaction {

m_read_opts = rocksdb::ReadOptions();

set_initial_savepoint();

m_ddl_transaction = false;
}

/* Implementations of do_*savepoint based on rocksdB::Transaction savepoints */
void do_set_savepoint() override {
m_rocksdb_tx->SetSavePoint();
}

void do_rollback_to_savepoint() override {
m_rocksdb_tx->RollbackToSavePoint();
}

/*
Start a statement inside a multi-statement transaction.
Expand All @@ -2740,7 +2814,6 @@ class Rdb_transaction_impl : public Rdb_transaction {
void start_stmt() override {
// Set the snapshot to delayed acquisition (SetSnapshotOnNextOperation)
acquire_snapshot(false);
m_rocksdb_tx->SetSavePoint();
}

/*
Expand All @@ -2751,7 +2824,7 @@ class Rdb_transaction_impl : public Rdb_transaction {
/* TODO: here we must release the locks taken since the start_stmt() call */
if (m_rocksdb_tx) {
const rocksdb::Snapshot *const org_snapshot = m_rocksdb_tx->GetSnapshot();
m_rocksdb_tx->RollbackToSavePoint();
rollback_to_stmt_savepoint();

const rocksdb::Snapshot *const cur_snapshot = m_rocksdb_tx->GetSnapshot();
if (org_snapshot != cur_snapshot) {
Expand Down Expand Up @@ -2841,6 +2914,15 @@ class Rdb_writebatch_impl : public Rdb_transaction {
return res;
}

/* Implementations of do_*savepoint based on rocksdB::WriteBatch savepoints */
void do_set_savepoint() override {
m_batch->SetSavePoint();
}

void do_rollback_to_savepoint() override {
m_batch->RollbackToSavePoint();
}

public:
bool is_writebatch_trx() const override { return true; }

Expand Down Expand Up @@ -2946,13 +3028,15 @@ class Rdb_writebatch_impl : public Rdb_transaction {
write_opts.disableWAL = THDVAR(m_thd, write_disable_wal);
write_opts.ignore_missing_column_families =
THDVAR(m_thd, write_ignore_missing_column_families);

set_initial_savepoint();
}

void start_stmt() override { m_batch->SetSavePoint(); }
void start_stmt() override {}

void rollback_stmt() override {
if (m_batch)
m_batch->RollbackToSavePoint();
rollback_to_stmt_savepoint();
}

explicit Rdb_writebatch_impl(THD *const thd)
Expand Down Expand Up @@ -3161,6 +3245,8 @@ static int rocksdb_prepare(handlerton *const hton, THD *const thd,

DEBUG_SYNC(thd, "rocksdb.prepared");
}
else
tx->make_stmt_savepoint_permanent();

return HA_EXIT_SUCCESS;
}
Expand Down Expand Up @@ -3332,11 +3418,9 @@ static int rocksdb_commit(handlerton *const hton, THD *const thd,
} else {
/*
We get here when committing a statement within a transaction.

We don't need to do anything here. tx->start_stmt() will notify
Rdb_transaction_impl that another statement has started.
*/
tx->set_tx_failed(false);
tx->make_stmt_savepoint_permanent();
}

if (my_core::thd_tx_isolation(thd) <= ISO_READ_COMMITTED) {
Expand Down Expand Up @@ -10341,22 +10425,24 @@ int ha_rocksdb::external_lock(THD *const thd, int lock_type) {
}

if (lock_type == F_UNLCK) {
Rdb_transaction *const tx = get_or_create_tx(thd);
Rdb_transaction *const tx = get_tx_from_thd(thd);

tx->io_perf_end_and_record(&m_io_perf);
tx->m_n_mysql_tables_in_use--;
if (tx->m_n_mysql_tables_in_use == 0 &&
!my_core::thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) {
/*
Do like InnoDB: when we get here, it's time to commit a
single-statement transaction.
if (tx) {
tx->io_perf_end_and_record(&m_io_perf);
tx->m_n_mysql_tables_in_use--;
if (tx->m_n_mysql_tables_in_use == 0 &&
!my_core::thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) {
/*
Do like InnoDB: when we get here, it's time to commit a
single-statement transaction.
If the statement involved multiple tables, this code will be executed
for each of them, but that's ok because non-first tx->commit() calls
will be no-ops.
*/
if (tx->commit_or_rollback()) {
res = HA_ERR_INTERNAL_ERROR;
If the statement involved multiple tables, this code will be executed
for each of them, but that's ok because non-first tx->commit() calls
will be no-ops.
*/
if (tx->commit_or_rollback()) {
res = HA_ERR_INTERNAL_ERROR;
}
}
}
} else {
Expand Down

0 comments on commit 04036a0

Please sign in to comment.