Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

syncer: fix syncer gtid auto switch from off to on (#1723) #1745

Merged
6 changes: 4 additions & 2 deletions dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,10 @@ func (s *Server) doStartKeepAlive() {
}

func (s *Server) stopKeepAlive() {
s.kaCancel()
s.kaWg.Wait()
if s.kaCancel != nil {
s.kaCancel()
s.kaWg.Wait()
}
}

func (s *Server) restartKeepAlive() {
Expand Down
28 changes: 28 additions & 0 deletions pkg/binlog/position.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,34 @@ func CompareLocation(location1, location2 Location, cmpGTID bool) int {
return compareIndex(location1.Suffix, location2.Suffix)
}

// IsFreshPosition returns true when location1 is a fresh location without any info.
func IsFreshPosition(location1 Location, flavor string, cmpGTID bool) bool {
location2 := NewLocation(flavor)
if cmpGTID {
cmp, canCmp := CompareGTID(location1.gtidSet, location2.gtidSet)
if canCmp {
if cmp != 0 {
return cmp <= 0
}
// not supposed to happen, for safety here.
if location1.gtidSet != nil && location1.gtidSet.String() != "" {
return false
}
// empty GTIDSet, then compare by position
log.L().Warn("both gtidSets are empty, will compare by position", zap.Stringer("location1", location1), zap.Stringer("location2", location2))
} else {
// if can't compare by GTIDSet, then compare by position
log.L().Warn("gtidSet can't be compared, will compare by position", zap.Stringer("location1", location1), zap.Stringer("location2", location2))
}
}

cmp := ComparePosition(location1.Position, location2.Position)
if cmp != 0 {
return cmp <= 0
}
return compareIndex(location1.Suffix, location2.Suffix) <= 0
}

// CompareGTID returns:
// 1, true if gSet1 is bigger than gSet2
// 0, true if gSet1 is equal to gSet2
Expand Down
86 changes: 86 additions & 0 deletions pkg/binlog/position_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -794,3 +794,89 @@ func (t *testPositionSuite) TestExtractSuffix(c *C) {
c.Assert(suffix, Equals, tc.suffix)
}
}

func (t *testPositionSuite) TestIsFreshPosition(c *C) {
mysqlPos := gmysql.Position{
Name: "mysql-binlog.00001",
Pos: 123,
}
mysqlGTIDSet, err := gtid.ParserGTID(gmysql.MySQLFlavor, "e8e592a6-7a59-11eb-8da1-0242ac110002:1-36")
c.Assert(err, IsNil)
mariaGTIDSet, err := gtid.ParserGTID(gmysql.MariaDBFlavor, "0-1001-233")
c.Assert(err, IsNil)
testCases := []struct {
loc Location
flavor string
cmpGTID bool
fresh bool
}{
{
InitLocation(mysqlPos, mysqlGTIDSet),
gmysql.MySQLFlavor,
true,
false,
},
{
InitLocation(mysqlPos, gtid.MinGTIDSet(gmysql.MySQLFlavor)),
gmysql.MySQLFlavor,
true,
false,
},
{

InitLocation(MinPosition, mysqlGTIDSet),
gmysql.MySQLFlavor,
true,
false,
},
{
InitLocation(MinPosition, mysqlGTIDSet),
gmysql.MySQLFlavor,
false,
true,
},
{
InitLocation(MinPosition, gtid.MinGTIDSet(gmysql.MySQLFlavor)),
gmysql.MySQLFlavor,
true,
true,
},

{
InitLocation(mysqlPos, mariaGTIDSet),
gmysql.MariaDBFlavor,
true,
false,
},
{
InitLocation(mysqlPos, gtid.MinGTIDSet(gmysql.MariaDBFlavor)),
gmysql.MariaDBFlavor,
true,
false,
},
{

InitLocation(MinPosition, mariaGTIDSet),
gmysql.MariaDBFlavor,
true,
false,
},
{
InitLocation(MinPosition, mariaGTIDSet),
gmysql.MariaDBFlavor,
false,
true,
},
{
InitLocation(MinPosition, gtid.MinGTIDSet(gmysql.MariaDBFlavor)),
gmysql.MariaDBFlavor,
true,
true,
},
}

for _, tc := range testCases {
fresh := IsFreshPosition(tc.loc, tc.flavor, tc.cmpGTID)
c.Assert(fresh, Equals, tc.fresh)
}
}
6 changes: 6 additions & 0 deletions pkg/binlog/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,9 @@ type Reader interface {
// Status returns the status of the reader.
Status() interface{}
}

// Streamer provides the ability to get binlog event from remote server or local file.
type Streamer interface {
// GetEvent returns binlog event
GetEvent(ctx context.Context) (*replication.BinlogEvent, error)
}
36 changes: 21 additions & 15 deletions pkg/binlog/reader/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,26 +30,14 @@ import (
"github.com/pingcap/dm/relay/common"
)

// GetGTIDsForPos tries to get GTID sets for the specified binlog position (for the corresponding txn).
// NOTE: this method is very similar with `relay/writer/file_util.go/getTxnPosGTIDs`, unify them if needed later.
// NOTE: this method is not well tested directly, but more tests have already been done for `relay/writer/file_util.go/getTxnPosGTIDs`.
func GetGTIDsForPos(ctx context.Context, r Reader, endPos gmysql.Position) (gtid.Set, error) {
// start to get and parse binlog event from the beginning of the file.
startPos := gmysql.Position{
Name: endPos.Name,
Pos: 0,
}
err := r.StartSyncByPos(startPos)
if err != nil {
return nil, err
}
defer r.Close()

// GetGTIDsForPosFromStreamer tries to get GTID sets for the specified binlog position (for the corresponding txn) from a Streamer.
func GetGTIDsForPosFromStreamer(ctx context.Context, r Streamer, endPos gmysql.Position) (gtid.Set, error) {
var (
flavor string
latestPos uint32
latestGSet gmysql.GTIDSet
nextGTIDStr string // can be recorded if the coming transaction completed
err error
)
for {
var e *replication.BinlogEvent
Expand Down Expand Up @@ -141,6 +129,24 @@ func GetGTIDsForPos(ctx context.Context, r Reader, endPos gmysql.Position) (gtid
}
}

// GetGTIDsForPos tries to get GTID sets for the specified binlog position (for the corresponding txn).
// NOTE: this method is very similar with `relay/writer/file_util.go/getTxnPosGTIDs`, unify them if needed later.
// NOTE: this method is not well tested directly, but more tests have already been done for `relay/writer/file_util.go/getTxnPosGTIDs`.
func GetGTIDsForPos(ctx context.Context, r Reader, endPos gmysql.Position) (gtid.Set, error) {
// start to get and parse binlog event from the beginning of the file.
startPos := gmysql.Position{
Name: endPos.Name,
Pos: 0,
}
err := r.StartSyncByPos(startPos)
if err != nil {
return nil, err
}
defer r.Close()

return GetGTIDsForPosFromStreamer(ctx, r, endPos)
}

// GetPreviousGTIDFromGTIDSet tries to get previous GTID sets from Previous_GTID_EVENT GTID for the specified GITD Set.
// events should be [fake_rotate_event,format_description_event,previous_gtids_event/mariadb_gtid_list_event].
func GetPreviousGTIDFromGTIDSet(ctx context.Context, r Reader, gset gtid.Set) (gtid.Set, error) {
Expand Down
6 changes: 2 additions & 4 deletions pkg/streamer/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/pingcap/dm/pkg/binlog/common"
"github.com/pingcap/dm/pkg/binlog/event"
"github.com/pingcap/dm/pkg/binlog/reader"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"

Expand All @@ -32,10 +33,7 @@ var heartbeatInterval = common.MasterHeartbeatPeriod
// TODO: maybe one day we can make a pull request to go-mysql to support LocalStreamer.

// Streamer provides the ability to get binlog event from remote server or local file.
type Streamer interface {
// GetEvent returns binlog event
GetEvent(ctx context.Context) (*replication.BinlogEvent, error)
}
type Streamer reader.Streamer

// LocalStreamer reads and parses binlog events from local binlog file.
type LocalStreamer struct {
Expand Down
3 changes: 2 additions & 1 deletion syncer/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -781,7 +781,8 @@ func (cp *RemoteCheckPoint) Load(tctx *tcontext.Context) error {
gset,
)
if isGlobal {
if binlog.CompareLocation(location, binlog.NewLocation(cp.cfg.Flavor), cp.cfg.EnableGTID) > 0 {
// Use IsFreshPosition here to make sure checkpoint can be updated if gset is empty
if !binlog.IsFreshPosition(location, cp.cfg.Flavor, cp.cfg.EnableGTID) {
cp.globalPoint = newBinlogPoint(location, location, nil, nil, cp.cfg.EnableGTID)
cp.logCtx.L().Info("fetch global checkpoint from DB", log.WrapStringerField("global checkpoint", cp.globalPoint))
}
Expand Down
87 changes: 72 additions & 15 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"github.com/pingcap/dm/pkg/binlog"
"github.com/pingcap/dm/pkg/binlog/common"
"github.com/pingcap/dm/pkg/binlog/event"
"github.com/pingcap/dm/pkg/binlog/reader"
"github.com/pingcap/dm/pkg/conn"
tcontext "github.com/pingcap/dm/pkg/context"
fr "github.com/pingcap/dm/pkg/func-rollback"
Expand Down Expand Up @@ -427,7 +428,8 @@ func (s *Syncer) initShardingGroups(ctx context.Context) error {
func (s *Syncer) IsFreshTask(ctx context.Context) (bool, error) {
globalPoint := s.checkpoint.GlobalPoint()
tablePoint := s.checkpoint.TablePoint()
return binlog.CompareLocation(globalPoint, binlog.NewLocation(s.cfg.Flavor), s.cfg.EnableGTID) <= 0 && len(tablePoint) == 0, nil
// doesn't have neither GTID nor binlog pos
return binlog.IsFreshPosition(globalPoint, s.cfg.Flavor, s.cfg.EnableGTID) && len(tablePoint) == 0, nil
}

func (s *Syncer) reset() {
Expand Down Expand Up @@ -1128,23 +1130,30 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
if err != nil {
return err
}

// for fresh and all-mode task, flush checkpoint so we could delete metadata file
if s.cfg.Mode == config.ModeAll {
if err = s.flushCheckPoints(); err != nil {
tctx.L().Warn("fail to flush checkpoints when starting task", zap.Error(err))
} else if s.cfg.CleanDumpFile {
tctx.L().Info("try to remove loaded files")
metadataFile := path.Join(s.cfg.Dir, "metadata")
if err = os.Remove(metadataFile); err != nil {
tctx.L().Warn("error when remove loaded dump file", zap.String("data file", metadataFile), zap.Error(err))
}
if err = os.Remove(s.cfg.Dir); err != nil {
tctx.L().Warn("error when remove loaded dump folder", zap.String("data folder", s.cfg.Dir), zap.Error(err))
}
}
needFlushCheckpoint, err := s.adjustGlobalPointGTID(tctx)
if err != nil {
return err
}
if needFlushCheckpoint || s.cfg.Mode == config.ModeAll {
if err = s.flushCheckPoints(); err != nil {
tctx.L().Warn("fail to flush checkpoints when starting task", zap.Error(err))
} else if s.cfg.Mode == config.ModeAll && s.cfg.CleanDumpFile {
tctx.L().Info("try to remove loaded files")
metadataFile := path.Join(s.cfg.Dir, "metadata")
if err = os.Remove(metadataFile); err != nil {
tctx.L().Warn("error when remove loaded dump file", zap.String("data file", metadataFile), zap.Error(err))
}
if err = os.Remove(s.cfg.Dir); err != nil {
tctx.L().Warn("error when remove loaded dump folder", zap.String("data folder", s.cfg.Dir), zap.Error(err))
}
}
}
failpoint.Inject("AdjustGTIDExit", func() {
tctx.L().Warn("exit triggered", zap.String("failpoint", "AdjustGTIDExit"))
s.streamerController.Close(tctx)
utils.OsExit(1)
})

// startLocation is the start location for current received event
// currentLocation is the end location for current received event (End_log_pos in `show binlog events` for mysql)
Expand Down Expand Up @@ -2855,3 +2864,51 @@ func (s *Syncer) getEvent(tctx *tcontext.Context, startLocation binlog.Location)

return s.streamerController.GetEvent(tctx)
}

func (s *Syncer) adjustGlobalPointGTID(tctx *tcontext.Context) (bool, error) {
location := s.checkpoint.GlobalPoint()
// situations that don't need to adjust
// 1. GTID is not enabled
// 2. location already has GTID position
// 3. location is totally new, has no position info
if !s.cfg.EnableGTID || location.GTIDSetStr() != "" || location.Position.Name == "" {
return false, nil
}
// set enableGTID to false for new streamerController
streamerController := NewStreamerController(s.syncCfg, false, s.fromDB, s.binlogType, s.cfg.RelayDir, s.timezone)

endPos := binlog.AdjustPosition(location.Position)
startPos := mysql.Position{
Name: endPos.Name,
Pos: 0,
}
startLocation := location.Clone()
startLocation.Position = startPos

err := streamerController.Start(tctx, startLocation)
if err != nil {
return false, err
}
defer streamerController.Close(tctx)

gs, err := reader.GetGTIDsForPosFromStreamer(tctx.Context(), streamerController.streamer, endPos)
if err != nil {
s.tctx.L().Warn("fail to get gtids for global location", zap.Stringer("pos", location), zap.Error(err))
return false, err
}
err = location.SetGTID(gs.Origin())
if err != nil {
s.tctx.L().Warn("fail to set gtid for global location", zap.Stringer("pos", location),
zap.String("adjusted_gtid", gs.String()), zap.Error(err))
return false, err
}
s.checkpoint.SaveGlobalPoint(location)
// redirect streamer for new gtid set location
err = s.streamerController.RedirectStreamer(tctx, location)
if err != nil {
s.tctx.L().Warn("fail to redirect streamer for global location", zap.Stringer("pos", location),
zap.String("adjusted_gtid", gs.String()), zap.Error(err))
return false, err
}
return true, nil
}
Loading