Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statistics: use session pool when locking or unlocking table stats #46611

Merged
merged 16 commits into from
Sep 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/licenserc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,5 @@ header:
- "**/OWNERS"
- "OWNERS_ALIASES"
- "disttask/framework/mock/*_mock.go"
- "util/sqlexec/mock/*_mock.go"
comment: on-failure
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,8 @@ mock_lightning: tools/bin/mockgen
gen_mock: tools/bin/mockgen
tools/bin/mockgen -package mock github.com/pingcap/tidb/disttask/framework/scheduler TaskTable,SubtaskExecutor,Pool,Scheduler,InternalScheduler > disttask/framework/mock/scheduler_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/disttask/framework/planner LogicalPlan,PipelineSpec > disttask/framework/mock/plan_mock.go

tools/bin/mockgen -package mock github.com/pingcap/tidb/util/sqlexec RestrictedSQLExecutor > util/sqlexec/mock/restricted_sql_executor_mock.go

# There is no FreeBSD environment for GitHub actions. So cross-compile on Linux
# but that doesn't work with CGO_ENABLED=1, so disable cgo. The reason to have
# cgo enabled on regular builds is performance.
Expand Down
1 change: 0 additions & 1 deletion executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,6 @@ func filterAndCollectTasks(tasks []*analyzeTask, statsHandle *handle.Handle, inf
} else {
tbl, ok := infoSchema.TableByID(tid)
if !ok {
// Ignore this table because it may have been dropped.
logutil.BgLogger().Warn("Unknown table ID in analyze task", zap.Int64("tid", tid))
} else {
skippedTables = append(skippedTables, tbl.Meta().Name.L)
Expand Down
2 changes: 1 addition & 1 deletion statistics/handle/handletest/statslock/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go_test(
timeout = "short",
srcs = [
"main_test.go",
"stats_lcok_test.go",
"stats_lock_test.go",
],
flaky = True,
shard_count = 3,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,6 @@ func TestStatsLockAndUnlockTables(t *testing.T) {
}

tk.MustExec("lock stats t1, t2")

rows := tk.MustQuery("select count(*) from mysql.stats_table_locked").Rows()
num, _ := strconv.Atoi(rows[0][0].(string))
require.Equal(t, num, 2)
Expand Down
59 changes: 31 additions & 28 deletions statistics/handle/lock_stats_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@
package handle

import (
"context"

"github.com/pingcap/tidb/kv"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/statistics/handle/lockstats"
"github.com/pingcap/tidb/util/sqlexec"
Expand All @@ -29,9 +27,15 @@ import (
// - tables: table names of which will be locked.
// Return the message of skipped tables and error.
func (h *Handle) AddLockedTables(tids []int64, pids []int64, tables []*ast.TableName) (string, error) {
Copy link
Contributor

@qw4990 qw4990 Sep 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After this PR (in the next few PRs), can we thoroughly detach(decouple) StatsLock from StatsHandler? Specifically, can we remove all Handle.XXLockedXX methods (all methods in this file lock_stats_handler.go) and move them into a new separate package?

Logically, if we implement these XXLockedXX methods on Handler, we think the management of locked stats is part of the responsibility of Handler, although Handler is big enough and it already has too many responsibilities.
Another way is to let the Handler just as a consumer of locked stats info. It doesn't update locked stats info and only reads locked stats info from the storage and reacts to it accordingly.
I'm not sure whether this can simplify the design and implementation, what do you think of this? @hi-rustin
(I'm not sure whether I explain it clearly...

image

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if this really helps. If we want to split the entry from the handle to stats locker. Then we need to put it in the domain or some other global structurations. I guess using the stats handle as an entry point for stats logic is fine. The problem with the handle is that we put too much business logic in the entry point.

Right now, the lock/unlock stats handler is pretty simple:

// AddLockedTables add locked tables id to store.
// - tids: table ids of which will be locked.
// - pids: partition ids of which will be locked.
// - tables: table names of which will be locked.
// Return the message of skipped tables and error.
func (h *Handle) AddLockedTables(tids []int64, pids []int64, tables []*ast.TableName) (string, error) {
	se, err := h.pool.Get()
	if err != nil {
		return "", errors.Trace(err)
	}
	defer h.pool.Put(se)

	exec := se.(sqlexec.RestrictedSQLExecutor)

	return lockstats.AddLockedTables(exec, tids, pids, tables)
}

// RemoveLockedTables remove tables from table locked array.
// - tids: table ids of which will be unlocked.
// - pids: partition ids of which will be unlocked.
// - tables: table names of which will be unlocked.
// Return the message of skipped tables and error.
func (h *Handle) RemoveLockedTables(tids []int64, pids []int64, tables []*ast.TableName) (string, error) {
	se, err := h.pool.Get()
	if err != nil {
		return "", errors.Trace(err)
	}
	defer h.pool.Put(se)

	exec := se.(sqlexec.RestrictedSQLExecutor)
	return lockstats.RemoveLockedTables(exec, tids, pids, tables)
}

// QueryTablesLockedStatuses query whether table is locked.
// Note: This function query locked tables from store, so please try to batch the query.
func (h *Handle) QueryTablesLockedStatuses(tableIDs ...int64) (map[int64]bool, error) {
	tableLocked, err := h.queryLockedTables()
	if err != nil {
		return nil, err
	}

	return lockstats.GetTablesLockedStatuses(tableLocked, tableIDs...), nil
}

// queryLockedTables query locked tables from store.
func (h *Handle) queryLockedTables() (map[int64]struct{}, error) {
	se, err := h.pool.Get()
	if err != nil {
		return nil, errors.Trace(err)
	}
	defer h.pool.Put(se)

	exec := se.(sqlexec.RestrictedSQLExecutor)
	return lockstats.QueryLockedTables(exec)
}

But I can try it recently to see if this helps. Do you have any suggestions on where to put the `statsLocker/statsUnlocker/statsQuerier'?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And then in the feature, we can break the huge StatsHandler down to multiple smaller sub-components.
This makes our code easier to maintain (maintaining multiple smaller and simpler components is better than maintaining a big and complex component).
And these sub-components rely on the storage to synchronize information instead of relying on memory status, which is much safer.

image

h.mu.Lock()
defer h.mu.Unlock()
return lockstats.AddLockedTables(h.mu.ctx.(sqlexec.SQLExecutor), tids, pids, tables)
se, err := h.pool.Get()
if err != nil {
return "", errors.Trace(err)
}
defer h.pool.Put(se)

exec := se.(sqlexec.RestrictedSQLExecutor)

return lockstats.AddLockedTables(exec, tids, pids, tables)
}

// RemoveLockedTables remove tables from table locked array.
Expand All @@ -40,41 +44,40 @@ func (h *Handle) AddLockedTables(tids []int64, pids []int64, tables []*ast.Table
// - tables: table names of which will be unlocked.
// Return the message of skipped tables and error.
func (h *Handle) RemoveLockedTables(tids []int64, pids []int64, tables []*ast.TableName) (string, error) {
h.mu.Lock()
defer h.mu.Unlock()
se, err := h.pool.Get()
if err != nil {
return "", errors.Trace(err)
}
defer h.pool.Put(se)

return lockstats.RemoveLockedTables(h.mu.ctx.(sqlexec.SQLExecutor), tids, pids, tables)
exec := se.(sqlexec.RestrictedSQLExecutor)
return lockstats.RemoveLockedTables(exec, tids, pids, tables)
}

// QueryTablesLockedStatuses query whether table is locked in handle with Handle.Mutex.
// QueryTablesLockedStatuses query whether table is locked.
// Note: This function query locked tables from store, so please try to batch the query.
func (h *Handle) QueryTablesLockedStatuses(tableIDs ...int64) (map[int64]bool, error) {
h.mu.Lock()
defer h.mu.Unlock()
return h.queryTablesLockedStatuses(tableIDs...)
}

// queryTablesLockedStatuses query whether table is locked in handle without Handle.Mutex
// Note: This function query locked tables from store, so please try to batch the query.
func (h *Handle) queryTablesLockedStatuses(tableIDs ...int64) (map[int64]bool, error) {
tableLocked, err := h.queryLockedTablesWithoutLock()
tableLocked, err := h.queryLockedTables()
if err != nil {
return nil, err
}

return lockstats.GetTablesLockedStatuses(tableLocked, tableIDs...), nil
}

// queryLockedTablesWithoutLock query locked tables from store without Handle.Mutex.
func (h *Handle) queryLockedTablesWithoutLock() (map[int64]struct{}, error) {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
exec := h.mu.ctx.(sqlexec.SQLExecutor)
// queryLockedTables query locked tables from store.
func (h *Handle) queryLockedTables() (map[int64]struct{}, error) {
se, err := h.pool.Get()
if err != nil {
return nil, errors.Trace(err)
}
defer h.pool.Put(se)

return lockstats.QueryLockedTables(ctx, exec)
exec := se.(sqlexec.RestrictedSQLExecutor)
return lockstats.QueryLockedTables(exec)
}

// GetTableLockedAndClearForTest for unit test only
// GetTableLockedAndClearForTest for unit test only.
func (h *Handle) GetTableLockedAndClearForTest() (map[int64]struct{}, error) {
h.mu.Lock()
defer h.mu.Unlock()
return h.queryLockedTablesWithoutLock()
return h.queryLockedTables()
}
22 changes: 20 additions & 2 deletions statistics/handle/lockstats/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go_library(
name = "lockstats",
srcs = [
"lock_stats.go",
"query_lock.go",
"unlock_stats.go",
],
importpath = "github.com/pingcap/tidb/statistics/handle/lockstats",
Expand All @@ -23,8 +24,25 @@ go_library(
go_test(
name = "lockstats_test",
timeout = "short",
srcs = ["lock_stats_test.go"],
srcs = [
"lock_stats_test.go",
"query_lock_test.go",
"unlock_stats_test.go",
],
embed = [":lockstats"],
flaky = True,
deps = ["@com_github_stretchr_testify//require"],
shard_count = 8,
deps = [
"//kv",
"//parser/ast",
"//parser/model",
"//parser/mysql",
"//types",
"//util/chunk",
"//util/sqlexec/mock",
"@com_github_pingcap_errors//:errors",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//util",
"@org_uber_go_mock//gomock",
],
)
68 changes: 12 additions & 56 deletions statistics/handle/lockstats/lock_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@ import (
"fmt"
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/sqlexec"
"go.uber.org/zap"
Expand All @@ -36,13 +34,11 @@ const (
)

var (
// maxChunkSize is the max chunk size for load locked tables.
// We use 1024 as the default value, which is the same as the default value of session.maxChunkSize.
// The reason why we don't use session.maxChunkSize is that we don't want to introduce a new dependency.
// See: https://github.com/pingcap/tidb/pull/46478#discussion_r1308786474
maxChunkSize = 1024
// Stats logger.
statsLogger = logutil.BgLogger().With(zap.String("category", "stats"))
// useCurrentSession to make sure the sql is executed in current session.
useCurrentSession = []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}
insertSQL = "INSERT INTO mysql.stats_table_locked (table_id) VALUES (%?) ON DUPLICATE KEY UPDATE table_id = %?"
)

// AddLockedTables add locked tables id to store.
Expand All @@ -51,10 +47,10 @@ var (
// - pids: partition ids of which will be locked.
// - tables: table names of which will be locked.
// Return the message of skipped tables and error.
func AddLockedTables(exec sqlexec.SQLExecutor, tids []int64, pids []int64, tables []*ast.TableName) (string, error) {
func AddLockedTables(exec sqlexec.RestrictedSQLExecutor, tids []int64, pids []int64, tables []*ast.TableName) (string, error) {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)

_, err := exec.ExecuteInternal(ctx, "BEGIN PESSIMISTIC")
err := startTransaction(ctx, exec)
if err != nil {
return "", err
}
Expand All @@ -64,7 +60,7 @@ func AddLockedTables(exec sqlexec.SQLExecutor, tids []int64, pids []int64, table
}()

// Load tables to check duplicate before insert.
lockedTables, err := QueryLockedTables(ctx, exec)
lockedTables, err := QueryLockedTables(exec)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -118,55 +114,15 @@ func generateSkippedTablesMessage(tids []int64, dupTables []string, action, stat
return ""
}

func insertIntoStatsTableLocked(ctx context.Context, exec sqlexec.SQLExecutor, tid int64) error {
_, err := exec.ExecuteInternal(ctx, "INSERT INTO mysql.stats_table_locked (table_id) VALUES (%?) ON DUPLICATE KEY UPDATE table_id = %?", tid, tid)
func insertIntoStatsTableLocked(ctx context.Context, exec sqlexec.RestrictedSQLExecutor, tid int64) error {
_, _, err := exec.ExecRestrictedSQL(
ctx,
useCurrentSession,
insertSQL, tid, tid,
)
if err != nil {
logutil.BgLogger().Error("error occurred when insert mysql.stats_table_locked", zap.String("category", "stats"), zap.Error(err))
return err
}
return nil
}

// QueryLockedTables loads locked tables from mysql.stats_table_locked.
// Return it as a map for fast query.
func QueryLockedTables(ctx context.Context, exec sqlexec.SQLExecutor) (map[int64]struct{}, error) {
recordSet, err := exec.ExecuteInternal(ctx, "SELECT table_id FROM mysql.stats_table_locked")
if err != nil {
return nil, err
}
rows, err := sqlexec.DrainRecordSet(ctx, recordSet, maxChunkSize)
if err != nil {
return nil, err
}
tableLocked := make(map[int64]struct{}, len(rows))
for _, row := range rows {
tableLocked[row.GetInt64(0)] = struct{}{}
}
return tableLocked, nil
}

// GetTablesLockedStatuses check whether table is locked.
func GetTablesLockedStatuses(tableLocked map[int64]struct{}, tableIDs ...int64) map[int64]bool {
lockedTableStatus := make(map[int64]bool, len(tableIDs))

for _, tid := range tableIDs {
if _, ok := tableLocked[tid]; ok {
lockedTableStatus[tid] = true
continue
}
lockedTableStatus[tid] = false
}

return lockedTableStatus
}

// finishTransaction will execute `commit` when error is nil, otherwise `rollback`.
func finishTransaction(ctx context.Context, exec sqlexec.SQLExecutor, err error) error {
if err == nil {
_, err = exec.ExecuteInternal(ctx, "commit")
} else {
_, err1 := exec.ExecuteInternal(ctx, "rollback")
terror.Log(errors.Trace(err1))
}
return errors.Trace(err)
}
97 changes: 97 additions & 0 deletions statistics/handle/lockstats/lock_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,18 @@
package lockstats

import (
"context"
"testing"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/sqlexec/mock"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
)

func TestGenerateDuplicateTablesMessage(t *testing.T) {
Expand Down Expand Up @@ -77,3 +86,91 @@ func TestGenerateDuplicateTablesMessage(t *testing.T) {
})
}
}

func TestInsertIntoStatsTableLocked(t *testing.T) {
ctx := context.Background()
ctrl := gomock.NewController(t)
defer ctrl.Finish()
exec := mock.NewMockRestrictedSQLExecutor(ctrl)

// Executed SQL should be:
exec.EXPECT().ExecRestrictedSQL(
gomock.Eq(ctx),
useCurrentSession,
gomock.Eq(insertSQL),
gomock.Eq([]interface{}{int64(1), int64(1)}),
)
err := insertIntoStatsTableLocked(ctx, exec, 1)
require.NoError(t, err)

// Error should be returned when ExecRestrictedSQL returns error.
exec.EXPECT().ExecRestrictedSQL(
gomock.Any(),
gomock.Any(),
gomock.Any(),
gomock.Any(),
).Return(nil, nil, errors.New("test error"))

err = insertIntoStatsTableLocked(ctx, exec, 1)
require.Equal(t, "test error", err.Error())
}

func TestAddLockedTables(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
exec := mock.NewMockRestrictedSQLExecutor(ctrl)

// Executed SQL should be:
exec.EXPECT().ExecRestrictedSQL(
gomock.All(&ctxMatcher{}),
useCurrentSession,
gomock.Eq("BEGIN PESSIMISTIC"),
)
// Return table 1 is locked.
c := chunk.NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, 1)
c.AppendInt64(0, int64(1))
rows := []chunk.Row{c.GetRow(0)}
exec.EXPECT().ExecRestrictedSQL(
gomock.All(&ctxMatcher{}),
useCurrentSession,
selectSQL,
).Return(rows, nil, nil)

exec.EXPECT().ExecRestrictedSQL(
gomock.All(&ctxMatcher{}),
useCurrentSession,
insertSQL,
gomock.Eq([]interface{}{int64(2), int64(2)}),
)
exec.EXPECT().ExecRestrictedSQL(
gomock.All(&ctxMatcher{}),
useCurrentSession,
insertSQL,
gomock.Eq([]interface{}{int64(3), int64(3)}),
)

exec.EXPECT().ExecRestrictedSQL(
gomock.All(&ctxMatcher{}),
useCurrentSession,
insertSQL,
gomock.Eq([]interface{}{int64(4), int64(4)}),
)

exec.EXPECT().ExecRestrictedSQL(
gomock.All(&ctxMatcher{}),
useCurrentSession,
"COMMIT",
)

msg, err := AddLockedTables(
exec,
[]int64{1, 2, 3},
[]int64{4},
[]*ast.TableName{
{Schema: model.NewCIStr("test"), Name: model.NewCIStr("t1")},
{Schema: model.NewCIStr("test"), Name: model.NewCIStr("t2")},
{Schema: model.NewCIStr("test"), Name: model.NewCIStr("t3")}},
)
require.NoError(t, err)
require.Equal(t, "skip locking locked tables: test.t1, other tables locked successfully", msg)
}
Loading