Skip to content

Commit

Permalink
Merge branch 'master' into default_curdate
Browse files Browse the repository at this point in the history
  • Loading branch information
CbcWestwolf authored Jan 5, 2023
2 parents 453ad8f + 4a5a447 commit f80ddcd
Show file tree
Hide file tree
Showing 26 changed files with 633 additions and 42 deletions.
4 changes: 2 additions & 2 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -2881,8 +2881,8 @@ def go_deps():
name = "com_github_pingcap_badger",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/badger",
sum = "h1:QB16qn8wx5X4SRn3/5axrjPMNS3WRt87+5Bfrnmt6IA=",
version = "v1.5.1-0.20221229114011-ddffaa0fff7a",
sum = "h1:AEcvKyVM8CUII3bYzgz8haFXtGiqcrtXW1csu/5UELY=",
version = "v1.5.1-0.20230103063557-828f39b09b6d",
)
go_repository(
name = "com_github_pingcap_check",
Expand Down
18 changes: 14 additions & 4 deletions domain/historical_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package domain
import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/statistics/handle"
)
Expand Down Expand Up @@ -48,18 +49,27 @@ func (w *HistoricalStatsWorker) DumpHistoricalStats(tableID int64, statsHandle *
}
sctx := w.sctx
is := GetDomain(sctx).InfoSchema()
isPartition := false
var tblInfo *model.TableInfo
tbl, existed := is.TableByID(tableID)
if !existed {
return errors.Errorf("cannot get table by id %d", tableID)
tbl, db, p := is.FindTableByPartitionID(tableID)
if tbl != nil && db != nil && p != nil {
isPartition = true
tblInfo = tbl.Meta()
} else {
return errors.Errorf("cannot get table by id %d", tableID)
}
} else {
tblInfo = tbl.Meta()
}
tblInfo := tbl.Meta()
dbInfo, existed := is.SchemaByTable(tblInfo)
if !existed {
return errors.Errorf("cannot get DBInfo by TableID %d", tableID)
}
if _, err := statsHandle.RecordHistoricalStatsToStorage(dbInfo.Name.O, tblInfo); err != nil {
if _, err := statsHandle.RecordHistoricalStatsToStorage(dbInfo.Name.O, tblInfo, tableID, isPartition); err != nil {
generateHistoricalStatsFailedCounter.Inc()
return errors.Errorf("record table %s.%s's historical stats failed", dbInfo.Name.O, tblInfo.Name.O)
return errors.Errorf("record table %s.%s's historical stats failed, err:%v", dbInfo.Name.O, tblInfo.Name.O, err)
}
generateHistoricalStatsSuccessCounter.Inc()
return nil
Expand Down
22 changes: 18 additions & 4 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,8 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, n
}
}

tableIDs := map[int64]struct{}{}

// save analyze results in single-thread.
statsHandle := domain.GetDomain(e.ctx).StatsHandle()
panicCnt := 0
Expand All @@ -311,6 +313,7 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, n
continue
}
handleGlobalStats(needGlobalStats, globalStatsMap, results)
tableIDs[results.TableID.GetStatisticsID()] = struct{}{}

if err1 := statsHandle.SaveTableStatsToStorage(results, e.ctx.GetSessionVars().EnableAnalyzeSnapshot, handle.StatsMetaHistorySourceAnalyze); err1 != nil {
tableID := results.TableID.TableID
Expand All @@ -319,17 +322,20 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, n
finishJobWithLog(e.ctx, results.Job, err)
} else {
finishJobWithLog(e.ctx, results.Job, nil)
// Dump stats to historical storage.
if err := recordHistoricalStats(e.ctx, results.TableID.TableID); err != nil {
logutil.BgLogger().Error("record historical stats failed", zap.Error(err))
}
}
invalidInfoSchemaStatCache(results.TableID.GetStatisticsID())
if atomic.LoadUint32(&e.ctx.GetSessionVars().Killed) == 1 {
finishJobWithLog(e.ctx, results.Job, ErrQueryInterrupted)
return errors.Trace(ErrQueryInterrupted)
}
}
// Dump stats to historical storage.
for tableID := range tableIDs {
if err := recordHistoricalStats(e.ctx, tableID); err != nil {
logutil.BgLogger().Error("record historical stats failed", zap.Error(err))
}
}

