Skip to content

Commit

Permalink
checkpoint(dm): check outdated should respect snapshot create time (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Apr 21, 2022
1 parent 3515f35 commit 2ebc95f
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 18 deletions.
27 changes: 12 additions & 15 deletions dm/syncer/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,12 +272,9 @@ type CheckPoint interface {
// corresponding to to Meta.Pos and gtid
FlushedGlobalPoint() binlog.Location

// CheckGlobalPoint checks whether we should save global checkpoint
// corresponding to Meta.Check
CheckGlobalPoint() bool

// CheckLastSnapshotCreationTime checks whether we should async flush checkpoint since last time async flush
CheckLastSnapshotCreationTime() bool
// LastFlushOutdated checks the start time of a flush (when call Snapshot) and finish time of a flush, if both of
// the two times are outdated, LastFlushOutdated returns true.
LastFlushOutdated() bool

// GetFlushedTableInfo gets flushed table info
// use for lazy create table in schemaTracker
Expand Down Expand Up @@ -825,18 +822,18 @@ func (cp *RemoteCheckPoint) String() string {
return cp.globalPoint.String()
}

// CheckGlobalPoint implements CheckPoint.CheckGlobalPoint.
func (cp *RemoteCheckPoint) CheckGlobalPoint() bool {
// LastFlushOutdated implements CheckPoint.LastFlushOutdated.
func (cp *RemoteCheckPoint) LastFlushOutdated() bool {
cp.RLock()
defer cp.RUnlock()
return time.Since(cp.globalPointSaveTime) >= time.Duration(cp.cfg.CheckpointFlushInterval)*time.Second
}

// CheckLastSnapshotCreationTime implements CheckPoint.CheckLastSnapshotCreationTime.
func (cp *RemoteCheckPoint) CheckLastSnapshotCreationTime() bool {
cp.RLock()
defer cp.RUnlock()
return time.Since(cp.lastSnapshotCreationTime) >= time.Duration(cp.cfg.CheckpointFlushInterval)*time.Second
if time.Since(cp.globalPointSaveTime) < time.Duration(cp.cfg.CheckpointFlushInterval)*time.Second {
return false
}
if time.Since(cp.lastSnapshotCreationTime) < time.Duration(cp.cfg.CheckpointFlushInterval)*time.Second {
return false
}
return true
}

// Rollback implements CheckPoint.Rollback.
Expand Down
23 changes: 23 additions & 0 deletions dm/syncer/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"os"
"path/filepath"
"strings"
"testing"
"time"

"github.com/pingcap/tiflow/dm/dm/config"
"github.com/pingcap/tiflow/dm/pkg/binlog"
Expand All @@ -38,6 +40,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tidb-tools/pkg/dbutil"
"github.com/pingcap/tidb-tools/pkg/filter"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"
)

Expand Down Expand Up @@ -495,3 +498,23 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) {
c.Assert(rcp.points[schemaName][tableName].flushedPoint.ti, NotNil)
c.Assert(*rcp.safeModeExitPoint, DeepEquals, binlog.InitLocation(pos2, gs))
}

func TestLastFlushOutdated(t *testing.T) {
cfg := &config.SubTaskConfig{
ServerID: 101,
MetaSchema: "test",
Name: "syncer_checkpoint_ut",
Flavor: mysql.MySQLFlavor,
}
cfg.WorkerCount = 0
cfg.CheckpointFlushInterval = 1

cp := NewRemoteCheckPoint(tcontext.Background(), cfg, "1")
checkpoint := cp.(*RemoteCheckPoint)
checkpoint.globalPointSaveTime = time.Now().Add(-2 * time.Second)

require.True(t, checkpoint.LastFlushOutdated())
require.Nil(t, checkpoint.Snapshot(true))
// though snapshot is nil, checkpoint is not outdated
require.False(t, checkpoint.LastFlushOutdated())
}
6 changes: 3 additions & 3 deletions dm/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1046,7 +1046,7 @@ func (s *Syncer) addJob(job *job) error {
}

// Periodically create checkpoint snapshot and async flush checkpoint snapshot
if s.checkpoint.CheckGlobalPoint() && s.checkpoint.CheckLastSnapshotCreationTime() {
if s.checkpoint.LastFlushOutdated() {
jobSeq := s.getFlushSeq()
s.jobWg.Add(1)
if s.cfg.Experimental.AsyncCheckpointFlush {
Expand Down Expand Up @@ -1093,7 +1093,7 @@ func (s *Syncer) resetShardingGroup(table *filter.Table) {
// and except rejecting to flush the checkpoint, we also need to rollback the checkpoint saved before
// this should be handled when `s.Run` returned
//
// we may need to refactor the concurrency model to make the work-flow more clearer later.
// we may need to refactor the concurrency model to make the work-flow more clear later.
func (s *Syncer) flushCheckPoints() error {
err := s.execError.Load()
// TODO: for now, if any error occurred (including user canceled), checkpoint won't be updated. But if we have put
Expand Down Expand Up @@ -2015,7 +2015,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
case *replication.GenericEvent:
if e.Header.EventType == replication.HEARTBEAT_EVENT {
// flush checkpoint even if there are no real binlog events
if s.checkpoint.CheckGlobalPoint() {
if s.checkpoint.LastFlushOutdated() {
tctx.L().Info("meet heartbeat event and then flush jobs")
err2 = s.flushJobs()
}
Expand Down

0 comments on commit 2ebc95f

Please sign in to comment.