statistics: support global singleflight for sync load (#52796) #53340

Merged
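Note on the approach: this change promotes singleflight deduplication for synchronous stats loading to a package-level `singleflight.Group`, so concurrent statements that request stats for the same column or index share a single load. The sketch below is a minimal standalone illustration of the `DoChan` pattern the diff relies on; the `loadStats` helper and the key string are illustrative assumptions, not the PR's actual code.

```go
package main

import (
	"fmt"
	"sync"

	"golang.org/x/sync/singleflight"
)

// group deduplicates in-flight loads by key, mirroring the package-level
// globalStatsSyncLoadSingleFlight added in this PR (names here are illustrative).
var group singleflight.Group

// loadStats is a stand-in for the real sync-load work (hypothetical helper).
func loadStats(key string) (string, error) {
	return "histogram for " + key, nil
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// DoChan returns a channel that delivers one singleflight.Result.
			// Concurrent callers using the same key share one execution of the
			// callback; Result.Shared reports whether the value was shared.
			ch := group.DoChan("table:1#column:2", func() (any, error) {
				return loadStats("table:1#column:2")
			})
			res := <-ch
			if res.Err != nil {
				fmt.Println("load failed:", res.Err)
				return
			}
			fmt.Println(res.Val.(string), "shared:", res.Shared)
		}()
	}
	wg.Wait()
}
```

Because every waiter on a key receives the same `singleflight.Result`, callers unwrap `Result.Val` and can check `Result.Shared`, which is what the updated tests at the bottom of this diff assert.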
2 changes: 1 addition & 1 deletion pkg/planner/core/casetest/planstats/plan_stats_test.go
@@ -284,7 +284,7 @@ func TestPlanStatsLoadTimeout(t *testing.T) {
tk.MustExec("set global tidb_stats_load_pseudo_timeout=true")
require.NoError(t, failpoint.Enable("github.com/pingcap/executor/assertSyncStatsFailed", `return(true)`))
tk.MustExec(sql) // not fail sql for timeout when pseudo=true
failpoint.Disable("github.com/pingcap/executor/assertSyncStatsFailed")
require.NoError(t, failpoint.Disable("github.com/pingcap/executor/assertSyncStatsFailed"))

plan, _, err := planner.Optimize(context.TODO(), ctx, stmt, is)
require.NoError(t, err) // not fail sql for timeout when pseudo=true
1 change: 1 addition & 0 deletions pkg/sessionctx/stmtctx/BUILD.bazel
@@ -27,6 +27,7 @@ go_library(
"@com_github_tikv_client_go_v2//tikvrpc",
"@com_github_tikv_client_go_v2//util",
"@org_golang_x_exp//maps",
"@org_golang_x_sync//singleflight",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
],
3 changes: 2 additions & 1 deletion pkg/sessionctx/stmtctx/stmtctx.go
@@ -50,6 +50,7 @@ import (
atomic2 "go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/exp/maps"
"golang.org/x/sync/singleflight"
)

const (
@@ -363,7 +364,7 @@ type StatementContext struct {
// NeededItems stores the columns/indices whose stats are needed for planner.
NeededItems []model.TableItemID
// ResultCh to receive stats loading results
ResultCh chan StatsLoadResult
ResultCh []<-chan singleflight.Result
// LoadStartTime is to record the load start time to calculate latency
LoadStartTime time.Time
}
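With `ResultCh` changed to a slice of `<-chan singleflight.Result` (one channel per needed item) rather than a single shared channel, a waiter now receives a wrapped result and has to distinguish the dispatch error in `Result.Err` from the load error carried inside the value. The sketch below shows that consumer-side unwrapping with a simplified stand-in for `stmtctx.StatsLoadResult`; the real version is the `SyncWaitStatsLoad` hunk in `pkg/statistics/handle/handle_hist.go` further down this diff.

```go
package main

import (
	"errors"
	"fmt"

	"golang.org/x/sync/singleflight"
)

// statsLoadResult is a simplified stand-in for stmtctx.StatsLoadResult.
type statsLoadResult struct {
	Item  string
	Error error
}

func (r statsLoadResult) HasError() bool { return r.Error != nil }

// drain mirrors the shape of the new wait loop: each channel yields a
// singleflight.Result whose Val wraps the per-item load outcome.
func drain(chs []<-chan singleflight.Result) []string {
	var msgs []string
	for _, ch := range chs {
		res, ok := <-ch
		if !ok {
			msgs = append(msgs, "result channel closed unexpectedly")
			continue
		}
		if res.Err != nil {
			// Error while dispatching the task to the load worker.
			msgs = append(msgs, res.Err.Error())
			continue
		}
		val := res.Val.(statsLoadResult)
		if val.HasError() {
			// Error produced by the stats load itself.
			msgs = append(msgs, val.Error.Error())
		}
	}
	return msgs
}

func main() {
	var g singleflight.Group
	ch := g.DoChan("item-1", func() (any, error) {
		return statsLoadResult{Item: "item-1", Error: errors.New("load failed")}, nil
	})
	fmt.Println(drain([]<-chan singleflight.Result{ch}))
}
```

Keeping the two error paths separate matches the comment in the diff: `Result.Err` comes from failing to enqueue the task, while the error inside `StatsLoadResult` comes from the load itself.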
1 change: 1 addition & 0 deletions pkg/statistics/handle/BUILD.bazel
@@ -36,6 +36,7 @@ go_library(
"//pkg/types",
"//pkg/util",
"//pkg/util/chunk",
"//pkg/util/intest",
"//pkg/util/logutil",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
104 changes: 51 additions & 53 deletions pkg/statistics/handle/handle_hist.go
@@ -33,6 +33,7 @@ import (
utilstats "github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/zap"
"golang.org/x/sync/singleflight"
@@ -41,6 +42,8 @@ import (
// RetryCount is the max retry count for a sync load task.
const RetryCount = 3

var globalStatsSyncLoadSingleFlight singleflight.Group

type statsWrapper struct {
col *statistics.Column
idx *statistics.Index
@@ -81,25 +84,27 @@ func (h *Handle) SendLoadRequests(sc *stmtctx.StatementContext, neededHistItems
}
sc.StatsLoad.Timeout = timeout
sc.StatsLoad.NeededItems = remainedItems
sc.StatsLoad.ResultCh = make(chan stmtctx.StatsLoadResult, len(remainedItems))
tasks := make([]*NeededItemTask, 0)
sc.StatsLoad.ResultCh = make([]<-chan singleflight.Result, 0, len(remainedItems))
for _, item := range remainedItems {
task := &NeededItemTask{
TableItemID: item,
ToTimeout: time.Now().Local().Add(timeout),
ResultCh: sc.StatsLoad.ResultCh,
}
tasks = append(tasks, task)
}
timer := time.NewTimer(timeout)
defer timer.Stop()
for _, task := range tasks {
select {
case h.StatsLoad.NeededItemsCh <- task:
continue
case <-timer.C:
return errors.New("sync load stats channel is full and timeout sending task to channel")
}
localItem := item
resultCh := globalStatsSyncLoadSingleFlight.DoChan(localItem.Key(), func() (any, error) {
timer := time.NewTimer(timeout)
defer timer.Stop()
task := &NeededItemTask{
TableItemID: localItem,
ToTimeout: time.Now().Local().Add(timeout),
ResultCh: make(chan stmtctx.StatsLoadResult, 1),
}
select {
case h.StatsLoad.NeededItemsCh <- task:
result, ok := <-task.ResultCh
intest.Assert(ok, "task.ResultCh cannot be closed")
return result, nil
case <-timer.C:
return nil, errors.New("sync load stats channel is full and timeout sending task to channel")
}
})
sc.StatsLoad.ResultCh = append(sc.StatsLoad.ResultCh, resultCh)
}
sc.StatsLoad.LoadStartTime = time.Now()
return nil
@@ -125,25 +130,34 @@ func (*Handle) SyncWaitStatsLoad(sc *stmtctx.StatementContext) error {
metrics.SyncLoadCounter.Inc()
timer := time.NewTimer(sc.StatsLoad.Timeout)
defer timer.Stop()
for {
for _, resultCh := range sc.StatsLoad.ResultCh {
select {
case result, ok := <-sc.StatsLoad.ResultCh:
case result, ok := <-resultCh:
if !ok {
return errors.New("sync load stats channel closed unexpectedly")
}
if result.HasError() {
errorMsgs = append(errorMsgs, result.ErrorMsg())
}
delete(resultCheckMap, result.Item)
if len(resultCheckMap) == 0 {
metrics.SyncLoadHistogram.Observe(float64(time.Since(sc.StatsLoad.LoadStartTime).Milliseconds()))
return nil
// this error is from statsSyncLoad.SendLoadRequests which start to task and send task into worker,
// not the stats loading error
if result.Err != nil {
errorMsgs = append(errorMsgs, result.Err.Error())
} else {
val := result.Val.(stmtctx.StatsLoadResult)
// this error is from the stats loading error
if val.HasError() {
errorMsgs = append(errorMsgs, val.ErrorMsg())
}
delete(resultCheckMap, val.Item)
}
case <-timer.C:
metrics.SyncLoadTimeoutCounter.Inc()
return errors.New("sync load stats timeout")
}
}
if len(resultCheckMap) == 0 {
metrics.SyncLoadHistogram.Observe(float64(time.Since(sc.StatsLoad.LoadStartTime).Milliseconds()))
return nil
}
return nil
}

// removeHistLoadedColumns removed having-hist columns based on neededColumns and statsCache.
@@ -230,33 +244,17 @@ func (h *Handle) HandleOneTask(sctx sessionctx.Context, lastTask *NeededItemTask
task = lastTask
}
result := stmtctx.StatsLoadResult{Item: task.TableItemID}
resultChan := h.StatsLoad.Singleflight.DoChan(task.TableItemID.Key(), func() (any, error) {
err := h.handleOneItemTask(task)
return nil, err
})
timeout := time.Until(task.ToTimeout)
select {
case sr := <-resultChan:
// sr.Val is always nil.
if sr.Err == nil {
task.ResultCh <- result
return nil, nil
}
if !isVaildForRetry(task) {
result.Error = sr.Err
task.ResultCh <- result
return nil, nil
}
return task, sr.Err
case <-time.After(timeout):
if !isVaildForRetry(task) {
result.Error = errors.New("stats loading timeout")
task.ResultCh <- result
return nil, nil
}
task.ToTimeout.Add(time.Duration(sctx.GetSessionVars().StatsLoadSyncWait.Load()) * time.Microsecond)
return task, nil
err = h.handleOneItemTask(task)
if err == nil {
task.ResultCh <- result
return nil, nil
}
if !isVaildForRetry(task) {
result.Error = err
task.ResultCh <- result
return nil, nil
}
return task, err
}

func isVaildForRetry(task *NeededItemTask) bool {
54 changes: 33 additions & 21 deletions pkg/statistics/handle/handle_hist_test.go
@@ -207,13 +207,23 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) {
task1, err1 := h.HandleOneTask(testKit.Session().(sessionctx.Context), nil, exitCh)
require.Error(t, err1)
require.NotNil(t, task1)
for _, resultCh := range stmtCtx1.StatsLoad.ResultCh {
select {
case <-resultCh:
t.Logf("stmtCtx1.ResultCh should not get anything")
t.FailNow()
default:
}
}
for _, resultCh := range stmtCtx2.StatsLoad.ResultCh {
select {
case <-resultCh:
t.Logf("stmtCtx1.ResultCh should not get anything")
t.FailNow()
default:
}
}
select {
case <-stmtCtx1.StatsLoad.ResultCh:
t.Logf("stmtCtx1.ResultCh should not get anything")
t.FailNow()
case <-stmtCtx2.StatsLoad.ResultCh:
t.Logf("stmtCtx2.ResultCh should not get anything")
t.FailNow()
case <-task1.ResultCh:
t.Logf("task1.ResultCh should not get anything")
t.FailNow()
@@ -225,16 +235,18 @@
require.NoError(t, err3)
require.Nil(t, task3)

task, err3 := h.HandleOneTask(testKit.Session().(sessionctx.Context), nil, exitCh)
require.NoError(t, err3)
require.Nil(t, task)

rs1, ok1 := <-stmtCtx1.StatsLoad.ResultCh
require.True(t, ok1)
require.Equal(t, neededColumns[0], rs1.Item)
rs2, ok2 := <-stmtCtx2.StatsLoad.ResultCh
require.True(t, ok2)
require.Equal(t, neededColumns[0], rs2.Item)
for _, resultCh := range stmtCtx1.StatsLoad.ResultCh {
rs1, ok1 := <-resultCh
require.True(t, rs1.Shared)
require.True(t, ok1)
require.Equal(t, neededColumns[0], rs1.Val.(stmtctx.StatsLoadResult).Item)
}
for _, resultCh := range stmtCtx2.StatsLoad.ResultCh {
rs1, ok1 := <-resultCh
require.True(t, rs1.Shared)
require.True(t, ok1)
require.Equal(t, neededColumns[0], rs1.Val.(stmtctx.StatsLoadResult).Item)
}

stat = h.GetTableStats(tableInfo)
hg = stat.Columns[tableInfo.Columns[2].ID].Histogram
@@ -310,11 +322,11 @@ func TestRetry(t *testing.T) {
result, err1 := h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, exitCh)
require.NoError(t, err1)
require.Nil(t, result)
select {
case <-task1.ResultCh:
default:
t.Logf("task1.ResultCh should get nothing")
t.FailNow()
for _, resultCh := range stmtCtx1.StatsLoad.ResultCh {
rs1, ok1 := <-resultCh
require.True(t, rs1.Shared)
require.True(t, ok1)
require.Error(t, rs1.Val.(stmtctx.StatsLoadResult).Error)
}
task1.Retry = 0
for i := 0; i < handle.RetryCount*5; i++ {