From 7b354f32abb35689e6d78dc980fa5a83ec130fa6 Mon Sep 17 00:00:00 2001 From: YangKeao Date: Wed, 14 Dec 2022 18:53:38 +0800 Subject: [PATCH] periodically update state for a job in heartbeat Signed-off-by: YangKeao --- ttl/ttlworker/BUILD.bazel | 1 + ttl/ttlworker/job.go | 51 +++++++++++++++++++++++++++++++----- ttl/ttlworker/job_manager.go | 5 ++++ ttl/ttlworker/job_test.go | 31 ++++++++++++++++++++++ ttl/ttlworker/scan.go | 15 ----------- ttl/ttlworker/scan_test.go | 10 ------- 6 files changed, 81 insertions(+), 32 deletions(-) diff --git a/ttl/ttlworker/BUILD.bazel b/ttl/ttlworker/BUILD.bazel index 8b7c39270807e..9819b5947e00c 100644 --- a/ttl/ttlworker/BUILD.bazel +++ b/ttl/ttlworker/BUILD.bazel @@ -25,6 +25,7 @@ go_library( "//types", "//util", "//util/chunk", + "//util/hack", "//util/logutil", "//util/sqlexec", "//util/timeutil", diff --git a/ttl/ttlworker/job.go b/ttl/ttlworker/job.go index 5551984cceb08..92b659b788e18 100644 --- a/ttl/ttlworker/job.go +++ b/ttl/ttlworker/job.go @@ -16,6 +16,7 @@ package ttlworker import ( "context" + "encoding/json" "fmt" "sync" "time" @@ -23,6 +24,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/ttl/cache" "github.com/pingcap/tidb/ttl/session" + "github.com/pingcap/tidb/util/hack" "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" ) @@ -83,11 +85,11 @@ func (job *ttlJob) changeStatus(ctx context.Context, se session.Session, status } func (job *ttlJob) updateState(ctx context.Context, se session.Session) error { - jsonStatistics, err := job.statistics.MarshalJSON() + summary, err := job.summary() if err != nil { - return err + logutil.Logger(job.ctx).Warn("fail to generate summary for ttl job", zap.Error(err)) } - _, err = se.ExecuteSQL(ctx, updateJobState(job.tbl.ID, job.id, string(jsonStatistics), job.ownerID)) + _, err = se.ExecuteSQL(ctx, updateJobState(job.tbl.ID, job.id, summary, job.ownerID)) if err != nil { return errors.Trace(err) } @@ -107,13 +109,13 @@ func (job *ttlJob) nextScanTask() { // finish turns current job into last job, and update the error message and statistics summary func (job *ttlJob) finish(se session.Session, now time.Time) { - summary := job.statistics.String() - if job.scanTaskErr != nil { - summary = fmt.Sprintf("Scan Error: %s, Statistics: %s", job.scanTaskErr.Error(), summary) + summary, err := job.summary() + if err != nil { + logutil.Logger(job.ctx).Warn("fail to generate summary for ttl job", zap.Error(err)) } // at this time, the job.ctx may have been canceled (to cancel this job) // even when it's canceled, we'll need to update the states, so use another context - _, err := se.ExecuteSQL(context.TODO(), finishJobSQL(job.tbl.ID, now, summary, job.id)) + _, err = se.ExecuteSQL(context.TODO(), finishJobSQL(job.tbl.ID, now, summary, job.id)) if err != nil { logutil.Logger(job.ctx).Error("fail to finish a ttl job", zap.Error(err), zap.Int64("tableID", job.tbl.ID), zap.String("jobID", job.id)) } @@ -168,3 +170,38 @@ func findJobWithTableID(jobs []*ttlJob, id int64) *ttlJob { return nil } + +type ttlSummary struct { + TotalRows uint64 `json:"total_rows"` + SuccessRows uint64 `json:"success_rows"` + ErrorRows uint64 `json:"error_rows"` + + TotalScanTask int `json:"total_scan_task"` + ScheduledScanTask int `json:"scheduled_scan_task"` + FinishedScanTask int `json:"finished_scan_task"` + + ScanTaskErr string `json:"scan_task_err,omitempty"` +} + +func (job *ttlJob) summary() (string, error) { + summary := &ttlSummary{ + TotalRows: job.statistics.TotalRows.Load(), + SuccessRows: job.statistics.SuccessRows.Load(), + ErrorRows: job.statistics.ErrorRows.Load(), + + TotalScanTask: len(job.tasks), + ScheduledScanTask: job.taskIter, + FinishedScanTask: job.finishedScanTaskCounter, + } + + if job.scanTaskErr != nil { + summary.ScanTaskErr = job.scanTaskErr.Error() + } + + summaryJSON, err := json.Marshal(summary) + if err != nil { + return "", err + } + + return string(hack.String(summaryJSON)), nil +} diff --git a/ttl/ttlworker/job_manager.go b/ttl/ttlworker/job_manager.go index cd3c1208f1f37..8233576b39855 100644 --- a/ttl/ttlworker/job_manager.go +++ b/ttl/ttlworker/job_manager.go @@ -595,6 +595,11 @@ func (m *JobManager) updateHeartBeat(ctx context.Context, se session.Session) er if err != nil { return errors.Trace(err) } + // also updates some internal state for this job + err = job.updateState(ctx, se) + if err != nil { + logutil.Logger(m.ctx).Warn("fail to update state of the job", zap.String("jobID", job.id)) + } } return nil } diff --git a/ttl/ttlworker/job_test.go b/ttl/ttlworker/job_test.go index 5af5d0316eed2..48d1859a0d2b4 100644 --- a/ttl/ttlworker/job_test.go +++ b/ttl/ttlworker/job_test.go @@ -17,6 +17,7 @@ package ttlworker import ( "testing" + "github.com/pingcap/errors" "github.com/stretchr/testify/assert" ) @@ -35,3 +36,33 @@ func TestIterScanTask(t *testing.T) { job.nextScanTask() assert.True(t, job.AllSpawned()) } + +func TestJobSummary(t *testing.T) { + statistics := &ttlStatistics{} + statistics.TotalRows.Store(1) + statistics.ErrorRows.Store(255) + statistics.SuccessRows.Store(128) + + job := &ttlJob{ + statistics: statistics, + tasks: []*ttlScanTask{{}}, + } + summary, err := job.summary() + assert.NoError(t, err) + assert.Equal(t, `{"total_rows":1,"success_rows":128,"error_rows":255,"total_scan_task":1,"scheduled_scan_task":0,"finished_scan_task":0}`, summary) + + job.taskIter += 1 + summary, err = job.summary() + assert.NoError(t, err) + assert.Equal(t, `{"total_rows":1,"success_rows":128,"error_rows":255,"total_scan_task":1,"scheduled_scan_task":1,"finished_scan_task":0}`, summary) + + job.finishedScanTaskCounter += 1 + summary, err = job.summary() + assert.NoError(t, err) + assert.Equal(t, `{"total_rows":1,"success_rows":128,"error_rows":255,"total_scan_task":1,"scheduled_scan_task":1,"finished_scan_task":1}`, summary) + + job.scanTaskErr = errors.New("test error") + summary, err = job.summary() + assert.NoError(t, err) + assert.Equal(t, `{"total_rows":1,"success_rows":128,"error_rows":255,"total_scan_task":1,"scheduled_scan_task":1,"finished_scan_task":1,"scan_task_err":"test error"}`, summary) +} diff --git a/ttl/ttlworker/scan.go b/ttl/ttlworker/scan.go index 242c51fb8b686..38a4fd544535d 100644 --- a/ttl/ttlworker/scan.go +++ b/ttl/ttlworker/scan.go @@ -16,7 +16,6 @@ package ttlworker import ( "context" - "encoding/json" "fmt" "strconv" "sync/atomic" @@ -72,20 +71,6 @@ func (s *ttlStatistics) String() string { return fmt.Sprintf("Total Rows: %d, Success Rows: %d, Error Rows: %d", s.TotalRows.Load(), s.SuccessRows.Load(), s.ErrorRows.Load()) } -func (s *ttlStatistics) MarshalJSON() ([]byte, error) { - type jsonStatistics struct { - TotalRows uint64 `json:"total_rows"` - SuccessRows uint64 `json:"success_rows"` - ErrorRows uint64 `json:"error_rows"` - } - - return json.Marshal(jsonStatistics{ - TotalRows: s.TotalRows.Load(), - SuccessRows: s.SuccessRows.Load(), - ErrorRows: s.ErrorRows.Load(), - }) -} - type ttlScanTask struct { ctx context.Context diff --git a/ttl/ttlworker/scan_test.go b/ttl/ttlworker/scan_test.go index 34a25a2539612..66582084b18f3 100644 --- a/ttl/ttlworker/scan_test.go +++ b/ttl/ttlworker/scan_test.go @@ -403,13 +403,3 @@ func TestScanTaskDoScan(t *testing.T) { task.schemaChangeInRetry = 2 task.runDoScanForTest(1, "table 'test.t1' meta changed, should abort current job: [schema:1146]Table 'test.t1' doesn't exist") } - -func TestTTLStatisticsMarshalJSON(t *testing.T) { - statistics := &ttlStatistics{} - statistics.TotalRows.Store(1) - statistics.ErrorRows.Store(255) - statistics.SuccessRows.Store(128) - j, err := statistics.MarshalJSON() - require.NoError(t, err) - require.Equal(t, `{"total_rows":1,"success_rows":128,"error_rows":255}`, string(j)) -}