diff --git a/pkg/ddl/ddl.go b/pkg/ddl/ddl.go
index b5e0d0340ef7b..788bd024db897 100644
--- a/pkg/ddl/ddl.go
+++ b/pkg/ddl/ddl.go
@@ -874,6 +874,7 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
 		if ingest.LitBackCtxMgr != nil {
 			ingest.LitBackCtxMgr.MarkJobFinish()
 		}
+		d.runningJobs = newRunningJobs()
 	})
 
 	return nil
diff --git a/pkg/ddl/ddl_running_jobs.go b/pkg/ddl/ddl_running_jobs.go
index 95faa765bee6e..ffcb79af8a422 100644
--- a/pkg/ddl/ddl_running_jobs.go
+++ b/pkg/ddl/ddl_running_jobs.go
@@ -28,57 +28,117 @@ import (
 
+// runningJobs tracks two sets of DDL jobs: the ones currently being processed by
+// a worker, and the ones that were added earlier and have not finished yet.
 type runningJobs struct {
 	sync.RWMutex
-	ids           map[int64]struct{}
-	runningSchema map[string]map[string]struct{} // database -> table -> struct{}
-	runningJobIDs string
+	// processingIDs records the IDs of the jobs that are being processed by a worker.
+	processingIDs map[int64]struct{}
+	// processingIDsStr caches processingIDs as a comma-separated string.
+	processingIDsStr string
+
+	// unfinishedIDs records the IDs of the jobs that are not finished yet.
+	// Such a job is not necessarily being processed by a worker.
+	unfinishedIDs    map[int64]struct{}
+	unfinishedSchema map[string]map[string]struct{} // database -> table -> struct{}
 }
 
+// newRunningJobs returns a tracker with every map allocated and empty.
 func newRunningJobs() *runningJobs {
 	return &runningJobs{
-		ids:           make(map[int64]struct{}),
-		runningSchema: make(map[string]map[string]struct{}),
+		processingIDs:    make(map[int64]struct{}),
+		unfinishedSchema: make(map[string]map[string]struct{}),
+		unfinishedIDs:    make(map[int64]struct{}),
 	}
 }
 
+// add marks job as being processed by a worker. The first time a job is added it
+// is also recorded as unfinished, together with every database/table it involves.
 func (j *runningJobs) add(job *model.Job) {
 	j.Lock()
 	defer j.Unlock()
-	j.ids[job.ID] = struct{}{}
+	j.processingIDs[job.ID] = struct{}{}
 	j.updateInternalRunningJobIDs()
+
+	if _, ok := j.unfinishedIDs[job.ID]; ok {
+		// Already exists, no need to add it again.
+		return
+	}
+	j.unfinishedIDs[job.ID] = struct{}{}
 	for _, info := range job.GetInvolvingSchemaInfo() {
-		if _, ok := j.runningSchema[info.Database]; !ok {
-			j.runningSchema[info.Database] = make(map[string]struct{})
+		if _, ok := j.unfinishedSchema[info.Database]; !ok {
+			j.unfinishedSchema[info.Database] = make(map[string]struct{})
 		}
-		j.runningSchema[info.Database][info.Table] = struct{}{}
+		j.unfinishedSchema[info.Database][info.Table] = struct{}{}
 	}
 }
 
+// remove marks job as no longer being processed by a worker. The unfinished record
+// and its schema entries are dropped only when the job is finished or synced.
 func (j *runningJobs) remove(job *model.Job) {
 	j.Lock()
 	defer j.Unlock()
-	delete(j.ids, job.ID)
+	delete(j.processingIDs, job.ID)
 	j.updateInternalRunningJobIDs()
-	for _, info := range job.GetInvolvingSchemaInfo() {
-		if db, ok := j.runningSchema[info.Database]; ok {
-			delete(db, info.Table)
+
+	if job.IsFinished() || job.IsSynced() {
+		delete(j.unfinishedIDs, job.ID)
+		for _, info := range job.GetInvolvingSchemaInfo() {
+			if db, ok := j.unfinishedSchema[info.Database]; ok {
+				delete(db, info.Table)
+			}
+			if len(j.unfinishedSchema[info.Database]) == 0 {
+				delete(j.unfinishedSchema, info.Database)
+			}
 		}
-		if len(j.runningSchema[info.Database]) == 0 {
-			delete(j.runningSchema, info.Database)
+	}
+}
+
+// allIDs returns the cached comma-separated IDs of the jobs being processed.
+func (j *runningJobs) allIDs() string {
+	j.RLock()
+	defer j.RUnlock()
+	return j.processingIDsStr
+}
+
+// updateInternalRunningJobIDs rebuilds processingIDsStr from processingIDs.
+// It takes no lock itself; add and remove call it while holding the write lock.
+func (j *runningJobs) updateInternalRunningJobIDs() {
+	var sb strings.Builder
+	i := 0
+	for id := range j.processingIDs {
+		// FormatInt formats the int64 ID directly; converting it through int
+		// would truncate the value where int is 32 bits wide.
+		sb.WriteString(strconv.FormatInt(id, 10))
+		if i != len(j.processingIDs)-1 {
+			sb.WriteString(",")
 		}
+		i++
 	}
+	j.processingIDsStr = sb.String()
 }
 
+// checkRunnable reports whether job can be handed to a worker now. A job that is
+// already being processed is not runnable; a job already tracked as unfinished is;
+// any other job is checked against the entries recorded in unfinishedSchema.
 func (j *runningJobs) checkRunnable(job *model.Job) bool {
 	j.RLock()
 	defer j.RUnlock()
+	if _, ok := j.processingIDs[job.ID]; ok {
+		// Already being processed by a worker. Skip running it again.
+		return false
+	}
+	if _, ok := j.unfinishedIDs[job.ID]; ok {
+		// add already recorded this job's own entries in unfinishedSchema, and they
+		// stay there until it finishes; checking against them would block it forever.
+		return true
+	}
 	for _, info := range job.GetInvolvingSchemaInfo() {
-		if _, ok := j.runningSchema[model.InvolvingAll]; ok {
+		if _, ok := j.unfinishedSchema[model.InvolvingAll]; ok {
 			return false
 		}
 		if info.Database == model.InvolvingNone {
 			continue
 		}
-		if tbls, ok := j.runningSchema[info.Database]; ok {
+		if tbls, ok := j.unfinishedSchema[info.Database]; ok {
 			if _, ok := tbls[model.InvolvingAll]; ok {
 				return false
 			}
@@ -92,22 +152,3 @@ func (j *runningJobs) checkRunnable(job *model.Job) bool {
 	}
 	return true
 }
-
-func (j *runningJobs) allIDs() string {
-	j.RLock()
-	defer j.RUnlock()
-	return j.runningJobIDs
-}
-
-func (j *runningJobs) updateInternalRunningJobIDs() {
-	var sb strings.Builder
-	i := 0
-	for id := range j.ids {
-		sb.WriteString(strconv.Itoa(int(id)))
-		if i != len(j.ids)-1 {
-			sb.WriteString(",")
-		}
-		i++
-	}
-	j.runningJobIDs = sb.String()
-}