Skip to content

Commit

Permalink
variables: enable variable hook to access storage (pingcap#38227)
Browse files Browse the repository at this point in the history
  • Loading branch information
YangKeao authored Sep 30, 2022
1 parent a6a97d4 commit 5c4b328
Show file tree
Hide file tree
Showing 16 changed files with 106 additions and 83 deletions.
10 changes: 5 additions & 5 deletions br/pkg/lightning/backend/kv/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,11 @@ func NewSession(options *SessionOptions, logger log.Logger) sessionctx.Context {
}

func newSession(options *SessionOptions, logger log.Logger) *session {
s := &session{
values: make(map[fmt.Stringer]interface{}, 1),
}
sqlMode := options.SQLMode
vars := variable.NewSessionVars()
vars := variable.NewSessionVars(s)
vars.SkipUTF8Check = true
vars.StmtCtx.InInsertStmt = true
vars.StmtCtx.BatchCheck = true
Expand Down Expand Up @@ -289,10 +292,7 @@ func newSession(options *SessionOptions, logger log.Logger) *session {
log.ShortError(err))
}
vars.TxnCtx = nil
s := &session{
vars: vars,
values: make(map[fmt.Stringer]interface{}, 1),
}
s.vars = vars
s.txn.kvPairs = &KvPairs{}

return s
Expand Down
4 changes: 2 additions & 2 deletions distsql/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ func createSelectNormalByBenchmarkTest(batch, totalRows int, ctx sessionctx.Cont
SetDAGRequest(&tipb.DAGRequest{}).
SetDesc(false).
SetKeepOrder(false).
SetFromSessionVars(variable.NewSessionVars()).
SetFromSessionVars(variable.NewSessionVars(nil)).
SetMemTracker(memory.NewTracker(-1, -1)).
Build()

Expand Down Expand Up @@ -390,7 +390,7 @@ func createSelectNormal(t *testing.T, batch, totalRows int, planIDs []int, sctx
SetDAGRequest(&tipb.DAGRequest{}).
SetDesc(false).
SetKeepOrder(false).
SetFromSessionVars(variable.NewSessionVars()).
SetFromSessionVars(variable.NewSessionVars(nil)).
SetMemTracker(memory.NewTracker(-1, -1)).
Build()
require.NoError(t, err)
Expand Down
14 changes: 7 additions & 7 deletions distsql/request_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func TestRequestBuilder1(t *testing.T) {
SetDAGRequest(&tipb.DAGRequest{}).
SetDesc(false).
SetKeepOrder(false).
SetFromSessionVars(variable.NewSessionVars()).
SetFromSessionVars(variable.NewSessionVars(nil)).
Build()
require.NoError(t, err)
expect := &kv.Request{
Expand Down Expand Up @@ -318,7 +318,7 @@ func TestRequestBuilder2(t *testing.T) {
SetDAGRequest(&tipb.DAGRequest{}).
SetDesc(false).
SetKeepOrder(false).
SetFromSessionVars(variable.NewSessionVars()).
SetFromSessionVars(variable.NewSessionVars(nil)).
Build()
require.NoError(t, err)
expect := &kv.Request{
Expand Down Expand Up @@ -371,7 +371,7 @@ func TestRequestBuilder3(t *testing.T) {
SetDAGRequest(&tipb.DAGRequest{}).
SetDesc(false).
SetKeepOrder(false).
SetFromSessionVars(variable.NewSessionVars()).
SetFromSessionVars(variable.NewSessionVars(nil)).
Build()
require.NoError(t, err)
expect := &kv.Request{
Expand Down Expand Up @@ -436,7 +436,7 @@ func TestRequestBuilder4(t *testing.T) {
SetDAGRequest(&tipb.DAGRequest{}).
SetDesc(false).
SetKeepOrder(false).
SetFromSessionVars(variable.NewSessionVars()).
SetFromSessionVars(variable.NewSessionVars(nil)).
Build()
require.NoError(t, err)
expect := &kv.Request{
Expand Down Expand Up @@ -543,7 +543,7 @@ func TestRequestBuilder7(t *testing.T) {
// copy iterator variable into a new variable, see issue #27779
replicaRead := replicaRead
t.Run(replicaRead.src, func(t *testing.T) {
vars := variable.NewSessionVars()
vars := variable.NewSessionVars(nil)
vars.SetReplicaRead(replicaRead.replicaReadType)

concurrency := 10
Expand Down Expand Up @@ -573,7 +573,7 @@ func TestRequestBuilder7(t *testing.T) {
}

func TestRequestBuilder8(t *testing.T) {
sv := variable.NewSessionVars()
sv := variable.NewSessionVars(nil)
actual, err := (&RequestBuilder{}).
SetFromSessionVars(sv).
Build()
Expand Down Expand Up @@ -640,7 +640,7 @@ func TestIndexRangesToKVRangesWithFbs(t *testing.T) {
}

func TestScanLimitConcurrency(t *testing.T) {
vars := variable.NewSessionVars()
vars := variable.NewSessionVars(nil)
for _, tt := range []struct {
tp tipb.ExecType
limit uint64
Expand Down
2 changes: 1 addition & 1 deletion executor/executor_pkg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func getGrowing(m aggPartialResultMapper) bool {
}

func TestFilterTemporaryTableKeys(t *testing.T) {
vars := variable.NewSessionVars()
vars := variable.NewSessionVars(nil)
const tableID int64 = 3
vars.TxnCtx = &variable.TransactionContext{
TxnCtxNoNeedToRestore: variable.TxnCtxNoNeedToRestore{
Expand Down
2 changes: 1 addition & 1 deletion session/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func TestBootstrapWithError(t *testing.T) {
{
se := &session{
store: store,
sessionVars: variable.NewSessionVars(),
sessionVars: variable.NewSessionVars(nil),
}
se.functionUsageMu.builtinFunctionUsage = make(telemetry.BuiltinFunctionsUsage)
se.txn.init()
Expand Down
2 changes: 1 addition & 1 deletion session/schema_amender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func TestAmendCollectAndGenMutations(t *testing.T) {
defer func() { require.NoError(t, store.Close()) }()
se := &session{
store: store,
sessionVars: variable.NewSessionVars(),
sessionVars: variable.NewSessionVars(nil),
}
se.mu.values = make(map[fmt.Stringer]interface{})
domain.BindDomain(se, domain.NewMockDomain())
Expand Down
5 changes: 3 additions & 2 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2902,13 +2902,14 @@ func createSessionWithOpt(store kv.Storage, opt *Opt) (*session, error) {
}
s := &session{
store: store,
sessionVars: variable.NewSessionVars(),
ddlOwnerManager: dom.DDL().OwnerManager(),
client: store.GetClient(),
mppClient: store.GetMPPClient(),
stmtStats: stmtstats.CreateStatementStats(),
sessionStatesHandlers: make(map[sessionstates.SessionStateType]sessionctx.SessionStatesHandler),
}
s.sessionVars = variable.NewSessionVars(s)

s.functionUsageMu.builtinFunctionUsage = make(telemetry.BuiltinFunctionsUsage)
if opt != nil && opt.PreparedPlanCache != nil {
s.preparedPlanCache = opt.PreparedPlanCache
Expand Down Expand Up @@ -2936,7 +2937,7 @@ func createSessionWithOpt(store kv.Storage, opt *Opt) (*session, error) {
func CreateSessionWithDomain(store kv.Storage, dom *domain.Domain) (*session, error) {
s := &session{
store: store,
sessionVars: variable.NewSessionVars(),
sessionVars: variable.NewSessionVars(nil),
client: store.GetClient(),
mppClient: store.GetMPPClient(),
stmtStats: stmtstats.CreateStatementStats(),
Expand Down
2 changes: 1 addition & 1 deletion sessionctx/variable/mock_globalaccessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func NewMockGlobalAccessor4Tests() *MockGlobalAccessor {
// mock.SessionVars = vars
// vars.GlobalVarsAccessor = mock

tmp.SessionVars = NewSessionVars()
tmp.SessionVars = NewSessionVars(nil)

// Set all sysvars to the default value
for k, sv := range GetSysVars() {
Expand Down
2 changes: 1 addition & 1 deletion sessionctx/variable/mock_globalaccessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (

func TestMockAPI(t *testing.T) {
defer view.Stop()
vars := NewSessionVars()
vars := NewSessionVars(nil)
mock := NewMockGlobalAccessor4Tests()
mock.SessionVars = vars
vars.GlobalVarsAccessor = mock
Expand Down
10 changes: 9 additions & 1 deletion sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,11 @@ func (s *SessionVars) GetUserVarType(name string) (*types.FieldType, bool) {
return ft, ok
}

// HookContext contains the necessary variables for executing set/get hook
type HookContext interface {
GetStore() kv.Storage
}

// SessionVars is to handle user-defined or global variables in the current session.
type SessionVars struct {
Concurrency
Expand Down Expand Up @@ -1288,6 +1293,8 @@ type SessionVars struct {

// LastPlanReplayerToken indicates the last plan replayer token
LastPlanReplayerToken string

HookContext
}

// GetPreparedStmtByName returns the prepared statement specified by stmtName.
Expand Down Expand Up @@ -1478,7 +1485,7 @@ func (connInfo *ConnectionInfo) IsSecureTransport() bool {
}

// NewSessionVars creates a session vars object.
func NewSessionVars() *SessionVars {
func NewSessionVars(hctx HookContext) *SessionVars {
vars := &SessionVars{
userVars: struct {
lock sync.RWMutex
Expand Down Expand Up @@ -1580,6 +1587,7 @@ func NewSessionVars() *SessionVars {
TiFlashFastScan: DefTiFlashFastScan,
EnableTiFlashReadForWriteStmt: DefTiDBEnableTiFlashReadForWriteStmt,
ForeignKeyChecks: DefTiDBForeignKeyChecks,
HookContext: hctx,
}
vars.KVVars = tikvstore.NewVariables(&vars.Killed)
vars.Concurrency = Concurrency{
Expand Down
20 changes: 17 additions & 3 deletions sessionctx/variable/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@ import (
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/mock"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/util"
)

func TestSetSystemVariable(t *testing.T) {
v := variable.NewSessionVars()
v := variable.NewSessionVars(nil)
v.GlobalVarsAccessor = variable.NewMockGlobalAccessor4Tests()
v.TimeZone = time.UTC
mtx := new(sync.Mutex)
Expand Down Expand Up @@ -291,7 +292,7 @@ func TestIsolationRead(t *testing.T) {
config.UpdateGlobal(func(conf *config.Config) {
conf.IsolationRead.Engines = []string{"tiflash", "tidb"}
})
sessVars := variable.NewSessionVars()
sessVars := variable.NewSessionVars(nil)
_, ok := sessVars.IsolationReadEngines[kv.TiDB]
require.True(t, ok)
_, ok = sessVars.IsolationReadEngines[kv.TiKV]
Expand Down Expand Up @@ -393,7 +394,7 @@ func TestTransactionContextSavepoint(t *testing.T) {
}

func TestGeneralPlanCacheStmt(t *testing.T) {
sessVars := variable.NewSessionVars()
sessVars := variable.NewSessionVars(nil)
sessVars.GeneralPlanCacheSize = 100
sql1 := "select * from t where a>?"
sql2 := "select * from t where a<?"
Expand All @@ -408,3 +409,16 @@ func TestGeneralPlanCacheStmt(t *testing.T) {
require.NotNil(t, sessVars.GetGeneralPlanCacheStmt(sql1))
require.NotNil(t, sessVars.GetGeneralPlanCacheStmt(sql2))
}

func TestHookContext(t *testing.T) {
store := testkit.CreateMockStore(t)
ctx := mock.NewContext()
ctx.Store = store
sv := variable.SysVar{Scope: variable.ScopeGlobal | variable.ScopeSession, Name: "testhooksysvar", Value: variable.On, Type: variable.TypeBool, SetSession: func(s *variable.SessionVars, val string) error {
require.Equal(t, s.GetStore(), store)
return nil
}}
variable.RegisterSysVar(&sv)

ctx.GetSessionVars().SetSystemVar("testhooksysvar", "test")
}
Loading

0 comments on commit 5c4b328

Please sign in to comment.