diff --git a/syncer/checkpoint.go b/syncer/checkpoint.go index 576c215153..7c4cee91b1 100644 --- a/syncer/checkpoint.go +++ b/syncer/checkpoint.go @@ -15,6 +15,7 @@ package syncer import ( "bytes" + "context" "database/sql" "encoding/json" "fmt" @@ -321,8 +322,11 @@ func (cp *RemoteCheckPoint) Clear(tctx *tcontext.Context) error { defer cp.Unlock() // delete all checkpoints + // use a new context apart from syncer, to make sure when syncer call `cancel` checkpoint could update + tctx2, cancel := tctx.WithContext(context.Background()).WithTimeout(maxDMLConnectionDuration) + defer cancel() _, err := cp.dbConn.executeSQL( - tctx, + tctx2, []string{`DELETE FROM ` + cp.tableName + ` WHERE id = ?`}, []interface{}{cp.id}, ) @@ -390,9 +394,12 @@ func (cp *RemoteCheckPoint) DeleteTablePoint(tctx *tcontext.Context, sourceSchem return nil } + // use a new context apart from syncer, to make sure when syncer call `cancel` checkpoint could update + tctx2, cancel := tctx.WithContext(context.Background()).WithTimeout(maxDMLConnectionDuration) + defer cancel() cp.logCtx.L().Info("delete table checkpoint", zap.String("schema", sourceSchema), zap.String("table", sourceTable)) _, err := cp.dbConn.executeSQL( - tctx, + tctx2, []string{`DELETE FROM ` + cp.tableName + ` WHERE id = ? AND cp_schema = ? AND cp_table = ?`}, []interface{}{cp.id, sourceSchema, sourceTable}, ) @@ -412,9 +419,12 @@ func (cp *RemoteCheckPoint) DeleteSchemaPoint(tctx *tcontext.Context, sourceSche return nil } + // use a new context apart from syncer, to make sure when syncer call `cancel` checkpoint could update + tctx2, cancel := tctx.WithContext(context.Background()).WithTimeout(maxDMLConnectionDuration) + defer cancel() cp.logCtx.L().Info("delete schema checkpoint", zap.String("schema", sourceSchema)) _, err := cp.dbConn.executeSQL( - tctx, + tctx2, []string{`DELETE FROM ` + cp.tableName + ` WHERE id = ? AND cp_schema = ?`}, []interface{}{cp.id, sourceSchema}, ) @@ -522,7 +532,10 @@ func (cp *RemoteCheckPoint) FlushPointsExcept(tctx *tcontext.Context, exceptTabl args = append(args, extraArgs[i]) } - _, err := cp.dbConn.executeSQL(tctx, sqls, args...) + // use a new context apart from syncer, to make sure when syncer call `cancel` checkpoint could update + tctx2, cancel := tctx.WithContext(context.Background()).WithTimeout(maxDMLConnectionDuration) + defer cancel() + _, err := cp.dbConn.executeSQL(tctx2, sqls, args...) if err != nil { return err } diff --git a/syncer/syncer.go b/syncer/syncer.go index d6edf73636..d27c7fe0dc 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -872,9 +872,7 @@ func (s *Syncer) flushCheckPoints() error { s.tctx.L().Info("prepare flush sqls", zap.Strings("shard meta sqls", shardMetaSQLs), zap.Reflect("shard meta arguments", shardMetaArgs)) } - tctx, cancel := s.tctx.WithContext(context.Background()).WithTimeout(maxDMLConnectionDuration) - defer cancel() - err := s.checkpoint.FlushPointsExcept(tctx, exceptTables, shardMetaSQLs, shardMetaArgs) + err := s.checkpoint.FlushPointsExcept(s.tctx, exceptTables, shardMetaSQLs, shardMetaArgs) if err != nil { return terror.Annotatef(err, "flush checkpoint %s", s.checkpoint) }