Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ttl: fix some memory leak in TTL (#45512) #45514

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -3584,8 +3584,10 @@ func createSessionWithOpt(store kv.Storage, opt *Opt) (*session, error) {
// attachStatsCollector attaches the stats collector in the dom for the session
func attachStatsCollector(s *session, dom *domain.Domain) *session {
if dom.StatsHandle() != nil && dom.StatsUpdating() {
s.statsCollector = dom.StatsHandle().NewSessionStatsCollector()
if GetIndexUsageSyncLease() > 0 {
if s.statsCollector == nil {
s.statsCollector = dom.StatsHandle().NewSessionStatsCollector()
}
if s.idxUsageCollector == nil && GetIndexUsageSyncLease() > 0 {
s.idxUsageCollector = dom.StatsHandle().NewSessionIndexUsageCollector()
}
}
Expand All @@ -3595,9 +3597,14 @@ func attachStatsCollector(s *session, dom *domain.Domain) *session {

// detachStatsCollector removes the stats collector in the session
func detachStatsCollector(s *session) *session {
s.statsCollector = nil
s.idxUsageCollector = nil

if s.statsCollector != nil {
s.statsCollector.Delete()
s.statsCollector = nil
}
if s.idxUsageCollector != nil {
s.idxUsageCollector.Delete()
s.idxUsageCollector = nil
}
return s
}

Expand Down
8 changes: 8 additions & 0 deletions ttl/ttlworker/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ loop:
err = idleWorker.Schedule(task.ttlScanTask)
if err != nil {
logger.Warn("fail to schedule task", zap.Error(err))
task.cancel()
continue
}

Expand Down Expand Up @@ -457,6 +458,8 @@ func (m *taskManager) checkFinishedTask(se session.Session, now time.Time) {
stillRunningTasks = append(stillRunningTasks, task)
continue
}
// we should cancel task to release inner context and avoid memory leak
task.cancel()
err := m.reportTaskFinished(se, now, task)
if err != nil {
logutil.Logger(m.ctx).Error("fail to report finished task", zap.Error(err))
Expand Down Expand Up @@ -579,6 +582,11 @@ type runningScanTask struct {
result *ttlScanTaskExecResult
}

// Context returns context for the task and is only used by test now
func (t *runningScanTask) Context() context.Context {
return t.ctx
}

func (t *runningScanTask) finished() bool {
return t.result != nil && t.statistics.TotalRows.Load() == t.statistics.ErrorRows.Load()+t.statistics.SuccessRows.Load()
}
16 changes: 15 additions & 1 deletion ttl/ttlworker/task_manager_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ func TestParallelSchedule(t *testing.T) {
require.NoError(t, isc.Update(sessionFactory()))
now := time.Now()
scheduleWg := sync.WaitGroup{}
finishTasks := make([]func(), 0, 4)
for i := 0; i < 4; i++ {
workers := []ttlworker.Worker{}
for j := 0; j < 4; j++ {
Expand All @@ -139,21 +140,34 @@ func TestParallelSchedule(t *testing.T) {
workers = append(workers, scanWorker)
}

m := ttlworker.NewTaskManager(context.Background(), nil, isc, fmt.Sprintf("task-manager-%d", i), store)
managerID := fmt.Sprintf("task-manager-%d", i)
m := ttlworker.NewTaskManager(context.Background(), nil, isc, managerID, store)
m.SetScanWorkers4Test(workers)
scheduleWg.Add(1)
go func() {
se := sessionFactory()
m.RescheduleTasks(se, now)
scheduleWg.Done()
}()
finishTasks = append(finishTasks, func() {
se := sessionFactory()
for _, task := range m.GetRunningTasks() {
require.Nil(t, task.Context().Err(), fmt.Sprintf("%s %d", managerID, task.ScanID))
task.SetResult(nil)
m.CheckFinishedTask(se, time.Now())
require.NotNil(t, task.Context().Err(), fmt.Sprintf("%s %d", managerID, task.ScanID))
}
})
}
scheduleWg.Wait()
// all tasks should have been scheduled
tk.MustQuery("select count(1) from mysql.tidb_ttl_task where status = 'running'").Check(testkit.Rows("16"))
for i := 0; i < 4; i++ {
sql := fmt.Sprintf("select count(1) from mysql.tidb_ttl_task where status = 'running' AND owner_id = 'task-manager-%d'", i)
tk.MustQuery(sql).Check(testkit.Rows("4"))
finishTasks[i]()
sql = fmt.Sprintf("select count(1) from mysql.tidb_ttl_task where status = 'finished' AND owner_id = 'task-manager-%d'", i)
tk.MustQuery(sql).Check(testkit.Rows("4"))
}
}

Expand Down