Skip to content

Commit

Permalink
statistics: remove lockedTable cache (#46555)
Browse files Browse the repository at this point in the history
ref #46351
  • Loading branch information
Rustin170506 authored Sep 2, 2023
1 parent c002263 commit 93dce26
Show file tree
Hide file tree
Showing 12 changed files with 336 additions and 309 deletions.
6 changes: 0 additions & 6 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2386,7 +2386,6 @@ func (do *Domain) updateStatsWorker(ctx sessionctx.Context, owner owner.Manager)
lease := do.statsLease
deltaUpdateTicker := time.NewTicker(20 * lease)
gcStatsTicker := time.NewTicker(100 * lease)
loadLockedTablesTicker := time.NewTicker(5 * lease)
dumpColStatsUsageTicker := time.NewTicker(100 * lease)
readMemTricker := time.NewTicker(memory.ReadMemInterval)
statsHandle := do.StatsHandle()
Expand Down Expand Up @@ -2416,11 +2415,6 @@ func (do *Domain) updateStatsWorker(ctx sessionctx.Context, owner owner.Manager)
if err != nil {
logutil.BgLogger().Debug("dump stats delta failed", zap.Error(err))
}
case <-loadLockedTablesTicker.C:
err := statsHandle.LoadLockedTables()
if err != nil {
logutil.BgLogger().Debug("load locked table failed", zap.Error(err))
}
case <-gcStatsTicker.C:
if !owner.IsOwner() {
continue
Expand Down
47 changes: 29 additions & 18 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,10 @@ func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) error {
sessionVars := e.Ctx().GetSessionVars()

// Filter the locked tables.
tasks, needAnalyzeTableCnt, skippedTables := filterAndCollectTasks(e.tasks, statsHandle, infoSchema)
tasks, needAnalyzeTableCnt, skippedTables, err := filterAndCollectTasks(e.tasks, statsHandle, infoSchema)
if err != nil {
return err
}
warnLockedTableMsg(sessionVars, needAnalyzeTableCnt, skippedTables)

if len(tasks) == 0 {
Expand Down Expand Up @@ -170,37 +173,45 @@ func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) error {
}

// filterAndCollectTasks filters the tasks that are not locked and collects the table IDs.
func filterAndCollectTasks(tasks []*analyzeTask, statsHandle *handle.Handle, infoSchema infoschema.InfoSchema) ([]*analyzeTask, uint, []string) {
func filterAndCollectTasks(tasks []*analyzeTask, statsHandle *handle.Handle, infoSchema infoschema.InfoSchema) ([]*analyzeTask, uint, []string, error) {
var (
filteredTasks []*analyzeTask
skippedTables []string
needAnalyzeTableCnt uint
tids = make(map[int64]struct{})
tids = make([]int64, 0, len(tasks))
// taskMap is used to collect the tasks that belong to the same table.
// In stats v1, analyze for each index is a single task, and they have the same table id.
taskMap = make(map[int64][]*analyzeTask, len(tasks))
)

for _, task := range tasks {
tableID := getTableIDFromTask(task)
isLocked := statsHandle.IsTableLocked(tableID)
tids = append(tids, tableID)
taskMap[tableID] = append(taskMap[tableID], task)
}

// Check the locked tables in one transaction.
lockedStatuses, err := statsHandle.QueryTablesLockedStatuses(tids...)
if err != nil {
return nil, 0, nil, err
}

for tid, isLocked := range lockedStatuses {
if !isLocked {
filteredTasks = append(filteredTasks, task)
}
if _, ok := tids[tableID]; !ok {
if isLocked {
tbl, ok := infoSchema.TableByID(tableID)
if !ok {
// Ignore this table because it may have been dropped.
logutil.BgLogger().Warn("Unknown table ID in analyze task", zap.Int64("tid", tableID))
} else {
skippedTables = append(skippedTables, tbl.Meta().Name.L)
}
filteredTasks = append(filteredTasks, taskMap[tid]...)
needAnalyzeTableCnt++
} else {
tbl, ok := infoSchema.TableByID(tid)
if !ok {
// Ignore this table because it may have been dropped.
logutil.BgLogger().Warn("Unknown table ID in analyze task", zap.Int64("tid", tid))
} else {
needAnalyzeTableCnt++
skippedTables = append(skippedTables, tbl.Meta().Name.L)
}
tids[tableID] = struct{}{}
}
}

return filteredTasks, needAnalyzeTableCnt, skippedTables
return filteredTasks, needAnalyzeTableCnt, skippedTables, nil
}

// warnLockedTableMsg warns the locked table IDs.
Expand Down
38 changes: 29 additions & 9 deletions executor/show_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,14 @@ func (e *ShowExec) fetchShowStatsLocked() error {
do := domain.GetDomain(e.Ctx())
h := do.StatsHandle()
dbs := do.InfoSchema().AllSchemas()

type LockedTableInfo struct {
dbName string
tblName string
partitionName string
}
tableInfo := make(map[int64]*LockedTableInfo)

for _, db := range dbs {
for _, tbl := range db.Tables {
pi := tbl.GetPartitionInfo()
Expand All @@ -168,25 +176,37 @@ func (e *ShowExec) fetchShowStatsLocked() error {
if pi != nil {
partitionName = "global"
}
if h.IsTableLocked(tbl.ID) {
e.appendTableForStatsLocked(db.Name.O, tbl.Name.O, partitionName)
}
tableInfo[tbl.ID] = &LockedTableInfo{db.Name.O, tbl.Name.O, partitionName}
if pi != nil {
for _, def := range pi.Definitions {
if h.IsTableLocked(def.ID) {
e.appendTableForStatsLocked(db.Name.O, tbl.Name.O, def.Name.O)
}
tableInfo[def.ID] = &LockedTableInfo{db.Name.O, tbl.Name.O, def.Name.O}
}
}
} else {
for _, def := range pi.Definitions {
if h.IsTableLocked(def.ID) {
e.appendTableForStatsLocked(db.Name.O, tbl.Name.O, def.Name.O)
}
tableInfo[def.ID] = &LockedTableInfo{db.Name.O, tbl.Name.O, def.Name.O}
}
}
}
}

tids := make([]int64, 0, len(tableInfo))
for tid := range tableInfo {
tids = append(tids, tid)
}

lockedStatuses, err := h.QueryTablesLockedStatuses(tids...)
if err != nil {
return err
}

for tid, locked := range lockedStatuses {
if locked {
info := tableInfo[tid]
e.appendTableForStatsLocked(info.dbName, info.tblName, info.partitionName)
}
}

return nil
}

Expand Down
5 changes: 2 additions & 3 deletions statistics/handle/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ go_library(
"handle_hist.go",
"historical_stats_handler.go",
"lock_stats_handler.go",
"unlock_stats_handler.go",
"update.go",
],
importpath = "github.com/pingcap/tidb/statistics/handle",
Expand All @@ -32,6 +31,7 @@ go_library(
"//sessiontxn",
"//statistics",
"//statistics/handle/cache",
"//statistics/handle/lockstats",
"//statistics/handle/metrics",
"//table",
"//types",
Expand Down Expand Up @@ -61,14 +61,13 @@ go_test(
"dump_test.go",
"gc_test.go",
"handle_hist_test.go",
"lock_stats_handler_test.go",
"main_test.go",
"update_list_test.go",
],
embed = [":handle"],
flaky = True,
race = "on",
shard_count = 29,
shard_count = 26,
deps = [
"//config",
"//domain",
Expand Down
3 changes: 0 additions & 3 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,6 @@ type Handle struct {
sync.Mutex
}

// tableLocked used to store locked tables
tableLocked []int64

// StatsLoad is used to load stats concurrently
StatsLoad StatsLoad

Expand Down
20 changes: 8 additions & 12 deletions statistics/handle/handletest/statslock/stats_lcok_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,9 @@ func TestStatsLockAndUnlockTable(t *testing.T) {
tblStats1 := handle.GetTableStats(tbl.Meta())
require.Equal(t, tblStats, tblStats1)

tableLocked1 := handle.GetTableLockedAndClearForTest()
err = handle.LoadLockedTables()
lockedTables, err := handle.GetTableLockedAndClearForTest()
require.Nil(t, err)
tableLocked2 := handle.GetTableLockedAndClearForTest()
require.Equal(t, tableLocked1, tableLocked2)
require.Equal(t, 1, len(lockedTables))

tk.MustExec("unlock stats t")
rows = tk.MustQuery("select count(*) from mysql.stats_table_locked").Rows()
Expand Down Expand Up @@ -111,16 +109,16 @@ func TestStatsLockTableAndUnlockTableRepeatedly(t *testing.T) {
require.Equal(t, tblStats, tblStats1)

// Lock the table again and check the warning.
tableLocked1 := handle.GetTableLockedAndClearForTest()
lockedTables1, err := handle.GetTableLockedAndClearForTest()
require.Nil(t, err)
tk.MustExec("lock stats t")
tk.MustQuery("show warnings").Check(testkit.Rows(
"Warning 1105 skip locking locked table: test.t",
))

err = handle.LoadLockedTables()
lockedTables2, err := handle.GetTableLockedAndClearForTest()
require.Nil(t, err)
tableLocked2 := handle.GetTableLockedAndClearForTest()
require.Equal(t, tableLocked1, tableLocked2)
require.Equal(t, lockedTables1, lockedTables2)

// Unlock the table.
tk.MustExec("unlock stats t")
Expand Down Expand Up @@ -191,11 +189,9 @@ func TestStatsLockAndUnlockTables(t *testing.T) {
tbl2Stats1 := handle.GetTableStats(tbl2.Meta())
require.Equal(t, tbl2Stats, tbl2Stats1)

tableLocked1 := handle.GetTableLockedAndClearForTest()
err = handle.LoadLockedTables()
lockedTables, err := handle.GetTableLockedAndClearForTest()
require.Nil(t, err)
tableLocked2 := handle.GetTableLockedAndClearForTest()
require.Equal(t, tableLocked1, tableLocked2)
require.Equal(t, 2, len(lockedTables))

tk.MustExec("unlock stats test.t1, test.t2")
rows = tk.MustQuery("select count(*) from mysql.stats_table_locked").Rows()
Expand Down
Loading

0 comments on commit 93dce26

Please sign in to comment.