Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #802: MyRocks: Statement rollback doesnt work correctly for nes… #804

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions mysql-test/suite/rocksdb/r/transaction.result
Original file line number Diff line number Diff line change
Expand Up @@ -934,3 +934,27 @@ value
3
rollback;
drop table t1;
#
# #802: MyRocks: Statement rollback doesnt work correctly for nested statements
#
create table t1 (a varchar(100)) engine=rocksdb;
create table t2(a int) engine=rocksdb;
insert into t2 values (1), (2);
create table t3(a varchar(100)) engine=rocksdb;
create function func() returns varchar(100) deterministic
begin
insert into t3 values ('func-called');
set @a= (select a from t2);
return 'func-returned';
end;//
begin;
insert into t1 values (func());
ERROR 21000: Subquery returns more than 1 row
select * from t1;
a
# The following must not produce 'func-called':
select * from t3;
a
rollback;
drop function func;
drop table t1,t2,t3;
30 changes: 30 additions & 0 deletions mysql-test/suite/rocksdb/t/transaction.test
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,33 @@ update t1 set id=115 where id=3;
rollback;

drop table t1;

--echo #
--echo # #802: MyRocks: Statement rollback doesnt work correctly for nested statements
--echo #
create table t1 (a varchar(100)) engine=rocksdb;
create table t2(a int) engine=rocksdb;
insert into t2 values (1), (2);

create table t3(a varchar(100)) engine=rocksdb;

delimiter //;
create function func() returns varchar(100) deterministic
begin
insert into t3 values ('func-called');
set @a= (select a from t2);
return 'func-returned';
end;//
delimiter ;//

begin;
--error ER_SUBQUERY_NO_1_ROW
insert into t1 values (func());
select * from t1;
--echo # The following must not produce 'func-called':
select * from t3;

rollback;
drop function func;
drop table t1,t2,t3;

