diff --git a/.github/licenserc.yml b/.github/licenserc.yml index 2ae01bdfc0d93..4b336f7c1b2a1 100644 --- a/.github/licenserc.yml +++ b/.github/licenserc.yml @@ -49,4 +49,5 @@ header: - "**/OWNERS" - "OWNERS_ALIASES" - "disttask/framework/mock/*_mock.go" + - "util/sqlexec/mock/*_mock.go" comment: on-failure diff --git a/Makefile b/Makefile index 25d5d99cc7a05..189d26202c2b9 100644 --- a/Makefile +++ b/Makefile @@ -386,7 +386,8 @@ mock_lightning: tools/bin/mockgen gen_mock: tools/bin/mockgen tools/bin/mockgen -package mock github.com/pingcap/tidb/disttask/framework/scheduler TaskTable,SubtaskExecutor,Pool,Scheduler,InternalScheduler > disttask/framework/mock/scheduler_mock.go tools/bin/mockgen -package mock github.com/pingcap/tidb/disttask/framework/planner LogicalPlan,PipelineSpec > disttask/framework/mock/plan_mock.go - + tools/bin/mockgen -package mock github.com/pingcap/tidb/util/sqlexec RestrictedSQLExecutor > util/sqlexec/mock/restricted_sql_executor_mock.go + # There is no FreeBSD environment for GitHub actions. So cross-compile on Linux # but that doesn't work with CGO_ENABLED=1, so disable cgo. The reason to have # cgo enabled on regular builds is performance. diff --git a/executor/analyze.go b/executor/analyze.go index 089f01a53698a..df806ee9fbe21 100644 --- a/executor/analyze.go +++ b/executor/analyze.go @@ -203,7 +203,6 @@ func filterAndCollectTasks(tasks []*analyzeTask, statsHandle *handle.Handle, inf } else { tbl, ok := infoSchema.TableByID(tid) if !ok { - // Ignore this table because it may have been dropped. logutil.BgLogger().Warn("Unknown table ID in analyze task", zap.Int64("tid", tid)) } else { skippedTables = append(skippedTables, tbl.Meta().Name.L) diff --git a/statistics/handle/handletest/statslock/BUILD.bazel b/statistics/handle/handletest/statslock/BUILD.bazel index 92cdb776a0634..74cf95c801853 100644 --- a/statistics/handle/handletest/statslock/BUILD.bazel +++ b/statistics/handle/handletest/statslock/BUILD.bazel @@ -5,7 +5,7 @@ go_test( timeout = "short", srcs = [ "main_test.go", - "stats_lcok_test.go", + "stats_lock_test.go", ], flaky = True, shard_count = 3, diff --git a/statistics/handle/handletest/statslock/stats_lcok_test.go b/statistics/handle/handletest/statslock/stats_lock_test.go similarity index 99% rename from statistics/handle/handletest/statslock/stats_lcok_test.go rename to statistics/handle/handletest/statslock/stats_lock_test.go index 647b3d0907a0f..54a3b83b753d6 100644 --- a/statistics/handle/handletest/statslock/stats_lcok_test.go +++ b/statistics/handle/handletest/statslock/stats_lock_test.go @@ -172,7 +172,6 @@ func TestStatsLockAndUnlockTables(t *testing.T) { } tk.MustExec("lock stats t1, t2") - rows := tk.MustQuery("select count(*) from mysql.stats_table_locked").Rows() num, _ := strconv.Atoi(rows[0][0].(string)) require.Equal(t, num, 2) diff --git a/statistics/handle/lock_stats_handler.go b/statistics/handle/lock_stats_handler.go index ee33beccd6606..9a65eed2ec1b0 100644 --- a/statistics/handle/lock_stats_handler.go +++ b/statistics/handle/lock_stats_handler.go @@ -15,9 +15,7 @@ package handle import ( - "context" - - "github.com/pingcap/tidb/kv" + "github.com/pingcap/errors" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/statistics/handle/lockstats" "github.com/pingcap/tidb/util/sqlexec" @@ -29,9 +27,15 @@ import ( // - tables: table names of which will be locked. // Return the message of skipped tables and error. func (h *Handle) AddLockedTables(tids []int64, pids []int64, tables []*ast.TableName) (string, error) { - h.mu.Lock() - defer h.mu.Unlock() - return lockstats.AddLockedTables(h.mu.ctx.(sqlexec.SQLExecutor), tids, pids, tables) + se, err := h.pool.Get() + if err != nil { + return "", errors.Trace(err) + } + defer h.pool.Put(se) + + exec := se.(sqlexec.RestrictedSQLExecutor) + + return lockstats.AddLockedTables(exec, tids, pids, tables) } // RemoveLockedTables remove tables from table locked array. @@ -40,41 +44,40 @@ func (h *Handle) AddLockedTables(tids []int64, pids []int64, tables []*ast.Table // - tables: table names of which will be unlocked. // Return the message of skipped tables and error. func (h *Handle) RemoveLockedTables(tids []int64, pids []int64, tables []*ast.TableName) (string, error) { - h.mu.Lock() - defer h.mu.Unlock() + se, err := h.pool.Get() + if err != nil { + return "", errors.Trace(err) + } + defer h.pool.Put(se) - return lockstats.RemoveLockedTables(h.mu.ctx.(sqlexec.SQLExecutor), tids, pids, tables) + exec := se.(sqlexec.RestrictedSQLExecutor) + return lockstats.RemoveLockedTables(exec, tids, pids, tables) } -// QueryTablesLockedStatuses query whether table is locked in handle with Handle.Mutex. +// QueryTablesLockedStatuses query whether table is locked. // Note: This function query locked tables from store, so please try to batch the query. func (h *Handle) QueryTablesLockedStatuses(tableIDs ...int64) (map[int64]bool, error) { - h.mu.Lock() - defer h.mu.Unlock() - return h.queryTablesLockedStatuses(tableIDs...) -} - -// queryTablesLockedStatuses query whether table is locked in handle without Handle.Mutex -// Note: This function query locked tables from store, so please try to batch the query. -func (h *Handle) queryTablesLockedStatuses(tableIDs ...int64) (map[int64]bool, error) { - tableLocked, err := h.queryLockedTablesWithoutLock() + tableLocked, err := h.queryLockedTables() if err != nil { return nil, err } + return lockstats.GetTablesLockedStatuses(tableLocked, tableIDs...), nil } -// queryLockedTablesWithoutLock query locked tables from store without Handle.Mutex. -func (h *Handle) queryLockedTablesWithoutLock() (map[int64]struct{}, error) { - ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) - exec := h.mu.ctx.(sqlexec.SQLExecutor) +// queryLockedTables query locked tables from store. +func (h *Handle) queryLockedTables() (map[int64]struct{}, error) { + se, err := h.pool.Get() + if err != nil { + return nil, errors.Trace(err) + } + defer h.pool.Put(se) - return lockstats.QueryLockedTables(ctx, exec) + exec := se.(sqlexec.RestrictedSQLExecutor) + return lockstats.QueryLockedTables(exec) } -// GetTableLockedAndClearForTest for unit test only +// GetTableLockedAndClearForTest for unit test only. func (h *Handle) GetTableLockedAndClearForTest() (map[int64]struct{}, error) { - h.mu.Lock() - defer h.mu.Unlock() - return h.queryLockedTablesWithoutLock() + return h.queryLockedTables() } diff --git a/statistics/handle/lockstats/BUILD.bazel b/statistics/handle/lockstats/BUILD.bazel index 7c9d331dd311c..3c915f970fde8 100644 --- a/statistics/handle/lockstats/BUILD.bazel +++ b/statistics/handle/lockstats/BUILD.bazel @@ -4,6 +4,7 @@ go_library( name = "lockstats", srcs = [ "lock_stats.go", + "query_lock.go", "unlock_stats.go", ], importpath = "github.com/pingcap/tidb/statistics/handle/lockstats", @@ -23,8 +24,25 @@ go_library( go_test( name = "lockstats_test", timeout = "short", - srcs = ["lock_stats_test.go"], + srcs = [ + "lock_stats_test.go", + "query_lock_test.go", + "unlock_stats_test.go", + ], embed = [":lockstats"], flaky = True, - deps = ["@com_github_stretchr_testify//require"], + shard_count = 8, + deps = [ + "//kv", + "//parser/ast", + "//parser/model", + "//parser/mysql", + "//types", + "//util/chunk", + "//util/sqlexec/mock", + "@com_github_pingcap_errors//:errors", + "@com_github_stretchr_testify//require", + "@com_github_tikv_client_go_v2//util", + "@org_uber_go_mock//gomock", + ], ) diff --git a/statistics/handle/lockstats/lock_stats.go b/statistics/handle/lockstats/lock_stats.go index 2069973f9956e..50b8fdf49e3bf 100644 --- a/statistics/handle/lockstats/lock_stats.go +++ b/statistics/handle/lockstats/lock_stats.go @@ -19,10 +19,8 @@ import ( "fmt" "strings" - "github.com/pingcap/errors" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/ast" - "github.com/pingcap/tidb/parser/terror" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/sqlexec" "go.uber.org/zap" @@ -36,13 +34,11 @@ const ( ) var ( - // maxChunkSize is the max chunk size for load locked tables. - // We use 1024 as the default value, which is the same as the default value of session.maxChunkSize. - // The reason why we don't use session.maxChunkSize is that we don't want to introduce a new dependency. - // See: https://github.com/pingcap/tidb/pull/46478#discussion_r1308786474 - maxChunkSize = 1024 // Stats logger. statsLogger = logutil.BgLogger().With(zap.String("category", "stats")) + // useCurrentSession to make sure the sql is executed in current session. + useCurrentSession = []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession} + insertSQL = "INSERT INTO mysql.stats_table_locked (table_id) VALUES (%?) ON DUPLICATE KEY UPDATE table_id = %?" ) // AddLockedTables add locked tables id to store. @@ -51,10 +47,10 @@ var ( // - pids: partition ids of which will be locked. // - tables: table names of which will be locked. // Return the message of skipped tables and error. -func AddLockedTables(exec sqlexec.SQLExecutor, tids []int64, pids []int64, tables []*ast.TableName) (string, error) { +func AddLockedTables(exec sqlexec.RestrictedSQLExecutor, tids []int64, pids []int64, tables []*ast.TableName) (string, error) { ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) - _, err := exec.ExecuteInternal(ctx, "BEGIN PESSIMISTIC") + err := startTransaction(ctx, exec) if err != nil { return "", err } @@ -64,7 +60,7 @@ func AddLockedTables(exec sqlexec.SQLExecutor, tids []int64, pids []int64, table }() // Load tables to check duplicate before insert. - lockedTables, err := QueryLockedTables(ctx, exec) + lockedTables, err := QueryLockedTables(exec) if err != nil { return "", err } @@ -118,55 +114,15 @@ func generateSkippedTablesMessage(tids []int64, dupTables []string, action, stat return "" } -func insertIntoStatsTableLocked(ctx context.Context, exec sqlexec.SQLExecutor, tid int64) error { - _, err := exec.ExecuteInternal(ctx, "INSERT INTO mysql.stats_table_locked (table_id) VALUES (%?) ON DUPLICATE KEY UPDATE table_id = %?", tid, tid) +func insertIntoStatsTableLocked(ctx context.Context, exec sqlexec.RestrictedSQLExecutor, tid int64) error { + _, _, err := exec.ExecRestrictedSQL( + ctx, + useCurrentSession, + insertSQL, tid, tid, + ) if err != nil { logutil.BgLogger().Error("error occurred when insert mysql.stats_table_locked", zap.String("category", "stats"), zap.Error(err)) return err } return nil } - -// QueryLockedTables loads locked tables from mysql.stats_table_locked. -// Return it as a map for fast query. -func QueryLockedTables(ctx context.Context, exec sqlexec.SQLExecutor) (map[int64]struct{}, error) { - recordSet, err := exec.ExecuteInternal(ctx, "SELECT table_id FROM mysql.stats_table_locked") - if err != nil { - return nil, err - } - rows, err := sqlexec.DrainRecordSet(ctx, recordSet, maxChunkSize) - if err != nil { - return nil, err - } - tableLocked := make(map[int64]struct{}, len(rows)) - for _, row := range rows { - tableLocked[row.GetInt64(0)] = struct{}{} - } - return tableLocked, nil -} - -// GetTablesLockedStatuses check whether table is locked. -func GetTablesLockedStatuses(tableLocked map[int64]struct{}, tableIDs ...int64) map[int64]bool { - lockedTableStatus := make(map[int64]bool, len(tableIDs)) - - for _, tid := range tableIDs { - if _, ok := tableLocked[tid]; ok { - lockedTableStatus[tid] = true - continue - } - lockedTableStatus[tid] = false - } - - return lockedTableStatus -} - -// finishTransaction will execute `commit` when error is nil, otherwise `rollback`. -func finishTransaction(ctx context.Context, exec sqlexec.SQLExecutor, err error) error { - if err == nil { - _, err = exec.ExecuteInternal(ctx, "commit") - } else { - _, err1 := exec.ExecuteInternal(ctx, "rollback") - terror.Log(errors.Trace(err1)) - } - return errors.Trace(err) -} diff --git a/statistics/handle/lockstats/lock_stats_test.go b/statistics/handle/lockstats/lock_stats_test.go index 65c0b45d1a453..e528dda58581d 100644 --- a/statistics/handle/lockstats/lock_stats_test.go +++ b/statistics/handle/lockstats/lock_stats_test.go @@ -15,9 +15,18 @@ package lockstats import ( + "context" "testing" + "github.com/pingcap/errors" + "github.com/pingcap/tidb/parser/ast" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/sqlexec/mock" "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" ) func TestGenerateDuplicateTablesMessage(t *testing.T) { @@ -77,3 +86,91 @@ func TestGenerateDuplicateTablesMessage(t *testing.T) { }) } } + +func TestInsertIntoStatsTableLocked(t *testing.T) { + ctx := context.Background() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + exec := mock.NewMockRestrictedSQLExecutor(ctrl) + + // Executed SQL should be: + exec.EXPECT().ExecRestrictedSQL( + gomock.Eq(ctx), + useCurrentSession, + gomock.Eq(insertSQL), + gomock.Eq([]interface{}{int64(1), int64(1)}), + ) + err := insertIntoStatsTableLocked(ctx, exec, 1) + require.NoError(t, err) + + // Error should be returned when ExecRestrictedSQL returns error. + exec.EXPECT().ExecRestrictedSQL( + gomock.Any(), + gomock.Any(), + gomock.Any(), + gomock.Any(), + ).Return(nil, nil, errors.New("test error")) + + err = insertIntoStatsTableLocked(ctx, exec, 1) + require.Equal(t, "test error", err.Error()) +} + +func TestAddLockedTables(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + exec := mock.NewMockRestrictedSQLExecutor(ctrl) + + // Executed SQL should be: + exec.EXPECT().ExecRestrictedSQL( + gomock.All(&ctxMatcher{}), + useCurrentSession, + gomock.Eq("BEGIN PESSIMISTIC"), + ) + // Return table 1 is locked. + c := chunk.NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, 1) + c.AppendInt64(0, int64(1)) + rows := []chunk.Row{c.GetRow(0)} + exec.EXPECT().ExecRestrictedSQL( + gomock.All(&ctxMatcher{}), + useCurrentSession, + selectSQL, + ).Return(rows, nil, nil) + + exec.EXPECT().ExecRestrictedSQL( + gomock.All(&ctxMatcher{}), + useCurrentSession, + insertSQL, + gomock.Eq([]interface{}{int64(2), int64(2)}), + ) + exec.EXPECT().ExecRestrictedSQL( + gomock.All(&ctxMatcher{}), + useCurrentSession, + insertSQL, + gomock.Eq([]interface{}{int64(3), int64(3)}), + ) + + exec.EXPECT().ExecRestrictedSQL( + gomock.All(&ctxMatcher{}), + useCurrentSession, + insertSQL, + gomock.Eq([]interface{}{int64(4), int64(4)}), + ) + + exec.EXPECT().ExecRestrictedSQL( + gomock.All(&ctxMatcher{}), + useCurrentSession, + "COMMIT", + ) + + msg, err := AddLockedTables( + exec, + []int64{1, 2, 3}, + []int64{4}, + []*ast.TableName{ + {Schema: model.NewCIStr("test"), Name: model.NewCIStr("t1")}, + {Schema: model.NewCIStr("test"), Name: model.NewCIStr("t2")}, + {Schema: model.NewCIStr("test"), Name: model.NewCIStr("t3")}}, + ) + require.NoError(t, err) + require.Equal(t, "skip locking locked tables: test.t1, other tables locked successfully", msg) +} diff --git a/statistics/handle/lockstats/query_lock.go b/statistics/handle/lockstats/query_lock.go new file mode 100644 index 0000000000000..739c84d4d2d16 --- /dev/null +++ b/statistics/handle/lockstats/query_lock.go @@ -0,0 +1,72 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package lockstats + +import ( + "context" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/parser/terror" + "github.com/pingcap/tidb/util/sqlexec" +) + +var selectSQL = "SELECT table_id FROM mysql.stats_table_locked" + +// QueryLockedTables loads locked tables from mysql.stats_table_locked. +// Return it as a map for fast query. +func QueryLockedTables(exec sqlexec.RestrictedSQLExecutor) (map[int64]struct{}, error) { + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) + rows, _, err := exec.ExecRestrictedSQL(ctx, useCurrentSession, selectSQL) + if err != nil { + return nil, err + } + tableLocked := make(map[int64]struct{}, len(rows)) + for _, row := range rows { + tableLocked[row.GetInt64(0)] = struct{}{} + } + return tableLocked, nil +} + +// GetTablesLockedStatuses check whether table is locked. +func GetTablesLockedStatuses(tableLocked map[int64]struct{}, tableIDs ...int64) map[int64]bool { + lockedTableStatus := make(map[int64]bool, len(tableIDs)) + + for _, tid := range tableIDs { + if _, ok := tableLocked[tid]; ok { + lockedTableStatus[tid] = true + continue + } + lockedTableStatus[tid] = false + } + + return lockedTableStatus +} + +func startTransaction(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) error { + _, _, err := exec.ExecRestrictedSQL(ctx, useCurrentSession, "BEGIN PESSIMISTIC") + return errors.Trace(err) +} + +// finishTransaction will execute `commit` when error is nil, otherwise `rollback`. +func finishTransaction(ctx context.Context, exec sqlexec.RestrictedSQLExecutor, err error) error { + if err == nil { + _, _, err = exec.ExecRestrictedSQL(ctx, useCurrentSession, "COMMIT") + } else { + _, _, err1 := exec.ExecRestrictedSQL(ctx, useCurrentSession, "ROLLBACK") + terror.Log(errors.Trace(err1)) + } + return errors.Trace(err) +} diff --git a/statistics/handle/lockstats/query_lock_test.go b/statistics/handle/lockstats/query_lock_test.go new file mode 100644 index 0000000000000..292e854803b4a --- /dev/null +++ b/statistics/handle/lockstats/query_lock_test.go @@ -0,0 +1,157 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package lockstats + +import ( + "context" + "testing" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/sqlexec/mock" + "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/util" + "go.uber.org/mock/gomock" +) + +func TestGetTablesLockedStatuses(t *testing.T) { + tests := []struct { + name string + tableLocked map[int64]struct{} + tableIDs []int64 + want map[int64]bool + }{ + { + name: "not locked", + tableLocked: map[int64]struct{}{}, + tableIDs: []int64{1, 2, 3}, + want: map[int64]bool{ + 1: false, + 2: false, + 3: false, + }, + }, + { + name: "locked", + tableLocked: map[int64]struct{}{1: {}, 2: {}}, + tableIDs: []int64{1, 2, 3}, + want: map[int64]bool{ + 1: true, + 2: true, + 3: false, + }, + }, + { + name: "empty", + tableLocked: map[int64]struct{}{}, + tableIDs: []int64{}, + want: map[int64]bool{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := GetTablesLockedStatuses(tt.tableLocked, tt.tableIDs...) + require.Equal(t, tt.want, got) + }) + } +} + +func TestQueryLockedTables(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + exec := mock.NewMockRestrictedSQLExecutor(ctrl) + + tests := []struct { + name string + numRows int + wantLen int + wantError bool + }{ + { + name: "Empty result", + numRows: 0, + wantLen: 0, + }, + { + name: "One table", + numRows: 1, + wantLen: 1, + }, + { + name: "Two tables", + numRows: 2, + wantLen: 2, + }, + { + name: "Error", + numRows: 0, + wantError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, err := executeQueryLockedTables(exec, tt.numRows, tt.wantError) + if tt.wantError { + require.Error(t, err) + return + } + require.NoError(t, err) + }) + } +} + +type ctxMatcher struct{} + +func (c *ctxMatcher) Matches(x interface{}) bool { + ctx := x.(context.Context) + s := util.RequestSourceFromCtx(ctx) + return s == util.InternalRequest+"_"+kv.InternalTxnStats +} + +func (c *ctxMatcher) String() string { + return "all txns should be internal_stats source" +} + +func executeQueryLockedTables(exec *mock.MockRestrictedSQLExecutor, numRows int, wantErr bool) (map[int64]struct{}, error) { + if wantErr { + exec.EXPECT().ExecRestrictedSQL( + gomock.All(&ctxMatcher{}), + useCurrentSession, + selectSQL, + ).Return(nil, nil, errors.New("error")) + return QueryLockedTables(exec) + } + + c := chunk.NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, numRows) + for i := 0; i < numRows; i++ { + c.AppendInt64(0, int64(i+1)) + } + var rows []chunk.Row + for i := 0; i < numRows; i++ { + rows = append(rows, c.GetRow(i)) + } + exec.EXPECT().ExecRestrictedSQL( + gomock.All(&ctxMatcher{}), + useCurrentSession, + selectSQL, + ).Return(rows, nil, nil) + + return QueryLockedTables(exec) +} diff --git a/statistics/handle/lockstats/unlock_stats.go b/statistics/handle/lockstats/unlock_stats.go index cbbba84ea4926..c1f120e08af6e 100644 --- a/statistics/handle/lockstats/unlock_stats.go +++ b/statistics/handle/lockstats/unlock_stats.go @@ -25,16 +25,22 @@ import ( "go.uber.org/zap" ) +var ( + selectDeltaSQL = "SELECT count, modify_count, version FROM mysql.stats_table_locked WHERE table_id = %?" + updateDeltaSQL = "UPDATE mysql.stats_meta SET version = %?, count = count + %?, modify_count = modify_count + %? WHERE table_id = %?" + deleteLockSQL = "DELETE FROM mysql.stats_table_locked WHERE table_id = %?" +) + // RemoveLockedTables remove tables from table locked array. // - exec: sql executor. // - tids: table ids of which will be unlocked. // - pids: partition ids of which will be unlocked. // - tables: table names of which will be unlocked. // Return the message of skipped tables and error. -func RemoveLockedTables(exec sqlexec.SQLExecutor, tids []int64, pids []int64, tables []*ast.TableName) (string, error) { +func RemoveLockedTables(exec sqlexec.RestrictedSQLExecutor, tids []int64, pids []int64, tables []*ast.TableName) (string, error) { ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) - _, err := exec.ExecuteInternal(ctx, "BEGIN PESSIMISTIC") + err := startTransaction(ctx, exec) if err != nil { return "", err } @@ -44,7 +50,7 @@ func RemoveLockedTables(exec sqlexec.SQLExecutor, tids []int64, pids []int64, ta }() // Load tables to check locked before delete. - lockedTables, err := QueryLockedTables(ctx, exec) + lockedTables, err := QueryLockedTables(exec) if err != nil { return "", err } @@ -58,7 +64,7 @@ func RemoveLockedTables(exec sqlexec.SQLExecutor, tids []int64, pids []int64, ta skippedTables = append(skippedTables, tables[i].Schema.L+"."+tables[i].Name.L) continue } - if err := updateStatsAndUnlockTable(ctx, exec, tid, maxChunkSize); err != nil { + if err := updateStatsAndUnlockTable(ctx, exec, tid); err != nil { return "", err } } @@ -69,7 +75,7 @@ func RemoveLockedTables(exec sqlexec.SQLExecutor, tids []int64, pids []int64, ta if !lockedStatuses[pid] { continue } - if err := updateStatsAndUnlockTable(ctx, exec, pid, maxChunkSize); err != nil { + if err := updateStatsAndUnlockTable(ctx, exec, pid); err != nil { return "", err } } @@ -79,30 +85,37 @@ func RemoveLockedTables(exec sqlexec.SQLExecutor, tids []int64, pids []int64, ta return msg, err } -func updateStatsAndUnlockTable(ctx context.Context, exec sqlexec.SQLExecutor, tid int64, maxChunkSize int) error { - count, modifyCount, version, err := getStatsDeltaFromTableLocked(ctx, tid, exec, maxChunkSize) +func updateStatsAndUnlockTable(ctx context.Context, exec sqlexec.RestrictedSQLExecutor, tid int64) error { + count, modifyCount, version, err := getStatsDeltaFromTableLocked(ctx, tid, exec) if err != nil { return err } - if _, err := exec.ExecuteInternal(ctx, - "UPDATE mysql.stats_meta SET version = %?, count = count + %?, modify_count = modify_count + %? WHERE table_id = %?", - version, count, modifyCount, tid); err != nil { + if _, _, err := exec.ExecRestrictedSQL( + ctx, + useCurrentSession, + updateDeltaSQL, + version, count, modifyCount, tid, + ); err != nil { return err } cache.TableRowStatsCache.Invalidate(tid) - _, err = exec.ExecuteInternal(ctx, "DELETE FROM mysql.stats_table_locked WHERE table_id = %?", tid) + _, _, err = exec.ExecRestrictedSQL( + ctx, + useCurrentSession, + deleteLockSQL, tid, + ) return err } // getStatsDeltaFromTableLocked get count, modify_count and version for the given table from mysql.stats_table_locked. -func getStatsDeltaFromTableLocked(ctx context.Context, tableID int64, exec sqlexec.SQLExecutor, maxChunkSize int) (count, modifyCount int64, version uint64, err error) { - recordSet, err := exec.ExecuteInternal(ctx, "SELECT count, modify_count, version FROM mysql.stats_table_locked WHERE table_id = %?", tableID) - if err != nil { - return 0, 0, 0, errors.Trace(err) - } - rows, err := sqlexec.DrainRecordSet(ctx, recordSet, maxChunkSize) +func getStatsDeltaFromTableLocked(ctx context.Context, tableID int64, exec sqlexec.RestrictedSQLExecutor) (count, modifyCount int64, version uint64, err error) { + rows, _, err := exec.ExecRestrictedSQL( + ctx, + useCurrentSession, + selectDeltaSQL, tableID, + ) if err != nil { return 0, 0, 0, errors.Trace(err) } diff --git a/statistics/handle/lockstats/unlock_stats_test.go b/statistics/handle/lockstats/unlock_stats_test.go new file mode 100644 index 0000000000000..e2a67aa65066c --- /dev/null +++ b/statistics/handle/lockstats/unlock_stats_test.go @@ -0,0 +1,235 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package lockstats + +import ( + "context" + "testing" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/parser/ast" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/sqlexec/mock" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" +) + +func TestGetStatsDeltaFromTableLocked(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + exec := mock.NewMockRestrictedSQLExecutor(ctrl) + + tests := []struct { + name string + expectedCount int64 + expectedModifyCount int64 + expectedVersion uint64 + execResult []chunk.Row + execError error + }{ + { + name: "No rows", + expectedCount: 0, + expectedModifyCount: 0, + expectedVersion: 0, + execResult: nil, + execError: nil, + }, + { + name: "One row", + expectedCount: 1, + expectedModifyCount: 1, + expectedVersion: 1000, + execResult: []chunk.Row{ + createStatsDeltaRow(1, 1, 1000), + }, + execError: nil, + }, + { + name: "Error", + expectedCount: 0, + expectedModifyCount: 0, + expectedVersion: 0, + execResult: nil, + execError: errors.New("test error"), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + exec.EXPECT().ExecRestrictedSQL( + ctx, + useCurrentSession, + selectDeltaSQL, + gomock.Eq([]interface{}{int64(1)}), + ).Return(tt.execResult, nil, tt.execError) + + count, modifyCount, version, err := getStatsDeltaFromTableLocked(ctx, 1, exec) + if tt.execError != nil { + require.Equal(t, tt.execError.Error(), err.Error()) + } else { + require.NoError(t, err) + require.Equal(t, tt.expectedCount, count) + require.Equal(t, tt.expectedModifyCount, modifyCount) + require.Equal(t, tt.expectedVersion, version) + } + }) + } +} + +func createStatsDeltaRow(count, modifyCount int64, version uint64) chunk.Row { + c := chunk.NewChunkWithCapacity( + []*types.FieldType{ + types.NewFieldType(mysql.TypeLonglong), + types.NewFieldType(mysql.TypeLonglong), + types.NewFieldType(mysql.TypeLonglong), + }, + 1, + ) + c.AppendInt64(0, count) + c.AppendInt64(1, modifyCount) + c.AppendUint64(2, version) + return c.GetRow(0) +} + +func TestUpdateStatsAndUnlockTable(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + exec := mock.NewMockRestrictedSQLExecutor(ctrl) + + tests := []struct { + name string + tableID int64 + execError error + }{ + { + name: "Success", + tableID: 1, + execError: nil, + }, + { + name: "Error", + tableID: 1, + execError: errors.New("test error"), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + exec.EXPECT().ExecRestrictedSQL( + ctx, + useCurrentSession, + selectDeltaSQL, + gomock.Eq([]interface{}{tt.tableID}), + ).Return([]chunk.Row{createStatsDeltaRow(1, 1, 1000)}, nil, nil) + + if tt.execError == nil { + exec.EXPECT().ExecRestrictedSQL( + ctx, + useCurrentSession, + updateDeltaSQL, + gomock.Eq([]interface{}{uint64(1000), int64(1), int64(1), int64(1)}), + ).Return(nil, nil, nil) + exec.EXPECT().ExecRestrictedSQL( + ctx, + useCurrentSession, + deleteLockSQL, + gomock.Eq([]interface{}{tt.tableID}), + ).Return(nil, nil, nil) + } else { + exec.EXPECT().ExecRestrictedSQL( + ctx, + useCurrentSession, + updateDeltaSQL, + gomock.Eq([]interface{}{uint64(1000), int64(1), int64(1), int64(1)}), + ).Return(nil, nil, tt.execError) + } + + err := updateStatsAndUnlockTable(ctx, exec, tt.tableID) + if tt.execError != nil { + require.Equal(t, tt.execError.Error(), err.Error()) + } else { + require.NoError(t, err) + } + }) + } +} + +func TestRemoveLockedTables(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + exec := mock.NewMockRestrictedSQLExecutor(ctrl) + + // Executed SQL should be: + exec.EXPECT().ExecRestrictedSQL( + gomock.All(&ctxMatcher{}), + useCurrentSession, + gomock.Eq("BEGIN PESSIMISTIC"), + ) + + // Return table 1 is locked. + c := chunk.NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, 1) + c.AppendInt64(0, int64(1)) + rows := []chunk.Row{c.GetRow(0)} + exec.EXPECT().ExecRestrictedSQL( + gomock.All(&ctxMatcher{}), + useCurrentSession, + selectSQL, + ).Return(rows, nil, nil) + + exec.EXPECT().ExecRestrictedSQL( + gomock.All(&ctxMatcher{}), + useCurrentSession, + selectDeltaSQL, + gomock.Eq([]interface{}{int64(1)}), + ).Return([]chunk.Row{createStatsDeltaRow(1, 1, 1000)}, nil, nil) + + exec.EXPECT().ExecRestrictedSQL( + gomock.All(&ctxMatcher{}), + useCurrentSession, + updateDeltaSQL, + gomock.Eq([]interface{}{uint64(1000), int64(1), int64(1), int64(1)}), + ).Return(nil, nil, nil) + + exec.EXPECT().ExecRestrictedSQL( + gomock.All(&ctxMatcher{}), + useCurrentSession, + deleteLockSQL, + gomock.Eq([]interface{}{int64(1)}), + ).Return(nil, nil, nil) + + exec.EXPECT().ExecRestrictedSQL( + gomock.All(&ctxMatcher{}), + useCurrentSession, + "COMMIT", + ) + + msg, err := RemoveLockedTables( + exec, + []int64{1, 2, 3}, + []int64{4}, + []*ast.TableName{ + {Schema: model.NewCIStr("test"), Name: model.NewCIStr("t1")}, + {Schema: model.NewCIStr("test"), Name: model.NewCIStr("t2")}, + {Schema: model.NewCIStr("test"), Name: model.NewCIStr("t3")}}, + ) + require.NoError(t, err) + require.Equal(t, "skip unlocking unlocked tables: test.t2, test.t3, other tables unlocked successfully", msg) +} diff --git a/statistics/handle/update.go b/statistics/handle/update.go index 7de59b92939ca..080229487941b 100644 --- a/statistics/handle/update.go +++ b/statistics/handle/update.go @@ -452,7 +452,7 @@ func (h *Handle) dumpTableStatCountToKV(id int64, delta variable.TableDelta) (up } startTS := txn.StartTS() updateStatsMeta := func(id int64) error { - lockedStatuses, err := h.queryTablesLockedStatuses(id) + lockedStatuses, err := h.QueryTablesLockedStatuses(id) if err != nil { return errors.Trace(err) } diff --git a/util/sqlexec/mock/BUILD.bazel b/util/sqlexec/mock/BUILD.bazel new file mode 100644 index 0000000000000..e9166a55e758f --- /dev/null +++ b/util/sqlexec/mock/BUILD.bazel @@ -0,0 +1,14 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "mock", + srcs = ["restricted_sql_executor_mock.go"], + importpath = "github.com/pingcap/tidb/util/sqlexec/mock", + visibility = ["//visibility:public"], + deps = [ + "//parser/ast", + "//util/chunk", + "//util/sqlexec", + "@org_uber_go_mock//gomock", + ], +) diff --git a/util/sqlexec/mock/restricted_sql_executor_mock.go b/util/sqlexec/mock/restricted_sql_executor_mock.go new file mode 100644 index 0000000000000..1321bf09fd83b --- /dev/null +++ b/util/sqlexec/mock/restricted_sql_executor_mock.go @@ -0,0 +1,100 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/pingcap/tidb/util/sqlexec (interfaces: RestrictedSQLExecutor) + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + reflect "reflect" + + ast "github.com/pingcap/tidb/parser/ast" + chunk "github.com/pingcap/tidb/util/chunk" + sqlexec "github.com/pingcap/tidb/util/sqlexec" + gomock "go.uber.org/mock/gomock" +) + +// MockRestrictedSQLExecutor is a mock of RestrictedSQLExecutor interface. +type MockRestrictedSQLExecutor struct { + ctrl *gomock.Controller + recorder *MockRestrictedSQLExecutorMockRecorder +} + +// MockRestrictedSQLExecutorMockRecorder is the mock recorder for MockRestrictedSQLExecutor. +type MockRestrictedSQLExecutorMockRecorder struct { + mock *MockRestrictedSQLExecutor +} + +// NewMockRestrictedSQLExecutor creates a new mock instance. +func NewMockRestrictedSQLExecutor(ctrl *gomock.Controller) *MockRestrictedSQLExecutor { + mock := &MockRestrictedSQLExecutor{ctrl: ctrl} + mock.recorder = &MockRestrictedSQLExecutorMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockRestrictedSQLExecutor) EXPECT() *MockRestrictedSQLExecutorMockRecorder { + return m.recorder +} + +// ExecRestrictedSQL mocks base method. +func (m *MockRestrictedSQLExecutor) ExecRestrictedSQL(arg0 context.Context, arg1 []func(*sqlexec.ExecOption), arg2 string, arg3 ...interface{}) ([]chunk.Row, []*ast.ResultField, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1, arg2} + for _, a := range arg3 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "ExecRestrictedSQL", varargs...) + ret0, _ := ret[0].([]chunk.Row) + ret1, _ := ret[1].([]*ast.ResultField) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// ExecRestrictedSQL indicates an expected call of ExecRestrictedSQL. +func (mr *MockRestrictedSQLExecutorMockRecorder) ExecRestrictedSQL(arg0, arg1, arg2 interface{}, arg3 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1, arg2}, arg3...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ExecRestrictedSQL", reflect.TypeOf((*MockRestrictedSQLExecutor)(nil).ExecRestrictedSQL), varargs...) +} + +// ExecRestrictedStmt mocks base method. +func (m *MockRestrictedSQLExecutor) ExecRestrictedStmt(arg0 context.Context, arg1 ast.StmtNode, arg2 ...func(*sqlexec.ExecOption)) ([]chunk.Row, []*ast.ResultField, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "ExecRestrictedStmt", varargs...) + ret0, _ := ret[0].([]chunk.Row) + ret1, _ := ret[1].([]*ast.ResultField) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// ExecRestrictedStmt indicates an expected call of ExecRestrictedStmt. +func (mr *MockRestrictedSQLExecutorMockRecorder) ExecRestrictedStmt(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ExecRestrictedStmt", reflect.TypeOf((*MockRestrictedSQLExecutor)(nil).ExecRestrictedStmt), varargs...) +} + +// ParseWithParams mocks base method. +func (m *MockRestrictedSQLExecutor) ParseWithParams(arg0 context.Context, arg1 string, arg2 ...interface{}) (ast.StmtNode, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "ParseWithParams", varargs...) + ret0, _ := ret[0].(ast.StmtNode) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ParseWithParams indicates an expected call of ParseWithParams. +func (mr *MockRestrictedSQLExecutorMockRecorder) ParseWithParams(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ParseWithParams", reflect.TypeOf((*MockRestrictedSQLExecutor)(nil).ParseWithParams), varargs...) +}