diff --git a/.github/workflows/check-and-build.yml b/.github/workflows/check-and-build.yml index 0d102cc5eb..e0271d3d0d 100644 --- a/.github/workflows/check-and-build.yml +++ b/.github/workflows/check-and-build.yml @@ -30,7 +30,7 @@ jobs: uses: actions/checkout@v2 - name: Cache go modules - uses: actions/cache@v2 + uses: actions/cache@v2.1.3 # latest v2 can't work in macOS https://github.com/actions/cache/issues/527 with: path: ~/go/pkg/mod key: ${{ runner.os }}-dm-${{ hashFiles('**/go.sum') }} diff --git a/pkg/gtid/gtid.go b/pkg/gtid/gtid.go index cf4a2b91ae..6381d1df93 100644 --- a/pkg/gtid/gtid.go +++ b/pkg/gtid/gtid.go @@ -42,9 +42,6 @@ type Set interface { // should become `00c04543-f584-11e9-a765-0242ac120002:1-60`. Truncate(end Set) error - // ResetStart reset the start of interval to 1 (only meaningful for MySQLGTIDSet), returns true if Set is changed - ResetStart() bool - String() string } @@ -260,28 +257,6 @@ func (g *MySQLGTIDSet) Truncate(end Set) error { return nil } -// ResetStart resets the start part of GTID sets, -// like `00c04543-f584-11e9-a765-0242ac120002:40-60` will be reset to `00c04543-f584-11e9-a765-0242ac120002:1-60`. -// return `true` if reset real happen. -func (g *MySQLGTIDSet) ResetStart() bool { - if g == nil || g.set == nil { - return false - } - - reset := false - for _, set := range g.set.Sets { - for i, inter := range set.Intervals { - if inter.Start > 1 { - inter.Start = 1 - set.Intervals[i] = inter // re-assign - reset = true - } - } - } - - return reset -} - func (g *MySQLGTIDSet) String() string { if g.set == nil { return "" @@ -444,11 +419,6 @@ func (m *MariadbGTIDSet) Truncate(end Set) error { return nil } -// ResetStart does nothing because for MariaDB its GTID set format is `domainID-serverID-SeqNum`. 
-func (m *MariadbGTIDSet) ResetStart() bool { - return false -} - func (m *MariadbGTIDSet) String() string { if m.set == nil { return "" diff --git a/pkg/gtid/gtid_test.go b/pkg/gtid/gtid_test.go index 1f0084d0e2..e1d8b4086a 100644 --- a/pkg/gtid/gtid_test.go +++ b/pkg/gtid/gtid_test.go @@ -401,42 +401,3 @@ func (s *testGTIDSuite) TestMariaDBGTIDTruncate(c *C) { } } } - -func (s *testGTIDSuite) TestGTIDSetResetStart(c *C) { - var ( - gMaria, _ = ParserGTID("", "1-2-3") - flavor = "mysql" - gNil *MySQLGTIDSet - gEmpty, _ = ParserGTID(flavor, "") - g1, _ = ParserGTID(flavor, "00c04543-f584-11e9-a765-0242ac120002:1-100") - g2, _ = ParserGTID(flavor, "00c04543-f584-11e9-a765-0242ac120002:100") - g3, _ = ParserGTID(flavor, "00c04543-f584-11e9-a765-0242ac120002:50-100") - g4, _ = ParserGTID(flavor, "00c04543-f584-11e9-a765-0242ac120002:1-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:1-100") - g5, _ = ParserGTID(flavor, "00c04543-f584-11e9-a765-0242ac120002:1-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:50-100") - g6, _ = ParserGTID(flavor, "00c04543-f584-11e9-a765-0242ac120002:40-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:50-100") - g7, _ = ParserGTID(flavor, "00c04543-f584-11e9-a765-0242ac120002:10-20:30-100") - ) - - c.Assert(gMaria.ResetStart(), IsFalse) - c.Assert(gNil.ResetStart(), IsFalse) - c.Assert(gEmpty.ResetStart(), IsFalse) - - c.Assert(g1.ResetStart(), IsFalse) - - c.Assert(g2.ResetStart(), IsTrue) - c.Assert(g2.Equal(g1), IsTrue) - - c.Assert(g3.ResetStart(), IsTrue) - c.Assert(g3.Equal(g1), IsTrue) - - c.Assert(g4.ResetStart(), IsFalse) - - c.Assert(g5.ResetStart(), IsTrue) - c.Assert(g5.Equal(g4), IsTrue) - - c.Assert(g6.ResetStart(), IsTrue) - c.Assert(g6.Equal(g4), IsTrue) - - c.Assert(g7.ResetStart(), IsTrue) - // TODO: currently g7 will become "00c04543-f584-11e9-a765-0242ac120002:1-20:1-100", will fix soon -} diff --git a/pkg/utils/db.go b/pkg/utils/db.go index 25fb57921a..600d694832 100644 --- a/pkg/utils/db.go +++ b/pkg/utils/db.go @@ -473,3 +473,45 @@ func 
ExtractTiDBVersion(version string) (*semver.Version, error) { rawVersion = strings.TrimPrefix(rawVersion, "v") return semver.NewVersion(rawVersion) } + +// AddGSetWithPurged is used to handle this case: https://github.com/pingcap/dm/issues/1418 +// we might get a gtid set from Previous_gtids event in binlog, but that gtid set can't be used to start a gtid sync +// because it doesn't cover all gtid_purged. The error of using it will be +// ERROR 1236 (HY000): The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1, but the master has purged binary logs containing GTIDs that the slave requires. +// so we add gtid_purged to it. +func AddGSetWithPurged(ctx context.Context, gset gtid.Set, conn *sql.Conn) (gtid.Set, error) { + if _, ok := gset.(*gtid.MariadbGTIDSet); ok { + return gset, nil + } + + var ( + gtidStr string + row *sql.Row + err error + ) + + failpoint.Inject("GetGTIDPurged", func(val failpoint.Value) { + str := val.(string) + gtidStr = str + failpoint.Goto("bypass") + }) + row = conn.QueryRowContext(ctx, "select @@GLOBAL.gtid_purged") + err = row.Scan(&gtidStr) + if err != nil { + log.L().Error("can't get @@GLOBAL.gtid_purged when try to add it to gtid set", zap.Error(err)) + return gset, terror.DBErrorAdapt(err, terror.ErrDBDriverError) + } + failpoint.Label("bypass") + if gtidStr == "" { + return gset, nil + } + + newGset := gset.Origin() + err = newGset.Update(gtidStr) + if err != nil { + return nil, err + } + ret := &gtid.MySQLGTIDSet{} + _ = ret.Set(newGset) + return ret, nil +} diff --git a/pkg/v1dbschema/schema.go b/pkg/v1dbschema/schema.go index a6d362680a..6c9eb10572 100644 --- a/pkg/v1dbschema/schema.go +++ b/pkg/v1dbschema/schema.go @@ -105,6 +105,10 @@ func updateSyncerCheckpoint(tctx *tcontext.Context, dbConn *conn.BaseConn, taskN if err != nil { return terror.Annotatef(err, "get GTID sets for position %s", pos) } + gs, err = utils.AddGSetWithPurged(tctx.Context(), gs, dbConn.DBConn) + if err != nil { + return terror.Annotatef(err, 
"get GTID sets for position %s", pos) + } logger.Info("got global checkpoint GTID sets", log.WrapStringerField("GTID sets", gs)) } } @@ -176,21 +180,6 @@ func getGlobalPos(tctx *tcontext.Context, dbConn *conn.BaseConn, tableName, sour // getGTIDsForPos gets the GTID sets for the position. func getGTIDsForPos(tctx *tcontext.Context, pos gmysql.Position, tcpReader reader.Reader) (gs gtid.Set, err error) { - // in MySQL, we expect `PreviousGTIDsEvent` contains ALL previous GTID sets, but in fact it may lack a part of them sometimes, - // e.g we expect `00c04543-f584-11e9-a765-0242ac120002:1-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:1-100`, - // but may be `00c04543-f584-11e9-a765-0242ac120002:50-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:60-100`. - // and when DM requesting MySQL to send binlog events with this EXCLUDED GTID sets, some errors like - // `ERROR 1236 (HY000): The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1, but the master has purged binary logs containing GTIDs that the slave requires.` - // may occur, so we force to reset the START part of any GTID set. - defer func() { - if err == nil && gs != nil { - oldGs := gs.Clone() - if gs.ResetStart() { - tctx.L().Warn("force to reset the start part of GTID sets", zap.Stringer("from GTID set", oldGs), zap.Stringer("to GTID set", gs)) - } - } - }() - // NOTE: because we have multiple unit test cases updating/clearing binlog in the upstream, // we may encounter errors when reading binlog event but cleared by another test case. 
failpoint.Inject("MockGetGTIDsForPos", func(val failpoint.Value) { diff --git a/pkg/v1dbschema/schema_test.go b/pkg/v1dbschema/schema_test.go index 014fbd5446..a5ced3cafd 100644 --- a/pkg/v1dbschema/schema_test.go +++ b/pkg/v1dbschema/schema_test.go @@ -113,13 +113,15 @@ func (t *testSchema) TestSchemaV106ToV20x(c *C) { endGS, _ = gtid.ParserGTID(gmysql.MySQLFlavor, "ccb992ad-a557-11ea-ba6a-0242ac140002:1-16") ) - c.Assert(failpoint.Enable("github.com/pingcap/dm/pkg/v1dbschema/MockGetGTIDsForPos", `return("ccb992ad-a557-11ea-ba6a-0242ac140002:10-16")`), IsNil) // need `ResetStart`. + c.Assert(failpoint.Enable("github.com/pingcap/dm/pkg/v1dbschema/MockGetGTIDsForPos", `return("ccb992ad-a557-11ea-ba6a-0242ac140002:10-16")`), IsNil) //nolint:errcheck defer failpoint.Disable("github.com/pingcap/dm/pkg/v1dbschema/MockGetGTIDsForPos") + c.Assert(failpoint.Enable("github.com/pingcap/dm/pkg/utils/GetGTIDPurged", `return("ccb992ad-a557-11ea-ba6a-0242ac140002:1-9")`), IsNil) + //nolint:errcheck + defer failpoint.Disable("github.com/pingcap/dm/pkg/utils/GetGTIDPurged") dbConn, err := t.db.GetBaseConn(tctx.Ctx) c.Assert(err, IsNil) - defer func() { _, err = dbConn.ExecuteSQL(tctx, nil, cfg.Name, []string{ `DROP DATABASE ` + cfg.MetaSchema, diff --git a/relay/relay.go b/relay/relay.go index 7c7675af14..9d37f9ac53 100755 --- a/relay/relay.go +++ b/relay/relay.go @@ -379,20 +379,18 @@ func (r *Relay) tryRecoverLatestFile(ctx context.Context, parser2 *parser.Parser zap.Stringer("from position", latestPos), zap.Stringer("to position", result.LatestPos), log.WrapStringerField("from GTID set", latestGTID), log.WrapStringerField("to GTID set", result.LatestGTIDs)) if result.LatestGTIDs != nil { - gs := result.LatestGTIDs - oldGs1 := gs.Clone() - // in MySQL, we expect `PreviousGTIDsEvent` contains ALL previous GTID sets, but in fact it may lack a part of them sometimes, - // e.g we expect `00c04543-f584-11e9-a765-0242ac120002:1-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:1-100`, - // but 
may be `00c04543-f584-11e9-a765-0242ac120002:50-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:60-100`. - // and when DM requesting MySQL to send binlog events with this EXCLUDED GTID sets, some errors like - // `ERROR 1236 (HY000): The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1, but the master has purged binary logs containing GTIDs that the slave requires.` - // may occur, so we force to reset the START part of any GTID set. - if gs.ResetStart() { - r.logger.Warn("force to reset the start part of recovered GTID sets", zap.Stringer("from GTID set", oldGs1), zap.Stringer("to GTID set", gs)) - oldGs2 := latestGTID.Clone() - if latestGTID.ResetStart() { - r.logger.Warn("force to reset the start part of latest GTID sets", zap.Stringer("from GTID set", oldGs2), zap.Stringer("to GTID set", latestGTID)) - } + dbConn, err2 := r.db.Conn(ctx) + if err2 != nil { + return err2 + } + defer dbConn.Close() + result.LatestGTIDs, err2 = utils.AddGSetWithPurged(ctx, result.LatestGTIDs, dbConn) + if err2 != nil { + return err2 + } + latestGTID, err2 = utils.AddGSetWithPurged(ctx, latestGTID, dbConn) + if err2 != nil { + return err2 } } @@ -965,7 +963,7 @@ func (r *Relay) setSyncConfig() error { } syncerCfg := replication.BinlogSyncerConfig{ - ServerID: uint32(r.cfg.ServerID), + ServerID: r.cfg.ServerID, Flavor: r.cfg.Flavor, Host: r.cfg.From.Host, Port: uint16(r.cfg.From.Port), @@ -990,7 +988,6 @@ func (r *Relay) setSyncConfig() error { // AdjustGTID implements Relay.AdjustGTID // starting sync at returned gset will wholly fetch a binlog from beginning of the file. -// TODO: check if starting fetch at the middle of binlog is also acceptable func (r *Relay) adjustGTID(ctx context.Context, gset gtid.Set) (gtid.Set, error) { // setup a TCP binlog reader (because no relay can be used when upgrading). 
syncCfg := r.syncerCfg @@ -1006,15 +1003,10 @@ func (r *Relay) adjustGTID(ctx context.Context, gset gtid.Set) (gtid.Set, error) return nil, err } - // in MySQL, we expect `PreviousGTIDsEvent` contains ALL previous GTID sets, but in fact it may lack a part of them sometimes, - // e.g we expect `00c04543-f584-11e9-a765-0242ac120002:1-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:1-100`, - // but may be `00c04543-f584-11e9-a765-0242ac120002:50-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:60-100`. - // and when DM requesting MySQL to send binlog events with this EXCLUDED GTID sets, some errors like - // `ERROR 1236 (HY000): The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1, but the master has purged binary logs containing GTIDs that the slave requires.` - // may occur, so we force to reset the START part of any GTID set. - oldGs := resultGs.Clone() - if resultGs.ResetStart() { - r.logger.Warn("force to reset the start part of GTID sets", zap.Stringer("from GTID set", oldGs), zap.Stringer("to GTID set", resultGs)) + dbConn, err2 := r.db.Conn(ctx) + if err2 != nil { + return nil, err2 } - return resultGs, nil + defer dbConn.Close() + return utils.AddGSetWithPurged(ctx, resultGs, dbConn) } diff --git a/relay/relay_test.go b/relay/relay_test.go index ed06e41621..18c801fab2 100644 --- a/relay/relay_test.go +++ b/relay/relay_test.go @@ -28,6 +28,7 @@ import ( . 
"github.com/pingcap/check" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/parser" "github.com/siddontang/go-mysql/mysql" gmysql "github.com/siddontang/go-mysql/mysql" @@ -178,6 +179,9 @@ func (t *testRelaySuite) TestTryRecoverLatestFile(c *C) { relayCfg = newRelayCfg(c, mysql.MySQLFlavor) r = NewRelay(relayCfg).(*Relay) ) + c.Assert(failpoint.Enable("github.com/pingcap/dm/pkg/utils/GetGTIDPurged", `return("406a3f61-690d-11e7-87c5-6c92bf46f384:1-122")`), IsNil) + //nolint:errcheck + defer failpoint.Disable("github.com/pingcap/dm/pkg/utils/GetGTIDPurged") c.Assert(r.Init(context.Background()), IsNil) // purge old relay dir f, err := os.Create(filepath.Join(r.cfg.RelayDir, "old_relay_log")) @@ -256,9 +260,10 @@ func (t *testRelaySuite) TestTryRecoverMeta(c *C) { previousGTIDSetStr = "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-14,53bfca22-690d-11e7-8a62-18ded7a37b78:1-495,406a3f61-690d-11e7-87c5-6c92bf46f384:123-456" latestGTIDStr1 = "3ccc475b-2343-11e7-be21-6c0b84d59f30:14" latestGTIDStr2 = "53bfca22-690d-11e7-8a62-18ded7a37b78:495" - recoverGTIDSetStr = "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-17,53bfca22-690d-11e7-8a62-18ded7a37b78:1-505,406a3f61-690d-11e7-87c5-6c92bf46f384:1-456" // 406a3f61-690d-11e7-87c5-6c92bf46f384:123-456 --> 406a3f61-690d-11e7-87c5-6c92bf46f384:1-456 - filename = "mysql-bin.000001" - startPos = gmysql.Position{Name: filename, Pos: 123} + // if no @@gtid_purged, 406a3f61-690d-11e7-87c5-6c92bf46f384:123-456 should be not changed + recoverGTIDSetStr = "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-17,53bfca22-690d-11e7-8a62-18ded7a37b78:1-505,406a3f61-690d-11e7-87c5-6c92bf46f384:123-456" + filename = "mysql-bin.000001" + startPos = gmysql.Position{Name: filename, Pos: 123} parser2 = parser.New() relayCfg = newRelayCfg(c, mysql.MySQLFlavor) diff --git a/tests/gtid/conf/diff_config.toml b/tests/gtid/conf/diff_config.toml new file mode 100644 index 0000000000..82786b1628 --- /dev/null +++ b/tests/gtid/conf/diff_config.toml @@ 
-0,0 +1,56 @@ +# diff Configuration. + +log-level = "info" + +chunk-size = 1000 + +check-thread-count = 4 + +sample-percent = 100 + +use-checksum = true + +fix-sql-file = "fix.sql" + +# tables need to check. +[[check-tables]] +schema = "gtid" +tables = ["~t.*"] + +[[table-config]] +schema = "gtid" +table = "t1" + +[[table-config.source-tables]] +instance-id = "source-1" +schema = "gtid" +table = "t1" + +[[table-config]] +schema = "gtid" +table = "t2" + +[[table-config.source-tables]] +instance-id = "source-2" +schema = "gtid" +table = "t2" + +[[source-db]] +host = "127.0.0.1" +port = 3306 +user = "root" +password = "123456" +instance-id = "source-1" + +[[source-db]] +host = "127.0.0.1" +port = 3307 +user = "root" +password = "123456" +instance-id = "source-2" + +[target-db] +host = "127.0.0.1" +port = 4000 +user = "test" +password = "123456" diff --git a/tests/gtid/conf/dm-master.toml b/tests/gtid/conf/dm-master.toml new file mode 100644 index 0000000000..9b360834d3 --- /dev/null +++ b/tests/gtid/conf/dm-master.toml @@ -0,0 +1,5 @@ +# Master Configuration. 
+master-addr = ":8261" +advertise-addr = "127.0.0.1:8261" + +rpc-timeout = "30s" diff --git a/tests/gtid/conf/dm-task.yaml b/tests/gtid/conf/dm-task.yaml new file mode 100644 index 0000000000..8e96b67f10 --- /dev/null +++ b/tests/gtid/conf/dm-task.yaml @@ -0,0 +1,23 @@ +--- +name: test +task-mode: all +is-sharding: false +clean-dump-file: false + +target-database: + host: "127.0.0.1" + port: 4000 + user: "root" + password: "" + +mysql-instances: + - source-id: "mysql-replica-01" + block-allow-list: "instance" + + - source-id: "mysql-replica-02" + block-allow-list: "instance" + +black-white-list: # compatible with deprecated config + instance: + do-dbs: ["gtid"] + diff --git a/tests/gtid/conf/dm-worker1.toml b/tests/gtid/conf/dm-worker1.toml new file mode 100644 index 0000000000..7a72ea72bf --- /dev/null +++ b/tests/gtid/conf/dm-worker1.toml @@ -0,0 +1,2 @@ +name = "worker1" +join = "127.0.0.1:8261" diff --git a/tests/gtid/conf/dm-worker2.toml b/tests/gtid/conf/dm-worker2.toml new file mode 100644 index 0000000000..010e21c73e --- /dev/null +++ b/tests/gtid/conf/dm-worker2.toml @@ -0,0 +1,2 @@ +name = "worker2" +join = "127.0.0.1:8261" diff --git a/tests/gtid/conf/source1.yaml b/tests/gtid/conf/source1.yaml new file mode 100644 index 0000000000..3c702b3fc2 --- /dev/null +++ b/tests/gtid/conf/source1.yaml @@ -0,0 +1,12 @@ +# MySQL Configuration. + +source-id: mysql-replica-01 +enable-gtid: true +enable-relay: true +from: + host: 127.0.0.1 + user: root + password: /Q7B9DizNLLTTfiZHv9WoEAKamfpIUs= + port: 3306 +checker: + check-enable: false diff --git a/tests/gtid/conf/source2.yaml b/tests/gtid/conf/source2.yaml new file mode 100644 index 0000000000..c37ddc7ded --- /dev/null +++ b/tests/gtid/conf/source2.yaml @@ -0,0 +1,12 @@ +# MySQL Configuration. 
+ +source-id: mysql-replica-02 +enable-gtid: true +enable-relay: false +from: + host: 127.0.0.1 + user: root + password: /Q7B9DizNLLTTfiZHv9WoEAKamfpIUs= + port: 3307 +checker: + check-enable: false diff --git a/tests/gtid/data/db1.increment.sql b/tests/gtid/data/db1.increment.sql new file mode 100644 index 0000000000..69a3f9740c --- /dev/null +++ b/tests/gtid/data/db1.increment.sql @@ -0,0 +1,2 @@ +use gtid; +insert into t1 values (2); diff --git a/tests/gtid/data/db1.prepare.sql b/tests/gtid/data/db1.prepare.sql new file mode 100644 index 0000000000..e86aeda508 --- /dev/null +++ b/tests/gtid/data/db1.prepare.sql @@ -0,0 +1,6 @@ +drop database if exists `gtid`; +reset master; +create database `gtid`; +use `gtid`; +create table t1 (id int PRIMARY KEY); +insert into t1 values (1); \ No newline at end of file diff --git a/tests/gtid/data/db2.increment.sql b/tests/gtid/data/db2.increment.sql new file mode 100644 index 0000000000..e2415de822 --- /dev/null +++ b/tests/gtid/data/db2.increment.sql @@ -0,0 +1,2 @@ +use gtid; +insert into t2 values (2); \ No newline at end of file diff --git a/tests/gtid/data/db2.prepare.sql b/tests/gtid/data/db2.prepare.sql new file mode 100644 index 0000000000..02385f8817 --- /dev/null +++ b/tests/gtid/data/db2.prepare.sql @@ -0,0 +1,6 @@ +drop database if exists `gtid`; +reset master; +create database `gtid`; +use `gtid`; +create table t2 (id int primary key); +insert into t2 values (1); \ No newline at end of file diff --git a/tests/gtid/run.sh b/tests/gtid/run.sh new file mode 100755 index 0000000000..fe4b52cefa --- /dev/null +++ b/tests/gtid/run.sh @@ -0,0 +1,125 @@ +#!/bin/bash + +set -eu + +cur=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd ) +source $cur/../_utils/test_prepare +WORK_DIR=$TEST_DIR/$TEST_NAME +API_VERSION="v1alpha1" +TASK_NAME="test" + +function run() { + run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 
$MYSQL_PASSWORD2 + + cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml + cp $cur/conf/source2.yaml $WORK_DIR/source2.yaml + sed -i "/from:/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml + sed -i "/from:/i\relay-dir: $WORK_DIR/worker2/relay_log" $WORK_DIR/source2.yaml + + # start DM worker and source one-by-one, make sure the source1 bound to worker1 + run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml + check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 + + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2 + + dmctl_start_task "$cur/conf/dm-task.yaml" "--remove-meta" + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + + # TODO: when there's a purged gap, if starting gtid set covers gap, there should be no data lost + # if starting gtid set not fully covers gap, behaviour should be same whether we enable relay + # hard to reproduce in CI + + # when there's a not purged gap, there should be no data lost + # we manually `set gtid_next = 'uuid:gtid'`to reproduce + gtid1=$(grep "GTID:" $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata|awk -F: '{print $2,":",$3}'|tr -d ' ') + gtid2=$(grep "GTID:" $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata|awk -F: '{print $2,":",$3}'|tr -d ' ') + uuid1=$(echo $gtid1 | awk -F: '{print $1}') + uuid2=$(echo $gtid2 | awk -F: '{print $1}') + end_gtid_num1=$(echo $gtid1 | awk -F: '{print $2}' | awk -F- '{print $2}') + end_gtid_num2=$(echo $gtid2 | awk -F: '{print $2}' | awk -F- '{print $2}') + new_gtid1=${uuid1}:$((end_gtid_num1 + 3)) + new_gtid2=${uuid2}:$((end_gtid_num2 + 3)) + echo "new_gtid1 $new_gtid1 
new_gtid2 $new_gtid2" + + run_sql_source1 "SET gtid_next='$new_gtid1';insert into gtid.t1 values (3);SET gtid_next='AUTOMATIC';" + run_sql_source2 "SET gtid_next='$new_gtid2';insert into gtid.t2 values (3);SET gtid_next='AUTOMATIC'" + run_sql_both_source "flush logs" + run_sql_source1 "insert into gtid.t1 values (4)" + run_sql_source2 "insert into gtid.t2 values (4)" + # now Previous_gtids event is 09bec856-ba95-11ea-850a-58f2b4af5188:1-4:6 + + sleep 1 + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "stop-task test"\ + "\"result\": true" 3 + + run_sql_source1 "insert into gtid.t1 values (5)" + run_sql_source2 "insert into gtid.t2 values (5)" + # now Previous_gtids event is 09bec856-ba95-11ea-850a-58f2b4af5188:1-6 + + # remove relay-dir, now relay starting point(syncer checkpoint) should be 09bec856-ba95-11ea-850a-58f2b4af5188:1-4:6 + # check if relay correctly handle gap + pkill -hup dm-worker.test 2>/dev/null || true + check_port_offline $WORKER1_PORT 20 + check_port_offline $WORKER2_PORT 20 + rm -rf $WORK_DIR/worker1/relay_log || true + rm -rf $WORK_DIR/worker2/relay_log || true + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + + # we didn't lost 09bec856-ba95-11ea-850a-58f2b4af5188:5, which is insert into gtid.tx values (5) + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "start-task $cur/conf/dm-task.yaml"\ + "\"result\": true" 3 + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "stop-task test"\ + "\"result\": true" 3 + + run_sql_source1 "insert into gtid.t1 values (7)" + run_sql_source2 "insert into gtid.t2 values (7)" + run_sql_both_source "flush logs" + run_sql_source1 "insert into gtid.t1 values (8)" + run_sql_source2 "insert into gtid.t2 values 
(8)" + sleep 2 + run_sql_both_source "purge binary logs before '`date '+%Y-%m-%d %H:%M:%S'`'" + + # remove relay-dir, now relay starting point(syncer checkpoint) should be 09bec856-ba95-11ea-850a-58f2b4af5188:1-6 + # which is already purged + pkill -hup dm-worker.test 2>/dev/null || true + check_port_offline $WORKER1_PORT 20 + check_port_offline $WORKER2_PORT 20 + rm -rf $WORK_DIR/worker1/relay_log || true + rm -rf $WORK_DIR/worker2/relay_log || true + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "start-task $cur/conf/dm-task.yaml" + + # both with and without relay should error + # (different version of MySQL has different error message, only compare error code here) + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status $TASK_NAME" \ + "no relay pos match gtid" 1 \ + "ERROR 1236 (HY000)" 1 +} + +cleanup_data gtid +# also cleanup dm processes in case of last run failed +cleanup_process $* +run $* +cleanup_process $* + +echo "[$(date)] <<<<<< test case $TEST_NAME success! >>>>>>" diff --git a/tests/others_integration.txt b/tests/others_integration.txt index 4631ecf8b5..eff8b3c429 100644 --- a/tests/others_integration.txt +++ b/tests/others_integration.txt @@ -7,3 +7,4 @@ dm_syncer sequence_sharding_optimistic sequence_sharding_removemeta drop_column_with_index +gtid