Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ttl: periodically update state for a job in heartbeat #39939

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ttl/ttlworker/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ go_library(
"//types",
"//util",
"//util/chunk",
"//util/hack",
"//util/logutil",
"//util/sqlexec",
"//util/timeutil",
Expand Down
51 changes: 44 additions & 7 deletions ttl/ttlworker/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ package ttlworker

import (
"context"
"encoding/json"
"fmt"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/ttl/cache"
"github.com/pingcap/tidb/ttl/session"
"github.com/pingcap/tidb/util/hack"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -83,11 +85,11 @@ func (job *ttlJob) changeStatus(ctx context.Context, se session.Session, status
}

func (job *ttlJob) updateState(ctx context.Context, se session.Session) error {
jsonStatistics, err := job.statistics.MarshalJSON()
summary, err := job.summary()
if err != nil {
return err
logutil.Logger(job.ctx).Warn("fail to generate summary for ttl job", zap.Error(err))
}
_, err = se.ExecuteSQL(ctx, updateJobState(job.tbl.ID, job.id, string(jsonStatistics), job.ownerID))
_, err = se.ExecuteSQL(ctx, updateJobState(job.tbl.ID, job.id, summary, job.ownerID))
if err != nil {
return errors.Trace(err)
}
Expand All @@ -107,13 +109,13 @@ func (job *ttlJob) nextScanTask() {

// finish turns current job into last job, and update the error message and statistics summary
func (job *ttlJob) finish(se session.Session, now time.Time) {
summary := job.statistics.String()
if job.scanTaskErr != nil {
summary = fmt.Sprintf("Scan Error: %s, Statistics: %s", job.scanTaskErr.Error(), summary)
summary, err := job.summary()
if err != nil {
logutil.Logger(job.ctx).Warn("fail to generate summary for ttl job", zap.Error(err))
}
// at this time, the job.ctx may have been canceled (to cancel this job)
// even when it's canceled, we'll need to update the states, so use another context
_, err := se.ExecuteSQL(context.TODO(), finishJobSQL(job.tbl.ID, now, summary, job.id))
_, err = se.ExecuteSQL(context.TODO(), finishJobSQL(job.tbl.ID, now, summary, job.id))
if err != nil {
logutil.Logger(job.ctx).Error("fail to finish a ttl job", zap.Error(err), zap.Int64("tableID", job.tbl.ID), zap.String("jobID", job.id))
}
Expand Down Expand Up @@ -168,3 +170,38 @@ func findJobWithTableID(jobs []*ttlJob, id int64) *ttlJob {

return nil
}

type ttlSummary struct {
TotalRows uint64 `json:"total_rows"`
SuccessRows uint64 `json:"success_rows"`
ErrorRows uint64 `json:"error_rows"`

TotalScanTask int `json:"total_scan_task"`
ScheduledScanTask int `json:"scheduled_scan_task"`
FinishedScanTask int `json:"finished_scan_task"`

ScanTaskErr string `json:"scan_task_err,omitempty"`
}

func (job *ttlJob) summary() (string, error) {
summary := &ttlSummary{
TotalRows: job.statistics.TotalRows.Load(),
SuccessRows: job.statistics.SuccessRows.Load(),
ErrorRows: job.statistics.ErrorRows.Load(),

TotalScanTask: len(job.tasks),
ScheduledScanTask: job.taskIter,
FinishedScanTask: job.finishedScanTaskCounter,
}

if job.scanTaskErr != nil {
summary.ScanTaskErr = job.scanTaskErr.Error()
}

summaryJSON, err := json.Marshal(summary)
if err != nil {
return "", err
}

return string(hack.String(summaryJSON)), nil
}
5 changes: 5 additions & 0 deletions ttl/ttlworker/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,11 @@ func (m *JobManager) updateHeartBeat(ctx context.Context, se session.Session) er
if err != nil {
return errors.Trace(err)
}
// also updates some internal state for this job
err = job.updateState(ctx, se)
if err != nil {
logutil.Logger(m.ctx).Warn("fail to update state of the job", zap.String("jobID", job.id))
}
}
return nil
}
Expand Down
31 changes: 31 additions & 0 deletions ttl/ttlworker/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package ttlworker
import (
"testing"

"github.com/pingcap/errors"
"github.com/stretchr/testify/assert"
)

Expand All @@ -35,3 +36,33 @@ func TestIterScanTask(t *testing.T) {
job.nextScanTask()
assert.True(t, job.AllSpawned())
}

func TestJobSummary(t *testing.T) {
statistics := &ttlStatistics{}
statistics.TotalRows.Store(1)
statistics.ErrorRows.Store(255)
statistics.SuccessRows.Store(128)

job := &ttlJob{
statistics: statistics,
tasks: []*ttlScanTask{{}},
}
summary, err := job.summary()
assert.NoError(t, err)
assert.Equal(t, `{"total_rows":1,"success_rows":128,"error_rows":255,"total_scan_task":1,"scheduled_scan_task":0,"finished_scan_task":0}`, summary)

job.taskIter += 1
summary, err = job.summary()
assert.NoError(t, err)
assert.Equal(t, `{"total_rows":1,"success_rows":128,"error_rows":255,"total_scan_task":1,"scheduled_scan_task":1,"finished_scan_task":0}`, summary)

job.finishedScanTaskCounter += 1
summary, err = job.summary()
assert.NoError(t, err)
assert.Equal(t, `{"total_rows":1,"success_rows":128,"error_rows":255,"total_scan_task":1,"scheduled_scan_task":1,"finished_scan_task":1}`, summary)

job.scanTaskErr = errors.New("test error")
summary, err = job.summary()
assert.NoError(t, err)
assert.Equal(t, `{"total_rows":1,"success_rows":128,"error_rows":255,"total_scan_task":1,"scheduled_scan_task":1,"finished_scan_task":1,"scan_task_err":"test error"}`, summary)
}
15 changes: 0 additions & 15 deletions ttl/ttlworker/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package ttlworker

import (
"context"
"encoding/json"
"fmt"
"strconv"
"sync/atomic"
Expand Down Expand Up @@ -72,20 +71,6 @@ func (s *ttlStatistics) String() string {
return fmt.Sprintf("Total Rows: %d, Success Rows: %d, Error Rows: %d", s.TotalRows.Load(), s.SuccessRows.Load(), s.ErrorRows.Load())
}

func (s *ttlStatistics) MarshalJSON() ([]byte, error) {
type jsonStatistics struct {
TotalRows uint64 `json:"total_rows"`
SuccessRows uint64 `json:"success_rows"`
ErrorRows uint64 `json:"error_rows"`
}

return json.Marshal(jsonStatistics{
TotalRows: s.TotalRows.Load(),
SuccessRows: s.SuccessRows.Load(),
ErrorRows: s.ErrorRows.Load(),
})
}

type ttlScanTask struct {
ctx context.Context

Expand Down
10 changes: 0 additions & 10 deletions ttl/ttlworker/scan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,13 +403,3 @@ func TestScanTaskDoScan(t *testing.T) {
task.schemaChangeInRetry = 2
task.runDoScanForTest(1, "table 'test.t1' meta changed, should abort current job: [schema:1146]Table 'test.t1' doesn't exist")
}

func TestTTLStatisticsMarshalJSON(t *testing.T) {
statistics := &ttlStatistics{}
statistics.TotalRows.Store(1)
statistics.ErrorRows.Store(255)
statistics.SuccessRows.Store(128)
j, err := statistics.MarshalJSON()
require.NoError(t, err)
require.Equal(t, `{"total_rows":1,"success_rows":128,"error_rows":255}`, string(j))
}