Skip to content

Commit

Permalink
ddl: fix unstable test TestCreateDropCreateTable
Browse files Browse the repository at this point in the history
  • Loading branch information
tangenta committed Mar 7, 2024
1 parent c128a2d commit 66ed429
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 36 deletions.
1 change: 1 addition & 0 deletions pkg/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -874,6 +874,7 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
if ingest.LitBackCtxMgr != nil {
ingest.LitBackCtxMgr.MarkJobFinish()
}
d.runningJobs = newRunningJobs()
})

return nil
Expand Down
92 changes: 56 additions & 36 deletions pkg/ddl/ddl_running_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,57 +28,96 @@ import (

type runningJobs struct {
sync.RWMutex
ids map[int64]struct{}
runningSchema map[string]map[string]struct{} // database -> table -> struct{}
runningJobIDs string
// processingIDs records the IDs of the jobs that are being processed by a worker.
processingIDs map[int64]struct{}
processingIDsStr string

// unfinishedIDs records the IDs of the jobs that are not finished yet.
// It is not necessarily being processed by a worker.
unfinishedIDs map[int64]struct{}
unfinishedSchema map[string]map[string]struct{} // database -> table -> struct{}
}

func newRunningJobs() *runningJobs {
return &runningJobs{
ids: make(map[int64]struct{}),
runningSchema: make(map[string]map[string]struct{}),
processingIDs: make(map[int64]struct{}),
unfinishedSchema: make(map[string]map[string]struct{}),
unfinishedIDs: make(map[int64]struct{}),
}
}

func (j *runningJobs) add(job *model.Job) {
j.Lock()
defer j.Unlock()
j.ids[job.ID] = struct{}{}
j.processingIDs[job.ID] = struct{}{}
j.updateInternalRunningJobIDs()

if _, ok := j.unfinishedIDs[job.ID]; ok {
// Already exists, no need to add it again.
return
}
j.unfinishedIDs[job.ID] = struct{}{}
for _, info := range job.GetInvolvingSchemaInfo() {
if _, ok := j.runningSchema[info.Database]; !ok {
j.runningSchema[info.Database] = make(map[string]struct{})
if _, ok := j.unfinishedSchema[info.Database]; !ok {
j.unfinishedSchema[info.Database] = make(map[string]struct{})
}
j.runningSchema[info.Database][info.Table] = struct{}{}
j.unfinishedSchema[info.Database][info.Table] = struct{}{}
}
}

func (j *runningJobs) remove(job *model.Job) {
j.Lock()
defer j.Unlock()
delete(j.ids, job.ID)
delete(j.processingIDs, job.ID)
j.updateInternalRunningJobIDs()
for _, info := range job.GetInvolvingSchemaInfo() {
if db, ok := j.runningSchema[info.Database]; ok {
delete(db, info.Table)

if job.IsFinished() || job.IsSynced() {
delete(j.unfinishedIDs, job.ID)
for _, info := range job.GetInvolvingSchemaInfo() {
if db, ok := j.unfinishedSchema[info.Database]; ok {
delete(db, info.Table)
}
if len(j.unfinishedSchema[info.Database]) == 0 {
delete(j.unfinishedSchema, info.Database)
}
}
if len(j.runningSchema[info.Database]) == 0 {
delete(j.runningSchema, info.Database)
}
}

func (j *runningJobs) allIDs() string {
j.RLock()
defer j.RUnlock()
return j.processingIDsStr
}

func (j *runningJobs) updateInternalRunningJobIDs() {
var sb strings.Builder
i := 0
for id := range j.processingIDs {
sb.WriteString(strconv.Itoa(int(id)))
if i != len(j.processingIDs)-1 {
sb.WriteString(",")
}
i++
}
j.processingIDsStr = sb.String()
}

func (j *runningJobs) checkRunnable(job *model.Job) bool {
j.RLock()
defer j.RUnlock()
if _, ok := j.processingIDs[job.ID]; ok {
// Already processing by a worker. Skip running it again.
return false
}
for _, info := range job.GetInvolvingSchemaInfo() {
if _, ok := j.runningSchema[model.InvolvingAll]; ok {
if _, ok := j.unfinishedSchema[model.InvolvingAll]; ok {
return false
}
if info.Database == model.InvolvingNone {
continue
}
if tbls, ok := j.runningSchema[info.Database]; ok {
if tbls, ok := j.unfinishedSchema[info.Database]; ok {
if _, ok := tbls[model.InvolvingAll]; ok {
return false
}
Expand All @@ -92,22 +131,3 @@ func (j *runningJobs) checkRunnable(job *model.Job) bool {
}
return true
}

func (j *runningJobs) allIDs() string {
j.RLock()
defer j.RUnlock()
return j.runningJobIDs
}

func (j *runningJobs) updateInternalRunningJobIDs() {
var sb strings.Builder
i := 0
for id := range j.ids {
sb.WriteString(strconv.Itoa(int(id)))
if i != len(j.ids)-1 {
sb.WriteString(",")
}
i++
}
j.runningJobIDs = sb.String()
}

0 comments on commit 66ed429

Please sign in to comment.