From 603a15f7297a7e4901925b22de96f342c4c48536 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BA=8C=E6=89=8B=E6=8E=89=E5=8C=85=E5=B7=A5=E7=A8=8B?= =?UTF-8?q?=E5=B8=88?= Date: Tue, 29 Aug 2023 15:41:08 +0800 Subject: [PATCH] statistics: split lock stats handler and rewrite the insert SQL (#46403) ref pingcap/tidb#46351 --- executor/historical_stats_test.go | 4 +- executor/lockstats/lock_stats_executor.go | 6 +- .../optimizor/statistics_handler_test.go | 2 +- statistics/handle/BUILD.bazel | 4 +- statistics/handle/dump.go | 6 +- statistics/handle/dump_test.go | 4 +- statistics/handle/handle.go | 269 --------------- .../handle/handletest/statslock/BUILD.bazel | 1 + .../handletest/statslock/stats_lcok_test.go | 47 +++ statistics/handle/lock_stats_handler.go | 310 ++++++++++++++++++ statistics/handle/lock_stats_handler_test.go | 61 ++++ 11 files changed, 434 insertions(+), 280 deletions(-) create mode 100644 statistics/handle/lock_stats_handler.go create mode 100644 statistics/handle/lock_stats_handler_test.go diff --git a/executor/historical_stats_test.go b/executor/historical_stats_test.go index 973ecac880083..7288a8958b062 100644 --- a/executor/historical_stats_test.go +++ b/executor/historical_stats_test.go @@ -343,7 +343,7 @@ PARTITION p0 VALUES LESS THAN (6) require.NotNil(t, jsTable) // only has p0 stats require.NotNil(t, jsTable.Partitions["p0"]) - require.Nil(t, jsTable.Partitions["global"]) + require.Nil(t, jsTable.Partitions[handle.TiDBGlobalStats]) // change static to dynamic then assert tk.MustExec("set @@tidb_partition_prune_mode='dynamic'") @@ -365,7 +365,7 @@ PARTITION p0 VALUES LESS THAN (6) require.NotNil(t, jsTable) // has both global and p0 stats require.NotNil(t, jsTable.Partitions["p0"]) - require.NotNil(t, jsTable.Partitions["global"]) + require.NotNil(t, jsTable.Partitions[handle.TiDBGlobalStats]) } func TestDumpHistoricalStatsFallback(t *testing.T) { diff --git a/executor/lockstats/lock_stats_executor.go b/executor/lockstats/lock_stats_executor.go index 33a71f4d5eb3f..465fbea4e01e6 100644 --- a/executor/lockstats/lock_stats_executor.go +++ b/executor/lockstats/lock_stats_executor.go @@ -50,12 +50,14 @@ func (e *LockExec) Next(_ context.Context, _ *chunk.Chunk) error { return err } - msg, err := h.AddLockedTables(tids, pids, e.Tables) + sv := e.Ctx().GetSessionVars() + + msg, err := h.AddLockedTables(tids, pids, e.Tables, sv.MaxChunkSize) if err != nil { return err } if msg != "" { - e.Ctx().GetSessionVars().StmtCtx.AppendWarning(errors.New(msg)) + sv.StmtCtx.AppendWarning(errors.New(msg)) } return nil diff --git a/server/handler/optimizor/statistics_handler_test.go b/server/handler/optimizor/statistics_handler_test.go index 7b1966dfd6131..f9cdf89d2e0d5 100644 --- a/server/handler/optimizor/statistics_handler_test.go +++ b/server/handler/optimizor/statistics_handler_test.go @@ -180,7 +180,7 @@ func testDumpPartitionTableStats(t *testing.T, client *testserverclient.TestServ jsonTable := &handle.JSONTable{} err = json.Unmarshal(b, jsonTable) require.NoError(t, err) - require.NotNil(t, jsonTable.Partitions["global"]) + require.NotNil(t, jsonTable.Partitions[handle.TiDBGlobalStats]) require.Len(t, jsonTable.Partitions, expectedLen) } check(false) diff --git a/statistics/handle/BUILD.bazel b/statistics/handle/BUILD.bazel index d466b12824434..629992f665153 100644 --- a/statistics/handle/BUILD.bazel +++ b/statistics/handle/BUILD.bazel @@ -10,6 +10,7 @@ go_library( "handle.go", "handle_hist.go", "historical_stats_handler.go", + "lock_stats_handler.go", "update.go", ], importpath = "github.com/pingcap/tidb/statistics/handle", @@ -59,13 +60,14 @@ go_test( "dump_test.go", "gc_test.go", "handle_hist_test.go", + "lock_stats_handler_test.go", "main_test.go", "update_list_test.go", ], embed = [":handle"], flaky = True, race = "on", - shard_count = 26, + shard_count = 27, deps = [ "//config", "//domain", diff --git a/statistics/handle/dump.go b/statistics/handle/dump.go index 8340ed18d6073..56bd83c9cb626 100644 --- a/statistics/handle/dump.go +++ b/statistics/handle/dump.go @@ -195,7 +195,7 @@ func (h *Handle) DumpHistoricalStatsBySnapshot( } // dump its global-stats if existed if tbl != nil { - jsonTbl.Partitions["global"] = tbl + jsonTbl.Partitions[TiDBGlobalStats] = tbl } return jsonTbl, fallbackTbls, nil } @@ -233,7 +233,7 @@ func (h *Handle) DumpStatsToJSONBySnapshot(dbName string, tableInfo *model.Table return nil, errors.Trace(err) } if tbl != nil { - jsonTbl.Partitions["global"] = tbl + jsonTbl.Partitions[TiDBGlobalStats] = tbl } return jsonTbl, nil } @@ -396,7 +396,7 @@ func (h *Handle) LoadStatsFromJSON(is infoschema.InfoSchema, jsonTbl *JSONTable) } } // load global-stats if existed - if globalStats, ok := jsonTbl.Partitions["global"]; ok { + if globalStats, ok := jsonTbl.Partitions[TiDBGlobalStats]; ok { if err := h.loadStatsFromJSON(tableInfo, tableInfo.ID, globalStats); err != nil { return errors.Trace(err) } diff --git a/statistics/handle/dump_test.go b/statistics/handle/dump_test.go index bd7d44de26e69..bd56020d6591d 100644 --- a/statistics/handle/dump_test.go +++ b/statistics/handle/dump_test.go @@ -137,7 +137,7 @@ func TestDumpGlobalStats(t *testing.T) { stats := getStatsJSON(t, dom, "test", "t") require.NotNil(t, stats.Partitions["p0"]) require.NotNil(t, stats.Partitions["p1"]) - require.Nil(t, stats.Partitions["global"]) + require.Nil(t, stats.Partitions[handle.TiDBGlobalStats]) // global-stats is existed tk.MustExec("set @@tidb_partition_prune_mode = 'dynamic'") @@ -145,7 +145,7 @@ func TestDumpGlobalStats(t *testing.T) { stats = getStatsJSON(t, dom, "test", "t") require.NotNil(t, stats.Partitions["p0"]) require.NotNil(t, stats.Partitions["p1"]) - require.NotNil(t, stats.Partitions["global"]) + require.NotNil(t, stats.Partitions[handle.TiDBGlobalStats]) } func TestLoadGlobalStats(t *testing.T) { diff --git a/statistics/handle/handle.go b/statistics/handle/handle.go index 9582bf0fbd7ee..5c994ef261cab 100644 --- a/statistics/handle/handle.go +++ b/statistics/handle/handle.go @@ -56,7 +56,6 @@ import ( const ( // TiDBGlobalStats represents the global-stats for a partitioned table. TiDBGlobalStats = "global" - // MaxPartitionMergeBatchSize indicates the max batch size for a worker to merge partition stats MaxPartitionMergeBatchSize = 256 ) @@ -127,274 +126,6 @@ type Handle struct { lease atomic2.Duration } -// GetTableLockedAndClearForTest for unit test only -func (h *Handle) GetTableLockedAndClearForTest() []int64 { - tableLocked := h.tableLocked - h.tableLocked = make([]int64, 0) - return tableLocked -} - -// LoadLockedTables load locked tables from store -func (h *Handle) LoadLockedTables() error { - h.mu.Lock() - defer h.mu.Unlock() - ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) - rows, _, err := h.execRestrictedSQL(ctx, "select table_id from mysql.stats_table_locked") - if err != nil { - return errors.Trace(err) - } - - h.tableLocked = make([]int64, len(rows)) - for i, row := range rows { - h.tableLocked[i] = row.GetInt64(0) - } - - return nil -} - -// AddLockedTables add locked tables id to store -func (h *Handle) AddLockedTables(tids []int64, pids []int64, tables []*ast.TableName) (string, error) { - h.mu.Lock() - defer h.mu.Unlock() - - ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) - - exec := h.mu.ctx.(sqlexec.SQLExecutor) - - _, err := exec.ExecuteInternal(ctx, "begin pessimistic") - if err != nil { - return "", err - } - - //load tables to check duplicate when insert - rows, _, err := h.execRestrictedSQL(ctx, "select table_id from mysql.stats_table_locked") - if err != nil { - return "", err - } - - dupTables := make([]string, 0) - tableLocked := make([]int64, 0) - for _, row := range rows { - tableLocked = append(tableLocked, row.GetInt64(0)) - } - - strTids := fmt.Sprintf("%v", tids) - logutil.BgLogger().Info("lock table ", zap.String("category", "stats"), zap.String("tableIDs", strTids)) - for i, tid := range tids { - _, err = exec.ExecuteInternal(ctx, "insert into mysql.stats_table_locked(table_id) select %? from dual where not exists(select table_id from mysql.stats_table_locked where table_id = %?)", tid, tid) - if err != nil { - logutil.BgLogger().Error("error occurred when insert mysql.stats_table_locked ", zap.String("category", "stats"), zap.Error(err)) - return "", err - } - // update handle - if !isTableLocked(tableLocked, tid) { - tableLocked = append(tableLocked, tid) - } else { - dupTables = append(dupTables, tables[i].Schema.L+"."+tables[i].Name.L) - } - } - - //insert related partitions while don't warning duplicate partitions - for _, tid := range pids { - _, err = exec.ExecuteInternal(ctx, "insert into mysql.stats_table_locked(table_id) select %? from dual where not exists(select table_id from mysql.stats_table_locked where table_id = %?)", tid, tid) - if err != nil { - logutil.BgLogger().Error("error occurred when insert mysql.stats_table_locked ", zap.String("category", "stats"), zap.Error(err)) - return "", err - } - if !isTableLocked(tableLocked, tid) { - tableLocked = append(tableLocked, tid) - } - } - - err = finishTransaction(ctx, exec, err) - if err != nil { - return "", err - } - // update handle.tableLocked after transaction success, if txn failed, tableLocked won't be updated - h.tableLocked = tableLocked - - if len(dupTables) > 0 { - tables := dupTables[0] - for i, table := range dupTables { - if i == 0 { - continue - } - tables += ", " + table - } - var msg string - if len(tids) > 1 { - if len(tids) > len(dupTables) { - msg = "skip locking locked tables: " + tables + ", other tables locked successfully" - } else { - msg = "skip locking locked tables: " + tables - } - } else { - msg = "skip locking locked table: " + tables - } - return msg, err - } - return "", err -} - -// getStatsDeltaFromTableLocked get count, modify_count and version for the given table from mysql.stats_table_locked. -func (h *Handle) getStatsDeltaFromTableLocked(ctx context.Context, tableID int64) (count, modifyCount int64, version uint64, err error) { - rows, _, err := h.execRestrictedSQL(ctx, "select count, modify_count, version from mysql.stats_table_locked where table_id = %?", tableID) - if err != nil { - return 0, 0, 0, err - } - - if len(rows) == 0 { - return 0, 0, 0, nil - } - count = rows[0].GetInt64(0) - modifyCount = rows[0].GetInt64(1) - version = rows[0].GetUint64(2) - return count, modifyCount, version, nil -} - -// RemoveLockedTables remove tables from table locked array -func (h *Handle) RemoveLockedTables(tids []int64, pids []int64, tables []*ast.TableName) (string, error) { - h.mu.Lock() - defer h.mu.Unlock() - - ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) - - exec := h.mu.ctx.(sqlexec.SQLExecutor) - _, err := exec.ExecuteInternal(ctx, "begin pessimistic") - if err != nil { - return "", err - } - - //load tables to check unlock the unlock table - rows, _, err := h.execRestrictedSQL(ctx, "select table_id from mysql.stats_table_locked") - if err != nil { - return "", err - } - - nonlockedTables := make([]string, 0) - tableLocked := make([]int64, 0) - for _, row := range rows { - tableLocked = append(tableLocked, row.GetInt64(0)) - } - - strTids := fmt.Sprintf("%v", tids) - logutil.BgLogger().Info("unlock table ", zap.String("category", "stats"), zap.String("tableIDs", strTids)) - for i, tid := range tids { - // get stats delta during table locked - count, modifyCount, version, err := h.getStatsDeltaFromTableLocked(ctx, tid) - if err != nil { - logutil.BgLogger().Error("error occurred when getStatsDeltaFromTableLocked", zap.String("category", "stats"), zap.Error(err)) - return "", err - } - // update stats_meta with stats delta - _, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version = %?, count = count + %?, modify_count = modify_count + %? where table_id = %?", version, count, modifyCount, tid) - if err != nil { - logutil.BgLogger().Error("error occurred when update mysql.stats_meta", zap.String("category", "stats"), zap.Error(err)) - return "", err - } - cache.TableRowStatsCache.Invalidate(tid) - - _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_table_locked where table_id = %?", tid) - if err != nil { - logutil.BgLogger().Error("error occurred when delete from mysql.stats_table_locked ", zap.String("category", "stats"), zap.Error(err)) - return "", err - } - var exist bool - exist, tableLocked = removeIfTableLocked(tableLocked, tid) - if !exist { - nonlockedTables = append(nonlockedTables, tables[i].Schema.L+"."+tables[i].Name.L) - } - } - //delete related partitions while don't warning delete empty partitions - for _, tid := range pids { - // get stats delta during table locked - count, modifyCount, version, err := h.getStatsDeltaFromTableLocked(ctx, tid) - if err != nil { - logutil.BgLogger().Error("error occurred when getStatsDeltaFromTableLocked", zap.String("category", "stats"), zap.Error(err)) - return "", err - } - // update stats_meta with stats delta - _, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version = %?, count = count + %?, modify_count = modify_count + %? where table_id = %?", version, count, modifyCount, tid) - if err != nil { - logutil.BgLogger().Error("error occurred when update mysql.stats_meta", zap.String("category", "stats"), zap.Error(err)) - return "", err - } - cache.TableRowStatsCache.Invalidate(tid) - - _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_table_locked where table_id = %?", tid) - if err != nil { - logutil.BgLogger().Error("error occurred when delete from mysql.stats_table_locked ", zap.String("category", "stats"), zap.Error(err)) - return "", err - } - _, tableLocked = removeIfTableLocked(tableLocked, tid) - } - - err = finishTransaction(ctx, exec, err) - if err != nil { - return "", err - } - // update handle.tableLocked after transaction success, if txn failed, tableLocked won't be updated - h.tableLocked = tableLocked - - if len(nonlockedTables) > 0 { - tables := nonlockedTables[0] - for i, table := range nonlockedTables { - if i == 0 { - continue - } - tables += ", " + table - } - var msg string - if len(tids) > 1 { - if len(tids) > len(nonlockedTables) { - msg = "skip unlocking non-locked tables: " + tables + ", other tables unlocked successfully" - } else { - msg = "skip unlocking non-locked tables: " + tables - } - } else { - msg = "skip unlocking non-locked table: " + tables - } - return msg, err - } - return "", err -} - -// IsTableLocked check whether table is locked in handle with Handle.Mutex -func (h *Handle) IsTableLocked(tableID int64) bool { - h.mu.RLock() - defer h.mu.RUnlock() - return h.isTableLocked(tableID) -} - -// IsTableLocked check whether table is locked in handle without Handle.Mutex -func (h *Handle) isTableLocked(tableID int64) bool { - return isTableLocked(h.tableLocked, tableID) -} - -// isTableLocked check whether table is locked -func isTableLocked(tableLocked []int64, tableID int64) bool { - return lockTableIndexOf(tableLocked, tableID) > -1 -} - -// lockTableIndexOf get the locked table's index in the array -func lockTableIndexOf(tableLocked []int64, tableID int64) int { - for idx, id := range tableLocked { - if id == tableID { - return idx - } - } - return -1 -} - -// removeIfTableLocked try to remove the table from table locked array -func removeIfTableLocked(tableLocked []int64, tableID int64) (bool, []int64) { - idx := lockTableIndexOf(tableLocked, tableID) - if idx > -1 { - tableLocked = append(tableLocked[:idx], tableLocked[idx+1:]...) - } - return idx > -1, tableLocked -} - func (h *Handle) withRestrictedSQLExecutor(ctx context.Context, fn func(context.Context, sqlexec.RestrictedSQLExecutor) ([]chunk.Row, []*ast.ResultField, error)) ([]chunk.Row, []*ast.ResultField, error) { se, err := h.pool.Get() if err != nil { diff --git a/statistics/handle/handletest/statslock/BUILD.bazel b/statistics/handle/handletest/statslock/BUILD.bazel index 78bea0e87a71c..92cdb776a0634 100644 --- a/statistics/handle/handletest/statslock/BUILD.bazel +++ b/statistics/handle/handletest/statslock/BUILD.bazel @@ -8,6 +8,7 @@ go_test( "stats_lcok_test.go", ], flaky = True, + shard_count = 3, deps = [ "//config", "//domain", diff --git a/statistics/handle/handletest/statslock/stats_lcok_test.go b/statistics/handle/handletest/statslock/stats_lcok_test.go index 366f9d1ad3779..1774b125e176f 100644 --- a/statistics/handle/handletest/statslock/stats_lcok_test.go +++ b/statistics/handle/handletest/statslock/stats_lcok_test.go @@ -76,6 +76,53 @@ func TestStatsLockAndUnlockTable(t *testing.T) { require.Equal(t, int64(2), tblStats2.RealtimeCount) } +func TestStatsLockTableRepeatedly(t *testing.T) { + restore := config.RestoreFunc() + defer restore() + config.UpdateGlobal(func(conf *config.Config) { + conf.Performance.EnableStatsCacheMemQuota = true + }) + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("set @@tidb_analyze_version = 1") + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int, b varchar(10), index idx_b (b))") + tk.MustExec("analyze table test.t") + tbl, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + require.Nil(t, err) + + handle := domain.GetDomain(tk.Session()).StatsHandle() + tblStats := handle.GetTableStats(tbl.Meta()) + for _, col := range tblStats.Columns { + require.True(t, col.IsStatsInitialized()) + } + tk.MustExec("lock stats t") + + rows := tk.MustQuery("select count(*) from mysql.stats_table_locked").Rows() + num, _ := strconv.Atoi(rows[0][0].(string)) + require.Equal(t, num, 1) + + tk.MustExec("insert into t(a, b) values(1,'a')") + tk.MustExec("insert into t(a, b) values(2,'b')") + + tk.MustExec("analyze table test.t") + tblStats1 := handle.GetTableStats(tbl.Meta()) + require.Equal(t, tblStats, tblStats1) + + // Lock the table again and check the warning. + tableLocked1 := handle.GetTableLockedAndClearForTest() + tk.MustExec("lock stats t") + tk.MustQuery("show warnings").Sort().Check(testkit.Rows( + "Warning 1105 skip locking locked table: test.t", + )) + + err = handle.LoadLockedTables() + require.Nil(t, err) + tableLocked2 := handle.GetTableLockedAndClearForTest() + require.Equal(t, tableLocked1, tableLocked2) +} + func TestStatsLockAndUnlockTables(t *testing.T) { restore := config.RestoreFunc() defer restore() diff --git a/statistics/handle/lock_stats_handler.go b/statistics/handle/lock_stats_handler.go new file mode 100644 index 0000000000000..4db1b60b46ff1 --- /dev/null +++ b/statistics/handle/lock_stats_handler.go @@ -0,0 +1,310 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package handle + +import ( + "context" + "fmt" + "strings" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/parser/ast" + "github.com/pingcap/tidb/statistics/handle/cache" + "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/sqlexec" + "go.uber.org/zap" +) + +// Stats logger. +var statsLogger = logutil.BgLogger().With(zap.String("category", "stats")) + +// AddLockedTables add locked tables id to store. +func (h *Handle) AddLockedTables(tids []int64, pids []int64, tables []*ast.TableName, maxChunkSize int) (string, error) { + h.mu.Lock() + defer h.mu.Unlock() + + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) + exec := h.mu.ctx.(sqlexec.SQLExecutor) + + _, err := exec.ExecuteInternal(ctx, "BEGIN PESSIMISTIC") + if err != nil { + return "", err + } + + // Load tables to check duplicate before insert. + recordSet, err := exec.ExecuteInternal(ctx, "SELECT table_id FROM mysql.stats_table_locked") + if err != nil { + return "", err + } + rows, err := sqlexec.DrainRecordSet(ctx, recordSet, maxChunkSize) + if err != nil { + return "", err + } + + dupTables := make([]string, 0, len(tables)) + tableLocked := make([]int64, 0, len(rows)) + for _, row := range rows { + tableLocked = append(tableLocked, row.GetInt64(0)) + } + + statsLogger.Info("lock table", zap.Int64s("tableIDs", tids)) + + // Insert locked tables. + for i, tid := range tids { + if !isTableLocked(tableLocked, tid) { + if err := insertIntoStatsTableLocked(ctx, exec, tid); err != nil { + return "", err + } + tableLocked = append(tableLocked, tid) + } else { + dupTables = append(dupTables, tables[i].Schema.L+"."+tables[i].Name.L) + } + } + + // Insert related partitions while don't warning duplicate partitions. + for _, tid := range pids { + if !isTableLocked(tableLocked, tid) { + if err := insertIntoStatsTableLocked(ctx, exec, tid); err != nil { + return "", err + } + tableLocked = append(tableLocked, tid) + } + } + + // Commit transaction. + err = finishTransaction(ctx, exec, err) + if err != nil { + return "", err + } + + // Update handle.tableLocked after transaction success, if txn failed, tableLocked won't be updated. + h.tableLocked = tableLocked + + mag := generateDuplicateTablesMessage(tids, dupTables) + return mag, nil +} + +func generateDuplicateTablesMessage(tids []int64, dupTables []string) string { + if len(dupTables) > 0 { + tables := strings.Join(dupTables, ", ") + var msg string + if len(tids) > 1 { + if len(tids) > len(dupTables) { + msg = "skip locking locked tables: " + tables + ", other tables locked successfully" + } else { + msg = "skip locking locked tables: " + tables + } + } else { + msg = "skip locking locked table: " + tables + } + return msg + } + + return "" +} + +func insertIntoStatsTableLocked(ctx context.Context, exec sqlexec.SQLExecutor, tid int64) error { + _, err := exec.ExecuteInternal(ctx, "INSERT INTO mysql.stats_table_locked (table_id) VALUES (%?) ON DUPLICATE KEY UPDATE table_id = %?", tid, tid) + if err != nil { + logutil.BgLogger().Error("error occurred when insert mysql.stats_table_locked", zap.String("category", "stats"), zap.Error(err)) + return err + } + return nil +} + +// RemoveLockedTables remove tables from table locked array +func (h *Handle) RemoveLockedTables(tids []int64, pids []int64, tables []*ast.TableName) (string, error) { + h.mu.Lock() + defer h.mu.Unlock() + + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) + + exec := h.mu.ctx.(sqlexec.SQLExecutor) + _, err := exec.ExecuteInternal(ctx, "begin pessimistic") + if err != nil { + return "", err + } + + //load tables to check unlock the unlock table + rows, _, err := h.execRestrictedSQL(ctx, "select table_id from mysql.stats_table_locked") + if err != nil { + return "", err + } + + nonlockedTables := make([]string, 0) + tableLocked := make([]int64, 0) + for _, row := range rows { + tableLocked = append(tableLocked, row.GetInt64(0)) + } + + strTids := fmt.Sprintf("%v", tids) + logutil.BgLogger().Info("unlock table ", zap.String("category", "stats"), zap.String("tableIDs", strTids)) + for i, tid := range tids { + // get stats delta during table locked + count, modifyCount, version, err := h.getStatsDeltaFromTableLocked(ctx, tid) + if err != nil { + logutil.BgLogger().Error("error occurred when getStatsDeltaFromTableLocked", zap.String("category", "stats"), zap.Error(err)) + return "", err + } + // update stats_meta with stats delta + _, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version = %?, count = count + %?, modify_count = modify_count + %? where table_id = %?", version, count, modifyCount, tid) + if err != nil { + logutil.BgLogger().Error("error occurred when update mysql.stats_meta", zap.String("category", "stats"), zap.Error(err)) + return "", err + } + cache.TableRowStatsCache.Invalidate(tid) + + _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_table_locked where table_id = %?", tid) + if err != nil { + logutil.BgLogger().Error("error occurred when delete from mysql.stats_table_locked ", zap.String("category", "stats"), zap.Error(err)) + return "", err + } + var exist bool + exist, tableLocked = removeIfTableLocked(tableLocked, tid) + if !exist { + nonlockedTables = append(nonlockedTables, tables[i].Schema.L+"."+tables[i].Name.L) + } + } + //delete related partitions while don't warning delete empty partitions + for _, tid := range pids { + // get stats delta during table locked + count, modifyCount, version, err := h.getStatsDeltaFromTableLocked(ctx, tid) + if err != nil { + logutil.BgLogger().Error("error occurred when getStatsDeltaFromTableLocked", zap.String("category", "stats"), zap.Error(err)) + return "", err + } + // update stats_meta with stats delta + _, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version = %?, count = count + %?, modify_count = modify_count + %? where table_id = %?", version, count, modifyCount, tid) + if err != nil { + logutil.BgLogger().Error("error occurred when update mysql.stats_meta", zap.String("category", "stats"), zap.Error(err)) + return "", err + } + cache.TableRowStatsCache.Invalidate(tid) + + _, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_table_locked where table_id = %?", tid) + if err != nil { + logutil.BgLogger().Error("error occurred when delete from mysql.stats_table_locked ", zap.String("category", "stats"), zap.Error(err)) + return "", err + } + _, tableLocked = removeIfTableLocked(tableLocked, tid) + } + + err = finishTransaction(ctx, exec, err) + if err != nil { + return "", err + } + // update handle.tableLocked after transaction success, if txn failed, tableLocked won't be updated + h.tableLocked = tableLocked + + if len(nonlockedTables) > 0 { + tables := nonlockedTables[0] + for i, table := range nonlockedTables { + if i == 0 { + continue + } + tables += ", " + table + } + var msg string + if len(tids) > 1 { + if len(tids) > len(nonlockedTables) { + msg = "skip unlocking non-locked tables: " + tables + ", other tables unlocked successfully" + } else { + msg = "skip unlocking non-locked tables: " + tables + } + } else { + msg = "skip unlocking non-locked table: " + tables + } + return msg, err + } + return "", err +} + +// getStatsDeltaFromTableLocked get count, modify_count and version for the given table from mysql.stats_table_locked. +func (h *Handle) getStatsDeltaFromTableLocked(ctx context.Context, tableID int64) (count, modifyCount int64, version uint64, err error) { + rows, _, err := h.execRestrictedSQL(ctx, "select count, modify_count, version from mysql.stats_table_locked where table_id = %?", tableID) + if err != nil { + return 0, 0, 0, err + } + + if len(rows) == 0 { + return 0, 0, 0, nil + } + count = rows[0].GetInt64(0) + modifyCount = rows[0].GetInt64(1) + version = rows[0].GetUint64(2) + return count, modifyCount, version, nil +} + +// LoadLockedTables load locked tables from store +func (h *Handle) LoadLockedTables() error { + h.mu.Lock() + defer h.mu.Unlock() + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) + rows, _, err := h.execRestrictedSQL(ctx, "select table_id from mysql.stats_table_locked") + if err != nil { + return errors.Trace(err) + } + + h.tableLocked = make([]int64, len(rows)) + for i, row := range rows { + h.tableLocked[i] = row.GetInt64(0) + } + + return nil +} + +// removeIfTableLocked try to remove the table from table locked array +func removeIfTableLocked(tableLocked []int64, tableID int64) (bool, []int64) { + idx := lockTableIndexOf(tableLocked, tableID) + if idx > -1 { + tableLocked = append(tableLocked[:idx], tableLocked[idx+1:]...) + } + return idx > -1, tableLocked +} + +// IsTableLocked check whether table is locked in handle with Handle.Mutex +func (h *Handle) IsTableLocked(tableID int64) bool { + h.mu.RLock() + defer h.mu.RUnlock() + return h.isTableLocked(tableID) +} + +// IsTableLocked check whether table is locked in handle without Handle.Mutex +func (h *Handle) isTableLocked(tableID int64) bool { + return isTableLocked(h.tableLocked, tableID) +} + +// isTableLocked check whether table is locked +func isTableLocked(tableLocked []int64, tableID int64) bool { + return lockTableIndexOf(tableLocked, tableID) > -1 +} + +// lockTableIndexOf get the locked table's index in the array +func lockTableIndexOf(tableLocked []int64, tableID int64) int { + for idx, id := range tableLocked { + if id == tableID { + return idx + } + } + return -1 +} + +// GetTableLockedAndClearForTest for unit test only +func (h *Handle) GetTableLockedAndClearForTest() []int64 { + tableLocked := h.tableLocked + h.tableLocked = make([]int64, 0) + return tableLocked +} diff --git a/statistics/handle/lock_stats_handler_test.go b/statistics/handle/lock_stats_handler_test.go new file mode 100644 index 0000000000000..a943ada718856 --- /dev/null +++ b/statistics/handle/lock_stats_handler_test.go @@ -0,0 +1,61 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package handle + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestGenerateDuplicateTablesMessage(t *testing.T) { + tests := []struct { + name string + totalTableIDs []int64 + dupTables []string + expectedMsg string + }{ + { + name: "no duplicate tables", + totalTableIDs: []int64{1, 2, 3}, + expectedMsg: "", + }, + { + name: "one duplicate table", + totalTableIDs: []int64{1}, + dupTables: []string{"t1"}, + expectedMsg: "skip locking locked table: t1", + }, + { + name: "multiple duplicate tables", + totalTableIDs: []int64{1, 2, 3, 4}, + dupTables: []string{"t1", "t2", "t3"}, + expectedMsg: "skip locking locked tables: t1, t2, t3, other tables locked successfully", + }, + { + name: "all tables are duplicate", + totalTableIDs: []int64{1, 2, 3, 4}, + dupTables: []string{"t1", "t2", "t3", "t4"}, + expectedMsg: "skip locking locked tables: t1, t2, t3, t4", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + msg := generateDuplicateTablesMessage(tt.totalTableIDs, tt.dupTables) + require.Equal(t, tt.expectedMsg, msg) + }) + } +}