return err
}

Expand All @@ -348,6 +354,7 @@ func (e *AnalyzeExec) handleResultsErrorWithConcurrency(ctx context.Context, sta
worker.run(ctx1, e.ctx.GetSessionVars().EnableAnalyzeSnapshot)
})
}
tableIDs := map[int64]struct{}{}
panicCnt := 0
var err error
for panicCnt < statsConcurrency {
Expand All @@ -370,6 +377,7 @@ func (e *AnalyzeExec) handleResultsErrorWithConcurrency(ctx context.Context, sta
continue
}
handleGlobalStats(needGlobalStats, globalStatsMap, results)
tableIDs[results.TableID.GetStatisticsID()] = struct{}{}
saveResultsCh <- results
}
close(saveResultsCh)
Expand All @@ -382,6 +390,12 @@ func (e *AnalyzeExec) handleResultsErrorWithConcurrency(ctx context.Context, sta
}
err = errors.New(strings.Join(errMsg, ","))
}
for tableID := range tableIDs {
// Dump stats to historical storage.
if err := recordHistoricalStats(e.ctx, tableID); err != nil {
logutil.BgLogger().Error("record historical stats failed", zap.Error(err))
}
}
return err
}

Expand Down
12 changes: 8 additions & 4 deletions executor/analyze_global_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ func (e *AnalyzeExec) handleGlobalStats(ctx context.Context, needGlobalStats boo
globalStatsTableIDs[globalStatsID.tableID] = struct{}{}
}
statsHandle := domain.GetDomain(e.ctx).StatsHandle()
tableIDs := map[int64]struct{}{}
for tableID := range globalStatsTableIDs {
tableIDs[tableID] = struct{}{}
tableAllPartitionStats := make(map[int64]*statistics.Table)
for globalStatsID, info := range globalStatsMap {
if globalStatsID.tableID != tableID {
Expand Down Expand Up @@ -101,16 +103,18 @@ func (e *AnalyzeExec) handleGlobalStats(ctx context.Context, needGlobalStats boo
logutil.Logger(ctx).Error("save global-level stats to storage failed", zap.String("info", job.JobInfo),
zap.Int64("histID", hg.ID), zap.Error(err), zap.Int64("tableID", tableID))
}
// Dump stats to historical storage.
if err1 := recordHistoricalStats(e.ctx, globalStatsID.tableID); err1 != nil {
logutil.BgLogger().Error("record historical stats failed", zap.String("info", job.JobInfo), zap.Int64("histID", hg.ID), zap.Error(err1))
}
}
return err
}()
FinishAnalyzeMergeJob(e.ctx, job, mergeStatsErr)
}
}
for tableID := range tableIDs {
// Dump stats to historical storage.
if err := recordHistoricalStats(e.ctx, tableID); err != nil {
logutil.BgLogger().Error("record historical stats failed", zap.Error(err))
}
}
return nil
}

