From 88bc68f1efcb1a6cca5756530dd3da157715ead8 Mon Sep 17 00:00:00 2001 From: Pushap Goyal Date: Wed, 10 Feb 2021 11:41:39 -0800 Subject: [PATCH] [raft] Support for dumping raft logs to vanilla async replicas Summary: [Porting Notes] We want to dump raft logs to vanilla async replicas regardless of whether it's the relay log or binlog. Effectively after this change we'll dump relay logs on the followers and binlogs on the leader. When the raft role changes, the logs to the dumped are also changed. Dump_log class is introduced as a thin wrapper/continer around mysql_bin_log or rli->relay_log and is inited with mysql_bin_log to emulate vanilla mysql behavior. Dump threads use the global dump_log object instead of mysql_bin_log directly. We switch the log in dump log only when raft role changes (in binlog_change_to_binlog() and binlog_change_to_apply_log()). During raft role change we take all log releated locks (LOCK_log, LOCK_index, LOCK_binlog_end_pos, and dump log lock) to serialize it with other log operations like dumping logs. Related doc - https://fb.quip.com/oTVAAdgEi4zY This diff contains below 7 patches: D23013977 D24766787 D24716539 D24900223 D24955284 D25174166 D25775525 Reviewed By: luqun Differential Revision: D26141496 ------------------------------------------------------------------------------- Passing raw_log pointer to wait_with_heartbeat() and wait_without_heartbeat() Summary: When enable_raft_plugin is OFF Dump_log::lock() is a no-op. Which means that when enable_raft_plugin is OFF there can be a race between log switching and dump threads. This could lead to a scenario where the raw_log that wait_next_event() is working on might be different than what wait_with_heartbeat()/wait_without_heartbeat() is working on. This can cause deadlocks because wait_with_heartbeat()/wait_without_heartbeat()'s mysql_cond_wait would unlock and then lock a different log's LOCK_binlog_end_pos mutex which would then never be unlocked by wait_next_event(). Reviewed By: anirbanr-fb Differential Revision: D32152658 ----------------------------------------------------------------------------------------- Fix rpl_raft_dump_raft_logs Summary: This tests completes but fails because the following warning exists: ``` 2022-08-30T16:28:00.159525Z 11 [ERROR] [MY-013114] [Repl] Slave I/O for channel '': Got fatal error 1236 from master when reading data from binary log: 'Slave has more GTIDs than the master has, using the master's SERVER_UUID. This may indicate that the end of the binary log was truncated or that the last binary log file was lost, e.g., after a power or disk failure when sync_binlog != 1. The master may or may not have rolled back transactions that were already replicated to the slave. Suggest to replicate any transactions that master has rolled back from slave to master, and/or commit empty transactions on master to account for transactions that have been', Error_code: MY-013114 ``` Since the MTR result file is valid, we can suppress this error. Reviewed By: yichenshen Differential Revision: D39141846 ------------------------------------------------------------------------------- Fix heap overflow in group_relay_log_name handling Summary: We were accessing group_relay_log_name in Query_log_event::do_apply_event_worker() but it's assigned only after the coordinator thread encounters an end event (i.e. xid event or a query event with "COMMIT" or "ROLLBACK" query). This was causing a race between accessing group_relay_log_name in the worker thread and writing it on the coordinator thread. We don't need to set transaction position in events other than end event, so now we set transaction position in query event only if it's an end event. The race is eliminated because group_relay_log_name is set before enqueuing the event to the worker thread (in both dep repl and vanilla mts). Reviewed By: lth Differential Revision: D28767430 ------------------------------------------------------------------------------- fix memory during MYSQL_BIN_LOG::open_existing_binlog Summary: asandebug complain there are memory leaks during MYSQL_BIN_LOG open Direct leak of 50 byte(s) in 1 object(s) allocated from: #0 0x67460ef in malloc #1 0x93f0777 in my_raw_malloc(unsigned long, int) #2 0x93f064a in my_malloc(unsigned int, unsigned long, int) #3 0x93f0eb0 in my_strdup(unsigned int, char const*, int) #4 0x8af01a6 in MYSQL_BIN_LOG::open(unsigned int, char const*, char const*, unsigned int) #5 0x8af8064 in MYSQL_BIN_LOG::open_binlog(char const*, char const*, unsigned long, bool, bool, bool, Format_description_log_event*, unsigned int, RaftRotateInfo*, bool) #6 0x8b00c00 in MYSQL_BIN_LOG::new_file_impl(bool, Format_description_log_event*, RaftRotateInfo*) #7 0x8d65e47 in rotate_relay_log(Master_info*, bool, bool, bool, RaftRotateInfo*) #8 0x8d661c0 in rotate_relay_log_for_raft(RaftRotateInfo*) #9 0x8c7696a in process_raft_queue #10 0xa0fa1fd in pfs_spawn_thread(void*) #11 0x7f8c9a12b20b in start_thread release these memory before assign them Reviewed By: Pushapgl Differential Revision: D28819752 --- .../rpl_raft/r/rpl_raft_dump_raft_logs.result | 288 ++++++++++++++++++ .../rpl_raft_purged_gtids_dump_threads.result | 34 +++ .../rpl_raft/t/rpl_raft_dump_raft_logs.test | 271 ++++++++++++++++ .../t/rpl_raft_purged_gtids_dump_threads.test | 81 +++++ .../rpl_raft/t/rpl_raft_wait_for_ack.test | 1 + sql/binlog.cc | 194 +++++++++--- sql/binlog.h | 78 ++++- sql/log_event.cc | 39 ++- sql/log_event.h | 2 + sql/mysqld.cc | 3 +- sql/rpl_binlog_sender.cc | 178 ++++++++--- sql/rpl_binlog_sender.h | 6 +- sql/rpl_handler.cc | 20 +- sql/rpl_replica.cc | 3 +- sql/sql_class.h | 15 + sql/sys_vars.cc | 7 +- 16 files changed, 1108 insertions(+), 112 deletions(-) create mode 100644 mysql-test/suite/rpl_raft/r/rpl_raft_dump_raft_logs.result create mode 100644 mysql-test/suite/rpl_raft/r/rpl_raft_purged_gtids_dump_threads.result create mode 100644 mysql-test/suite/rpl_raft/t/rpl_raft_dump_raft_logs.test create mode 100644 mysql-test/suite/rpl_raft/t/rpl_raft_purged_gtids_dump_threads.test diff --git a/mysql-test/suite/rpl_raft/r/rpl_raft_dump_raft_logs.result b/mysql-test/suite/rpl_raft/r/rpl_raft_dump_raft_logs.result new file mode 100644 index 000000000000..ed2e982d243e --- /dev/null +++ b/mysql-test/suite/rpl_raft/r/rpl_raft_dump_raft_logs.result @@ -0,0 +1,288 @@ +include/raft_3_node.inc +Warnings: +Note #### Sending passwords in plain text without SSL/TLS is extremely insecure. +Note #### Storing MySQL user name or password information in the master info repository is not secure and is therefore not recommended. Please consider using the USER and PASSWORD connection options for START SLAVE; see the 'START SLAVE Syntax' in the MySQL Manual for more information. +Warnings: +Note #### Sending passwords in plain text without SSL/TLS is extremely insecure. +Note #### Storing MySQL user name or password information in the master info repository is not secure and is therefore not recommended. Please consider using the USER and PASSWORD connection options for START SLAVE; see the 'START SLAVE Syntax' in the MySQL Manual for more information. +[connection master] +include/rpl_connect.inc [creating server_4] +include/rpl_connect.inc [creating server_5] +RESET MASTER; +RESET SLAVE; +SET @@GLOBAL.ENABLE_RAFT_PLUGIN = 0; +CHANGE MASTER TO MASTER_HOST = '::1', MASTER_PORT = SERVER_MYPORT_1, MASTER_USER = 'root', MASTER_CONNECT_RETRY = 1, MASTER_AUTO_POSITION = 1; +Warnings: +Note 1759 Sending passwords in plain text without SSL/TLS is extremely insecure. +Note 1760 Storing MySQL user name or password information in the master info repository is not secure and is therefore not recommended. Please consider using the USER and PASSWORD connection options for START SLAVE; see the 'START SLAVE Syntax' in the MySQL Manual for more information. +START SLAVE; +RESET MASTER; +RESET SLAVE; +SET @@GLOBAL.ENABLE_RAFT_PLUGIN = 0; +CHANGE MASTER TO MASTER_HOST = '::1', MASTER_PORT = SERVER_MYPORT_2, MASTER_USER = 'root', MASTER_CONNECT_RETRY = 1, MASTER_AUTO_POSITION = 1; +Warnings: +Note 1759 Sending passwords in plain text without SSL/TLS is extremely insecure. +Note 1760 Storing MySQL user name or password information in the master info repository is not secure and is therefore not recommended. Please consider using the USER and PASSWORD connection options for START SLAVE; see the 'START SLAVE Syntax' in the MySQL Manual for more information. +START SLAVE; +show status like 'rpl_raft_role'; +Variable_name Value +Rpl_raft_role LEADER +show status like 'rpl_raft_role'; +Variable_name Value +Rpl_raft_role FOLLOWER +show status like 'rpl_raft_role'; +Variable_name Value +Rpl_raft_role FOLLOWER +show status like 'rpl_raft_role'; +Variable_name Value +Rpl_raft_role +show status like 'rpl_raft_role'; +Variable_name Value +Rpl_raft_role +create table t1 (a int primary key) engine = innodb; +include/sync_slave_sql_with_master.inc +include/sync_slave_sql_with_master.inc +include/sync_slave_sql_with_master.inc +include/sync_slave_sql_with_master.inc +SELECT "LOGNAME" LIKE "binary-logs%"; +"LOGNAME" LIKE "binary-logs%" +1 +SELECT "LOGNAME" LIKE "binary-logs%"; +"LOGNAME" LIKE "binary-logs%" +1 +insert into t1 values(1); +include/sync_slave_sql_with_master.inc +include/sync_slave_sql_with_master.inc +include/sync_slave_sql_with_master.inc +include/sync_slave_sql_with_master.inc +select * from t1; +a +1 +select * from t1; +a +1 +select * from t1; +a +1 +select * from t1; +a +1 +include/raft_promote_to_leader.inc +insert into t1 values(2); +insert into t1 values(3); +insert into t1 values(4); +include/sync_slave_sql_with_master.inc +include/sync_slave_sql_with_master.inc +include/sync_slave_sql_with_master.inc +include/sync_slave_sql_with_master.inc +select * from t1; +a +1 +2 +3 +4 +select * from t1; +a +1 +2 +3 +4 +select * from t1; +a +1 +2 +3 +4 +select * from t1; +a +1 +2 +3 +4 +SELECT "LOGNAME" LIKE "binary-logs%"; +"LOGNAME" LIKE "binary-logs%" +1 +SELECT "LOGNAME" LIKE "binary-logs%"; +"LOGNAME" LIKE "binary-logs%" +1 +include/raft_promote_to_leader.inc +flush binary logs; +insert into t1 values(5); +include/sync_slave_sql_with_master.inc +include/sync_slave_sql_with_master.inc +include/sync_slave_sql_with_master.inc +include/sync_slave_sql_with_master.inc +select * from t1; +a +1 +2 +3 +4 +5 +select * from t1; +a +1 +2 +3 +4 +5 +select * from t1; +a +1 +2 +3 +4 +5 +select * from t1; +a +1 +2 +3 +4 +5 +set @@global.debug='+d,dump_wait_before_find_next_log'; +insert into t1 values(6); +insert into t1 values(7); +flush binary logs; +insert into t1 values(8); +insert into t1 values(9); +flush binary logs; +insert into t1 values(10); +insert into t1 values(11); +set debug_sync= 'now wait_for signal.reached'; +purge raft logs to 'LOGNAME'; +set debug_sync= 'now signal signal.done'; +set @@global.debug='-d,dump_wait_before_find_next_log'; +include/sync_slave_sql_with_master.inc +include/sync_slave_sql_with_master.inc +include/sync_slave_sql_with_master.inc +include/sync_slave_sql_with_master.inc +select * from t1; +a +1 +2 +3 +4 +5 +6 +7 +8 +9 +10 +11 +select * from t1; +a +1 +2 +3 +4 +5 +6 +7 +8 +9 +10 +11 +select * from t1; +a +1 +2 +3 +4 +5 +6 +7 +8 +9 +10 +11 +select * from t1; +a +1 +2 +3 +4 +5 +6 +7 +8 +9 +10 +11 +include/stop_slave.inc +include/rpl_restart_server.inc [server_number=2] +include/rpl_restart_server.inc [server_number=1] +include/raft_promote_to_leader.inc +START SLAVE IO_THREAD; +START SLAVE IO_THREAD; +insert into t1 values(12); +include/sync_slave_sql_with_master.inc +include/sync_slave_sql_with_master.inc +include/sync_slave_sql_with_master.inc +include/sync_slave_sql_with_master.inc +select * from t1; +a +1 +2 +3 +4 +5 +6 +7 +8 +9 +10 +11 +12 +select * from t1; +a +1 +2 +3 +4 +5 +6 +7 +8 +9 +10 +11 +12 +select * from t1; +a +1 +2 +3 +4 +5 +6 +7 +8 +9 +10 +11 +12 +select * from t1; +a +1 +2 +3 +4 +5 +6 +7 +8 +9 +10 +11 +12 +drop table t1; +include/sync_slave_sql_with_master.inc +include/sync_slave_sql_with_master.inc +include/sync_slave_sql_with_master.inc +include/sync_slave_sql_with_master.inc +stop slave; +reset slave all; +stop slave; +reset slave all; +include/rpl_end.inc diff --git a/mysql-test/suite/rpl_raft/r/rpl_raft_purged_gtids_dump_threads.result b/mysql-test/suite/rpl_raft/r/rpl_raft_purged_gtids_dump_threads.result new file mode 100644 index 000000000000..0a356c7fb991 --- /dev/null +++ b/mysql-test/suite/rpl_raft/r/rpl_raft_purged_gtids_dump_threads.result @@ -0,0 +1,34 @@ +include/raft_3_node.inc +Warnings: +Note #### Sending passwords in plain text without SSL/TLS is extremely insecure. +Note #### Storing MySQL user name or password information in the master info repository is not secure and is therefore not recommended. Please consider using the USER and PASSWORD connection options for START SLAVE; see the 'START SLAVE Syntax' in the MySQL Manual for more information. +Warnings: +Note #### Sending passwords in plain text without SSL/TLS is extremely insecure. +Note #### Storing MySQL user name or password information in the master info repository is not secure and is therefore not recommended. Please consider using the USER and PASSWORD connection options for START SLAVE; see the 'START SLAVE Syntax' in the MySQL Manual for more information. +[connection master] +include/rpl_connect.inc [creating server_4] +include/rpl_connect.inc [creating server_5] +create table t1 (a int) engine = innodb; +insert into t1 values(1); +insert into t1 values(2); +set @@global.rpl_raft_new_leader_uuid = 'uuid2'; +change master to master_host='localhost', master_port=port2, master_auto_position=1, master_user='root'; +Warnings: +Note 1759 Sending passwords in plain text without SSL/TLS is extremely insecure. +Note 1760 Storing MySQL user name or password information in the master info repository is not secure and is therefore not recommended. Please consider using the USER and PASSWORD connection options for START SLAVE; see the 'START SLAVE Syntax' in the MySQL Manual for more information. +include/start_slave.inc +insert into t1 values(3); +include/sync_slave_sql_with_master.inc +select * from t1; +a +1 +2 +3 +set @@global.rpl_raft_new_leader_uuid = 'uuid1'; +drop table t1; +include/sync_slave_sql_with_master.inc +include/sync_slave_sql_with_master.inc +include/sync_slave_sql_with_master.inc +include/stop_slave.inc +reset slave all; +include/rpl_end.inc diff --git a/mysql-test/suite/rpl_raft/t/rpl_raft_dump_raft_logs.test b/mysql-test/suite/rpl_raft/t/rpl_raft_dump_raft_logs.test new file mode 100644 index 000000000000..b996a0ca4f74 --- /dev/null +++ b/mysql-test/suite/rpl_raft/t/rpl_raft_dump_raft_logs.test @@ -0,0 +1,271 @@ +source ../include/raft_3_node.inc; +source include/have_debug_sync.inc; + +--disable_query_log +call mtr.add_suppression(".*using --replicate-same-server-id in conjunction with --log-slave-updates.*"); +--enable_query_log + +# Create connections to server 4 and 5 (these are not in the ring) +let $rpl_server_number= 4; +let $rpl_connection_name= server_4; +source include/rpl_connect.inc; +--disable_query_log +connection server_4; +call mtr.add_suppression(".*using --replicate-same-server-id in conjunction with --log-slave-updates.*"); +--enable_query_log + +let $rpl_server_number= 5; +let $rpl_connection_name= server_5; +source include/rpl_connect.inc; +--disable_query_log +connection server_5; +call mtr.add_suppression(".*using --replicate-same-server-id in conjunction with --log-slave-updates.*"); +--enable_query_log + +# Connect server 4 and 5 using COM_BINLOG_DUMP_GTID +# server4 will tail the leader +connection server_4; +RESET MASTER; +RESET SLAVE; +SET @@GLOBAL.ENABLE_RAFT_PLUGIN = 0; +replace_result $SERVER_MYPORT_1 SERVER_MYPORT_1; +eval CHANGE MASTER TO MASTER_HOST = '::1', MASTER_PORT = $SERVER_MYPORT_1, MASTER_USER = 'root', MASTER_CONNECT_RETRY = 1, MASTER_AUTO_POSITION = 1; +START SLAVE; + +# server5 will tail a follower +connection server_5; +RESET MASTER; +RESET SLAVE; +SET @@GLOBAL.ENABLE_RAFT_PLUGIN = 0; +replace_result $SERVER_MYPORT_2 SERVER_MYPORT_2; +eval CHANGE MASTER TO MASTER_HOST = '::1', MASTER_PORT = $SERVER_MYPORT_2, MASTER_USER = 'root', MASTER_CONNECT_RETRY = 1, MASTER_AUTO_POSITION = 1; +START SLAVE; + +# Check raft roles, 4 and 5 should be empty +connection server_1; +show status like 'rpl_raft_role'; +connection server_2; +show status like 'rpl_raft_role'; +connection server_3; +show status like 'rpl_raft_role'; +connection server_4; +show status like 'rpl_raft_role'; +connection server_5; +show status like 'rpl_raft_role'; + +# Create a schema and sync it across replicas +connection server_1; +create table t1 (a int primary key) engine = innodb; + +let $sync_slave_connection= server_2; +source include/sync_slave_sql_with_master.inc; +let $sync_slave_connection= server_3; +source include/sync_slave_sql_with_master.inc; +let $sync_slave_connection= server_4; +source include/sync_slave_sql_with_master.inc; +let $sync_slave_connection= server_5; +source include/sync_slave_sql_with_master.inc; + +# Check if server 4 and 5 are tailing raft logs +connection server_4; +let $logname= query_get_value(SHOW SLAVE STATUS, Relay_Master_Log_File, 1); +replace_result $logname LOGNAME; +eval SELECT "$logname" LIKE "binary-logs%"; +connection server_5; +let $logname= query_get_value(SHOW SLAVE STATUS, Relay_Master_Log_File, 1); +replace_result $logname LOGNAME; +eval SELECT "$logname" LIKE "binary-logs%"; + +# Execute a regular DML (insert) +connection server_1; +insert into t1 values(1); +let $sync_slave_connection= server_2; +source include/sync_slave_sql_with_master.inc; +let $sync_slave_connection= server_3; +source include/sync_slave_sql_with_master.inc; +let $sync_slave_connection= server_4; +source include/sync_slave_sql_with_master.inc; +let $sync_slave_connection= server_5; +source include/sync_slave_sql_with_master.inc; + +connection server_2; +select * from t1; +connection server_3; +select * from t1; +connection server_4; +select * from t1; +connection server_5; +select * from t1; + +# Transfer leadership from server1 to server2 +let $rpl_raft_leader_number= 2; +source ../include/raft_promote_to_leader.inc; + +# Execute some more trxs +connection server_2; +let $wait_condition= select @@global.read_only = 0; +source include/wait_condition.inc; +insert into t1 values(2); +insert into t1 values(3); +insert into t1 values(4); +let $sync_slave_connection= server_1; +source include/sync_slave_sql_with_master.inc; +let $sync_slave_connection= server_3; +source include/sync_slave_sql_with_master.inc; +let $sync_slave_connection= server_4; +source include/sync_slave_sql_with_master.inc; +let $sync_slave_connection= server_5; +source include/sync_slave_sql_with_master.inc; + +connection server_1; +select * from t1; +connection server_3; +select * from t1; +connection server_4; +select * from t1; +connection server_5; +select * from t1; + +# Check if server 4 and 5 are still tailing raft logs post election +connection server_4; +let $logname= query_get_value(SHOW SLAVE STATUS, Relay_Master_Log_File, 1); +replace_result $logname LOGNAME; +eval SELECT "$logname" LIKE "binary-logs%"; +connection server_5; +let $logname= query_get_value(SHOW SLAVE STATUS, Relay_Master_Log_File, 1); +replace_result $logname LOGNAME; +eval SELECT "$logname" LIKE "binary-logs%"; + +# Transfer leadership from server2 to server1 +let $rpl_raft_leader_number= 1; +source ../include/raft_promote_to_leader.inc; + +# Check if binlog flushing works as expected +connection server_1; +let $wait_condition= select @@global.read_only = 0; +source include/wait_condition.inc; +flush binary logs; +insert into t1 values(5); +let $sync_slave_connection= server_2; +source include/sync_slave_sql_with_master.inc; +let $sync_slave_connection= server_3; +source include/sync_slave_sql_with_master.inc; +let $sync_slave_connection= server_4; +source include/sync_slave_sql_with_master.inc; +let $sync_slave_connection= server_5; +source include/sync_slave_sql_with_master.inc; + +connection server_2; +select * from t1; +connection server_3; +select * from t1; +connection server_4; +select * from t1; +connection server_5; +select * from t1; + +# Check interaction of dump logs with purge +connection server_4; +let $logname= query_get_value(SHOW SLAVE STATUS, Relay_Master_Log_File, 1); +connection server_1; +set @@global.debug='+d,dump_wait_before_find_next_log'; +insert into t1 values(6); +insert into t1 values(7); +flush binary logs; +insert into t1 values(8); +insert into t1 values(9); +flush binary logs; +insert into t1 values(10); +insert into t1 values(11); +set debug_sync= 'now wait_for signal.reached'; +replace_result $logname LOGNAME; +eval purge raft logs to '$logname'; +set debug_sync= 'now signal signal.done'; +set @@global.debug='-d,dump_wait_before_find_next_log'; +let $sync_slave_connection= server_2; +source include/sync_slave_sql_with_master.inc; +let $sync_slave_connection= server_3; +source include/sync_slave_sql_with_master.inc; +let $sync_slave_connection= server_4; +source include/sync_slave_sql_with_master.inc; +let $sync_slave_connection= server_5; +source include/sync_slave_sql_with_master.inc; + +connection server_2; +select * from t1; +connection server_3; +select * from t1; +connection server_4; +select * from t1; +connection server_5; +select * from t1; + + +# Check if clean restart affects dump threads +connection server_2; +source include/stop_slave.inc; +let $rpl_server_number= 2; +source include/rpl_restart_server.inc; + +connection server_1; +let $rpl_server_number= 1; +source include/rpl_restart_server.inc; + +let $rpl_raft_leader_number= 1; +source ../include/raft_promote_to_leader.inc; + +disable_warnings; +connection server_4; +START SLAVE IO_THREAD; +connection server_5; +START SLAVE IO_THREAD; +enable_warnings; + +connection server_1; +let $wait_condition= select @@global.read_only = 0; +source include/wait_condition.inc; +insert into t1 values(12); +let $sync_slave_connection= server_2; +source include/sync_slave_sql_with_master.inc; +let $sync_slave_connection= server_3; +source include/sync_slave_sql_with_master.inc; +let $sync_slave_connection= server_4; +source include/sync_slave_sql_with_master.inc; +let $sync_slave_connection= server_5; +source include/sync_slave_sql_with_master.inc; + +connection server_2; +select * from t1; +connection server_3; +select * from t1; +connection server_4; +select * from t1; +connection server_5; +select * from t1; + +--disable_query_log +connection server_1; +call mtr.add_suppression("Slave has more GTIDs than the master has, using the master's SERVER_UUID"); +--enable_query_log + +# Cleanup +connection server_1; +drop table t1; + +let $sync_slave_connection= server_2; +source include/sync_slave_sql_with_master.inc; +let $sync_slave_connection= server_3; +source include/sync_slave_sql_with_master.inc; +let $sync_slave_connection= server_4; +source include/sync_slave_sql_with_master.inc; +let $sync_slave_connection= server_5; +source include/sync_slave_sql_with_master.inc; + +connection server_4; +stop slave; +reset slave all; +connection server_5; +stop slave; +reset slave all; + +source include/rpl_end.inc; diff --git a/mysql-test/suite/rpl_raft/t/rpl_raft_purged_gtids_dump_threads.test b/mysql-test/suite/rpl_raft/t/rpl_raft_purged_gtids_dump_threads.test new file mode 100644 index 000000000000..f14a22a86cbf --- /dev/null +++ b/mysql-test/suite/rpl_raft/t/rpl_raft_purged_gtids_dump_threads.test @@ -0,0 +1,81 @@ +# Test auto position based tailing after election. After election we purge +# apply logs that make gtid_purged == gtid_executed even though all transcations +# exist in the raft log. + +source ../include/raft_3_node.inc; +source include/have_debug_sync.inc; +let $use_gtids= 1; + +--disable_query_log +call mtr.add_suppression(".*using --replicate-same-server-id in conjunction with --log-slave-updates.*"); +--enable_query_log + +# Create connections to server 4 and 5 (these are not in the ring) +let $rpl_server_number= 4; +let $rpl_connection_name= server_4; +source include/rpl_connect.inc; +--disable_query_log +connection server_4; +call mtr.add_suppression(".*using --replicate-same-server-id in conjunction with --log-slave-updates.*"); +--enable_query_log + +let $rpl_server_number= 5; +let $rpl_connection_name= server_5; +source include/rpl_connect.inc; +--disable_query_log +connection server_5; +call mtr.add_suppression(".*using --replicate-same-server-id in conjunction with --log-slave-updates.*"); +--enable_query_log + +# Execute some transactions +connection server_1; +let $uuid1= `select variable_value from performance_schema.global_status where variable_name = 'Rpl_raft_peer_uuid'`; +create table t1 (a int) engine = innodb; +insert into t1 values(1); +insert into t1 values(2); + +# Trasfer leadership to server_2 (this will clear out the apply logs and make +# gtid_purged == gtid_executed +connection server_2; +let $uuid2= `select variable_value from performance_schema.global_status where variable_name = 'Rpl_raft_peer_uuid'`; +let $port2= `select @@global.port`; +connection server_1; +replace_result $uuid2 uuid2; +eval set @@global.rpl_raft_new_leader_uuid = '$uuid2'; + +# Make server_4 tail server_2 +connection server_4; +replace_result $port2 port2; +eval change master to master_host='localhost', master_port=$port2, master_auto_position=1, master_user='root'; +source include/start_slave.inc; +--let $uuid4= `SELECT @@GLOBAL.SERVER_UUID` +connection server_2; +# Note: This insert statement is required to move the last_acked to correct position. +insert into t1 values(3); +let $sync_slave_connection= server_4; +source include/sync_slave_sql_with_master.inc; + +connection server_4; +select * from t1; + +# Cleanup +connection server_2; +replace_result $uuid1 uuid1; +eval set @@global.rpl_raft_new_leader_uuid = '$uuid1'; +connection server_1; +let $wait_condition= select @@global.read_only = 0; +source include/wait_condition.inc; +drop table t1; + +let $sync_slave_connection= server_2; +source include/sync_slave_sql_with_master.inc; +let $sync_slave_connection= server_3; +source include/sync_slave_sql_with_master.inc; +let $sync_slave_connection= server_4; +source include/sync_slave_sql_with_master.inc; + +connection server_4; +source include/stop_slave.inc; +reset slave all; + +source include/rpl_end.inc; diff --git a/mysql-test/suite/rpl_raft/t/rpl_raft_wait_for_ack.test b/mysql-test/suite/rpl_raft/t/rpl_raft_wait_for_ack.test index bebcaa08931c..cf9991c98d65 100644 --- a/mysql-test/suite/rpl_raft/t/rpl_raft_wait_for_ack.test +++ b/mysql-test/suite/rpl_raft/t/rpl_raft_wait_for_ack.test @@ -1,6 +1,7 @@ # TODO(pgl) Excluding this test case till the time we have debug raft plugin in tools directory. source include/have_nodebug.inc; source include/have_debug.inc; + source ../include/raft_3_node.inc; --disable_query_log diff --git a/sql/binlog.cc b/sql/binlog.cc index 57f1eb98819c..3976fbc0b4ef 100644 --- a/sql/binlog.cc +++ b/sql/binlog.cc @@ -195,6 +195,7 @@ ulong rpl_read_size; bool rpl_semi_sync_source_enabled = false; MYSQL_BIN_LOG mysql_bin_log(&sync_binlog_period); +Dump_log dump_log; static int binlog_init(void *p); static int binlog_start_trans_and_stmt(THD *thd, Log_event *start_event); @@ -4154,7 +4155,7 @@ bool show_binlog_events(THD *thd, MYSQL_BIN_LOG *binary_log) { Protocol *protocol = thd->get_protocol(); List field_list; std::string errmsg; - LOG_INFO linfo; + LOG_INFO linfo(binary_log->is_relay_log); DBUG_TRACE; @@ -4863,7 +4864,8 @@ int MYSQL_BIN_LOG::init_index_file() { // NO_LINT_DEBUG sql_print_information( "Binlog apply index file exists. Recovering mysqld " - "based on binlog apply index file"); + "based on binlog apply index file: %s", + opt_applylog_index_name); index_file_name = opt_applylog_index_name; log_file_name = opt_apply_logname; is_apply_log = true; @@ -4871,7 +4873,8 @@ int MYSQL_BIN_LOG::init_index_file() { // NO_LINT_DEBUG sql_print_information( "Binlog apply index file does not exist. Recovering " - "mysqld based on binlog index file"); + "mysqld based on binlog index file: %s", + opt_binlog_index_name); index_file_name = opt_binlog_index_name; log_file_name = opt_bin_logname; } @@ -5593,7 +5596,7 @@ bool MYSQL_BIN_LOG::find_first_log_not_in_gtid_set(char *binlog_file_name, /* * This is a limited version of init_gtid_sets which is only * called from binlog_change_to_apply. - * Needs to be called under LOCK_log and LOCK_index held. + * Needs to be called LOCK_index held. * The previous_gtid_set_map is cleared and reinitialized from * the index file contents. */ @@ -5604,7 +5607,6 @@ bool MYSQL_BIN_LOG::init_prev_gtid_sets_map() { std::pair iterator; DBUG_ENTER("MYSQL_BIN_LOG::init_prev_gtid_sets_map"); - mysql_mutex_assert_owner(&LOCK_log); mysql_mutex_assert_owner(&LOCK_index); // clear the map as it is being reset @@ -6032,7 +6034,8 @@ bool MYSQL_BIN_LOG::open_binlog( const char *log_name, const char *new_name, ulong max_size_arg, bool null_created_arg, bool need_lock_index, bool need_sid_lock, Format_description_log_event *extra_description_event, - uint32 new_index_number, RaftRotateInfo *raft_rotate_info) { + uint32 new_index_number, RaftRotateInfo *raft_rotate_info, + bool need_end_log_pos_lock) { // lock_index must be acquired *before* sid_lock. assert(need_sid_lock || !need_lock_index); DBUG_TRACE; @@ -6308,7 +6311,7 @@ bool MYSQL_BIN_LOG::open_binlog( close_purge_index_file(); - update_binlog_end_pos(); + update_binlog_end_pos(need_end_log_pos_lock); my_free(previous_gtid_set_buffer); return false; @@ -6331,7 +6334,8 @@ bool MYSQL_BIN_LOG::open_binlog( } bool MYSQL_BIN_LOG::open_existing_binlog(const char *log_name, - ulong max_size_arg) { + ulong max_size_arg, + bool need_end_log_pos_lock) { DBUG_ENTER("MYSQL_BIN_LOG::open_existing_binlog(const char *, ...)"); DBUG_PRINT("enter", ("name: %s", log_name)); @@ -6359,8 +6363,8 @@ bool MYSQL_BIN_LOG::open_existing_binlog(const char *log_name, DBUG_RETURN(1); } - if (!(this->name = - my_strdup(key_memory_MYSQL_LOG_name, log_name, MYF(MY_WME)))) { + my_free(name); + if (!(name = my_strdup(key_memory_MYSQL_LOG_name, log_name, MYF(MY_WME)))) { // NO_LINT_DEBUG sql_print_error("Could not allocate name %s (error %d)", log_name, errno); DBUG_RETURN(1); @@ -6370,6 +6374,8 @@ bool MYSQL_BIN_LOG::open_existing_binlog(const char *log_name, Binlog_ofile::open_existing(m_key_file_log, existing_file, MYF(MY_WME)); if (!binlog_file) goto err; + // release current point before assign + delete m_binlog_file; m_binlog_file = binlog_file.release(); file = mysql_file_open(m_key_file_log, existing_file, O_CREAT | O_WRONLY, @@ -6393,7 +6399,7 @@ bool MYSQL_BIN_LOG::open_existing_binlog(const char *log_name, if (offset > 0) m_binlog_file->seek(offset); max_size = max_size_arg; - update_binlog_end_pos(); + update_binlog_end_pos(need_end_log_pos_lock); atomic_log_state = LOG_OPENED; DBUG_RETURN(0); // Success @@ -7346,7 +7352,7 @@ int MYSQL_BIN_LOG::remove_logs_from_index(LOG_INFO *log_info, // now update offsets in index file for running threads if (need_update_threads) - adjust_linfo_offsets(log_info->index_file_start_offset); + adjust_linfo_offsets(log_info->index_file_start_offset, is_relay_log); return 0; err: @@ -7562,6 +7568,10 @@ int MYSQL_BIN_LOG::purge_logs(const char *to_log, bool included, global_sid_lock->unlock(); } + if (enable_raft_plugin && is_relay_log) { + error = init_prev_gtid_sets_map(); + } + err: if (need_lock_index) mysql_mutex_unlock(&LOCK_index); @@ -9300,26 +9310,44 @@ int MYSQL_BIN_LOG::log_in_use(const char *log_name) { return count; } -void MYSQL_BIN_LOG::adjust_linfo_offsets(my_off_t purge_offset) { +void MYSQL_BIN_LOG::adjust_linfo_in_dump_threads(bool is_relay_log) { DBUG_TRACE; mysql_mutex_assert_owner(&LOCK_index); MUTEX_LOCK(lock_log_info, &LOCK_log_info); std::for_each(log_info_set.cbegin(), log_info_set.cend(), - [purge_offset](LOG_INFO *log_info) { - /* - Index file offset can be less that purge offset only if - we just started reading the index file. In that case - we have nothing to adjust. - */ - if (log_info->index_file_offset < purge_offset) - log_info->fatal = (log_info->index_file_offset != 0); - else - log_info->index_file_offset -= purge_offset; + [is_relay_log](LOG_INFO *log_info) { + if (log_info->is_used_by_dump_thd) { + log_info->is_relay_log = is_relay_log; + } }); } +void MYSQL_BIN_LOG::adjust_linfo_offsets(my_off_t purge_offset, + bool is_relay_log) { + DBUG_TRACE; + + mysql_mutex_assert_owner(&LOCK_index); + MUTEX_LOCK(lock_log_info, &LOCK_log_info); + + std::for_each( + log_info_set.cbegin(), log_info_set.cend(), + [purge_offset, is_relay_log](LOG_INFO *log_info) { + if (!enable_raft_plugin || log_info->is_relay_log == is_relay_log) { + /* + Index file offset can be less that purge offset only if + we just started reading the index file. In that case + we have nothing to adjust. + */ + if (log_info->index_file_offset < purge_offset) + log_info->fatal = (log_info->index_file_offset != 0); + else + log_info->index_file_offset -= purge_offset; + } + }); +} + /** Write the contents of the given IO_CACHE to the binary log. @@ -10617,8 +10645,12 @@ void MYSQL_BIN_LOG::process_consensus_queue(THD *queue_head) { enable_raft_plugin && rpl_wait_for_semi_sync_ack) { const char *log_file = nullptr; my_off_t log_pos = 0; - queue_head->get_trans_fixed_pos((const char **)&log_file, &log_pos); - signal_semi_sync_ack(log_file, log_pos); + if (mysql_bin_log.is_apply_log) { + last_thd->get_trans_relay_log_pos((const char **)&log_file, &log_pos); + } else { + last_thd->get_trans_fixed_pos((const char **)&log_file, &log_pos); + } + dump_log.signal_semi_sync_ack(log_file, log_pos); } } } @@ -10831,8 +10863,12 @@ void MYSQL_BIN_LOG::process_after_commit_stage_queue(THD *thd, THD *first) { rpl_wait_for_semi_sync_ack) { my_off_t log_pos; const char *log_file = nullptr; - last_thd->get_trans_fixed_pos((const char **)&log_file, &log_pos); - signal_semi_sync_ack(log_file, log_pos); + if (mysql_bin_log.is_apply_log) { + thd->get_trans_relay_log_pos((const char **)&log_file, &log_pos); + } else { + thd->get_trans_fixed_pos((const char **)&log_file, &log_pos); + } + dump_log.signal_semi_sync_ack(log_file, log_pos); } } @@ -11053,8 +11089,12 @@ int MYSQL_BIN_LOG::finish_commit(THD *thd) { enable_raft_plugin && rpl_wait_for_semi_sync_ack) { const char *log_file = nullptr; my_off_t log_pos = 0; - thd->get_trans_fixed_pos((const char **)&log_file, &log_pos); - signal_semi_sync_ack(log_file, log_pos); + if (mysql_bin_log.is_apply_log) { + thd->get_trans_relay_log_pos((const char **)&log_file, &log_pos); + } else { + thd->get_trans_fixed_pos((const char **)&log_file, &log_pos); + } + dump_log.signal_semi_sync_ack(log_file, log_pos); } thd->get_transaction()->m_flags.run_hooks = false; } @@ -11757,8 +11797,9 @@ int binlog_change_to_apply() { int error = 0; LOG_INFO linfo; mysql_mutex_lock(mysql_bin_log.get_log_lock()); + dump_log.lock(); mysql_bin_log.lock_index(); - + mysql_bin_log.lock_binlog_end_pos(); mysql_bin_log.close(LOG_CLOSE_INDEX, /*need_lock_log=*/false, /*need_lock_index=*/false); @@ -11784,10 +11825,14 @@ int binlog_change_to_apply() { /*null_created_arg=*/false, /*need_lock_index=*/false, /*need_sid_lock=*/true, - /*extra_description_event=*/NULL)) { + /*extra_description_event=*/NULL, + /*new_index_number =*/0, + /*raft_rotate_info =*/nullptr, + /*need_end_log_pos_lock =*/false)) { error = 1; goto err; } + dump_log.switch_log(/* relay_log= */ true, /* should_lock= */ false); // Purge all apply logs before the last log, because they // are from the previous epoch of being a FOLLOWER, and they @@ -11808,8 +11853,9 @@ int binlog_change_to_apply() { } err: - + mysql_bin_log.unlock_binlog_end_pos(); mysql_bin_log.unlock_index(); + dump_log.unlock(); mysql_mutex_unlock(mysql_bin_log.get_log_lock()); DBUG_RETURN(error); @@ -11829,8 +11875,21 @@ int binlog_change_to_binlog(THD *thd) { std::vector lognames; mysql_mutex_lock(mysql_bin_log.get_log_lock()); + dump_log.lock(); mysql_bin_log.lock_index(); + Master_info *active_mi; + if (!get_and_lock_master_info(&active_mi) || !active_mi || !active_mi->rli) { + error = 1; + // NO_LINT_DEBUG + sql_print_error("active_mi or rli is not set"); + mysql_bin_log.unlock_index(); + dump_log.unlock(); + mysql_mutex_unlock(mysql_bin_log.get_log_lock()); + return error; + } + active_mi->rli->relay_log.lock_binlog_end_pos(); + // Get the index file name std::string indexfn = mysql_bin_log.get_index_fname(); @@ -11922,13 +11981,18 @@ int binlog_change_to_binlog(THD *thd) { mysql_bin_log.is_apply_log = false; mysql_bin_log.apply_file_count.store(0); + dump_log.switch_log(/* relay_log= */ false, /* should_lock= */ false); + // Register new log to raft // Previous mysql_bin_log.close(LOG_CLOSE_INDEX) will also close binlog and // its IO_CACHE. mysql_bin_log.register_log_entities(thd, /*context=*/0, /*need_lock=*/false, /*is_relay_log=*/false); err: + active_mi->rli->relay_log.unlock_binlog_end_pos(); + unlock_master_info(active_mi); mysql_bin_log.unlock_index(); + dump_log.unlock(); mysql_mutex_unlock(mysql_bin_log.get_log_lock()); DBUG_RETURN(error); @@ -11964,6 +12028,39 @@ int MYSQL_BIN_LOG::get_lognames_from_index(bool need_lock, return error; } +Dump_log::Dump_log() { + if (enable_raft_plugin && mysql_bin_log.is_apply_log) { + Master_info *active_mi = nullptr; + if (!get_and_lock_master_info(&active_mi)) { + // NO_LINT_DEBUG + sql_print_error("active_mi or rli is not set"); + } + assert(active_mi && active_mi->rli); + log_ = &active_mi->rli->relay_log; + unlock_master_info(active_mi); + } else { + log_ = &mysql_bin_log; + } +} + +void Dump_log::switch_log(bool relay_log, bool should_lock) { + if (should_lock) lock(); + mysql_mutex_assert_owner(log_->get_binlog_end_pos_lock()); + log_->update_binlog_end_pos(/* need_lock= */ false); + Master_info *active_mi = nullptr; + if (!get_and_lock_master_info(&active_mi)) { + // NO_LINT_DEBUG + sql_print_error("active_mi or rli is not set"); + } + assert(active_mi && active_mi->rli); + log_ = relay_log ? &active_mi->rli->relay_log : &mysql_bin_log; + unlock_master_info(active_mi); + // Now let's update the dump thread's linfos + log_->reset_semi_sync_last_acked(); + log_->adjust_linfo_in_dump_threads(relay_log); + if (should_lock) unlock(); +} + // Given a file name of the form 'binlog-file-name.index', it extracts the // and and returns it as a pair // Example: @@ -11984,10 +12081,11 @@ static std::pair extract_file_index( } void MYSQL_BIN_LOG::report_missing_purged_gtids( - const Gtid_set *slave_executed_gtid_set, std::string &errmsg) { + const Gtid_set *lost_gtid_set, const Gtid_set *slave_executed_gtid_set, + std::string &errmsg) { DBUG_TRACE; THD *thd = current_thd; - Gtid_set gtid_missing(gtid_state->get_lost_gtids()->get_sid_map()); + Gtid_set gtid_missing(lost_gtid_set->get_sid_map()); gtid_missing.add_gtid_set(gtid_state->get_lost_gtids()); gtid_missing.remove_gtid_set(slave_executed_gtid_set); @@ -12262,13 +12360,17 @@ void MYSQL_BIN_LOG::update_binlog_end_pos(bool need_lock) { if (need_lock) unlock_binlog_end_pos(); } -inline void MYSQL_BIN_LOG::update_binlog_end_pos(const char *file, - my_off_t pos) { - lock_binlog_end_pos(); - if (is_active(file) && (pos > atomic_binlog_end_pos)) +inline void MYSQL_BIN_LOG::update_binlog_end_pos(const char *file, my_off_t pos, + bool need_lock) { + if (need_lock) + lock_binlog_end_pos(); + else + mysql_mutex_assert_owner(&LOCK_binlog_end_pos); + if (is_active(file) && (pos > atomic_binlog_end_pos)) { atomic_binlog_end_pos = pos; + } signal_update(); - unlock_binlog_end_pos(); + if (need_lock) unlock_binlog_end_pos(); } my_off_t MYSQL_BIN_LOG::get_binlog_end_pos() const { @@ -12302,7 +12404,7 @@ my_off_t MYSQL_BIN_LOG::get_last_acked_pos(bool *wait_for_ack, /* Use by raft plugin */ void signal_semi_sync_ack(const std::string &file, uint pos) { - mysql_bin_log.signal_semi_sync_ack(file.c_str(), pos); + dump_log.signal_semi_sync_ack(file.c_str(), pos); } void MYSQL_BIN_LOG::signal_semi_sync_ack(const char *const log_file, @@ -12330,12 +12432,7 @@ void MYSQL_BIN_LOG::signal_semi_sync_ack(const char *const log_file, lock_binlog_end_pos(); if (acked > last_acked.load()) { last_acked = acked; - // Without the following line there are fails for: - // rpl.rpl_seconds_behind_master rpl.rpl_heartbeat_version - // rpl.rpl_relay_status rpl.rpl_seconds_behind_master_mts - // rpl.rpl_binlog_sender_packet_shrink - if (rpl_wait_for_semi_sync_ack && rpl_semi_sync_source_enabled) - signal_update(); + update_binlog_end_pos(log_file, log_pos, false); } unlock_binlog_end_pos(); } @@ -12343,7 +12440,10 @@ void MYSQL_BIN_LOG::signal_semi_sync_ack(const char *const log_file, void MYSQL_BIN_LOG::reset_semi_sync_last_acked() { lock_binlog_end_pos(); /* binary log is rotated and all trxs in previous binlog are already committed - * to the storage engine */ + * to the storage engine. + * Note: when in raft mode we cannot init the coords without consulting the + * plugin, so we reset the coords + */ if (strlen(log_file_name)) { last_acked = {extract_file_index(log_file_name).second, 0}; } else { diff --git a/sql/binlog.h b/sql/binlog.h index f6fa69db3754..37c91828f273 100644 --- a/sql/binlog.h +++ b/sql/binlog.h @@ -155,14 +155,18 @@ struct LOG_INFO { int entry_index; // used in purge_logs(), calculatd in find_log_pos(). int encrypted_header_size; my_thread_id thread_id; - LOG_INFO() + bool is_relay_log; // is this info pointing to a relay log? + bool is_used_by_dump_thd; // is this info being used by a dump thread? + LOG_INFO(bool relay_log = false, bool used_by_dump_thd = false) : index_file_offset(0), index_file_start_offset(0), pos(0), fatal(false), entry_index(0), encrypted_header_size(0), - thread_id(0) { + thread_id(0), + is_relay_log(relay_log), + is_used_by_dump_thd(used_by_dump_thd) { memset(log_file_name, 0, FN_REFLEN); } }; @@ -807,6 +811,16 @@ class MYSQL_BIN_LOG : public TC_LOG { */ bool init_prev_gtid_sets_map(); + void get_lost_gtids(Gtid_set *gtids) { + gtids->clear(); + mysql_mutex_lock(&LOCK_index); + auto it = previous_gtid_set_map.begin(); + if (it != previous_gtid_set_map.end()) + gtids->add_gtid_encoding((const uchar *)it->second.c_str(), + it->second.length()); + mysql_mutex_unlock(&LOCK_index); + } + enum_read_gtids_from_binlog_status read_gtids_from_binlog( const char *filename, Gtid_set *all_gtids, Gtid_set *prev_gtids, Gtid *first_gtid, Sid_map *sid_map, bool verify_checksum, @@ -1142,7 +1156,8 @@ class MYSQL_BIN_LOG : public TC_LOG { void set_max_size(ulong max_size_arg); void update_binlog_end_pos(bool need_lock = true); - void update_binlog_end_pos(const char *file, my_off_t pos); + void update_binlog_end_pos(const char *file, my_off_t pos, + bool need_lock = true); /** Wait until we get a signal that the binary log has been updated. @@ -1212,23 +1227,29 @@ class MYSQL_BIN_LOG : public TC_LOG { after the RESET MASTER TO command is called. @param raft_rotate_info rotate related information passed in by listener callbacks + @param need_end_log_pos_lock If true, LOCK_binlog_end_pos is acquired; + otherwise LOCK_binlog_end_pos must be taken by the caller. */ bool open_binlog(const char *log_name, const char *new_name, ulong max_size_arg, bool null_created_arg, bool need_lock_index, bool need_sid_lock, Format_description_log_event *extra_description_event, uint32 new_index_number = 0, - RaftRotateInfo *raft_rotate_info = nullptr); + RaftRotateInfo *raft_rotate_info = nullptr, + bool need_end_log_pos_lock = true); /** Open an existing binlog/relaylog file @param log_name Name of binlog @param max_size The size at which this binlog will be rotated. + @param need_end_log_pos_lock If true, LOCK_binlog_end_pos is acquired; + otherwise LOCK_binlog_end_pos must be taken by the caller. @retval false on success, true on error */ - bool open_existing_binlog(const char *log_name, ulong max_size_arg); + bool open_existing_binlog(const char *log_name, ulong max_size_arg, + bool need_end_log_pos_lock = true); bool open_index_file(const char *index_file_name_arg, const char *log_name, bool need_lock_index); @@ -1548,10 +1569,12 @@ class MYSQL_BIN_LOG : public TC_LOG { This function will be called from mysql_binlog_send() function. + @param lost_gtid_set GTID set of missing gtids @param slave_executed_gtid_set GTID set executed by slave @param errmsg Pointer to the error message */ - void report_missing_purged_gtids(const Gtid_set *slave_executed_gtid_set, + void report_missing_purged_gtids(const Gtid_set *lost_gtid_set, + const Gtid_set *slave_executed_gtid_set, std::string &errmsg); /** @@ -1654,7 +1677,8 @@ class MYSQL_BIN_LOG : public TC_LOG { in the binary log file with flush_relay_log_info. Now they sync is done for next read. */ - void adjust_linfo_offsets(my_off_t purge_offset); + void adjust_linfo_offsets(my_off_t purge_offset, bool is_relay_log); + void adjust_linfo_in_dump_threads(bool is_relay_log); private: mysql_mutex_t LOCK_log_info; @@ -1671,6 +1695,46 @@ struct LOAD_FILE_INFO { extern MYSQL_PLUGIN_IMPORT MYSQL_BIN_LOG mysql_bin_log; +/** + * Encapsulation over binlog or relay log for dumping raft logs during + * COM_BINLOG_DUMP and COM_BINLOG_DUMP_GTID. + */ +class Dump_log { + public: + Dump_log(); + + void init_pthread_objects(); + + void switch_log(bool relay_log, bool should_lock = true); + + MYSQL_BIN_LOG *get_log(bool should_lock = true) { + if (should_lock) lock(); + auto ret = log_; + if (should_lock) unlock(); + return ret; + } + + void signal_semi_sync_ack(const char *const log_file, + const my_off_t log_pos) { + std::lock_guard guard(log_mutex_); + log_->signal_semi_sync_ack(log_file, log_pos); + } + + void reset_semi_sync_last_acked() { + std::lock_guard guard(log_mutex_); + log_->reset_semi_sync_last_acked(); + } + + void lock() { log_mutex_.lock(); } + + void unlock() { log_mutex_.unlock(); } + + private: + MYSQL_BIN_LOG *log_; + std::mutex log_mutex_; +}; +extern MYSQL_PLUGIN_IMPORT Dump_log dump_log; + /** Check if the the transaction is empty. diff --git a/sql/log_event.cc b/sql/log_event.cc index d5ac01ee06b8..2fc44e9a7928 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -4672,9 +4672,29 @@ void Query_log_event::detach_temp_tables_worker(THD *thd_arg, Query_log_event::do_apply_event() */ int Query_log_event::do_apply_event(Relay_log_info const *rli) { + // Note: We're using event's future_event_relay_log_pos instead of + // rli->get_event_relay_log_pos() because rli is only updated in + // do_update_pos() which is called after applying the event and we might need + // to use this pos during application (e.g. during commit) + Relay_log_info *rli_ptr = const_cast(rli); + thd->set_trans_relay_log_pos(rli_ptr->get_event_relay_log_name(), + future_event_relay_log_pos); return do_apply_event(rli, query, q_len); } +int Query_log_event::do_apply_event_worker(Slave_worker *w) { + // Note: We're using event's future_event_relay_log_pos instead of + // rli->get_event_relay_log_pos() because rli is only updated in + // do_update_pos() which is called after applying the event and we might + // need to use this pos during application (e.g. during commit) + Slave_job_group *ptr_g = w->c_rli->gaq->get_job_group(mts_group_idx); + thd->set_trans_relay_log_pos(ptr_g && ptr_g->group_relay_log_name + ? ptr_g->group_relay_log_name + : w->get_group_relay_log_name(), + future_event_relay_log_pos); + return do_apply_event(w, query, q_len); +} + /* is_silent_error @@ -5919,11 +5939,19 @@ bool Rotate_log_event::write(Basic_ostream *ostream) { write_footer(ostream)); } -int Rotate_log_event::do_apply_event(Relay_log_info const *) { +int Rotate_log_event::do_apply_event(Relay_log_info const *rli) { if (!enable_raft_plugin) return 0; + // Note: We're using event's future_event_relay_log_pos instead of + // rli->get_event_relay_log_pos() because rli is only updated in + // do_update_pos() which is called after applying the event and we might need + // to use this pos during application (e.g. during commit) + Relay_log_info *rli_ptr = const_cast(rli); + thd->set_trans_relay_log_pos(rli_ptr->get_event_relay_log_name(), + future_event_relay_log_pos); int64_t term, index; thd->get_trans_marker(&term, &index); + return RUN_HOOK(raft_replication, after_commit, (thd)); } @@ -6476,6 +6504,9 @@ int Xid_apply_log_event::do_apply_event_worker(Slave_worker *w) { goto err; } + thd->set_trans_relay_log_pos(w->get_group_relay_log_name(), + w->get_group_relay_log_pos()); + DBUG_PRINT( "mta", ("do_apply group source %s %llu group relay %s %llu event %s %llu.", @@ -6615,6 +6646,8 @@ int Xid_apply_log_event::do_apply_event(Relay_log_info const *rli) { strmake(new_group_relay_log_name, rli_ptr->get_group_relay_log_name(), FN_REFLEN - 1); new_group_relay_log_pos = rli_ptr->get_group_relay_log_pos(); + thd->set_trans_relay_log_pos(rli_ptr->get_group_relay_log_name(), + rli_ptr->get_group_relay_log_pos()); /* Rollback positions in memory just before commit. Position values will be reset to their new values only on successful commit operation. @@ -7825,6 +7858,10 @@ int Execute_load_query_log_event::do_apply_event(Relay_log_info const *rli) { return error; } +int Execute_load_query_log_event::do_apply_event_worker(Slave_worker *w) { + return do_apply_event(w); +} + /***************************************************************************** Load_query_generator is used to generate the LOAD DATA statement for binlog ******************************************************************************/ diff --git a/sql/log_event.h b/sql/log_event.h index 31759d94557a..d9533278aff1 100644 --- a/sql/log_event.h +++ b/sql/log_event.h @@ -1543,6 +1543,7 @@ class Query_log_event : public virtual binary_log::Query_event, #if defined(MYSQL_SERVER) enum_skip_reason do_shall_skip(Relay_log_info *rli) override; int do_apply_event(Relay_log_info const *rli) override; + int do_apply_event_worker(Slave_worker *w) override; int do_update_pos(Relay_log_info *rli) override; void prepare_dep(Relay_log_info *rli, std::shared_ptr &ev) override; @@ -2354,6 +2355,7 @@ class Execute_load_query_log_event private: #if defined(MYSQL_SERVER) int do_apply_event(Relay_log_info const *rli) override; + int do_apply_event_worker(Slave_worker *w) override; #endif }; diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 7393aa25d172..29aa77489c6d 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -5523,7 +5523,6 @@ int init_common_variables() { inited before MY_INIT(). So we do it here. */ mysql_bin_log.init_pthread_objects(); - /* TODO: remove this when my_time_t is 64 bit compatible */ if (!is_time_t_valid_for_timestamp(server_start_time)) { LogErr(ERROR_LEVEL, ER_UNSUPPORTED_DATE); @@ -7829,7 +7828,7 @@ static int init_server_components() { rpl_source_io_monitor = new Source_IO_monitor(); udf_load_service.init(); - mysql_bin_log.reset_semi_sync_last_acked(); + dump_log.reset_semi_sync_last_acked(); /* Initialize the optimizer cost module */ init_optimizer_cost_module(true); diff --git a/sql/rpl_binlog_sender.cc b/sql/rpl_binlog_sender.cc index d10cc88ef8be..cda7c71d0f18 100644 --- a/sql/rpl_binlog_sender.cc +++ b/sql/rpl_binlog_sender.cc @@ -50,6 +50,7 @@ #include "mysql/psi/mysql_mutex.h" #include "mysql/psi/mysql_socket.h" #include "scope_guard.h" +#include "sql/binlog.h" #include "sql/binlog_reader.h" #include "sql/debug_sync.h" // debug_sync_set_action #include "sql/derror.h" // ER_THD @@ -353,11 +354,12 @@ bool Binlog_sender::set_dscp(void) { void Binlog_sender::init() { DBUG_TRACE; THD *thd = m_thd; - + auto raw_log = dump_log.get_log(false); thd->push_diagnostics_area(&m_diag_area); init_heartbeat_period(); m_last_event_sent_ts = now_in_nanosecs(); + m_linfo = LOG_INFO(raw_log->is_relay_log, /* is_used_by_dump_thd = */ true); m_linfo.thread_id = thd->thread_id(); mysql_bin_log.register_log_info(&m_linfo); @@ -370,7 +372,7 @@ void Binlog_sender::init() { /* Save original query. Will be unset in cleanup */ m_orig_query = thd->query(); - if (!mysql_bin_log.is_open()) { + if (!raw_log->is_open()) { set_fatal_error("Binary log is not open"); return; } @@ -505,8 +507,9 @@ bool is_semi_sync_slave() { void Binlog_sender::run() { DBUG_TRACE; + dump_log.lock(); init(); - + dump_log.unlock(); m_is_semi_sync_slave = is_semi_sync_slave(); unsigned int max_event_size = @@ -553,28 +556,47 @@ void Binlog_sender::run() { "wait_for continue_dump_thread no_clear_event"; assert(!debug_sync_set_action(m_thd, STRING_WITH_LEN(act))); };); - mysql_bin_log.lock_index(); - if (!mysql_bin_log.is_open()) { - if (mysql_bin_log.open_index_file(mysql_bin_log.get_index_fname(), - log_file, false)) { + + // Note: This part is tricky and should be touched if you really know + // what you're doing. We're locking dump log to get the raw log + // pointer, then we're locking lock_index before unlocking the dump + // log. We're taking the lock in the same sequence as when log is + // switched in binlog_change_to_binlog() and binlog_change_to_apply() + // to avoid deadlocks. This locking pattern ensures that we're working + // with the correct raw log and that there is no race between getting + // the raw log and log switching. Log switching will be blocked until + // we release the binlog end pos lock before waiting for signal in + // wait_for_update_bin_log(). + dump_log.lock(); + MYSQL_BIN_LOG *raw_log = dump_log.get_log(false); + raw_log->lock_index(); + dump_log.unlock(); + if (!raw_log->is_open()) { + if (raw_log->open_index_file(raw_log->get_index_fname(), log_file, + false)) { set_fatal_error( "Binary log is not open and failed to open index file " "to retrieve next file."); - mysql_bin_log.unlock_index(); + raw_log->unlock_index(); break; } is_index_file_reopened_on_binlog_disable = true; } - int error = mysql_bin_log.find_next_log(&m_linfo, false); - mysql_bin_log.unlock_index(); + DBUG_EXECUTE_IF("dump_wait_before_find_next_log", { + const char act[] = "now signal signal.reached wait_for signal.done"; + assert(opt_debug_sync_timeout > 0); + assert(!debug_sync_set_action(m_thd, STRING_WITH_LEN(act))); + };); + int error = raw_log->find_next_log(&m_linfo, false); + raw_log->unlock_index(); if (unlikely(error)) { DBUG_EXECUTE_IF("waiting_for_disable_binlog", { const char act[] = "now signal consumed_binlog"; assert(!debug_sync_set_action(m_thd, STRING_WITH_LEN(act))); };); if (is_index_file_reopened_on_binlog_disable) - mysql_bin_log.close(LOG_CLOSE_INDEX, true /*need_lock_log=true*/, - true /*need_lock_index=true*/); + raw_log->close(LOG_CLOSE_INDEX, true /*need_lock_log=true*/, + true /*need_lock_index=true*/); set_fatal_error("could not find next log"); break; } @@ -686,16 +708,16 @@ std::pair Binlog_sender::get_binlog_end_pos( } bool wait_for_ack = !m_is_semi_sync_slave; + MYSQL_BIN_LOG *raw_log = dump_log.get_log(true); result.first = - mysql_bin_log.get_last_acked_pos(&wait_for_ack, m_linfo.log_file_name); + raw_log->get_last_acked_pos(&wait_for_ack, m_linfo.log_file_name); DBUG_PRINT("info", ("Reading file %s, seek pos %llu, end_pos is %llu", m_linfo.log_file_name, read_pos, result.first)); - DBUG_PRINT("info", ("Active file is %s", mysql_bin_log.get_log_fname())); + DBUG_PRINT("info", ("Active file is %s", raw_log->get_log_fname())); /* If this is a cold binlog file, we are done getting the end pos */ - if (unlikely(!wait_for_ack && - !mysql_bin_log.is_active(m_linfo.log_file_name))) { + if (unlikely(!wait_for_ack && !raw_log->is_active(m_linfo.log_file_name))) { return std::make_pair(0, 0); } if (read_pos < result.first) { @@ -931,55 +953,98 @@ int Binlog_sender::wait_new_events(my_off_t log_pos) { LOCK_binlog_end_pos if we reached the end of the hot log and are going to wait for updates on the binary log (Binlog_sender::wait_new_event()). */ - if (stop_waiting_for_update(log_pos)) { + if (stop_waiting_for_update(log_pos, dump_log.get_log(false))) { return 0; } /* Some data may be in net buffer, it should be flushed before waiting */ if (flush_net()) return 1; - mysql_bin_log.lock_binlog_end_pos(); - - m_thd->ENTER_COND(mysql_bin_log.get_log_cond(), - mysql_bin_log.get_binlog_end_pos_lock(), + // Note: This part is tricky and should be touched if you really know + // what you're doing. We're locking dump log to get the raw log + // pointer, then we're locking end log pos before unlocking the dump + // log. We're taking the lock in the same sequence as when log is + // switched in binlog_change_to_binlog() and binlog_change_to_apply() + // to avoid deadlocks. This locking pattern ensures that we're working + // with the correct raw log and that there is no race between getting + // the raw log and log switching. Log switching will be blocked until + // we release the binlog end pos lock before waiting for signal in + // wait_for_update_bin_log(). + dump_log.lock(); + MYSQL_BIN_LOG *raw_log = dump_log.get_log(false); + raw_log->lock_binlog_end_pos(); + dump_log.unlock(); + + m_thd->ENTER_COND(raw_log->get_log_cond(), raw_log->get_binlog_end_pos_lock(), &stage_source_has_sent_all_binlog_to_replica, &old_stage); + // Note: upstream code rewrote wait_*_heartbeat, but because + // log switches can occur, wait_*_heartbeat needs to release + // and check for log switches too. Otherwise, we may never + // return from wait_*_heartbeat because stop_waiting_for_update() + // never returns true. + // + // Hence, the raw_log may change. The conditional wait metrics + // may not be accurate as a result of this, but that is probably fine. if (m_heartbeat_period.count() > 0) - ret = wait_with_heartbeat(log_pos); + ret = wait_with_heartbeat(&raw_log, log_pos); else - ret = wait_without_heartbeat(log_pos); + ret = wait_without_heartbeat(&raw_log, log_pos); - mysql_bin_log.unlock_binlog_end_pos(); + raw_log->unlock_binlog_end_pos(); m_thd->EXIT_COND(&old_stage); return ret; } -bool Binlog_sender::stop_waiting_for_update(my_off_t log_pos) const { +bool Binlog_sender::stop_waiting_for_update(my_off_t log_pos, + MYSQL_BIN_LOG *raw_log) const { bool wait_for_ack = !m_is_semi_sync_slave; - if (mysql_bin_log.get_last_acked_pos(&wait_for_ack, m_linfo.log_file_name) > + if (raw_log->get_last_acked_pos(&wait_for_ack, m_linfo.log_file_name) > log_pos || - (!wait_for_ack && !mysql_bin_log.is_active(m_linfo.log_file_name)) || + (!wait_for_ack && !raw_log->is_active(m_linfo.log_file_name)) || m_thd->killed) { return true; } return false; } -inline int Binlog_sender::wait_with_heartbeat(my_off_t log_pos) { +static void swap_and_relock_log(MYSQL_BIN_LOG **raw_log) { + // Switch log within wait_*_heartbeat, see wait_new_events() + (*raw_log)->unlock_binlog_end_pos(); + dump_log.lock(); + *raw_log = dump_log.get_log(false); + (*raw_log)->lock_binlog_end_pos(); + dump_log.unlock(); +} + +inline int Binlog_sender::wait_with_heartbeat(MYSQL_BIN_LOG **raw_log, + my_off_t log_pos) { #ifndef NDEBUG ulong hb_info_counter = 0; #endif - while (!stop_waiting_for_update(log_pos)) { - // ignoring timeout on conditional variable - mysql_bin_log.wait_for_update(m_heartbeat_period); - - if (stop_waiting_for_update(log_pos)) { + while (!stop_waiting_for_update(log_pos, *raw_log)) { + // Note: upstream code ignores the timeout on conditional variable + // but due to our implementation of having async readers wait, + // breaking out of wait_for_update early causes heartbeats + // to be sent, so use the return code to determine if + // heartbeats are needed. + int ret = (*raw_log)->wait_for_update(m_heartbeat_period); + + // Upstream unlocks binlog_end_pos, but we'll keep this locked + // through the heartbeat code as in 8.0.28. Unlocking the + // binlog_end_pos is fine too as long as we call get_log() before + // relocking it. + swap_and_relock_log(raw_log); + + if (stop_waiting_for_update(log_pos, *raw_log)) { return 0; } - mysql_bin_log.unlock_binlog_end_pos(); - Scope_guard lock([]() { mysql_bin_log.lock_binlog_end_pos(); }); + + if (!is_timeout(ret)) { + continue; + } #ifndef NDEBUG if (hb_info_counter < 3) { LogErr(INFORMATION_LEVEL, ER_RPL_BINLOG_SOURCE_SENDS_HEARTBEAT); @@ -995,10 +1060,14 @@ inline int Binlog_sender::wait_with_heartbeat(my_off_t log_pos) { return 0; } -inline int Binlog_sender::wait_without_heartbeat(my_off_t log_pos) { +inline int Binlog_sender::wait_without_heartbeat(MYSQL_BIN_LOG **raw_log, + my_off_t log_pos) { int res = 0; - while (!stop_waiting_for_update(log_pos)) { - res = mysql_bin_log.wait_for_update(); + while (!stop_waiting_for_update(log_pos, *raw_log)) { + res = (*raw_log)->wait_for_update(); + + // Check for log switches, see wait_new_events() + swap_and_relock_log(raw_log); } return res; } @@ -1022,9 +1091,9 @@ int Binlog_sender::check_start_file() { char index_entry_name[FN_REFLEN]; char *name_ptr = nullptr; std::string errmsg; - + auto raw_log = dump_log.get_log(false); if (m_start_file[0] != '\0') { - mysql_bin_log.make_log_name(index_entry_name, m_start_file); + raw_log->make_log_name(index_entry_name, m_start_file); name_ptr = index_entry_name; } else if (m_using_gtid_protocol) { /* @@ -1089,15 +1158,34 @@ int Binlog_sender::check_start_file() { will not find one and an error ER_SOURCE_HAS_PURGED_REQUIRED_GTIDS is thrown from there. */ - if (!gtid_state->get_lost_gtids()->is_subset(m_exclude_gtid)) { - mysql_bin_log.report_missing_purged_gtids(m_exclude_gtid, errmsg); + Gtid_set *lost_gtids = const_cast(gtid_state->get_lost_gtids()); + + /** In raft mode we calculate lost gtids from the binlog/relaylog index + * file instead of using the global state that is always based on the + * apply side binlogs. Apply logs are purged on election so global state + * is currently incorrect wrt raft logs. + * + * TODO: Remove this hack after global gtid state is fixed wrt to raft + * logs + */ + Sid_map gtids_lost_sid_map(nullptr); + Gtid_set gtids_lost(>ids_lost_sid_map); + if (enable_raft_plugin) { global_sid_lock->unlock(); + raw_log->get_lost_gtids(>ids_lost); + lost_gtids = >ids_lost; + } + + if (!lost_gtids->is_subset(m_exclude_gtid)) { + mysql_bin_log.report_missing_purged_gtids(lost_gtids, m_exclude_gtid, + errmsg); + if (!enable_raft_plugin) global_sid_lock->unlock(); set_fatal_error(errmsg.c_str()); return 1; } - global_sid_lock->unlock(); + if (!enable_raft_plugin) global_sid_lock->unlock(); Gtid first_gtid = {0, 0}; - if (mysql_bin_log.find_first_log_not_in_gtid_set( + if (raw_log->find_first_log_not_in_gtid_set( index_entry_name, m_exclude_gtid, &first_gtid, errmsg)) { set_fatal_error(errmsg.c_str()); return 1; @@ -1125,7 +1213,7 @@ int Binlog_sender::check_start_file() { then starts from the first file in index file. */ - if (mysql_bin_log.find_log_pos(&m_linfo, name_ptr, true)) { + if (raw_log->find_log_pos(&m_linfo, name_ptr, true)) { set_fatal_error( "Could not find first log file name in binary log " "index file"); diff --git a/sql/rpl_binlog_sender.h b/sql/rpl_binlog_sender.h index 6f4d46ad2974..5c59b21f10cd 100644 --- a/sql/rpl_binlog_sender.h +++ b/sql/rpl_binlog_sender.h @@ -78,7 +78,7 @@ class Binlog_sender { Checks whether thread should continue awaiting new events @param log_pos Last processed (sent) event id */ - bool stop_waiting_for_update(my_off_t log_pos) const; + bool stop_waiting_for_update(my_off_t log_pos, MYSQL_BIN_LOG *raw_log) const; THD *m_thd; String &m_packet; @@ -422,8 +422,8 @@ class Binlog_sender { @return It returns 0 if succeeds, otherwise 1 is returned. */ int wait_new_events(my_off_t log_pos); - int wait_with_heartbeat(my_off_t log_pos); - int wait_without_heartbeat(my_off_t log_pos); + int wait_with_heartbeat(MYSQL_BIN_LOG **raw_log, my_off_t log_pos); + int wait_without_heartbeat(MYSQL_BIN_LOG **raw_log, my_off_t log_pos); #ifndef NDEBUG /* It is used to count the events that have been sent. */ diff --git a/sql/rpl_handler.cc b/sql/rpl_handler.cc index da4cb11fe729..56c0158fce06 100644 --- a/sql/rpl_handler.cc +++ b/sql/rpl_handler.cc @@ -616,6 +616,13 @@ int Trans_delegate::before_commit(THD *thd, bool all, (all || !thd->get_transaction()->is_active(Transaction_ctx::SESSION)); if (is_real_trans) param.flags |= TRANS_IS_REAL_TRANS; + if (mysql_bin_log.is_apply_log) + thd->get_trans_relay_log_pos(¶m.log_file, ¶m.log_pos); + else + thd->get_trans_fixed_pos(¶m.log_file, ¶m.log_pos); + + DBUG_PRINT("enter", + ("log_file: %s, log_pos: %llu", param.log_file, param.log_pos)); int ret = 0; /* After this debug point we mark the transaction as committing in THD. */ @@ -849,7 +856,6 @@ int Trans_delegate::after_commit(THD *thd, bool all) { bool is_real_trans = (all || !thd->get_transaction()->is_active(Transaction_ctx::SESSION)); if (is_real_trans) param.flags |= TRANS_IS_REAL_TRANS; - thd->get_trans_fixed_pos(¶m.log_file, ¶m.log_pos); param.server_id = thd->server_id; param.rpl_channel_type = thd->rpl_thd_ctx.get_rpl_channel_type(); @@ -874,7 +880,10 @@ int Trans_delegate::after_rollback(THD *thd, bool all) { bool is_real_trans = (all || !thd->get_transaction()->is_active(Transaction_ctx::SESSION)); if (is_real_trans) param.flags |= TRANS_IS_REAL_TRANS; - thd->get_trans_fixed_pos(¶m.log_file, ¶m.log_pos); + if (mysql_bin_log.is_apply_log) + thd->get_trans_relay_log_pos(¶m.log_file, ¶m.log_pos); + else + thd->get_trans_fixed_pos(¶m.log_file, ¶m.log_pos); param.server_id = thd->server_id; param.rpl_channel_type = thd->rpl_thd_ctx.get_rpl_channel_type(); @@ -1408,6 +1417,13 @@ int Raft_replication_delegate::after_commit(THD *thd) { thd->get_trans_marker(¶m.term, ¶m.index); + const char *file = nullptr; + my_off_t pos = 0; + if (mysql_bin_log.is_apply_log) + thd->get_trans_relay_log_pos(&file, &pos); + else + thd->get_trans_fixed_pos(&file, &pos); + int ret = 0; FOREACH_OBSERVER(ret, after_commit, (¶m)); DBUG_RETURN(ret); diff --git a/sql/rpl_replica.cc b/sql/rpl_replica.cc index 6fa218730f68..b79fa31351a0 100644 --- a/sql/rpl_replica.cc +++ b/sql/rpl_replica.cc @@ -1761,7 +1761,6 @@ int raft_reset_slave(THD *) { mi->inited = false; mysql_mutex_lock(&mi->rli->data_lock); mi->rli->inited = false; - mi->flush_info(true); /** Clear the retrieved gtid set for this channel. */ @@ -1771,7 +1770,7 @@ int raft_reset_slave(THD *) { mysql_mutex_unlock(&mi->rli->data_lock); mysql_mutex_unlock(&mi->data_lock); - + remove_info(mi); // no longer a slave. will be set again during change master is_slave = false; channel_map.unlock(); diff --git a/sql/sql_class.h b/sql/sql_class.h index f3c83ad81ab8..7702556a18fb 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -2000,6 +2000,11 @@ class THD : public MDL_context_owner, NET net; // client connection descriptor String packet; // dynamic buffer for network I/O + /** + * Relay log positions for the transaction + */ + std::pair m_trans_relay_log_pos; + /* The term and index that need to be communicated across different raft * plugin hooks. These fields are not protected by locks since they are * accessed by the same THD serially during different stages of ordered commit @@ -3051,6 +3056,16 @@ class THD : public MDL_context_owner, return; } + void get_trans_relay_log_pos(const char **file_var, my_off_t *pos_var) const { + if (file_var) *file_var = m_trans_relay_log_pos.first.c_str(); + if (pos_var) *pos_var = m_trans_relay_log_pos.second; + } + + void set_trans_relay_log_pos(const std::string &file, my_off_t pos) { + m_trans_relay_log_pos.first = file; + m_trans_relay_log_pos.second = pos; + } + /**@}*/ /* Get the trans marker i.e (term, index) tuple stashed in this THD */ diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index cf65c84f4059..177e51020836 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -4514,9 +4514,10 @@ static Sys_var_int32 Sys_regexp_stack_limit( static bool update_rpl_wait_for_semi_sync_ack(sys_var *, THD *, enum_var_type) { if (!rpl_wait_for_semi_sync_ack) { - mysql_bin_log.lock_binlog_end_pos(); - mysql_bin_log.signal_update(); - mysql_bin_log.unlock_binlog_end_pos(); + auto raw_log = dump_log.get_log(/*should_lock*/ true); + raw_log->lock_binlog_end_pos(); + raw_log->signal_update(); + raw_log->unlock_binlog_end_pos(); } return false; }