Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statistics: support global singleflight for sync load (#52796) (#53839) #55731

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions planner/core/casetest/correlated/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func TestMain(m *testing.M) {
goleak.IgnoreTopFunction("gopkg.in/natefinch/lumberjack%2ev2.(*Logger).millRun"),
goleak.IgnoreTopFunction("github.com/tikv/client-go/v2/txnkv/transaction.keepAlive"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
goleak.IgnoreTopFunction("github.com/pingcap/tidb/pkg/statistics/handle/syncload.(*statsSyncLoad).SendLoadRequests.func1"), // For TestPlanStatsLoadTimeout
}
goleak.VerifyTestMain(m, opts...)
}
1 change: 1 addition & 0 deletions sessionctx/stmtctx/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ go_library(
"@com_github_tikv_client_go_v2//util",
"@org_golang_x_exp//maps",
"@org_golang_x_exp//slices",
"@org_golang_x_sync//singleflight",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
],
Expand Down
3 changes: 2 additions & 1 deletion sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"go.uber.org/zap"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
"golang.org/x/sync/singleflight"
)

const (
Expand Down Expand Up @@ -346,7 +347,7 @@ type StatementContext struct {
// NeededItems stores the columns/indices whose stats are needed for planner.
NeededItems []model.TableItemID
// ResultCh to receive stats loading results
ResultCh chan StatsLoadResult
ResultCh []<-chan singleflight.Result
// LoadStartTime is to record the load start time to calculate latency
LoadStartTime time.Time
}
Expand Down
71 changes: 41 additions & 30 deletions statistics/handle/handle_hist.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
"golang.org/x/sync/singleflight"
)

var globalStatsSyncLoadSingleFlight singleflight.Group

