Skip to content
This repository was archived by the owner on Nov 24, 2023. It is now read-only.

syncer: fix syncer gtid auto switch from off to on (#1723) #1745

Merged
6 changes: 4 additions & 2 deletions dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,10 @@ func (s *Server) doStartKeepAlive() {
}

func (s *Server) stopKeepAlive() {
s.kaCancel()
s.kaWg.Wait()
if s.kaCancel != nil {
s.kaCancel()
s.kaWg.Wait()
}
}

func (s *Server) restartKeepAlive() {
Expand Down
3 changes: 0 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,6 @@ github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMe
github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa h1:OaNxuTZr7kxeODyLWsRMC+OD03aFUH+mW6r2d+MWa5Y=
github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8=
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI=
github.com/codahale/hdrhistogram v0.9.0 h1:9GjrtRI+mLEFPtTfR/AZhcxp+Ii8NZYWq5104FbZQY0=
github.com/codahale/hdrhistogram v0.9.0/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI=
github.com/colinmarc/hdfs/v2 v2.1.1/go.mod h1:M3x+k8UKKmxtFu++uAZ0OtDU8jR3jnaZIAc6yK4Ue0c=
github.com/coocood/badger v1.5.1-0.20200515070411-e02af0688441/go.mod h1:klY8SfH2lNZ/23/SIxwHoJw+T6wYGB12YPCF9MUoiu0=
Expand Down Expand Up @@ -271,7 +270,6 @@ github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHqu
github.com/frankban/quicktest v1.4.1/go.mod h1:36zfPVQyHxymz4cH7wlDmVwDrJuljRB60qkgn7rorfQ=
github.com/frankban/quicktest v1.11.1 h1:stwUsXhUGliQs9t0ZS39BWCltFdOHgABiIlihop8AD4=
github.com/frankban/quicktest v1.11.1/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsouza/fake-gcs-server v1.15.0/go.mod h1:HNxAJ/+FY/XSsxuwz8iIYdp2GtMmPbJ8WQjjGMxd6Qk=
github.com/fsouza/fake-gcs-server v1.17.0/go.mod h1:D1rTE4YCyHFNa99oyJJ5HyclvN/0uQR+pM/VdlL83bw=
Expand Down Expand Up @@ -1094,7 +1092,6 @@ github.com/xitongsys/parquet-go v1.5.1/go.mod h1:xUxwM8ELydxh4edHGegYq1pA8NnMKDx
github.com/xitongsys/parquet-go v1.5.5-0.20201110004701-b09c49d6d457 h1:tBbuFCtyJNKT+BFAv6qjvTFpVdy97IYNaBwGUXifIUs=
github.com/xitongsys/parquet-go v1.5.5-0.20201110004701-b09c49d6d457/go.mod h1:pheqtXeHQFzxJk45lRQ0UIGIivKnLXvialZSFWs81A8=
github.com/xitongsys/parquet-go-source v0.0.0-20190524061010-2b72cbee77d5/go.mod h1:xxCx7Wpym/3QCo6JhujJX51dzSXrwmb0oH6FQb39SEA=
github.com/xitongsys/parquet-go-source v0.0.0-20200817004010-026bad9b25d0 h1:a742S4V5A15F93smuVxA60LQWsrCnN8bKeWDBARU1/k=
github.com/xitongsys/parquet-go-source v0.0.0-20200817004010-026bad9b25d0/go.mod h1:HYhIKsdns7xz80OgkbgJYrtQY7FjHWHKH6cvN7+czGE=
github.com/xlab/treeprint v0.0.0-20180616005107-d6fb6747feb6/go.mod h1:ce1O1j6UtZfjr22oyGxGLbauSBp2YVXpARAosm7dHBg=
github.com/xo/dburl v0.0.0-20191219122722-3cca8608d645/go.mod h1:A47W3pdWONaZmXuLZgfKLAVgUY0qvfTRM5vVDKS40S4=
Expand Down
28 changes: 28 additions & 0 deletions pkg/binlog/position.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,34 @@ func CompareLocation(location1, location2 Location, cmpGTID bool) int {
return compareIndex(location1.Suffix, location2.Suffix)
}

// IsFreshPosition returns true when location1 is a fresh location without any info.
func IsFreshPosition(location1 Location, flavor string, cmpGTID bool) bool {
location2 := NewLocation(flavor)
if cmpGTID {
cmp, canCmp := CompareGTID(location1.gtidSet, location2.gtidSet)
if canCmp {
if cmp != 0 {
return cmp <= 0
}
// not supposed to happen, for safety here.
if location1.gtidSet != nil && location1.gtidSet.String() != "" {
return false
}
// empty GTIDSet, then compare by position
log.L().Warn("both gtidSets are empty, will compare by position", zap.Stringer("location1", location1), zap.Stringer("location2", location2))
} else {
// if can't compare by GTIDSet, then compare by position
log.L().Warn("gtidSet can't be compared, will compare by position", zap.Stringer("location1", location1), zap.Stringer("location2", location2))
}
}

cmp := ComparePosition(location1.Position, location2.Position)
if cmp != 0 {
return cmp <= 0
}
return compareIndex(location1.Suffix, location2.Suffix) <= 0
}

// CompareGTID returns:
// 1, true if gSet1 is bigger than gSet2
// 0, true if gSet1 is equal to gSet2
Expand Down
86 changes: 86 additions & 0 deletions pkg/binlog/position_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -794,3 +794,89 @@ func (t *testPositionSuite) TestExtractSuffix(c *C) {
c.Assert(suffix, Equals, tc.suffix)
}
}

func (t *testPositionSuite) TestIsFreshPosition(c *C) {
mysqlPos := gmysql.Position{
Name: "mysql-binlog.00001",
Pos: 123,
}
mysqlGTIDSet, err := gtid.ParserGTID(gmysql.MySQLFlavor, "e8e592a6-7a59-11eb-8da1-0242ac110002:1-36")
c.Assert(err, IsNil)
mariaGTIDSet, err := gtid.ParserGTID(gmysql.MariaDBFlavor, "0-1001-233")
c.Assert(err, IsNil)
testCases := []struct {
loc Location
flavor string
cmpGTID bool
fresh bool
}{
{
InitLocation(mysqlPos, mysqlGTIDSet),
gmysql.MySQLFlavor,
true,
false,
},
{
InitLocation(mysqlPos, gtid.MinGTIDSet(gmysql.MySQLFlavor)),
gmysql.MySQLFlavor,
true,
false,
},
{

InitLocation(MinPosition, mysqlGTIDSet),
gmysql.MySQLFlavor,
true,
false,
},
{
InitLocation(MinPosition, mysqlGTIDSet),
gmysql.MySQLFlavor,
false,
true,
},
{
InitLocation(MinPosition, gtid.MinGTIDSet(gmysql.MySQLFlavor)),
gmysql.MySQLFlavor,
true,
true,
},

{
InitLocation(mysqlPos, mariaGTIDSet),
gmysql.MariaDBFlavor,
true,
false,
},
{
InitLocation(mysqlPos, gtid.MinGTIDSet(gmysql.MariaDBFlavor)),
gmysql.MariaDBFlavor,
true,
false,
},
{

InitLocation(MinPosition, mariaGTIDSet),
gmysql.MariaDBFlavor,
true,
false,
},
{
InitLocation(MinPosition, mariaGTIDSet),
gmysql.MariaDBFlavor,
false,
true,
},
{
InitLocation(MinPosition, gtid.MinGTIDSet(gmysql.MariaDBFlavor)),
gmysql.MariaDBFlavor,
true,
true,
},
}

for _, tc := range testCases {
fresh := IsFreshPosition(tc.loc, tc.flavor, tc.cmpGTID)
c.Assert(fresh, Equals, tc.fresh)
}
}
6 changes: 6 additions & 0 deletions pkg/binlog/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,9 @@ type Reader interface {
// Status returns the status of the reader.
Status() interface{}
}

// Streamer provides the ability to get binlog event from remote server or local file.
type Streamer interface {
// GetEvent returns binlog event
GetEvent(ctx context.Context) (*replication.BinlogEvent, error)
}
36 changes: 21 additions & 15 deletions pkg/binlog/reader/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,26 +30,14 @@ import (
"github.com/pingcap/dm/relay/common"
)

// GetGTIDsForPos tries to get GTID sets for the specified binlog position (for the corresponding txn).
// NOTE: this method is very similar with `relay/writer/file_util.go/getTxnPosGTIDs`, unify them if needed later.
// NOTE: this method is not well tested directly, but more tests have already been done for `relay/writer/file_util.go/getTxnPosGTIDs`.
func GetGTIDsForPos(ctx context.Context, r Reader, endPos gmysql.Position) (gtid.Set, error) {
// start to get and parse binlog event from the beginning of the file.
startPos := gmysql.Position{
Name: endPos.Name,
Pos: 0,
}
err := r.StartSyncByPos(startPos)
if err != nil {
return nil, err
}
defer r.Close()

// GetGTIDsForPosFromStreamer tries to get GTID sets for the specified binlog position (for the corresponding txn) from a Streamer.
func GetGTIDsForPosFromStreamer(ctx context.Context, r Streamer, endPos gmysql.Position) (gtid.Set, error) {
var (
flavor string
latestPos uint32
latestGSet gmysql.GTIDSet
nextGTIDStr string // can be recorded if the coming transaction completed
err error
)
for {
var e *replication.BinlogEvent
Expand Down Expand Up @@ -141,6 +129,24 @@ func GetGTIDsForPos(ctx context.Context, r Reader, endPos gmysql.Position) (gtid
}
}

// GetGTIDsForPos tries to get GTID sets for the specified binlog position (for the corresponding txn).
// NOTE: this method is very similar with `relay/writer/file_util.go/getTxnPosGTIDs`, unify them if needed later.
// NOTE: this method is not well tested directly, but more tests have already been done for `relay/writer/file_util.go/getTxnPosGTIDs`.
func GetGTIDsForPos(ctx context.Context, r Reader, endPos gmysql.Position) (gtid.Set, error) {
// start to get and parse binlog event from the beginning of the file.
startPos := gmysql.Position{
Name: endPos.Name,
Pos: 0,
}
err := r.StartSyncByPos(startPos)
if err != nil {
return nil, err
}
defer r.Close()

return GetGTIDsForPosFromStreamer(ctx, r, endPos)
}

// GetPreviousGTIDFromGTIDSet tries to get previous GTID sets from Previous_GTID_EVENT GTID for the specified GITD Set.
// events should be [fake_rotate_event,format_description_event,previous_gtids_event/mariadb_gtid_list_event].
func GetPreviousGTIDFromGTIDSet(ctx context.Context, r Reader, gset gtid.Set) (gtid.Set, error) {
Expand Down
6 changes: 2 additions & 4 deletions pkg/streamer/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/pingcap/dm/pkg/binlog/common"
"github.com/pingcap/dm/pkg/binlog/event"
"github.com/pingcap/dm/pkg/binlog/reader"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"

Expand All @@ -32,10 +33,7 @@ var heartbeatInterval = common.MasterHeartbeatPeriod
// TODO: maybe one day we can make a pull request to go-mysql to support LocalStreamer.

// Streamer provides the ability to get binlog event from remote server or local file.
type Streamer interface {
// GetEvent returns binlog event
GetEvent(ctx context.Context) (*replication.BinlogEvent, error)
}
type Streamer reader.Streamer

// LocalStreamer reads and parses binlog events from local binlog file.
type LocalStreamer struct {
Expand Down
3 changes: 2 additions & 1 deletion syncer/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -781,7 +781,8 @@ func (cp *RemoteCheckPoint) Load(tctx *tcontext.Context) error {
gset,
)
if isGlobal {
if binlog.CompareLocation(location, binlog.NewLocation(cp.cfg.Flavor), cp.cfg.EnableGTID) > 0 {
// Use IsFreshPosition here to make sure checkpoint can be updated if gset is empty
if !binlog.IsFreshPosition(location, cp.cfg.Flavor, cp.cfg.EnableGTID) {
cp.globalPoint = newBinlogPoint(location, location, nil, nil, cp.cfg.EnableGTID)
cp.logCtx.L().Info("fetch global checkpoint from DB", log.WrapStringerField("global checkpoint", cp.globalPoint))
}
Expand Down
87 changes: 72 additions & 15 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"github.com/pingcap/dm/pkg/binlog"
"github.com/pingcap/dm/pkg/binlog/common"
"github.com/pingcap/dm/pkg/binlog/event"
"github.com/pingcap/dm/pkg/binlog/reader"
"github.com/pingcap/dm/pkg/conn"
tcontext "github.com/pingcap/dm/pkg/context"
fr "github.com/pingcap/dm/pkg/func-rollback"
Expand Down Expand Up @@ -427,7 +428,8 @@ func (s *Syncer) initShardingGroups(ctx context.Context) error {
func (s *Syncer) IsFreshTask(ctx context.Context) (bool, error) {
globalPoint := s.checkpoint.GlobalPoint()
tablePoint := s.checkpoint.TablePoint()
return binlog.CompareLocation(globalPoint, binlog.NewLocation(s.cfg.Flavor), s.cfg.EnableGTID) <= 0 && len(tablePoint) == 0, nil
// doesn't have neither GTID nor binlog pos
return binlog.IsFreshPosition(globalPoint, s.cfg.Flavor, s.cfg.EnableGTID) && len(tablePoint) == 0, nil
}

func (s *Syncer) reset() {
Expand Down Expand Up @@ -1128,23 +1130,30 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
if err != nil {
return err
}

// for fresh and all-mode task, flush checkpoint so we could delete metadata file
if s.cfg.Mode == config.ModeAll {
if err = s.flushCheckPoints(); err != nil {
tctx.L().Warn("fail to flush checkpoints when starting task", zap.Error(err))
} else if s.cfg.CleanDumpFile {
tctx.L().Info("try to remove loaded files")
metadataFile := path.Join(s.cfg.Dir, "metadata")
if err = os.Remove(metadataFile); err != nil {
tctx.L().Warn("error when remove loaded dump file", zap.String("data file", metadataFile), zap.Error(err))
}
if err = os.Remove(s.cfg.Dir); err != nil {
tctx.L().Warn("error when remove loaded dump folder", zap.String("data folder", s.cfg.Dir), zap.Error(err))
}
}
needFlushCheckpoint, err := s.adjustGlobalPointGTID(tctx)
if err != nil {
return err
}
if needFlushCheckpoint || s.cfg.Mode == config.ModeAll {
if err = s.flushCheckPoints(); err != nil {
tctx.L().Warn("fail to flush checkpoints when starting task", zap.Error(err))
} else if s.cfg.Mode == config.ModeAll && s.cfg.CleanDumpFile {
tctx.L().Info("try to remove loaded files")
metadataFile := path.Join(s.cfg.Dir, "metadata")
if err = os.Remove(metadataFile); err != nil {
tctx.L().Warn("error when remove loaded dump file", zap.String("data file", metadataFile), zap.Error(err))
}
if err = os.Remove(s.cfg.Dir); err != nil {
tctx.L().Warn("error when remove loaded dump folder", zap.String("data folder", s.cfg.Dir), zap.Error(err))
}
}
}
failpoint.Inject("AdjustGTIDExit", func() {
tctx.L().Warn("exit triggered", zap.String("failpoint", "AdjustGTIDExit"))
s.streamerController.Close(tctx)
utils.OsExit(1)
})

// startLocation is the start location for current received event
// currentLocation is the end location for current received event (End_log_pos in `show binlog events` for mysql)
Expand Down Expand Up @@ -2854,3 +2863,51 @@ func (s *Syncer) getEvent(tctx *tcontext.Context, startLocation binlog.Location)

return s.streamerController.GetEvent(tctx)
}

func (s *Syncer) adjustGlobalPointGTID(tctx *tcontext.Context) (bool, error) {
location := s.checkpoint.GlobalPoint()
// situations that don't need to adjust
// 1. GTID is not enabled
// 2. location already has GTID position
// 3. location is totally new, has no position info
if !s.cfg.EnableGTID || location.GTIDSetStr() != "" || location.Position.Name == "" {
return false, nil
}
// set enableGTID to false for new streamerController
streamerController := NewStreamerController(s.syncCfg, false, s.fromDB, s.binlogType, s.cfg.RelayDir, s.timezone)

endPos := binlog.AdjustPosition(location.Position)
startPos := mysql.Position{
Name: endPos.Name,
Pos: 0,
}
startLocation := location.Clone()
startLocation.Position = startPos

err := streamerController.Start(tctx, startLocation)
if err != nil {
return false, err
}
defer streamerController.Close(tctx)

gs, err := reader.GetGTIDsForPosFromStreamer(tctx.Context(), streamerController.streamer, endPos)
if err != nil {
s.tctx.L().Warn("fail to get gtids for global location", zap.Stringer("pos", location), zap.Error(err))
return false, err
}
err = location.SetGTID(gs.Origin())
if err != nil {
s.tctx.L().Warn("fail to set gtid for global location", zap.Stringer("pos", location),
zap.String("adjusted_gtid", gs.String()), zap.Error(err))
return false, err
}
s.checkpoint.SaveGlobalPoint(location)
// redirect streamer for new gtid set location
err = s.streamerController.RedirectStreamer(tctx, location)
if err != nil {
s.tctx.L().Warn("fail to redirect streamer for global location", zap.Stringer("pos", location),
zap.String("adjusted_gtid", gs.String()), zap.Error(err))
return false, err
}
return true, nil
}
Loading