Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statistics: support global singleflight for sync load (#52796) #53838

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions planner/core/tests/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func TestMain(m *testing.M) {
goleak.IgnoreTopFunction("gopkg.in/natefinch/lumberjack%2ev2.(*Logger).millRun"),
goleak.IgnoreTopFunction("github.com/tikv/client-go/v2/txnkv/transaction.keepAlive"),
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
goleak.IgnoreTopFunction("github.com/pingcap/tidb/pkg/statistics/handle/syncload.(*statsSyncLoad).SendLoadRequests.func1"), // For TestPlanStatsLoadTimeout
}
goleak.VerifyTestMain(m, opts...)
}
1 change: 1 addition & 0 deletions sessionctx/stmtctx/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go_library(
"@com_github_pingcap_errors//:errors",
"@com_github_tikv_client_go_v2//tikvrpc",
"@com_github_tikv_client_go_v2//util",
"@org_golang_x_sync//singleflight",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
],
Expand Down
3 changes: 2 additions & 1 deletion sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/tikv/client-go/v2/util"
atomic2 "go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/sync/singleflight"
)

const (
Expand Down Expand Up @@ -333,7 +334,7 @@ type StatementContext struct {
// NeededItems stores the columns/indices whose stats are needed for planner.
NeededItems []model.TableItemID
// ResultCh to receive stats loading results
ResultCh chan StatsLoadResult
ResultCh []<-chan singleflight.Result
// LoadStartTime is to record the load start time to calculate latency
LoadStartTime time.Time
}
Expand Down
102 changes: 51 additions & 51 deletions statistics/handle/handle_hist.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
// RetryCount is the max retry count for a sync load task.
const RetryCount = 3

var globalStatsSyncLoadSingleFlight singleflight.Group

type statsWrapper struct {
col *statistics.Column
idx *statistics.Index
Expand Down Expand Up @@ -80,25 +82,26 @@
}
sc.StatsLoad.Timeout = timeout
sc.StatsLoad.NeededItems = remainedItems
sc.StatsLoad.ResultCh = make(chan stmtctx.StatsLoadResult, len(remainedItems))
tasks := make([]*NeededItemTask, 0)
sc.StatsLoad.ResultCh = make([]<-chan singleflight.Result, 0, len(remainedItems))
for _, item := range remainedItems {
task := &NeededItemTask{
TableItemID: item,
ToTimeout: time.Now().Local().Add(timeout),
ResultCh: sc.StatsLoad.ResultCh,
}
tasks = append(tasks, task)
}
timer := time.NewTimer(timeout)
defer timer.Stop()
for _, task := range tasks {
select {
case h.StatsLoad.NeededItemsCh <- task:
continue
case <-timer.C:
return errors.New("sync load stats channel is full and timeout sending task to channel")
}
localItem := item
resultCh := globalStatsSyncLoadSingleFlight.DoChan(localItem.Key(), func() (any, error) {
timer := time.NewTimer(timeout)
defer timer.Stop()
task := &NeededItemTask{
TableItemID: localItem,
ToTimeout: time.Now().Local().Add(timeout),
ResultCh: make(chan stmtctx.StatsLoadResult, 1),
}
select {
case h.StatsLoad.NeededItemsCh <- task:
result := <-task.ResultCh
return result, nil
case <-timer.C:
return nil, errors.New("sync load stats channel is full and timeout sending task to channel")
}
})
sc.StatsLoad.ResultCh = append(sc.StatsLoad.ResultCh, resultCh)
}
sc.StatsLoad.LoadStartTime = time.Now()
return nil
Expand All @@ -124,26 +127,34 @@
metrics.SyncLoadCounter.Inc()
timer := time.NewTimer(sc.StatsLoad.Timeout)
defer timer.Stop()
for {
for _, resultCh := range sc.StatsLoad.ResultCh {
select {
case result, ok := <-sc.StatsLoad.ResultCh:
if ok {
if result.HasError() {
errorMsgs = append(errorMsgs, result.ErrorMsg())
}
delete(resultCheckMap, result.Item)
if len(resultCheckMap) == 0 {
metrics.SyncLoadHistogram.Observe(float64(time.Since(sc.StatsLoad.LoadStartTime).Milliseconds()))
return nil
}
} else {
case result, ok := <-resultCh:
if !ok {
return errors.New("sync load stats channel closed unexpectedly")
}
// This error comes from statsSyncLoad.SendLoadRequests, which creates the task and sends it to a worker;
// it is not a stats loading error.
if result.Err != nil {
errorMsgs = append(errorMsgs, result.Err.Error())

Check warning on line 139 in statistics/handle/handle_hist.go

View check run for this annotation

Codecov / codecov/patch

statistics/handle/handle_hist.go#L139

Added line #L139 was not covered by tests
} else {
val := result.Val.(stmtctx.StatsLoadResult)
// This error comes from the stats loading itself.
if val.HasError() {
errorMsgs = append(errorMsgs, val.ErrorMsg())
}

Check warning on line 145 in statistics/handle/handle_hist.go

View check run for this annotation

Codecov / codecov/patch

statistics/handle/handle_hist.go#L144-L145

Added lines #L144 - L145 were not covered by tests
delete(resultCheckMap, val.Item)
}
case <-timer.C:
metrics.SyncLoadTimeoutCounter.Inc()
return errors.New("sync load stats timeout")
}
}
if len(resultCheckMap) == 0 {
metrics.SyncLoadHistogram.Observe(float64(time.Since(sc.StatsLoad.LoadStartTime).Milliseconds()))
return nil
}
return nil

Check warning on line 157 in statistics/handle/handle_hist.go

View check run for this annotation

Codecov / codecov/patch

statistics/handle/handle_hist.go#L157

Added line #L157 was not covered by tests
}

// removeHistLoadedColumns removes having-hist columns based on neededColumns and statsCache.
Expand Down Expand Up @@ -244,28 +255,17 @@
task = lastTask
}
result := stmtctx.StatsLoadResult{Item: task.TableItemID}
resultChan := h.StatsLoad.Singleflight.DoChan(task.TableItemID.Key(), func() (any, error) {
err := h.handleOneItemTask(task, readerCtx, ctx)
return nil, err
})
timeout := time.Until(task.ToTimeout)
select {
case sr := <-resultChan:
// sr.Val is always nil.
if sr.Err == nil {
task.ResultCh <- result
return nil, nil
}
if !isVaildForRetry(task) {
result.Error = sr.Err
task.ResultCh <- result
return nil, nil
}
return task, sr.Err
case <-time.After(timeout):
task.ToTimeout.Add(time.Duration(h.mu.ctx.GetSessionVars().StatsLoadSyncWait.Load()) * time.Microsecond)
return task, nil
err = h.handleOneItemTask(task, readerCtx, ctx)
if err == nil {
task.ResultCh <- result
return nil, nil
}
if !isVaildForRetry(task) {
result.Error = err
task.ResultCh <- result
return nil, nil
}
return task, err
}

func isVaildForRetry(task *NeededItemTask) bool {
Expand Down
56 changes: 33 additions & 23 deletions statistics/handle/handle_hist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,14 +208,23 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) {
task1, err1 := h.HandleOneTask(nil, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
require.Error(t, err1)
require.NotNil(t, task1)

for _, resultCh := range stmtCtx1.StatsLoad.ResultCh {
select {
case <-resultCh:
t.Logf("stmtCtx1.ResultCh should not get anything")
t.FailNow()
default:
}
}
for _, resultCh := range stmtCtx2.StatsLoad.ResultCh {
select {
case <-resultCh:
t.Logf("stmtCtx1.ResultCh should not get anything")
t.FailNow()
default:
}
}
select {
case <-stmtCtx1.StatsLoad.ResultCh:
t.Logf("stmtCtx1.ResultCh should not get anything")
t.FailNow()
case <-stmtCtx2.StatsLoad.ResultCh:
t.Logf("stmtCtx2.ResultCh should not get anything")
t.FailNow()
case <-task1.ResultCh:
t.Logf("task1.ResultCh should not get anything")
t.FailNow()
Expand All @@ -226,17 +235,18 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) {
task3, err3 := h.HandleOneTask(task1, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
require.NoError(t, err3)
require.Nil(t, task3)

task, err3 := h.HandleOneTask(nil, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
require.NoError(t, err3)
require.Nil(t, task)

rs1, ok1 := <-stmtCtx1.StatsLoad.ResultCh
require.True(t, ok1)
require.Equal(t, neededColumns[0], rs1.Item)
rs2, ok2 := <-stmtCtx2.StatsLoad.ResultCh
require.True(t, ok2)
require.Equal(t, neededColumns[0], rs2.Item)
for _, resultCh := range stmtCtx1.StatsLoad.ResultCh {
rs1, ok1 := <-resultCh
require.True(t, rs1.Shared)
require.True(t, ok1)
require.Equal(t, neededColumns[0].ID, rs1.Val.(stmtctx.StatsLoadResult).Item.ID)
}
for _, resultCh := range stmtCtx2.StatsLoad.ResultCh {
rs1, ok1 := <-resultCh
require.True(t, rs1.Shared)
require.True(t, ok1)
require.Equal(t, neededColumns[0].ID, rs1.Val.(stmtctx.StatsLoadResult).Item.ID)
}

stat = h.GetTableStats(tableInfo)
hg = stat.Columns[tableInfo.Columns[2].ID].Histogram
Expand Down Expand Up @@ -313,11 +323,11 @@ func TestRetry(t *testing.T) {
result, err1 := h.HandleOneTask(task1, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
require.NoError(t, err1)
require.Nil(t, result)
select {
case <-task1.ResultCh:
default:
t.Logf("task1.ResultCh should get nothing")
t.FailNow()
for _, resultCh := range stmtCtx1.StatsLoad.ResultCh {
rs1, ok1 := <-resultCh
require.True(t, rs1.Shared)
require.True(t, ok1)
require.Error(t, rs1.Val.(stmtctx.StatsLoadResult).Error)
}
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/statistics/handle/mockReadStatsForOneFail"))
}
Loading