ttl: add log in the whole lifecycle of a job (#39931)
close #39930
YangKeao authored Dec 14, 2022
1 parent e5c0b6b commit aeec193
Showing 3 changed files with 29 additions and 22 deletions.
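
The commit threads a field-scoped logger through each stage of a TTL job instead of assembling log fields ad hoc at every call site. Below is a minimal sketch of that pattern using go.uber.org/zap directly; the tableID/jobID values are illustrative only, and TiDB itself reaches zap through its logutil wrapper as the hunks below show.

```go
package main

import "go.uber.org/zap"

func main() {
	base, _ := zap.NewDevelopment()
	defer func() { _ = base.Sync() }()

	// Build a child logger once with the fields that identify the job...
	logger := base.With(zap.Int64("tableID", 42), zap.String("jobID", "job-1"))

	// ...then every lifecycle event reuses those fields automatically.
	logger.Info("try lock new job")
	logger.Info("scan task finished")
	logger.Warn("task state changed but job not found")
}
```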
4 changes: 2 additions & 2 deletions ttl/ttlworker/job.go
@@ -96,8 +96,8 @@ func (job *ttlJob) updateState(ctx context.Context, se session.Session) error {
 }
 
 // peekScanTask returns the next scan task, but doesn't promote the iterator
-func (job *ttlJob) peekScanTask() (*ttlScanTask, error) {
-	return job.tasks[job.taskIter], nil
+func (job *ttlJob) peekScanTask() *ttlScanTask {
+	return job.tasks[job.taskIter]
 }
 
 // nextScanTask promotes the iterator
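Since peekScanTask now just indexes the job's pre-built task slice, it can no longer fail, so the error return (and the error handling at its call sites) is dropped. A rough sketch of the peek/advance shape with simplified stand-in types, not the real ttlJob struct:

```go
package main

import "fmt"

// simplifiedJob stands in for ttlJob: tasks are materialized up front,
// so peeking can never fail and needs no error return.
type simplifiedJob struct {
	tasks    []string
	taskIter int
}

// peekScanTask returns the current task without promoting the iterator.
func (j *simplifiedJob) peekScanTask() string { return j.tasks[j.taskIter] }

// nextScanTask promotes the iterator.
func (j *simplifiedJob) nextScanTask() { j.taskIter++ }

// allSpawned reports whether every task has been handed out.
func (j *simplifiedJob) allSpawned() bool { return j.taskIter >= len(j.tasks) }

func main() {
	job := &simplifiedJob{tasks: []string{"t1", "t2"}}
	for !job.allSpawned() {
		fmt.Println("scheduling", job.peekScanTask())
		job.nextScanTask()
	}
}
```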
44 changes: 26 additions & 18 deletions ttl/ttlworker/job_manager.go
@@ -271,12 +271,17 @@ func (m *JobManager) resizeWorkers(workers []worker, count int, factory func() w
 func (m *JobManager) updateTaskState() bool {
 	results := m.pollScanWorkerResults()
 	for _, result := range results {
+		logger := logutil.Logger(m.ctx).With(zap.Int64("tableID", result.task.tbl.ID))
+		if result.err != nil {
+			logger = logger.With(zap.Error(result.err))
+		}
+
 		job := findJobWithTableID(m.runningJobs, result.task.tbl.ID)
 		if job == nil {
-			logutil.Logger(m.ctx).Warn("task state changed but job not found", zap.Int64("tableID", result.task.tbl.ID))
+			logger.Warn("task state changed but job not found", zap.Int64("tableID", result.task.tbl.ID))
 			continue
 		}
-		logutil.Logger(m.ctx).Debug("scan task finished", zap.String("jobID", job.id))
+		logger.Info("scan task finished", zap.String("jobID", job.id))
 
 		job.finishedScanTaskCounter += 1
 		job.scanTaskErr = multierr.Append(job.scanTaskErr, result.err)
@@ -303,7 +308,11 @@ func (m *JobManager) checkNotOwnJob() {
 	for _, job := range m.runningJobs {
 		tableStatus := m.tableStatusCache.Tables[job.tbl.ID]
 		if tableStatus == nil || tableStatus.CurrentJobOwnerID != m.id {
-			logutil.Logger(m.ctx).Info("job has been taken over by another node", zap.String("jobID", job.id), zap.String("statistics", job.statistics.String()))
+			logger := logutil.Logger(m.ctx).With(zap.String("jobID", job.id), zap.String("statistics", job.statistics.String()))
+			if tableStatus != nil {
+				logger = logger.With(zap.String("newJobOwnerID", tableStatus.CurrentJobOwnerID))
+			}
+			logger.Info("job has been taken over by another node")
 			m.removeJob(job)
 			job.cancel()
 		}
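The conditional newJobOwnerID field above relies on zap's With, which returns a new child logger rather than mutating the receiver, so its result must be reassigned for the field to show up. A minimal, self-contained illustration (the field value is made up):

```go
package main

import "go.uber.org/zap"

func main() {
	logger := zap.NewExample()

	// With returns a child logger; calling it without reassignment drops the field.
	logger.With(zap.String("newJobOwnerID", "node-2"))
	logger.Info("taken over, field missing")

	// Reassigning keeps the field on every subsequent log line.
	logger = logger.With(zap.String("newJobOwnerID", "node-2"))
	logger.Info("job has been taken over by another node")
}
```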
@@ -357,9 +366,10 @@ func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) {
 		case len(newJobTables) > 0:
 			table := newJobTables[0]
 			newJobTables = newJobTables[1:]
-			logutil.Logger(m.ctx).Debug("try lock new job", zap.Int64("tableID", table.ID))
+			logutil.Logger(m.ctx).Info("try lock new job", zap.Int64("tableID", table.ID))
 			job, err = m.lockNewJob(m.ctx, se, table, now)
 			if job != nil {
+				logutil.Logger(m.ctx).Info("append new running job", zap.String("jobID", job.id), zap.Int64("tableID", job.tbl.ID))
 				m.appendJob(job)
 			}
 		}
@@ -371,10 +381,10 @@ func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) {
 		}
 
 		for !job.AllSpawned() {
-			task, err := job.peekScanTask()
-			if err != nil {
-				logutil.Logger(m.ctx).Warn("fail to generate scan task", zap.Error(err))
-				break
+			task := job.peekScanTask()
+			logger := logutil.Logger(m.ctx).With(zap.String("jobID", job.id), zap.String("table", task.tbl.TableInfo.Name.L))
+			if task.tbl.PartitionDef != nil {
+				logger = logger.With(zap.String("partition", task.tbl.PartitionDef.Name.L))
 			}
 
 			for len(idleScanWorkers) > 0 {
@@ -383,7 +393,7 @@ func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) {
 
 				err := idleWorker.Schedule(task)
 				if err != nil {
-					logutil.Logger(m.ctx).Info("fail to schedule task", zap.Error(err))
+					logger.Info("fail to schedule task", zap.Error(err))
 					continue
 				}
 
@@ -392,16 +402,11 @@ func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) {
 				if err != nil {
 					// not a big problem, current logic doesn't depend on the job status to promote
 					// the routine, so we could just print a log here
-					logutil.Logger(m.ctx).Error("change ttl job status", zap.Error(err), zap.String("id", job.id))
+					logger.Error("change ttl job status", zap.Error(err), zap.String("id", job.id))
 				}
 				cancel()
 
-				logArgs := []zap.Field{zap.String("table", task.tbl.TableInfo.Name.L)}
-				if task.tbl.PartitionDef != nil {
-					logArgs = append(logArgs, zap.String("partition", task.tbl.PartitionDef.Name.L))
-				}
-				logutil.Logger(m.ctx).Debug("schedule ttl task",
-					logArgs...)
+				logger.Info("scheduled ttl task")
 
 				job.nextScanTask()
 				break
@@ -425,14 +430,17 @@ func (m *JobManager) idleScanWorkers() []scanWorker {
 }
 
 func (m *JobManager) localJobs() []*ttlJob {
+	jobs := make([]*ttlJob, 0, len(m.runningJobs))
 	for _, job := range m.runningJobs {
 		status := m.tableStatusCache.Tables[job.tbl.ID]
 		if status == nil || status.CurrentJobOwnerID != m.id {
-			m.removeJob(job)
+			// these jobs will be removed in `checkNotOwnJob`
 			continue
 		}
+
+		jobs = append(jobs, job)
 	}
-	return m.runningJobs
+	return jobs
 }
 
 // readyForNewJobTables returns all tables which should spawn a TTL job according to cache
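With this hunk, localJobs builds and returns a filtered copy rather than mutating m.runningJobs while iterating over it and then returning the shared slice; removing jobs owned by other nodes is left to checkNotOwnJob. A rough sketch of the filter-into-a-new-slice shape, with types simplified to strings rather than the real job structs:

```go
// ownedJobs returns only the jobs this node still owns, leaving the
// original slice untouched so callers never see a half-mutated list.
func ownedJobs(running []string, ownerOf map[string]string, self string) []string {
	jobs := make([]string, 0, len(running))
	for _, id := range running {
		if ownerOf[id] != self {
			continue // stale jobs are cleaned up elsewhere
		}
		jobs = append(jobs, id)
	}
	return jobs
}
```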
3 changes: 1 addition & 2 deletions ttl/ttlworker/job_test.go
@@ -27,8 +27,7 @@ func TestIterScanTask(t *testing.T) {
 		tbl:   tbl,
 		tasks: []*ttlScanTask{{}},
 	}
-	scanTask, err := job.peekScanTask()
-	assert.NoError(t, err)
+	scanTask := job.peekScanTask()
 	assert.NotNil(t, scanTask)
 	assert.Len(t, job.tasks, 1)
 