type statsWrapper struct {
col *statistics.Column
idx *statistics.Index
Expand Down Expand Up @@ -76,25 +78,26 @@
}
sc.StatsLoad.Timeout = timeout
sc.StatsLoad.NeededItems = remainedItems
sc.StatsLoad.ResultCh = make(chan stmtctx.StatsLoadResult, len(remainedItems))
tasks := make([]*NeededItemTask, 0)
sc.StatsLoad.ResultCh = make([]<-chan singleflight.Result, 0, len(remainedItems))
for _, item := range remainedItems {
task := &NeededItemTask{
TableItemID: item,
ToTimeout: time.Now().Local().Add(timeout),
ResultCh: sc.StatsLoad.ResultCh,
}
tasks = append(tasks, task)
}
timer := time.NewTimer(timeout)
defer timer.Stop()
for _, task := range tasks {
select {
case h.StatsLoad.NeededItemsCh <- task:
continue
case <-timer.C:
return errors.New("sync load stats channel is full and timeout sending task to channel")
}
localItem := item
resultCh := globalStatsSyncLoadSingleFlight.DoChan(localItem.Key(), func() (any, error) {
timer := time.NewTimer(timeout)
defer timer.Stop()
task := &NeededItemTask{
TableItemID: localItem,
ToTimeout: time.Now().Local().Add(timeout),
ResultCh: make(chan stmtctx.StatsLoadResult, 1),
}
select {
case h.StatsLoad.NeededItemsCh <- task:
result := <-task.ResultCh
return result, nil
case <-timer.C:
return nil, errors.New("sync load stats channel is full and timeout sending task to channel")
}
})
sc.StatsLoad.ResultCh = append(sc.StatsLoad.ResultCh, resultCh)
}
sc.StatsLoad.LoadStartTime = time.Now()
return nil
Expand All @@ -120,26 +123,34 @@
metrics.SyncLoadCounter.Inc()
timer := time.NewTimer(sc.StatsLoad.Timeout)
defer timer.Stop()
for {
for _, resultCh := range sc.StatsLoad.ResultCh {
select {
case result, ok := <-sc.StatsLoad.ResultCh:
if ok {
if result.HasError() {
errorMsgs = append(errorMsgs, result.ErrorMsg())
}
delete(resultCheckMap, result.Item)
if len(resultCheckMap) == 0 {
metrics.SyncLoadHistogram.Observe(float64(time.Since(sc.StatsLoad.LoadStartTime).Milliseconds()))
return nil
}
} else {
case result, ok := <-resultCh:
if !ok {
return errors.New("sync load stats channel closed unexpectedly")
}
// this error is from statsSyncLoad.SendLoadRequests which start to task and send task into worker,
// not the stats loading error
if result.Err != nil {
errorMsgs = append(errorMsgs, result.Err.Error())

Check warning on line 135 in statistics/handle/handle_hist.go

View check run for this annotation

Codecov / codecov/patch

statistics/handle/handle_hist.go#L135

Added line #L135 was not covered by tests
} else {
val := result.Val.(stmtctx.StatsLoadResult)
// this error is from the stats loading error
if val.HasError() {
errorMsgs = append(errorMsgs, val.ErrorMsg())
}

Check warning on line 141 in statistics/handle/handle_hist.go

View check run for this annotation

Codecov / codecov/patch

statistics/handle/handle_hist.go#L140-L141

Added lines #L140 - L141 were not covered by tests
delete(resultCheckMap, val.Item)
}
case <-timer.C:
metrics.SyncLoadTimeoutCounter.Inc()
return errors.New("sync load stats timeout")
}
}
if len(resultCheckMap) == 0 {
metrics.SyncLoadHistogram.Observe(float64(time.Since(sc.StatsLoad.LoadStartTime).Milliseconds()))
return nil
}
return nil

Check warning on line 153 in statistics/handle/handle_hist.go

View check run for this annotation

Codecov / codecov/patch

statistics/handle/handle_hist.go#L153

Added line #L153 was not covered by tests
}

// removeHistLoadedColumns removed having-hist columns based on neededColumns and statsCache.
Expand Down
38 changes: 28 additions & 10 deletions statistics/handle/handle_hist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,22 +209,40 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) {
task1, err1 := h.HandleOneTask(testKit.Session().(sessionctx.Context), nil, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
require.Error(t, err1)
require.NotNil(t, task1)
for _, resultCh := range stmtCtx1.StatsLoad.ResultCh {
select {
case <-resultCh:
t.Logf("stmtCtx1.ResultCh should not get anything")
t.FailNow()
default:
}
}
for _, resultCh := range stmtCtx2.StatsLoad.ResultCh {
select {
case <-resultCh:
t.Logf("stmtCtx1.ResultCh should not get anything")
t.FailNow()
default:
}
}

require.NoError(t, failpoint.Disable(fp.failPath))
task3, err3 := h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
require.NoError(t, err3)
require.Nil(t, task3)

task, err3 := h.HandleOneTask(testKit.Session().(sessionctx.Context), nil, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
require.NoError(t, err3)
require.Nil(t, task)

rs1, ok1 := <-stmtCtx1.StatsLoad.ResultCh
require.True(t, ok1)
require.Equal(t, neededColumns[0], rs1.Item)
rs2, ok2 := <-stmtCtx2.StatsLoad.ResultCh
require.True(t, ok2)
require.Equal(t, neededColumns[0], rs2.Item)
for _, resultCh := range stmtCtx1.StatsLoad.ResultCh {
rs1, ok1 := <-resultCh
require.True(t, rs1.Shared)
require.True(t, ok1)
require.Equal(t, neededColumns[0].ID, rs1.Val.(stmtctx.StatsLoadResult).Item.ID)
}
for _, resultCh := range stmtCtx2.StatsLoad.ResultCh {
rs1, ok1 := <-resultCh
require.True(t, rs1.Shared)
require.True(t, ok1)
require.Equal(t, neededColumns[0].ID, rs1.Val.(stmtctx.StatsLoadResult).Item.ID)
}

stat = h.GetTableStats(tableInfo)
hg = stat.Columns[tableInfo.Columns[2].ID].Histogram
Expand Down