Skip to content

Commit

Permalink
statistics: split lock stats handler and rewrite the insert SQL (#46403)
Browse files Browse the repository at this point in the history
ref #46351
  • Loading branch information
Rustin170506 authored Aug 29, 2023
1 parent 40c4a16 commit 603a15f
Show file tree
Hide file tree
Showing 11 changed files with 434 additions and 280 deletions.
4 changes: 2 additions & 2 deletions executor/historical_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ PARTITION p0 VALUES LESS THAN (6)
require.NotNil(t, jsTable)
// only has p0 stats
require.NotNil(t, jsTable.Partitions["p0"])
require.Nil(t, jsTable.Partitions["global"])
require.Nil(t, jsTable.Partitions[handle.TiDBGlobalStats])

// change static to dynamic then assert
tk.MustExec("set @@tidb_partition_prune_mode='dynamic'")
Expand All @@ -365,7 +365,7 @@ PARTITION p0 VALUES LESS THAN (6)
require.NotNil(t, jsTable)
// has both global and p0 stats
require.NotNil(t, jsTable.Partitions["p0"])
require.NotNil(t, jsTable.Partitions["global"])
require.NotNil(t, jsTable.Partitions[handle.TiDBGlobalStats])
}

func TestDumpHistoricalStatsFallback(t *testing.T) {
Expand Down
6 changes: 4 additions & 2 deletions executor/lockstats/lock_stats_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,14 @@ func (e *LockExec) Next(_ context.Context, _ *chunk.Chunk) error {
return err
}

msg, err := h.AddLockedTables(tids, pids, e.Tables)
sv := e.Ctx().GetSessionVars()

msg, err := h.AddLockedTables(tids, pids, e.Tables, sv.MaxChunkSize)
if err != nil {
return err
}
if msg != "" {
e.Ctx().GetSessionVars().StmtCtx.AppendWarning(errors.New(msg))
sv.StmtCtx.AppendWarning(errors.New(msg))
}

return nil
Expand Down
2 changes: 1 addition & 1 deletion server/handler/optimizor/statistics_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func testDumpPartitionTableStats(t *testing.T, client *testserverclient.TestServ
jsonTable := &handle.JSONTable{}
err = json.Unmarshal(b, jsonTable)
require.NoError(t, err)
require.NotNil(t, jsonTable.Partitions["global"])
require.NotNil(t, jsonTable.Partitions[handle.TiDBGlobalStats])
require.Len(t, jsonTable.Partitions, expectedLen)
}
check(false)
Expand Down
4 changes: 3 additions & 1 deletion statistics/handle/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
"handle.go",
"handle_hist.go",
"historical_stats_handler.go",
"lock_stats_handler.go",
"update.go",
],
importpath = "github.com/pingcap/tidb/statistics/handle",
Expand Down Expand Up @@ -59,13 +60,14 @@ go_test(
"dump_test.go",
"gc_test.go",
"handle_hist_test.go",
"lock_stats_handler_test.go",
"main_test.go",
"update_list_test.go",
],
embed = [":handle"],
flaky = True,
race = "on",
shard_count = 26,
shard_count = 27,
deps = [
"//config",
"//domain",
Expand Down
6 changes: 3 additions & 3 deletions statistics/handle/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func (h *Handle) DumpHistoricalStatsBySnapshot(
}
// dump its global-stats if existed
if tbl != nil {
jsonTbl.Partitions["global"] = tbl
jsonTbl.Partitions[TiDBGlobalStats] = tbl
}
return jsonTbl, fallbackTbls, nil
}
Expand Down Expand Up @@ -233,7 +233,7 @@ func (h *Handle) DumpStatsToJSONBySnapshot(dbName string, tableInfo *model.Table
return nil, errors.Trace(err)
}
if tbl != nil {
jsonTbl.Partitions["global"] = tbl
jsonTbl.Partitions[TiDBGlobalStats] = tbl
}
return jsonTbl, nil
}
Expand Down Expand Up @@ -396,7 +396,7 @@ func (h *Handle) LoadStatsFromJSON(is infoschema.InfoSchema, jsonTbl *JSONTable)
}
}
// load global-stats if existed
if globalStats, ok := jsonTbl.Partitions["global"]; ok {
if globalStats, ok := jsonTbl.Partitions[TiDBGlobalStats]; ok {
if err := h.loadStatsFromJSON(tableInfo, tableInfo.ID, globalStats); err != nil {
return errors.Trace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions statistics/handle/dump_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,15 +137,15 @@ func TestDumpGlobalStats(t *testing.T) {
stats := getStatsJSON(t, dom, "test", "t")
require.NotNil(t, stats.Partitions["p0"])
require.NotNil(t, stats.Partitions["p1"])
require.Nil(t, stats.Partitions["global"])
require.Nil(t, stats.Partitions[handle.TiDBGlobalStats])

// global-stats is existed
tk.MustExec("set @@tidb_partition_prune_mode = 'dynamic'")
tk.MustExec("analyze table t")
stats = getStatsJSON(t, dom, "test", "t")
require.NotNil(t, stats.Partitions["p0"])
require.NotNil(t, stats.Partitions["p1"])
require.NotNil(t, stats.Partitions["global"])
require.NotNil(t, stats.Partitions[handle.TiDBGlobalStats])
}

func TestLoadGlobalStats(t *testing.T) {
Expand Down
269 changes: 0 additions & 269 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ import (
const (
// TiDBGlobalStats represents the global-stats for a partitioned table.
TiDBGlobalStats = "global"

// MaxPartitionMergeBatchSize indicates the max batch size for a worker to merge partition stats
MaxPartitionMergeBatchSize = 256
)
Expand Down Expand Up @@ -127,274 +126,6 @@ type Handle struct {
lease atomic2.Duration
}

// GetTableLockedAndClearForTest for unit test only
func (h *Handle) GetTableLockedAndClearForTest() []int64 {
tableLocked := h.tableLocked
h.tableLocked = make([]int64, 0)
return tableLocked
}

// LoadLockedTables load locked tables from store
func (h *Handle) LoadLockedTables() error {
h.mu.Lock()
defer h.mu.Unlock()
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
rows, _, err := h.execRestrictedSQL(ctx, "select table_id from mysql.stats_table_locked")
if err != nil {
return errors.Trace(err)
}

h.tableLocked = make([]int64, len(rows))
for i, row := range rows {
h.tableLocked[i] = row.GetInt64(0)
}

return nil
}

// AddLockedTables add locked tables id to store
func (h *Handle) AddLockedTables(tids []int64, pids []int64, tables []*ast.TableName) (string, error) {
h.mu.Lock()
defer h.mu.Unlock()

ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)

exec := h.mu.ctx.(sqlexec.SQLExecutor)

_, err := exec.ExecuteInternal(ctx, "begin pessimistic")
if err != nil {
return "", err
}

//load tables to check duplicate when insert
rows, _, err := h.execRestrictedSQL(ctx, "select table_id from mysql.stats_table_locked")
if err != nil {
return "", err
}

dupTables := make([]string, 0)
tableLocked := make([]int64, 0)
for _, row := range rows {
tableLocked = append(tableLocked, row.GetInt64(0))
}

strTids := fmt.Sprintf("%v", tids)
logutil.BgLogger().Info("lock table ", zap.String("category", "stats"), zap.String("tableIDs", strTids))
for i, tid := range tids {
_, err = exec.ExecuteInternal(ctx, "insert into mysql.stats_table_locked(table_id) select %? from dual where not exists(select table_id from mysql.stats_table_locked where table_id = %?)", tid, tid)
if err != nil {
logutil.BgLogger().Error("error occurred when insert mysql.stats_table_locked ", zap.String("category", "stats"), zap.Error(err))
return "", err
}
// update handle
if !isTableLocked(tableLocked, tid) {
tableLocked = append(tableLocked, tid)
} else {
dupTables = append(dupTables, tables[i].Schema.L+"."+tables[i].Name.L)
}
}

//insert related partitions while don't warning duplicate partitions
for _, tid := range pids {
_, err = exec.ExecuteInternal(ctx, "insert into mysql.stats_table_locked(table_id) select %? from dual where not exists(select table_id from mysql.stats_table_locked where table_id = %?)", tid, tid)
if err != nil {
logutil.BgLogger().Error("error occurred when insert mysql.stats_table_locked ", zap.String("category", "stats"), zap.Error(err))
return "", err
}
if !isTableLocked(tableLocked, tid) {
tableLocked = append(tableLocked, tid)
}
}

err = finishTransaction(ctx, exec, err)
if err != nil {
return "", err
}
// update handle.tableLocked after transaction success, if txn failed, tableLocked won't be updated
h.tableLocked = tableLocked

if len(dupTables) > 0 {
tables := dupTables[0]
for i, table := range dupTables {
if i == 0 {
continue
}
tables += ", " + table
}
var msg string
if len(tids) > 1 {
if len(tids) > len(dupTables) {
msg = "skip locking locked tables: " + tables + ", other tables locked successfully"
} else {
msg = "skip locking locked tables: " + tables
}
} else {
msg = "skip locking locked table: " + tables
}
return msg, err
}
return "", err
}

// getStatsDeltaFromTableLocked get count, modify_count and version for the given table from mysql.stats_table_locked.
func (h *Handle) getStatsDeltaFromTableLocked(ctx context.Context, tableID int64) (count, modifyCount int64, version uint64, err error) {
rows, _, err := h.execRestrictedSQL(ctx, "select count, modify_count, version from mysql.stats_table_locked where table_id = %?", tableID)
if err != nil {
return 0, 0, 0, err
}

if len(rows) == 0 {
return 0, 0, 0, nil
}
count = rows[0].GetInt64(0)
modifyCount = rows[0].GetInt64(1)
version = rows[0].GetUint64(2)
return count, modifyCount, version, nil
}

// RemoveLockedTables remove tables from table locked array
func (h *Handle) RemoveLockedTables(tids []int64, pids []int64, tables []*ast.TableName) (string, error) {
h.mu.Lock()
defer h.mu.Unlock()

ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)

exec := h.mu.ctx.(sqlexec.SQLExecutor)
_, err := exec.ExecuteInternal(ctx, "begin pessimistic")
if err != nil {
return "", err
}

//load tables to check unlock the unlock table
rows, _, err := h.execRestrictedSQL(ctx, "select table_id from mysql.stats_table_locked")
if err != nil {
return "", err
}

nonlockedTables := make([]string, 0)
tableLocked := make([]int64, 0)
for _, row := range rows {
tableLocked = append(tableLocked, row.GetInt64(0))
}

strTids := fmt.Sprintf("%v", tids)
logutil.BgLogger().Info("unlock table ", zap.String("category", "stats"), zap.String("tableIDs", strTids))
for i, tid := range tids {
// get stats delta during table locked
count, modifyCount, version, err := h.getStatsDeltaFromTableLocked(ctx, tid)
if err != nil {
logutil.BgLogger().Error("error occurred when getStatsDeltaFromTableLocked", zap.String("category", "stats"), zap.Error(err))
return "", err
}
// update stats_meta with stats delta
_, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version = %?, count = count + %?, modify_count = modify_count + %? where table_id = %?", version, count, modifyCount, tid)
if err != nil {
logutil.BgLogger().Error("error occurred when update mysql.stats_meta", zap.String("category", "stats"), zap.Error(err))
return "", err
}
cache.TableRowStatsCache.Invalidate(tid)

_, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_table_locked where table_id = %?", tid)
if err != nil {
logutil.BgLogger().Error("error occurred when delete from mysql.stats_table_locked ", zap.String("category", "stats"), zap.Error(err))
return "", err
}
var exist bool
exist, tableLocked = removeIfTableLocked(tableLocked, tid)
if !exist {
nonlockedTables = append(nonlockedTables, tables[i].Schema.L+"."+tables[i].Name.L)
}
}
//delete related partitions while don't warning delete empty partitions
for _, tid := range pids {
// get stats delta during table locked
count, modifyCount, version, err := h.getStatsDeltaFromTableLocked(ctx, tid)
if err != nil {
logutil.BgLogger().Error("error occurred when getStatsDeltaFromTableLocked", zap.String("category", "stats"), zap.Error(err))
return "", err
}
// update stats_meta with stats delta
_, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version = %?, count = count + %?, modify_count = modify_count + %? where table_id = %?", version, count, modifyCount, tid)
if err != nil {
logutil.BgLogger().Error("error occurred when update mysql.stats_meta", zap.String("category", "stats"), zap.Error(err))
return "", err
}
cache.TableRowStatsCache.Invalidate(tid)

_, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_table_locked where table_id = %?", tid)
if err != nil {
logutil.BgLogger().Error("error occurred when delete from mysql.stats_table_locked ", zap.String("category", "stats"), zap.Error(err))
return "", err
}
_, tableLocked = removeIfTableLocked(tableLocked, tid)
}

err = finishTransaction(ctx, exec, err)
if err != nil {
return "", err
}
// update handle.tableLocked after transaction success, if txn failed, tableLocked won't be updated
h.tableLocked = tableLocked

if len(nonlockedTables) > 0 {
tables := nonlockedTables[0]
for i, table := range nonlockedTables {
if i == 0 {
continue
}
tables += ", " + table
}
var msg string
if len(tids) > 1 {
if len(tids) > len(nonlockedTables) {
msg = "skip unlocking non-locked tables: " + tables + ", other tables unlocked successfully"
} else {
msg = "skip unlocking non-locked tables: " + tables
}
} else {
msg = "skip unlocking non-locked table: " + tables
}
return msg, err
}
return "", err
}

// IsTableLocked check whether table is locked in handle with Handle.Mutex
func (h *Handle) IsTableLocked(tableID int64) bool {
h.mu.RLock()
defer h.mu.RUnlock()
return h.isTableLocked(tableID)
}

// IsTableLocked check whether table is locked in handle without Handle.Mutex
func (h *Handle) isTableLocked(tableID int64) bool {
return isTableLocked(h.tableLocked, tableID)
}

// isTableLocked check whether table is locked
func isTableLocked(tableLocked []int64, tableID int64) bool {
return lockTableIndexOf(tableLocked, tableID) > -1
}

// lockTableIndexOf get the locked table's index in the array
func lockTableIndexOf(tableLocked []int64, tableID int64) int {
for idx, id := range tableLocked {
if id == tableID {
return idx
}
}
return -1
}

// removeIfTableLocked try to remove the table from table locked array
func removeIfTableLocked(tableLocked []int64, tableID int64) (bool, []int64) {
idx := lockTableIndexOf(tableLocked, tableID)
if idx > -1 {
tableLocked = append(tableLocked[:idx], tableLocked[idx+1:]...)
}
return idx > -1, tableLocked
}

func (h *Handle) withRestrictedSQLExecutor(ctx context.Context, fn func(context.Context, sqlexec.RestrictedSQLExecutor) ([]chunk.Row, []*ast.ResultField, error)) ([]chunk.Row, []*ast.ResultField, error) {
se, err := h.pool.Get()
if err != nil {
Expand Down
Loading

0 comments on commit 603a15f

Please sign in to comment.