From 5c4b328b0256be31618ac3601e35ce03ab742393 Mon Sep 17 00:00:00 2001 From: YangKeao Date: Fri, 30 Sep 2022 14:35:45 -0400 Subject: [PATCH] variables: enable variable hook to access storage (#38227) close pingcap/tidb#38225 --- br/pkg/lightning/backend/kv/session.go | 10 +-- distsql/distsql_test.go | 4 +- distsql/request_builder_test.go | 14 ++-- executor/executor_pkg_test.go | 2 +- session/bootstrap_test.go | 2 +- session/schema_amender_test.go | 2 +- session/session.go | 5 +- sessionctx/variable/mock_globalaccessor.go | 2 +- .../variable/mock_globalaccessor_test.go | 2 +- sessionctx/variable/session.go | 10 ++- sessionctx/variable/session_test.go | 20 +++++- sessionctx/variable/sysvar_test.go | 66 +++++++++---------- sessionctx/variable/variable_test.go | 24 +++---- sessionctx/variable/varsutil_test.go | 14 ++-- table/tables/mutation_checker_test.go | 4 +- util/mock/context.go | 8 +-- 16 files changed, 106 insertions(+), 83 deletions(-) diff --git a/br/pkg/lightning/backend/kv/session.go b/br/pkg/lightning/backend/kv/session.go index 54a761e3d1470..1cc261b677fe4 100644 --- a/br/pkg/lightning/backend/kv/session.go +++ b/br/pkg/lightning/backend/kv/session.go @@ -253,8 +253,11 @@ func NewSession(options *SessionOptions, logger log.Logger) sessionctx.Context { } func newSession(options *SessionOptions, logger log.Logger) *session { + s := &session{ + values: make(map[fmt.Stringer]interface{}, 1), + } sqlMode := options.SQLMode - vars := variable.NewSessionVars() + vars := variable.NewSessionVars(s) vars.SkipUTF8Check = true vars.StmtCtx.InInsertStmt = true vars.StmtCtx.BatchCheck = true @@ -289,10 +292,7 @@ func newSession(options *SessionOptions, logger log.Logger) *session { log.ShortError(err)) } vars.TxnCtx = nil - s := &session{ - vars: vars, - values: make(map[fmt.Stringer]interface{}, 1), - } + s.vars = vars s.txn.kvPairs = &KvPairs{} return s diff --git a/distsql/distsql_test.go b/distsql/distsql_test.go index 50e21f6b66702..52aa62ba112fa 100644 --- a/distsql/distsql_test.go +++ b/distsql/distsql_test.go @@ -321,7 +321,7 @@ func createSelectNormalByBenchmarkTest(batch, totalRows int, ctx sessionctx.Cont SetDAGRequest(&tipb.DAGRequest{}). SetDesc(false). SetKeepOrder(false). - SetFromSessionVars(variable.NewSessionVars()). + SetFromSessionVars(variable.NewSessionVars(nil)). SetMemTracker(memory.NewTracker(-1, -1)). Build() @@ -390,7 +390,7 @@ func createSelectNormal(t *testing.T, batch, totalRows int, planIDs []int, sctx SetDAGRequest(&tipb.DAGRequest{}). SetDesc(false). SetKeepOrder(false). - SetFromSessionVars(variable.NewSessionVars()). + SetFromSessionVars(variable.NewSessionVars(nil)). SetMemTracker(memory.NewTracker(-1, -1)). Build() require.NoError(t, err) diff --git a/distsql/request_builder_test.go b/distsql/request_builder_test.go index 53ea7ff4a3c56..0a11b9e04512c 100644 --- a/distsql/request_builder_test.go +++ b/distsql/request_builder_test.go @@ -235,7 +235,7 @@ func TestRequestBuilder1(t *testing.T) { SetDAGRequest(&tipb.DAGRequest{}). SetDesc(false). SetKeepOrder(false). - SetFromSessionVars(variable.NewSessionVars()). + SetFromSessionVars(variable.NewSessionVars(nil)). Build() require.NoError(t, err) expect := &kv.Request{ @@ -318,7 +318,7 @@ func TestRequestBuilder2(t *testing.T) { SetDAGRequest(&tipb.DAGRequest{}). SetDesc(false). SetKeepOrder(false). - SetFromSessionVars(variable.NewSessionVars()). + SetFromSessionVars(variable.NewSessionVars(nil)). Build() require.NoError(t, err) expect := &kv.Request{ @@ -371,7 +371,7 @@ func TestRequestBuilder3(t *testing.T) { SetDAGRequest(&tipb.DAGRequest{}). SetDesc(false). SetKeepOrder(false). - SetFromSessionVars(variable.NewSessionVars()). + SetFromSessionVars(variable.NewSessionVars(nil)). Build() require.NoError(t, err) expect := &kv.Request{ @@ -436,7 +436,7 @@ func TestRequestBuilder4(t *testing.T) { SetDAGRequest(&tipb.DAGRequest{}). SetDesc(false). SetKeepOrder(false). - SetFromSessionVars(variable.NewSessionVars()). + SetFromSessionVars(variable.NewSessionVars(nil)). Build() require.NoError(t, err) expect := &kv.Request{ @@ -543,7 +543,7 @@ func TestRequestBuilder7(t *testing.T) { // copy iterator variable into a new variable, see issue #27779 replicaRead := replicaRead t.Run(replicaRead.src, func(t *testing.T) { - vars := variable.NewSessionVars() + vars := variable.NewSessionVars(nil) vars.SetReplicaRead(replicaRead.replicaReadType) concurrency := 10 @@ -573,7 +573,7 @@ func TestRequestBuilder7(t *testing.T) { } func TestRequestBuilder8(t *testing.T) { - sv := variable.NewSessionVars() + sv := variable.NewSessionVars(nil) actual, err := (&RequestBuilder{}). SetFromSessionVars(sv). Build() @@ -640,7 +640,7 @@ func TestIndexRangesToKVRangesWithFbs(t *testing.T) { } func TestScanLimitConcurrency(t *testing.T) { - vars := variable.NewSessionVars() + vars := variable.NewSessionVars(nil) for _, tt := range []struct { tp tipb.ExecType limit uint64 diff --git a/executor/executor_pkg_test.go b/executor/executor_pkg_test.go index 3944948fbcfc8..6e8f2ba1e2921 100644 --- a/executor/executor_pkg_test.go +++ b/executor/executor_pkg_test.go @@ -258,7 +258,7 @@ func getGrowing(m aggPartialResultMapper) bool { } func TestFilterTemporaryTableKeys(t *testing.T) { - vars := variable.NewSessionVars() + vars := variable.NewSessionVars(nil) const tableID int64 = 3 vars.TxnCtx = &variable.TransactionContext{ TxnCtxNoNeedToRestore: variable.TxnCtxNoNeedToRestore{ diff --git a/session/bootstrap_test.go b/session/bootstrap_test.go index 6c3b459d8ee72..68ce4e8d3ac44 100644 --- a/session/bootstrap_test.go +++ b/session/bootstrap_test.go @@ -141,7 +141,7 @@ func TestBootstrapWithError(t *testing.T) { { se := &session{ store: store, - sessionVars: variable.NewSessionVars(), + sessionVars: variable.NewSessionVars(nil), } se.functionUsageMu.builtinFunctionUsage = make(telemetry.BuiltinFunctionsUsage) se.txn.init() diff --git a/session/schema_amender_test.go b/session/schema_amender_test.go index d2d746a8887b2..e82faba8e6e56 100644 --- a/session/schema_amender_test.go +++ b/session/schema_amender_test.go @@ -255,7 +255,7 @@ func TestAmendCollectAndGenMutations(t *testing.T) { defer func() { require.NoError(t, store.Close()) }() se := &session{ store: store, - sessionVars: variable.NewSessionVars(), + sessionVars: variable.NewSessionVars(nil), } se.mu.values = make(map[fmt.Stringer]interface{}) domain.BindDomain(se, domain.NewMockDomain()) diff --git a/session/session.go b/session/session.go index 3ee97f078f5b6..e40b82cf07052 100644 --- a/session/session.go +++ b/session/session.go @@ -2902,13 +2902,14 @@ func createSessionWithOpt(store kv.Storage, opt *Opt) (*session, error) { } s := &session{ store: store, - sessionVars: variable.NewSessionVars(), ddlOwnerManager: dom.DDL().OwnerManager(), client: store.GetClient(), mppClient: store.GetMPPClient(), stmtStats: stmtstats.CreateStatementStats(), sessionStatesHandlers: make(map[sessionstates.SessionStateType]sessionctx.SessionStatesHandler), } + s.sessionVars = variable.NewSessionVars(s) + s.functionUsageMu.builtinFunctionUsage = make(telemetry.BuiltinFunctionsUsage) if opt != nil && opt.PreparedPlanCache != nil { s.preparedPlanCache = opt.PreparedPlanCache @@ -2936,7 +2937,7 @@ func createSessionWithOpt(store kv.Storage, opt *Opt) (*session, error) { func CreateSessionWithDomain(store kv.Storage, dom *domain.Domain) (*session, error) { s := &session{ store: store, - sessionVars: variable.NewSessionVars(), + sessionVars: variable.NewSessionVars(nil), client: store.GetClient(), mppClient: store.GetMPPClient(), stmtStats: stmtstats.CreateStatementStats(), diff --git a/sessionctx/variable/mock_globalaccessor.go b/sessionctx/variable/mock_globalaccessor.go index 969892dc6f736..c4bb86748e9fd 100644 --- a/sessionctx/variable/mock_globalaccessor.go +++ b/sessionctx/variable/mock_globalaccessor.go @@ -43,7 +43,7 @@ func NewMockGlobalAccessor4Tests() *MockGlobalAccessor { // mock.SessionVars = vars // vars.GlobalVarsAccessor = mock - tmp.SessionVars = NewSessionVars() + tmp.SessionVars = NewSessionVars(nil) // Set all sysvars to the default value for k, sv := range GetSysVars() { diff --git a/sessionctx/variable/mock_globalaccessor_test.go b/sessionctx/variable/mock_globalaccessor_test.go index f3a2051d38802..57ed4931a342d 100644 --- a/sessionctx/variable/mock_globalaccessor_test.go +++ b/sessionctx/variable/mock_globalaccessor_test.go @@ -23,7 +23,7 @@ import ( func TestMockAPI(t *testing.T) { defer view.Stop() - vars := NewSessionVars() + vars := NewSessionVars(nil) mock := NewMockGlobalAccessor4Tests() mock.SessionVars = vars vars.GlobalVarsAccessor = mock diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index db3606baf60b3..c4dc136ea3dec 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -598,6 +598,11 @@ func (s *SessionVars) GetUserVarType(name string) (*types.FieldType, bool) { return ft, ok } +// HookContext contains the necessary variables for executing set/get hook +type HookContext interface { + GetStore() kv.Storage +} + // SessionVars is to handle user-defined or global variables in the current session. type SessionVars struct { Concurrency @@ -1288,6 +1293,8 @@ type SessionVars struct { // LastPlanReplayerToken indicates the last plan replayer token LastPlanReplayerToken string + + HookContext } // GetPreparedStmtByName returns the prepared statement specified by stmtName. @@ -1478,7 +1485,7 @@ func (connInfo *ConnectionInfo) IsSecureTransport() bool { } // NewSessionVars creates a session vars object. -func NewSessionVars() *SessionVars { +func NewSessionVars(hctx HookContext) *SessionVars { vars := &SessionVars{ userVars: struct { lock sync.RWMutex @@ -1580,6 +1587,7 @@ func NewSessionVars() *SessionVars { TiFlashFastScan: DefTiFlashFastScan, EnableTiFlashReadForWriteStmt: DefTiDBEnableTiFlashReadForWriteStmt, ForeignKeyChecks: DefTiDBForeignKeyChecks, + HookContext: hctx, } vars.KVVars = tikvstore.NewVariables(&vars.Killed) vars.Concurrency = Concurrency{ diff --git a/sessionctx/variable/session_test.go b/sessionctx/variable/session_test.go index 6dc6812f85a03..91f1394499c97 100644 --- a/sessionctx/variable/session_test.go +++ b/sessionctx/variable/session_test.go @@ -26,6 +26,7 @@ import ( plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/mock" "github.com/stretchr/testify/require" @@ -33,7 +34,7 @@ import ( ) func TestSetSystemVariable(t *testing.T) { - v := variable.NewSessionVars() + v := variable.NewSessionVars(nil) v.GlobalVarsAccessor = variable.NewMockGlobalAccessor4Tests() v.TimeZone = time.UTC mtx := new(sync.Mutex) @@ -291,7 +292,7 @@ func TestIsolationRead(t *testing.T) { config.UpdateGlobal(func(conf *config.Config) { conf.IsolationRead.Engines = []string{"tiflash", "tidb"} }) - sessVars := variable.NewSessionVars() + sessVars := variable.NewSessionVars(nil) _, ok := sessVars.IsolationReadEngines[kv.TiDB] require.True(t, ok) _, ok = sessVars.IsolationReadEngines[kv.TiKV] @@ -393,7 +394,7 @@ func TestTransactionContextSavepoint(t *testing.T) { } func TestGeneralPlanCacheStmt(t *testing.T) { - sessVars := variable.NewSessionVars() + sessVars := variable.NewSessionVars(nil) sessVars.GeneralPlanCacheSize = 100 sql1 := "select * from t where a>?" sql2 := "select * from t where a