From b252f2e0d970fcdcad7af61a06f407eddff49697 Mon Sep 17 00:00:00 2001
From: YangKeao
Date: Wed, 14 Dec 2022 17:51:44 +0800
Subject: [PATCH] add logs for the whole lifecycle of a job

Signed-off-by: YangKeao
---
 ttl/ttlworker/job.go         |  4 ++--
 ttl/ttlworker/job_manager.go | 44 +++++++++++++++++++++---------------
 ttl/ttlworker/job_test.go    |  3 +--
 3 files changed, 29 insertions(+), 22 deletions(-)

diff --git a/ttl/ttlworker/job.go b/ttl/ttlworker/job.go
index 5551984cceb08..35c5881fb4dd7 100644
--- a/ttl/ttlworker/job.go
+++ b/ttl/ttlworker/job.go
@@ -96,8 +96,8 @@ func (job *ttlJob) updateState(ctx context.Context, se session.Session) error {
 }
 
 // peekScanTask returns the next scan task, but doesn't promote the iterator
-func (job *ttlJob) peekScanTask() (*ttlScanTask, error) {
-	return job.tasks[job.taskIter], nil
+func (job *ttlJob) peekScanTask() *ttlScanTask {
+	return job.tasks[job.taskIter]
 }
 
 // nextScanTask promotes the iterator
diff --git a/ttl/ttlworker/job_manager.go b/ttl/ttlworker/job_manager.go
index cd3c1208f1f37..5e2b67f8b101e 100644
--- a/ttl/ttlworker/job_manager.go
+++ b/ttl/ttlworker/job_manager.go
@@ -271,12 +271,17 @@ func (m *JobManager) resizeWorkers(workers []worker, count int, factory func() w
 func (m *JobManager) updateTaskState() bool {
 	results := m.pollScanWorkerResults()
 	for _, result := range results {
+		logger := logutil.Logger(m.ctx).With(zap.Int64("tableID", result.task.tbl.ID))
+		if result.err != nil {
+			logger = logger.With(zap.Error(result.err))
+		}
+
 		job := findJobWithTableID(m.runningJobs, result.task.tbl.ID)
 		if job == nil {
-			logutil.Logger(m.ctx).Warn("task state changed but job not found", zap.Int64("tableID", result.task.tbl.ID))
+			logger.Warn("task state changed but job not found", zap.Int64("tableID", result.task.tbl.ID))
 			continue
 		}
-		logutil.Logger(m.ctx).Debug("scan task finished", zap.String("jobID", job.id))
+		logger.Info("scan task finished", zap.String("jobID", job.id))
 
 		job.finishedScanTaskCounter += 1
 		job.scanTaskErr = multierr.Append(job.scanTaskErr, result.err)
@@ -303,7 +308,11 @@ func (m *JobManager) checkNotOwnJob() {
 	for _, job := range m.runningJobs {
 		tableStatus := m.tableStatusCache.Tables[job.tbl.ID]
 		if tableStatus == nil || tableStatus.CurrentJobOwnerID != m.id {
-			logutil.Logger(m.ctx).Info("job has been taken over by another node", zap.String("jobID", job.id), zap.String("statistics", job.statistics.String()))
+			logger := logutil.Logger(m.ctx).With(zap.String("jobID", job.id), zap.String("statistics", job.statistics.String()))
+			if tableStatus != nil {
+				logger = logger.With(zap.String("newJobOwnerID", tableStatus.CurrentJobOwnerID))
+			}
+			logger.Info("job has been taken over by another node")
 			m.removeJob(job)
 			job.cancel()
 		}
@@ -357,9 +366,10 @@ func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) {
 		case len(newJobTables) > 0:
 			table := newJobTables[0]
 			newJobTables = newJobTables[1:]
-			logutil.Logger(m.ctx).Debug("try lock new job", zap.Int64("tableID", table.ID))
+			logutil.Logger(m.ctx).Info("try lock new job", zap.Int64("tableID", table.ID))
 			job, err = m.lockNewJob(m.ctx, se, table, now)
 			if job != nil {
+				logutil.Logger(m.ctx).Info("append new running job", zap.String("jobID", job.id), zap.Int64("tableID", job.tbl.ID))
 				m.appendJob(job)
 			}
 		}
@@ -371,10 +381,10 @@ func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) {
 		}
 
 		for !job.AllSpawned() {
-			task, err := job.peekScanTask()
-			if err != nil {
-				logutil.Logger(m.ctx).Warn("fail to generate scan task", zap.Error(err))
-				break
+			task := job.peekScanTask()
+			logger := logutil.Logger(m.ctx).With(zap.String("jobID", job.id), zap.String("table", task.tbl.TableInfo.Name.L))
+			if task.tbl.PartitionDef != nil {
+				logger = logger.With(zap.String("partition", task.tbl.PartitionDef.Name.L))
 			}
 
 			for len(idleScanWorkers) > 0 {
@@ -383,7 +393,7 @@ func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) {
 
 				err := idleWorker.Schedule(task)
 				if err != nil {
-					logutil.Logger(m.ctx).Info("fail to schedule task", zap.Error(err))
+					logger.Info("fail to schedule task", zap.Error(err))
 					continue
 				}
 
@@ -392,16 +402,11 @@ func (m *JobManager) rescheduleJobs(se session.Session, now time.Time) {
 				if err != nil {
 					// not a big problem, current logic doesn't depend on the job status to promote
 					// the routine, so we could just print a log here
-					logutil.Logger(m.ctx).Error("change ttl job status", zap.Error(err), zap.String("id", job.id))
+					logger.Error("change ttl job status", zap.Error(err), zap.String("id", job.id))
 				}
 				cancel()
 
-				logArgs := []zap.Field{zap.String("table", task.tbl.TableInfo.Name.L)}
-				if task.tbl.PartitionDef != nil {
-					logArgs = append(logArgs, zap.String("partition", task.tbl.PartitionDef.Name.L))
-				}
-				logutil.Logger(m.ctx).Debug("schedule ttl task",
-					logArgs...)
+				logger.Info("scheduled ttl task")
 
 				job.nextScanTask()
 				break
@@ -425,14 +430,17 @@ func (m *JobManager) idleScanWorkers() []scanWorker {
 }
 
 func (m *JobManager) localJobs() []*ttlJob {
+	jobs := make([]*ttlJob, 0, len(m.runningJobs))
 	for _, job := range m.runningJobs {
 		status := m.tableStatusCache.Tables[job.tbl.ID]
 		if status == nil || status.CurrentJobOwnerID != m.id {
-			m.removeJob(job)
+			// these jobs will be removed in `checkNotOwnJob`
 			continue
 		}
+
+		jobs = append(jobs, job)
 	}
-	return m.runningJobs
+	return jobs
 }
 
 // readyForNewJobTables returns all tables which should spawn a TTL job according to cache
diff --git a/ttl/ttlworker/job_test.go b/ttl/ttlworker/job_test.go
index 5af5d0316eed2..460dfa0611d7c 100644
--- a/ttl/ttlworker/job_test.go
+++ b/ttl/ttlworker/job_test.go
@@ -27,8 +27,7 @@ func TestIterScanTask(t *testing.T) {
 		tbl:   tbl,
 		tasks: []*ttlScanTask{{}},
 	}
-	scanTask, err := job.peekScanTask()
-	assert.NoError(t, err)
+	scanTask := job.peekScanTask()
 	assert.NotNil(t, scanTask)
 	assert.Len(t, job.tasks, 1)
 
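
The pattern this patch leans on is building a zap child logger once with With(...) and then reusing it for every message in a scope, instead of repeating the same fields at each call site. The sketch below demonstrates that pattern in isolation against the public go.uber.org/zap API only; demoJob, its field values, and the log messages are illustrative stand-ins, not TiDB code or types.

package main

import (
	"errors"

	"go.uber.org/zap"
)

// demoJob is an illustrative stand-in for the job and table identifiers that
// the patch attaches to its loggers; it is not a TiDB type.
type demoJob struct {
	id      string
	tableID int64
}

func main() {
	base, err := zap.NewDevelopment()
	if err != nil {
		panic(err)
	}
	defer base.Sync()

	job := demoJob{id: "demo-job-1", tableID: 42}

	// Build one child logger that carries the job context, mirroring the
	// `logger := logutil.Logger(m.ctx).With(...)` calls in the patch.
	logger := base.With(zap.String("jobID", job.id), zap.Int64("tableID", job.tableID))

	// With returns a new *zap.Logger, so the result must be reassigned;
	// an unassigned call would silently discard the extra field.
	if scanErr := errors.New("scan failed"); scanErr != nil {
		logger = logger.With(zap.Error(scanErr))
	}

	// Every message emitted through the child logger now carries jobID,
	// tableID and the error field without repeating them at each call site.
	logger.Info("scan task finished")
	logger.Warn("task state changed but job not found")
}

Run with `go run .` after fetching go.uber.org/zap; the development encoder prints the accumulated fields after each message, which makes the effect of With easy to see.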