Expand Down
4 changes: 0 additions & 4 deletions executor/analyze_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,6 @@ func (worker *analyzeSaveStatsWorker) run(ctx context.Context, analyzeSnapshot b
worker.errCh <- err
} else {
finishJobWithLog(worker.sctx, results.Job, nil)
// Dump stats to historical storage.
if err := recordHistoricalStats(worker.sctx, results.TableID.TableID); err != nil {
logutil.BgLogger().Error("record historical stats failed", zap.Error(err))
}
}
invalidInfoSchemaStatCache(results.TableID.GetStatisticsID())
if err != nil {
Expand Down
27 changes: 27 additions & 0 deletions executor/historical_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,3 +216,30 @@ func TestGCOutdatedHistoryStats(t *testing.T) {
tk.MustQuery(fmt.Sprintf("select count(*) from mysql.stats_history where table_id = '%d'",
tableInfo.Meta().ID)).Check(testkit.Rows("0"))
}

func TestPartitionTableHistoricalStats(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set global tidb_enable_historical_stats = 1")
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec(`CREATE TABLE t (a int, b int, index idx(b))
PARTITION BY RANGE ( a ) (
PARTITION p0 VALUES LESS THAN (6)
)`)
tk.MustExec("delete from mysql.stats_history")

tk.MustExec("analyze table test.t")
// dump historical stats
h := dom.StatsHandle()
hsWorker := dom.GetHistoricalStatsWorker()

// assert global table and partition table be dumped
tblID := hsWorker.GetOneHistoricalStatsTable()
err := hsWorker.DumpHistoricalStats(tblID, h)
require.NoError(t, err)
tblID = hsWorker.GetOneHistoricalStatsTable()
err = hsWorker.DumpHistoricalStats(tblID, h)
require.NoError(t, err)
tk.MustQuery("select count(*) from mysql.stats_history").Check(testkit.Rows("2"))
}
2 changes: 1 addition & 1 deletion expression/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ go_library(

go_test(
name = "expression_test",
timeout = "short",
timeout = "moderate",
srcs = [
"bench_test.go",
"builtin_arithmetic_test.go",
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ require (
github.com/opentracing/basictracer-go v1.0.0
github.com/opentracing/opentracing-go v1.2.0
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2
github.com/pingcap/badger v1.5.1-0.20221229114011-ddffaa0fff7a
github.com/pingcap/badger v1.5.1-0.20230103063557-828f39b09b6d
github.com/pingcap/errors v0.11.5-0.20221009092201-b66cddb77c32
github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3
github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -761,8 +761,8 @@ github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2/go.mod h1:iIss55rK
github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM=
github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pingcap/badger v1.5.1-0.20221229114011-ddffaa0fff7a h1:QB16qn8wx5X4SRn3/5axrjPMNS3WRt87+5Bfrnmt6IA=
github.com/pingcap/badger v1.5.1-0.20221229114011-ddffaa0fff7a/go.mod h1:p8QnkZnmyV8L/M/jzYb8rT7kv3bz9m7bn1Ju94wDifs=
github.com/pingcap/badger v1.5.1-0.20230103063557-828f39b09b6d h1:AEcvKyVM8CUII3bYzgz8haFXtGiqcrtXW1csu/5UELY=
github.com/pingcap/badger v1.5.1-0.20230103063557-828f39b09b6d/go.mod h1:p8QnkZnmyV8L/M/jzYb8rT7kv3bz9m7bn1Ju94wDifs=
github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8/go.mod h1:B1+S9LNcuMyLH/4HMTViQOJevkGiik3wW2AN9zb2fNQ=
github.com/pingcap/check v0.0.0-20191107115940-caf2b9e6ccf4/go.mod h1:PYMCGwN0JHjoqGr3HrZoD+b8Tgx8bKnArhSq8YVzUMc=
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712 h1:R8gStypOBmpnHEx1qi//SaqxJVI4inOqljg/Aj5/390=
Expand Down
12 changes: 3 additions & 9 deletions planner/core/expression_rewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -1559,16 +1559,10 @@ func (er *expressionRewriter) inToExpression(lLen int, not bool, tp *types.Field
if c, ok := args[i].(*expression.Constant); ok {
var isExceptional bool
if expression.MaybeOverOptimized4PlanCache(er.sctx, []expression.Expression{c}) {
if c.GetType().EvalType() == types.ETString {
// To keep the result be compatible with MySQL, refine `int non-constant <cmp> str constant`
// here and skip this refine operation in all other cases for safety.
er.sctx.GetSessionVars().StmtCtx.SetSkipPlanCache(errors.Errorf("skip plan-cache: '%v' may be converted to INT", c.String()))
expression.RemoveMutableConst(er.sctx, []expression.Expression{c})
} else {
continue
if c.GetType().EvalType() == types.ETInt {
continue // no need to refine it
}
} else if !er.sctx.GetSessionVars().StmtCtx.UseCache {
// We should remove the mutable constant for correctness, because its value may be changed.
er.sctx.GetSessionVars().StmtCtx.SetSkipPlanCache(errors.Errorf("skip plan-cache: '%v' may be converted to INT", c.String()))
expression.RemoveMutableConst(er.sctx, []expression.Expression{c})
}
args[i], isExceptional = expression.RefineComparedConstant(er.sctx, *leftFt, c, opcode.EQ)
Expand Down
32 changes: 32 additions & 0 deletions planner/core/plan_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,38 @@ func TestPlanCacheDiagInfo(t *testing.T) {
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 skip plan-cache: some parameters may be overwritten"))
}

func TestIssue40224(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t (a int, key(a))")
tk.MustExec("prepare st from 'select a from t where a in (?, ?)'")
tk.MustExec("set @a=1.0, @b=2.0")
tk.MustExec("execute st using @a, @b")
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 skip plan-cache: '1.0' may be converted to INT"))
tk.MustExec("execute st using @a, @b")
tkProcess := tk.Session().ShowProcess()
ps := []*util.ProcessInfo{tkProcess}
tk.Session().SetSessionManager(&testkit.MockSessionManager{PS: ps})
tk.MustQuery(fmt.Sprintf("explain for connection %d", tkProcess.ID)).CheckAt([]int{0},
[][]interface{}{
{"IndexReader_6"},
{"└─IndexRangeScan_5"}, // range scan not full scan
})

tk.MustExec("set @a=1, @b=2")
tk.MustExec("execute st using @a, @b")
tk.MustQuery("show warnings").Check(testkit.Rows()) // no warning for INT values
tk.MustExec("execute st using @a, @b")
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1")) // cacheable for INT
tk.MustExec("execute st using @a, @b")
tk.MustQuery(fmt.Sprintf("explain for connection %d", tkProcess.ID)).CheckAt([]int{0},
[][]interface{}{
{"IndexReader_6"},
{"└─IndexRangeScan_5"}, // range scan not full scan
})
}

func TestIssue40225(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
Expand Down
11 changes: 9 additions & 2 deletions resourcemanager/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "resourcemanage",
srcs = ["rm.go"],
name = "resourcemanager",
srcs = [
"rm.go",
"schedule.go",
],
importpath = "github.com/pingcap/tidb/resourcemanager",
visibility = ["//visibility:public"],
deps = [
"//resourcemanager/scheduler",
"//resourcemanager/util",
"//util",
"//util/cpu",
"@com_github_pingcap_log//:log",
"@org_uber_go_zap//:zap",
],
)
35 changes: 35 additions & 0 deletions resourcemanager/rm.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
package resourcemanager

import (
"time"

"github.com/pingcap/tidb/resourcemanager/scheduler"
"github.com/pingcap/tidb/resourcemanager/util"
tidbutil "github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/cpu"
)
Expand All @@ -24,24 +28,55 @@ var GlobalResourceManager = NewResourceManger()

// ResourceManager is a resource manager
type ResourceManager struct {
poolMap *util.ShardPoolMap
scheduler []scheduler.Scheduler
cpuObserver *cpu.Observer
exitCh chan struct{}
wg tidbutil.WaitGroupWrapper
}

// NewResourceManger is to create a new resource manager
func NewResourceManger() *ResourceManager {
sc := make([]scheduler.Scheduler, 0, 1)
sc = append(sc, scheduler.NewCPUScheduler())
return &ResourceManager{
cpuObserver: cpu.NewCPUObserver(),
exitCh: make(chan struct{}),
poolMap: util.NewShardPoolMap(),
scheduler: sc,
}
}

// Start is to start resource manager
func (r *ResourceManager) Start() {
r.wg.Run(r.cpuObserver.Start)
r.wg.Run(func() {
tick := time.NewTicker(100 * time.Millisecond)
defer tick.Stop()
for {
select {
case <-tick.C:
r.schedule()
case <-r.exitCh:
return
}
}
})
}

// Stop is to stop resource manager
func (r *ResourceManager) Stop() {
r.cpuObserver.Stop()
close(r.exitCh)
r.wg.Wait()
}

// Register is to register pool into resource manager
func (r *ResourceManager) Register(pool util.GorotinuePool, name string, component util.Component) error {
p := util.PoolContainer{Pool: pool, Component: component}
return r.registerPool(name, &p)
}

func (r *ResourceManager) registerPool(name string, pool *util.PoolContainer) error {
return r.poolMap.Add(name, pool)
}
Loading

0 comments on commit f80ddcd

Please sign in to comment.