diff --git a/dm/worker/server.go b/dm/worker/server.go index 3efd1fe58b..8cd8ca6851 100644 --- a/dm/worker/server.go +++ b/dm/worker/server.go @@ -254,8 +254,10 @@ func (s *Server) doStartKeepAlive() { } func (s *Server) stopKeepAlive() { - s.kaCancel() - s.kaWg.Wait() + if s.kaCancel != nil { + s.kaCancel() + s.kaWg.Wait() + } } func (s *Server) restartKeepAlive() { diff --git a/pkg/binlog/position.go b/pkg/binlog/position.go index 924108f657..6429501b27 100644 --- a/pkg/binlog/position.go +++ b/pkg/binlog/position.go @@ -316,6 +316,34 @@ func CompareLocation(location1, location2 Location, cmpGTID bool) int { return compareIndex(location1.Suffix, location2.Suffix) } +// IsFreshPosition returns true when location1 is a fresh location without any info. +func IsFreshPosition(location1 Location, flavor string, cmpGTID bool) bool { + location2 := NewLocation(flavor) + if cmpGTID { + cmp, canCmp := CompareGTID(location1.gtidSet, location2.gtidSet) + if canCmp { + if cmp != 0 { + return cmp <= 0 + } + // not supposed to happen, for safety here. 
+ if location1.gtidSet != nil && location1.gtidSet.String() != "" { + return false + } + // empty GTIDSet, then compare by position + log.L().Warn("both gtidSets are empty, will compare by position", zap.Stringer("location1", location1), zap.Stringer("location2", location2)) + } else { + // if can't compare by GTIDSet, then compare by position + log.L().Warn("gtidSet can't be compared, will compare by position", zap.Stringer("location1", location1), zap.Stringer("location2", location2)) + } + } + + cmp := ComparePosition(location1.Position, location2.Position) + if cmp != 0 { + return cmp <= 0 + } + return compareIndex(location1.Suffix, location2.Suffix) <= 0 +} + // CompareGTID returns: // 1, true if gSet1 is bigger than gSet2 // 0, true if gSet1 is equal to gSet2 diff --git a/pkg/binlog/position_test.go b/pkg/binlog/position_test.go index c1e0f58fe5..2fce477696 100644 --- a/pkg/binlog/position_test.go +++ b/pkg/binlog/position_test.go @@ -794,3 +794,89 @@ func (t *testPositionSuite) TestExtractSuffix(c *C) { c.Assert(suffix, Equals, tc.suffix) } } + +func (t *testPositionSuite) TestIsFreshPosition(c *C) { + mysqlPos := gmysql.Position{ + Name: "mysql-binlog.00001", + Pos: 123, + } + mysqlGTIDSet, err := gtid.ParserGTID(gmysql.MySQLFlavor, "e8e592a6-7a59-11eb-8da1-0242ac110002:1-36") + c.Assert(err, IsNil) + mariaGTIDSet, err := gtid.ParserGTID(gmysql.MariaDBFlavor, "0-1001-233") + c.Assert(err, IsNil) + testCases := []struct { + loc Location + flavor string + cmpGTID bool + fresh bool + }{ + { + InitLocation(mysqlPos, mysqlGTIDSet), + gmysql.MySQLFlavor, + true, + false, + }, + { + InitLocation(mysqlPos, gtid.MinGTIDSet(gmysql.MySQLFlavor)), + gmysql.MySQLFlavor, + true, + false, + }, + { + + InitLocation(MinPosition, mysqlGTIDSet), + gmysql.MySQLFlavor, + true, + false, + }, + { + InitLocation(MinPosition, mysqlGTIDSet), + gmysql.MySQLFlavor, + false, + true, + }, + { + InitLocation(MinPosition, gtid.MinGTIDSet(gmysql.MySQLFlavor)), + gmysql.MySQLFlavor, + true, 
+ true, + }, + + { + InitLocation(mysqlPos, mariaGTIDSet), + gmysql.MariaDBFlavor, + true, + false, + }, + { + InitLocation(mysqlPos, gtid.MinGTIDSet(gmysql.MariaDBFlavor)), + gmysql.MariaDBFlavor, + true, + false, + }, + { + + InitLocation(MinPosition, mariaGTIDSet), + gmysql.MariaDBFlavor, + true, + false, + }, + { + InitLocation(MinPosition, mariaGTIDSet), + gmysql.MariaDBFlavor, + false, + true, + }, + { + InitLocation(MinPosition, gtid.MinGTIDSet(gmysql.MariaDBFlavor)), + gmysql.MariaDBFlavor, + true, + true, + }, + } + + for _, tc := range testCases { + fresh := IsFreshPosition(tc.loc, tc.flavor, tc.cmpGTID) + c.Assert(fresh, Equals, tc.fresh) + } +} diff --git a/pkg/binlog/reader/reader.go b/pkg/binlog/reader/reader.go index 0a8c3c09d4..2b18c79269 100644 --- a/pkg/binlog/reader/reader.go +++ b/pkg/binlog/reader/reader.go @@ -42,3 +42,9 @@ type Reader interface { // Status returns the status of the reader. Status() interface{} } + +// Streamer provides the ability to get binlog event from remote server or local file. +type Streamer interface { + // GetEvent returns binlog event + GetEvent(ctx context.Context) (*replication.BinlogEvent, error) +} diff --git a/pkg/binlog/reader/util.go b/pkg/binlog/reader/util.go index c537db2b18..3b5c500f63 100644 --- a/pkg/binlog/reader/util.go +++ b/pkg/binlog/reader/util.go @@ -30,26 +30,14 @@ import ( "github.com/pingcap/dm/relay/common" ) -// GetGTIDsForPos tries to get GTID sets for the specified binlog position (for the corresponding txn). -// NOTE: this method is very similar with `relay/writer/file_util.go/getTxnPosGTIDs`, unify them if needed later. -// NOTE: this method is not well tested directly, but more tests have already been done for `relay/writer/file_util.go/getTxnPosGTIDs`. -func GetGTIDsForPos(ctx context.Context, r Reader, endPos gmysql.Position) (gtid.Set, error) { - // start to get and parse binlog event from the beginning of the file. 
- startPos := gmysql.Position{ - Name: endPos.Name, - Pos: 0, - } - err := r.StartSyncByPos(startPos) - if err != nil { - return nil, err - } - defer r.Close() - +// GetGTIDsForPosFromStreamer tries to get GTID sets for the specified binlog position (for the corresponding txn) from a Streamer. +func GetGTIDsForPosFromStreamer(ctx context.Context, r Streamer, endPos gmysql.Position) (gtid.Set, error) { var ( flavor string latestPos uint32 latestGSet gmysql.GTIDSet nextGTIDStr string // can be recorded if the coming transaction completed + err error ) for { var e *replication.BinlogEvent @@ -141,6 +129,24 @@ func GetGTIDsForPos(ctx context.Context, r Reader, endPos gmysql.Position) (gtid } } +// GetGTIDsForPos tries to get GTID sets for the specified binlog position (for the corresponding txn). +// NOTE: this method is very similar with `relay/writer/file_util.go/getTxnPosGTIDs`, unify them if needed later. +// NOTE: this method is not well tested directly, but more tests have already been done for `relay/writer/file_util.go/getTxnPosGTIDs`. +func GetGTIDsForPos(ctx context.Context, r Reader, endPos gmysql.Position) (gtid.Set, error) { + // start to get and parse binlog event from the beginning of the file. + startPos := gmysql.Position{ + Name: endPos.Name, + Pos: 0, + } + err := r.StartSyncByPos(startPos) + if err != nil { + return nil, err + } + defer r.Close() + + return GetGTIDsForPosFromStreamer(ctx, r, endPos) +} + // GetPreviousGTIDFromGTIDSet tries to get previous GTID sets from Previous_GTID_EVENT GTID for the specified GITD Set. // events should be [fake_rotate_event,format_description_event,previous_gtids_event/mariadb_gtid_list_event]. 
func GetPreviousGTIDFromGTIDSet(ctx context.Context, r Reader, gset gtid.Set) (gtid.Set, error) { diff --git a/pkg/streamer/streamer.go b/pkg/streamer/streamer.go index 7bb05137af..31b5a9c9a8 100644 --- a/pkg/streamer/streamer.go +++ b/pkg/streamer/streamer.go @@ -19,6 +19,7 @@ import ( "github.com/pingcap/dm/pkg/binlog/common" "github.com/pingcap/dm/pkg/binlog/event" + "github.com/pingcap/dm/pkg/binlog/reader" "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/terror" @@ -32,10 +33,7 @@ var heartbeatInterval = common.MasterHeartbeatPeriod // TODO: maybe one day we can make a pull request to go-mysql to support LocalStreamer. // Streamer provides the ability to get binlog event from remote server or local file. -type Streamer interface { - // GetEvent returns binlog event - GetEvent(ctx context.Context) (*replication.BinlogEvent, error) -} +type Streamer reader.Streamer // LocalStreamer reads and parses binlog events from local binlog file. type LocalStreamer struct { diff --git a/syncer/checkpoint.go b/syncer/checkpoint.go index 75de6ab5bc..e2ff9ec75d 100644 --- a/syncer/checkpoint.go +++ b/syncer/checkpoint.go @@ -781,7 +781,8 @@ func (cp *RemoteCheckPoint) Load(tctx *tcontext.Context) error { gset, ) if isGlobal { - if binlog.CompareLocation(location, binlog.NewLocation(cp.cfg.Flavor), cp.cfg.EnableGTID) > 0 { + // Use IsFreshPosition here to make sure checkpoint can be updated if gset is empty + if !binlog.IsFreshPosition(location, cp.cfg.Flavor, cp.cfg.EnableGTID) { cp.globalPoint = newBinlogPoint(location, location, nil, nil, cp.cfg.EnableGTID) cp.logCtx.L().Info("fetch global checkpoint from DB", log.WrapStringerField("global checkpoint", cp.globalPoint)) } diff --git a/syncer/syncer.go b/syncer/syncer.go index dc8d48a0db..3924d20342 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -49,6 +49,7 @@ import ( "github.com/pingcap/dm/pkg/binlog" "github.com/pingcap/dm/pkg/binlog/common" "github.com/pingcap/dm/pkg/binlog/event" + 
"github.com/pingcap/dm/pkg/binlog/reader" "github.com/pingcap/dm/pkg/conn" tcontext "github.com/pingcap/dm/pkg/context" fr "github.com/pingcap/dm/pkg/func-rollback" @@ -427,7 +428,8 @@ func (s *Syncer) initShardingGroups(ctx context.Context) error { func (s *Syncer) IsFreshTask(ctx context.Context) (bool, error) { globalPoint := s.checkpoint.GlobalPoint() tablePoint := s.checkpoint.TablePoint() - return binlog.CompareLocation(globalPoint, binlog.NewLocation(s.cfg.Flavor), s.cfg.EnableGTID) <= 0 && len(tablePoint) == 0, nil + // doesn't have neither GTID nor binlog pos + return binlog.IsFreshPosition(globalPoint, s.cfg.Flavor, s.cfg.EnableGTID) && len(tablePoint) == 0, nil } func (s *Syncer) reset() { @@ -1128,23 +1130,30 @@ func (s *Syncer) Run(ctx context.Context) (err error) { if err != nil { return err } - - // for fresh and all-mode task, flush checkpoint so we could delete metadata file - if s.cfg.Mode == config.ModeAll { - if err = s.flushCheckPoints(); err != nil { - tctx.L().Warn("fail to flush checkpoints when starting task", zap.Error(err)) - } else if s.cfg.CleanDumpFile { - tctx.L().Info("try to remove loaded files") - metadataFile := path.Join(s.cfg.Dir, "metadata") - if err = os.Remove(metadataFile); err != nil { - tctx.L().Warn("error when remove loaded dump file", zap.String("data file", metadataFile), zap.Error(err)) - } - if err = os.Remove(s.cfg.Dir); err != nil { - tctx.L().Warn("error when remove loaded dump folder", zap.String("data folder", s.cfg.Dir), zap.Error(err)) - } + } + needFlushCheckpoint, err := s.adjustGlobalPointGTID(tctx) + if err != nil { + return err + } + if needFlushCheckpoint || s.cfg.Mode == config.ModeAll { + if err = s.flushCheckPoints(); err != nil { + tctx.L().Warn("fail to flush checkpoints when starting task", zap.Error(err)) + } else if s.cfg.Mode == config.ModeAll && s.cfg.CleanDumpFile { + tctx.L().Info("try to remove loaded files") + metadataFile := path.Join(s.cfg.Dir, "metadata") + if err = 
os.Remove(metadataFile); err != nil { + tctx.L().Warn("error when remove loaded dump file", zap.String("data file", metadataFile), zap.Error(err)) + } + if err = os.Remove(s.cfg.Dir); err != nil { + tctx.L().Warn("error when remove loaded dump folder", zap.String("data folder", s.cfg.Dir), zap.Error(err)) } } } + failpoint.Inject("AdjustGTIDExit", func() { + tctx.L().Warn("exit triggered", zap.String("failpoint", "AdjustGTIDExit")) + s.streamerController.Close(tctx) + utils.OsExit(1) + }) // startLocation is the start location for current received event // currentLocation is the end location for current received event (End_log_pos in `show binlog events` for mysql) @@ -2855,3 +2864,51 @@ func (s *Syncer) getEvent(tctx *tcontext.Context, startLocation binlog.Location) return s.streamerController.GetEvent(tctx) } + +func (s *Syncer) adjustGlobalPointGTID(tctx *tcontext.Context) (bool, error) { + location := s.checkpoint.GlobalPoint() + // situations that don't need to adjust + // 1. GTID is not enabled + // 2. location already has GTID position + // 3. 
location is totally new, has no position info + if !s.cfg.EnableGTID || location.GTIDSetStr() != "" || location.Position.Name == "" { + return false, nil + } + // set enableGTID to false for new streamerController + streamerController := NewStreamerController(s.syncCfg, false, s.fromDB, s.binlogType, s.cfg.RelayDir, s.timezone) + + endPos := binlog.AdjustPosition(location.Position) + startPos := mysql.Position{ + Name: endPos.Name, + Pos: 0, + } + startLocation := location.Clone() + startLocation.Position = startPos + + err := streamerController.Start(tctx, startLocation) + if err != nil { + return false, err + } + defer streamerController.Close(tctx) + + gs, err := reader.GetGTIDsForPosFromStreamer(tctx.Context(), streamerController.streamer, endPos) + if err != nil { + s.tctx.L().Warn("fail to get gtids for global location", zap.Stringer("pos", location), zap.Error(err)) + return false, err + } + err = location.SetGTID(gs.Origin()) + if err != nil { + s.tctx.L().Warn("fail to set gtid for global location", zap.Stringer("pos", location), + zap.String("adjusted_gtid", gs.String()), zap.Error(err)) + return false, err + } + s.checkpoint.SaveGlobalPoint(location) + // redirect streamer for new gtid set location + err = s.streamerController.RedirectStreamer(tctx, location) + if err != nil { + s.tctx.L().Warn("fail to redirect streamer for global location", zap.Stringer("pos", location), + zap.String("adjusted_gtid", gs.String()), zap.Error(err)) + return false, err + } + return true, nil +} diff --git a/tests/adjust_gtid/conf/diff_config.toml b/tests/adjust_gtid/conf/diff_config.toml new file mode 100644 index 0000000000..eb41009ab8 --- /dev/null +++ b/tests/adjust_gtid/conf/diff_config.toml @@ -0,0 +1,56 @@ +# diff Configuration. + +log-level = "info" + +chunk-size = 1000 + +check-thread-count = 4 + +sample-percent = 100 + +use-checksum = true + +fix-sql-file = "fix.sql" + +# tables need to check. 
+[[check-tables]] +schema = "adjust_gtid" +tables = ["~t.*"] + +[[table-config]] +schema = "adjust_gtid" +table = "t1" + +[[table-config.source-tables]] +instance-id = "source-1" +schema = "adjust_gtid" +table = "t1" + +[[table-config]] +schema = "adjust_gtid" +table = "t2" + +[[table-config.source-tables]] +instance-id = "source-2" +schema = "adjust_gtid" +table = "t2" + +[[source-db]] +host = "127.0.0.1" +port = 3306 +user = "root" +password = "123456" +instance-id = "source-1" + +[[source-db]] +host = "127.0.0.1" +port = 3307 +user = "root" +password = "123456" +instance-id = "source-2" + +[target-db] +host = "127.0.0.1" +port = 4000 +user = "test" +password = "123456" diff --git a/tests/adjust_gtid/conf/dm-master.toml b/tests/adjust_gtid/conf/dm-master.toml new file mode 100644 index 0000000000..53a294e7d0 --- /dev/null +++ b/tests/adjust_gtid/conf/dm-master.toml @@ -0,0 +1,6 @@ +# Master Configuration. +master-addr = ":8261" +advertise-addr = "127.0.0.1:8261" + +rpc-timeout = "30s" +auto-compaction-retention = "3s" diff --git a/tests/adjust_gtid/conf/dm-task.yaml b/tests/adjust_gtid/conf/dm-task.yaml new file mode 100644 index 0000000000..65a029cf62 --- /dev/null +++ b/tests/adjust_gtid/conf/dm-task.yaml @@ -0,0 +1,62 @@ +--- +name: test +task-mode: task-mode-placeholder +is-sharding: false +meta-schema: "dm_meta" +# enable-heartbeat: true +heartbeat-update-interval: 1 +heartbeat-report-interval: 1 +clean-dump-file: false + +target-database: + host: "127.0.0.1" + port: 4000 + user: "root" + password: "" + session: + tidb_skip_utf8_check: 1 + tidb_disable_txn_auto_retry: off + tidb_retry_limit: "10" + +mysql-instances: + - source-id: "mysql-replica-01" + meta: + binlog-name: binlog-name-placeholder-1 + binlog-pos: binlog-pos-placeholder-1 + binlog-gtid: binlog-gtid-placeholder-1 + block-allow-list: "instance" + mydumper-config-name: "global" + loader-config-name: "global" + syncer-config-name: "global" + + - source-id: "mysql-replica-02" + meta: + binlog-name: 
binlog-name-placeholder-2 + binlog-pos: binlog-pos-placeholder-2 + binlog-gtid: binlog-gtid-placeholder-2 + block-allow-list: "instance" + mydumper-config-name: "global" + loader-config-name: "global" + syncer-config-name: "global" + +black-white-list: # compatible with deprecated config + instance: + do-dbs: ["adjust_gtid"] + +mydumpers: + global: + threads: 4 + chunk-filesize: 64 + skip-tz-utc: true + extra-args: "" + +loaders: + global: + pool-size: 16 + dir: "./dumped_data" + +syncers: + global: + worker-count: 16 + batch: 100 + enable-ansi-quotes: false # compatible with deprecated config diff --git a/tests/adjust_gtid/conf/dm-worker1.toml b/tests/adjust_gtid/conf/dm-worker1.toml new file mode 100644 index 0000000000..7a72ea72bf --- /dev/null +++ b/tests/adjust_gtid/conf/dm-worker1.toml @@ -0,0 +1,2 @@ +name = "worker1" +join = "127.0.0.1:8261" diff --git a/tests/adjust_gtid/conf/dm-worker2.toml b/tests/adjust_gtid/conf/dm-worker2.toml new file mode 100644 index 0000000000..010e21c73e --- /dev/null +++ b/tests/adjust_gtid/conf/dm-worker2.toml @@ -0,0 +1,2 @@ +name = "worker2" +join = "127.0.0.1:8261" diff --git a/tests/adjust_gtid/conf/source1.yaml b/tests/adjust_gtid/conf/source1.yaml new file mode 100644 index 0000000000..664e2509c5 --- /dev/null +++ b/tests/adjust_gtid/conf/source1.yaml @@ -0,0 +1,18 @@ +# MySQL Configuration. + +source-id: mysql-replica-01 +flavor: '' +enable-gtid: true +enable-relay: true +relay-binlog-name: '' +relay-binlog-gtid: '' +from: + host: 127.0.0.1 + user: root + password: /Q7B9DizNLLTTfiZHv9WoEAKamfpIUs= + port: 3306 +checker: + check-enable: true + backoff-rollback: 5m + backoff-max: 5m + diff --git a/tests/adjust_gtid/conf/source2.yaml b/tests/adjust_gtid/conf/source2.yaml new file mode 100644 index 0000000000..32f44eb948 --- /dev/null +++ b/tests/adjust_gtid/conf/source2.yaml @@ -0,0 +1,21 @@ +# MySQL Configuration. 
+ +source-id: mysql-replica-02 +flavor: '' +enable-gtid: true +enable-relay: false +relay-binlog-name: '' +relay-binlog-gtid: '' +from: + host: 127.0.0.1 + user: root + password: /Q7B9DizNLLTTfiZHv9WoEAKamfpIUs= + port: 3307 + + +# let dm-worker2 use the default config for checker +#checker: +# check-enable: true +# backoff-rollback: 5m +# backoff-max: 5m + diff --git a/tests/adjust_gtid/data/db1.increment.sql b/tests/adjust_gtid/data/db1.increment.sql new file mode 100644 index 0000000000..9546244556 --- /dev/null +++ b/tests/adjust_gtid/data/db1.increment.sql @@ -0,0 +1,48 @@ +use adjust_gtid; +insert into t1 (id, name) values (3, 'Eddard Stark'); +update t1 set name = 'Arya Stark' where id = 1; +update t1 set name = 'Catelyn Stark' where name = 'catelyn'; + +-- test multi column index with generated column +alter table t1 add column info json; +alter table t1 add column gen_id int as (info->'$.id'); +alter table t1 add index multi_col(`id`, `gen_id`, ts); +insert into t1 (id, name, info) values (4, 'gentest', '{"id": 123}'); +insert into t1 (id, name, info) values (5, 'gentest', '{"id": 124}'); +update t1 set info = '{"id": 120}', ts = '2021-05-11 12:02:05' where id = 1; +update t1 set info = '{"id": 121}' where id = 2; +update t1 set info = '{"id": 122}' where id = 3; + +-- test genColumnCache is reset after ddl +alter table t1 add column info2 varchar(40); +insert into t1 (id, name, info) values (6, 'gentest', '{"id": 125, "test cache": false}'); +alter table t1 add unique key gen_idx(`gen_id`); +update t1 set name = 'gentestxx' where gen_id = 123; + +insert into t1 (id, name, info) values (7, 'gentest', '{"id": 126}'); +update t1 set name = 'gentestxxxxxx', dt = '2021-05-11 12:03:05', ts = '2021-05-11 12:03:05' where gen_id = 124; +-- delete with unique key +delete from t1 where gen_id > 124; + +-- test alter database +-- tidb doesn't support alter character set from latin1 to utf8m64 so we comment this now +-- alter database adjust_gtid CHARACTER SET = 
utf8mb4; + +-- test decimal type +alter table t1 add column lat decimal(9,6) default '0.000000'; +insert into t1 (id, name, info, lat) values (8, 'gentest', '{"id":127}', '123.123'); + +-- test bit type +alter table t1 add column bin bit(1) default NULL; +insert into t1 (id, name, info, lat, bin) values (9, 'gentest', '{"id":128}', '123.123', b'0'); +insert into t1 (id, name, info, lat, bin) values (10, 'gentest', '{"id":129}', '123.123', b'1'); + +-- test bigint, min and max value for bigint/bigint unsigned +alter table t1 add column big1 bigint; +alter table t1 add column big2 bigint unsigned; +insert into t1 (id, name, info, lat, big1, big2) values (11, 'gentest', '{"id":130}', '123.123', -9223372036854775808, 0); +insert into t1 (id, name, info, lat, big1, big2) values (12, 'gentest', '{"id":131}', '123.123', 9223372036854775807, 18446744073709551615); + +-- test with different session time_zone +SET @@session.time_zone = '+07:00'; +insert into t1 (id, name, info) values (13, 'tztest', '{"id": 132}'); diff --git a/tests/adjust_gtid/data/db1.prepare.sql b/tests/adjust_gtid/data/db1.prepare.sql new file mode 100644 index 0000000000..6f7b0b37e4 --- /dev/null +++ b/tests/adjust_gtid/data/db1.prepare.sql @@ -0,0 +1,20 @@ +drop database if exists `adjust_gtid`; +create database `adjust_gtid`; +use `adjust_gtid`; +create table t1 ( + id int NOT NULL AUTO_INCREMENT, + name varchar(20), + dt datetime, + ts timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY (id)); +-- test ANSI_QUOTES works with quote in string +insert into t1 (id, name, dt, ts) values (1, 'ar"ya', now(), now()), (2, 'catelyn', '2021-05-11 10:01:05', '2021-05-11 10:01:05'); + +-- test sql_mode=NO_AUTO_VALUE_ON_ZERO +insert into t1 (id, name) values (0, 'lalala'); + +-- test block-allow-list +drop database if exists `ignore_db`; +create database `ignore_db`; +use `ignore_db`; +create table `ignore_table`(id int); \ No newline at end of file diff --git 
a/tests/adjust_gtid/data/db2.increment.sql b/tests/adjust_gtid/data/db2.increment.sql new file mode 100644 index 0000000000..9be41c3a53 --- /dev/null +++ b/tests/adjust_gtid/data/db2.increment.sql @@ -0,0 +1,5 @@ +use adjust_gtid; +delete from t2 where name = 'Sansa'; + +-- test sql_mode=NO_AUTO_VALUE_ON_ZERO +insert into t2 (id, name) values (0,'haha') \ No newline at end of file diff --git a/tests/adjust_gtid/data/db2.prepare.sql b/tests/adjust_gtid/data/db2.prepare.sql new file mode 100644 index 0000000000..ab1d850083 --- /dev/null +++ b/tests/adjust_gtid/data/db2.prepare.sql @@ -0,0 +1,15 @@ +drop database if exists `adjust_gtid`; +create database `adjust_gtid`; +use `adjust_gtid`; +create table t2 ( + id int NOT NULL AUTO_INCREMENT, + name varchar(20), + ts timestamp, + PRIMARY KEY (id));; +insert into t2 (name, ts) values ('Arya', now()), ('Bran', '2021-05-11 10:01:05'), ('Sansa', NULL); + +-- test block-allow-list +drop database if exists `ignore_db`; +create database `ignore_db`; +use `ignore_db`; +create table `ignore_table`(id int); \ No newline at end of file diff --git a/tests/adjust_gtid/run.sh b/tests/adjust_gtid/run.sh new file mode 100755 index 0000000000..7aa870500d --- /dev/null +++ b/tests/adjust_gtid/run.sh @@ -0,0 +1,147 @@ +#!/bin/bash + +set -eu + +cur=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $cur/../_utils/test_prepare +TASK_NAME="test" +WORK_DIR=$TEST_DIR/$TEST_NAME +# SQL_RESULT_FILE="$TEST_DIR/sql_res.$TEST_NAME.txt" + +# clean_gtid will +# 1. delete source1's gtid info, but keep the binlog pos info (simulate switch gtid and resume from checkpoint) +# 2. 
# check_checkpoint checks checkpoint data from the database
# arguments:
#   $1 source id whose global checkpoint row is queried
#   $2 expected binlog name
#   $3 expected binlog pos
#   $4 expected binlog gtid (optional; skipped when missing or empty)
function check_checkpoint() {
	local source_id=$1
	local expected_name=$2
	local expected_pos=$3
	# The script runs under `set -u` and callers pass the GTID unquoted, so an
	# empty GTID means $4 is not set at all; default it instead of aborting.
	local expected_gtid=${4:-}

	run_sql "select binlog_name,binlog_pos,binlog_gtid from dm_meta.${TASK_NAME}_syncer_checkpoint where id=\"$source_id\" and is_global=1" $TIDB_PORT $TIDB_PASSWORD
	check_contains "$expected_name"
	check_contains "$expected_pos"
	if [[ -n $expected_gtid ]]; then
		check_contains "$expected_gtid"
	fi
}
$MYSQL_PORT2 $MYSQL_PASSWORD2 + check_contains 'Query OK, 3 rows affected' + + # start DM worker and master + run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml + check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT + check_metric $MASTER_PORT 'start_leader_counter' 3 0 2 + + export GO_FAILPOINTS='github.com/pingcap/dm/syncer/AdjustGTIDExit=return(true)' + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + + # operate mysql config to worker + cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml + cp $cur/conf/source2.yaml $WORK_DIR/source2.yaml + sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml + sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker2/relay_log" $WORK_DIR/source2.yaml + # make sure source1 is bound to worker1 + dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 + + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "start-relay -s $SOURCE_ID1 worker1" \ + "\"result\": true" 1 + + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2 + + # start DM task only + cp $cur/conf/dm-task.yaml $WORK_DIR/dm-task.yaml + sed -i "s/task-mode-placeholder/all/g" $WORK_DIR/dm-task.yaml + # avoid cannot unmarshal !!str `binlog-...` into uint32 error + sed -i "s/binlog-pos-placeholder-1/4/g" $WORK_DIR/dm-task.yaml + sed -i "s/binlog-pos-placeholder-2/4/g" $WORK_DIR/dm-task.yaml + dmctl_start_task "$WORK_DIR/dm-task.yaml" "--remove-meta" + # check task has started + check_metric $WORKER1_PORT "dm_worker_task_state{source_id=\"mysql-replica-01\",task=\"$TASK_NAME\"}" 3 1 3 + check_metric $WORKER2_PORT "dm_worker_task_state{source_id=\"mysql-replica-02\",task=\"$TASK_NAME\"}" 3 1 3 + + # use sync_diff_inspector to check 
full dump loader + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + + name1=$(grep "Log: " $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ') + pos1=$(grep "Pos: " $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ') + gtid1=$(grep "GTID:" $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2,":",$3}' | tr -d ' ') + name2=$(grep "Log: " $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ') + pos2=$(grep "Pos: " $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ') + gtid2=$(grep "GTID:" $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2,":",$3}' | tr -d ' ') + + check_checkpoint $SOURCE_ID1 $name1 $pos1 $gtid1 + check_checkpoint $SOURCE_ID2 $name2 $pos2 $gtid2 + dmctl_stop_task_with_retry $TASK_NAME $MASTER_PORT + check_port_offline $WORKER1_PORT 20 + check_port_offline $WORKER2_PORT 20 + clean_gtid + + # start two workers again + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + + # start task without checking, worker may exit before we get success result + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" "start-task $WORK_DIR/dm-task.yaml" + + check_checkpoint $SOURCE_ID1 $name1 $pos1 $gtid1 + check_checkpoint $SOURCE_ID2 $name2 $pos2 $gtid2 + check_port_offline $WORKER1_PORT 20 + check_port_offline $WORKER2_PORT 20 + clean_gtid + + run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_sql_file $cur/data/db2.increment.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 + + export GO_FAILPOINTS='' + # start two workers again + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive 
$cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + + # use sync_diff_inspector to check incremental dump loader + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + + run_sql_both_source "SET @@GLOBAL.SQL_MODE='ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION'" + run_sql_both_source "SET @@global.time_zone = 'SYSTEM';" +} + +cleanup_data adjust_gtid +# also cleanup dm processes in case of last run failed +cleanup_process $* +run $* +cleanup_process $* + +echo "[$(date)] <<<<<< test case $TEST_NAME success! >>>>>>" diff --git a/tests/all_mode/run.sh b/tests/all_mode/run.sh index 07af11af3b..c5d6cc6221 100755 --- a/tests/all_mode/run.sh +++ b/tests/all_mode/run.sh @@ -291,7 +291,7 @@ function run() { # use sync_diff_inspector to check data now! check_sync_diff $WORK_DIR $cur/conf/diff_config.toml - # TODO(ehco): now this metrics(syncer) hava some problem need to fix. + # TODO(ehco): now this metrics(syncer) have some problem need to fix. # check_metric $WORKER1_PORT 'dm_syncer_replication_lag{task="test"}' 3 0 2 # check_metric $WORKER2_PORT 'dm_syncer_replication_lag{task="test"}' 3 0 2 diff --git a/tests/others_integration.txt b/tests/others_integration.txt index b80f59d2f2..26227503ca 100644 --- a/tests/others_integration.txt +++ b/tests/others_integration.txt @@ -5,3 +5,4 @@ sequence_sharding_removemeta drop_column_with_index gtid only_dml +adjust_gtid