From fd608ec7ddf8dd984352dfb43a4288246afe5286 Mon Sep 17 00:00:00 2001 From: ti-srebot <66930949+ti-srebot@users.noreply.github.com> Date: Thu, 21 Jan 2021 16:33:17 +0800 Subject: [PATCH] cherry pick #1385 to release-2.0 (#1395) --- dm/config/source_config.go | 2 +- dm/master/server.go | 4 ++++ dm/worker/source.yaml | 5 +++-- pkg/gtid/gtid.go | 30 +++++++++++++++++++++++++----- pkg/gtid/gtid_test.go | 8 ++++++++ relay/meta.go | 10 ++++++---- 6 files changed, 47 insertions(+), 12 deletions(-) diff --git a/dm/config/source_config.go b/dm/config/source_config.go index facdbc41e3..9d37de2fa8 100644 --- a/dm/config/source_config.go +++ b/dm/config/source_config.go @@ -138,7 +138,7 @@ func (c *SourceConfig) ParseYaml(content string) error { return terror.ErrConfigYamlTransform.Delegate(err, "decode source config") } c.adjust() - return c.Verify() + return nil } // EncodeToml encodes config. diff --git a/dm/master/server.go b/dm/master/server.go index 460bee1f45..ddb5e67bb6 100644 --- a/dm/master/server.go +++ b/dm/master/server.go @@ -996,6 +996,10 @@ func parseAndAdjustSourceConfig(ctx context.Context, contents []string) ([]*conf return cfgs, err } + if err = cfg.Verify(); err != nil { + return cfgs, err + } + fromDB.Close() cfgs[i] = cfg } diff --git a/dm/worker/source.yaml b/dm/worker/source.yaml index e1ca06323d..1040f9a990 100644 --- a/dm/worker/source.yaml +++ b/dm/worker/source.yaml @@ -7,13 +7,14 @@ server-id: 101 source-id: mysql-replica-01 #flavor: mysql/mariadb -flavor: mysql #directory that used to store relay log relay-dir: ./relay_log #enable gtid in relay log unit -enable-gtid: false +enable-gtid: true + +relay-binlog-gtid: "e68f6068-53ec-11eb-9c5f-0242ac110003:1-50" #charset of DSN of source mysql/mariadb instance # charset: '' diff --git a/pkg/gtid/gtid.go b/pkg/gtid/gtid.go index ef7f38551d..f43cb67fe1 100644 --- a/pkg/gtid/gtid.go +++ b/pkg/gtid/gtid.go @@ -14,6 +14,7 @@ package gtid import ( + "github.com/pingcap/errors" "github.com/siddontang/go-mysql/mysql" "github.com/pingcap/dm/pkg/terror" @@ -47,16 +48,35 @@ type Set interface { // ParserGTID parses GTID from string func ParserGTID(flavor, gtidStr string) (Set, error) { var ( - m Set - err error + m Set + err error + gtid mysql.GTIDSet ) - gtid, err := mysql.ParseGTIDSet(flavor, gtidStr) + if len(flavor) == 0 && len(gtidStr) == 0 { + return nil, errors.Errorf("empty flavor with empty gtid is invalid") + } + + fla := flavor + switch fla { + case mysql.MySQLFlavor, mysql.MariaDBFlavor: + gtid, err = mysql.ParseGTIDSet(fla, gtidStr) + case "": + fla = mysql.MySQLFlavor + gtid, err = mysql.ParseGTIDSet(fla, gtidStr) + if err != nil { + fla = mysql.MariaDBFlavor + gtid, err = mysql.ParseGTIDSet(fla, gtidStr) + } + default: + err = terror.ErrNotSupportedFlavor.Generate(flavor) + } + if err != nil { - return nil, terror.ErrParseGTID.Delegate(err, gtidStr) + return nil, err } - switch flavor { + switch fla { case mysql.MariaDBFlavor: m = &MariadbGTIDSet{} case mysql.MySQLFlavor: diff --git a/pkg/gtid/gtid_test.go b/pkg/gtid/gtid_test.go index aef1f50116..9951a6e08a 100644 --- a/pkg/gtid/gtid_test.go +++ b/pkg/gtid/gtid_test.go @@ -58,6 +58,14 @@ func (s *testGTIDSuite) TestGTID(c *C) { {"mysql", []interface{}{matserUUIDs[0]}, fmt.Sprintf("%s:1-2", matserUUIDs[1]), "", ""}, {"mysql", []interface{}{matserUUIDs[0], matserUUIDs[1]}, fmt.Sprintf("%s:1-2,%s:1-2", matserUUIDs[0], matserUUIDs[1]), fmt.Sprintf("%s:1-12,%s:1-4,%s:1-4", matserUUIDs[0], matserUUIDs[1], matserUUIDs[2]), fmt.Sprintf("%s:1-2,%s:1-2,%s:1-4", matserUUIDs[0], matserUUIDs[1], matserUUIDs[2])}, {"mysql", []interface{}{matserUUIDs[0], matserUUIDs[2]}, fmt.Sprintf("%s:1-2", matserUUIDs[1]), fmt.Sprintf("%s:1-12,%s:1-3,%s:1-4", matserUUIDs[0], matserUUIDs[1], matserUUIDs[2]), fmt.Sprintf("%s:1-2", matserUUIDs[1])}, + {"", []interface{}{uint32(1)}, "1-1-1,2-2-2", "1-1-12,4-4-4", "1-1-1,4-4-4"}, + {"", []interface{}{uint32(1)}, "2-2-2", "1-2-12,2-2-3,4-4-4", "2-2-2,4-4-4"}, + {"", []interface{}{uint32(1), uint32(3)}, "1-1-1,3-3-4,2-2-2", "1-1-12,3-3-8,4-4-4", "1-1-1,3-3-4,4-4-4"}, + {"", []interface{}{uint32(1), uint32(3)}, "2-2-2", "1-2-12,2-2-3,3-3-8,4-4-4", "2-2-2,4-4-4"}, + {"", []interface{}{matserUUIDs[0]}, fmt.Sprintf("%s:1-2,%s:1-2", matserUUIDs[0], matserUUIDs[1]), fmt.Sprintf("%s:1-12,%s:1-4", matserUUIDs[0], matserUUIDs[2]), fmt.Sprintf("%s:1-2,%s:1-4", matserUUIDs[0], matserUUIDs[2])}, + {"", []interface{}{matserUUIDs[0]}, fmt.Sprintf("%s:1-2", matserUUIDs[1]), fmt.Sprintf("%s:1-12,%s:1-3,%s:1-4", matserUUIDs[0], matserUUIDs[1], matserUUIDs[2]), fmt.Sprintf("%s:1-2,%s:1-4", matserUUIDs[1], matserUUIDs[2])}, + {"", []interface{}{matserUUIDs[0], matserUUIDs[1]}, fmt.Sprintf("%s:1-2,%s:1-2", matserUUIDs[0], matserUUIDs[1]), fmt.Sprintf("%s:1-12,%s:1-4,%s:1-4", matserUUIDs[0], matserUUIDs[1], matserUUIDs[2]), fmt.Sprintf("%s:1-2,%s:1-2,%s:1-4", matserUUIDs[0], matserUUIDs[1], matserUUIDs[2])}, + {"", []interface{}{matserUUIDs[0], matserUUIDs[2]}, fmt.Sprintf("%s:1-2", matserUUIDs[1]), fmt.Sprintf("%s:1-12,%s:1-3,%s:1-4", matserUUIDs[0], matserUUIDs[1], matserUUIDs[2]), fmt.Sprintf("%s:1-2", matserUUIDs[1])}, } for _, cs := range cases { diff --git a/relay/meta.go b/relay/meta.go index 08368fa8a0..9fff82ab8c 100644 --- a/relay/meta.go +++ b/relay/meta.go @@ -473,11 +473,13 @@ func (lm *LocalMeta) loadMetaData() error { return terror.ErrRelayLoadMetaData.Delegate(err) } - gset, err := gtid.ParserGTID(lm.flavor, lm.BinlogGTID) - if err != nil { - return terror.ErrRelayLoadMetaData.Delegate(err) + if len(lm.BinlogGTID) != 0 { + gset, err := gtid.ParserGTID("", lm.BinlogGTID) + if err != nil { + return terror.ErrRelayLoadMetaData.Delegate(err) + } + lm.gset = gset } - lm.gset = gset return nil }