Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

gtid, relay: use gtid_purged to handle gap in Previous_gtids event #1430

Merged
merged 13 commits into from
Feb 19, 2021
2 changes: 1 addition & 1 deletion .github/workflows/check-and-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
uses: actions/checkout@v2

- name: Cache go modules
uses: actions/cache@v2
uses: actions/cache@v2.1.3 # latest v2 can't work in macOS https://github.com/actions/cache/issues/527
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-dm-${{ hashFiles('**/go.sum') }}
Expand Down
30 changes: 0 additions & 30 deletions pkg/gtid/gtid.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,6 @@ type Set interface {
// should become `00c04543-f584-11e9-a765-0242ac120002:1-60`.
Truncate(end Set) error

// ResetStart reset the start of interval to 1 (only meaningful for MySQLGTIDSet), returns true if Set is changed
ResetStart() bool

String() string
}

Expand Down Expand Up @@ -260,28 +257,6 @@ func (g *MySQLGTIDSet) Truncate(end Set) error {
return nil
}

// ResetStart resets the start part of GTID sets,
// like `00c04543-f584-11e9-a765-0242ac120002:40-60` will be reset to `00c04543-f584-11e9-a765-0242ac120002:1-60`.
// return `true` if reset real happen.
func (g *MySQLGTIDSet) ResetStart() bool {
if g == nil || g.set == nil {
return false
}

reset := false
for _, set := range g.set.Sets {
for i, inter := range set.Intervals {
if inter.Start > 1 {
inter.Start = 1
set.Intervals[i] = inter // re-assign
reset = true
}
}
}

return reset
}

