From fbd7779f37630cae2974a7243578a21fc858f627 Mon Sep 17 00:00:00 2001 From: Zsolt Parragi Date: Tue, 30 Apr 2019 12:27:35 -0700 Subject: [PATCH] FB8-87, FB8-119: Supporting dynamic scheduling in MTS (#999) (#999) Summary: Jira ticket: https://jira.percona.com/browse/FB8-87 Jira ticket: https://jira.percona.com/browse/FB8-119 Reference commit: https://github.com/facebook/mysql-5.6/commit/5846968 Reference commit: https://github.com/facebook/mysql-5.6/commit/d6217c9 Statically assigning shards to slave workers can cause imbalance if a few shards are hotter than others. We should be able to check the imbalance among slave workers and dynamically reassign shards. Pull Request resolved: https://github.com/facebook/mysql-5.6/pull/999 Differential Revision: D14883857 --- mysql-test/r/dd_is_compatibility_ci.result | 1 + mysql-test/r/dd_is_compatibility_cs.result | 1 + mysql-test/r/information_schema_ci.result | 6 +- mysql-test/r/information_schema_cs.result | 6 +- mysql-test/r/mysqld--help-notwin.result | 9 ++ mysql-test/r/mysqlshow_ci.result | 2 + mysql-test/r/mysqlshow_cs.result | 2 + .../suite/funcs_1/r/is_columns_is_cs.result | 6 ++ .../suite/funcs_1/r/is_tables_is.result | 46 ++++++++ .../r/information_schema_db.result | 1 + .../rpl/r/rpl_slave_db_load_table.result | 32 ++++++ .../rpl/t/rpl_slave_db_load_table-slave.opt | 1 + .../suite/rpl/t/rpl_slave_db_load_table.test | 47 ++++++++ .../r/mts_dynamic_rebalance_basic.result | 21 ++++ .../r/mts_imbalance_threshold_basic.result | 23 ++++ .../t/mts_dynamic_rebalance_basic.test | 37 +++++++ .../t/mts_imbalance_threshold_basic.test | 38 +++++++ sql/handler.h | 3 +- sql/mysqld.cc | 2 + sql/mysqld.h | 2 + sql/rpl_replica.cc | 17 +++ sql/rpl_rli.cc | 5 +- sql/rpl_rli_pdb.cc | 100 ++++++++++++++++++ sql/rpl_rli_pdb.h | 27 +++++ sql/sql_show.cc | 55 +++++++++- sql/sys_vars.cc | 12 +++ 26 files changed, 496 insertions(+), 6 deletions(-) create mode 100644 mysql-test/suite/rpl/r/rpl_slave_db_load_table.result create mode 100644 mysql-test/suite/rpl/t/rpl_slave_db_load_table-slave.opt create mode 100644 mysql-test/suite/rpl/t/rpl_slave_db_load_table.test create mode 100644 mysql-test/suite/sys_vars/r/mts_dynamic_rebalance_basic.result create mode 100644 mysql-test/suite/sys_vars/r/mts_imbalance_threshold_basic.result create mode 100644 mysql-test/suite/sys_vars/t/mts_dynamic_rebalance_basic.test create mode 100644 mysql-test/suite/sys_vars/t/mts_imbalance_threshold_basic.test diff --git a/mysql-test/r/dd_is_compatibility_ci.result b/mysql-test/r/dd_is_compatibility_ci.result index e99498dd4b70..b170557c8835 100644 --- a/mysql-test/r/dd_is_compatibility_ci.result +++ b/mysql-test/r/dd_is_compatibility_ci.result @@ -201,6 +201,7 @@ ROUTINES SCHEMATA SCHEMATA_EXTENSIONS SCHEMA_PRIVILEGES +SLAVE_DB_LOAD STATISTICS ST_GEOMETRY_COLUMNS ST_SPATIAL_REFERENCE_SYSTEMS diff --git a/mysql-test/r/dd_is_compatibility_cs.result b/mysql-test/r/dd_is_compatibility_cs.result index 6cdec201341a..cf07af8a75a2 100644 --- a/mysql-test/r/dd_is_compatibility_cs.result +++ b/mysql-test/r/dd_is_compatibility_cs.result @@ -202,6 +202,7 @@ ROUTINES SCHEMATA SCHEMATA_EXTENSIONS SCHEMA_PRIVILEGES +SLAVE_DB_LOAD STATISTICS ST_GEOMETRY_COLUMNS ST_SPATIAL_REFERENCE_SYSTEMS diff --git a/mysql-test/r/information_schema_ci.result b/mysql-test/r/information_schema_ci.result index 26c21a9362ba..40d5cceaa955 100644 --- a/mysql-test/r/information_schema_ci.result +++ b/mysql-test/r/information_schema_ci.result @@ -94,6 +94,7 @@ ROUTINES SCHEMATA SCHEMATA_EXTENSIONS SCHEMA_PRIVILEGES +SLAVE_DB_LOAD STATISTICS ST_GEOMETRY_COLUMNS ST_SPATIAL_REFERENCE_SYSTEMS @@ -896,7 +897,7 @@ table_schema IN ('mysql', 'information_schema', 'test', 'mysqltest') AND table_name not like 'ndb%' AND table_name COLLATE utf8_general_ci not like 'innodb_%' GROUP BY TABLE_SCHEMA; TABLE_SCHEMA count(*) -information_schema 50 +information_schema 51 mysql 35 create table t1 (i int, j int); create trigger trg1 before insert on t1 for each row @@ -1378,6 +1379,7 @@ ROUTINES information_schema.ROUTINES 1 SCHEMATA information_schema.SCHEMATA 1 SCHEMATA_EXTENSIONS information_schema.SCHEMATA_EXTENSIONS 1 SCHEMA_PRIVILEGES information_schema.SCHEMA_PRIVILEGES 1 +SLAVE_DB_LOAD information_schema.SLAVE_DB_LOAD 1 STATISTICS information_schema.STATISTICS 1 ST_GEOMETRY_COLUMNS information_schema.ST_GEOMETRY_COLUMNS 1 ST_SPATIAL_REFERENCE_SYSTEMS information_schema.ST_SPATIAL_REFERENCE_SYSTEMS 1 @@ -2522,6 +2524,7 @@ ROUTINES ROUTINE_SCHEMA SCHEMATA SCHEMA_NAME SCHEMATA_EXTENSIONS SCHEMA_NAME SCHEMA_PRIVILEGES TABLE_SCHEMA +SLAVE_DB_LOAD DB STATISTICS TABLE_SCHEMA ST_GEOMETRY_COLUMNS TABLE_SCHEMA ST_SPATIAL_REFERENCE_SYSTEMS SRS_NAME @@ -2592,6 +2595,7 @@ ROUTINES ROUTINE_SCHEMA SCHEMATA SCHEMA_NAME SCHEMATA_EXTENSIONS SCHEMA_NAME SCHEMA_PRIVILEGES TABLE_SCHEMA +SLAVE_DB_LOAD DB STATISTICS TABLE_SCHEMA ST_GEOMETRY_COLUMNS TABLE_SCHEMA ST_SPATIAL_REFERENCE_SYSTEMS SRS_NAME diff --git a/mysql-test/r/information_schema_cs.result b/mysql-test/r/information_schema_cs.result index 5824f73bfd14..5c4829bdfea6 100644 --- a/mysql-test/r/information_schema_cs.result +++ b/mysql-test/r/information_schema_cs.result @@ -94,6 +94,7 @@ ROUTINES SCHEMATA SCHEMATA_EXTENSIONS SCHEMA_PRIVILEGES +SLAVE_DB_LOAD STATISTICS ST_GEOMETRY_COLUMNS ST_SPATIAL_REFERENCE_SYSTEMS @@ -900,7 +901,7 @@ AND table_name COLLATE utf8mb3_general_ci not like 'innodb_%' AND table_name COLLATE utf8mb3_general_ci not like 'rocksdb_%' GROUP BY TABLE_SCHEMA; TABLE_SCHEMA count(*) -information_schema 50 +information_schema 51 mysql 35 create table t1 (i int, j int); create trigger trg1 before insert on t1 for each row @@ -1383,6 +1384,7 @@ ROUTINES information_schema.ROUTINES 1 SCHEMATA information_schema.SCHEMATA 1 SCHEMATA_EXTENSIONS information_schema.SCHEMATA_EXTENSIONS 1 SCHEMA_PRIVILEGES information_schema.SCHEMA_PRIVILEGES 1 +SLAVE_DB_LOAD information_schema.SLAVE_DB_LOAD 1 STATISTICS information_schema.STATISTICS 1 ST_GEOMETRY_COLUMNS information_schema.ST_GEOMETRY_COLUMNS 1 ST_SPATIAL_REFERENCE_SYSTEMS information_schema.ST_SPATIAL_REFERENCE_SYSTEMS 1 @@ -2527,6 +2529,7 @@ ROUTINES ROUTINE_SCHEMA SCHEMATA SCHEMA_NAME SCHEMATA_EXTENSIONS SCHEMA_NAME SCHEMA_PRIVILEGES TABLE_SCHEMA +SLAVE_DB_LOAD DB STATISTICS TABLE_SCHEMA ST_GEOMETRY_COLUMNS TABLE_SCHEMA ST_SPATIAL_REFERENCE_SYSTEMS SRS_NAME @@ -2597,6 +2600,7 @@ ROUTINES ROUTINE_SCHEMA SCHEMATA SCHEMA_NAME SCHEMATA_EXTENSIONS SCHEMA_NAME SCHEMA_PRIVILEGES TABLE_SCHEMA +SLAVE_DB_LOAD DB STATISTICS TABLE_SCHEMA ST_GEOMETRY_COLUMNS TABLE_SCHEMA ST_SPATIAL_REFERENCE_SYSTEMS SRS_NAME diff --git a/mysql-test/r/mysqld--help-notwin.result b/mysql-test/r/mysqld--help-notwin.result index c831beb88baa..cd6e127ce37f 100644 --- a/mysql-test/r/mysqld--help-notwin.result +++ b/mysql-test/r/mysqld--help-notwin.result @@ -739,6 +739,13 @@ The following options may be given as the first argument: --min-examined-row-limit=# Don't write queries to slow log that examine fewer rows than that + --mts-dynamic-rebalance + Shuffle DB's within workers periodically for load + balancing + --mts-imbalance-threshold[=#] + Threshold to trigger worker thread rebalancing. This + parameter denotes the percent load on the most loaded + worker. --myisam-block-size=# Block size to be used for MyISAM index pages --myisam-data-pointer-size=# @@ -2475,6 +2482,8 @@ max-user-connections 0 max-write-lock-count 18446744073709551615 memlock FALSE min-examined-row-limit 0 +mts-dynamic-rebalance FALSE +mts-imbalance-threshold 90 myisam-block-size 1024 myisam-data-pointer-size 6 myisam-max-sort-file-size 9223372036853727232 diff --git a/mysql-test/r/mysqlshow_ci.result b/mysql-test/r/mysqlshow_ci.result index daf1692d67de..463dd51ea1d1 100644 --- a/mysql-test/r/mysqlshow_ci.result +++ b/mysql-test/r/mysqlshow_ci.result @@ -158,6 +158,7 @@ Database: information_schema | SCHEMA_PRIVILEGES | | SCHEMATA | | SCHEMATA_EXTENSIONS | +| SLAVE_DB_LOAD | | ST_GEOMETRY_COLUMNS | | ST_SPATIAL_REFERENCE_SYSTEMS | | ST_UNITS_OF_MEASURE | @@ -259,6 +260,7 @@ Database: INFORMATION_SCHEMA | SCHEMA_PRIVILEGES | | SCHEMATA | | SCHEMATA_EXTENSIONS | +| SLAVE_DB_LOAD | | ST_GEOMETRY_COLUMNS | | ST_SPATIAL_REFERENCE_SYSTEMS | | ST_UNITS_OF_MEASURE | diff --git a/mysql-test/r/mysqlshow_cs.result b/mysql-test/r/mysqlshow_cs.result index 98ebd4fd0156..4c12fd6f853e 100644 --- a/mysql-test/r/mysqlshow_cs.result +++ b/mysql-test/r/mysqlshow_cs.result @@ -159,6 +159,7 @@ Database: information_schema | SCHEMATA | | SCHEMATA_EXTENSIONS | | SCHEMA_PRIVILEGES | +| SLAVE_DB_LOAD | | STATISTICS | | ST_GEOMETRY_COLUMNS | | ST_SPATIAL_REFERENCE_SYSTEMS | @@ -261,6 +262,7 @@ Database: INFORMATION_SCHEMA | SCHEMATA | | SCHEMATA_EXTENSIONS | | SCHEMA_PRIVILEGES | +| SLAVE_DB_LOAD | | STATISTICS | | ST_GEOMETRY_COLUMNS | | ST_SPATIAL_REFERENCE_SYSTEMS | diff --git a/mysql-test/suite/funcs_1/r/is_columns_is_cs.result b/mysql-test/suite/funcs_1/r/is_columns_is_cs.result index 09d0acf98f6e..dd0b49495a1c 100644 --- a/mysql-test/suite/funcs_1/r/is_columns_is_cs.result +++ b/mysql-test/suite/funcs_1/r/is_columns_is_cs.result @@ -442,6 +442,9 @@ def information_schema SCHEMA_PRIVILEGES TABLE_CATALOG 2 NO varchar 170 512 NUL def information_schema SCHEMA_PRIVILEGES TABLE_SCHEMA 3 NO varchar 21 64 NULL NULL NULL utf8mb3 utf8mb3_general_ci varchar(64) select NULL def information_schema SCHEMA_PRIVILEGES PRIVILEGE_TYPE 4 NO varchar 21 64 NULL NULL NULL utf8mb3 utf8mb3_general_ci varchar(64) select NULL def information_schema SCHEMA_PRIVILEGES IS_GRANTABLE 5 NO varchar 1 3 NULL NULL NULL utf8mb3 utf8mb3_general_ci varchar(3) select NULL +def information_schema SLAVE_DB_LOAD DB 1 NO varchar 64 192 NULL NULL NULL utf8mb3 utf8mb3_general_ci varchar(192) select NULL +def information_schema SLAVE_DB_LOAD WORKER 2 NO bigint NULL NULL NULL NULL NULL NULL NULL bigint unsigned select NULL +def information_schema SLAVE_DB_LOAD DB_LOAD 3 NO bigint NULL NULL NULL NULL NULL NULL NULL bigint unsigned select NULL def information_schema STATISTICS TABLE_CATALOG 1 NULL NO varchar 64 192 NULL NULL NULL utf8mb3 utf8mb3_bin varchar(64) select NULL def information_schema STATISTICS TABLE_SCHEMA 2 NULL NO varchar 64 192 NULL NULL NULL utf8mb3 utf8mb3_bin varchar(64) select NULL def information_schema STATISTICS TABLE_NAME 3 NULL NO varchar 64 192 NULL NULL NULL utf8mb3 utf8mb3_bin varchar(64) select NULL @@ -1113,6 +1116,9 @@ NULL information_schema SCHEMATA SQL_PATH binary 0 0 NULL NULL binary(0) 3.0476 information_schema SCHEMA_PRIVILEGES TABLE_SCHEMA varchar 21 64 utf8mb3 utf8mb3_general_ci varchar(64) 3.0476 information_schema SCHEMA_PRIVILEGES PRIVILEGE_TYPE varchar 21 64 utf8mb3 utf8mb3_general_ci varchar(64) 3.0000 information_schema SCHEMA_PRIVILEGES IS_GRANTABLE varchar 1 3 utf8mb3 utf8mb3_general_ci varchar(3) +3.0000 information_schema SLAVE_DB_LOAD DB varchar 64 192 utf8mb3 utf8mb3_general_ci varchar(192) +NULL information_schema SLAVE_DB_LOAD WORKER bigint NULL NULL NULL NULL bigint unsigned +NULL information_schema SLAVE_DB_LOAD DB_LOAD bigint NULL NULL NULL NULL bigint unsigned 3.0000 information_schema STATISTICS TABLE_CATALOG varchar 64 192 utf8mb3 utf8mb3_bin varchar(64) 3.0000 information_schema STATISTICS TABLE_SCHEMA varchar 64 192 utf8mb3 utf8mb3_bin varchar(64) 3.0000 information_schema STATISTICS TABLE_NAME varchar 64 192 utf8mb3 utf8mb3_bin varchar(64) diff --git a/mysql-test/suite/funcs_1/r/is_tables_is.result b/mysql-test/suite/funcs_1/r/is_tables_is.result index 7f5c4b360e51..68030a466e56 100644 --- a/mysql-test/suite/funcs_1/r/is_tables_is.result +++ b/mysql-test/suite/funcs_1/r/is_tables_is.result @@ -754,6 +754,29 @@ user_comment Separator ----------------------------------------------------- TABLE_CATALOG def TABLE_SCHEMA information_schema +TABLE_NAME SLAVE_DB_LOAD +TABLE_TYPE SYSTEM VIEW +ENGINE NULL +VERSION 10 +ROW_FORMAT NULL +TABLE_ROWS #TBLR# +AVG_ROW_LENGTH #ARL# +DATA_LENGTH #DL# +MAX_DATA_LENGTH #MDL# +INDEX_LENGTH #IL# +DATA_FREE #DF# +AUTO_INCREMENT #AI# +CREATE_TIME #CRT# +UPDATE_TIME #UT# +CHECK_TIME #CT# +TABLE_COLLATION NULL +CHECKSUM NULL +CREATE_OPTIONS #CO# +TABLE_COMMENT #TC# +user_comment +Separator ----------------------------------------------------- +TABLE_CATALOG def +TABLE_SCHEMA information_schema TABLE_NAME STATISTICS TABLE_TYPE SYSTEM VIEW ENGINE NULL @@ -1901,6 +1924,29 @@ user_comment Separator ----------------------------------------------------- TABLE_CATALOG def TABLE_SCHEMA information_schema +TABLE_NAME SLAVE_DB_LOAD +TABLE_TYPE SYSTEM VIEW +ENGINE NULL +VERSION 10 +ROW_FORMAT NULL +TABLE_ROWS #TBLR# +AVG_ROW_LENGTH #ARL# +DATA_LENGTH #DL# +MAX_DATA_LENGTH #MDL# +INDEX_LENGTH #IL# +DATA_FREE #DF# +AUTO_INCREMENT #AI# +CREATE_TIME #CRT# +UPDATE_TIME #UT# +CHECK_TIME #CT# +TABLE_COLLATION NULL +CHECKSUM NULL +CREATE_OPTIONS #CO# +TABLE_COMMENT #TC# +user_comment +Separator ----------------------------------------------------- +TABLE_CATALOG def +TABLE_SCHEMA information_schema TABLE_NAME STATISTICS TABLE_TYPE SYSTEM VIEW ENGINE NULL diff --git a/mysql-test/suite/information_schema/r/information_schema_db.result b/mysql-test/suite/information_schema/r/information_schema_db.result index f850b2be7b38..475908dd0d78 100644 --- a/mysql-test/suite/information_schema/r/information_schema_db.result +++ b/mysql-test/suite/information_schema/r/information_schema_db.result @@ -55,6 +55,7 @@ ROUTINES SCHEMATA SCHEMATA_EXTENSIONS SCHEMA_PRIVILEGES +SLAVE_DB_LOAD STATISTICS ST_GEOMETRY_COLUMNS ST_SPATIAL_REFERENCE_SYSTEMS diff --git a/mysql-test/suite/rpl/r/rpl_slave_db_load_table.result b/mysql-test/suite/rpl/r/rpl_slave_db_load_table.result new file mode 100644 index 000000000000..643d5175118d --- /dev/null +++ b/mysql-test/suite/rpl/r/rpl_slave_db_load_table.result @@ -0,0 +1,32 @@ +include/master-slave.inc +Warnings: +Note #### Sending passwords in plain text without SSL/TLS is extremely insecure. +Note #### Storing MySQL user name or password information in the connection metadata repository is not secure and is therefore not recommended. Please consider using the USER and PASSWORD connection options for START REPLICA; see the 'START REPLICA Syntax' in the MySQL Manual for more information. +[connection master] +include/rpl_restart_server.inc [server_number=2] +[connection slave] +set global mts_dynamic_rebalance=TRUE; +set global replica_parallel_workers=2; +set global debug="+d,skip_checkpoint_load_reset"; +include/start_slave.inc +select * from information_schema.slave_db_load; +DB WORKER DB_LOAD +[connection master] +create table t1 (a int); +insert into t1 values(1); +insert into t1 values(1); +insert into t1 values(1); +insert into t1 values(1); +insert into t1 values(1); +include/sync_slave_sql_with_master.inc +[connection slave] +select * from information_schema.slave_db_load; +DB WORKER DB_LOAD +test 1 6 +include/stop_slave.inc +set global mts_dynamic_rebalance=0; +set global debug="-d,skip_checkpoint_load_reset"; +include/start_slave.inc +[connection master] +drop table t1; +include/rpl_end.inc diff --git a/mysql-test/suite/rpl/t/rpl_slave_db_load_table-slave.opt b/mysql-test/suite/rpl/t/rpl_slave_db_load_table-slave.opt new file mode 100644 index 000000000000..c76a1b19324b --- /dev/null +++ b/mysql-test/suite/rpl/t/rpl_slave_db_load_table-slave.opt @@ -0,0 +1 @@ +--replica_parallel_type='DATABASE' --replica_preserve_commit_order=OFF diff --git a/mysql-test/suite/rpl/t/rpl_slave_db_load_table.test b/mysql-test/suite/rpl/t/rpl_slave_db_load_table.test new file mode 100644 index 000000000000..a9876ec2cbc6 --- /dev/null +++ b/mysql-test/suite/rpl/t/rpl_slave_db_load_table.test @@ -0,0 +1,47 @@ +# This test verifies the functionality of slave_db_load table. + +source include/have_debug.inc; +source include/master-slave.inc; + +let rpl_server_number=2; +source include/rpl_restart_server.inc; + +source include/rpl_connection_slave.inc; +let $old_mts_dynamic_rebalance= `select @@global.mts_dynamic_rebalance`; +let $old_replica_parallel_workers= `select @@global.replica_parallel_workers`; +set global mts_dynamic_rebalance=TRUE; +set global replica_parallel_workers=2; +set global debug="+d,skip_checkpoint_load_reset"; +source include/start_slave.inc; + +# Load should be empty +select * from information_schema.slave_db_load; + +source include/rpl_connection_master.inc; +create table t1 (a int); +let $num_inserts=5; +while ($num_inserts) +{ + insert into t1 values(1); + dec $num_inserts; +} + +source include/sync_slave_sql_with_master.inc; + +source include/rpl_connection_slave.inc; +# Load of test should be 6 (1 create and 5 inserts) +select * from information_schema.slave_db_load; + +# restore varaibles +source include/stop_slave.inc; +eval set global mts_dynamic_rebalance=$old_mts_dynamic_rebalance; +--disable_query_log +eval set global replica_parallel_workers=$old_replica_parallel_workers; +--enable_query_log +set global debug="-d,skip_checkpoint_load_reset"; +source include/start_slave.inc; + +source include/rpl_connection_master.inc; +drop table t1; + +source include/rpl_end.inc; diff --git a/mysql-test/suite/sys_vars/r/mts_dynamic_rebalance_basic.result b/mysql-test/suite/sys_vars/r/mts_dynamic_rebalance_basic.result new file mode 100644 index 000000000000..4ebd8e4208f1 --- /dev/null +++ b/mysql-test/suite/sys_vars/r/mts_dynamic_rebalance_basic.result @@ -0,0 +1,21 @@ +set @save.mts_dynamic_rebalance= @@global.mts_dynamic_rebalance; +select @@session.mts_dynamic_rebalance; +ERROR HY000: Variable 'mts_dynamic_rebalance' is a GLOBAL variable +select variable_name from performance_schema.global_variables where variable_name='$var'; +variable_name +select variable_name from performance_schema.session_variables where variable_name='$var'; +variable_name +set @@global.mts_dynamic_rebalance= false; +select @@global.mts_dynamic_rebalance; +@@global.mts_dynamic_rebalance +0 +set @@global.mts_dynamic_rebalance= 1.1; +ERROR 42000: Incorrect argument type to variable 'mts_dynamic_rebalance' +set @@global.mts_dynamic_rebalance= "foo"; +ERROR 42000: Variable 'mts_dynamic_rebalance' can't be set to the value of 'foo' +set @@global.mts_dynamic_rebalance= false; +set @@global.mts_dynamic_rebalance= true; +select @@global.mts_dynamic_rebalance as "truncated to the maximum"; +truncated to the maximum +1 +set @@global.mts_dynamic_rebalance= @save.mts_dynamic_rebalance; diff --git a/mysql-test/suite/sys_vars/r/mts_imbalance_threshold_basic.result b/mysql-test/suite/sys_vars/r/mts_imbalance_threshold_basic.result new file mode 100644 index 000000000000..2c29be6afd28 --- /dev/null +++ b/mysql-test/suite/sys_vars/r/mts_imbalance_threshold_basic.result @@ -0,0 +1,23 @@ +set @save.mts_imbalance_threshold= @@global.mts_imbalance_threshold; +select @@session.mts_imbalance_threshold; +ERROR HY000: Variable 'mts_imbalance_threshold' is a GLOBAL variable +select variable_name from performance_schema.global_variables where variable_name='$var'; +variable_name +select variable_name from performance_schema.session_variables where variable_name='$var'; +variable_name +set @@global.mts_imbalance_threshold= 0.0; +select @@global.mts_imbalance_threshold; +@@global.mts_imbalance_threshold +0.000000 +set @@global.mts_imbalance_threshold= test; +ERROR 42000: Incorrect argument type to variable 'mts_imbalance_threshold' +set @@global.mts_imbalance_threshold= "foo"; +ERROR 42000: Incorrect argument type to variable 'mts_imbalance_threshold' +set @@global.mts_imbalance_threshold= 0; +set @@global.mts_imbalance_threshold= cast(-1 as unsigned int); +Warnings: +Warning 1292 Truncated incorrect mts_imbalance_threshold value: '1.8446744073709552e19' +select @@global.mts_imbalance_threshold as "truncated to the maximum"; +truncated to the maximum +100.000000 +set @@global.mts_imbalance_threshold= @save.mts_imbalance_threshold; diff --git a/mysql-test/suite/sys_vars/t/mts_dynamic_rebalance_basic.test b/mysql-test/suite/sys_vars/t/mts_dynamic_rebalance_basic.test new file mode 100644 index 000000000000..5b2885046a0a --- /dev/null +++ b/mysql-test/suite/sys_vars/t/mts_dynamic_rebalance_basic.test @@ -0,0 +1,37 @@ +let $var= mts_dynamic_rebalance; +eval set @save.$var= @@global.$var; + +# +# exists as global only +# +--error ER_INCORRECT_GLOBAL_LOCAL_VAR +eval select @@session.$var; + +select variable_name from performance_schema.global_variables where variable_name='$var'; +select variable_name from performance_schema.session_variables where variable_name='$var'; + +# +# show that it's writable +# +let $value= false; +eval set @@global.$var= $value; +eval select @@global.$var; + +# +# incorrect value +# +--error ER_WRONG_TYPE_FOR_VAR +eval set @@global.$var= 1.1; +--error ER_WRONG_VALUE_FOR_VAR +eval set @@global.$var= "foo"; + +# +# min/max values +# +eval set @@global.$var= false; +eval set @@global.$var= true; +eval select @@global.$var as "truncated to the maximum"; + +# cleanup + +eval set @@global.$var= @save.$var; diff --git a/mysql-test/suite/sys_vars/t/mts_imbalance_threshold_basic.test b/mysql-test/suite/sys_vars/t/mts_imbalance_threshold_basic.test new file mode 100644 index 000000000000..e3e030919487 --- /dev/null +++ b/mysql-test/suite/sys_vars/t/mts_imbalance_threshold_basic.test @@ -0,0 +1,38 @@ + +let $var= mts_imbalance_threshold; +eval set @save.$var= @@global.$var; + +# +# exists as global only +# +--error ER_INCORRECT_GLOBAL_LOCAL_VAR +eval select @@session.$var; + +select variable_name from performance_schema.global_variables where variable_name='$var'; +select variable_name from performance_schema.session_variables where variable_name='$var'; + +# +# show that it's writable +# +let $value= 0.0; +eval set @@global.$var= $value; +eval select @@global.$var; + +# +# incorrect types +# +--error ER_WRONG_TYPE_FOR_VAR +eval set @@global.$var= test; +--error ER_WRONG_TYPE_FOR_VAR +eval set @@global.$var= "foo"; + +# +# min/max values +# +eval set @@global.$var= 0; +eval set @@global.$var= cast(-1 as unsigned int); +eval select @@global.$var as "truncated to the maximum"; + +# cleanup + +eval set @@global.$var= @save.$var; diff --git a/sql/handler.h b/sql/handler.h index 965244120b55..803c59754727 100644 --- a/sql/handler.h +++ b/sql/handler.h @@ -950,7 +950,8 @@ enum enum_schema_tables : int { SCH_TMP_TABLE_COLUMNS, SCH_TMP_TABLE_KEYS, SCH_AUTHINFO, - SCH_LAST = SCH_AUTHINFO + SCH_SLAVE_DB_LOAD, + SCH_LAST = SCH_SLAVE_DB_LOAD }; enum ha_stat_type { diff --git a/sql/mysqld.cc b/sql/mysqld.cc index 7accbfdbe673..a4e7fa8c3270 100644 --- a/sql/mysqld.cc +++ b/sql/mysqld.cc @@ -1404,6 +1404,8 @@ ulong replica_exec_mode_options; ulonglong replica_type_conversions_options; ulong opt_mts_replica_parallel_workers; ulong slave_run_triggers_for_rbr = 0; +bool opt_mts_dynamic_rebalance; +double opt_mts_imbalance_threshold; ulonglong opt_mts_pending_jobs_size_max; bool opt_replica_preserve_commit_order; #ifndef NDEBUG diff --git a/sql/mysqld.h b/sql/mysqld.h index 25f82dc97685..d10890f77c6d 100644 --- a/sql/mysqld.h +++ b/sql/mysqld.h @@ -333,6 +333,8 @@ extern bool opt_replica_allow_batching; extern ulong slave_trans_retries; extern uint replica_net_timeout; extern ulong opt_mts_replica_parallel_workers; +extern bool opt_mts_dynamic_rebalance; +extern double opt_mts_imbalance_threshold; extern ulonglong opt_mts_pending_jobs_size_max; extern ulong rpl_stop_replica_timeout; extern ulong what_to_log, flush_time; diff --git a/sql/rpl_replica.cc b/sql/rpl_replica.cc index 8bf012603aa1..2f4c186cc1c9 100644 --- a/sql/rpl_replica.cc +++ b/sql/rpl_replica.cc @@ -6558,6 +6558,23 @@ bool mta_checkpoint_routine(Relay_log_info *rli, bool force) { ->jobs_done += cnt; } + // case: rebalance workers should be called only when the current event + // in the coordinator is a begin or gtid event + if (!force && opt_mts_dynamic_rebalance && !rli->curr_group_seen_begin && + !rli->curr_group_seen_gtid && !rli->sql_thread_kill_accepted) { + rebalance_workers(rli); + } + + if (DBUG_EVALUATE_IF("skip_checkpoint_load_reset", 0, 1)) { + // reset the database load + mysql_mutex_lock(&rli->slave_worker_hash_lock); + for (auto &item : rli->mapping_db_to_worker) { + db_worker_hash_entry *entry = item.second.get(); + entry->load = 0; + } + mysql_mutex_unlock(&rli->slave_worker_hash_lock); + } + mysql_mutex_lock(&rli->data_lock); /* diff --git a/sql/rpl_rli.cc b/sql/rpl_rli.cc index 177c836e7847..36153e36016c 100644 --- a/sql/rpl_rli.cc +++ b/sql/rpl_rli.cc @@ -174,6 +174,7 @@ Relay_log_info::Relay_log_info(bool is_slave_recovery, workers_array_initialized(false), curr_group_assigned_parts(PSI_NOT_INSTRUMENTED), curr_group_da(PSI_NOT_INSTRUMENTED), + curr_group_seen_gtid(0), curr_group_seen_begin(false), mts_end_group_sets_max_dbs(false), replica_parallel_workers(0), @@ -3028,7 +3029,9 @@ void Relay_log_info::clear_relay_log_truncated() { } bool Relay_log_info::is_time_for_mta_checkpoint() { - if (is_parallel_exec() && opt_mta_checkpoint_period != 0) { + bool period_check = opt_mta_checkpoint_period != 0 && + !curr_group_seen_begin && !curr_group_seen_gtid; + if (is_parallel_exec() && period_check) { struct timespec curr_clock; set_timespec_nsec(&curr_clock, 0); return diff_timespec(&curr_clock, &last_clock) >= diff --git a/sql/rpl_rli_pdb.cc b/sql/rpl_rli_pdb.cc index b8d2e852125c..96c29fc3da95 100644 --- a/sql/rpl_rli_pdb.cc +++ b/sql/rpl_rli_pdb.cc @@ -869,6 +869,102 @@ static void move_temp_tables_to_entry(THD *thd, db_worker_hash_entry *entry) { } } +static double calculate_imbalance(Relay_log_info *rli, + const Slave_worker_array *ws) { + DBUG_ENTER("calculate_imbalance"); + + mysql_mutex_assert_owner(&rli->slave_worker_hash_lock); + + // case: if the number of workers == number of db's + // nothing can be done to rebalance the workers + if (ws->size() == rli->mapping_db_to_worker.size()) DBUG_RETURN(0); + + double max_usage = 0; + double total_usage = 0; + std::vector load_on_workers(ws->size()); + + // calculate the total load per worker + for (auto const &entry : rli->mapping_db_to_worker) { + load_on_workers[entry.second->worker->id] += entry.second->load; + } + + for (ulong load : load_on_workers) { + total_usage += load; + if (load > max_usage) max_usage = load; + } + + if (total_usage == 0) DBUG_RETURN(0); + + DBUG_RETURN(max_usage / total_usage * 100); +} + +void rebalance_workers(Relay_log_info *rli) { + DBUG_ENTER("rebalance_workers"); + assert(!rli->curr_group_seen_begin && !rli->curr_group_seen_gtid); + + const Slave_worker_array *ws = &rli->workers; + std::priority_queue, + std::greater> + heap; + + mysql_mutex_lock(&rli->slave_worker_hash_lock); + + double imbalance = calculate_imbalance(rli, ws); + if (imbalance < opt_mts_imbalance_threshold) { + mysql_mutex_unlock(&rli->slave_worker_hash_lock); + DBUG_VOID_RETURN; + } + + // initializing the heap + for (Slave_worker *current_worker : *ws) { + heap.emplace(worker_load(current_worker)); + } + + double new_max_wrk_load = 0; + double total_wrk_load = 0; + // build the heap according to the current load + for (auto const &entry : rli->mapping_db_to_worker) { + // assign least occupied worker + worker_load wrk_load = heap.top(); + wrk_load.db_entries.push_back(entry.second.get()); + heap.pop(); + + wrk_load.load += entry.second->load; + + if (new_max_wrk_load < wrk_load.load) new_max_wrk_load = wrk_load.load; + total_wrk_load += entry.second->load; + + heap.emplace(std::move(wrk_load)); + } + + // before we remap, check if the rebalancing will make the imbalance + // less than the threshold if no we should leave the map alone and + // there is no need to sync the workers + // NOTE: the comparision method here should be equivalent to @ + // calculate_imbalance + if (total_wrk_load && + new_max_wrk_load / total_wrk_load * 100 > opt_mts_imbalance_threshold) { + mysql_mutex_unlock(&rli->slave_worker_hash_lock); + DBUG_VOID_RETURN; + } + + // remap the db->worker map + while (!heap.empty()) { + worker_load wrk_load = heap.top(); + for (db_worker_hash_entry *entry : wrk_load.db_entries) { + entry->worker = wrk_load.worker; + } + heap.pop(); + } + + mysql_mutex_unlock(&rli->slave_worker_hash_lock); + + // wait for all workers to finish + (void)rli->current_mts_submode->wait_for_workers_to_finish(rli); + + DBUG_VOID_RETURN; +} + /** The function produces a reference to the struct of a Worker that has been or will be engaged to process the @c dbname -keyed partition @@ -992,6 +1088,7 @@ Slave_worker *map_db_to_worker(const char *dbname, Relay_log_info *rli, entry->db = db; entry->db_len = strlen(db); entry->usage = 1; + entry->load = 1; entry->temporary_tables = nullptr; /* Unless \exists the last assigned Worker, get a free worker based @@ -1047,6 +1144,7 @@ Slave_worker *map_db_to_worker(const char *dbname, Relay_log_info *rli, : last_worker; entry->worker->usage_partition++; entry->usage++; + entry->load++; DBUG_PRINT( "debug", ("worker=%lu, partition=%s, usage=%ld (was 0), wait=false!", @@ -1055,6 +1153,7 @@ Slave_worker *map_db_to_worker(const char *dbname, Relay_log_info *rli, assert(entry->worker); entry->usage++; + entry->load++; DBUG_PRINT("debug", ("worker=%lu, partition=%s, usage=%ld, wait=false!", entry->worker->id, entry->db, entry->worker->usage_partition++)); @@ -1089,6 +1188,7 @@ Slave_worker *map_db_to_worker(const char *dbname, Relay_log_info *rli, } mysql_mutex_lock(&rli->slave_worker_hash_lock); entry->usage = 1; + entry->load = 1; entry->worker->usage_partition++; } } diff --git a/sql/rpl_rli_pdb.h b/sql/rpl_rli_pdb.h index c1503037a064..b6b4bc98ae12 100644 --- a/sql/rpl_rli_pdb.h +++ b/sql/rpl_rli_pdb.h @@ -27,6 +27,7 @@ #include #include #include +#include #include #include "my_bitmap.h" @@ -79,6 +80,12 @@ struct db_worker_hash_entry { This should only be modified under the lock slave_worker_hash_lock. */ long usage; + /* + This is the number of job items this database processed since + the last checkpoint, see @ mts_checkpoint_routine. + This should also be modified under slave_worker_hash_lock + */ + ulong load; /* The list of temp tables belonging to @ db database is attached to an assigned @c worker to become its thd->temporary_tables. @@ -95,8 +102,28 @@ struct db_worker_hash_entry { timestamp updated_at; */ }; +/* + Used in priority queue to find least occupied worker + see @ rebalance_workers +*/ +struct worker_load { + Slave_worker *worker; + std::vector db_entries; + ulong load; + + worker_load(Slave_worker *worker, long load) noexcept + : worker(worker), load(load) {} + + worker_load(Slave_worker *worker) noexcept : worker_load(worker, 0) {} + + bool operator>(const worker_load &other) const noexcept { + return this->load > other.load; + } +}; + bool init_hash_workers(Relay_log_info *rli); void destroy_hash_workers(Relay_log_info *); +void rebalance_workers(Relay_log_info *rli); Slave_worker *map_db_to_worker(const char *dbname, Relay_log_info *rli, db_worker_hash_entry **ptr_entry, bool need_temp_tables, Slave_worker *w); diff --git a/sql/sql_show.cc b/sql/sql_show.cc index 0998c560612f..5b14aded58e0 100644 --- a/sql/sql_show.cc +++ b/sql/sql_show.cc @@ -73,8 +73,12 @@ #include "mysqld_error.h" #include "nulls.h" #include "query_tag_perf_counter.h" // query tag -#include "scope_guard.h" // Scope_guard -#include "sql/auth/auth_acls.h" // DB_ACLS +#include "rpl_mi.h" +#include "rpl_msr.h" +#include "rpl_rli.h" +#include "rpl_rli_pdb.h" +#include "scope_guard.h" // Scope_guard +#include "sql/auth/auth_acls.h" // DB_ACLS #include "sql/auth/auth_common.h" #include "sql/auth/sql_security_ctx.h" #include "sql/dd/cache/dictionary_client.h" // dd::cache::Dictionary_client @@ -3409,6 +3413,45 @@ static int fill_schema_processlist(THD *thd, Table_ref *tables, Item *) { return 0; } +static int fill_slave_db_load(THD *thd, Table_ref *tables, Item *) { + DBUG_ENTER("fill_slave_db_load"); + int error = 0; + TABLE *table = tables->table; + CHARSET_INFO *cs = system_charset_info; + + for (auto it : channel_map) { + if (Master_info::is_configured(it.second)) { + Relay_log_info *rli = it.second->rli; + + if (!rli->inited_hash_workers) { + continue; + } + + mysql_mutex_lock(&rli->slave_worker_hash_lock); + for (auto const &item : rli->mapping_db_to_worker) { + db_worker_hash_entry const *entry = item.second.get(); + + restore_record(table, s->default_values); + + /* ID */ + if (entry->db) table->field[0]->store(entry->db, strlen(entry->db), cs); + /* Worker */ + if (entry->worker) table->field[1]->store(entry->worker->id, true); + /* Load */ + table->field[2]->store(entry->load, true); + + if (schema_table_store_record(thd, table)) { + error = 1; + break; + } + } + mysql_mutex_unlock(&rli->slave_worker_hash_lock); + } + } + + DBUG_RETURN(error); +} + /***************************************************************************** Status functions *****************************************************************************/ @@ -5257,6 +5300,12 @@ ST_FIELD_INFO authinfo_fields_info[] = { {"INFO", PROCESS_LIST_INFO_WIDTH, MYSQL_TYPE_STRING, 0, 1, "Info", 0}, {0, 0, MYSQL_TYPE_STRING, 0, 0, 0, 0}}; +ST_FIELD_INFO slave_db_load_fields_info[] = { + {"DB", NAME_LEN, MYSQL_TYPE_STRING, 0, 0, "Database", 0}, + {"WORKER", 21, MYSQL_TYPE_LONGLONG, 0, MY_I_S_UNSIGNED, "Worker ID", 0}, + {"DB_LOAD", 21, MYSQL_TYPE_LONGLONG, 0, MY_I_S_UNSIGNED, "DB Load", 0}, + {0, 0, MYSQL_TYPE_STRING, 0, 0, 0, 0}}; + ST_FIELD_INFO plugin_fields_info[] = { {"PLUGIN_NAME", NAME_CHAR_LEN, MYSQL_TYPE_STRING, 0, 0, "Name", 0}, {"PLUGIN_VERSION", 20, MYSQL_TYPE_STRING, 0, 0, nullptr, 0}, @@ -5349,6 +5398,8 @@ ST_SCHEMA_TABLE schema_tables[] = { make_old_format, get_schema_tmp_table_keys_record, true}, {"AUTHINFO", authinfo_fields_info, fill_schema_authinfo, make_old_format, 0, false}, + {"SLAVE_DB_LOAD", slave_db_load_fields_info, fill_slave_db_load, 0, 0, + false}, {nullptr, nullptr, nullptr, nullptr, nullptr, false}}; int initialize_schema_table(st_plugin_int *plugin) { diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index 930d33148642..0855c657aebe 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -6495,6 +6495,18 @@ static Sys_var_ulong Sys_replica_parallel_workers( static Sys_var_deprecated_alias Sys_slave_parallel_workers( "slave_parallel_workers", Sys_replica_parallel_workers); +static Sys_var_bool Sys_mts_dynamic_rebalance( + "mts_dynamic_rebalance", + "Shuffle DB's within workers periodically for load balancing", + GLOBAL_VAR(opt_mts_dynamic_rebalance), CMD_LINE(OPT_ARG), DEFAULT(false)); + +static Sys_var_double Sys_mts_imbalance_threshold( + "mts_imbalance_threshold", + "Threshold to trigger worker thread rebalancing. This parameter " + "denotes the percent load on the most loaded worker.", + GLOBAL_VAR(opt_mts_imbalance_threshold), CMD_LINE(OPT_ARG), + VALID_RANGE(0, 100), DEFAULT(90)); + static Sys_var_ulonglong Sys_replica_pending_jobs_size_max( "replica_pending_jobs_size_max", "Soft limit on the size, in bytes, of per-worker queues of events that "