From 0df8cf86b968b230899f71c2d3a3a7b8b25a949c Mon Sep 17 00:00:00 2001 From: Przemyslaw Skibinski Date: Mon, 23 May 2022 14:03:49 +0200 Subject: [PATCH] FB8-144: Send events to async slaves after ACKed by at least one semi-sync slave (#1006) (#1006) Summary: Jira issue: https://jira.percona.com/browse/FB8-144 Reference Patch: https://github.com/facebook/mysql-5.6/commit/cc4e803 Reference Patch: https://github.com/facebook/mysql-5.6/commit/aad3ce2a54e Reference Patch: https://github.com/facebook/mysql-5.6/commit/b42b911fa15 ---------- https://github.com/facebook/mysql-5.6/commit/cc4e803 ---------- Send events to async slaves after ACKed by at least one semi-sync slave Added a new mysql variable `rpl_wait_for_semi_sync_ack` to control sending binlog events to async slaves. When this variable is set, events are sent to async slave only after it is ACKed by at least one semi-sync slave (if any). Originally Reviewed By: Tema ---------- https://github.com/facebook/mysql-5.6/commit/aad3ce2a54e ---------- Fixed rpl_wait_for_semi_sync_ack Following changes are made to fix the feature - Condition variable is registered in dump thread's THD before waiting so that the thread can respond to kill command - rpl_wait_for_semi_sync_ack is respected only when semi-sync master is enabled (i.e. rpl_semi_sync_master_enabled = 1) - Added a status variable Rpl_semi_sync_master_ack_waits which counts the number of times we waited for an ACK (useful for benchmarking) Originally Reviewed By: hermanlee ---------- https://github.com/facebook/mysql-5.6/commit/b42b911fa15 ---------- Fix rpl_wait_for_semi_sync_ack feature Fixed the following: 1. Initializing last acked position to what is retrieved from engine during server startup. This makes sure that lagging async slaves are able to catch up until the last acked position after master restarts. 2. Resetting last acked position when `RESET MASTER` is issued. This makes sure that after the binlogs are reset we wait for acks. 3. 
Signalling/updating last acked positions only on events that were actually acked by the semi-sync slave (like the Xid event of the last trx in a group commit). This is done by signalling inside of the plugin (ReplSemiSyncMaster::reportReplyBinlog). 4. Signalling/updating on trxs skipped on semi-sync slave connection while searching for first gtid connection Originally Reviewed By: hermanlee -------------------------------------------------------------------- Wrapping last semi-sync acked pos in std::atomic to avoid locking in some scenarios We'll now lock the mutex only if we need to wait for last acked pos to update. Otherwise, we just check the current dump thread pos against the last acked pos atomically and if current is less than last acked we send the event without locking the mutex. -------------------------------------------------------------------- Pull Request resolved: https://github.com/facebook/mysql-5.6/pull/1006 Differential Revision: D16267709 Pulled By: abhinav04sharma --------------------------------------------------------------------- Fix LLVM codegen for struct st_filenum_pos atomic operations (#1183) Summary: LLVM has an issue where atomic operations on a struct with 32-bit fields are compiled using libatomic library calls instead of direct assembly, as if the whole struct were 32-bit aligned, i.e. its objects could cross a machine word boundary: https://bugs.llvm.org/show_bug.cgi?id=45055. Work around this issue by aligning the first 32-bit field at 64 bits. This allows not linking mysys with libatomic. 
Pull Request resolved: https://github.com/facebook/mysql-5.6/pull/1183 Reviewed By: abhinav04sharma Differential Revision: D34379183 Pulled By: hermanlee --- mysql-test/r/mysqld--help-notwin.result | 4 + .../r/binlog_persist_only_variables.result | 2 + .../r/binlog_persist_variables.result | 2 + ...rpl_semi_sync_master_error_handling.result | 33 ++ ...semi_sync_master_error_handling-master.opt | 1 + ..._semi_sync_master_error_handling-slave.opt | 1 + .../t/rpl_semi_sync_master_error_handling.cnf | 24 ++ .../rpl_semi_sync_master_error_handling.test | 78 ++++ .../r/rpl_wait_for_semi_sync_ack.result | 166 ++++++++ .../t/rpl_wait_for_semi_sync_ack-master.opt | 1 + .../t/rpl_wait_for_semi_sync_ack-slave.opt | 1 + .../rpl_gtid/t/rpl_wait_for_semi_sync_ack.cnf | 14 + .../t/rpl_wait_for_semi_sync_ack.test | 356 ++++++++++++++++++ .../r/rpl_wait_for_semi_sync_ack_basic.result | 21 ++ .../t/rpl_wait_for_semi_sync_ack_basic.test | 37 ++ plugin/semisync/semisync_replica_plugin.cc | 16 + plugin/semisync/semisync_source.cc | 2 +- plugin/semisync/semisync_source.h | 1 - .../semisync/semisync_source_ack_receiver.h | 1 + plugin/semisync/semisync_source_plugin.cc | 24 +- sql/binlog.cc | 114 ++++++ sql/binlog.h | 46 ++- sql/mysqld.cc | 18 + sql/mysqld.h | 1 + sql/rpl_binlog_sender.cc | 30 +- sql/rpl_binlog_sender.h | 1 + sql/rpl_replica.cc | 2 + sql/rpl_source.cc | 4 +- sql/sql_class.cc | 4 +- sql/sql_class.h | 19 +- sql/sys_vars.cc | 17 + 31 files changed, 1009 insertions(+), 32 deletions(-) create mode 100644 mysql-test/suite/rpl/r/rpl_semi_sync_master_error_handling.result create mode 100644 mysql-test/suite/rpl/t/rpl_semi_sync_master_error_handling-master.opt create mode 100644 mysql-test/suite/rpl/t/rpl_semi_sync_master_error_handling-slave.opt create mode 100644 mysql-test/suite/rpl/t/rpl_semi_sync_master_error_handling.cnf create mode 100644 mysql-test/suite/rpl/t/rpl_semi_sync_master_error_handling.test create mode 100644 
mysql-test/suite/rpl_gtid/r/rpl_wait_for_semi_sync_ack.result create mode 100644 mysql-test/suite/rpl_gtid/t/rpl_wait_for_semi_sync_ack-master.opt create mode 100644 mysql-test/suite/rpl_gtid/t/rpl_wait_for_semi_sync_ack-slave.opt create mode 100644 mysql-test/suite/rpl_gtid/t/rpl_wait_for_semi_sync_ack.cnf create mode 100644 mysql-test/suite/rpl_gtid/t/rpl_wait_for_semi_sync_ack.test create mode 100644 mysql-test/suite/sys_vars/r/rpl_wait_for_semi_sync_ack_basic.result create mode 100644 mysql-test/suite/sys_vars/t/rpl_wait_for_semi_sync_ack_basic.test diff --git a/mysql-test/r/mysqld--help-notwin.result b/mysql-test/r/mysqld--help-notwin.result index 263ff2fd3cff..85ca80eb24c3 100644 --- a/mysql-test/r/mysqld--help-notwin.result +++ b/mysql-test/r/mysqld--help-notwin.result @@ -1958,6 +1958,9 @@ The following options may be given as the first argument: --rpl-stop-slave-timeout=# This option is deprecated. Use rpl_stop_replica_timeout instead. + --rpl-wait-for-semi-sync-ack + Wait for events to be acked by a semi-sync slave before + sending them to the async slaves --safe-user-create Don't allow new user creation by the user who has no write privileges to the mysql.user table. 
--schema-definition-cache=# @@ -2872,6 +2875,7 @@ rpl-receive-buffer-size 2097152 rpl-send-buffer-size 2097152 rpl-stop-replica-timeout 31536000 rpl-stop-slave-timeout 31536000 +rpl-wait-for-semi-sync-ack FALSE safe-user-create FALSE schema-definition-cache 256 secondary-engine-cost-threshold 100000 diff --git a/mysql-test/suite/binlog_nogtid/r/binlog_persist_only_variables.result b/mysql-test/suite/binlog_nogtid/r/binlog_persist_only_variables.result index 75cbf5f7decb..accdb6b4274c 100644 --- a/mysql-test/suite/binlog_nogtid/r/binlog_persist_only_variables.result +++ b/mysql-test/suite/binlog_nogtid/r/binlog_persist_only_variables.result @@ -188,6 +188,7 @@ SET PERSIST_ONLY rpl_stop_slave_timeout = @@GLOBAL.rpl_stop_slave_timeout; Warnings: Warning 1287 '@@rpl_stop_slave_timeout' is deprecated and will be removed in a future release. Please use rpl_stop_replica_timeout instead. Warning 1287 '@@rpl_stop_slave_timeout' is deprecated and will be removed in a future release. Please use rpl_stop_replica_timeout instead. 
+SET PERSIST_ONLY rpl_wait_for_semi_sync_ack = @@GLOBAL.rpl_wait_for_semi_sync_ack; SET PERSIST_ONLY session_track_gtids = @@GLOBAL.session_track_gtids; SET PERSIST_ONLY skip_replica_start = @@GLOBAL.skip_replica_start; SET PERSIST_ONLY skip_slave_start = @@GLOBAL.skip_slave_start; @@ -385,6 +386,7 @@ RESET PERSIST rpl_semi_sync_source_wait_point; RESET PERSIST rpl_semi_sync_source_whitelist; RESET PERSIST rpl_send_buffer_size; RESET PERSIST rpl_stop_replica_timeout; +RESET PERSIST rpl_wait_for_semi_sync_ack; RESET PERSIST session_track_gtids; RESET PERSIST skip_replica_start; RESET PERSIST slave_compression_lib; diff --git a/mysql-test/suite/binlog_nogtid/r/binlog_persist_variables.result b/mysql-test/suite/binlog_nogtid/r/binlog_persist_variables.result index b4f5cb69c418..ab1c6873c0ec 100644 --- a/mysql-test/suite/binlog_nogtid/r/binlog_persist_variables.result +++ b/mysql-test/suite/binlog_nogtid/r/binlog_persist_variables.result @@ -168,6 +168,7 @@ SET PERSIST rpl_stop_slave_timeout = @@GLOBAL.rpl_stop_slave_timeout; Warnings: Warning 1287 '@@rpl_stop_slave_timeout' is deprecated and will be removed in a future release. Please use rpl_stop_replica_timeout instead. Warning 1287 '@@rpl_stop_slave_timeout' is deprecated and will be removed in a future release. Please use rpl_stop_replica_timeout instead. 
+SET PERSIST rpl_wait_for_semi_sync_ack = @@GLOBAL.rpl_wait_for_semi_sync_ack; SET PERSIST session_track_gtids = @@GLOBAL.session_track_gtids; SET PERSIST skip_replica_start = @@GLOBAL.skip_replica_start; ERROR HY000: Variable 'skip_replica_start' is a read only variable @@ -399,6 +400,7 @@ RESET PERSIST IF EXISTS rpl_stop_replica_timeout; RESET PERSIST IF EXISTS rpl_stop_slave_timeout; Warnings: Warning 3615 Variable rpl_stop_slave_timeout does not exist in persisted config file +RESET PERSIST IF EXISTS rpl_wait_for_semi_sync_ack; RESET PERSIST IF EXISTS session_track_gtids; RESET PERSIST IF EXISTS skip_replica_start; Warnings: diff --git a/mysql-test/suite/rpl/r/rpl_semi_sync_master_error_handling.result b/mysql-test/suite/rpl/r/rpl_semi_sync_master_error_handling.result new file mode 100644 index 000000000000..16992e99c5bf --- /dev/null +++ b/mysql-test/suite/rpl/r/rpl_semi_sync_master_error_handling.result @@ -0,0 +1,33 @@ +include/rpl_init.inc [topology=1->2,1->3] +Warnings: +Note #### Sending passwords in plain text without SSL/TLS is extremely insecure. +Note #### Storing MySQL user name or password information in the connection metadata repository is not secure and is therefore not recommended. Please consider using the USER and PASSWORD connection options for START REPLICA; see the 'START REPLICA Syntax' in the MySQL Manual for more information. +Warnings: +Note #### Sending passwords in plain text without SSL/TLS is extremely insecure. +Note #### Storing MySQL user name or password information in the connection metadata repository is not secure and is therefore not recommended. Please consider using the USER and PASSWORD connection options for START REPLICA; see the 'START REPLICA Syntax' in the MySQL Manual for more information. 
+include/rpl_connect.inc [creating master] +include/rpl_connect.inc [creating master1] +include/rpl_connect.inc [creating semi_sync_slave] +include/rpl_connect.inc [creating async_slave] +call mtr.add_suppression("Read semi-sync reply magic number error."); +call mtr.add_suppression("A message intended for a client cannot be sent there as no client-session is attached"); +"Creating schema" +CREATE TABLE t1(a INT) engine = InnoDB; +include/sync_slave_sql_with_master.inc +include/sync_slave_sql_with_master.inc +"The semi sync slave will error out before sending ACK" +SET @@GLOBAL.DEBUG= '+d,error_before_semi_sync_reply'; +"Inserting a row on the master" +INSERT INTO t1 VALUES(1); +"Waiting for the semi-sync slave to stop" +include/wait_for_slave_io_to_stop.inc +"Waiting for the async dump thread to wait for ACK" +include/assert.inc [Table in semi-sync slave should be empty.] +include/assert.inc [Table in async slave should be empty.] +"Starting semi-sync slave and cleaning up" +SET @@GLOBAL.DEBUG= '-d,error_before_semi_sync_reply'; +START REPLICA; +DROP TABLE t1; +include/sync_slave_sql_with_master.inc +include/sync_slave_sql_with_master.inc +include/rpl_end.inc diff --git a/mysql-test/suite/rpl/t/rpl_semi_sync_master_error_handling-master.opt b/mysql-test/suite/rpl/t/rpl_semi_sync_master_error_handling-master.opt new file mode 100644 index 000000000000..962aa8ee1694 --- /dev/null +++ b/mysql-test/suite/rpl/t/rpl_semi_sync_master_error_handling-master.opt @@ -0,0 +1 @@ +$SEMISYNC_MASTER_PLUGIN_OPT --plugin-load=rpl_semi_sync_master=$SEMISYNC_MASTER_PLUGIN;rpl_semi_sync_slave=$SEMISYNC_SLAVE_PLUGIN diff --git a/mysql-test/suite/rpl/t/rpl_semi_sync_master_error_handling-slave.opt b/mysql-test/suite/rpl/t/rpl_semi_sync_master_error_handling-slave.opt new file mode 100644 index 000000000000..7585c48909a2 --- /dev/null +++ b/mysql-test/suite/rpl/t/rpl_semi_sync_master_error_handling-slave.opt @@ -0,0 +1 @@ +$SEMISYNC_SLAVE_PLUGIN_OPT $SEMISYNC_SLAVE_PLUGIN_LOAD diff --git 
a/mysql-test/suite/rpl/t/rpl_semi_sync_master_error_handling.cnf b/mysql-test/suite/rpl/t/rpl_semi_sync_master_error_handling.cnf new file mode 100644 index 000000000000..e873d6ade7a1 --- /dev/null +++ b/mysql-test/suite/rpl/t/rpl_semi_sync_master_error_handling.cnf @@ -0,0 +1,24 @@ +!include ../my.cnf + +[mysqld.1] +rpl_semi_sync_master_enabled=1 +rpl_semi_sync_master_timeout=86400000 # 1 day +rpl_wait_for_semi_sync_ack=1 +log_slave_updates +gtid_mode=ON +enforce_gtid_consistency=ON + +[mysqld.2] +rpl_semi_sync_slave_enabled=1 +log_slave_updates +gtid_mode=ON +enforce_gtid_consistency=ON + +[mysqld.3] +rpl_semi_sync_slave_enabled=0 +log_slave_updates +gtid_mode=ON +enforce_gtid_consistency=ON + +[ENV] +SERVER_MYPORT_3= @mysqld.3.port diff --git a/mysql-test/suite/rpl/t/rpl_semi_sync_master_error_handling.test b/mysql-test/suite/rpl/t/rpl_semi_sync_master_error_handling.test new file mode 100644 index 000000000000..ca416406c290 --- /dev/null +++ b/mysql-test/suite/rpl/t/rpl_semi_sync_master_error_handling.test @@ -0,0 +1,78 @@ +--source include/have_debug.inc + +--let $rpl_topology= 1->2,1->3 +--source include/rpl_init.inc + +--let $rpl_connection_name= master +--let $rpl_server_number= 1 +--source include/rpl_connect.inc + +--let $rpl_connection_name= master1 +--let $rpl_server_number= 1 +--source include/rpl_connect.inc + +--let $rpl_connection_name= semi_sync_slave +--let $rpl_server_number= 2 +--source include/rpl_connect.inc + +--let $rpl_connection_name= async_slave +--let $rpl_server_number= 3 +--source include/rpl_connect.inc + +--connection master +call mtr.add_suppression("Read semi-sync reply magic number error."); +call mtr.add_suppression("A message intended for a client cannot be sent there as no client-session is attached"); + +--echo "Creating schema" +CREATE TABLE t1(a INT) engine = InnoDB; + +--let $sync_slave_connection= semi_sync_slave +--source include/sync_slave_sql_with_master.inc +--connection master +--let $sync_slave_connection= 
async_slave +--source include/sync_slave_sql_with_master.inc + +--echo "The semi sync slave will error out before sending ACK" +--connection semi_sync_slave +SET @@GLOBAL.DEBUG= '+d,error_before_semi_sync_reply'; + +--echo "Inserting a row on the master" +--connection master +send INSERT INTO t1 VALUES(1); + +--echo "Waiting for the semi-sync slave to stop" +--connection semi_sync_slave +--source include/wait_for_slave_io_to_stop.inc + +--echo "Waiting for the async dump thread to wait for ACK" +--connection master1 +--let $wait_condition= select count(*)= 1 from information_schema.processlist where state like '%Waiting for semi-sync ACK from slave%' +--source include/wait_condition.inc + +--connection semi_sync_slave +--let $assert_text= Table in semi-sync slave should be empty. +--let $assert_cond= "[SELECT COUNT(*) FROM t1]" = "0" +--source include/assert.inc + +--connection async_slave +--let $assert_text= Table in async slave should be empty. +--let $assert_cond= "[SELECT COUNT(*) FROM t1]" = "0" +--source include/assert.inc + +--echo "Starting semi-sync slave and cleaning up" +--connection semi_sync_slave +SET @@GLOBAL.DEBUG= '-d,error_before_semi_sync_reply'; +START REPLICA; + +# Cleanup +--connection master +--reap +DROP TABLE t1; + +--let $sync_slave_connection= semi_sync_slave +--source include/sync_slave_sql_with_master.inc +--connection master +--let $sync_slave_connection= async_slave +--source include/sync_slave_sql_with_master.inc + +--source include/rpl_end.inc diff --git a/mysql-test/suite/rpl_gtid/r/rpl_wait_for_semi_sync_ack.result b/mysql-test/suite/rpl_gtid/r/rpl_wait_for_semi_sync_ack.result new file mode 100644 index 000000000000..f9ac7bf6a887 --- /dev/null +++ b/mysql-test/suite/rpl_gtid/r/rpl_wait_for_semi_sync_ack.result @@ -0,0 +1,166 @@ +include/rpl_init.inc [topology=1->2,1->3] +Warnings: +Note #### Sending passwords in plain text without SSL/TLS is extremely insecure. 
+Note #### Storing MySQL user name or password information in the connection metadata repository is not secure and is therefore not recommended. Please consider using the USER and PASSWORD connection options for START REPLICA; see the 'START REPLICA Syntax' in the MySQL Manual for more information. +Warnings: +Note #### Sending passwords in plain text without SSL/TLS is extremely insecure. +Note #### Storing MySQL user name or password information in the connection metadata repository is not secure and is therefore not recommended. Please consider using the USER and PASSWORD connection options for START REPLICA; see the 'START REPLICA Syntax' in the MySQL Manual for more information. +include/rpl_default_connections.inc +include/rpl_connect.inc [creating async_slave] +include/rpl_connect.inc [creating semi_sync_slave] +[connection master] +call mtr.add_suppression("Run function 'wait_for_semi_sync_ack' in plugin 'rpl_semi_sync_master' failed"); +call mtr.add_suppression("Error while waiting for semi-sync ACK on dump thread"); +call mtr.add_suppression("A message intended for a client cannot be sent there as no client-session is attached"); +call mtr.add_suppression("Timeout waiting for reply of binlog"); +call mtr.add_suppression("Slave SQL.*Request to stop slave SQL Thread received while applying a group that has non-transactional changes"); +[connection semi_sync_slave] +set @@global.debug= '+d,before_semi_sync_reply'; +[connection master] +"Store the last acked pos" +create table t1 (a int); +[connection semi_sync_slave] +set debug_sync='now WAIT_FOR semi_sync_reply_reached'; +[connection async_slave] +include/assert.inc [Async Slave: Should not contain any tables] +[connection master1] +"Last acked pos should not move" +include/assert.inc [Last acked pos should not move] +[connection semi_sync_slave] +set debug_sync='now SIGNAL semi_sync_reply_continue'; +[connection master] +include/assert.inc [Master: Should contain t1] +include/sync_slave_sql_with_master.inc 
+include/assert.inc [Async Slave: Should contain t1] +[connection master] +create table t2(a int); +[connection semi_sync_slave] +set debug_sync='now WAIT_FOR semi_sync_reply_reached'; +[connection master1] +"Switching off rpl_semi_sync_master_enabled while async thread is waiting for ack" +set @@global.rpl_semi_sync_master_enabled = 0; +"Waiting till async slave is caught up" +include/sync_slave_sql_with_master.inc +include/assert.inc [Async Slave: should have t1 and t2] +[connection semi_sync_slave] +set debug_sync='now SIGNAL semi_sync_reply_continue'; +"Switching rpl_semi_sync_master_enabled back on" +[connection master] +set @@global.rpl_semi_sync_master_enabled = 1; +[connection master] +"Waiting till semi-sync slave is caught up" +include/sync_slave_sql_with_master.inc +include/assert.inc [Semi-sync Slave: should have t1 and t2] +[connection master] +create table t3(a int); +[connection semi_sync_slave] +set debug_sync='now WAIT_FOR semi_sync_reply_reached'; +[connection master1] +"Switching off rpl_wait_for_semi_sync_ack while async thread is waiting for ack" +set @@global.rpl_wait_for_semi_sync_ack = 0; +"Waiting till async slave is caught up" +include/sync_slave_sql_with_master.inc +[connection semi_sync_slave] +set debug_sync='now SIGNAL semi_sync_reply_continue'; +"Switching rpl_wait_for_semi_sync_ack back on" +[connection master] +set @@global.rpl_wait_for_semi_sync_ack = 1; +[connection master] +"Waiting till semi-sync slave is caught up" +include/sync_slave_sql_with_master.inc +include/assert.inc [Semi-sync Slave: should have t1, t2 and t3] +[connection semi_sync_slave] +set @@global.debug= '-d,before_semi_sync_reply'; +"Stopping async slave to simulate lag" +[connection async_slave] +include/stop_slave.inc +"Generating traffic on the master" +[connection master] +create table t4(a int); +insert into t4 values(1); +insert into t4 values(2); +flush logs; +insert into t4 values(3); +insert into t4 values(4); +flush logs; 
+include/sync_slave_sql_with_master.inc +[connection semi_sync_slave] +include/stop_slave_io.inc +[connection master] +"Restarting master" +include/rpl_restart_server.inc [server_number=1] +[connection semi_sync_slave] +include/start_slave_io.inc +"Starting async slave" +[connection async_slave] +include/start_slave.inc +"Waiting till async slave is caught up" +[connection master] +include/sync_slave_sql_with_master.inc +include/assert.inc [Async Slave: t4 should have 4 entries] +[connection semi_sync_slave] +include/stop_slave.inc +[connection async_slave] +include/stop_slave.inc +[connection master] +set @gtid_exec= @@global.gtid_executed; +RESET BINARY LOGS AND GTIDS; +include/assert.inc [Last acked pos should be empty] +set @@global.gtid_purged= @gtid_exec; +purge binary logs to 'binlog'; +[connection semi_sync_slave] +include/start_slave.inc +[connection async_slave] +include/start_slave.inc +[connection semi_sync_slave] +set @@global.debug= '+d,before_semi_sync_reply'; +[connection master] +create table t5 (a int); +[connection semi_sync_slave] +set debug_sync='now WAIT_FOR semi_sync_reply_reached'; +[connection async_slave] +include/assert.inc [Async Slave: Should not contain t5] +[connection semi_sync_slave] +set debug_sync='now SIGNAL semi_sync_reply_continue'; +set @@global.debug= '-d,before_semi_sync_reply'; +[connection master] +include/assert.inc [Master: Should contain t5] +include/sync_slave_sql_with_master.inc +include/assert.inc [Async Slave: Should contain t5] +"Stopping async slave to simulate lag" +[connection async_slave] +include/stop_slave.inc +"Generating traffic on the master" +[connection master] +create table t6 (a int); +insert into t6 values(1); +insert into t6 values(2); +insert into t6 values(3); +insert into t6 values(4); +include/sync_slave_sql_with_master.inc +"Blocking semi-sync slave just before sending an ack" +[connection semi_sync_slave] +set @@global.debug= '+d,before_semi_sync_event'; +[connection master] +insert into t6 
values(5) # this transaction will be blocked; +[connection semi_sync_slave] +set debug_sync='now wait_for semi_sync_event_reached'; +"Restarting the semi-sync slave" +[connection master] +include/rpl_stop_server.inc [server_number=3] +include/rpl_start_server.inc [server_number=3] +"Starting async slave" +[connection async_slave] +include/start_slave.inc +[connection master] +[connection semi_sync_slave] +"Waiting till async slave is caught up" +[connection master] +include/sync_slave_sql_with_master.inc +[connection master] +drop table t1, t2, t3, t4, t5, t6; +[connection master] +include/sync_slave_sql_with_master.inc +[connection master] +include/sync_slave_sql_with_master.inc +include/rpl_end.inc diff --git a/mysql-test/suite/rpl_gtid/t/rpl_wait_for_semi_sync_ack-master.opt b/mysql-test/suite/rpl_gtid/t/rpl_wait_for_semi_sync_ack-master.opt new file mode 100644 index 000000000000..962aa8ee1694 --- /dev/null +++ b/mysql-test/suite/rpl_gtid/t/rpl_wait_for_semi_sync_ack-master.opt @@ -0,0 +1 @@ +$SEMISYNC_MASTER_PLUGIN_OPT --plugin-load=rpl_semi_sync_master=$SEMISYNC_MASTER_PLUGIN;rpl_semi_sync_slave=$SEMISYNC_SLAVE_PLUGIN diff --git a/mysql-test/suite/rpl_gtid/t/rpl_wait_for_semi_sync_ack-slave.opt b/mysql-test/suite/rpl_gtid/t/rpl_wait_for_semi_sync_ack-slave.opt new file mode 100644 index 000000000000..7585c48909a2 --- /dev/null +++ b/mysql-test/suite/rpl_gtid/t/rpl_wait_for_semi_sync_ack-slave.opt @@ -0,0 +1 @@ +$SEMISYNC_SLAVE_PLUGIN_OPT $SEMISYNC_SLAVE_PLUGIN_LOAD diff --git a/mysql-test/suite/rpl_gtid/t/rpl_wait_for_semi_sync_ack.cnf b/mysql-test/suite/rpl_gtid/t/rpl_wait_for_semi_sync_ack.cnf new file mode 100644 index 000000000000..4648c518851c --- /dev/null +++ b/mysql-test/suite/rpl_gtid/t/rpl_wait_for_semi_sync_ack.cnf @@ -0,0 +1,14 @@ +!include ../my.cnf + +[mysqld.1] +rpl_semi_sync_master_enabled= 1 +rpl_wait_for_semi_sync_ack= 1 +rpl_semi_sync_master_timeout= 10000000 +rpl_semi_sync_master_wait_no_slave= 1 + +[mysqld.3] +rpl_semi_sync_slave_enabled= 
1 +rpl_semi_sync_master_wait_no_slave= 1 + +[ENV] +SERVER_MYPORT_3= @mysqld.3.port diff --git a/mysql-test/suite/rpl_gtid/t/rpl_wait_for_semi_sync_ack.test b/mysql-test/suite/rpl_gtid/t/rpl_wait_for_semi_sync_ack.test new file mode 100644 index 000000000000..b5a2137267a4 --- /dev/null +++ b/mysql-test/suite/rpl_gtid/t/rpl_wait_for_semi_sync_ack.test @@ -0,0 +1,356 @@ +--source include/have_debug_sync.inc + +--let $rpl_topology= 1->2,1->3 +--source include/rpl_init.inc +--source include/rpl_default_connections.inc + +--let $rpl_connection_name= async_slave +--let $rpl_server_number= 2 +--source include/rpl_connect.inc + +--let $rpl_connection_name= semi_sync_slave +--let $rpl_server_number= 3 +--source include/rpl_connect.inc + + +--source include/rpl_connection_master.inc +call mtr.add_suppression("Run function 'wait_for_semi_sync_ack' in plugin 'rpl_semi_sync_master' failed"); +call mtr.add_suppression("Error while waiting for semi-sync ACK on dump thread"); +call mtr.add_suppression("A message intended for a client cannot be sent there as no client-session is attached"); +call mtr.add_suppression("Timeout waiting for reply of binlog"); +call mtr.add_suppression("Slave SQL.*Request to stop slave SQL Thread received while applying a group that has non-transactional changes"); + + + +--let $rpl_connection_name= semi_sync_slave +--source include/rpl_connection.inc +set @@global.debug= '+d,before_semi_sync_reply'; + + +# Case 1: Test if rpl_wait_for_semi_sync_ack is working correctly + +--source include/rpl_connection_master.inc +--echo "Store the last acked pos" +--let $last_acked= query_get_value(show status like "Rpl_last_semi_sync_acked_pos", Value, 1) +--send create table t1 (a int) + +--let $rpl_connection_name= semi_sync_slave +--source include/rpl_connection.inc +set debug_sync='now WAIT_FOR semi_sync_reply_reached'; + +--let $rpl_connection_name= async_slave +--source include/rpl_connection.inc + +--let $assert_text= Async Slave: Should not contain any tables 
+--let $assert_cond= [select count(*) from information_schema.tables where table_schema="test" and table_name in ("t1", "t2", "t3", "t4")] = 0 +--source include/assert.inc + +--source include/rpl_connection_master1.inc +--echo "Last acked pos should not move" +--let $last_acked2= query_get_value(show status like "Rpl_last_semi_sync_acked_pos", Value, 1) +--let $assert_text= Last acked pos should not move +--let $assert_cond= "$last_acked" = "$last_acked2" +--source include/assert.inc + +--let $rpl_connection_name= semi_sync_slave +--source include/rpl_connection.inc +set debug_sync='now SIGNAL semi_sync_reply_continue'; + +--source include/rpl_connection_master.inc +--reap +--let $assert_text= Master: Should contain t1 +--let $assert_cond= [select count(*) from information_schema.tables where table_schema="test" and table_name in ("t1", "t2", "t3", "t4")] = 1 +--source include/assert.inc + +--let $sync_slave_connection= async_slave +--source include/sync_slave_sql_with_master.inc + +--let $assert_text= Async Slave: Should contain t1 +--let $assert_cond= [select count(*) from information_schema.tables where table_schema="test" and table_name in ("t1", "t2", "t3", "t4")] = 1 +--source include/assert.inc + + + +# Case 2: Dump thread should stop waiting if rpl_semi_sync_master_enabled is disabled + +--source include/rpl_connection_master.inc +--send create table t2(a int) + +--let $rpl_connection_name= semi_sync_slave +--source include/rpl_connection.inc +set debug_sync='now WAIT_FOR semi_sync_reply_reached'; + +--source include/rpl_connection_master1.inc +--let $wait_condition= select count(*)= 1 from information_schema.processlist where state like 'Waiting for semi-sync ACK from slave' +--source include/wait_condition.inc + +--echo "Switching off rpl_semi_sync_master_enabled while async thread is waiting for ack" +set @@global.rpl_semi_sync_master_enabled = 0; + +--echo "Waiting till async slave is caught up" +--let $sync_slave_connection= async_slave +--source 
include/sync_slave_sql_with_master.inc + +--let $assert_text= Async Slave: should have t1 and t2 +--let $assert_cond= [select count(*) from information_schema.tables where table_schema="test" and table_name in ("t1", "t2", "t3", "t4")] = 2 +--source include/assert.inc + +--let $rpl_connection_name= semi_sync_slave +--source include/rpl_connection.inc +set debug_sync='now SIGNAL semi_sync_reply_continue'; + +--echo "Switching rpl_semi_sync_master_enabled back on" +--source include/rpl_connection_master.inc +--reap +set @@global.rpl_semi_sync_master_enabled = 1; + +--source include/rpl_connection_master.inc +--echo "Waiting till semi-sync slave is caught up" +--let $sync_slave_connection= semi_sync_slave +--source include/sync_slave_sql_with_master.inc + +--let $assert_text= Semi-sync Slave: should have t1 and t2 +--let $assert_cond= [select count(*) from information_schema.tables where table_schema="test" and table_name in ("t1", "t2", "t3", "t4")] = 2 +--source include/assert.inc + + + +# Case 3: Dump thread should stop waiting if rpl_wait_for_semi_sync_ack is disabled + +--source include/rpl_connection_master.inc +--send create table t3(a int) + +--let $rpl_connection_name= semi_sync_slave +--source include/rpl_connection.inc +set debug_sync='now WAIT_FOR semi_sync_reply_reached'; + +--source include/rpl_connection_master1.inc +--let $wait_condition= select count(*)= 1 from information_schema.processlist where state like 'Waiting for semi-sync ACK from slave' +--source include/wait_condition.inc + +--echo "Switching off rpl_wait_for_semi_sync_ack while async thread is waiting for ack" +set @@global.rpl_wait_for_semi_sync_ack = 0; + +--echo "Waiting till async slave is caught up" +--let $sync_slave_connection= async_slave +--source include/sync_slave_sql_with_master.inc + +--let $wait_condition= select count(*)= 3 from information_schema.tables where table_schema="test" and table_name in ("t1", "t2", "t3", "t4") +--source include/wait_condition.inc + +--let 
$rpl_connection_name= semi_sync_slave +--source include/rpl_connection.inc +set debug_sync='now SIGNAL semi_sync_reply_continue'; + +--echo "Switching rpl_wait_for_semi_sync_ack back on" +--source include/rpl_connection_master.inc +--reap +set @@global.rpl_wait_for_semi_sync_ack = 1; + +--source include/rpl_connection_master.inc +--echo "Waiting till semi-sync slave is caught up" +--let $sync_slave_connection= semi_sync_slave +--source include/sync_slave_sql_with_master.inc + +--let $assert_text= Semi-sync Slave: should have t1, t2 and t3 +--let $assert_cond= [select count(*) from information_schema.tables where table_schema="test" and table_name in ("t1", "t2", "t3", "t4")] = 3 +--source include/assert.inc + + +--let $rpl_connection_name= semi_sync_slave +--source include/rpl_connection.inc +set @@global.debug= '-d,before_semi_sync_reply'; + + + +# Case 4: Lagging async slave + master restart + +--echo "Stopping async slave to simulate lag" +--let $rpl_connection_name= async_slave +--source include/rpl_connection.inc +--source include/stop_slave.inc + +--echo "Generating traffic on the master" +--source include/rpl_connection_master.inc +create table t4(a int); +insert into t4 values(1); +insert into t4 values(2); +flush logs; +insert into t4 values(3); +insert into t4 values(4); +flush logs; + +--let $sync_slave_connection= semi_sync_slave +--source include/sync_slave_sql_with_master.inc + +--let $rpl_connection_name= semi_sync_slave +--source include/rpl_connection.inc +--source include/stop_slave_io.inc + +--source include/rpl_connection_master.inc +--echo "Restarting master" +--let $rpl_server_number=1 +--source include/rpl_restart_server.inc + +--let $rpl_connection_name= semi_sync_slave +--source include/rpl_connection.inc +--source include/start_slave_io.inc + +--echo "Starting async slave" +--let $rpl_connection_name= async_slave +--source include/rpl_connection.inc +--source include/start_slave.inc + +--echo "Waiting till async slave is caught up" 
+--source include/rpl_connection_master.inc +--let $sync_slave_connection= async_slave +--source include/sync_slave_sql_with_master.inc + +--let $assert_text= Async Slave: t4 should have 4 entries +--let $assert_cond= [select count(*) from t4] = 4 +--source include/assert.inc + + + +# Case 5: Should wait for ACK after RESET BINARY LOGS AND GTIDS + +--let $rpl_connection_name= semi_sync_slave +--source include/rpl_connection.inc +--source include/stop_slave.inc + +--let $rpl_connection_name= async_slave +--source include/rpl_connection.inc +--source include/stop_slave.inc + +--source include/rpl_connection_master.inc +set @gtid_exec= @@global.gtid_executed; +RESET BINARY LOGS AND GTIDS; + +--let $last_acked= query_get_value(show status like "Rpl_last_semi_sync_acked_pos", Value, 1) +--let $assert_text= Last acked pos should be empty +--let $assert_cond= "$last_acked" = "master-bin.000001:0" +--source include/assert.inc + +set @@global.gtid_purged= @gtid_exec; +--let $binlog= query_get_value(SHOW MASTER STATUS, File, 1) +--replace_result $binlog binlog +--eval purge binary logs to '$binlog' + +--let $rpl_connection_name= semi_sync_slave +--source include/rpl_connection.inc +--source include/start_slave.inc + +--let $rpl_connection_name= async_slave +--source include/rpl_connection.inc +--source include/start_slave.inc + +--let $rpl_connection_name= semi_sync_slave +--source include/rpl_connection.inc +set @@global.debug= '+d,before_semi_sync_reply'; + +--source include/rpl_connection_master.inc +--send create table t5 (a int) + +--let $rpl_connection_name= semi_sync_slave +--source include/rpl_connection.inc +set debug_sync='now WAIT_FOR semi_sync_reply_reached'; + +--let $rpl_connection_name= async_slave +--source include/rpl_connection.inc +--let $assert_text= Async Slave: Should not contain t5 +--let $assert_cond= [select count(*) from information_schema.tables where table_schema="test" and table_name in ("t5")] = 0 +--source include/assert.inc + +--let 
$rpl_connection_name= semi_sync_slave +--source include/rpl_connection.inc +set debug_sync='now SIGNAL semi_sync_reply_continue'; +set @@global.debug= '-d,before_semi_sync_reply'; + +--source include/rpl_connection_master.inc +--reap +--let $assert_text= Master: Should contain t5 +--let $assert_cond= [select count(*) from information_schema.tables where table_schema="test" and table_name in ("t5")] = 1 +--source include/assert.inc + +--let $sync_slave_connection= async_slave +--source include/sync_slave_sql_with_master.inc + +--let $assert_text= Async Slave: Should contain t5 +--let $assert_cond= [select count(*) from information_schema.tables where table_schema="test" and table_name in ("t5")] = 1 +--source include/assert.inc + + + +# Case 6: We should update last_acked on skipped transactions + +--echo "Stopping async slave to simulate lag" +--let $rpl_connection_name= async_slave +--source include/rpl_connection.inc +--source include/stop_slave.inc + +--echo "Generating traffic on the master" +--source include/rpl_connection_master.inc +create table t6 (a int); +insert into t6 values(1); +insert into t6 values(2); +insert into t6 values(3); +insert into t6 values(4); + +--let $sync_slave_connection= semi_sync_slave +--source include/sync_slave_sql_with_master.inc + +--echo "Blocking semi-sync slave just before sending an ack" +--let $rpl_connection_name= semi_sync_slave +--source include/rpl_connection.inc +set @@global.debug= '+d,before_semi_sync_event'; + +--source include/rpl_connection_master.inc +--send insert into t6 values(5) # this transaction will be blocked + +--let $rpl_connection_name= semi_sync_slave +--source include/rpl_connection.inc +set debug_sync='now wait_for semi_sync_event_reached'; + +--echo "Restarting the semi-sync slave" +--source include/rpl_connection_master.inc +--let $rpl_server_number= 3 +--let $rpl_force_stop= 1 +--source include/rpl_stop_server.inc +--source include/rpl_start_server.inc + +--echo "Starting async slave" +--let 
$rpl_connection_name= async_slave +--source include/rpl_connection.inc +--source include/start_slave.inc + +--source include/rpl_connection_master.inc +--reap + +--let $rpl_connection_name= semi_sync_slave +--source include/rpl_connection.inc +--enable_reconnect +--source include/wait_until_connected_again.inc +--disable_reconnect + +--echo "Waiting till async slave is caught up" +# The async slave will catchup only if we signal for skipped trxs on semi-sync +# connection +--source include/rpl_connection_master.inc +--let $sync_slave_connection= async_slave +--source include/sync_slave_sql_with_master.inc + + + +# Cleanup +--source include/rpl_connection_master.inc +drop table t1, t2, t3, t4, t5, t6; + +--source include/rpl_connection_master.inc +--let $sync_slave_connection= async_slave +--source include/sync_slave_sql_with_master.inc + +--source include/rpl_connection_master.inc +--let $sync_slave_connection= semi_sync_slave +--source include/sync_slave_sql_with_master.inc + +--source include/rpl_end.inc diff --git a/mysql-test/suite/sys_vars/r/rpl_wait_for_semi_sync_ack_basic.result b/mysql-test/suite/sys_vars/r/rpl_wait_for_semi_sync_ack_basic.result new file mode 100644 index 000000000000..bfa03f7ef659 --- /dev/null +++ b/mysql-test/suite/sys_vars/r/rpl_wait_for_semi_sync_ack_basic.result @@ -0,0 +1,21 @@ +set @save.rpl_wait_for_semi_sync_ack= @@global.rpl_wait_for_semi_sync_ack; +select @@session.rpl_wait_for_semi_sync_ack; +ERROR HY000: Variable 'rpl_wait_for_semi_sync_ack' is a GLOBAL variable +select variable_name from performance_schema.global_variables where variable_name='$var'; +variable_name +select variable_name from performance_schema.session_variables where variable_name='$var'; +variable_name +set @@global.rpl_wait_for_semi_sync_ack= false; +select @@global.rpl_wait_for_semi_sync_ack; +@@global.rpl_wait_for_semi_sync_ack +0 +set @@global.rpl_wait_for_semi_sync_ack= 1.1; +ERROR 42000: Incorrect argument type to variable 'rpl_wait_for_semi_sync_ack' 
+set @@global.rpl_wait_for_semi_sync_ack= "foo"; +ERROR 42000: Variable 'rpl_wait_for_semi_sync_ack' can't be set to the value of 'foo' +set @@global.rpl_wait_for_semi_sync_ack= false; +set @@global.rpl_wait_for_semi_sync_ack= true; +select @@global.rpl_wait_for_semi_sync_ack as "truncated to the maximum"; +truncated to the maximum +1 +set @@global.rpl_wait_for_semi_sync_ack= @save.rpl_wait_for_semi_sync_ack; diff --git a/mysql-test/suite/sys_vars/t/rpl_wait_for_semi_sync_ack_basic.test b/mysql-test/suite/sys_vars/t/rpl_wait_for_semi_sync_ack_basic.test new file mode 100644 index 000000000000..039b8bfa0dd7 --- /dev/null +++ b/mysql-test/suite/sys_vars/t/rpl_wait_for_semi_sync_ack_basic.test @@ -0,0 +1,37 @@ +let $var= rpl_wait_for_semi_sync_ack; +eval set @save.$var= @@global.$var; + +# +# exists as global only +# +--error ER_INCORRECT_GLOBAL_LOCAL_VAR +eval select @@session.$var; + +select variable_name from performance_schema.global_variables where variable_name='$var'; +select variable_name from performance_schema.session_variables where variable_name='$var'; + +# +# show that it's writable +# +let $value= false; +eval set @@global.$var= $value; +eval select @@global.$var; + +# +# incorrect value +# +--error ER_WRONG_TYPE_FOR_VAR +eval set @@global.$var= 1.1; +--error ER_WRONG_VALUE_FOR_VAR +eval set @@global.$var= "foo"; + +# +# min/max values +# +eval set @@global.$var= false; +eval set @@global.$var= true; +eval select @@global.$var as "truncated to the maximum"; + +# cleanup + +eval set @@global.$var= @save.$var; diff --git a/plugin/semisync/semisync_replica_plugin.cc b/plugin/semisync/semisync_replica_plugin.cc index 14b3ba57e3cd..1829ce8cbdec 100644 --- a/plugin/semisync/semisync_replica_plugin.cc +++ b/plugin/semisync/semisync_replica_plugin.cc @@ -163,7 +163,23 @@ static int repl_semi_slave_read_event(Binlog_relay_IO_param *, static int repl_semi_slave_queue_event(Binlog_relay_IO_param *param, const char *, unsigned long, uint32) { + 
DBUG_EXECUTE_IF("before_semi_sync_event", { + static constexpr char act[] = + "now SIGNAL semi_sync_event_reached " + "WAIT_FOR semi_sync_event_continue "; + assert(opt_debug_sync_timeout > 0); + assert(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act))); + }); + if (rpl_semi_sync_replica_status && semi_sync_need_reply) { + DBUG_EXECUTE_IF("before_semi_sync_reply", { + static constexpr char act[] = + "now SIGNAL semi_sync_reply_reached " + "WAIT_FOR semi_sync_reply_continue "; + assert(opt_debug_sync_timeout > 0); + assert(!debug_sync_set_action(current_thd, STRING_WITH_LEN(act))); + }); + DBUG_EXECUTE_IF("dont_send_semi_sync_reply", { return 0; }); /* We deliberately ignore the error in slaveReply, such error diff --git a/plugin/semisync/semisync_source.cc b/plugin/semisync/semisync_source.cc index dc88ad4a6762..281d71b1147f 100644 --- a/plugin/semisync/semisync_source.cc +++ b/plugin/semisync/semisync_source.cc @@ -30,6 +30,7 @@ #include "my_byteorder.h" #include "my_compiler.h" #include "my_systime.h" +#include "sql/binlog.h" // rpl_semi_sync_source_enabled #include "sql/current_thd.h" #include "sql/item_func.h" // user_var_entry #include "sql/mysqld.h" // max_connections @@ -46,7 +47,6 @@ char *rpl_semi_sync_master_whitelist = nullptr; /* protected by Ack_receiver::m_mutex */ bool rpl_semi_sync_source_crash_if_active_trxs; -bool rpl_semi_sync_source_enabled; unsigned long rpl_semi_sync_source_timeout; unsigned long rpl_semi_sync_source_trace_level; char rpl_semi_sync_source_status = 0; diff --git a/plugin/semisync/semisync_source.h b/plugin/semisync/semisync_source.h index 59c2bf259041..ebdc63b740b6 100644 --- a/plugin/semisync/semisync_source.h +++ b/plugin/semisync/semisync_source.h @@ -837,7 +837,6 @@ class ReplSemiSyncMaster : public ReplSemiSyncBase { /* System and status variables for the master component */ extern bool rpl_semi_sync_source_crash_if_active_trxs; -extern bool rpl_semi_sync_source_enabled; extern char rpl_semi_sync_source_status; 
extern unsigned long rpl_semi_sync_source_clients; extern unsigned long rpl_semi_sync_source_timeout; diff --git a/plugin/semisync/semisync_source_ack_receiver.h b/plugin/semisync/semisync_source_ack_receiver.h index e6a3db9c753c..85251386902a 100644 --- a/plugin/semisync/semisync_source_ack_receiver.h +++ b/plugin/semisync/semisync_source_ack_receiver.h @@ -31,6 +31,7 @@ #include "my_thread.h" #include "plugin/semisync/semisync.h" #include "plugin/semisync/semisync_source.h" +#include "sql/binlog.h" // rpl_semi_sync_source_enabled #include "sql/sql_class.h" struct Slave { diff --git a/plugin/semisync/semisync_source_plugin.cc b/plugin/semisync/semisync_source_plugin.cc index 930ab82dcdb0..313ec72caeef 100644 --- a/plugin/semisync/semisync_source_plugin.cc +++ b/plugin/semisync/semisync_source_plugin.cc @@ -34,6 +34,7 @@ #include "plugin/semisync/semisync_source_ack_receiver.h" #include "sql/current_thd.h" #include "sql/derror.h" // ER_THD +#include "sql/mysqld.h" #include "sql/protocol_classic.h" #include "sql/raii/sentry.h" // raii::Sentry #include "sql/sql_class.h" // THD @@ -207,6 +208,8 @@ static int repl_semi_after_send_event(Binlog_transmit_param *param, const char *event_buf, unsigned long, const char *skipped_log_file, my_off_t skipped_log_pos) { + int ret = 0; + if (is_semi_sync_dump()) { if (skipped_log_pos > 0) repl_semisync->skipSlaveReply(event_buf, param->server_id, @@ -214,16 +217,19 @@ static int repl_semi_after_send_event(Binlog_transmit_param *param, else { THD *thd = current_thd; /* - Possible errors in reading slave reply are ignored deliberately - because we do not want dump thread to quit on this. Error - messages are already reported. + Unless waiting for ACKs is enabled, possible errors in reading slave + reply are ignored deliberately because we do not want dump thread to + quit on this. Error messages are already reported. 
*/ - (void)repl_semisync->readSlaveReply( + int err = repl_semisync->readSlaveReply( thd->get_protocol_classic()->get_net(), event_buf); - thd->clear_error(); + if (err && rpl_wait_for_semi_sync_ack) + ret = 1; + else + thd->clear_error(); } } - return 0; + return ret; } static int repl_semi_reset_master(Binlog_transmit_param *) { @@ -427,6 +433,12 @@ static void fix_rpl_semi_sync_source_enabled(MYSQL_THD, SYS_VAR *, void *ptr, ack_receiver->stop(); } + if (!rpl_semi_sync_source_enabled) { + mysql_bin_log.lock_binlog_end_pos(); + mysql_bin_log.signal_update(); + mysql_bin_log.unlock_binlog_end_pos(); + } + return; } diff --git a/sql/binlog.cc b/sql/binlog.cc index 0bc420f7f6e1..e1aef7f53634 100644 --- a/sql/binlog.cc +++ b/sql/binlog.cc @@ -119,6 +119,7 @@ #include "sql/psi_memory_resource.h" #include "sql/query_options.h" #include "sql/raii/sentry.h" // raii::Sentry<> +#include "sql/replication.h" #include "sql/rpl_filter.h" #include "sql/rpl_gtid.h" #include "sql/rpl_handler.h" // RUN_HOOK @@ -192,6 +193,7 @@ const char *log_bin_basename = nullptr; /* Size for IO_CACHE buffer for binlog & relay log */ ulong rpl_read_size; +bool rpl_semi_sync_source_enabled = false; MYSQL_BIN_LOG mysql_bin_log(&sync_binlog_period); @@ -9023,6 +9025,10 @@ void MYSQL_BIN_LOG::process_after_commit_stage_queue(THD *thd, THD *first) { Thd_backup_and_restore switch_thd(thd, head); bool all = head->get_transaction()->m_flags.real_commit; (void)RUN_HOOK(transaction, after_commit, (head, all)); + + my_off_t pos; + head->get_trans_pos(nullptr, &pos, nullptr); + signal_semi_sync_ack(head->get_trans_fixed_log_path(), pos); /* When after_commit finished for the transaction, clear the run_hooks flag. 
This allow other parts of the system to check if after_commit was @@ -9587,6 +9593,25 @@ int MYSQL_BIN_LOG::ordered_commit(THD *thd, bool all, bool skip_commit) { return thd->commit_error == THD::CE_COMMIT_ERROR; } +// Given a file name of the form 'binlog-file-name.index', it extracts the +// binlog-file-name and the index and returns them as a pair +// Example: +// master-bin-3306.0001 ==> Returns (master-bin-3306, 1) +// master-bin-3306.9999 ==> Returns (master-bin-3306, 9999) +static std::pair<std::string, uint> extract_file_index( + const std::string &file_name) { + char *end; + size_t pos = file_name.find_last_of('.'); + if (pos == string::npos) { + assert(0); // should never happen + return std::make_pair(file_name, 1); + } + std::string prefix = file_name.substr(0, pos); + uint index = std::strtoul(file_name.substr(pos + 1).c_str(), &end, 10); + + return std::make_pair(std::move(prefix), index); +} + void MYSQL_BIN_LOG::report_missing_purged_gtids( const Gtid_set *slave_executed_gtid_set, std::string &errmsg) { DBUG_TRACE; @@ -9739,6 +9764,95 @@ inline void MYSQL_BIN_LOG::update_binlog_end_pos(const char *file, unlock_binlog_end_pos(); } +my_off_t MYSQL_BIN_LOG::get_binlog_end_pos() const { + mysql_mutex_assert_not_owner(&LOCK_log); + return atomic_binlog_end_pos; +} + +/* wait_for_ack can be modified by this function */ +my_off_t MYSQL_BIN_LOG::get_last_acked_pos(bool *wait_for_ack, + const char *sender_log_name) { + *wait_for_ack = *wait_for_ack && rpl_wait_for_semi_sync_ack && + rpl_semi_sync_source_enabled; + + if (!*wait_for_ack) return atomic_binlog_end_pos; + + const char *file_name = sender_log_name + dirname_length(sender_log_name); + const uint file_num = extract_file_index(file_name).second; + + // get a copy of last acked pos atomically + const st_filenum_pos local_last_acked = last_acked.load(); + + const int res = local_last_acked.file_num - file_num; + const my_off_t last_acked_pos = local_last_acked.pos; + + if (res == 0) return last_acked_pos; + if (res < 0) return 0; // wait for ack + 
+ *wait_for_ack = false; + return atomic_binlog_end_pos; +} + +void MYSQL_BIN_LOG::signal_semi_sync_ack(const char *const log_file, + const my_off_t log_pos) { + if (!log_file || !log_file[0]) return; + + const char *file_name = log_file + dirname_length(log_file); + const st_filenum_pos acked = { + extract_file_index(file_name).second, + // NOTE: If the acked pos cannot fit in st_filenum_pos::pos then we store + // uint_max, this way we'll never send unacked trxs because the last acked + // pos will be stuck at position uint_max in the current binlog file until + // a rotation happens. This can only happen when a very large trx is + // written to the binlog, max_binlog_size is capped at 1G but that's a + // soft limit as one could still write more than 1G of binlogs in a single + // trx, so when we hit this the log will immediately be rotated and things + // will be back to normal. + static_cast<uint>(std::min<my_off_t>(st_filenum_pos::max_pos, log_pos))}; + + // case: nothing to update so no signal needed, let's exit + if (acked <= last_acked.load()) { + return; + } + + lock_binlog_end_pos(); + if (acked > last_acked.load()) { + last_acked = acked; + // Without the following check these tests fail: + // rpl.rpl_seconds_behind_master rpl.rpl_heartbeat_version + // rpl.rpl_relay_status rpl.rpl_seconds_behind_master_mts + // rpl.rpl_binlog_sender_packet_shrink + if (rpl_wait_for_semi_sync_ack && rpl_semi_sync_source_enabled) + signal_update(); + } + unlock_binlog_end_pos(); +} + +void MYSQL_BIN_LOG::reset_semi_sync_last_acked() { + lock_binlog_end_pos(); + /* binary log is rotated and all trxs in previous binlog are already committed + * to the storage engine */ + if (strlen(log_file_name)) { + last_acked = {extract_file_index(log_file_name).second, 0}; + } else { + last_acked = {0, 0}; + } + signal_update(); + unlock_binlog_end_pos(); +} + +void MYSQL_BIN_LOG::get_semi_sync_last_acked(std::string &log_file, + my_off_t &log_pos) { + const st_filenum_pos local_last_acked = 
last_acked.load(); + if (local_last_acked.file_num) { + char full_name[FN_REFLEN + 1]; + snprintf(full_name, FN_REFLEN, "%s.%06u", opt_bin_logname, + local_last_acked.file_num); + log_file = std::string(full_name); + } + log_pos = local_last_acked.pos; +} + bool THD::is_binlog_cache_empty(bool is_transactional) const { DBUG_TRACE; diff --git a/sql/binlog.h b/sql/binlog.h index 110a31a459c4..973ddfaaabfc 100644 --- a/sql/binlog.h +++ b/sql/binlog.h @@ -27,6 +27,7 @@ #include #include #include +#include #include #include @@ -189,6 +190,37 @@ class HybridLogicalClock { std::atomic current_; }; +struct st_filenum_pos { + // Without alignas, LLVM emits library calls for atomic st_filenum_pos + // operations - https://bugs.llvm.org/show_bug.cgi?id=45055 + alignas(sizeof(uint64_t)) uint file_num = 0; + uint pos = 0; + + static const uint max_pos = std::numeric_limits<uint>::max(); + + st_filenum_pos() = default; + + st_filenum_pos(uint file_num, uint pos) { + this->file_num = file_num; + this->pos = pos; + } + + int cmp(const st_filenum_pos &other) const { + if (file_num == other.file_num && pos == other.pos) return 0; + if (file_num == other.file_num) return pos < other.pos ? -1 : 1; + return file_num < other.file_num ? -1 : 1; + } + + bool operator==(const st_filenum_pos &other) const { return cmp(other) == 0; } + bool operator<(const st_filenum_pos &other) const { return cmp(other) < 0; } + bool operator>(const st_filenum_pos &other) const { return cmp(other) > 0; } + bool operator<=(const st_filenum_pos &other) const { return cmp(other) <= 0; } + bool operator>=(const st_filenum_pos &other) const { return cmp(other) >= 0; } +}; + +static_assert(sizeof(st_filenum_pos) <= 8, + "st_filenum_pos must fit into a single word to support atomics"); + /* TODO use mmap instead of IO_CACHE for binlog (mmap+fsync is two times faster than write+fsync) @@ -285,6 +317,7 @@ class MYSQL_BIN_LOG : public TC_LOG { // mutex. 
char binlog_file_name[FN_REFLEN]; int binlog_encrypted_header_size; + std::atomic<st_filenum_pos> last_acked; ulonglong bytes_written; IO_CACHE index_file; @@ -398,7 +431,7 @@ class MYSQL_BIN_LOG : public TC_LOG { */ bool read_binlog_in_use_flag(Binlog_file_reader &binlog_file_reader); - protected: + public: /** @brief Notifies waiting threads that binary log has been updated */ @@ -1139,10 +1172,12 @@ class MYSQL_BIN_LOG : public TC_LOG { It is called by the threads (e.g. dump thread, applier thread) which want to read hot log without LOCK_log protection. */ - my_off_t get_binlog_end_pos() const { - mysql_mutex_assert_not_owner(&LOCK_log); - return atomic_binlog_end_pos; - } + my_off_t get_binlog_end_pos() const; + my_off_t get_last_acked_pos(bool *wait_for_ack, const char *sender_log_name); + void signal_semi_sync_ack(const char *const log_file, const my_off_t log_pos); + void reset_semi_sync_last_acked(); + void get_semi_sync_last_acked(std::string &log_file, my_off_t &log_pos); + mysql_mutex_t *get_binlog_end_pos_lock() { return &LOCK_binlog_end_pos; } void lock_binlog_end_pos() { mysql_mutex_lock(&LOCK_binlog_end_pos); } void unlock_binlog_end_pos() { mysql_mutex_unlock(&LOCK_binlog_end_pos); } @@ -1291,6 +1326,7 @@ extern const char *log_bin_index; extern const char *log_bin_basename; extern bool opt_binlog_order_commits; extern ulong rpl_read_size; +extern bool rpl_semi_sync_source_enabled; /** Turns a relative log binary log path into a full path, based on the opt_bin_logname or opt_relay_logname. 
Also trims the cr-lf at the diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 541b23ae1295..72a78f2aba97 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -1260,6 +1260,7 @@ bool opt_log_replica_updates = false; char *opt_replica_skip_errors; bool opt_replica_allow_batching = true; ulonglong opt_slave_dump_thread_wait_sleep_usec = 0; +bool rpl_wait_for_semi_sync_ack = false; bool skip_flush_master_info = false; bool skip_flush_relay_worker_info = false; @@ -8989,6 +8990,8 @@ static int init_server_components() { rpl_source_io_monitor = new Source_IO_monitor(); udf_load_service.init(); + mysql_bin_log.reset_semi_sync_last_acked(); + /* Initialize the optimizer cost module */ init_optimizer_cost_module(true); ft_init_stopwords(); @@ -11744,6 +11747,19 @@ static int show_tls_library_version(THD *, SHOW_VAR *var, char *buff) { return 0; } +static int show_last_acked_binlog_pos(THD *, SHOW_VAR *var, char *buff) { + var->type = SHOW_UNDEF; + if (rpl_semi_sync_source_enabled && rpl_wait_for_semi_sync_ack) { + std::string log_file; + my_off_t log_pos; + mysql_bin_log.get_semi_sync_last_acked(log_file, log_pos); + var->type = SHOW_CHAR; + var->value = buff; + sprintf(buff, "%s:%llu", log_file.c_str(), log_pos); + } + return 0; +} + static int show_resource_group_support(THD *, SHOW_VAR *var, char *buf) { var->type = SHOW_BOOL; var->value = buf; @@ -12030,6 +12046,8 @@ SHOW_VAR status_vars[] = { SHOW_LONGLONG_STATUS, SHOW_SCOPE_GLOBAL}, {"Rows_sent", (char *)offsetof(System_status_var, rows_sent), SHOW_LONGLONG_STATUS, SHOW_SCOPE_GLOBAL}, + {"Rpl_last_semi_sync_acked_pos", (char *)&show_last_acked_binlog_pos, + SHOW_FUNC, SHOW_SCOPE_GLOBAL}, {"Secondary_engine_execution_count", (char *)offsetof(System_status_var, secondary_engine_execution_count), SHOW_LONGLONG_STATUS, SHOW_SCOPE_ALL}, diff --git a/sql/mysqld.h b/sql/mysqld.h index b977ac46c70f..366fc210fb2a 100644 --- a/sql/mysqld.h +++ b/sql/mysqld.h @@ -192,6 +192,7 @@ extern bool opt_replica_compressed_protocol; extern 
ulong opt_replica_compression_lib; extern ulong replica_exec_mode_options; extern ulonglong opt_slave_dump_thread_wait_sleep_usec; +extern bool rpl_wait_for_semi_sync_ack; extern Rpl_global_filter rpl_global_filter; extern Rpl_acf_configuration_handler *rpl_acf_configuration_handler; extern Source_IO_monitor *rpl_source_io_monitor; diff --git a/sql/rpl_binlog_sender.cc b/sql/rpl_binlog_sender.cc index 0e524fa628b2..c6cadaca5762 100644 --- a/sql/rpl_binlog_sender.cc +++ b/sql/rpl_binlog_sender.cc @@ -408,11 +408,19 @@ void Binlog_sender::cleanup() { my_eof(thd); } +static bool is_semi_sync_slave() { + long long val = 0; + get_user_var_int("rpl_semi_sync_slave", &val, nullptr); + return val; +} + void Binlog_sender::run() { DBUG_TRACE; init(); + m_is_semi_sync_slave = is_semi_sync_slave(); + unsigned int max_event_size = std::max(m_thd->variables.max_allowed_packet, binlog_row_event_max_size + MAX_LOG_EVENT_HEADER); @@ -589,14 +597,17 @@ std::pair Binlog_sender::get_binlog_end_pos( if (unlikely(wait_new_events(read_pos))) return result; } - result.first = mysql_bin_log.get_binlog_end_pos(); + bool wait_for_ack = !m_is_semi_sync_slave; + result.first = + mysql_bin_log.get_last_acked_pos(&wait_for_ack, m_linfo.log_file_name); DBUG_PRINT("info", ("Reading file %s, seek pos %llu, end_pos is %llu", m_linfo.log_file_name, read_pos, result.first)); DBUG_PRINT("info", ("Active file is %s", mysql_bin_log.get_log_fname())); /* If this is a cold binlog file, we are done getting the end pos */ - if (unlikely(!mysql_bin_log.is_active(m_linfo.log_file_name))) { + if (unlikely(!wait_for_ack && + !mysql_bin_log.is_active(m_linfo.log_file_name))) { return std::make_pair(0, 0); } if (read_pos < result.first) { @@ -860,8 +871,11 @@ int Binlog_sender::wait_new_events(my_off_t log_pos) { } bool Binlog_sender::stop_waiting_for_update(my_off_t log_pos) const { - if (mysql_bin_log.get_binlog_end_pos() > log_pos || - !mysql_bin_log.is_active(m_linfo.log_file_name) || m_thd->killed) { + bool 
wait_for_ack = !m_is_semi_sync_slave; + if (mysql_bin_log.get_last_acked_pos(&wait_for_ack, m_linfo.log_file_name) > + log_pos || + (!wait_for_ack && !mysql_bin_log.is_active(m_linfo.log_file_name)) || + m_thd->killed) { return true; } return false; @@ -1603,12 +1617,6 @@ void Binlog_sender::calc_shrink_buffer_size(size_t current_size) { m_new_shrink_size = ALIGN_SIZE(new_size); } -static bool is_semi_sync_slave() { - long long val = 0; - get_user_var_int("rpl_semi_sync_slave", &val, nullptr); - return val; -} - void Binlog_sender::processlist_slave_offset(const char *log_file_name, my_off_t log_pos) { DBUG_ENTER("processlist_show_binlog_state"); @@ -1622,7 +1630,7 @@ void Binlog_sender::processlist_slave_offset(const char *log_file_name, } int len = snprintf(m_state_msg, m_state_msg_len, "%s slave offset: %s %lld", - is_semi_sync_slave() ? "Semisync" : "Async", + m_is_semi_sync_slave ? "Semisync" : "Async", log_file_name + dirname_length(log_file_name), (long long int)log_pos); diff --git a/sql/rpl_binlog_sender.h b/sql/rpl_binlog_sender.h index 25e66acbe71b..7ff8bb249da1 100644 --- a/sql/rpl_binlog_sender.h +++ b/sql/rpl_binlog_sender.h @@ -207,6 +207,7 @@ class Binlog_sender { LEX_CSTRING m_orig_query; /* The number of times to skip calls to processlist_slave_offset */ int m_skip_state_update = 0; + bool m_is_semi_sync_slave{false}; /* It initializes the context, checks if the dump request is valid and diff --git a/sql/rpl_replica.cc b/sql/rpl_replica.cc index 2f4c186cc1c9..a2de59949af2 100644 --- a/sql/rpl_replica.cc +++ b/sql/rpl_replica.cc @@ -5685,6 +5685,8 @@ extern "C" void *handle_slave_io(void *arg) { { rpl_replica_debug_point(DBUG_RPL_S_PS_TABLES); };); } #endif + DBUG_EXECUTE_IF("error_before_semi_sync_reply", goto err;); + if (RUN_HOOK(binlog_relay_io, after_queue_event, (thd, mi, event_buf, event_len, synced))) { mi->report(ERROR_LEVEL, ER_REPLICA_FATAL_ERROR, diff --git a/sql/rpl_source.cc b/sql/rpl_source.cc index 74143d4d10f1..b66a1d9a4ddd 100644 
--- a/sql/rpl_source.cc +++ b/sql/rpl_source.cc @@ -1286,8 +1286,10 @@ bool reset_binary_logs_and_gtids(THD *thd, bool unlock_global_read_lock) { Only run after_reset_master hook, when all reset operations preceding this have succeeded. */ - if (!ret) + if (!ret) { (void)RUN_HOOK(binlog_transmit, after_reset_master, (thd, 0 /* flags */)); + mysql_bin_log.reset_semi_sync_last_acked(); + } return ret; } diff --git a/sql/sql_class.cc b/sql/sql_class.cc index 550ae8035dc0..8a7f10a848ce 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -669,6 +669,7 @@ THD::THD(bool enable_plugins) binlog_accessed_db_names(nullptr), m_trans_log_file(nullptr), m_trans_fixed_log_file(nullptr), + m_trans_fixed_log_path(nullptr), m_trans_end_pos(0), m_trans_gtid(NULL), m_transaction(new Transaction_ctx()), @@ -1935,7 +1936,8 @@ void THD::cleanup_after_query() { In case of stored procedures, stored functions, triggers and events m_trans_fixed_log_file will not be set to NULL. The memory will be reused. */ - if (!sp_runtime_ctx) m_trans_fixed_log_file = nullptr; + if (!sp_runtime_ctx) + m_trans_fixed_log_file = m_trans_fixed_log_path = nullptr; /* Forget the binlog stmt filter for the next query. 
diff --git a/sql/sql_class.h b/sql/sql_class.h index dbf192e32bfb..4f7a9cd21b38 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -1893,7 +1893,8 @@ class THD : public MDL_context_owner, */ /**@{*/ const char *m_trans_log_file; - char *m_trans_fixed_log_file; + const char *m_trans_fixed_log_file; + char *m_trans_fixed_log_path; my_off_t m_trans_end_pos; const char *m_trans_gtid; char trans_gtid[Gtid::MAX_TEXT_LENGTH + 1]; @@ -2627,13 +2628,15 @@ class THD : public MDL_context_owner, DBUG_PRINT("enter", ("file: %s, pos: %llu", file, pos)); // Only the file name should be used, not the full path m_trans_log_file = file + dirname_length(file); - if (!m_trans_fixed_log_file) - m_trans_fixed_log_file = (char *)main_mem_root.Alloc(FN_REFLEN + 1); - assert(strlen(m_trans_log_file) <= FN_REFLEN); - strcpy(m_trans_fixed_log_file, m_trans_log_file); + if (!m_trans_fixed_log_path) + m_trans_fixed_log_path = (char *)main_mem_root.Alloc(FN_REFLEN + 1); + assert(strlen(file) <= FN_REFLEN); + strcpy(m_trans_fixed_log_path, file); + m_trans_fixed_log_file = + m_trans_fixed_log_path + dirname_length(m_trans_fixed_log_path); } else { m_trans_log_file = nullptr; - m_trans_fixed_log_file = nullptr; + m_trans_fixed_log_file = m_trans_fixed_log_path = nullptr; } m_trans_end_pos = pos; @@ -2666,6 +2669,10 @@ class THD : public MDL_context_owner, return; } + const char *get_trans_fixed_log_path() const { + return m_trans_fixed_log_path; + } + void get_trans_fixed_pos(const char **file_var, my_off_t *pos_var) const { DBUG_TRACE; if (file_var) *file_var = m_trans_fixed_log_file; diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index 4d0562728e41..0820a54dffc9 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -4089,6 +4089,23 @@ static Sys_var_int32 Sys_regexp_stack_limit( GLOBAL_VAR(opt_regexp_stack_limit), CMD_LINE(REQUIRED_ARG), VALID_RANGE(0, INT32_MAX), DEFAULT(8000000), BLOCK_SIZE(1)); +static bool update_rpl_wait_for_semi_sync_ack(sys_var *, THD *, enum_var_type) { + if 
(!rpl_wait_for_semi_sync_ack) { + mysql_bin_log.lock_binlog_end_pos(); + mysql_bin_log.signal_update(); + mysql_bin_log.unlock_binlog_end_pos(); + } + return false; +} + +static Sys_var_bool Sys_wait_semi_sync_ack( + "rpl_wait_for_semi_sync_ack", + "Wait for events to be acked by a semi-sync slave before sending them " + "to the async slaves", + GLOBAL_VAR(rpl_wait_for_semi_sync_ack), CMD_LINE(OPT_ARG), DEFAULT(false), + NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(nullptr), + ON_UPDATE(update_rpl_wait_for_semi_sync_ack)); + static Sys_var_bool Sys_replica_compressed_protocol( "replica_compressed_protocol", "Use compression in the source/replica protocol.",