128 changes: 107 additions & 21 deletions storage/rocksdb/ha_rocksdb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1854,6 +1854,17 @@ class Rdb_transaction {
bool m_is_delayed_snapshot = false;
bool m_is_two_phase = false;

private:
/* Number of RockDB savepoints taken */
int m_n_savepoints;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

m_n_savepoints and m_writes_at_last_savepoint need to be initialized at similar locations to m_write_count, no?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Lost that initialization when re-working the patch. Will now add it back

/*
Number of write operations this transaction had when we took the last
savepoint (the idea is not to take another savepoint if we haven't made
any changes)
*/
ulonglong m_writes_at_last_savepoint;

protected:
THD *m_thd = nullptr;

static std::multiset<Rdb_transaction *> s_tx_list;
Expand Down Expand Up @@ -1903,6 +1914,14 @@ class Rdb_transaction {
return s;
}

protected:
/*
The following two are helper functions to be overloaded by child classes.
They should provide RocksDB's savepoint semantics.
*/
virtual void do_set_savepoint() = 0;
virtual void do_rollback_to_savepoint() = 0;

public:
rocksdb::ReadOptions m_read_opts;
const char *m_mysql_log_file_name;
Expand Down Expand Up @@ -2389,6 +2408,50 @@ class Rdb_transaction {
virtual bool is_tx_started() const = 0;
virtual void start_tx() = 0;
virtual void start_stmt() = 0;

void set_initial_savepoint() {
/*
Set the initial savepoint. If the first statement in the transaction
fails, we need something to roll back to, without rolling back the
entire transaction.
*/
do_set_savepoint();
m_n_savepoints= 1;
m_writes_at_last_savepoint= m_write_count;
}

/*
Called when a "top-level" statement inside a transaction completes
successfully and its changes become part of the transaction's changes.
*/
void make_stmt_savepoint_permanent() {

// Take another RocksDB savepoint only if we had changes since the last
// one. This is very important for long transactions doing lots of
// SELECTs.
if (m_writes_at_last_savepoint != m_write_count)
{
do_set_savepoint();
m_writes_at_last_savepoint= m_write_count;
m_n_savepoints++;
}
}


/*
Rollback to the savepoint we've set before the last statement
*/
void rollback_to_stmt_savepoint() {
if (m_writes_at_last_savepoint != m_write_count) {
do_rollback_to_savepoint();
if (!--m_n_savepoints) {
do_set_savepoint();
m_n_savepoints= 1;
}
m_writes_at_last_savepoint= m_write_count;
}
}

virtual void rollback_stmt() = 0;

void set_tx_failed(bool failed_arg) { m_is_tx_failed = failed_arg; }
Expand Down Expand Up @@ -2715,9 +2778,20 @@ class Rdb_transaction_impl : public Rdb_transaction {

m_read_opts = rocksdb::ReadOptions();

set_initial_savepoint();

m_ddl_transaction = false;
}

/* Implementations of do_*savepoint based on rocksdB::Transaction savepoints */
void do_set_savepoint() override {
m_rocksdb_tx->SetSavePoint();
}

void do_rollback_to_savepoint() override {
m_rocksdb_tx->RollbackToSavePoint();
}

/*
Start a statement inside a multi-statement transaction.

Expand All @@ -2730,7 +2804,6 @@ class Rdb_transaction_impl : public Rdb_transaction {
void start_stmt() override {
// Set the snapshot to delayed acquisition (SetSnapshotOnNextOperation)
acquire_snapshot(false);
m_rocksdb_tx->SetSavePoint();
}

/*
Expand All @@ -2741,7 +2814,7 @@ class Rdb_transaction_impl : public Rdb_transaction {
/* TODO: here we must release the locks taken since the start_stmt() call */
if (m_rocksdb_tx) {
const rocksdb::Snapshot *const org_snapshot = m_rocksdb_tx->GetSnapshot();
m_rocksdb_tx->RollbackToSavePoint();
rollback_to_stmt_savepoint();

const rocksdb::Snapshot *const cur_snapshot = m_rocksdb_tx->GetSnapshot();
if (org_snapshot != cur_snapshot) {
Expand Down Expand Up @@ -2831,6 +2904,15 @@ class Rdb_writebatch_impl : public Rdb_transaction {
return res;
}

/* Implementations of do_*savepoint based on rocksdB::WriteBatch savepoints */
void do_set_savepoint() override {
m_batch->SetSavePoint();
}

void do_rollback_to_savepoint() override {
m_batch->RollbackToSavePoint();
}

public:
bool is_writebatch_trx() const override { return true; }

Expand Down Expand Up @@ -2936,13 +3018,15 @@ class Rdb_writebatch_impl : public Rdb_transaction {
write_opts.disableWAL = THDVAR(m_thd, write_disable_wal);
write_opts.ignore_missing_column_families =
THDVAR(m_thd, write_ignore_missing_column_families);

set_initial_savepoint();
}

void start_stmt() override { m_batch->SetSavePoint(); }
void start_stmt() override {}

void rollback_stmt() override {
if (m_batch)
m_batch->RollbackToSavePoint();
rollback_to_stmt_savepoint();
}

explicit Rdb_writebatch_impl(THD *const thd)
Expand Down Expand Up @@ -3157,6 +3241,8 @@ static int rocksdb_prepare(handlerton *const hton, THD *const thd,

DEBUG_SYNC(thd, "rocksdb.prepared");
}
else
tx->make_stmt_savepoint_permanent();

return HA_EXIT_SUCCESS;
}
Expand Down Expand Up @@ -3328,11 +3414,9 @@ static int rocksdb_commit(handlerton *const hton, THD *const thd,
} else {
/*
We get here when committing a statement within a transaction.

We don't need to do anything here. tx->start_stmt() will notify
Rdb_transaction_impl that another statement has started.
*/
tx->set_tx_failed(false);
tx->make_stmt_savepoint_permanent();
}

if (my_core::thd_tx_isolation(thd) <= ISO_READ_COMMITTED) {
Expand Down Expand Up @@ -10315,22 +10399,24 @@ int ha_rocksdb::external_lock(THD *const thd, int lock_type) {
}

if (lock_type == F_UNLCK) {
Rdb_transaction *const tx = get_or_create_tx(thd);
Rdb_transaction *const tx = get_tx_from_thd(thd);

tx->io_perf_end_and_record(&m_io_perf);
tx->m_n_mysql_tables_in_use--;
if (tx->m_n_mysql_tables_in_use == 0 &&
!my_core::thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) {
/*
Do like InnoDB: when we get here, it's time to commit a
single-statement transaction.
if (tx) {
tx->io_perf_end_and_record(&m_io_perf);
tx->m_n_mysql_tables_in_use--;
if (tx->m_n_mysql_tables_in_use == 0 &&
!my_core::thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) {
/*
Do like InnoDB: when we get here, it's time to commit a
single-statement transaction.

If the statement involved multiple tables, this code will be executed
for each of them, but that's ok because non-first tx->commit() calls
will be no-ops.
*/
if (tx->commit_or_rollback()) {
res = HA_ERR_INTERNAL_ERROR;
If the statement involved multiple tables, this code will be executed
for each of them, but that's ok because non-first tx->commit() calls
will be no-ops.
*/
if (tx->commit_or_rollback()) {
res = HA_ERR_INTERNAL_ERROR;
}
}
}
} else {
Expand Down