func (g *MySQLGTIDSet) String() string {
if g.set == nil {
return ""
Expand Down Expand Up @@ -444,11 +419,6 @@ func (m *MariadbGTIDSet) Truncate(end Set) error {
return nil
}

// ResetStart does nothing because for MariaDB its GTID set format is `domainID-serverID-SeqNum`.
func (m *MariadbGTIDSet) ResetStart() bool {
return false
}

func (m *MariadbGTIDSet) String() string {
if m.set == nil {
return ""
Expand Down
39 changes: 0 additions & 39 deletions pkg/gtid/gtid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,42 +401,3 @@ func (s *testGTIDSuite) TestMariaDBGTIDTruncate(c *C) {
}
}
}

func (s *testGTIDSuite) TestGTIDSetResetStart(c *C) {
var (
gMaria, _ = ParserGTID("", "1-2-3")
flavor = "mysql"
gNil *MySQLGTIDSet
gEmpty, _ = ParserGTID(flavor, "")
g1, _ = ParserGTID(flavor, "00c04543-f584-11e9-a765-0242ac120002:1-100")
g2, _ = ParserGTID(flavor, "00c04543-f584-11e9-a765-0242ac120002:100")
g3, _ = ParserGTID(flavor, "00c04543-f584-11e9-a765-0242ac120002:50-100")
g4, _ = ParserGTID(flavor, "00c04543-f584-11e9-a765-0242ac120002:1-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:1-100")
g5, _ = ParserGTID(flavor, "00c04543-f584-11e9-a765-0242ac120002:1-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:50-100")
g6, _ = ParserGTID(flavor, "00c04543-f584-11e9-a765-0242ac120002:40-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:50-100")
g7, _ = ParserGTID(flavor, "00c04543-f584-11e9-a765-0242ac120002:10-20:30-100")
)

c.Assert(gMaria.ResetStart(), IsFalse)
c.Assert(gNil.ResetStart(), IsFalse)
c.Assert(gEmpty.ResetStart(), IsFalse)

c.Assert(g1.ResetStart(), IsFalse)

c.Assert(g2.ResetStart(), IsTrue)
c.Assert(g2.Equal(g1), IsTrue)

c.Assert(g3.ResetStart(), IsTrue)
c.Assert(g3.Equal(g1), IsTrue)

c.Assert(g4.ResetStart(), IsFalse)

c.Assert(g5.ResetStart(), IsTrue)
c.Assert(g5.Equal(g4), IsTrue)

c.Assert(g6.ResetStart(), IsTrue)
c.Assert(g6.Equal(g4), IsTrue)

c.Assert(g7.ResetStart(), IsTrue)
// TODO: currently g7 will become "00c04543-f584-11e9-a765-0242ac120002:1-20:1-100", will fix soon
}
42 changes: 42 additions & 0 deletions pkg/utils/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,3 +473,45 @@ func ExtractTiDBVersion(version string) (*semver.Version, error) {
rawVersion = strings.TrimPrefix(rawVersion, "v")
return semver.NewVersion(rawVersion)
}

// AddGSetWithPurged is used to handle this case: https://github.com/pingcap/dm/issues/1418
// we might get a gtid set from Previous_gtids event in binlog, but that gtid set can't be used to start a gtid sync
// because it doesn't cover all gtid_purged. The error of using it will be
// ERROR 1236 (HY000): The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1, but the master has purged binary logs containing GTIDs that the slave requires.
// so we add gtid_purged to it.
func AddGSetWithPurged(ctx context.Context, gset gtid.Set, conn *sql.Conn) (gtid.Set, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we use sql.Conn instead of sql.DB here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some callers only hold sql.Conn not sql.DB

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in line 108 of pkg/v1dbschema/schema.go (below file). I'll close connection in next commit

if _, ok := gset.(*gtid.MariadbGTIDSet); ok {
return gset, nil
}

var (
gtidStr string
row *sql.Row
err error
)

failpoint.Inject("GetGTIDPurged", func(val failpoint.Value) {
str := val.(string)
gtidStr = str
failpoint.Goto("bypass")
})
row = conn.QueryRowContext(ctx, "select @@GLOBAL.gtid_purged")
err = row.Scan(&gtidStr)
if err != nil {
log.L().Error("can't get @@GLOBAL.gtid_purged when try to add it to gtid set", zap.Error(err))
return gset, terror.DBErrorAdapt(err, terror.ErrDBDriverError)
}
failpoint.Label("bypass")
if gtidStr == "" {
return gset, nil
}

newGset := gset.Origin()
err = newGset.Update(gtidStr)
if err != nil {
return nil, err
}
ret := &gtid.MySQLGTIDSet{}
_ = ret.Set(newGset)
return ret, nil
}
19 changes: 4 additions & 15 deletions pkg/v1dbschema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ func updateSyncerCheckpoint(tctx *tcontext.Context, dbConn *conn.BaseConn, taskN
if err != nil {
return terror.Annotatef(err, "get GTID sets for position %s", pos)
}
gs, err = utils.AddGSetWithPurged(tctx.Context(), gs, dbConn.DBConn)
if err != nil {
return terror.Annotatef(err, "get GTID sets for position %s", pos)
}
logger.Info("got global checkpoint GTID sets", log.WrapStringerField("GTID sets", gs))
}
}
Expand Down Expand Up @@ -176,21 +180,6 @@ func getGlobalPos(tctx *tcontext.Context, dbConn *conn.BaseConn, tableName, sour

// getGTIDsForPos gets the GTID sets for the position.
func getGTIDsForPos(tctx *tcontext.Context, pos gmysql.Position, tcpReader reader.Reader) (gs gtid.Set, err error) {
// in MySQL, we expect `PreviousGTIDsEvent` contains ALL previous GTID sets, but in fact it may lack a part of them sometimes,
// e.g we expect `00c04543-f584-11e9-a765-0242ac120002:1-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:1-100`,
// but may be `00c04543-f584-11e9-a765-0242ac120002:50-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:60-100`.
// and when DM requesting MySQL to send binlog events with this EXCLUDED GTID sets, some errors like
// `ERROR 1236 (HY000): The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1, but the master has purged binary logs containing GTIDs that the slave requires.`
// may occur, so we force to reset the START part of any GTID set.
defer func() {
if err == nil && gs != nil {
oldGs := gs.Clone()
if gs.ResetStart() {
tctx.L().Warn("force to reset the start part of GTID sets", zap.Stringer("from GTID set", oldGs), zap.Stringer("to GTID set", gs))
}
}
}()

// NOTE: because we have multiple unit test cases updating/clearing binlog in the upstream,
// we may encounter errors when reading binlog event but cleared by another test case.
failpoint.Inject("MockGetGTIDsForPos", func(val failpoint.Value) {
Expand Down
6 changes: 4 additions & 2 deletions pkg/v1dbschema/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,15 @@ func (t *testSchema) TestSchemaV106ToV20x(c *C) {
endGS, _ = gtid.ParserGTID(gmysql.MySQLFlavor, "ccb992ad-a557-11ea-ba6a-0242ac140002:1-16")
)

c.Assert(failpoint.Enable("github.com/pingcap/dm/pkg/v1dbschema/MockGetGTIDsForPos", `return("ccb992ad-a557-11ea-ba6a-0242ac140002:10-16")`), IsNil) // need `ResetStart`.
c.Assert(failpoint.Enable("github.com/pingcap/dm/pkg/v1dbschema/MockGetGTIDsForPos", `return("ccb992ad-a557-11ea-ba6a-0242ac140002:10-16")`), IsNil)
//nolint:errcheck
defer failpoint.Disable("github.com/pingcap/dm/pkg/v1dbschema/MockGetGTIDsForPos")
c.Assert(failpoint.Enable("github.com/pingcap/dm/pkg/utils/GetGTIDPurged", `return("ccb992ad-a557-11ea-ba6a-0242ac140002:1-9")`), IsNil)
//nolint:errcheck
defer failpoint.Disable("github.com/pingcap/dm/pkg/utils/GetGTIDPurged")

dbConn, err := t.db.GetBaseConn(tctx.Ctx)
c.Assert(err, IsNil)

defer func() {
_, err = dbConn.ExecuteSQL(tctx, nil, cfg.Name, []string{
`DROP DATABASE ` + cfg.MetaSchema,
Expand Down
44 changes: 18 additions & 26 deletions relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,20 +379,18 @@ func (r *Relay) tryRecoverLatestFile(ctx context.Context, parser2 *parser.Parser
zap.Stringer("from position", latestPos), zap.Stringer("to position", result.LatestPos), log.WrapStringerField("from GTID set", latestGTID), log.WrapStringerField("to GTID set", result.LatestGTIDs))

if result.LatestGTIDs != nil {
gs := result.LatestGTIDs
oldGs1 := gs.Clone()
// in MySQL, we expect `PreviousGTIDsEvent` contains ALL previous GTID sets, but in fact it may lack a part of them sometimes,
// e.g we expect `00c04543-f584-11e9-a765-0242ac120002:1-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:1-100`,
// but may be `00c04543-f584-11e9-a765-0242ac120002:50-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:60-100`.
// and when DM requesting MySQL to send binlog events with this EXCLUDED GTID sets, some errors like
// `ERROR 1236 (HY000): The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1, but the master has purged binary logs containing GTIDs that the slave requires.`
// may occur, so we force to reset the START part of any GTID set.
if gs.ResetStart() {
r.logger.Warn("force to reset the start part of recovered GTID sets", zap.Stringer("from GTID set", oldGs1), zap.Stringer("to GTID set", gs))
oldGs2 := latestGTID.Clone()
if latestGTID.ResetStart() {
r.logger.Warn("force to reset the start part of latest GTID sets", zap.Stringer("from GTID set", oldGs2), zap.Stringer("to GTID set", latestGTID))
}
dbConn, err2 := r.db.Conn(ctx)
if err2 != nil {
return err2
}
defer dbConn.Close()
result.LatestGTIDs, err2 = utils.AddGSetWithPurged(ctx, result.LatestGTIDs, dbConn)
if err2 != nil {
return err2
}
latestGTID, err2 = utils.AddGSetWithPurged(ctx, latestGTID, dbConn)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we don't add purged gtid to result.LatestGTIDs, do we need to add purged gtid to latestGTID here?

if err2 != nil {
return err2
}
}

Expand Down Expand Up @@ -965,7 +963,7 @@ func (r *Relay) setSyncConfig() error {
}

syncerCfg := replication.BinlogSyncerConfig{
ServerID: uint32(r.cfg.ServerID),
ServerID: r.cfg.ServerID,
Flavor: r.cfg.Flavor,
Host: r.cfg.From.Host,
Port: uint16(r.cfg.From.Port),
Expand All @@ -990,7 +988,6 @@ func (r *Relay) setSyncConfig() error {

// AdjustGTID implements Relay.AdjustGTID
// starting sync at returned gset will wholly fetch a binlog from beginning of the file.
// TODO: check if starting fetch at the middle of binlog is also acceptable
func (r *Relay) adjustGTID(ctx context.Context, gset gtid.Set) (gtid.Set, error) {
// setup a TCP binlog reader (because no relay can be used when upgrading).
syncCfg := r.syncerCfg
Expand All @@ -1006,15 +1003,10 @@ func (r *Relay) adjustGTID(ctx context.Context, gset gtid.Set) (gtid.Set, error)
return nil, err
}

// in MySQL, we expect `PreviousGTIDsEvent` contains ALL previous GTID sets, but in fact it may lack a part of them sometimes,
// e.g we expect `00c04543-f584-11e9-a765-0242ac120002:1-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:1-100`,
// but may be `00c04543-f584-11e9-a765-0242ac120002:50-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:60-100`.
// and when DM requesting MySQL to send binlog events with this EXCLUDED GTID sets, some errors like
// `ERROR 1236 (HY000): The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1, but the master has purged binary logs containing GTIDs that the slave requires.`
// may occur, so we force to reset the START part of any GTID set.
oldGs := resultGs.Clone()
if resultGs.ResetStart() {
r.logger.Warn("force to reset the start part of GTID sets", zap.Stringer("from GTID set", oldGs), zap.Stringer("to GTID set", resultGs))
dbConn, err2 := r.db.Conn(ctx)
if err2 != nil {
return nil, err2
}
return resultGs, nil
defer dbConn.Close()
return utils.AddGSetWithPurged(ctx, resultGs, dbConn)
}
11 changes: 8 additions & 3 deletions relay/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser"
"github.com/siddontang/go-mysql/mysql"
gmysql "github.com/siddontang/go-mysql/mysql"
Expand Down Expand Up @@ -178,6 +179,9 @@ func (t *testRelaySuite) TestTryRecoverLatestFile(c *C) {
relayCfg = newRelayCfg(c, mysql.MySQLFlavor)
r = NewRelay(relayCfg).(*Relay)
)
c.Assert(failpoint.Enable("github.com/pingcap/dm/pkg/utils/GetGTIDPurged", `return("406a3f61-690d-11e7-87c5-6c92bf46f384:1-122")`), IsNil)
//nolint:errcheck
defer failpoint.Disable("github.com/pingcap/dm/pkg/utils/GetGTIDPurged")
c.Assert(r.Init(context.Background()), IsNil)
// purge old relay dir
f, err := os.Create(filepath.Join(r.cfg.RelayDir, "old_relay_log"))
Expand Down Expand Up @@ -256,9 +260,10 @@ func (t *testRelaySuite) TestTryRecoverMeta(c *C) {
previousGTIDSetStr = "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-14,53bfca22-690d-11e7-8a62-18ded7a37b78:1-495,406a3f61-690d-11e7-87c5-6c92bf46f384:123-456"
latestGTIDStr1 = "3ccc475b-2343-11e7-be21-6c0b84d59f30:14"
latestGTIDStr2 = "53bfca22-690d-11e7-8a62-18ded7a37b78:495"
recoverGTIDSetStr = "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-17,53bfca22-690d-11e7-8a62-18ded7a37b78:1-505,406a3f61-690d-11e7-87c5-6c92bf46f384:1-456" // 406a3f61-690d-11e7-87c5-6c92bf46f384:123-456 --> 406a3f61-690d-11e7-87c5-6c92bf46f384:1-456
filename = "mysql-bin.000001"
startPos = gmysql.Position{Name: filename, Pos: 123}
// if no @@gtid_purged, 406a3f61-690d-11e7-87c5-6c92bf46f384:123-456 should be not changed
recoverGTIDSetStr = "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-17,53bfca22-690d-11e7-8a62-18ded7a37b78:1-505,406a3f61-690d-11e7-87c5-6c92bf46f384:123-456"
filename = "mysql-bin.000001"
startPos = gmysql.Position{Name: filename, Pos: 123}

parser2 = parser.New()
relayCfg = newRelayCfg(c, mysql.MySQLFlavor)
Expand Down
56 changes: 56 additions & 0 deletions tests/gtid/conf/diff_config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# diff Configuration.

log-level = "info"

chunk-size = 1000

check-thread-count = 4

sample-percent = 100

use-checksum = true

fix-sql-file = "fix.sql"

# tables need to check.
[[check-tables]]
schema = "gtid"
tables = ["~t.*"]

[[table-config]]
schema = "gtid"
table = "t1"

[[table-config.source-tables]]
instance-id = "source-1"
schema = "gtid"
table = "t1"

[[table-config]]
schema = "gtid"
table = "t2"

[[table-config.source-tables]]
instance-id = "source-2"
schema = "gtid"
table = "t2"

[[source-db]]
host = "127.0.0.1"
port = 3306
user = "root"
password = "123456"
instance-id = "source-1"

[[source-db]]
host = "127.0.0.1"
port = 3307
user = "root"
password = "123456"
instance-id = "source-2"

[target-db]
host = "127.0.0.1"
port = 4000
user = "test"
password = "123456"
5 changes: 5 additions & 0 deletions tests/gtid/conf/dm-master.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Master Configuration.
master-addr = ":8261"
advertise-addr = "127.0.0.1:8261"

rpc-timeout = "30s"
23 changes: 23 additions & 0 deletions tests/gtid/conf/dm-task.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
---
name: test
task-mode: all
is-sharding: false
clean-dump-file: false

target-database:
host: "127.0.0.1"
port: 4000
user: "root"
password: ""

mysql-instances:
- source-id: "mysql-replica-01"
block-allow-list: "instance"

- source-id: "mysql-replica-02"
block-allow-list: "instance"

black-white-list: # compatible with deprecated config
instance:
do-dbs: ["gtid"]

2 changes: 2 additions & 0 deletions tests/gtid/conf/dm-worker1.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
name = "worker1"
join = "127.0.0.1:8261"
2 changes: 2 additions & 0 deletions tests/gtid/conf/dm-worker2.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
name = "worker2"
join = "127.0.0.1:8261"
Loading