Skip to content

Commit

Permalink
topsql: fix memory leak in stmtstats (#34558)
Browse files Browse the repository at this point in the history
ref #34502, ref #34525
  • Loading branch information
mornyx authored May 11, 2022
1 parent 218247e commit 9e7c951
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 26 deletions.
13 changes: 5 additions & 8 deletions util/topsql/stmtstats/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,28 +68,25 @@ func (m *aggregator) run() {
case <-m.ctx.Done():
return
case <-tick.C:
m.aggregate(state.TopSQLEnabled())
m.aggregate()
}
}
}

// aggregate data from all associated StatementStats.
// If StatementStats has been closed, collect will remove it from the map.
func (m *aggregator) aggregate(take bool) {
func (m *aggregator) aggregate() {
total := StatementStatsMap{}
m.statsSet.Range(func(statsR, _ interface{}) bool {
stats := statsR.(*StatementStats)
if stats.Finished() {
m.unregister(stats)
total.Merge(stats.Take())
return true
}
if take {
total.Merge(stats.Take())
}
total.Merge(stats.Take())
return true
})
if len(total) > 0 {
// If TopSQL is not enabled, just drop them.
if len(total) > 0 && state.TopSQLEnabled() {
m.collectors.Range(func(c, _ interface{}) bool {
c.(Collector).CollectStmtStatsMap(total)
return true
Expand Down
36 changes: 18 additions & 18 deletions util/topsql/stmtstats/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package stmtstats

import (
"math/rand"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -51,6 +50,8 @@ func Test_RegisterUnregisterCollector(t *testing.T) {
}

func Test_aggregator_register_collect(t *testing.T) {
state.EnableTopSQL()
defer state.DisableTopSQL()
a := newAggregator()
stats := &StatementStats{
data: StatementStatsMap{},
Expand All @@ -63,7 +64,7 @@ func Test_aggregator_register_collect(t *testing.T) {
a.registerCollector(newMockCollector(func(data StatementStatsMap) {
total.Merge(data)
}))
a.aggregate(true)
a.aggregate()
assert.NotEmpty(t, total)
assert.Equal(t, uint64(1), total[SQLPlanDigest{SQLDigest: "SQL-1"}].ExecCount)
assert.Equal(t, uint64(time.Millisecond.Nanoseconds()), total[SQLPlanDigest{SQLDigest: "SQL-1"}].SumDurationNs)
Expand All @@ -90,37 +91,36 @@ func Test_aggregator_run_close(t *testing.T) {
}

func TestAggregatorDisableAggregate(t *testing.T) {
var mu sync.Mutex
total := StatementStatsMap{}
a := newAggregator()
a.registerCollector(newMockCollector(func(data StatementStatsMap) {
mu.Lock()
total.Merge(data)
mu.Unlock()
}))

a.start()

state.DisableTopSQL()
stats := &StatementStats{
data: StatementStatsMap{
SQLPlanDigest{SQLDigest: BinaryDigest("")}: &StatementStatsItem{},
SQLPlanDigest{SQLDigest: ""}: &StatementStatsItem{},
},
finished: atomic.NewBool(false),
}
state.DisableTopSQL()
a.register(stats)
time.Sleep(1500 * time.Millisecond)
mu.Lock()
require.Empty(t, total)
mu.Unlock()
a.aggregate()
require.Empty(t, stats.data) // a.aggregate() will take all data even if TopSQL is not enabled.
require.Empty(t, total) // But just drop them.

state.EnableTopSQL()
time.Sleep(1500 * time.Millisecond)
mu.Lock()
stats = &StatementStats{
data: StatementStatsMap{
SQLPlanDigest{SQLDigest: ""}: &StatementStatsItem{},
},
finished: atomic.NewBool(false),
}
a.register(stats)
a.aggregate()
require.Empty(t, stats.data)
require.Len(t, total, 1)
mu.Unlock()
state.DisableTopSQL()

a.close()
}

type mockCollector struct {
Expand Down

0 comments on commit 9e7c951

Please sign in to comment.