Skip to content

Commit cd39ae1

Browse files
Stop slave immediately in MTS if partial trx in the relay log can be rolled back
Summary: In MTS, STOP SLAVE can take a minute to complete if the last transaction is only partially downloaded from the master. The slave waits for a minute for the master to send the rest of the trx and then finally gives up. Strictly, this wait is only required if the partial trx cannot be rolled back safely (e.g. trx on a non-transactional engine, DDLs, etc.). This change checks whether there are no jobs queued in the worker threads and whether the partial trx can be rolled back; if so, it stops the slave immediately. Reviewed By: tianx Differential Revision: D5130797 fbshipit-source-id: 4f1f0b6
1 parent 754aa79 commit cd39ae1

8 files changed

+327
-1
lines changed
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
include/master-slave.inc
2+
Warnings:
3+
Note #### Sending passwords in plain text without SSL/TLS is extremely insecure.
4+
Note #### Storing MySQL user name or password information in the master info repository is not secure and is therefore not recommended. Please consider using the USER and PASSWORD connection options for START SLAVE; see the 'START SLAVE Syntax' in the MySQL Manual for more information.
5+
[connection master]
6+
call mtr.add_suppression("The slave coordinator and worker threads are stopped, possibly leaving data in inconsistent state");
7+
create database d1;
8+
create database d2;
9+
create database d3;
10+
create table d1.t1 (a int) engine=innodb;
11+
create table d2.t2 (a int) engine=myisam;
12+
create table d3.t3 (a int) engine=innodb;
13+
lock tables d1.t1 read;
14+
insert into d1.t1 values(1);
15+
insert into d1.t1 values(1);
16+
insert into d1.t1 values(1);
17+
insert into d1.t1 values(1);
18+
insert into d1.t1 values(1);
19+
set global debug= '+d,dump_thread_wait_after_send_write_rows';
20+
insert into d2.t2 values(1);
21+
unlock tables;
22+
set @start=now();
23+
stop slave;
24+
select timestampdiff(SECOND, @start, now()) >= 60;
25+
timestampdiff(SECOND, @start, now()) >= 60
26+
1
27+
start slave;
28+
SET DEBUG_SYNC= 'now SIGNAL signal.continue';
29+
SET DEBUG_SYNC= 'RESET';
30+
set @@global.debug= '-d,dump_thread_wait_after_send_write_rows';
31+
"Tables on master:"
32+
connection master
33+
select * from d1.t1;
34+
a
35+
1
36+
1
37+
1
38+
1
39+
1
40+
select * from d2.t2;
41+
a
42+
1
43+
select * from d3.t3;
44+
a
45+
"Tables on slave:"
46+
connection slave
47+
select * from d1.t1;
48+
a
49+
1
50+
1
51+
1
52+
1
53+
1
54+
select * from d2.t2;
55+
a
56+
1
57+
select * from d3.t3;
58+
a
59+
delete from d1.t1;
60+
delete from d2.t2;
61+
delete from d3.t3;
62+
lock tables d1.t1 read;
63+
insert into d1.t1 values(1);
64+
insert into d1.t1 values(1);
65+
insert into d1.t1 values(1);
66+
insert into d1.t1 values(1);
67+
insert into d1.t1 values(1);
68+
set global debug= '+d,dump_thread_wait_after_send_write_rows';
69+
insert into d3.t3 values(1);
70+
unlock tables;
71+
set @start=now();
72+
stop slave;
73+
select timestampdiff(SECOND, @start, now()) < 60;
74+
timestampdiff(SECOND, @start, now()) < 60
75+
1
76+
start slave;
77+
SET DEBUG_SYNC= 'now SIGNAL signal.continue';
78+
SET DEBUG_SYNC= 'RESET';
79+
set @@global.debug= '-d,dump_thread_wait_after_send_write_rows';
80+
"Tables on master:"
81+
connection master
82+
select * from d1.t1;
83+
a
84+
1
85+
1
86+
1
87+
1
88+
1
89+
select * from d2.t2;
90+
a
91+
select * from d3.t3;
92+
a
93+
1
94+
"Tables on slave:"
95+
connection slave
96+
select * from d1.t1;
97+
a
98+
1
99+
1
100+
1
101+
1
102+
1
103+
select * from d2.t2;
104+
a
105+
select * from d3.t3;
106+
a
107+
1
108+
drop database d1;
109+
drop database d2;
110+
drop database d3;
111+
include/rpl_end.inc
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
--slave_parallel_workers=8 --enforce-gtid-consistency --gtid-mode=ON --log-bin
2+
--log-slave-updates
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
--slave_parallel_workers=8 --enforce-gtid-consistency --gtid-mode=ON --log-bin
2+
--log-slave-updates
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
source include/master-slave.inc;
2+
source include/have_debug.inc;
3+
source include/have_debug_sync.inc;
4+
source include/have_binlog_format_row.inc;
5+
6+
call mtr.add_suppression("The slave coordinator and worker threads are stopped, possibly leaving data in inconsistent state");
7+
8+
# Create schema
9+
connection master;
10+
create database d1;
11+
create database d2;
12+
create database d3;
13+
create table d1.t1 (a int) engine=innodb;
14+
create table d2.t2 (a int) engine=myisam; # non-transactional engine
15+
create table d3.t3 (a int) engine=innodb; # transactional engine
16+
sync_slave_with_master;
17+
18+
19+
20+
## Test 1: STOP SLAVE when there are pending jobs in the worker queues and there
21+
## is a partial transaction on a non-transactional table. The slave should wait
22+
## for 1 minute to complete the partial transaction before giving up.
23+
24+
# Block all d1.t1 transactions on the slave
25+
connection slave;
26+
lock tables d1.t1 read;
27+
28+
# Generate some load, all of these will be blocked in the slave worker queue
29+
connection master;
30+
let $num_inserts=5;
31+
while ($num_inserts)
32+
{
33+
insert into d1.t1 values(1);
34+
dec $num_inserts;
35+
}
36+
37+
# This will stop the dump thread before sending the entire group
38+
set global debug= '+d,dump_thread_wait_after_send_write_rows';
39+
insert into d2.t2 values(1);
40+
41+
# Wait for the dump thread to reach the sync point
42+
--let $wait_condition= select count(*)=1 from information_schema.processlist where state LIKE '%debug sync point%' and command like 'Binlog Dump%'
43+
--source include/wait_condition.inc
44+
45+
connection slave;
46+
# unblock d1.t1
47+
unlock tables;
48+
49+
# This should take at least a minute because the trx on t2 is not completely downloaded and cannot be rolled back safely
50+
set @start=now();
51+
stop slave;
52+
select timestampdiff(SECOND, @start, now()) >= 60;
53+
start slave;
54+
55+
connection master;
56+
SET DEBUG_SYNC= 'now SIGNAL signal.continue';
57+
# wait for the dump thread to come out of the waiting phase before resetting the signals
58+
--let $wait_condition= select count(*)=0 from information_schema.processlist where state LIKE '%debug sync point%' and command='Binlog Dump'
59+
--source include/wait_condition.inc
60+
SET DEBUG_SYNC= 'RESET';
61+
62+
connection master;
63+
set @@global.debug= '-d,dump_thread_wait_after_send_write_rows';
64+
sync_slave_with_master;
65+
66+
# Verification
67+
connection master;
68+
echo "Tables on master:"
69+
connection master;
70+
select * from d1.t1;
71+
select * from d2.t2;
72+
select * from d3.t3;
73+
echo "Tables on slave:"
74+
connection slave;
75+
select * from d1.t1;
76+
select * from d2.t2;
77+
select * from d3.t3;
78+
79+
# cleanup
80+
connection master;
81+
delete from d1.t1;
82+
delete from d2.t2;
83+
delete from d3.t3;
84+
sync_slave_with_master;
85+
86+
87+
88+
## Test 2: STOP SLAVE when there are pending jobs in the worker queues and there
89+
## is a partial transaction on a transactional table. The slave should stop
90+
## immediately after completing all pending full transactions.
91+
92+
# Block all d1.t1 transactions on the slave
93+
connection slave;
94+
lock tables d1.t1 read;
95+
96+
# Generate some load, all of these will be blocked in the slave worker queue
97+
connection master;
98+
let $num_inserts=5;
99+
while ($num_inserts)
100+
{
101+
insert into d1.t1 values(1);
102+
dec $num_inserts;
103+
}
104+
105+
# This will stop the dump thread before sending the entire group
106+
set global debug= '+d,dump_thread_wait_after_send_write_rows';
107+
insert into d3.t3 values(1);
108+
109+
# Wait for the dump thread to reach the sync point
110+
--let $wait_condition= select count(*)=1 from information_schema.processlist where state LIKE '%debug sync point%' and command like 'Binlog Dump%'
111+
--source include/wait_condition.inc
112+
113+
connection slave;
114+
# unblock d1.t1
115+
unlock tables;
116+
117+
# Since the partial transaction is on a transactional table the slave should
118+
# stop as soon as it completes all pending full transactions
119+
set @start=now();
120+
stop slave;
121+
select timestampdiff(SECOND, @start, now()) < 60;
122+
start slave;
123+
124+
connection master;
125+
SET DEBUG_SYNC= 'now SIGNAL signal.continue';
126+
# wait for the dump thread to come out of the waiting phase before resetting the signals
127+
--let $wait_condition= select count(*)=0 from information_schema.processlist where state LIKE '%debug sync point%' and command='Binlog Dump'
128+
--source include/wait_condition.inc
129+
SET DEBUG_SYNC= 'RESET';
130+
131+
connection master;
132+
set @@global.debug= '-d,dump_thread_wait_after_send_write_rows';
133+
sync_slave_with_master;
134+
135+
# Verification
136+
connection master;
137+
echo "Tables on master:"
138+
connection master;
139+
select * from d1.t1;
140+
select * from d2.t2;
141+
select * from d3.t3;
142+
echo "Tables on slave:"
143+
connection slave;
144+
select * from d1.t1;
145+
select * from d2.t2;
146+
select * from d3.t3;
147+
148+
# final cleanup
149+
connection master;
150+
drop database d1;
151+
drop database d2;
152+
drop database d3;
153+
sync_slave_with_master;
154+
155+
156+
source include/rpl_end.inc;

