planner: use the session pool to execute SQLs in statshandler (#47065)
ref #46905
qw4990 committed Sep 19, 2023
1 parent 8d78d7a commit fdff1e6
Showing 8 changed files with 180 additions and 88 deletions.
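
The change repeated across these files is to stop serializing work through the single `h.mu.ctx` session under `h.mu` and instead borrow a session from the handle's session pool for the duration of each operation. Below is a minimal, self-contained sketch of that borrow/return shape; the `SessionPool` interface, `withSession` helper, and `fakePool` are illustrative stand-ins rather than TiDB APIs, and only the `Get` / `defer Put` sequence mirrors what the hunks below actually do.

```go
package main

import "fmt"

// SessionPool is an illustrative stand-in for the stats handle's session
// pool; only the Get/Put shape mirrors the real code in this commit.
type SessionPool interface {
    Get() (interface{}, error)
    Put(interface{})
}

// withSession borrows a session, runs fn with it, and always returns the
// session to the pool, mirroring the "Get / defer Put" pattern in the diff.
func withSession(pool SessionPool, fn func(se interface{}) error) error {
    se, err := pool.Get()
    if err != nil {
        return err
    }
    defer pool.Put(se)
    return fn(se)
}

// fakePool is a trivial pool that always hands out the same fake session.
type fakePool struct{}

func (fakePool) Get() (interface{}, error) { return "session#1", nil }
func (fakePool) Put(interface{})           {}

func main() {
    _ = withSession(fakePool{}, func(se interface{}) error {
        fmt.Printf("using %v\n", se)
        return nil
    })
}
```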
statistics/handle/ddl.go (27 additions, 11 deletions)
@@ -37,21 +37,30 @@ import (
 func (h *Handle) HandleDDLEvent(t *util.Event) error {
     switch t.Tp {
     case model.ActionCreateTable, model.ActionTruncateTable:
-        ids := h.getInitStateTableIDs(t.TableInfo)
+        ids, err := h.getInitStateTableIDs(t.TableInfo)
+        if err != nil {
+            return err
+        }
         for _, id := range ids {
             if err := h.insertTableStats2KV(t.TableInfo, id); err != nil {
                 return err
             }
         }
     case model.ActionDropTable:
-        ids := h.getInitStateTableIDs(t.TableInfo)
+        ids, err := h.getInitStateTableIDs(t.TableInfo)
+        if err != nil {
+            return err
+        }
         for _, id := range ids {
             if err := h.resetTableStats2KVForDrop(id); err != nil {
                 return err
             }
         }
     case model.ActionAddColumn, model.ActionModifyColumn:
-        ids := h.getInitStateTableIDs(t.TableInfo)
+        ids, err := h.getInitStateTableIDs(t.TableInfo)
+        if err != nil {
+            return err
+        }
         for _, id := range ids {
             if err := h.insertColStats2KV(id, t.ColumnInfos); err != nil {
                 return err
@@ -64,8 +73,11 @@ func (h *Handle) HandleDDLEvent(t *util.Event) error {
             }
         }
     case model.ActionDropTablePartition:
-        pruneMode := h.CurrentPruneMode()
-        if pruneMode == variable.Dynamic && t.PartInfo != nil {
+        pruneMode, err := h.GetCurrentPruneMode()
+        if err != nil {
+            return err
+        }
+        if variable.PartitionPruneMode(pruneMode) == variable.Dynamic && t.PartInfo != nil {
             if err := h.updateGlobalStats(t.TableInfo); err != nil {
                 return err
             }
@@ -189,7 +201,7 @@ func (h *Handle) updateGlobalStats(tblInfo *model.TableInfo) error {
         opts[ast.AnalyzeOptNumBuckets] = uint64(globalColStatsBucketNum)
     }
     // Generate the new column global-stats
-    newColGlobalStats, err := h.mergePartitionStats2GlobalStats(h.mu.ctx, opts, is, tblInfo, 0, nil, nil)
+    newColGlobalStats, err := h.mergePartitionStats2GlobalStats(opts, is, tblInfo, 0, nil, nil)
     if err != nil {
         return err
     }
@@ -228,7 +240,7 @@ func (h *Handle) updateGlobalStats(tblInfo *model.TableInfo) error {
         if globalIdxStatsBucketNum != 0 {
             opts[ast.AnalyzeOptNumBuckets] = uint64(globalIdxStatsBucketNum)
         }
-        newIndexGlobalStats, err := h.mergePartitionStats2GlobalStats(h.mu.ctx, opts, is, tblInfo, 1, []int64{idx.ID}, nil)
+        newIndexGlobalStats, err := h.mergePartitionStats2GlobalStats(opts, is, tblInfo, 1, []int64{idx.ID}, nil)
         if err != nil {
             return err
         }
@@ -276,19 +288,23 @@ func (h *Handle) changeGlobalStatsID(from, to int64) (err error) {
     return nil
 }
 
-func (h *Handle) getInitStateTableIDs(tblInfo *model.TableInfo) (ids []int64) {
+func (h *Handle) getInitStateTableIDs(tblInfo *model.TableInfo) (ids []int64, err error) {
     pi := tblInfo.GetPartitionInfo()
     if pi == nil {
-        return []int64{tblInfo.ID}
+        return []int64{tblInfo.ID}, nil
     }
     ids = make([]int64, 0, len(pi.Definitions)+1)
     for _, def := range pi.Definitions {
         ids = append(ids, def.ID)
     }
-    if h.CurrentPruneMode() == variable.Dynamic {
+    pruneMode, err := h.GetCurrentPruneMode()
+    if err != nil {
+        return nil, err
+    }
+    if variable.PartitionPruneMode(pruneMode) == variable.Dynamic {
         ids = append(ids, tblInfo.ID)
     }
-    return ids
+    return ids, nil
 }
 
 // DDLEventCh returns ddl events channel in handle.
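
`GetCurrentPruneMode` replaces the removed `CurrentPruneMode` (dropped from handle.go further down) at its call sites in this file and in the files below, and now returns `(string, error)` so a failure to obtain a session can be propagated. Its body is not part of the shown hunks; a hedged sketch of the assumed behavior, written with stand-in types rather than the real TiDB session types, might look like this:

```go
package main

import "fmt"

// sessionVars stands in for TiDB's session-variable container; the real
// helper presumably reads GetSessionVars().PartitionPruneMode from a
// pooled session.
type sessionVars struct{ partitionPruneMode string }

// pool stands in for the handle's session pool (same Get/Put shape as the diff).
type pool struct{ se *sessionVars }

func (p *pool) Get() (*sessionVars, error) { return p.se, nil }
func (p *pool) Put(*sessionVars)           {}

type handle struct{ pool *pool }

// GetCurrentPruneMode sketches the assumed behavior of the new helper:
// borrow a session, read the prune-mode variable, return it as a string.
func (h *handle) GetCurrentPruneMode() (string, error) {
    se, err := h.pool.Get()
    if err != nil {
        return "", err
    }
    defer h.pool.Put(se)
    return se.partitionPruneMode, nil
}

func main() {
    h := &handle{pool: &pool{se: &sessionVars{partitionPruneMode: "dynamic"}}}
    mode, err := h.GetCurrentPruneMode()
    fmt.Println(mode, err) // dynamic <nil>
}
```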
statistics/handle/dump.go (5 additions, 3 deletions)
@@ -203,9 +203,11 @@ func (h *Handle) DumpHistoricalStatsBySnapshot(
 
 // DumpStatsToJSONBySnapshot dumps statistic to json.
 func (h *Handle) DumpStatsToJSONBySnapshot(dbName string, tableInfo *model.TableInfo, snapshot uint64, dumpPartitionStats bool) (*JSONTable, error) {
-    h.mu.Lock()
-    isDynamicMode := variable.PartitionPruneMode(h.mu.ctx.GetSessionVars().PartitionPruneMode.Load()) == variable.Dynamic
-    h.mu.Unlock()
+    pruneMode, err := h.GetCurrentPruneMode()
+    if err != nil {
+        return nil, err
+    }
+    isDynamicMode := variable.PartitionPruneMode(pruneMode) == variable.Dynamic
     pi := tableInfo.GetPartitionInfo()
     if pi == nil {
         return h.tableStatsToJSON(dbName, tableInfo, tableInfo.ID, snapshot)
statistics/handle/handle.go (41 additions, 35 deletions)
@@ -131,11 +131,15 @@ func (h *Handle) execRestrictedSQL(ctx context.Context, sql string, params ...in
 
 func (h *Handle) execRestrictedSQLWithStatsVer(ctx context.Context, statsVer int, procTrackID uint64, analyzeSnapshot bool, sql string, params ...interface{}) ([]chunk.Row, []*ast.ResultField, error) {
     ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnStats)
+    pruneMode, err := h.GetCurrentPruneMode()
+    if err != nil {
+        return nil, nil, err
+    }
     return h.withRestrictedSQLExecutor(ctx, func(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) ([]chunk.Row, []*ast.ResultField, error) {
         optFuncs := []sqlexec.OptionFuncAlias{
             execOptionForAnalyze[statsVer],
             sqlexec.GetAnalyzeSnapshotOption(analyzeSnapshot),
-            sqlexec.GetPartitionPruneModeOption(string(h.CurrentPruneMode())),
+            sqlexec.GetPartitionPruneModeOption(pruneMode),
             sqlexec.ExecOptionUseCurSession,
             sqlexec.ExecOptionWithSysProcTrack(procTrackID, h.sysProcTracker.Track, h.sysProcTracker.UnTrack),
         }
@@ -320,13 +324,6 @@ func (h *Handle) Update(is infoschema.InfoSchema) error {
     return nil
 }
 
-// UpdateSessionVar updates the necessary session variables for the stats reader.
-func (h *Handle) UpdateSessionVar() error {
-    h.mu.Lock()
-    defer h.mu.Unlock()
-    return UpdateSCtxVarsForStats(h.mu.ctx)
-}
-
 // UpdateSCtxVarsForStats updates all necessary variables that may affect the behavior of statistics.
 func UpdateSCtxVarsForStats(sctx sessionctx.Context) error {
     // analyzer version
@@ -401,10 +398,16 @@ func (h *Handle) loadTablePartitionStats(tableInfo *model.TableInfo, partitionDe
 }
 
 // MergePartitionStats2GlobalStatsByTableID merge the partition-level stats to global-level stats based on the tableInfo.
-func (h *Handle) mergePartitionStats2GlobalStats(sc sessionctx.Context,
-    opts map[ast.AnalyzeOptionType]uint64, is infoschema.InfoSchema, globalTableInfo *model.TableInfo,
-    isIndex int, histIDs []int64,
+func (h *Handle) mergePartitionStats2GlobalStats(opts map[ast.AnalyzeOptionType]uint64,
+    is infoschema.InfoSchema, globalTableInfo *model.TableInfo, isIndex int, histIDs []int64,
     allPartitionStats map[int64]*statistics.Table) (globalStats *globalstats.GlobalStats, err error) {
+    se, err := h.pool.Get()
+    if err != nil {
+        return nil, err
+    }
+    defer h.pool.Put(se)
+    sc := se.(sessionctx.Context)
+
     if err := UpdateSCtxVarsForStats(sc); err != nil {
         return nil, err
     }
@@ -1157,20 +1160,17 @@ func (h *Handle) statsMetaByTableIDFromStorage(tableID int64, snapshot uint64) (
 }
 
 func (h *Handle) getGlobalStatsReader(snapshot uint64) (reader *statistics.StatsReader, err error) {
-    h.mu.Lock()
-    defer func() {
-        if r := recover(); r != nil {
-            err = fmt.Errorf("getGlobalStatsReader panic %v", r)
-        }
-        if err != nil {
-            h.mu.Unlock()
-        }
-    }()
-    return statistics.GetStatsReader(snapshot, h.mu.ctx.(sqlexec.RestrictedSQLExecutor))
+    se, err := h.pool.Get()
+    if err != nil {
+        return nil, err
+    }
+    exec := se.(sqlexec.RestrictedSQLExecutor)
+    return statistics.GetStatsReader(snapshot, exec, func() {
+        h.pool.Put(se)
+    })
 }
 
-func (h *Handle) releaseGlobalStatsReader(reader *statistics.StatsReader) error {
-    defer h.mu.Unlock()
+func (*Handle) releaseGlobalStatsReader(reader *statistics.StatsReader) error {
     return reader.Close()
 }
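
`statistics.GetStatsReader` now takes a release callback as its third argument: `getGlobalStatsReader` passes a closure that returns the borrowed session to the pool, while `releaseGlobalStatsReader` only closes the reader, so the callback is presumably invoked from `Close`. (The `nil` passed in handle_hist.go below would then mean the caller keeps managing the executor itself.) The `StatsReader` internals are not part of this diff; the stand-in below only illustrates that assumed lifecycle:

```go
package main

import "fmt"

// statsReader stands in for statistics.StatsReader: it carries an optional
// release callback supplied at construction time.
type statsReader struct {
    release func()
}

// newStatsReader mirrors the assumed shape of GetStatsReader(snapshot, exec,
// release): the callback may be nil when the caller owns the executor.
func newStatsReader(release func()) *statsReader {
    return &statsReader{release: release}
}

// Close finishes the read and, if a release callback was provided, returns
// the borrowed session to the pool by invoking it.
func (r *statsReader) Close() error {
    if r.release != nil {
        r.release()
    }
    return nil
}

func main() {
    released := false
    r := newStatsReader(func() { released = true })
    _ = r.Close()
    fmt.Println("session returned to pool:", released) // true
}
```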

@@ -1423,9 +1423,16 @@ func (h *Handle) fillExtStatsCorrVals(item *statistics.ExtendedStatsItem, cols [
         item.ScalarVals = 0
         return item
     }
-    h.mu.Lock()
-    sc := h.mu.ctx.GetSessionVars().StmtCtx
-    h.mu.Unlock()
+
+    se, seErr := h.pool.Get()
+    if seErr != nil {
+        logutil.BgLogger().Error("fail to get session", zap.String("category", "stats"), zap.Error(seErr))
+        return nil
+    }
+    defer h.pool.Put(se)
+    sctx := se.(sessionctx.Context)
+    sc := sctx.GetSessionVars().StmtCtx
+
     var err error
     samplesX, err = statistics.SortSampleItems(sc, samplesX)
     if err != nil {
@@ -1523,11 +1530,6 @@ func (h *Handle) SaveExtendedStatsToStorage(tableID int64, extStats *statistics.
     return nil
 }
 
-// CurrentPruneMode indicates whether tbl support runtime prune for table and first partition id.
-func (h *Handle) CurrentPruneMode() variable.PartitionPruneMode {
-    return variable.PartitionPruneMode(h.mu.ctx.GetSessionVars().PartitionPruneMode.Load())
-}
-
 // RefreshVars uses to pull PartitionPruneMethod vars from kv storage.
 func (h *Handle) RefreshVars() error {
     h.mu.Lock()
@@ -1737,12 +1739,16 @@ func (h *Handle) RecordHistoricalStatsToStorage(dbName string, tableInfo *model.
 
 // CheckHistoricalStatsEnable is used to check whether TiDBEnableHistoricalStats is enabled.
 func (h *Handle) CheckHistoricalStatsEnable() (enable bool, err error) {
-    h.mu.Lock()
-    defer h.mu.Unlock()
-    if err := UpdateSCtxVarsForStats(h.mu.ctx); err != nil {
+    se, err := h.pool.Get()
+    if err != nil {
         return false, err
     }
+    defer h.pool.Put(se)
+    sctx := se.(sessionctx.Context)
+    if err := UpdateSCtxVarsForStats(sctx); err != nil {
+        return false, err
+    }
-    return h.mu.ctx.GetSessionVars().EnableHistoricalStats, nil
+    return sctx.GetSessionVars().EnableHistoricalStats, nil
 }
 
 // InsertAnalyzeJob inserts analyze job into mysql.analyze_jobs and gets job ID for further updating job.
statistics/handle/handle_hist.go (1 addition, 1 deletion)
@@ -302,7 +302,7 @@ func (h *Handle) loadFreshStatsReader(readerCtx *StatsReaderContext, ctx sqlexec
         }
     }
     for {
-        newReader, err := statistics.GetStatsReader(0, ctx)
+        newReader, err := statistics.GetStatsReader(0, ctx, nil)
         if err == nil {
             readerCtx.reader = newReader
             readerCtx.createdTime = time.Now()
statistics/handle/historical_stats_handler.go (12 additions, 3 deletions)
@@ -90,14 +90,23 @@ func (h *Handle) recordHistoricalStatsMeta(tableID int64, version uint64, source
     if !tbl.IsInitialized() {
         return
     }
-    h.mu.Lock()
-    defer h.mu.Unlock()
-    err := recordHistoricalStatsMeta(h.mu.ctx, tableID, version, source)
+    se, err := h.pool.Get()
     if err != nil {
         logutil.BgLogger().Error("record historical stats meta failed",
             zap.Int64("table-id", tableID),
             zap.Uint64("version", version),
             zap.String("source", source),
             zap.Error(err))
+        return
     }
+    defer h.pool.Put(se)
+    sctx := se.(sessionctx.Context)
+    if err := recordHistoricalStatsMeta(sctx, tableID, version, source); err != nil {
+        logutil.BgLogger().Error("record historical stats meta failed",
+            zap.Int64("table-id", tableID),
+            zap.Uint64("version", version),
+            zap.String("source", source),
+            zap.Error(err))
+        return
+    }
 }
(The remaining 3 changed files are not shown in this view.)
