ttl: reschedule task to other instances when shrinking worker
lcwangchao committed Nov 27, 2024
1 parent ba791ab commit 5bb58a4
Showing 3 changed files with 192 additions and 44 deletions.
44 changes: 39 additions & 5 deletions pkg/ttl/ttlworker/del.go
@@ -22,6 +22,7 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/ttl/cache"
"github.com/pingcap/tidb/pkg/ttl/metrics"
@@ -94,12 +95,12 @@ func (t *ttlDeleteTask) doDelete(ctx context.Context, rawSe session.Session) (re
leftRows := t.rows
defer func() {
if len(leftRows) > 0 {
t.statistics.IncErrorRows(len(leftRows))
retryRows = append(retryRows, leftRows...)
}
}()

se := newTableSession(rawSe, t.tbl, t.expire)
for len(leftRows) > 0 {
for len(leftRows) > 0 && ctx.Err() == nil {
maxBatch := variable.TTLDeleteBatchSize.Load()
var delBatch [][]types.Datum
if int64(len(leftRows)) < maxBatch {
@@ -133,7 +134,6 @@ func (t *ttlDeleteTask) doDelete(ctx context.Context, rawSe session.Session) (re
sqlInterval := time.Since(sqlStart)
if err != nil {
metrics.DeleteErrorDuration.Observe(sqlInterval.Seconds())
needRetry = needRetry && ctx.Err() == nil
logutil.BgLogger().Warn(
"delete SQL in TTL failed",
zap.Error(err),
@@ -214,6 +214,11 @@ func (b *ttlDelRetryBuffer) DoRetry(do func(*ttlDeleteTask) [][]types.Datum) tim
return b.retryInterval
}

// SetRetryInterval sets the retry interval of the buffer.
func (b *ttlDelRetryBuffer) SetRetryInterval(interval time.Duration) {
b.retryInterval = interval
}

// Drain drains a retry buffer.
func (b *ttlDelRetryBuffer) Drain() {
for ele := b.list.Front(); ele != nil; ele = ele.Next() {
@@ -296,8 +301,37 @@ func (w *ttlDeleteWorker) loop() error {
timer := time.NewTimer(w.retryBuffer.retryInterval)
defer timer.Stop()

// drain retry buffer to make sure the statistics are correct
defer w.retryBuffer.Drain()
defer func() {
// Make a final attempt to delete all rows left in the retry buffer while the worker stops,
// to avoid leaving any TTL rows undeleted when shrinking the delete workers.
if w.retryBuffer.Len() > 0 {
start := time.Now()
log.Info(
"try to delete TTL rows in del worker buffer immediately because the worker is going to stop",
zap.Int("bufferLen", w.retryBuffer.Len()),
)
retryCtx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
w.retryBuffer.SetRetryInterval(0)
w.retryBuffer.DoRetry(func(task *ttlDeleteTask) [][]types.Datum {
return task.doDelete(retryCtx, se)
})
log.Info(
"delete TTL rows in del worker buffer finished",
zap.Duration("duration", time.Since(start)),
)
}

// drain retry buffer to make sure the statistics are correct
if w.retryBuffer.Len() > 0 {
log.Warn(
"some TTL rows are still in the buffer while the worker is going to stop, mark them as error",
zap.Int("bufferLen", w.retryBuffer.Len()),
)
w.retryBuffer.Drain()
}
}()

for w.Status() == workerStatusRunning {
tracer.EnterPhase(metrics.PhaseIdle)
select {
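For context on the new shutdown path in loop() above, the following is a self-contained sketch of the same pattern with simplified stand-in types, not the real ttlworker API: when the worker stops, the retry interval is dropped to zero so buffered batches are retried immediately, the flush is bounded by a short context deadline, and whatever is still left afterwards is drained and counted as error rows.

package main

import (
	"context"
	"fmt"
	"time"
)

// buffer is a simplified stand-in for ttlDelRetryBuffer.
type buffer struct {
	pending       []string
	retryInterval time.Duration
}

// SetRetryInterval mirrors the method added in this commit.
func (b *buffer) SetRetryInterval(d time.Duration) { b.retryInterval = d }

// DoRetry tries every pending item once and keeps the ones that fail.
func (b *buffer) DoRetry(try func(item string) bool) {
	kept := b.pending[:0]
	for _, it := range b.pending {
		if !try(it) {
			kept = append(kept, it)
		}
	}
	b.pending = kept
}

func main() {
	b := &buffer{pending: []string{"rows-batch-1", "rows-batch-2"}, retryInterval: time.Minute}

	// The worker is stopping: retry immediately (interval 0) under a 10s deadline.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	b.SetRetryInterval(0)
	b.DoRetry(func(item string) bool {
		// A real worker would run the delete SQL here and give up once ctx expires.
		return ctx.Err() == nil
	})

	// Anything still pending is drained and counted as error rows, mirroring Drain().
	if len(b.pending) > 0 {
		fmt.Println("rows left undeleted:", len(b.pending))
	}
}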
22 changes: 15 additions & 7 deletions pkg/ttl/ttlworker/scan.go
@@ -85,6 +85,9 @@ type ttlScanTaskExecResult struct {
time time.Time
task *ttlScanTask
err error
// interruptedByWorkerStop indicates whether the task had to stop because the worker is stopping.
// When it is true, the task should be rescheduled to another worker or another TiDB instance.
interruptedByWorkerStop bool
}

func (t *ttlScanTask) result(err error) *ttlScanTaskExecResult {
@@ -99,6 +102,17 @@ func (t *ttlScanTask) getDatumRows(rows []chunk.Row) [][]types.Datum {
return datums
}

func (t *ttlScanTask) taskLogger(l *zap.Logger) *zap.Logger {
return l.With(
zap.String("jobID", t.JobID),
zap.Int64("scanID", t.ScanID),
zap.Int64("tableID", t.TableID),
zap.String("db", t.tbl.Schema.O),
zap.String("table", t.tbl.Name.O),
zap.String("partition", t.tbl.Partition.O),
)
}

func (t *ttlScanTask) doScan(ctx context.Context, delCh chan<- *ttlDeleteTask, sessPool util.SessionPool) *ttlScanTaskExecResult {
// TODO: merge the ctx and the taskCtx in ttl scan task, to allow both "cancel" and gracefully stop workers
// now, the taskCtx is only checked at the beginning of every loop
@@ -121,13 +135,7 @@ func (t *ttlScanTask) doScan(ctx context.Context, delCh chan<- *ttlDeleteTask, s
case <-doScanFinished.Done():
return
}
logger := logutil.BgLogger().With(
zap.Int64("tableID", t.TableID),
zap.String("table", t.tbl.Name.O),
zap.String("partition", t.tbl.Partition.O),
zap.String("jobID", t.JobID),
zap.Int64("scanID", t.ScanID),
)
logger := t.taskLogger(logutil.BgLogger())
logger.Info("kill the running statement in scan task because the task or worker cancelled")
rawSess.KillStmt()
ticker := time.NewTicker(time.Minute)
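The new taskLogger helper in scan.go attaches all task-identifying fields once, so every log line from a scan task carries the same context instead of repeating the zap fields at each call site. Below is a minimal standalone sketch of the same zap pattern; the field names are taken from the diff, while the surrounding type and values are illustrative only.

package main

import "go.uber.org/zap"

// scanTask is an illustrative stand-in for ttlScanTask.
type scanTask struct {
	JobID     string
	ScanID    int64
	TableID   int64
	DB, Table string
}

// taskLogger attaches the task-identifying fields once, so callers just write
// logger.Info(...) and every entry is tagged consistently.
func (t *scanTask) taskLogger(l *zap.Logger) *zap.Logger {
	return l.With(
		zap.String("jobID", t.JobID),
		zap.Int64("scanID", t.ScanID),
		zap.Int64("tableID", t.TableID),
		zap.String("db", t.DB),
		zap.String("table", t.Table),
	)
}

func main() {
	base, _ := zap.NewDevelopment()
	defer base.Sync()

	task := &scanTask{JobID: "job-1", ScanID: 1, TableID: 100, DB: "test", Table: "t"}
	logger := task.taskLogger(base)
	logger.Info("kill the running statement in scan task because the task or worker cancelled")
}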
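The code that actually consumes interruptedByWorkerStop and reschedules the task lives in the third changed file, which is not shown on this page, so the following is only a plausible sketch (all names hypothetical) of how a result consumer might distinguish a worker-stop interruption from a real failure and hand the task back for another worker or TiDB instance to pick up.

package main

import (
	"errors"
	"fmt"
)

// scanResult is a hypothetical stand-in for ttlScanTaskExecResult.
type scanResult struct {
	err                     error
	interruptedByWorkerStop bool
	taskID                  int64
}

func handleResult(res scanResult, reschedule func(int64), markFailed func(int64, error)) {
	switch {
	case res.err != nil && res.interruptedByWorkerStop:
		// The worker was shut down (e.g. the scan-worker pool shrank); the task
		// itself did not fail, so hand it back for another worker or TiDB node.
		reschedule(res.taskID)
	case res.err != nil:
		markFailed(res.taskID, res.err)
	}
}

func main() {
	handleResult(
		scanResult{err: errors.New("context canceled"), interruptedByWorkerStop: true, taskID: 7},
		func(id int64) { fmt.Println("reschedule task", id) },
		func(id int64, err error) { fmt.Println("task", id, "failed:", err) },
	)
}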
