diff --git a/dm/syncer/checkpoint.go b/dm/syncer/checkpoint.go index b102105bbdf..bb66b39ad5e 100644 --- a/dm/syncer/checkpoint.go +++ b/dm/syncer/checkpoint.go @@ -272,12 +272,9 @@ type CheckPoint interface { // corresponding to to Meta.Pos and gtid FlushedGlobalPoint() binlog.Location - // CheckGlobalPoint checks whether we should save global checkpoint - // corresponding to Meta.Check - CheckGlobalPoint() bool - - // CheckLastSnapshotCreationTime checks whether we should async flush checkpoint since last time async flush - CheckLastSnapshotCreationTime() bool + // LastFlushOutdated checks the start time of a flush (when calling Snapshot) and finish time of a flush; if both of + // the two times are outdated, LastFlushOutdated returns true. + LastFlushOutdated() bool // GetFlushedTableInfo gets flushed table info // use for lazy create table in schemaTracker @@ -825,18 +822,18 @@ func (cp *RemoteCheckPoint) String() string { return cp.globalPoint.String() } -// CheckGlobalPoint implements CheckPoint.CheckGlobalPoint. -func (cp *RemoteCheckPoint) CheckGlobalPoint() bool { +// LastFlushOutdated implements CheckPoint.LastFlushOutdated. +func (cp *RemoteCheckPoint) LastFlushOutdated() bool { cp.RLock() defer cp.RUnlock() - return time.Since(cp.globalPointSaveTime) >= time.Duration(cp.cfg.CheckpointFlushInterval)*time.Second -} -// CheckLastSnapshotCreationTime implements CheckPoint.CheckLastSnapshotCreationTime. -func (cp *RemoteCheckPoint) CheckLastSnapshotCreationTime() bool { - cp.RLock() - defer cp.RUnlock() - return time.Since(cp.lastSnapshotCreationTime) >= time.Duration(cp.cfg.CheckpointFlushInterval)*time.Second + if time.Since(cp.globalPointSaveTime) < time.Duration(cp.cfg.CheckpointFlushInterval)*time.Second { + return false + } + if time.Since(cp.lastSnapshotCreationTime) < time.Duration(cp.cfg.CheckpointFlushInterval)*time.Second { + return false + } + return true } // Rollback implements CheckPoint.Rollback. 
diff --git a/dm/syncer/checkpoint_test.go b/dm/syncer/checkpoint_test.go index 480b2b0b989..a8293a28354 100644 --- a/dm/syncer/checkpoint_test.go +++ b/dm/syncer/checkpoint_test.go @@ -20,6 +20,8 @@ import ( "os" "path/filepath" "strings" + "testing" + "time" "github.com/pingcap/tiflow/dm/dm/config" "github.com/pingcap/tiflow/dm/pkg/binlog" @@ -38,6 +40,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tidb-tools/pkg/dbutil" "github.com/pingcap/tidb-tools/pkg/filter" + "github.com/stretchr/testify/require" "go.uber.org/zap/zapcore" ) @@ -495,3 +498,23 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) { c.Assert(rcp.points[schemaName][tableName].flushedPoint.ti, NotNil) c.Assert(*rcp.safeModeExitPoint, DeepEquals, binlog.InitLocation(pos2, gs)) } + +func TestLastFlushOutdated(t *testing.T) { + cfg := &config.SubTaskConfig{ + ServerID: 101, + MetaSchema: "test", + Name: "syncer_checkpoint_ut", + Flavor: mysql.MySQLFlavor, + } + cfg.WorkerCount = 0 + cfg.CheckpointFlushInterval = 1 + + cp := NewRemoteCheckPoint(tcontext.Background(), cfg, "1") + checkpoint := cp.(*RemoteCheckPoint) + checkpoint.globalPointSaveTime = time.Now().Add(-2 * time.Second) + + require.True(t, checkpoint.LastFlushOutdated()) + require.Nil(t, checkpoint.Snapshot(true)) + // though snapshot is nil, checkpoint is not outdated + require.False(t, checkpoint.LastFlushOutdated()) +} diff --git a/dm/syncer/syncer.go b/dm/syncer/syncer.go index 02f3cc405a1..7d262fb20f4 100644 --- a/dm/syncer/syncer.go +++ b/dm/syncer/syncer.go @@ -1046,7 +1046,7 @@ func (s *Syncer) addJob(job *job) error { } // Periodically create checkpoint snapshot and async flush checkpoint snapshot - if s.checkpoint.CheckGlobalPoint() && s.checkpoint.CheckLastSnapshotCreationTime() { + if s.checkpoint.LastFlushOutdated() { jobSeq := s.getFlushSeq() s.jobWg.Add(1) if s.cfg.Experimental.AsyncCheckpointFlush { @@ -1093,7 +1093,7 @@ func (s *Syncer) resetShardingGroup(table *filter.Table) { // and 
except rejecting to flush the checkpoint, we also need to rollback the checkpoint saved before // this should be handled when `s.Run` returned // -// we may need to refactor the concurrency model to make the work-flow more clearer later. +// we may need to refactor the concurrency model to make the work-flow more clear later. func (s *Syncer) flushCheckPoints() error { err := s.execError.Load() // TODO: for now, if any error occurred (including user canceled), checkpoint won't be updated. But if we have put @@ -2015,7 +2015,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { case *replication.GenericEvent: if e.Header.EventType == replication.HEARTBEAT_EVENT { // flush checkpoint even if there are no real binlog events - if s.checkpoint.CheckGlobalPoint() { + if s.checkpoint.LastFlushOutdated() { tctx.L().Info("meet heartbeat event and then flush jobs") err2 = s.flushJobs() }