diff --git a/executor/lockstats/BUILD.bazel b/executor/lockstats/BUILD.bazel index deacf056e8ba2..6bbdc51990004 100644 --- a/executor/lockstats/BUILD.bazel +++ b/executor/lockstats/BUILD.bazel @@ -1,4 +1,4 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "lockstats", @@ -19,3 +19,17 @@ go_library( "@com_github_pingcap_errors//:errors", ], ) + +go_test( + name = "lockstats_test", + timeout = "short", + srcs = ["lock_stats_executor_test.go"], + embed = [":lockstats"], + flaky = True, + deps = [ + "//infoschema", + "//parser/ast", + "//parser/model", + "@com_github_stretchr_testify//require", + ], +) diff --git a/executor/lockstats/lock_stats_executor.go b/executor/lockstats/lock_stats_executor.go index 674324b839a73..ef441b06a6375 100644 --- a/executor/lockstats/lock_stats_executor.go +++ b/executor/lockstats/lock_stats_executor.go @@ -16,6 +16,7 @@ package lockstats import ( "context" + "fmt" "github.com/pingcap/errors" "github.com/pingcap/tidb/domain" @@ -51,12 +52,13 @@ func (e *LockExec) Next(_ context.Context, _ *chunk.Chunk) error { is := do.InfoSchema() if e.onlyLockPartitions() { - tableName := e.Tables[0] - tid, pidNames, err := populatePartitionIDAndNames(tableName, tableName.PartitionNames, is) + table := e.Tables[0] + tid, pidNames, err := populatePartitionIDAndNames(table, table.PartitionNames, is) if err != nil { return err } + tableName := fmt.Sprintf("%s.%s", table.Schema.L, table.Name.L) msg, err := h.LockPartitions(tid, tableName, pidNames) if err != nil { return err @@ -65,12 +67,12 @@ func (e *LockExec) Next(_ context.Context, _ *chunk.Chunk) error { e.Ctx().GetSessionVars().StmtCtx.AppendWarning(errors.New(msg)) } } else { - tids, pids, err := populateTableAndPartitionIDs(e.Tables, is) + tidAndNames, pidAndNames, err := populateTableAndPartitionIDs(e.Tables, is) if err != nil { return err } - msg, err := h.LockTables(tids, pids, e.Tables) + msg, err := h.LockTables(tidAndNames, pidAndNames) if err != nil { return err } @@ -97,15 +99,23 @@ func (*LockExec) Open(context.Context) error { } // populatePartitionIDAndNames returns the table ID and partition IDs for the given table name and partition names. -func populatePartitionIDAndNames(tableName *ast.TableName, partitionNames []model.CIStr, is infoschema.InfoSchema) (int64, map[int64]string, error) { - tbl, err := is.TableByName(tableName.Schema, tableName.Name) +func populatePartitionIDAndNames( + table *ast.TableName, + partitionNames []model.CIStr, + is infoschema.InfoSchema, +) (int64, map[int64]string, error) { + if len(partitionNames) == 0 { + return 0, nil, errors.New("partition list should not be empty") + } + tbl, err := is.TableByName(table.Schema, table.Name) if err != nil { return 0, nil, err } pi := tbl.Meta().GetPartitionInfo() if pi == nil { - return 0, nil, errors.Errorf("table %s is not a partition table", tableName.Name) + return 0, nil, errors.Errorf("table %s is not a partition table", + fmt.Sprintf("%s.%s", table.Schema.L, table.Name.L)) } pidNames := make(map[int64]string, len(partitionNames)) @@ -121,25 +131,36 @@ func populatePartitionIDAndNames(tableName *ast.TableName, partitionNames []mode } // populateTableAndPartitionIDs returns table IDs and partition IDs for the given table names. 
-func populateTableAndPartitionIDs(tables []*ast.TableName, is infoschema.InfoSchema) ([]int64, []int64, error) { - tids := make([]int64, 0, len(tables)) - pids := make([]int64, 0) +func populateTableAndPartitionIDs( + tables []*ast.TableName, + is infoschema.InfoSchema, +) (map[int64]string, map[int64]string, error) { + if len(tables) == 0 { + return nil, nil, errors.New("table list should not be empty") + } + + tidAndNames := make(map[int64]string, len(tables)) + pidAndNames := make(map[int64]string, len(tables)) for _, table := range tables { tbl, err := is.TableByName(table.Schema, table.Name) if err != nil { return nil, nil, err } - tids = append(tids, tbl.Meta().ID) + tidAndNames[tbl.Meta().ID] = fmt.Sprintf("%s.%s", table.Schema.L, table.Name.L) pi := tbl.Meta().GetPartitionInfo() if pi == nil { continue } for _, p := range pi.Definitions { - pids = append(pids, p.ID) + pidAndNames[p.ID] = genFullPartitionName(table, p.Name.L) } } - return tids, pids, nil + return tidAndNames, pidAndNames, nil +} + +func genFullPartitionName(table *ast.TableName, partitionName string) string { + return fmt.Sprintf("%s.%s partition (%s)", table.Schema.L, table.Name.L, partitionName) } diff --git a/executor/lockstats/lock_stats_executor_test.go b/executor/lockstats/lock_stats_executor_test.go new file mode 100644 index 0000000000000..6a15b8bbe322c --- /dev/null +++ b/executor/lockstats/lock_stats_executor_test.go @@ -0,0 +1,108 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package lockstats + +import ( + "testing" + + "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/parser/ast" + "github.com/pingcap/tidb/parser/model" + "github.com/stretchr/testify/require" +) + +func TestPopulatePartitionIDAndNames(t *testing.T) { + fakeInfo := infoschema.MockInfoSchema([]*model.TableInfo{ + tInfo(1, "t1", "p1", "p2"), + }) + + table := &ast.TableName{ + Schema: model.NewCIStr("test"), + Name: model.NewCIStr("t1"), + PartitionNames: []model.CIStr{ + model.NewCIStr("p1"), + model.NewCIStr("p2"), + }, + } + + gotTID, gotPIDNames, err := populatePartitionIDAndNames(table, table.PartitionNames, fakeInfo) + require.NoError(t, err) + require.Equal(t, int64(1), gotTID) + require.Equal(t, map[int64]string{ + 2: "p1", + 3: "p2", + }, gotPIDNames) + + // Empty partition names. 
+ _, _, err = populatePartitionIDAndNames(nil, nil, fakeInfo) + require.Error(t, err) +} + +func TestPopulateTableAndPartitionIDs(t *testing.T) { + fakeInfo := infoschema.MockInfoSchema([]*model.TableInfo{ + tInfo(1, "t1", "p1", "p2"), + tInfo(4, "t2"), + }) + + tables := []*ast.TableName{ + { + Schema: model.NewCIStr("test"), + Name: model.NewCIStr("t1"), + PartitionNames: []model.CIStr{ + model.NewCIStr("p1"), + model.NewCIStr("p2"), + }, + }, + { + Schema: model.NewCIStr("test"), + Name: model.NewCIStr("t2"), + }, + } + + gotTIDAndNames, gotPIDAndNames, err := populateTableAndPartitionIDs(tables, fakeInfo) + require.NoError(t, err) + require.Equal(t, map[int64]string{ + 1: "test.t1", + 4: "test.t2", + }, gotTIDAndNames) + require.Equal(t, map[int64]string{ + 2: "test.t1 partition (p1)", + 3: "test.t1 partition (p2)", + }, gotPIDAndNames) + + // Empty table list. + _, _, err = populateTableAndPartitionIDs(nil, fakeInfo) + require.Error(t, err) +} + +func tInfo(id int, tableName string, partitionNames ...string) *model.TableInfo { + tbl := &model.TableInfo{ + ID: int64(id), + Name: model.NewCIStr(tableName), + } + if len(partitionNames) > 0 { + tbl.Partition = &model.PartitionInfo{ + Enable: true, + } + for i, partitionName := range partitionNames { + tbl.Partition.Definitions = append(tbl.Partition.Definitions, model.PartitionDefinition{ + ID: int64(id + 1 + i), + Name: model.NewCIStr(partitionName), + }) + } + } + + return tbl +} diff --git a/executor/lockstats/unlock_stats_executor.go b/executor/lockstats/unlock_stats_executor.go index 742d059d2354e..91f3a4edd511d 100644 --- a/executor/lockstats/unlock_stats_executor.go +++ b/executor/lockstats/unlock_stats_executor.go @@ -16,6 +16,7 @@ package lockstats import ( "context" + "fmt" "github.com/pingcap/errors" "github.com/pingcap/tidb/domain" @@ -48,11 +49,12 @@ func (e *UnlockExec) Next(context.Context, *chunk.Chunk) error { is := do.InfoSchema() if e.onlyUnlockPartitions() { - tableName := e.Tables[0] - tid, pidNames, err := populatePartitionIDAndNames(tableName, tableName.PartitionNames, is) + table := e.Tables[0] + tid, pidNames, err := populatePartitionIDAndNames(table, table.PartitionNames, is) if err != nil { return err } + tableName := fmt.Sprintf("%s.%s", table.Schema.O, table.Name.O) msg, err := h.RemoveLockedPartitions(tid, tableName, pidNames) if err != nil { return err @@ -61,11 +63,11 @@ func (e *UnlockExec) Next(context.Context, *chunk.Chunk) error { e.Ctx().GetSessionVars().StmtCtx.AppendWarning(errors.New(msg)) } } else { - tids, pids, err := populateTableAndPartitionIDs(e.Tables, is) + tidAndNames, pidAndNames, err := populateTableAndPartitionIDs(e.Tables, is) if err != nil { return err } - msg, err := h.RemoveLockedTables(tids, pids, e.Tables) + msg, err := h.RemoveLockedTables(tidAndNames, pidAndNames) if err != nil { return err } diff --git a/statistics/handle/handletest/lockstats/BUILD.bazel b/statistics/handle/handletest/lockstats/BUILD.bazel index 211efe4532b0e..bf20299eb8072 100644 --- a/statistics/handle/handletest/lockstats/BUILD.bazel +++ b/statistics/handle/handletest/lockstats/BUILD.bazel @@ -9,7 +9,7 @@ go_test( "main_test.go", ], flaky = True, - shard_count = 12, + shard_count = 14, deps = [ "//config", "//domain", diff --git a/statistics/handle/handletest/lockstats/lock_partition_stats_test.go b/statistics/handle/handletest/lockstats/lock_partition_stats_test.go index 30d75ef439401..5b8164edf4e46 100644 --- a/statistics/handle/handletest/lockstats/lock_partition_stats_test.go +++ 
b/statistics/handle/handletest/lockstats/lock_partition_stats_test.go @@ -148,7 +148,7 @@ func TestLockAndUnlockPartitionStatsRepeatedly(t *testing.T) { // Lock the partition again and check the warning. tk.MustExec("lock stats t partition p0") tk.MustQuery("show warnings").Check(testkit.Rows( - "Warning 1105 skip locking locked table: test.t partition (p0)", + "Warning 1105 skip locking locked partition of table test.t: p0", )) // Unlock the partition. @@ -160,7 +160,7 @@ func TestLockAndUnlockPartitionStatsRepeatedly(t *testing.T) { // Unlock the partition again and check the warning. tk.MustExec("unlock stats t partition p0") tk.MustQuery("show warnings").Check(testkit.Rows( - "Warning 1105 skip unlocking unlocked table: test.t partition (p0)", + "Warning 1105 skip unlocking unlocked partition of table test.t: p0", )) } @@ -260,6 +260,28 @@ func TestUnlockTheWholeTableWouldUnlockLockedPartitionsAndGenerateWarning(t *tes require.Equal(t, 0, num) } +func TestSkipLockALotOfPartitions(t *testing.T) { + store, _ := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("set @@tidb_analyze_version = 1") + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int, b varchar(10), index idx_b (b)) partition by range(a) " + + "(partition p0 values less than (10), partition p1 values less than (20), " + + "partition a values less than (30), " + + "partition b values less than (40), " + + "partition g values less than (90), " + + "partition h values less than (100))") + + tk.MustExec("lock stats t partition p0, p1, a, b, g, h") + + // Skip locking a lot of partitions. + tk.MustExec("lock stats t partition p0, p1, a, b, g, h") + tk.MustQuery("show warnings").Check(testkit.Rows( + "Warning 1105 skip locking locked partitions of table test.t: a, b, g, h, p0, p1", + )) +} + func setupTestEnvironmentWithPartitionedTableT(t *testing.T) (kv.Storage, *testkit.TestKit, *model.TableInfo) { store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) diff --git a/statistics/handle/handletest/lockstats/lock_table_stats_test.go b/statistics/handle/handletest/lockstats/lock_table_stats_test.go index 6eb4fb30bee19..50f05bb90de3f 100644 --- a/statistics/handle/handletest/lockstats/lock_table_stats_test.go +++ b/statistics/handle/handletest/lockstats/lock_table_stats_test.go @@ -316,6 +316,26 @@ func TestShowStatsLockedTablePrivilege(t *testing.T) { require.Len(t, rows, 1) } +func TestSkipLockALotOfTables(t *testing.T) { + store, _ := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("set @@tidb_analyze_version = 1") + tk.MustExec("use test") + tk.MustExec("drop table if exists t; drop table if exists a; drop table if exists x; drop table if exists y; drop table if exists z") + tk.MustExec("create table t(a int, b varchar(10), index idx_b (b)); " + + "create table a(a int, b varchar(10), index idx_b (b)); " + + "create table x(a int, b varchar(10), index idx_b (b)); " + + "create table y(a int, b varchar(10), index idx_b (b)); " + + "create table z(a int, b varchar(10), index idx_b (b))") + tk.MustExec("lock stats test.t, test.a, test.x, test.y, test.z") + + // Skip locking a lot of tables. 
+ tk.MustExec("lock stats test.t, test.a, test.x, test.y, test.z") + tk.MustQuery("show warnings").Check(testkit.Rows( + "Warning 1105 skip locking locked tables: test.a, test.t, test.x, test.y, test.z", + )) +} + func setupTestEnvironmentWithTableT(t *testing.T) (kv.Storage, *testkit.TestKit, *model.TableInfo) { store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) diff --git a/statistics/handle/lock_stats_handler.go b/statistics/handle/lock_stats_handler.go index 66d9b2dc7bb69..db17e1e9f625a 100644 --- a/statistics/handle/lock_stats_handler.go +++ b/statistics/handle/lock_stats_handler.go @@ -16,17 +16,15 @@ package handle import ( "github.com/pingcap/errors" - "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/statistics/handle/lockstats" "github.com/pingcap/tidb/util/sqlexec" ) // LockTables add locked tables id to store. -// - tids: table ids of which will be locked. -// - pids: partition ids of which will be locked. -// - tables: table names of which will be locked. +// - tidAndNames: table ids and names of which will be locked. +// - pidAndNames: partition ids and names of which will be locked. // Return the message of skipped tables and error. -func (h *Handle) LockTables(tids []int64, pids []int64, tables []*ast.TableName) (string, error) { +func (h *Handle) LockTables(tidAndNames map[int64]string, pidAndNames map[int64]string) (string, error) { se, err := h.pool.Get() if err != nil { return "", errors.Trace(err) @@ -35,7 +33,7 @@ func (h *Handle) LockTables(tids []int64, pids []int64, tables []*ast.TableName) exec := se.(sqlexec.RestrictedSQLExecutor) - return lockstats.AddLockedTables(exec, tids, pids, tables) + return lockstats.AddLockedTables(exec, tidAndNames, pidAndNames) } // LockPartitions add locked partitions id to store. @@ -44,9 +42,10 @@ func (h *Handle) LockTables(tids []int64, pids []int64, tables []*ast.TableName) // - tableName: table name of which will be locked. // - pidNames: partition ids of which will be locked. // Return the message of skipped tables and error. +// Note: If the whole table is locked, then skip all partitions of the table. func (h *Handle) LockPartitions( tid int64, - tableName *ast.TableName, + tableName string, pidNames map[int64]string, ) (string, error) { se, err := h.pool.Get() @@ -61,11 +60,10 @@ func (h *Handle) LockPartitions( } // RemoveLockedTables remove tables from table locked records. -// - tids: table ids of which will be unlocked. -// - pids: partition ids of which will be unlocked. -// - tables: table names of which will be unlocked. +// - tidAndNames: table ids and names of which will be unlocked. +// - pidAndNames: partition ids and names of which will be unlocked. // Return the message of skipped tables and error. -func (h *Handle) RemoveLockedTables(tids []int64, pids []int64, tables []*ast.TableName) (string, error) { +func (h *Handle) RemoveLockedTables(tidAndNames map[int64]string, pidAndNames map[int64]string) (string, error) { se, err := h.pool.Get() if err != nil { return "", errors.Trace(err) @@ -73,16 +71,17 @@ func (h *Handle) RemoveLockedTables(tids []int64, pids []int64, tables []*ast.Ta defer h.pool.Put(se) exec := se.(sqlexec.RestrictedSQLExecutor) - return lockstats.RemoveLockedTables(exec, tids, pids, tables) + return lockstats.RemoveLockedTables(exec, tidAndNames, pidAndNames) } // RemoveLockedPartitions remove partitions from table locked records. // - tid: table id of which will be unlocked. // - tableName: table name of which will be unlocked. 
// - pidNames: partition ids of which will be unlocked. +// Note: If the whole table is locked, then skip all partitions of the table. func (h *Handle) RemoveLockedPartitions( tid int64, - tableName *ast.TableName, + tableName string, pidNames map[int64]string, ) (string, error) { se, err := h.pool.Get() diff --git a/statistics/handle/lockstats/BUILD.bazel b/statistics/handle/lockstats/BUILD.bazel index 3c915f970fde8..9e553867976f0 100644 --- a/statistics/handle/lockstats/BUILD.bazel +++ b/statistics/handle/lockstats/BUILD.bazel @@ -11,7 +11,6 @@ go_library( visibility = ["//visibility:public"], deps = [ "//kv", - "//parser/ast", "//parser/terror", "//statistics/handle/cache", "//util/logutil", @@ -31,11 +30,9 @@ go_test( ], embed = [":lockstats"], flaky = True, - shard_count = 8, + shard_count = 9, deps = [ "//kv", - "//parser/ast", - "//parser/model", "//parser/mysql", "//types", "//util/chunk", diff --git a/statistics/handle/lockstats/lock_stats.go b/statistics/handle/lockstats/lock_stats.go index 8716e4f8992b9..f09fd9e0cf9dc 100644 --- a/statistics/handle/lockstats/lock_stats.go +++ b/statistics/handle/lockstats/lock_stats.go @@ -17,10 +17,10 @@ package lockstats import ( "context" "fmt" + "slices" "strings" "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/sqlexec" "go.uber.org/zap" @@ -44,11 +44,14 @@ var ( // AddLockedTables add locked tables id to store. // - exec: sql executor. -// - tids: table ids of which will be locked. -// - pids: partition ids of which will be locked. -// - tables: table names of which will be locked. +// - tidAndNames: table ids and names of which will be locked. +// - pidAndNames: partition ids and names of which will be locked. // Return the message of skipped tables and error. -func AddLockedTables(exec sqlexec.RestrictedSQLExecutor, tids []int64, pids []int64, tables []*ast.TableName) (string, error) { +func AddLockedTables( + exec sqlexec.RestrictedSQLExecutor, + tidAndNames map[int64]string, + pidAndNames map[int64]string, +) (string, error) { ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) err := startTransaction(ctx, exec) @@ -66,18 +69,35 @@ func AddLockedTables(exec sqlexec.RestrictedSQLExecutor, tids []int64, pids []in return "", err } - skippedTables := make([]string, 0, len(tables)) - statsLogger.Info("lock table", zap.Int64s("tableIDs", tids)) + skippedTables := make([]string, 0, len(tidAndNames)) + tids := make([]int64, 0, len(tidAndNames)) + tables := make([]string, 0, len(tidAndNames)) + for tid, tableName := range tidAndNames { + tids = append(tids, tid) + tables = append(tables, tableName) + } + pids := make([]int64, 0, len(pidAndNames)) + partitions := make([]string, 0, len(pidAndNames)) + for pid, partitionName := range pidAndNames { + pids = append(pids, pid) + partitions = append(partitions, partitionName) + } + statsLogger.Info("lock table", + zap.Int64s("tableIDs", tids), + zap.Strings("tableNames", tables), + zap.Int64s("partitionIDs", pids), + zap.Strings("partitionNames", partitions), + ) // Insert locked tables. checkedTables := GetLockedTables(lockedTables, tids...) 
- for i, tid := range tids { + for _, tid := range tids { if _, ok := checkedTables[tid]; !ok { if err := insertIntoStatsTableLocked(ctx, exec, tid); err != nil { return "", err } } else { - skippedTables = append(skippedTables, tables[i].Schema.L+"."+tables[i].Name.L) + skippedTables = append(skippedTables, tidAndNames[tid]) } } @@ -91,7 +111,7 @@ func AddLockedTables(exec sqlexec.RestrictedSQLExecutor, tids []int64, pids []in } } - msg := generateSkippedMessage(tids, skippedTables, lockAction, lockedStatus) + msg := generateStableSkippedTablesMessage(tids, skippedTables, lockAction, lockedStatus) // Note: defer commit transaction, so we can't use `return nil` here. return msg, err } @@ -106,7 +126,7 @@ func AddLockedTables(exec sqlexec.RestrictedSQLExecutor, tids []int64, pids []in func AddLockedPartitions( exec sqlexec.RestrictedSQLExecutor, tid int64, - tableName *ast.TableName, + tableName string, pidNames map[int64]string, ) (string, error) { ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) @@ -126,17 +146,25 @@ func AddLockedPartitions( return "", err } pids := make([]int64, 0, len(pidNames)) - for pid := range pidNames { + pNames := make([]string, 0, len(pidNames)) + for pid, pName := range pidNames { pids = append(pids, pid) + pNames = append(pNames, pName) } - statsLogger.Info("lock partitions", zap.Int64("tableID", tid), zap.Int64s("partitionIDs", pids)) + + statsLogger.Info("lock partitions", + zap.Int64("tableID", tid), + zap.String("tableName", tableName), + zap.Int64s("partitionIDs", pids), + zap.Strings("partitionNames", pNames), + ) // Check if whole table is locked. // Then we can skip locking partitions. // It is not necessary to lock partitions if whole table is locked. checkedTables := GetLockedTables(lockedTables, tid) if _, locked := checkedTables[tid]; locked { - return "skip locking partitions of locked table: " + tableName.Schema.L + "." + tableName.Name.L, err + return "skip locking partitions of locked table: " + tableName, err } // Insert related partitions and warning already locked partitions. @@ -148,17 +176,20 @@ func AddLockedPartitions( return "", err } } else { - partition := generatePartitionFullName(tableName, pidNames[pid]) - skippedPartitions = append(skippedPartitions, partition) + skippedPartitions = append(skippedPartitions, pidNames[pid]) } } - msg := generateSkippedMessage(pids, skippedPartitions, lockAction, lockedStatus) + msg := generateStableSkippedPartitionsMessage(pids, tableName, skippedPartitions, lockAction, lockedStatus) // Note: defer commit transaction, so we can't use `return nil` here. return msg, err } -func generateSkippedMessage(ids []int64, skippedNames []string, action, status string) string { +// generateStableSkippedTablesMessage generates stable skipped tables message. +func generateStableSkippedTablesMessage(ids []int64, skippedNames []string, action, status string) string { + // Sort to stabilize the output. + slices.Sort(skippedNames) + if len(skippedNames) > 0 { tables := strings.Join(skippedNames, ", ") var msg string @@ -177,6 +208,28 @@ func generateSkippedMessage(ids []int64, skippedNames []string, action, status s return "" } +// generateStableSkippedPartitionsMessage generates stable skipped partitions message. +func generateStableSkippedPartitionsMessage(ids []int64, tableName string, skippedNames []string, action, status string) string { + // Sort to stabilize the output. 
+ slices.Sort(skippedNames) + + if len(skippedNames) > 0 { + partitions := strings.Join(skippedNames, ", ") + var msg string + if len(ids) > 1 { + if len(ids) > len(skippedNames) { + msg = fmt.Sprintf("skip %s %s partitions of table %s: %s, other partitions %s successfully", action, status, tableName, partitions, status) + } else { + msg = fmt.Sprintf("skip %s %s partitions of table %s: %s", action, status, tableName, partitions) + } + } else { + msg = fmt.Sprintf("skip %s %s partition of table %s: %s", action, status, tableName, partitions) + } + return msg + } + return "" +} + func insertIntoStatsTableLocked(ctx context.Context, exec sqlexec.RestrictedSQLExecutor, tid int64) error { _, _, err := exec.ExecRestrictedSQL( ctx, diff --git a/statistics/handle/lockstats/lock_stats_test.go b/statistics/handle/lockstats/lock_stats_test.go index d3864b313c1a2..ba1a12c10d779 100644 --- a/statistics/handle/lockstats/lock_stats_test.go +++ b/statistics/handle/lockstats/lock_stats_test.go @@ -19,8 +19,6 @@ import ( "testing" "github.com/pingcap/errors" - "github.com/pingcap/tidb/parser/ast" - "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" @@ -29,7 +27,7 @@ import ( "go.uber.org/mock/gomock" ) -func TestGenerateSkippedMessage(t *testing.T) { +func TestGenerateSkippedTablesMessage(t *testing.T) { tests := []struct { name string totalTableIDs []int64 @@ -81,7 +79,71 @@ func TestGenerateSkippedMessage(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - msg := generateSkippedMessage(tt.totalTableIDs, tt.tables, tt.action, tt.status) + msg := generateStableSkippedTablesMessage(tt.totalTableIDs, tt.tables, tt.action, tt.status) + require.Equal(t, tt.expectedMsg, msg) + }) + } +} + +func TestGenerateSkippedPartitionsMessage(t *testing.T) { + tests := []struct { + name string + tableName string + totalPartitionIDs []int64 + partitions []string + action string + status string + expectedMsg string + }{ + { + name: "no duplicate partitions when locking", + tableName: "test.t", + totalPartitionIDs: []int64{1, 2, 3}, + action: lockAction, + status: lockedStatus, + expectedMsg: "", + }, + { + name: "one duplicate table when locking", + tableName: "test.t", + totalPartitionIDs: []int64{1}, + partitions: []string{"t1"}, + action: lockAction, + status: lockedStatus, + expectedMsg: "skip locking locked partition of table test.t: t1", + }, + { + name: "multiple duplicate partitions when locking", + tableName: "test.t", + totalPartitionIDs: []int64{1, 2, 3, 4}, + partitions: []string{"t1", "t2", "t3"}, + action: lockAction, + status: lockedStatus, + expectedMsg: "skip locking locked partitions of table test.t: t1, t2, t3, other partitions locked successfully", + }, + { + name: "all partitions are duplicate when locking", + tableName: "test.t", + totalPartitionIDs: []int64{1, 2, 3, 4}, + partitions: []string{"t1", "t2", "t3", "t4"}, + action: lockAction, + status: lockedStatus, + expectedMsg: "skip locking locked partitions of table test.t: t1, t2, t3, t4", + }, + { + name: "all partitions are duplicate when unlocking", + tableName: "test.t", + totalPartitionIDs: []int64{1, 2, 3, 4}, + partitions: []string{"t1", "t2", "t3", "t4"}, + action: unlockAction, + status: unlockedStatus, + expectedMsg: "skip unlocking unlocked partitions of table test.t: t1, t2, t3, t4", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + msg := 
generateStableSkippedPartitionsMessage(tt.totalPartitionIDs, tt.tableName, tt.partitions, tt.action, tt.status) require.Equal(t, tt.expectedMsg, msg) }) } @@ -162,14 +224,19 @@ func TestAddLockedTables(t *testing.T) { "COMMIT", ) + tidsAndNames := map[int64]string{ + 1: "test.t1", + 2: "test.t2", + 3: "test.t3", + } + pidAndNames := map[int64]string{ + 4: "p1", + } + msg, err := AddLockedTables( exec, - []int64{1, 2, 3}, - []int64{4}, - []*ast.TableName{ - {Schema: model.NewCIStr("test"), Name: model.NewCIStr("t1")}, - {Schema: model.NewCIStr("test"), Name: model.NewCIStr("t2")}, - {Schema: model.NewCIStr("test"), Name: model.NewCIStr("t3")}}, + tidsAndNames, + pidAndNames, ) require.NoError(t, err) require.Equal(t, "skip locking locked tables: test.t1, other tables locked successfully", msg) diff --git a/statistics/handle/lockstats/unlock_stats.go b/statistics/handle/lockstats/unlock_stats.go index 61ebd1f0f8f08..afb4ea7476e78 100644 --- a/statistics/handle/lockstats/unlock_stats.go +++ b/statistics/handle/lockstats/unlock_stats.go @@ -16,11 +16,9 @@ package lockstats import ( "context" - "fmt" "github.com/pingcap/errors" "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/statistics/handle/cache" "github.com/pingcap/tidb/util/sqlexec" "go.uber.org/zap" @@ -34,11 +32,14 @@ const ( // RemoveLockedTables remove tables from table locked records. // - exec: sql executor. -// - tids: table ids of which will be unlocked. -// - pids: partition ids of which will be unlocked. -// - tables: table names of which will be unlocked. +// - tidAndNames: table ids and names of which will be unlocked. +// - pidAndNames: partition ids and names of which will be unlocked. // Return the message of skipped tables and error. -func RemoveLockedTables(exec sqlexec.RestrictedSQLExecutor, tids []int64, pids []int64, tables []*ast.TableName) (string, error) { +func RemoveLockedTables( + exec sqlexec.RestrictedSQLExecutor, + tidAndNames map[int64]string, + pidAndNames map[int64]string, +) (string, error) { ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) err := startTransaction(ctx, exec) @@ -55,14 +56,31 @@ func RemoveLockedTables(exec sqlexec.RestrictedSQLExecutor, tids []int64, pids [ if err != nil { return "", err } - skippedTables := make([]string, 0, len(tables)) + skippedTables := make([]string, 0, len(tidAndNames)) + tids := make([]int64, 0, len(tidAndNames)) + tables := make([]string, 0, len(tidAndNames)) + for tid, tableName := range tidAndNames { + tids = append(tids, tid) + tables = append(tables, tableName) + } + pids := make([]int64, 0, len(pidAndNames)) + partitions := make([]string, 0, len(pidAndNames)) + for pid, partitionName := range pidAndNames { + pids = append(pids, pid) + partitions = append(partitions, partitionName) + } - statsLogger.Info("unlock table", zap.Int64s("tableIDs", tids)) + statsLogger.Info("unlock table", + zap.Int64s("tableIDs", tids), + zap.Strings("tableNames", tables), + zap.Int64s("partitionIDs", pids), + zap.Strings("partitionNames", partitions), + ) checkedTables := GetLockedTables(lockedTables, tids...) 
- for i, tid := range tids { + for _, tid := range tids { if _, ok := checkedTables[tid]; !ok { - skippedTables = append(skippedTables, tables[i].Schema.L+"."+tables[i].Name.L) + skippedTables = append(skippedTables, tidAndNames[tid]) continue } if err := updateStatsAndUnlockTable(ctx, exec, tid); err != nil { @@ -81,7 +99,7 @@ func RemoveLockedTables(exec sqlexec.RestrictedSQLExecutor, tids []int64, pids [ } } - msg := generateSkippedMessage(tids, skippedTables, unlockAction, unlockedStatus) + msg := generateStableSkippedTablesMessage(tids, skippedTables, unlockAction, unlockedStatus) // Note: defer commit transaction, so we can't use `return nil` here. return msg, err } @@ -95,7 +113,7 @@ func RemoveLockedTables(exec sqlexec.RestrictedSQLExecutor, tids []int64, pids [ func RemoveLockedPartitions( exec sqlexec.RestrictedSQLExecutor, tid int64, - tableName *ast.TableName, + tableName string, pidNames map[int64]string, ) (string, error) { ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) @@ -119,14 +137,18 @@ func RemoveLockedPartitions( for pid := range pidNames { pids = append(pids, pid) } - statsLogger.Info("unlock partitions", zap.Int64("tableID", tid), zap.Int64s("partitionIDs", pids)) + statsLogger.Info("unlock partitions", + zap.Int64("tableID", tid), + zap.String("tableName", tableName), + zap.Int64s("partitionIDs", pids), + ) // Check if whole table is locked. // Then we can not unlock any partitions of the table. // It is invalid to unlock partitions if whole table is locked. checkedTables := GetLockedTables(lockedTables, tid) if _, locked := checkedTables[tid]; locked { - return "skip unlocking partitions of locked table: " + tableName.Schema.L + "." + tableName.Name.L, err + return "skip unlocking partitions of locked table: " + tableName, err } // Delete related partitions and warning already unlocked partitions. @@ -134,8 +156,7 @@ func RemoveLockedPartitions( lockedPartitions := GetLockedTables(lockedTables, pids...) for _, pid := range pids { if _, ok := lockedPartitions[pid]; !ok { - partition := generatePartitionFullName(tableName, pidNames[pid]) - skippedPartitions = append(skippedPartitions, partition) + skippedPartitions = append(skippedPartitions, pidNames[pid]) continue } if err := updateStatsAndUnlockTable(ctx, exec, pid); err != nil { @@ -143,7 +164,7 @@ func RemoveLockedPartitions( } } - msg := generateSkippedMessage(pids, skippedPartitions, unlockAction, unlockedStatus) + msg := generateStableSkippedPartitionsMessage(pids, tableName, skippedPartitions, unlockAction, unlockedStatus) // Note: defer commit transaction, so we can't use `return nil` here. 
return msg, err } @@ -191,7 +212,3 @@ func getStatsDeltaFromTableLocked(ctx context.Context, tableID int64, exec sqlex version = rows[0].GetUint64(2) return count, modifyCount, version, nil } - -func generatePartitionFullName(tableName *ast.TableName, partitionName string) string { - return fmt.Sprintf("%s.%s partition (%s)", tableName.Schema.L, tableName.Name.L, partitionName) -} diff --git a/statistics/handle/lockstats/unlock_stats_test.go b/statistics/handle/lockstats/unlock_stats_test.go index e2a67aa65066c..071709cc6bbd4 100644 --- a/statistics/handle/lockstats/unlock_stats_test.go +++ b/statistics/handle/lockstats/unlock_stats_test.go @@ -19,8 +19,6 @@ import ( "testing" "github.com/pingcap/errors" - "github.com/pingcap/tidb/parser/ast" - "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" @@ -221,14 +219,19 @@ func TestRemoveLockedTables(t *testing.T) { "COMMIT", ) + tidsAndNames := map[int64]string{ + 1: "test.t1", + 2: "test.t2", + 3: "test.t3", + } + pidAndNames := map[int64]string{ + 4: "p1", + } + msg, err := RemoveLockedTables( exec, - []int64{1, 2, 3}, - []int64{4}, - []*ast.TableName{ - {Schema: model.NewCIStr("test"), Name: model.NewCIStr("t1")}, - {Schema: model.NewCIStr("test"), Name: model.NewCIStr("t2")}, - {Schema: model.NewCIStr("test"), Name: model.NewCIStr("t3")}}, + tidsAndNames, + pidAndNames, ) require.NoError(t, err) require.Equal(t, "skip unlocking unlocked tables: test.t2, test.t3, other tables unlocked successfully", msg)
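
Illustrative note (not part of the patch): the refactor above replaces the parallel `[]int64` / `[]*ast.TableName` arguments with `map[int64]string` (ID to fully qualified name), and sorts the skipped names before composing the warning, because Go map iteration order is random and the warning text must be deterministic for the tests to pass. The sketch below is a minimal, self-contained approximation of that idea; the helper name `buildSkipMessage` and the standalone `main` are hypothetical and only mimic the behavior of the new `generateStableSkippedTablesMessage`, they are not the patch's API.

package main

import (
	"fmt"
	"sort"
	"strings"
)

// buildSkipMessage mimics the idea behind generateStableSkippedTablesMessage:
// sort the skipped names so the resulting warning text is deterministic.
func buildSkipMessage(totalIDs int, skippedNames []string, action, status string) string {
	if len(skippedNames) == 0 {
		return ""
	}
	// Stabilize the output; the patch uses slices.Sort for the same purpose.
	sort.Strings(skippedNames)
	names := strings.Join(skippedNames, ", ")
	if totalIDs > len(skippedNames) {
		return fmt.Sprintf("skip %s %s tables: %s, other tables %s successfully", action, status, names, status)
	}
	return fmt.Sprintf("skip %s %s tables: %s", action, status, names)
}

func main() {
	// ID -> "schema.table", the shape now passed to LockTables/RemoveLockedTables.
	tidAndNames := map[int64]string{1: "test.t1", 2: "test.t2"}
	skipped := make([]string, 0, len(tidAndNames))
	for _, name := range tidAndNames {
		skipped = append(skipped, name)
	}
	// Prints: skip locking locked tables: test.t1, test.t2, other tables locked successfully
	fmt.Println(buildSkipMessage(3, skipped, "locking", "locked"))
}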