sql/rpl_master.cc

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1742,6 +1742,20 @@ void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
17421742
set_timespec_nsec(last_event_sent_ts, 0);
17431743
}
17441744

1745+
DBUG_EXECUTE_IF("dump_thread_wait_after_send_write_rows",
1746+
{
1747+
if (event_type == WRITE_ROWS_EVENT)
1748+
{
1749+
net_flush(net);
1750+
const char act[]=
1751+
"now "
1752+
"wait_for signal.continue";
1753+
DBUG_ASSERT(opt_debug_sync_timeout > 0);
1754+
DBUG_ASSERT(!debug_sync_set_action(current_thd,
1755+
STRING_WITH_LEN(act)));
1756+
}
1757+
});
1758+
17451759
DBUG_EXECUTE_IF("dump_thread_wait_before_send_xid",
17461760
{
17471761
if (event_type == XID_EVENT)

sql/rpl_rli.cc

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,39 @@ bool Relay_log_info::mts_finalize_recovery()
417417
DBUG_RETURN(ret);
418418
}
419419

420+
/**
  Reports whether no jobs are currently counted against any worker.

  Walks every entry of the `workers` array and adds up each worker's
  `curr_jobs` counter.

  @return true if the summed `curr_jobs` of all workers is zero
          (also the case when `workers` has no elements), false otherwise.
*/
bool Relay_log_info::mts_workers_queue_empty()
421+
{
422+
Slave_worker *worker= NULL;
423+
/* Running total of curr_jobs across all workers. */
ulong ret= 0;
424+
for (ulong i= 0; i< workers.elements; ++i)
425+
{
426+
worker= *dynamic_element(&workers, i, Slave_worker**);
427+
/*
  Each counter is read under that worker's own jobs_lock. The lock is
  taken and released per worker, so the total is not a single snapshot
  taken across all workers at once.
*/
mysql_mutex_lock(&worker->jobs_lock);
428+
ret+= worker->curr_jobs;
429+
mysql_mutex_unlock(&worker->jobs_lock);
430+
}
431+
return ret == 0;
432+
}
433+
434+
/**
  Reports whether any in-flight statement/transaction cannot be safely
  rolled back.

  When is_parallel_exec() is false, the answer comes from this object's
  own info_thd transaction. Otherwise every worker in `workers` is
  consulted through its info_thd transaction.

  @return true if at least one consulted transaction reports
          cannot_safely_rollback(), false otherwise.
*/
435+
bool Relay_log_info::cannot_safely_rollback()
436+
{
437+
/* Not executing in parallel: only this object's own info_thd is checked. */
if (!is_parallel_exec())
438+
return info_thd->transaction.all.cannot_safely_rollback();
439+
440+
bool ret= false;
441+
Slave_worker *worker= NULL;
442+
443+
for (ulong i= 0; i< workers.elements; ++i)
444+
{
445+
worker= *dynamic_element(&workers, i, Slave_worker**);
446+
/* The worker's transaction state is queried while holding its jobs_lock. */
mysql_mutex_lock(&worker->jobs_lock);
447+
/*
  `||` short-circuits: once ret is true the remaining workers are no
  longer queried, although their locks are still taken and released.
*/
ret= ret || worker->info_thd->transaction.all.cannot_safely_rollback();
448+
mysql_mutex_unlock(&worker->jobs_lock);
449+
}
450+
return ret;
451+
}
452+
420453
static inline int add_relay_log(Relay_log_info* rli,LOG_INFO* linfo)
421454
{
422455
MY_STAT s;

sql/rpl_rli.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -707,6 +707,9 @@ class Relay_log_info : public Rpl_info
707707
mts_group_status == MTS_IN_GROUP;
708708
}
709709

710+
bool mts_workers_queue_empty();
711+
bool cannot_safely_rollback();
712+
710713
/**
711714
While a group is executed by a Worker the relay log can change.
712715
Coordinator notifies Workers about this event. Worker is supposed

sql/rpl_slave.cc

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1838,8 +1838,13 @@ static bool sql_slave_killed(THD* thd, Relay_log_info* rli)
18381838
if (abort_loop || thd->killed || rli->abort_slave)
18391839
{
18401840
rli->sql_thread_kill_accepted= true;
1841+
/* NOTE: In MTS mode if all workers are done and if the partial trx
1842+
(if any) can be rollbacked safely we can accept the kill */
1843+
bool can_rollback= !rli->is_mts_in_group() ||
1844+
(rli->mts_workers_queue_empty() &&
1845+
!rli->cannot_safely_rollback());
18411846
is_parallel_warn= (rli->is_parallel_exec() &&
1842-
(rli->is_mts_in_group() || thd->killed));
1847+
(!can_rollback || thd->killed));
18431848
/*
18441849
Slave can execute stop being in one of two MTS or Single-Threaded mode.
18451850
The modes define different criteria to accept the stop.

0 commit comments

Comments
 (0)