From 24bbbdc7a715f39828ce38e7f1958ce9579d144d Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Tue, 27 Apr 2021 08:12:30 -0600 Subject: [PATCH 01/14] WIP --- session/session.go | 40 +++++++++++----- session/sysvar_cache.go | 84 +++++++++++++++++++++++++++++++++ sessionctx/variable/session.go | 6 +++ sessionctx/variable/varsutil.go | 8 +--- tools/check/go.mod | 2 +- 5 files changed, 121 insertions(+), 19 deletions(-) create mode 100644 session/sysvar_cache.go diff --git a/session/session.go b/session/session.go index a59182a46b44e..5e577c9dc2506 100644 --- a/session/session.go +++ b/session/session.go @@ -1400,10 +1400,6 @@ func (s *session) ExecuteStmt(ctx context.Context, stmtNode ast.StmtNode) (sqlex } s.PrepareTxnCtx(ctx) - err := s.loadCommonGlobalVariablesIfNeeded() - if err != nil { - return nil, err - } s.sessionVars.StartTime = time.Now() @@ -1611,10 +1607,6 @@ func (s *session) PrepareStmt(sql string) (stmtID uint32, paramCount int, fields // We don't need to create a transaction for prepare statement, just get information schema will do. s.sessionVars.TxnCtx.InfoSchema = domain.GetDomain(s).InfoSchema() } - err = s.loadCommonGlobalVariablesIfNeeded() - if err != nil { - return - } ctx := context.Background() inTxn := s.GetSessionVars().InTxn() @@ -2360,6 +2352,9 @@ func createSessionWithOpt(store kv.Storage, opt *Opt) (*session, error) { domain.BindDomain(s, dom) // session implements variable.GlobalVarAccessor. Bind it to ctx. s.sessionVars.GlobalVarsAccessor = s + // copy the session var cache + s.initSessionVarsFromCache() + s.sessionVars.BinlogClient = binloginfo.GetPumpsClient() s.txn.init() @@ -2568,6 +2563,31 @@ var builtinGlobalVariable = []string{ variable.CTEMaxRecursionDepth, } +// This should be renamed, because it should be copySessionVarsFromCache +func (s *session) initSessionVarsFromCache() error { + + // 1. Call RebuildSessionVarsCache() (eventually don't call it every time) + s.RebuildSessionVarsCache() + + // 2. Deep copy sessionVarCache and set it to s.SessionVars.systems + + for varName, varVal := range sessionVarCache { + if err := s.sessionVars.InitSessionVar(varName, varVal); err != nil { + return err + } + } + + // when client set Capability Flags CLIENT_INTERACTIVE, init wait_timeout with interactive_timeout + if s.sessionVars.ClientCapability&mysql.ClientInteractive > 0 { + if varVal, ok := s.sessionVars.GetSystemVar(variable.InteractiveTimeout); ok { + if err := s.sessionVars.SetSystemVar(variable.WaitTimeout, varVal); err != nil { + return err + } + } + } + return nil +} + // loadCommonGlobalVariablesIfNeeded loads and applies commonly used global variables for the session. func (s *session) loadCommonGlobalVariablesIfNeeded() error { vars := s.sessionVars @@ -2698,10 +2718,6 @@ func (s *session) InitTxnWithStartTS(startTS uint64) error { } txn.SetVars(s.sessionVars.KVVars) s.txn.changeInvalidToValid(txn) - err = s.loadCommonGlobalVariablesIfNeeded() - if err != nil { - return err - } return nil } diff --git a/session/sysvar_cache.go b/session/sysvar_cache.go new file mode 100644 index 0000000000000..28736fdab6582 --- /dev/null +++ b/session/sysvar_cache.go @@ -0,0 +1,84 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package session + +import ( + "context" + + "github.com/pingcap/parser/terror" + "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/util/chunk" +) + +// When a new session is created, SessionVars.systems needs to be populated +// with a cache of common system variables. Previously this populated the +// "common sysvars" only, with uncommon ones being populated on-demand. +// +// This is both *not* MySQL compatible (it should not pick up global scoped changes after the session starts), +// and it has a cache miss-path which can be quite slow, such as when running +// SHOW VARIABLES for the first time in a session (which will hit all session vars). + +var sessionVarCache map[string]string + +func (s *session) fetchTableValues() (tableContents map[string]string) { + + tableContents = make(map[string]string) + + // Copy all variables from the table to tableContents + rs, err := s.ExecuteInternal(context.Background(), "SELECT variable_name, variable_value FROM mysql.global_variables") + if err != nil { + return + } + defer terror.Call(rs.Close) + req := rs.NewChunk() + for { + err = rs.Next(context.TODO(), req) + if err != nil { + return + } + if req.NumRows() == 0 { + return + } + it := chunk.NewIterator4Chunk(req) + for row := it.Begin(); row != it.End(); row = it.Next() { + name := row.GetString(0) + val := row.GetString(1) + tableContents[name] = val + } + } +} + +func (s *session) RebuildSessionVarsCache() error { + + // Create a new map to hold the new cache, + // and a cache if what's available in the mysql_global_variables table + // (includes global-only vars that need to be ignored..) + newCache := make(map[string]string) + tableContents := s.fetchTableValues() + + for _, sv := range variable.GetSysVars() { + if sv.Scope&variable.ScopeSession != 0 { + if _, ok := tableContents[sv.Name]; ok { + newCache[sv.Name] = tableContents[sv.Name] + } else { + // fmt.Printf("%%%% could not find k: %s in cache!\n", sv.Name) + newCache[sv.Name] = sv.Value // use default + } + } + } + + // Set the sessionVarCache to be the new cache. + sessionVarCache = newCache + return nil +} diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 6e75bd7ca010c..b468ad09d1147 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -1382,6 +1382,12 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { return nil } +func (s *SessionVars) InitSessionVar(name string, val string) error { + // fmt.Printf("$$$$ Setting initial sessionvar value for k: %s\n", name) + s.systems[name] = val + return nil +} + // GetReadableTxnMode returns the session variable TxnMode but rewrites it to "OPTIMISTIC" when it's empty. func (s *SessionVars) GetReadableTxnMode() string { txnMode := s.TxnMode diff --git a/sessionctx/variable/varsutil.go b/sessionctx/variable/varsutil.go index 0204b58a3e13b..7f91c5696b12d 100644 --- a/sessionctx/variable/varsutil.go +++ b/sessionctx/variable/varsutil.go @@ -158,12 +158,8 @@ func GetSessionSystemVar(s *SessionVars, key string) (string, error) { if err != nil || ok { return gVal, err } - gVal, err = s.GlobalVarsAccessor.GetGlobalSysVar(key) - if err != nil { - return "", err - } - s.systems[key] = gVal - return gVal, nil + // If it's not found here, don't fail back to globals. It means it's not a sessionvar. + return "", err } // GetSessionOnlySysVars get the default value defined in code for session only variable. diff --git a/tools/check/go.mod b/tools/check/go.mod index e12940619afc8..171d04af71830 100644 --- a/tools/check/go.mod +++ b/tools/check/go.mod @@ -20,4 +20,4 @@ require ( honnef.co/go/tools v0.0.0-20180920025451-e3ad64cb4ed3 ) -go 1.13 \ No newline at end of file +go 1.13 From 53cb128c1123871f72961d5be7f0337587bf891b Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Tue, 27 Apr 2021 10:00:13 -0600 Subject: [PATCH 02/14] WIP --- session/session.go | 243 ++++------------------------- session/sysvar_cache.go | 63 ++++++-- sessionctx/variable/session.go | 11 +- sessionctx/variable/sysvar.go | 15 ++ sessionctx/variable/sysvar_test.go | 14 ++ 5 files changed, 119 insertions(+), 227 deletions(-) diff --git a/session/session.go b/session/session.go index 5e577c9dc2506..16d17c7cb0a14 100644 --- a/session/session.go +++ b/session/session.go @@ -950,7 +950,8 @@ func (s *session) varFromTiDBTable(name string) bool { } // GetGlobalSysVar implements GlobalVarAccessor.GetGlobalSysVar interface. -func (s *session) GetGlobalSysVar(name string) (string, error) { +func (s *session) GetGlobalSysVar(name string) (value string, err error) { + name = strings.ToLower(name) if name == variable.TiDBSlowLogMasking { name = variable.TiDBRedactLog } @@ -958,26 +959,23 @@ func (s *session) GetGlobalSysVar(name string) (string, error) { // When running bootstrap or upgrade, we should not access global storage. return "", nil } - sysVar, err := s.getTableValue(context.TODO(), mysql.GlobalVariablesTable, name) - if err != nil { - if errResultIsEmpty.Equal(err) { - sv := variable.GetSysVar(name) - if sv != nil { - return sv.Value, nil - } - return "", variable.ErrUnknownSystemVar.GenWithStackByArgs(name) - } - return "", err + + // Fetch from the svcache, it should be there + var ok bool + if value, ok = svcache.global[name]; !ok { + return "", variable.ErrUnknownSystemVar.GenWithStackByArgs(name) } + // Fetch mysql.tidb values if required if s.varFromTiDBTable(name) { - return s.getTiDBTableValue(name, sysVar) + return s.getTiDBTableValue(name, value) } - return sysVar, nil + return value, nil } // SetGlobalSysVar implements GlobalVarAccessor.SetGlobalSysVar interface. func (s *session) SetGlobalSysVar(name, value string) error { + name = strings.ToLower(name) if name == variable.TiDBSlowLogMasking { name = variable.TiDBRedactLog } @@ -991,7 +989,6 @@ func (s *session) SetGlobalSysVar(name, value string) error { if err != nil { return err } - name = strings.ToLower(name) // update mysql.tidb if required. if s.varFromTiDBTable(name) { if err = s.setTiDBTableValue(name, sVal); err != nil { @@ -1003,8 +1000,13 @@ func (s *session) SetGlobalSysVar(name, value string) error { if err != nil { return err } - _, _, err = s.ExecRestrictedStmt(context.TODO(), stmt) - return err + if _, _, err = s.ExecRestrictedStmt(context.TODO(), stmt); err != nil { + return err + } + + // Update the svcache + // TODO: tell etcd peers that the svcache is stale! + return s.UpdateSysVarCacheForKey(name, sVal) } // setTiDBTableValue handles tikv_* sysvars which need to update mysql.tidb @@ -2352,8 +2354,14 @@ func createSessionWithOpt(store kv.Storage, opt *Opt) (*session, error) { domain.BindDomain(s, dom) // session implements variable.GlobalVarAccessor. Bind it to ctx. s.sessionVars.GlobalVarsAccessor = s - // copy the session var cache - s.initSessionVarsFromCache() + + // eventually we shouldn't need this + s.buildSysVarCacheIfNeeded() + + // Deep copy sessionvar cache + svcache.RLock() + s.sessionVars.InitSessionVarsFromCache(svcache.session) + svcache.RUnlock() s.sessionVars.BinlogClient = binloginfo.GetPumpsClient() s.txn.init() @@ -2439,143 +2447,16 @@ func finishBootstrap(store kv.Storage) { const quoteCommaQuote = "', '" -var builtinGlobalVariable = []string{ - variable.AutoCommit, - variable.SQLModeVar, - variable.MaxAllowedPacket, - variable.TimeZone, - variable.BlockEncryptionMode, - variable.WaitTimeout, - variable.InteractiveTimeout, - variable.MaxPreparedStmtCount, - variable.InitConnect, - variable.TxnIsolation, - variable.TxReadOnly, - variable.TransactionIsolation, - variable.TransactionReadOnly, - variable.NetBufferLength, - variable.QueryCacheType, - variable.QueryCacheSize, - variable.CharacterSetServer, - variable.AutoIncrementIncrement, - variable.AutoIncrementOffset, - variable.CollationServer, - variable.NetWriteTimeout, - variable.MaxExecutionTime, - variable.InnodbLockWaitTimeout, - variable.WindowingUseHighPrecision, - variable.SQLSelectLimit, - variable.DefaultWeekFormat, - - /* TiDB specific global variables: */ - variable.TiDBSkipASCIICheck, - variable.TiDBSkipUTF8Check, - variable.TiDBIndexJoinBatchSize, - variable.TiDBIndexLookupSize, - variable.TiDBIndexLookupConcurrency, - variable.TiDBIndexLookupJoinConcurrency, - variable.TiDBIndexSerialScanConcurrency, - variable.TiDBHashJoinConcurrency, - variable.TiDBProjectionConcurrency, - variable.TiDBHashAggPartialConcurrency, - variable.TiDBHashAggFinalConcurrency, - variable.TiDBWindowConcurrency, - variable.TiDBMergeJoinConcurrency, - variable.TiDBStreamAggConcurrency, - variable.TiDBExecutorConcurrency, - variable.TiDBBackoffLockFast, - variable.TiDBBackOffWeight, - variable.TiDBConstraintCheckInPlace, - variable.TiDBDDLReorgWorkerCount, - variable.TiDBDDLReorgBatchSize, - variable.TiDBDDLErrorCountLimit, - variable.TiDBOptInSubqToJoinAndAgg, - variable.TiDBOptPreferRangeScan, - variable.TiDBOptCorrelationThreshold, - variable.TiDBOptCorrelationExpFactor, - variable.TiDBOptCPUFactor, - variable.TiDBOptCopCPUFactor, - variable.TiDBOptNetworkFactor, - variable.TiDBOptScanFactor, - variable.TiDBOptDescScanFactor, - variable.TiDBOptMemoryFactor, - variable.TiDBOptDiskFactor, - variable.TiDBOptConcurrencyFactor, - variable.TiDBDistSQLScanConcurrency, - variable.TiDBInitChunkSize, - variable.TiDBMaxChunkSize, - variable.TiDBEnableCascadesPlanner, - variable.TiDBRetryLimit, - variable.TiDBDisableTxnAutoRetry, - variable.TiDBEnableWindowFunction, - variable.TiDBEnableStrictDoubleTypeCheck, - variable.TiDBEnableTablePartition, - variable.TiDBEnableVectorizedExpression, - variable.TiDBEnableFastAnalyze, - variable.TiDBExpensiveQueryTimeThreshold, - variable.TiDBEnableNoopFuncs, - variable.TiDBEnableIndexMerge, - variable.TiDBTxnMode, - variable.TiDBAllowBatchCop, - variable.TiDBAllowMPPExecution, - variable.TiDBOptBCJ, - variable.TiDBBCJThresholdSize, - variable.TiDBBCJThresholdCount, - variable.TiDBRowFormatVersion, - variable.TiDBEnableStmtSummary, - variable.TiDBStmtSummaryInternalQuery, - variable.TiDBStmtSummaryRefreshInterval, - variable.TiDBStmtSummaryHistorySize, - variable.TiDBStmtSummaryMaxStmtCount, - variable.TiDBStmtSummaryMaxSQLLength, - variable.TiDBMaxDeltaSchemaCount, - variable.TiDBCapturePlanBaseline, - variable.TiDBUsePlanBaselines, - variable.TiDBEvolvePlanBaselines, - variable.TiDBEnableExtendedStats, - variable.TiDBIsolationReadEngines, - variable.TiDBStoreLimit, - variable.TiDBAllowAutoRandExplicitInsert, - variable.TiDBEnableClusteredIndex, - variable.TiDBPartitionPruneMode, - variable.TiDBRedactLog, - variable.TiDBEnableTelemetry, - variable.TiDBShardAllocateStep, - variable.TiDBEnableChangeColumnType, - variable.TiDBEnableChangeMultiSchema, - variable.TiDBEnablePointGetCache, - variable.TiDBEnableAlterPlacement, - variable.TiDBEnableAmendPessimisticTxn, - variable.TiDBMemQuotaApplyCache, - variable.TiDBEnableParallelApply, - variable.TiDBMemoryUsageAlarmRatio, - variable.TiDBEnableRateLimitAction, - variable.TiDBEnableAsyncCommit, - variable.TiDBEnable1PC, - variable.TiDBGuaranteeLinearizability, - variable.TiDBAnalyzeVersion, - variable.TiDBEnableIndexMergeJoin, - variable.TiDBTrackAggregateMemoryUsage, - variable.TiDBMultiStatementMode, - variable.TiDBEnableExchangePartition, - variable.TiDBAllowFallbackToTiKV, - variable.TiDBEnableDynamicPrivileges, - variable.CTEMaxRecursionDepth, -} - // This should be renamed, because it should be copySessionVarsFromCache func (s *session) initSessionVarsFromCache() error { - // 1. Call RebuildSessionVarsCache() (eventually don't call it every time) - s.RebuildSessionVarsCache() + // 1. Call RebuildSysVarCache() (eventually don't call it every time) + s.RebuildSysVarCache() // 2. Deep copy sessionVarCache and set it to s.SessionVars.systems - - for varName, varVal := range sessionVarCache { - if err := s.sessionVars.InitSessionVar(varName, varVal); err != nil { - return err - } - } + svcache.RLock() + s.sessionVars.InitSessionVarsFromCache(svcache.session) + svcache.RUnlock() // when client set Capability Flags CLIENT_INTERACTIVE, init wait_timeout with interactive_timeout if s.sessionVars.ClientCapability&mysql.ClientInteractive > 0 { @@ -2588,68 +2469,6 @@ func (s *session) initSessionVarsFromCache() error { return nil } -// loadCommonGlobalVariablesIfNeeded loads and applies commonly used global variables for the session. -func (s *session) loadCommonGlobalVariablesIfNeeded() error { - vars := s.sessionVars - if vars.CommonGlobalLoaded { - return nil - } - if s.Value(sessionctx.Initing) != nil { - // When running bootstrap or upgrade, we should not access global storage. - return nil - } - - var err error - // Use GlobalVariableCache if TiDB just loaded global variables within 2 second ago. - // When a lot of connections connect to TiDB simultaneously, it can protect TiKV meta region from overload. - gvc := domain.GetDomain(s).GetGlobalVarsCache() - loadFunc := func() ([]chunk.Row, []*ast.ResultField, error) { - vars := append(make([]string, 0, len(builtinGlobalVariable)+len(variable.PluginVarNames)), builtinGlobalVariable...) - if len(variable.PluginVarNames) > 0 { - vars = append(vars, variable.PluginVarNames...) - } - - stmt, err := s.ParseWithParams(context.TODO(), "select HIGH_PRIORITY * from mysql.global_variables where variable_name in (%?) order by VARIABLE_NAME", vars) - if err != nil { - return nil, nil, errors.Trace(err) - } - - return s.ExecRestrictedStmt(context.TODO(), stmt) - } - rows, _, err := gvc.LoadGlobalVariables(loadFunc) - if err != nil { - logutil.BgLogger().Warn("failed to load global variables", - zap.Uint64("conn", s.sessionVars.ConnectionID), zap.Error(err)) - return err - } - vars.CommonGlobalLoaded = true - - for _, row := range rows { - varName := row.GetString(0) - varVal := row.GetString(1) - // `collation_server` is related to `character_set_server`, set `character_set_server` will also set `collation_server`. - // We have to make sure we set the `collation_server` with right value. - if _, ok := vars.GetSystemVar(varName); !ok || varName == variable.CollationServer { - err = variable.SetSessionSystemVar(s.sessionVars, varName, varVal) - if err != nil { - return err - } - } - } - - // when client set Capability Flags CLIENT_INTERACTIVE, init wait_timeout with interactive_timeout - if vars.ClientCapability&mysql.ClientInteractive > 0 { - if varVal, ok := vars.GetSystemVar(variable.InteractiveTimeout); ok { - if err := vars.SetSystemVar(variable.WaitTimeout, varVal); err != nil { - return err - } - } - } - - vars.CommonGlobalLoaded = true - return nil -} - // PrepareTxnCtx starts a goroutine to begin a transaction if needed, and creates a new transaction context. // It is called before we execute a sql query. func (s *session) PrepareTxnCtx(ctx context.Context) { diff --git a/session/sysvar_cache.go b/session/sysvar_cache.go index 28736fdab6582..ba56ea822281e 100644 --- a/session/sysvar_cache.go +++ b/session/sysvar_cache.go @@ -15,6 +15,7 @@ package session import ( "context" + "sync" "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/sessionctx/variable" @@ -29,12 +30,16 @@ import ( // and it has a cache miss-path which can be quite slow, such as when running // SHOW VARIABLES for the first time in a session (which will hit all session vars). -var sessionVarCache map[string]string +type sysVarCache struct { + sync.RWMutex + global map[string]string + session map[string]string +} -func (s *session) fetchTableValues() (tableContents map[string]string) { +var svcache sysVarCache +func (s *session) fetchTableValues() (tableContents map[string]string) { tableContents = make(map[string]string) - // Copy all variables from the table to tableContents rs, err := s.ExecuteInternal(context.Background(), "SELECT variable_name, variable_value FROM mysql.global_variables") if err != nil { @@ -59,26 +64,60 @@ func (s *session) fetchTableValues() (tableContents map[string]string) { } } -func (s *session) RebuildSessionVarsCache() error { +func (s *session) buildSysVarCacheIfNeeded() { + if len(svcache.global) == 0 || len(svcache.session) == 0 { + s.RebuildSysVarCache() + } +} + +// RebuildSysVarCache rebuilds the sysvar cache both globally and for session vars. +// It needs to be called when sysvars are added or removed. For global sysvar changes +// UpdateSysVarCacheForKey can be called instead. +func (s *session) RebuildSysVarCache() error { + svcache.Lock() + defer svcache.Unlock() // Create a new map to hold the new cache, // and a cache if what's available in the mysql_global_variables table - // (includes global-only vars that need to be ignored..) - newCache := make(map[string]string) + newSessionCache := make(map[string]string) + newGlobalCache := make(map[string]string) tableContents := s.fetchTableValues() for _, sv := range variable.GetSysVars() { - if sv.Scope&variable.ScopeSession != 0 { + if sv.HasSessionScope() { if _, ok := tableContents[sv.Name]; ok { - newCache[sv.Name] = tableContents[sv.Name] + newSessionCache[sv.Name] = tableContents[sv.Name] } else { - // fmt.Printf("%%%% could not find k: %s in cache!\n", sv.Name) - newCache[sv.Name] = sv.Value // use default + newSessionCache[sv.Name] = sv.Value // use default + } + } + if sv.HasGlobalScope() { + if _, ok := tableContents[sv.Name]; ok { + newGlobalCache[sv.Name] = tableContents[sv.Name] + } else { + newGlobalCache[sv.Name] = sv.Value // use default } } } - // Set the sessionVarCache to be the new cache. - sessionVarCache = newCache + // Update the cache + svcache.session = newSessionCache + svcache.global = newGlobalCache + return nil +} + +// UpdateSysVarCacheForKey is an optimization where we patch the contents of the +// global and session cache rather than run RebuildSysVarCache() +func (s *session) UpdateSysVarCacheForKey(nameInLower string, value string) error { + svcache.Lock() + defer svcache.Unlock() + + sv := variable.GetSysVar(nameInLower) + if sv.HasSessionScope() { + svcache.session[nameInLower] = value + } + if sv.HasGlobalScope() { + svcache.global[nameInLower] = value + } return nil } diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index b468ad09d1147..dbb6901e9e457 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -1382,9 +1382,14 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { return nil } -func (s *SessionVars) InitSessionVar(name string, val string) error { - // fmt.Printf("$$$$ Setting initial sessionvar value for k: %s\n", name) - s.systems[name] = val +func (s *SessionVars) InitSessionVarsFromCache(cache map[string]string) error { + for varName, varVal := range cache { + s.systems[varName] = varVal + } + // when client set Capability Flags CLIENT_INTERACTIVE, init wait_timeout with interactive_timeout + if s.ClientCapability&mysql.ClientInteractive > 0 { + s.systems[WaitTimeout] = s.systems[InteractiveTimeout] + } return nil } diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 5a7ca2902c63a..87081d4562997 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -379,6 +379,21 @@ func (sv *SysVar) GetNativeValType(val string) (types.Datum, byte, uint) { return types.NewStringDatum(val), mysql.TypeVarString, 0 } +// HasSessionScope returns true if the scope for the sysVar includes session. +func (sv *SysVar) HasSessionScope() bool { + return sv.Scope&ScopeSession != 0 +} + +// HasGlobalScope returns true if the scope for the sysVar includes global. +func (sv *SysVar) HasGlobalScope() bool { + return sv.Scope&ScopeGlobal != 0 +} + +// HasNoneScope returns true if the scope for the sysVar is explicitly none +func (sv *SysVar) HasNoneScope() bool { + return sv.Scope == ScopeNone +} + var sysVars map[string]*SysVar var sysVarsLock sync.RWMutex diff --git a/sessionctx/variable/sysvar_test.go b/sessionctx/variable/sysvar_test.go index d954c753e0088..e1cf53ff7adef 100644 --- a/sessionctx/variable/sysvar_test.go +++ b/sessionctx/variable/sysvar_test.go @@ -168,3 +168,17 @@ func (*testSysVarSuite) TestEnumValidation(c *C) { c.Assert(err, IsNil) c.Assert(val, Equals, "AUTO") } + +func (*testSysVarSuite) TestScope(c *C) { + sv := SysVar{Scope: ScopeGlobal | ScopeSession, Name: "mynewsysvar", Value: On, Type: TypeEnum, PossibleValues: []string{"OFF", "ON", "AUTO"}} + c.Assert(sv.HasSessionScope(), IsTrue) + c.Assert(sv.HasGlobalScope(), IsTrue) + + sv = SysVar{Scope: ScopeGlobal, Name: "mynewsysvar", Value: On, Type: TypeEnum, PossibleValues: []string{"OFF", "ON", "AUTO"}} + c.Assert(sv.HasSessionScope(), IsFalse) + c.Assert(sv.HasGlobalScope(), IsTrue) + + sv = SysVar{Scope: ScopeNone, Name: "mynewsysvar", Value: On, Type: TypeEnum, PossibleValues: []string{"OFF", "ON", "AUTO"}} + c.Assert(sv.HasSessionScope(), IsFalse) + c.Assert(sv.HasGlobalScope(), IsFalse) +} From b18efb6c1c9d23bea1e0ec21eca641bd5c7f1da1 Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Tue, 27 Apr 2021 13:09:39 -0600 Subject: [PATCH 03/14] WIP --- domain/domain.go | 72 +++++++++- domain/global_vars_cache.go | 135 ------------------- domain/global_vars_cache_test.go | 221 ------------------------------- domain/sysvar_cache.go | 129 ++++++++++++++++++ metrics/domain.go | 9 ++ metrics/metrics.go | 1 + session/session.go | 52 +++----- session/session_test.go | 6 - session/sysvar_cache.go | 132 ------------------ sessionctx/variable/session.go | 5 +- 10 files changed, 227 insertions(+), 535 deletions(-) delete mode 100644 domain/global_vars_cache.go delete mode 100644 domain/global_vars_cache_test.go create mode 100644 domain/sysvar_cache.go delete mode 100644 session/sysvar_cache.go diff --git a/domain/domain.go b/domain/domain.go index f4b0ac8900f24..19612518a66ad 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -79,7 +79,7 @@ type Domain struct { sysSessionPool *sessionPool exit chan struct{} etcdClient *clientv3.Client - gvc GlobalVariableCache + sysVarCache SysVarCache // replaces gcv slowQuery *topNSlowQueries expensiveQueryHandle *expensivequery.Handle wg sync.WaitGroup @@ -922,6 +922,59 @@ func (do *Domain) LoadPrivilegeLoop(ctx sessionctx.Context) error { return nil } +// LoadSysVarCacheLoop create a goroutine loads sysvar cache in a loop, it +// should be called only once in BootstrapSession. +func (do *Domain) LoadSysVarCacheLoop(ctx sessionctx.Context) error { + + err := do.sysVarCache.RebuildSysVarCache(ctx) + if err != nil { + return err + } + + var watchCh clientv3.WatchChan + duration := 5 * time.Minute + if do.etcdClient != nil { + watchCh = do.etcdClient.Watch(context.Background(), sysVarCacheKey) + duration = 10 * time.Minute + } + + do.wg.Add(1) + go func() { + defer func() { + do.wg.Done() + logutil.BgLogger().Info("LoadSysVarCacheLoop exited.") + util.Recover(metrics.LabelDomain, "LoadSysVarCacheLoop", nil, false) + }() + var count int + for { + ok := true + select { + case <-do.exit: + return + case _, ok = <-watchCh: + case <-time.After(duration): + } + if !ok { + logutil.BgLogger().Error("LoadSysVarCacheLoop loop watch channel closed") + watchCh = do.etcdClient.Watch(context.Background(), sysVarCacheKey) + count++ + if count > 10 { + time.Sleep(time.Duration(count) * time.Second) + } + continue + } + + count = 0 + err := do.sysVarCache.RebuildSysVarCache(ctx) + metrics.LoadSysVarCacheCounter.WithLabelValues(metrics.RetLabel(err)).Inc() + if err != nil { + logutil.BgLogger().Error("LoadSysVarCacheLoop failed", zap.Error(err)) + } + } + }() + return nil +} + // PrivilegeHandle returns the MySQLPrivilege. func (do *Domain) PrivilegeHandle() *privileges.Handle { return do.privHandle @@ -1300,7 +1353,10 @@ func (do *Domain) ExpensiveQueryHandle() *expensivequery.Handle { return do.expensiveQueryHandle } -const privilegeKey = "/tidb/privilege" +const ( + privilegeKey = "/tidb/privilege" + sysVarCacheKey = "/tidb/sysvars" +) // NotifyUpdatePrivilege updates privilege key in etcd, TiDB client that watches // the key will get notification. @@ -1322,6 +1378,18 @@ func (do *Domain) NotifyUpdatePrivilege(ctx sessionctx.Context) { } } +func (do *Domain) NotifyUpdateSysVarCache(ctx sessionctx.Context) { + if do.etcdClient != nil { + row := do.etcdClient.KV + _, err := row.Put(context.Background(), sysVarCacheKey, "") + if err != nil { + logutil.BgLogger().Warn("notify update sysvar cache failed", zap.Error(err)) + } + } + // update locally + do.sysVarCache.RebuildSysVarCache(ctx) +} + // ServerID gets serverID. func (do *Domain) ServerID() uint64 { return atomic.LoadUint64(&do.serverID) diff --git a/domain/global_vars_cache.go b/domain/global_vars_cache.go deleted file mode 100644 index 52aa12a5ac955..0000000000000 --- a/domain/global_vars_cache.go +++ /dev/null @@ -1,135 +0,0 @@ -// Copyright 2018 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package domain - -import ( - "fmt" - "sync" - "time" - - "github.com/pingcap/parser/ast" - "github.com/pingcap/tidb/sessionctx/variable" - "github.com/pingcap/tidb/util/chunk" - "github.com/pingcap/tidb/util/logutil" - "github.com/pingcap/tidb/util/stmtsummary" - "go.uber.org/zap" - "golang.org/x/sync/singleflight" -) - -// GlobalVariableCache caches global variables. -type GlobalVariableCache struct { - sync.RWMutex - lastModify time.Time - rows []chunk.Row - fields []*ast.ResultField - - // Unit test may like to disable it. - disable bool - SingleFight singleflight.Group -} - -// GlobalVariableCacheExpiry is the global variable cache TTL. -const GlobalVariableCacheExpiry = 2 * time.Second - -// Update updates the global variable cache. -func (gvc *GlobalVariableCache) Update(rows []chunk.Row, fields []*ast.ResultField) { - gvc.Lock() - gvc.lastModify = time.Now() - gvc.rows = rows - gvc.fields = fields - gvc.Unlock() - - checkEnableServerGlobalVar(rows) -} - -// Get gets the global variables from cache. -func (gvc *GlobalVariableCache) Get() (succ bool, rows []chunk.Row, fields []*ast.ResultField) { - gvc.RLock() - defer gvc.RUnlock() - if time.Since(gvc.lastModify) < GlobalVariableCacheExpiry { - succ, rows, fields = !gvc.disable, gvc.rows, gvc.fields - return - } - succ = false - return -} - -type loadResult struct { - rows []chunk.Row - fields []*ast.ResultField -} - -// LoadGlobalVariables will load from global cache first, loadFn will be executed if cache is not valid -func (gvc *GlobalVariableCache) LoadGlobalVariables(loadFn func() ([]chunk.Row, []*ast.ResultField, error)) ([]chunk.Row, []*ast.ResultField, error) { - succ, rows, fields := gvc.Get() - if succ { - return rows, fields, nil - } - fn := func() (interface{}, error) { - resRows, resFields, loadErr := loadFn() - if loadErr != nil { - return nil, loadErr - } - gvc.Update(resRows, resFields) - return &loadResult{resRows, resFields}, nil - } - res, err, _ := gvc.SingleFight.Do("loadGlobalVariable", fn) - if err != nil { - return nil, nil, err - } - loadRes := res.(*loadResult) - return loadRes.rows, loadRes.fields, nil -} - -// Disable disables the global variable cache, used in test only. -func (gvc *GlobalVariableCache) Disable() { - gvc.Lock() - defer gvc.Unlock() - gvc.disable = true -} - -// checkEnableServerGlobalVar processes variables that acts in server and global level. -func checkEnableServerGlobalVar(rows []chunk.Row) { - for _, row := range rows { - sVal := "" - if !row.IsNull(1) { - sVal = row.GetString(1) - } - var err error - switch row.GetString(0) { - case variable.TiDBEnableStmtSummary: - err = stmtsummary.StmtSummaryByDigestMap.SetEnabled(sVal, false) - case variable.TiDBStmtSummaryInternalQuery: - err = stmtsummary.StmtSummaryByDigestMap.SetEnabledInternalQuery(sVal, false) - case variable.TiDBStmtSummaryRefreshInterval: - err = stmtsummary.StmtSummaryByDigestMap.SetRefreshInterval(sVal, false) - case variable.TiDBStmtSummaryHistorySize: - err = stmtsummary.StmtSummaryByDigestMap.SetHistorySize(sVal, false) - case variable.TiDBStmtSummaryMaxStmtCount: - err = stmtsummary.StmtSummaryByDigestMap.SetMaxStmtCount(sVal, false) - case variable.TiDBStmtSummaryMaxSQLLength: - err = stmtsummary.StmtSummaryByDigestMap.SetMaxSQLLength(sVal, false) - case variable.TiDBCapturePlanBaseline: - variable.CapturePlanBaseline.Set(sVal, false) - } - if err != nil { - logutil.BgLogger().Error(fmt.Sprintf("load global variable %s error", row.GetString(0)), zap.Error(err)) - } - } -} - -// GetGlobalVarsCache gets the global variable cache. -func (do *Domain) GetGlobalVarsCache() *GlobalVariableCache { - return &do.gvc -} diff --git a/domain/global_vars_cache_test.go b/domain/global_vars_cache_test.go deleted file mode 100644 index 7358d709986af..0000000000000 --- a/domain/global_vars_cache_test.go +++ /dev/null @@ -1,221 +0,0 @@ -// Copyright 2019 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package domain - -import ( - "sync" - "sync/atomic" - "time" - - . "github.com/pingcap/check" - "github.com/pingcap/parser/ast" - "github.com/pingcap/parser/charset" - "github.com/pingcap/parser/model" - "github.com/pingcap/parser/mysql" - "github.com/pingcap/tidb/sessionctx/variable" - "github.com/pingcap/tidb/store/mockstore" - "github.com/pingcap/tidb/types" - "github.com/pingcap/tidb/util/chunk" - "github.com/pingcap/tidb/util/stmtsummary" - "github.com/pingcap/tidb/util/testleak" -) - -var _ = SerialSuites(&testGVCSuite{}) - -type testGVCSuite struct{} - -func (gvcSuite *testGVCSuite) TestSimple(c *C) { - defer testleak.AfterTest(c)() - testleak.BeforeTest() - - store, err := mockstore.NewMockStore() - c.Assert(err, IsNil) - defer func() { - err := store.Close() - c.Assert(err, IsNil) - }() - ddlLease := 50 * time.Millisecond - dom := NewDomain(store, ddlLease, 0, 0, mockFactory) - err = dom.Init(ddlLease, sysMockFactory) - c.Assert(err, IsNil) - defer dom.Close() - - // Get empty global vars cache. - gvc := dom.GetGlobalVarsCache() - succ, rows, fields := gvc.Get() - c.Assert(succ, IsFalse) - c.Assert(rows, IsNil) - c.Assert(fields, IsNil) - // Get a variable from global vars cache. - rf := getResultField("c", 1, 0) - rf1 := getResultField("c1", 2, 1) - ft := &types.FieldType{ - Tp: mysql.TypeString, - Charset: charset.CharsetBin, - Collate: charset.CollationBin, - } - ft1 := &types.FieldType{ - Tp: mysql.TypeString, - Charset: charset.CharsetBin, - Collate: charset.CollationBin, - } - ck := chunk.NewChunkWithCapacity([]*types.FieldType{ft, ft1}, 1024) - ck.AppendString(0, "variable1") - ck.AppendString(1, "value1") - row := ck.GetRow(0) - gvc.Update([]chunk.Row{row}, []*ast.ResultField{rf, rf1}) - succ, rows, fields = gvc.Get() - c.Assert(succ, IsTrue) - c.Assert(rows[0], Equals, row) - c.Assert(fields, DeepEquals, []*ast.ResultField{rf, rf1}) - // Disable the cache. - gvc.Disable() - succ, rows, fields = gvc.Get() - c.Assert(succ, IsFalse) - c.Assert(rows[0], Equals, row) - c.Assert(fields, DeepEquals, []*ast.ResultField{rf, rf1}) -} - -func getResultField(colName string, id, offset int) *ast.ResultField { - return &ast.ResultField{ - Column: &model.ColumnInfo{ - Name: model.NewCIStr(colName), - ID: int64(id), - Offset: offset, - FieldType: types.FieldType{ - Tp: mysql.TypeString, - Charset: charset.CharsetUTF8, - Collate: charset.CollationUTF8, - }, - }, - TableAsName: model.NewCIStr("tbl"), - DBName: model.NewCIStr("test"), - } -} - -func (gvcSuite *testGVCSuite) TestConcurrentOneFlight(c *C) { - defer testleak.AfterTest(c)() - testleak.BeforeTest() - gvc := &GlobalVariableCache{} - succ, rows, fields := gvc.Get() - c.Assert(succ, IsFalse) - c.Assert(rows, IsNil) - c.Assert(fields, IsNil) - - // Get a variable from global vars cache. - rf := getResultField("c", 1, 0) - rf1 := getResultField("c1", 2, 1) - ft := &types.FieldType{ - Tp: mysql.TypeString, - Charset: charset.CharsetBin, - Collate: charset.CollationBin, - } - ft1 := &types.FieldType{ - Tp: mysql.TypeString, - Charset: charset.CharsetBin, - Collate: charset.CollationBin, - } - ckLow := chunk.NewChunkWithCapacity([]*types.FieldType{ft, ft1}, 1024) - val := "fromStorage" - val1 := "fromStorage1" - ckLow.AppendString(0, val) - ckLow.AppendString(1, val1) - - // Let cache become invalid, and try concurrent load - counter := int32(0) - waitToStart := new(sync.WaitGroup) - waitToStart.Add(1) - gvc.lastModify = time.Now().Add(time.Duration(-10) * time.Second) - loadFunc := func() ([]chunk.Row, []*ast.ResultField, error) { - time.Sleep(100 * time.Millisecond) - atomic.AddInt32(&counter, 1) - return []chunk.Row{ckLow.GetRow(0)}, []*ast.ResultField{rf, rf1}, nil - } - wg := new(sync.WaitGroup) - worker := 100 - resArray := make([]loadResult, worker) - for i := 0; i < worker; i++ { - wg.Add(1) - go func(idx int) { - defer wg.Done() - waitToStart.Wait() - resRow, resField, _ := gvc.LoadGlobalVariables(loadFunc) - resArray[idx].rows = resRow - resArray[idx].fields = resField - }(i) - } - waitToStart.Done() - wg.Wait() - succ, rows, fields = gvc.Get() - c.Assert(counter, Equals, int32(1)) - c.Assert(resArray[0].rows[0].GetString(0), Equals, val) - c.Assert(resArray[0].rows[0].GetString(1), Equals, val1) - for i := 0; i < worker; i++ { - c.Assert(resArray[0].rows[0], Equals, resArray[i].rows[0]) - c.Assert(resArray[i].rows[0].GetString(0), Equals, val) - c.Assert(resArray[i].rows[0].GetString(1), Equals, val1) - } - // Validate cache - c.Assert(succ, IsTrue) - c.Assert(rows[0], Equals, resArray[0].rows[0]) - c.Assert(fields, DeepEquals, []*ast.ResultField{rf, rf1}) -} - -func (gvcSuite *testGVCSuite) TestCheckEnableStmtSummary(c *C) { - defer testleak.AfterTest(c)() - testleak.BeforeTest() - - store, err := mockstore.NewMockStore() - c.Assert(err, IsNil) - defer func() { - err := store.Close() - c.Assert(err, IsNil) - }() - ddlLease := 50 * time.Millisecond - dom := NewDomain(store, ddlLease, 0, 0, mockFactory) - err = dom.Init(ddlLease, sysMockFactory) - c.Assert(err, IsNil) - defer dom.Close() - - gvc := dom.GetGlobalVarsCache() - - rf := getResultField("c", 1, 0) - rf1 := getResultField("c1", 2, 1) - ft := &types.FieldType{ - Tp: mysql.TypeString, - Charset: charset.CharsetBin, - Collate: charset.CollationBin, - } - ft1 := &types.FieldType{ - Tp: mysql.TypeString, - Charset: charset.CharsetBin, - Collate: charset.CollationBin, - } - - err = stmtsummary.StmtSummaryByDigestMap.SetEnabled("0", false) - c.Assert(err, IsNil) - ck := chunk.NewChunkWithCapacity([]*types.FieldType{ft, ft1}, 1024) - ck.AppendString(0, variable.TiDBEnableStmtSummary) - ck.AppendString(1, "1") - row := ck.GetRow(0) - gvc.Update([]chunk.Row{row}, []*ast.ResultField{rf, rf1}) - c.Assert(stmtsummary.StmtSummaryByDigestMap.Enabled(), Equals, true) - - ck = chunk.NewChunkWithCapacity([]*types.FieldType{ft, ft1}, 1024) - ck.AppendString(0, variable.TiDBEnableStmtSummary) - ck.AppendString(1, "0") - row = ck.GetRow(0) - gvc.Update([]chunk.Row{row}, []*ast.ResultField{rf, rf1}) - c.Assert(stmtsummary.StmtSummaryByDigestMap.Enabled(), Equals, false) -} diff --git a/domain/sysvar_cache.go b/domain/sysvar_cache.go new file mode 100644 index 0000000000000..7841e0aaa68b6 --- /dev/null +++ b/domain/sysvar_cache.go @@ -0,0 +1,129 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package domain + +import ( + "context" + "sync" + "time" + + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/sqlexec" +) + +// The sysvar cache replaces the "global_vars_cache". It is an improvement because it operates similar to privilege cache, +// Where it caches for 5 minutes instead of 2 seconds, plus it listens on etcd for updates from other servers. + +// When a new session is created, SessionVars.systems needs to be populated +// with a cache of common system variables. Previously this populated the +// "common sysvars" only, with uncommon ones being populated on-demand. +// +// This was both *not* MySQL compatible (it should not pick up global scoped changes after the session starts), +// and it has a cache miss-path which can be quite slow, such as when running +// SHOW VARIABLES for the first time in a session (which will hit all session vars). + +type SysVarCache struct { + sync.RWMutex + isHealthy bool + lastModify time.Time + global map[string]string + session map[string]string +} + +// GetGlobalVarsCache gets the global variable cache. +func (do *Domain) GetSysVarCache() *SysVarCache { + return &do.sysVarCache +} + +func (svc *SysVarCache) GetSessionCache() map[string]string { + svc.RLock() + defer svc.RUnlock() + + // Perform a deep copy since this will be assigned directly to the session + newMap := make(map[string]string, len(svc.session)) + for k, v := range svc.session { + newMap[k] = v + } + return newMap +} + +func (svc *SysVarCache) GetGlobalVar(name string) (string, error) { + svc.RLock() + defer svc.RUnlock() + if val, ok := svc.global[name]; ok { + return val, nil + } + return "", variable.ErrUnknownSystemVar.GenWithStackByArgs(name) +} + +func (svc *SysVarCache) fetchTableValues(ctx sessionctx.Context) (map[string]string, error) { + tableContents := make(map[string]string) + // Copy all variables from the table to tableContents + exec := ctx.(sqlexec.RestrictedSQLExecutor) + stmt, err := exec.ParseWithParams(context.Background(), `SELECT variable_name, variable_value FROM mysql.global_variables`) + if err != nil { + return tableContents, err + } + rows, _, err := exec.ExecRestrictedStmt(context.TODO(), stmt) + if err != nil { + return nil, err + } + for _, row := range rows { + name := row.GetString(0) + val := row.GetString(1) + tableContents[name] = val + } + return tableContents, nil +} + +// RebuildSysVarCache rebuilds the sysvar cache both globally and for session vars. +// It needs to be called when sysvars are added or removed. +func (svc *SysVarCache) RebuildSysVarCache(ctx sessionctx.Context) error { + svc.Lock() + defer svc.Unlock() + + logutil.BgLogger().Info("rebuilding sysvar cache") + + // Create a new map to hold the new cache, + // and a cache if what's available in the mysql_global_variables table + newSessionCache := make(map[string]string) + newGlobalCache := make(map[string]string) + tableContents, err := svc.fetchTableValues(ctx) + if err != nil { + return err + } + + for _, sv := range variable.GetSysVars() { + if sv.HasSessionScope() { + if _, ok := tableContents[sv.Name]; ok { + newSessionCache[sv.Name] = tableContents[sv.Name] + } else { + newSessionCache[sv.Name] = sv.Value // use default + } + } + if sv.HasGlobalScope() { + if _, ok := tableContents[sv.Name]; ok { + newGlobalCache[sv.Name] = tableContents[sv.Name] + } else { + newGlobalCache[sv.Name] = sv.Value // use default + } + } + } + + svc.session = newSessionCache + svc.global = newGlobalCache + return nil +} diff --git a/metrics/domain.go b/metrics/domain.go index dd3912555d59c..5397290f33993 100644 --- a/metrics/domain.go +++ b/metrics/domain.go @@ -47,6 +47,15 @@ var ( Help: "Counter of load privilege", }, []string{LblType}) + // LoadSysVarCacheCounter records the counter of loading sysvars + LoadSysVarCacheCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "tidb", + Subsystem: "domain", + Name: "load_sysvarcache_total", + Help: "Counter of load sysvar cache", + }, []string{LblType}) + SchemaValidatorStop = "stop" SchemaValidatorRestart = "restart" SchemaValidatorReset = "reset" diff --git a/metrics/metrics.go b/metrics/metrics.go index ff2ac3b1aa08d..9ac9c22d7ab52 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -98,6 +98,7 @@ func RegisterMetrics() { prometheus.MustRegister(JobsGauge) prometheus.MustRegister(KeepAliveCounter) prometheus.MustRegister(LoadPrivilegeCounter) + prometheus.MustRegister(LoadSysVarCacheCounter) prometheus.MustRegister(LoadSchemaCounter) prometheus.MustRegister(LoadSchemaDuration) prometheus.MustRegister(MetaHistogram) diff --git a/session/session.go b/session/session.go index b4361c395e839..47c1e094b2c05 100644 --- a/session/session.go +++ b/session/session.go @@ -950,7 +950,7 @@ func (s *session) varFromTiDBTable(name string) bool { } // GetGlobalSysVar implements GlobalVarAccessor.GetGlobalSysVar interface. -func (s *session) GetGlobalSysVar(name string) (value string, err error) { +func (s *session) GetGlobalSysVar(name string) (string, error) { name = strings.ToLower(name) if name == variable.TiDBSlowLogMasking { name = variable.TiDBRedactLog @@ -959,33 +959,15 @@ func (s *session) GetGlobalSysVar(name string) (value string, err error) { // When running bootstrap or upgrade, we should not access global storage. return "", nil } - - if svcache.isHealthy { - // Use the new sysvar cache - var ok bool - if value, ok = svcache.global[name]; !ok { - return "", variable.ErrUnknownSystemVar.GenWithStackByArgs(name) - } - } else { - // Fallback to the table method. - value, err = s.getTableValue(context.TODO(), mysql.GlobalVariablesTable, name) - if err != nil { - if errResultIsEmpty.Equal(err) { - sv := variable.GetSysVar(name) - if sv != nil { - return sv.Value, nil - } - return "", variable.ErrUnknownSystemVar.GenWithStackByArgs(name) - } - return "", err - } + sysVar, err := domain.GetDomain(s).GetSysVarCache().GetGlobalVar(name) + if err != nil { + return "", err } - // Fetch mysql.tidb values if required if s.varFromTiDBTable(name) { - return s.getTiDBTableValue(name, value) + return s.getTiDBTableValue(name, sysVar) } - return value, nil + return sysVar, nil } // SetGlobalSysVar implements GlobalVarAccessor.SetGlobalSysVar interface. @@ -1020,8 +1002,8 @@ func (s *session) SetGlobalSysVar(name, value string) error { } // Update the svcache - // TODO: tell etcd peers that the svcache is stale! - return s.UpdateSysVarCacheForKey(name, sVal) + domain.GetDomain(s).NotifyUpdateSysVarCache(s) + return nil } // setTiDBTableValue handles tikv_* sysvars which need to update mysql.tidb @@ -2217,8 +2199,6 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) { return nil, err } - se.RebuildSysVarCache() - // get system tz from mysql.tidb tz, err := se.getTableValue(context.TODO(), mysql.TiDBTable, "system_tz") if err != nil { @@ -2281,13 +2261,18 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) { } } + // Rebuild sysvar cache in a loop + err = dom.LoadSysVarCacheLoop(se) + if err != nil { + return nil, err + } + if len(cfg.Plugin.Load) > 0 { err := plugin.Init(context.Background(), plugin.Config{EtcdClient: dom.GetEtcdClient()}) if err != nil { return nil, err } } - se4, err := createSession(store) if err != nil { return nil, err @@ -2379,9 +2364,6 @@ func createSessionWithOpt(store kv.Storage, opt *Opt) (*session, error) { domain.BindDomain(s, dom) // session implements variable.GlobalVarAccessor. Bind it to ctx. s.sessionVars.GlobalVarsAccessor = s - - s.loadCommonGlobalVariablesIfNeeded() - s.sessionVars.BinlogClient = binloginfo.GetPumpsClient() s.txn.init() @@ -2462,7 +2444,6 @@ func finishBootstrap(store kv.Storage) { logutil.BgLogger().Fatal("finish bootstrap failed", zap.Error(err)) } - } const quoteCommaQuote = "', '" @@ -2478,9 +2459,8 @@ func (s *session) loadCommonGlobalVariablesIfNeeded() error { } // Deep copy sessionvar cache - svcache.RLock() - s.sessionVars.InitSessionVarsFromCache(svcache.session) - svcache.RUnlock() + sessionCache := domain.GetDomain(s).GetSysVarCache().GetSessionCache() + s.sessionVars.InitSessionVarsFromCache(sessionCache) s.sessionVars.CommonGlobalLoaded = true return nil } diff --git a/session/session_test.go b/session/session_test.go index 9ce875fa07868..f13044ef1f14f 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -212,7 +212,6 @@ func (s *testSessionSuiteBase) SetUpSuite(c *C) { var err error s.dom, err = session.BootstrapSession(s.store) c.Assert(err, IsNil) - s.dom.GetGlobalVarsCache().Disable() } func (s *testSessionSuiteBase) TearDownSuite(c *C) { @@ -638,7 +637,6 @@ func (s *testSessionSuite) TestGlobalVarAccessor(c *C) { c.Assert(v, Equals, varValue2) // For issue 10955, make sure the new session load `max_execution_time` into sessionVars. - s.dom.GetGlobalVarsCache().Disable() tk1.MustExec("set @@global.max_execution_time = 100") tk2 := testkit.NewTestKitWithInit(c, s.store) c.Assert(tk2.Se.GetSessionVars().MaxExecutionTime, Equals, uint64(100)) @@ -2574,8 +2572,6 @@ func (s *testSessionSuite) TestSetGlobalTZ(c *C) { tk.MustQuery("show variables like 'time_zone'").Check(testkit.Rows("time_zone +08:00")) - // Disable global variable cache, so load global session variable take effect immediate. - s.dom.GetGlobalVarsCache().Disable() tk1 := testkit.NewTestKitWithInit(c, s.store) tk1.MustQuery("show variables like 'time_zone'").Check(testkit.Rows("time_zone +00:00")) } @@ -2717,8 +2713,6 @@ func (s *testSessionSuite3) TestEnablePartition(c *C) { tk.MustExec("set tidb_enable_list_partition=on") tk.MustQuery("show variables like 'tidb_enable_list_partition'").Check(testkit.Rows("tidb_enable_list_partition ON")) - // Disable global variable cache, so load global session variable take effect immediate. - s.dom.GetGlobalVarsCache().Disable() tk1 := testkit.NewTestKitWithInit(c, s.store) tk1.MustQuery("show variables like 'tidb_enable_table_partition'").Check(testkit.Rows("tidb_enable_table_partition ON")) } diff --git a/session/sysvar_cache.go b/session/sysvar_cache.go deleted file mode 100644 index e513ddde621a4..0000000000000 --- a/session/sysvar_cache.go +++ /dev/null @@ -1,132 +0,0 @@ -// Copyright 2021 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package session - -import ( - "context" - "sync" - - "github.com/pingcap/parser/terror" - "github.com/pingcap/tidb/sessionctx/variable" - "github.com/pingcap/tidb/util/chunk" - "github.com/pingcap/tidb/util/logutil" -) - -// When a new session is created, SessionVars.systems needs to be populated -// with a cache of common system variables. Previously this populated the -// "common sysvars" only, with uncommon ones being populated on-demand. -// -// This is both *not* MySQL compatible (it should not pick up global scoped changes after the session starts), -// and it has a cache miss-path which can be quite slow, such as when running -// SHOW VARIABLES for the first time in a session (which will hit all session vars). - -type sysVarCache struct { - sync.RWMutex - isHealthy bool - global map[string]string - session map[string]string -} - -var svcache sysVarCache - -func (s *session) fetchTableValues() (tableContents map[string]string) { - tableContents = make(map[string]string) - // Copy all variables from the table to tableContents - rs, err := s.ExecuteInternal(context.Background(), "SELECT variable_name, variable_value FROM mysql.global_variables") - if err != nil { - return - } - defer terror.Call(rs.Close) - req := rs.NewChunk() - for { - err = rs.Next(context.TODO(), req) - if err != nil { - return - } - if req.NumRows() == 0 { - return - } - it := chunk.NewIterator4Chunk(req) - for row := it.Begin(); row != it.End(); row = it.Next() { - name := row.GetString(0) - val := row.GetString(1) - tableContents[name] = val - } - } -} - -func (s *session) buildSysVarCacheIfNeeded() { - if len(svcache.global) == 0 || len(svcache.session) == 0 { - s.RebuildSysVarCache() - } -} - -// RebuildSysVarCache rebuilds the sysvar cache both globally and for session vars. -// It needs to be called when sysvars are added or removed. For global sysvar changes -// UpdateSysVarCacheForKey can be called instead. -func (s *session) RebuildSysVarCache() error { - svcache.Lock() - defer svcache.Unlock() - - logutil.BgLogger().Info("rebuilding sysvar cache") - - // Create a new map to hold the new cache, - // and a cache if what's available in the mysql_global_variables table - newSessionCache := make(map[string]string) - newGlobalCache := make(map[string]string) - tableContents := s.fetchTableValues() - - for _, sv := range variable.GetSysVars() { - if sv.HasSessionScope() { - if _, ok := tableContents[sv.Name]; ok { - newSessionCache[sv.Name] = tableContents[sv.Name] - } else { - newSessionCache[sv.Name] = sv.Value // use default - } - } - if sv.HasGlobalScope() { - if _, ok := tableContents[sv.Name]; ok { - newGlobalCache[sv.Name] = tableContents[sv.Name] - } else { - newGlobalCache[sv.Name] = sv.Value // use default - } - } - } - - // Update the cache - svcache.session = newSessionCache - svcache.global = newGlobalCache - svcache.isHealthy = true - return nil -} - -// UpdateSysVarCacheForKey is an optimization where we patch the contents of the -// global and session cache rather than run RebuildSysVarCache() -func (s *session) UpdateSysVarCacheForKey(nameInLower string, value string) error { - if !svcache.isHealthy { - return s.RebuildSysVarCache() - } - - svcache.Lock() - defer svcache.Unlock() - - sv := variable.GetSysVar(nameInLower) - if sv.HasSessionScope() { - svcache.session[nameInLower] = value - } - if sv.HasGlobalScope() { - svcache.global[nameInLower] = value - } - return nil -} diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index dbb6901e9e457..6bb2dadb20a91 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -1383,9 +1383,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { } func (s *SessionVars) InitSessionVarsFromCache(cache map[string]string) error { - for varName, varVal := range cache { - s.systems[varName] = varVal - } + s.systems = cache + // when client set Capability Flags CLIENT_INTERACTIVE, init wait_timeout with interactive_timeout if s.ClientCapability&mysql.ClientInteractive > 0 { s.systems[WaitTimeout] = s.systems[InteractiveTimeout] From 6b0d8c17f3d866670a62edf36d2deaf66047b98c Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Tue, 27 Apr 2021 14:10:39 -0600 Subject: [PATCH 04/14] WIP --- cmd/explaintest/main.go | 3 - ddl/db_change_test.go | 1 - domain/domain.go | 2 +- domain/sysvar_cache.go | 33 +++-- executor/ddl_test.go | 2 + executor/executor_test.go | 3 +- executor/seqtest/seq_executor_test.go | 2 - infoschema/tables_test.go | 6 +- metrics/metrics.go | 2 +- planner/core/prepare_test.go | 3 - session/bootstrap.go | 1 + session/session.go | 185 +++++++++++++++++++++++++- session/session_test.go | 1 + sessionctx/variable/session.go | 9 +- 14 files changed, 208 insertions(+), 45 deletions(-) diff --git a/cmd/explaintest/main.go b/cmd/explaintest/main.go index fa5265f7af871..a85c8ce82dd3c 100644 --- a/cmd/explaintest/main.go +++ b/cmd/explaintest/main.go @@ -30,7 +30,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/parser/ast" - "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/util/logutil" @@ -663,8 +662,6 @@ func main() { log.Fatal(fmt.Sprintf("%s failed", sql), zap.Error(err)) } } - // Wait global variables to reload. - time.Sleep(domain.GlobalVariableCacheExpiry) if _, err = mdb.Exec("set sql_mode='STRICT_TRANS_TABLES'"); err != nil { log.Fatal("set sql_mode='STRICT_TRANS_TABLES' failed", zap.Error(err)) diff --git a/ddl/db_change_test.go b/ddl/db_change_test.go index de9d8eb504866..bbd7295af10e1 100644 --- a/ddl/db_change_test.go +++ b/ddl/db_change_test.go @@ -1063,7 +1063,6 @@ func (s *testStateChangeSuite) TestParallelAlterModifyColumnAndAddPK(c *C) { _, err = s.se.Execute(context.Background(), "set global tidb_enable_change_column_type = 0") c.Assert(err, IsNil) }() - domain.GetDomain(s.se).GetGlobalVarsCache().Disable() sql1 := "ALTER TABLE t ADD PRIMARY KEY (b) NONCLUSTERED;" sql2 := "ALTER TABLE t MODIFY COLUMN b tinyint;" diff --git a/domain/domain.go b/domain/domain.go index 19612518a66ad..60405675a095d 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -79,7 +79,7 @@ type Domain struct { sysSessionPool *sessionPool exit chan struct{} etcdClient *clientv3.Client - sysVarCache SysVarCache // replaces gcv + sysVarCache SysVarCache // replaces GlobalVariableCache slowQuery *topNSlowQueries expensiveQueryHandle *expensivequery.Handle wg sync.WaitGroup diff --git a/domain/sysvar_cache.go b/domain/sysvar_cache.go index 7841e0aaa68b6..52b75bb4de522 100644 --- a/domain/sysvar_cache.go +++ b/domain/sysvar_cache.go @@ -24,16 +24,10 @@ import ( "github.com/pingcap/tidb/util/sqlexec" ) -// The sysvar cache replaces the "global_vars_cache". It is an improvement because it operates similar to privilege cache, -// Where it caches for 5 minutes instead of 2 seconds, plus it listens on etcd for updates from other servers. - -// When a new session is created, SessionVars.systems needs to be populated -// with a cache of common system variables. Previously this populated the -// "common sysvars" only, with uncommon ones being populated on-demand. -// -// This was both *not* MySQL compatible (it should not pick up global scoped changes after the session starts), -// and it has a cache miss-path which can be quite slow, such as when running -// SHOW VARIABLES for the first time in a session (which will hit all session vars). +// The sysvar cache replaces the GlobalVariableCache. +// It is an improvement because it operates similar to privilege cache, +// where it caches for 5 minutes instead of 2 seconds, plus it listens on etcd +// for updates from other servers. type SysVarCache struct { sync.RWMutex @@ -48,10 +42,13 @@ func (do *Domain) GetSysVarCache() *SysVarCache { return &do.sysVarCache } -func (svc *SysVarCache) GetSessionCache() map[string]string { +func (svc *SysVarCache) GetSessionCache(ctx sessionctx.Context) map[string]string { + if len(svc.session) == 0 { + svc.RebuildSysVarCache(ctx) + } + svc.RLock() defer svc.RUnlock() - // Perform a deep copy since this will be assigned directly to the session newMap := make(map[string]string, len(svc.session)) for k, v := range svc.session { @@ -60,7 +57,11 @@ func (svc *SysVarCache) GetSessionCache() map[string]string { return newMap } -func (svc *SysVarCache) GetGlobalVar(name string) (string, error) { +func (svc *SysVarCache) GetGlobalVar(ctx sessionctx.Context, name string) (string, error) { + if len(svc.global) == 0 { + svc.RebuildSysVarCache(ctx) + } + svc.RLock() defer svc.RUnlock() if val, ok := svc.global[name]; ok { @@ -95,10 +96,6 @@ func (svc *SysVarCache) RebuildSysVarCache(ctx sessionctx.Context) error { svc.Lock() defer svc.Unlock() - logutil.BgLogger().Info("rebuilding sysvar cache") - - // Create a new map to hold the new cache, - // and a cache if what's available in the mysql_global_variables table newSessionCache := make(map[string]string) newGlobalCache := make(map[string]string) tableContents, err := svc.fetchTableValues(ctx) @@ -123,6 +120,8 @@ func (svc *SysVarCache) RebuildSysVarCache(ctx sessionctx.Context) error { } } + logutil.BgLogger().Info("rebuilding sysvar cache") + svc.session = newSessionCache svc.global = newGlobalCache return nil diff --git a/executor/ddl_test.go b/executor/ddl_test.go index c55908066de62..f625c1fda2ac2 100644 --- a/executor/ddl_test.go +++ b/executor/ddl_test.go @@ -484,6 +484,8 @@ func (s *testSuite6) TestAlterTableAddColumn(c *C) { tk.MustExec("drop sequence alter_seq") } +// TODO: Requires multi schema change. I need to figure out why it is turned off. + func (s *testSuite6) TestAlterTableAddColumns(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") diff --git a/executor/executor_test.go b/executor/executor_test.go index aaebfd0ada142..46f9130c43a9f 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -2258,8 +2258,6 @@ func (s *testSuiteP2) TestSQLMode(c *C) { tk.MustExec("set sql_mode = 'STRICT_TRANS_TABLES'") tk.MustExec("set @@global.sql_mode = ''") - // Disable global variable cache, so load global session variable take effect immediate. - s.domain.GetGlobalVarsCache().Disable() tk2 := testkit.NewTestKit(c, s.store) tk2.MustExec("use test") tk2.MustExec("drop table if exists t2") @@ -7965,6 +7963,7 @@ func (s *testSerialSuite) TestTxnWriteThroughputSLI(c *C) { writeSLI := tk.Se.GetTxnWriteThroughputSLI() c.Assert(writeSLI.IsInvalid(), Equals, false) c.Assert(writeSLI.IsSmallTxn(), Equals, true) + // This is currently returning 46 and not 58 for the writesize. why?? c.Assert(tk.Se.GetTxnWriteThroughputSLI().String(), Equals, "invalid: false, affectRow: 2, writeSize: 58, readKeys: 0, writeKeys: 2, writeTime: 1s") tk.Se.GetTxnWriteThroughputSLI().Reset() diff --git a/executor/seqtest/seq_executor_test.go b/executor/seqtest/seq_executor_test.go index 7b38a6f6b673e..f5df7c01eb509 100644 --- a/executor/seqtest/seq_executor_test.go +++ b/executor/seqtest/seq_executor_test.go @@ -1473,8 +1473,6 @@ func (s *seqTestSuite) TestMaxDeltaSchemaCount(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") c.Assert(variable.GetMaxDeltaSchemaCount(), Equals, int64(variable.DefTiDBMaxDeltaSchemaCount)) - gvc := domain.GetDomain(tk.Se).GetGlobalVarsCache() - gvc.Disable() tk.MustExec("set @@global.tidb_max_delta_schema_count= -1") tk.MustQuery("show warnings;").Check(testkit.Rows("Warning 1292 Truncated incorrect tidb_max_delta_schema_count value: '-1'")) diff --git a/infoschema/tables_test.go b/infoschema/tables_test.go index f30f25ba6abfa..fd378a2a6354b 100644 --- a/infoschema/tables_test.go +++ b/infoschema/tables_test.go @@ -959,7 +959,7 @@ func (s *testTableSuite) TestStmtSummaryTable(c *C) { tk.MustQuery("select @@global.tidb_enable_stmt_summary").Check(testkit.Rows("1")) // Invalidate the cache manually so that tidb_enable_stmt_summary works immediately. - s.dom.GetGlobalVarsCache().Disable() + // s.dom.GetGlobalVarsCache().Disable() // Disable refreshing summary. tk.MustExec("set global tidb_stmt_summary_refresh_interval = 999999999") tk.MustQuery("select @@global.tidb_stmt_summary_refresh_interval").Check(testkit.Rows("999999999")) @@ -1203,7 +1203,7 @@ func (s *testClusterTableSuite) TestStmtSummaryHistoryTable(c *C) { tk.MustQuery("select @@global.tidb_enable_stmt_summary").Check(testkit.Rows("1")) // Invalidate the cache manually so that tidb_enable_stmt_summary works immediately. - s.dom.GetGlobalVarsCache().Disable() + // s.dom.GetGlobalVarsCache().Disable() // Disable refreshing summary. tk.MustExec("set global tidb_stmt_summary_refresh_interval = 999999999") tk.MustQuery("select @@global.tidb_stmt_summary_refresh_interval").Check(testkit.Rows("999999999")) @@ -1260,7 +1260,7 @@ func (s *testTableSuite) TestStmtSummaryInternalQuery(c *C) { tk.MustExec("set global tidb_enable_stmt_summary = 1") tk.MustQuery("select @@global.tidb_enable_stmt_summary").Check(testkit.Rows("1")) // Invalidate the cache manually so that tidb_enable_stmt_summary works immediately. - s.dom.GetGlobalVarsCache().Disable() + // s.dom.GetGlobalVarsCache().Disable() // Disable refreshing summary. tk.MustExec("set global tidb_stmt_summary_refresh_interval = 999999999") tk.MustQuery("select @@global.tidb_stmt_summary_refresh_interval").Check(testkit.Rows("999999999")) diff --git a/metrics/metrics.go b/metrics/metrics.go index 9ac9c22d7ab52..6156c772a6972 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -98,7 +98,6 @@ func RegisterMetrics() { prometheus.MustRegister(JobsGauge) prometheus.MustRegister(KeepAliveCounter) prometheus.MustRegister(LoadPrivilegeCounter) - prometheus.MustRegister(LoadSysVarCacheCounter) prometheus.MustRegister(LoadSchemaCounter) prometheus.MustRegister(LoadSchemaDuration) prometheus.MustRegister(MetaHistogram) @@ -151,6 +150,7 @@ func RegisterMetrics() { prometheus.MustRegister(TiFlashQueryTotalCounter) prometheus.MustRegister(SmallTxnWriteDuration) prometheus.MustRegister(TxnWriteThroughput) + prometheus.MustRegister(LoadSysVarCacheCounter) tikvmetrics.InitMetrics(TiDB, TiKVClient) tikvmetrics.RegisterMetrics() diff --git a/planner/core/prepare_test.go b/planner/core/prepare_test.go index 686d67b00a9e0..afd5321ca009c 100644 --- a/planner/core/prepare_test.go +++ b/planner/core/prepare_test.go @@ -275,9 +275,6 @@ func (s *testPrepareSerialSuite) TestPrepareOverMaxPreparedStmtCount(c *C) { tk.MustExec("set @@global.max_prepared_stmt_count = 2") tk.MustQuery("select @@global.max_prepared_stmt_count").Check(testkit.Rows("2")) - // Disable global variable cache, so load global session variable take effect immediate. - dom.GetGlobalVarsCache().Disable() - // test close session to give up all prepared stmt tk.MustExec(`prepare stmt2 from "select 1"`) prePrepared = readGaugeInt(metrics.PreparedStmtGauge) diff --git a/session/bootstrap.go b/session/bootstrap.go index 34d6748ae38c1..a58754228cd66 100644 --- a/session/bootstrap.go +++ b/session/bootstrap.go @@ -363,6 +363,7 @@ func bootstrap(s Session) { doDMLWorks(s) logutil.BgLogger().Info("bootstrap successful", zap.Duration("take time", time.Since(startTime))) + dom.NotifyUpdateSysVarCache(s) return } time.Sleep(200 * time.Millisecond) diff --git a/session/session.go b/session/session.go index 47c1e094b2c05..cdaa7a320c8ee 100644 --- a/session/session.go +++ b/session/session.go @@ -959,9 +959,24 @@ func (s *session) GetGlobalSysVar(name string) (string, error) { // When running bootstrap or upgrade, we should not access global storage. return "", nil } - sysVar, err := domain.GetDomain(s).GetSysVarCache().GetGlobalVar(name) + + sv := variable.GetSysVar(name) + if sv == nil { + // It might be a recently unregistered sysvar. We should return unknown + // since GetSysVar is the canonical version, but we can update the cache + // so the next request doesn't attempt to load this. + domain.GetDomain(s).NotifyUpdateSysVarCache(s) + return "", variable.ErrUnknownSystemVar.GenWithStackByArgs(name) + } + + sysVar, err := domain.GetDomain(s).GetSysVarCache().GetGlobalVar(s, name) if err != nil { - return "", err + // The sysvar exists, but there is no cache entry yet. + // This might be because the sysvar was only recently registered. + // In which case it is safe to return the default, but we can also + // update the cache for the future. + domain.GetDomain(s).NotifyUpdateSysVarCache(s) + return sv.Value, nil } // Fetch mysql.tidb values if required if s.varFromTiDBTable(name) { @@ -2448,8 +2463,133 @@ func finishBootstrap(store kv.Storage) { const quoteCommaQuote = "', '" +var builtinGlobalVariable = []string{ + variable.AutoCommit, + variable.SQLModeVar, + variable.MaxAllowedPacket, + variable.TimeZone, + variable.BlockEncryptionMode, + variable.WaitTimeout, + variable.InteractiveTimeout, + variable.MaxPreparedStmtCount, + variable.InitConnect, + variable.TxnIsolation, + variable.TxReadOnly, + variable.TransactionIsolation, + variable.TransactionReadOnly, + variable.NetBufferLength, + variable.QueryCacheType, + variable.QueryCacheSize, + variable.CharacterSetServer, + variable.AutoIncrementIncrement, + variable.AutoIncrementOffset, + variable.CollationServer, + variable.NetWriteTimeout, + variable.MaxExecutionTime, + variable.InnodbLockWaitTimeout, + variable.WindowingUseHighPrecision, + variable.SQLSelectLimit, + variable.DefaultWeekFormat, + + /* TiDB specific global variables: */ + variable.TiDBSkipASCIICheck, + variable.TiDBSkipUTF8Check, + variable.TiDBIndexJoinBatchSize, + variable.TiDBIndexLookupSize, + variable.TiDBIndexLookupConcurrency, + variable.TiDBIndexLookupJoinConcurrency, + variable.TiDBIndexSerialScanConcurrency, + variable.TiDBHashJoinConcurrency, + variable.TiDBProjectionConcurrency, + variable.TiDBHashAggPartialConcurrency, + variable.TiDBHashAggFinalConcurrency, + variable.TiDBWindowConcurrency, + variable.TiDBMergeJoinConcurrency, + variable.TiDBStreamAggConcurrency, + variable.TiDBExecutorConcurrency, + variable.TiDBBackoffLockFast, + variable.TiDBBackOffWeight, + variable.TiDBConstraintCheckInPlace, + variable.TiDBDDLReorgWorkerCount, + variable.TiDBDDLReorgBatchSize, + variable.TiDBDDLErrorCountLimit, + variable.TiDBOptInSubqToJoinAndAgg, + variable.TiDBOptPreferRangeScan, + variable.TiDBOptCorrelationThreshold, + variable.TiDBOptCorrelationExpFactor, + variable.TiDBOptCPUFactor, + variable.TiDBOptCopCPUFactor, + variable.TiDBOptNetworkFactor, + variable.TiDBOptScanFactor, + variable.TiDBOptDescScanFactor, + variable.TiDBOptMemoryFactor, + variable.TiDBOptDiskFactor, + variable.TiDBOptConcurrencyFactor, + variable.TiDBDistSQLScanConcurrency, + variable.TiDBInitChunkSize, + variable.TiDBMaxChunkSize, + variable.TiDBEnableCascadesPlanner, + variable.TiDBRetryLimit, + variable.TiDBDisableTxnAutoRetry, + variable.TiDBEnableWindowFunction, + variable.TiDBEnableStrictDoubleTypeCheck, + variable.TiDBEnableTablePartition, + variable.TiDBEnableVectorizedExpression, + variable.TiDBEnableFastAnalyze, + variable.TiDBExpensiveQueryTimeThreshold, + variable.TiDBEnableNoopFuncs, + variable.TiDBEnableIndexMerge, + variable.TiDBTxnMode, + variable.TiDBAllowBatchCop, + variable.TiDBAllowMPPExecution, + variable.TiDBOptBCJ, + variable.TiDBBCJThresholdSize, + variable.TiDBBCJThresholdCount, + variable.TiDBRowFormatVersion, + variable.TiDBEnableStmtSummary, + variable.TiDBStmtSummaryInternalQuery, + variable.TiDBStmtSummaryRefreshInterval, + variable.TiDBStmtSummaryHistorySize, + variable.TiDBStmtSummaryMaxStmtCount, + variable.TiDBStmtSummaryMaxSQLLength, + variable.TiDBMaxDeltaSchemaCount, + variable.TiDBCapturePlanBaseline, + variable.TiDBUsePlanBaselines, + variable.TiDBEvolvePlanBaselines, + variable.TiDBEnableExtendedStats, + variable.TiDBIsolationReadEngines, + variable.TiDBStoreLimit, + variable.TiDBAllowAutoRandExplicitInsert, + variable.TiDBEnableClusteredIndex, + variable.TiDBPartitionPruneMode, + variable.TiDBRedactLog, + variable.TiDBEnableTelemetry, + variable.TiDBShardAllocateStep, + variable.TiDBEnableChangeColumnType, + variable.TiDBEnableChangeMultiSchema, + variable.TiDBEnablePointGetCache, + variable.TiDBEnableAlterPlacement, + variable.TiDBEnableAmendPessimisticTxn, + variable.TiDBMemQuotaApplyCache, + variable.TiDBEnableParallelApply, + variable.TiDBMemoryUsageAlarmRatio, + variable.TiDBEnableRateLimitAction, + variable.TiDBEnableAsyncCommit, + variable.TiDBEnable1PC, + variable.TiDBGuaranteeLinearizability, + variable.TiDBAnalyzeVersion, + variable.TiDBEnableIndexMergeJoin, + variable.TiDBTrackAggregateMemoryUsage, + variable.TiDBMultiStatementMode, + variable.TiDBEnableExchangePartition, + variable.TiDBAllowFallbackToTiKV, + variable.TiDBEnableDynamicPrivileges, + variable.CTEMaxRecursionDepth, +} + // loadCommonGlobalVariablesIfNeeded loads and applies commonly used global variables for the session. func (s *session) loadCommonGlobalVariablesIfNeeded() error { + vars := s.sessionVars if s.sessionVars.CommonGlobalLoaded { return nil } @@ -2458,10 +2598,45 @@ func (s *session) loadCommonGlobalVariablesIfNeeded() error { return nil } + var err error + + vars.CommonGlobalLoaded = true + // Deep copy sessionvar cache - sessionCache := domain.GetDomain(s).GetSysVarCache().GetSessionCache() - s.sessionVars.InitSessionVarsFromCache(sessionCache) - s.sessionVars.CommonGlobalLoaded = true + // Eventually this whole map will be applied to systems[], which is a MySQL behavior. + sessionCache := domain.GetDomain(s).GetSysVarCache().GetSessionCache(s) + for _, varName := range builtinGlobalVariable { + // The item should be in the sessionCache, but due to a strange current behavior there are some Global-only + // vars that are in builtinGlobalVariable. For compatibility we need to fall back to the Global cache on these items. + // TODO: don't load these globals into the session! + var varVal string + var ok bool + if varVal, ok = sessionCache[varName]; !ok { + varVal, err = domain.GetDomain(s).GetSysVarCache().GetGlobalVar(s, varName) + if err != nil { + continue + } + } + // `collation_server` is related to `character_set_server`, set `character_set_server` will also set `collation_server`. + // We have to make sure we set the `collation_server` with right value. + if _, ok := vars.GetSystemVar(varName); !ok || varName == variable.CollationServer { + err = variable.SetSessionSystemVar(s.sessionVars, varName, varVal) + if err != nil { + continue + } + } + } + + // when client set Capability Flags CLIENT_INTERACTIVE, init wait_timeout with interactive_timeout + if vars.ClientCapability&mysql.ClientInteractive > 0 { + if varVal, ok := vars.GetSystemVar(variable.InteractiveTimeout); ok { + if err := vars.SetSystemVar(variable.WaitTimeout, varVal); err != nil { + return err + } + } + } + + vars.CommonGlobalLoaded = true return nil } diff --git a/session/session_test.go b/session/session_test.go index f13044ef1f14f..9cc4f036b3916 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -4102,6 +4102,7 @@ func (s *testSessionSerialSuite) TestRemovedSysVars(c *C) { tk := testkit.NewTestKitWithInit(c, s.store) variable.RegisterSysVar(&variable.SysVar{Scope: variable.ScopeGlobal | variable.ScopeSession, Name: "bogus_var", Value: "acdc"}) + result := tk.MustQuery("SHOW GLOBAL VARIABLES LIKE 'bogus_var'") result.Check(testkit.Rows("bogus_var acdc")) result = tk.MustQuery("SELECT @@GLOBAL.bogus_var") diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 6bb2dadb20a91..782747b3b32e8 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -1382,13 +1382,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { return nil } -func (s *SessionVars) InitSessionVarsFromCache(cache map[string]string) error { - s.systems = cache - - // when client set Capability Flags CLIENT_INTERACTIVE, init wait_timeout with interactive_timeout - if s.ClientCapability&mysql.ClientInteractive > 0 { - s.systems[WaitTimeout] = s.systems[InteractiveTimeout] - } +func (s *SessionVars) InitSessionVarFromCache(name, val string) error { + s.systems[name] = val return nil } From 305de84959ac416a6a6ec75186170d9fcd479d79 Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Wed, 28 Apr 2021 09:57:11 -0600 Subject: [PATCH 05/14] WIP --- domain/domain.go | 8 +++- domain/sysvar_cache.go | 86 ++++++++++++++++++++++++---------- executor/ddl_test.go | 2 - executor/executor_test.go | 1 - infoschema/tables_test.go | 6 --- session/bootstrap.go | 1 - session/session.go | 11 +++-- sessionctx/variable/session.go | 5 -- 8 files changed, 74 insertions(+), 46 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index 60405675a095d..0971a35d93ef6 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -965,6 +965,7 @@ func (do *Domain) LoadSysVarCacheLoop(ctx sessionctx.Context) error { } count = 0 + logutil.BgLogger().Info("Rebuilding sysvar cache from etcd watch event.") err := do.sysVarCache.RebuildSysVarCache(ctx) metrics.LoadSysVarCacheCounter.WithLabelValues(metrics.RetLabel(err)).Inc() if err != nil { @@ -1378,6 +1379,9 @@ func (do *Domain) NotifyUpdatePrivilege(ctx sessionctx.Context) { } } +// NotifyUpdateSysVarCache updates the sysvar cache key in etcd, which other TiDB +// clients are subscribed to for updates. For the caller, the cache is also built +// synchronously so that the effect is immediate. func (do *Domain) NotifyUpdateSysVarCache(ctx sessionctx.Context) { if do.etcdClient != nil { row := do.etcdClient.KV @@ -1387,7 +1391,9 @@ func (do *Domain) NotifyUpdateSysVarCache(ctx sessionctx.Context) { } } // update locally - do.sysVarCache.RebuildSysVarCache(ctx) + if err := do.sysVarCache.RebuildSysVarCache(ctx); err != nil { + logutil.BgLogger().Error("rebuilding sysvar cache failed", zap.Error(err)) + } } // ServerID gets serverID. diff --git a/domain/sysvar_cache.go b/domain/sysvar_cache.go index 52b75bb4de522..c1f77df462705 100644 --- a/domain/sysvar_cache.go +++ b/domain/sysvar_cache.go @@ -15,13 +15,15 @@ package domain import ( "context" + "fmt" "sync" - "time" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/sqlexec" + "github.com/pingcap/tidb/util/stmtsummary" + "go.uber.org/zap" ) // The sysvar cache replaces the GlobalVariableCache. @@ -29,24 +31,35 @@ import ( // where it caches for 5 minutes instead of 2 seconds, plus it listens on etcd // for updates from other servers. +// SysVarCache represents the cache of system variables broken up into session and global scope. type SysVarCache struct { sync.RWMutex - isHealthy bool - lastModify time.Time - global map[string]string - session map[string]string + global map[string]string + session map[string]string } -// GetGlobalVarsCache gets the global variable cache. +// GetSysVarCache gets the global variable cache. func (do *Domain) GetSysVarCache() *SysVarCache { return &do.sysVarCache } -func (svc *SysVarCache) GetSessionCache(ctx sessionctx.Context) map[string]string { - if len(svc.session) == 0 { - svc.RebuildSysVarCache(ctx) +func (svc *SysVarCache) rebuildCacheIfNeeded(ctx sessionctx.Context) { + svc.RLock() + cacheNeedsRebuild := len(svc.session) == 0 || len(svc.global) == 0 + svc.RUnlock() + if cacheNeedsRebuild { + logutil.BgLogger().Warn("sysvar cache is empty, triggering rebuild") + if err := svc.RebuildSysVarCache(ctx); err != nil { + logutil.BgLogger().Error("rebuilding sysvar cache failed", zap.Error(err)) + } } +} +// GetSessionCache gets a copy of the session sysvar cache. +// The intention is to copy it directly to the systems[] map +// on creating a new session. +func (svc *SysVarCache) GetSessionCache(ctx sessionctx.Context) map[string]string { + svc.rebuildCacheIfNeeded(ctx) svc.RLock() defer svc.RUnlock() // Perform a deep copy since this will be assigned directly to the session @@ -57,16 +70,16 @@ func (svc *SysVarCache) GetSessionCache(ctx sessionctx.Context) map[string]strin return newMap } +// GetGlobalVar gets an individual global var from the sysvar cache. func (svc *SysVarCache) GetGlobalVar(ctx sessionctx.Context, name string) (string, error) { - if len(svc.global) == 0 { - svc.RebuildSysVarCache(ctx) - } - + svc.rebuildCacheIfNeeded(ctx) svc.RLock() defer svc.RUnlock() + if val, ok := svc.global[name]; ok { return val, nil } + logutil.BgLogger().Warn("could not find key in global cache", zap.String("name", name)) return "", variable.ErrUnknownSystemVar.GenWithStackByArgs(name) } @@ -93,9 +106,6 @@ func (svc *SysVarCache) fetchTableValues(ctx sessionctx.Context) (map[string]str // RebuildSysVarCache rebuilds the sysvar cache both globally and for session vars. // It needs to be called when sysvars are added or removed. func (svc *SysVarCache) RebuildSysVarCache(ctx sessionctx.Context) error { - svc.Lock() - defer svc.Unlock() - newSessionCache := make(map[string]string) newGlobalCache := make(map[string]string) tableContents, err := svc.fetchTableValues(ctx) @@ -104,25 +114,49 @@ func (svc *SysVarCache) RebuildSysVarCache(ctx sessionctx.Context) error { } for _, sv := range variable.GetSysVars() { + sVal := sv.Value + if _, ok := tableContents[sv.Name]; ok { + sVal = tableContents[sv.Name] + } if sv.HasSessionScope() { - if _, ok := tableContents[sv.Name]; ok { - newSessionCache[sv.Name] = tableContents[sv.Name] - } else { - newSessionCache[sv.Name] = sv.Value // use default - } + newSessionCache[sv.Name] = sVal } if sv.HasGlobalScope() { - if _, ok := tableContents[sv.Name]; ok { - newGlobalCache[sv.Name] = tableContents[sv.Name] - } else { - newGlobalCache[sv.Name] = sv.Value // use default - } + newGlobalCache[sv.Name] = sVal } + // Propagate any changes to the server scoped variables + checkEnableServerGlobalVar(sv.Name, sVal) } logutil.BgLogger().Info("rebuilding sysvar cache") + svc.Lock() + defer svc.Unlock() svc.session = newSessionCache svc.global = newGlobalCache return nil } + +// checkEnableServerGlobalVar processes variables that acts in server and global level. +func checkEnableServerGlobalVar(name, sVal string) { + var err error + switch name { + case variable.TiDBEnableStmtSummary: + err = stmtsummary.StmtSummaryByDigestMap.SetEnabled(sVal, false) + case variable.TiDBStmtSummaryInternalQuery: + err = stmtsummary.StmtSummaryByDigestMap.SetEnabledInternalQuery(sVal, false) + case variable.TiDBStmtSummaryRefreshInterval: + err = stmtsummary.StmtSummaryByDigestMap.SetRefreshInterval(sVal, false) + case variable.TiDBStmtSummaryHistorySize: + err = stmtsummary.StmtSummaryByDigestMap.SetHistorySize(sVal, false) + case variable.TiDBStmtSummaryMaxStmtCount: + err = stmtsummary.StmtSummaryByDigestMap.SetMaxStmtCount(sVal, false) + case variable.TiDBStmtSummaryMaxSQLLength: + err = stmtsummary.StmtSummaryByDigestMap.SetMaxSQLLength(sVal, false) + case variable.TiDBCapturePlanBaseline: + variable.CapturePlanBaseline.Set(sVal, false) + } + if err != nil { + logutil.BgLogger().Error(fmt.Sprintf("load global variable %s error", name), zap.Error(err)) + } +} diff --git a/executor/ddl_test.go b/executor/ddl_test.go index f625c1fda2ac2..c55908066de62 100644 --- a/executor/ddl_test.go +++ b/executor/ddl_test.go @@ -484,8 +484,6 @@ func (s *testSuite6) TestAlterTableAddColumn(c *C) { tk.MustExec("drop sequence alter_seq") } -// TODO: Requires multi schema change. I need to figure out why it is turned off. - func (s *testSuite6) TestAlterTableAddColumns(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") diff --git a/executor/executor_test.go b/executor/executor_test.go index 260f7c9d18482..663b51c0cc7a6 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -7963,7 +7963,6 @@ func (s *testSerialSuite) TestTxnWriteThroughputSLI(c *C) { writeSLI := tk.Se.GetTxnWriteThroughputSLI() c.Assert(writeSLI.IsInvalid(), Equals, false) c.Assert(writeSLI.IsSmallTxn(), Equals, true) - // This is currently returning 46 and not 58 for the writesize. why?? c.Assert(tk.Se.GetTxnWriteThroughputSLI().String(), Equals, "invalid: false, affectRow: 2, writeSize: 58, readKeys: 0, writeKeys: 2, writeTime: 1s") tk.Se.GetTxnWriteThroughputSLI().Reset() diff --git a/infoschema/tables_test.go b/infoschema/tables_test.go index fd378a2a6354b..4c98b811062ec 100644 --- a/infoschema/tables_test.go +++ b/infoschema/tables_test.go @@ -958,8 +958,6 @@ func (s *testTableSuite) TestStmtSummaryTable(c *C) { tk.MustExec("set global tidb_enable_stmt_summary = 1") tk.MustQuery("select @@global.tidb_enable_stmt_summary").Check(testkit.Rows("1")) - // Invalidate the cache manually so that tidb_enable_stmt_summary works immediately. - // s.dom.GetGlobalVarsCache().Disable() // Disable refreshing summary. tk.MustExec("set global tidb_stmt_summary_refresh_interval = 999999999") tk.MustQuery("select @@global.tidb_stmt_summary_refresh_interval").Check(testkit.Rows("999999999")) @@ -1202,8 +1200,6 @@ func (s *testClusterTableSuite) TestStmtSummaryHistoryTable(c *C) { tk.MustExec("set global tidb_enable_stmt_summary = 1") tk.MustQuery("select @@global.tidb_enable_stmt_summary").Check(testkit.Rows("1")) - // Invalidate the cache manually so that tidb_enable_stmt_summary works immediately. - // s.dom.GetGlobalVarsCache().Disable() // Disable refreshing summary. tk.MustExec("set global tidb_stmt_summary_refresh_interval = 999999999") tk.MustQuery("select @@global.tidb_stmt_summary_refresh_interval").Check(testkit.Rows("999999999")) @@ -1259,8 +1255,6 @@ func (s *testTableSuite) TestStmtSummaryInternalQuery(c *C) { tk.MustExec("create global binding for select * from t where t.a = 1 using select * from t ignore index(k) where t.a = 1") tk.MustExec("set global tidb_enable_stmt_summary = 1") tk.MustQuery("select @@global.tidb_enable_stmt_summary").Check(testkit.Rows("1")) - // Invalidate the cache manually so that tidb_enable_stmt_summary works immediately. - // s.dom.GetGlobalVarsCache().Disable() // Disable refreshing summary. tk.MustExec("set global tidb_stmt_summary_refresh_interval = 999999999") tk.MustQuery("select @@global.tidb_stmt_summary_refresh_interval").Check(testkit.Rows("999999999")) diff --git a/session/bootstrap.go b/session/bootstrap.go index a58754228cd66..34d6748ae38c1 100644 --- a/session/bootstrap.go +++ b/session/bootstrap.go @@ -363,7 +363,6 @@ func bootstrap(s Session) { doDMLWorks(s) logutil.BgLogger().Info("bootstrap successful", zap.Duration("take time", time.Since(startTime))) - dom.NotifyUpdateSysVarCache(s) return } time.Sleep(200 * time.Millisecond) diff --git a/session/session.go b/session/session.go index cdaa7a320c8ee..a20e5f0abca4b 100644 --- a/session/session.go +++ b/session/session.go @@ -965,7 +965,7 @@ func (s *session) GetGlobalSysVar(name string) (string, error) { // It might be a recently unregistered sysvar. We should return unknown // since GetSysVar is the canonical version, but we can update the cache // so the next request doesn't attempt to load this. - domain.GetDomain(s).NotifyUpdateSysVarCache(s) + logutil.BgLogger().Info("sysvar does not exist. sysvar cache may be stale", zap.String("name", name)) return "", variable.ErrUnknownSystemVar.GenWithStackByArgs(name) } @@ -975,7 +975,7 @@ func (s *session) GetGlobalSysVar(name string) (string, error) { // This might be because the sysvar was only recently registered. // In which case it is safe to return the default, but we can also // update the cache for the future. - domain.GetDomain(s).NotifyUpdateSysVarCache(s) + logutil.BgLogger().Info("sysvar not in cache yet. sysvar cache may be stale", zap.String("name", name)) return sv.Value, nil } // Fetch mysql.tidb values if required @@ -2590,7 +2590,7 @@ var builtinGlobalVariable = []string{ // loadCommonGlobalVariablesIfNeeded loads and applies commonly used global variables for the session. func (s *session) loadCommonGlobalVariablesIfNeeded() error { vars := s.sessionVars - if s.sessionVars.CommonGlobalLoaded { + if vars.CommonGlobalLoaded { return nil } if s.Value(sessionctx.Initing) != nil { @@ -2614,13 +2614,16 @@ func (s *session) loadCommonGlobalVariablesIfNeeded() error { if varVal, ok = sessionCache[varName]; !ok { varVal, err = domain.GetDomain(s).GetSysVarCache().GetGlobalVar(s, varName) if err != nil { - continue + continue // skip variables that are not loaded. } } // `collation_server` is related to `character_set_server`, set `character_set_server` will also set `collation_server`. // We have to make sure we set the `collation_server` with right value. if _, ok := vars.GetSystemVar(varName); !ok || varName == variable.CollationServer { err = variable.SetSessionSystemVar(s.sessionVars, varName, varVal) + // If there an error it probably means that the sysvar has failed validation. + // The var might have been set by an earlier TiDB server, and fails on a new TiDB server + // which is stricter. if err != nil { continue } diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 782747b3b32e8..6e75bd7ca010c 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -1382,11 +1382,6 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { return nil } -func (s *SessionVars) InitSessionVarFromCache(name, val string) error { - s.systems[name] = val - return nil -} - // GetReadableTxnMode returns the session variable TxnMode but rewrites it to "OPTIMISTIC" when it's empty. func (s *SessionVars) GetReadableTxnMode() string { txnMode := s.TxnMode From f586e94f1e17c760aa0bcc8a0042d9e220e12b3c Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Wed, 28 Apr 2021 13:57:51 -0600 Subject: [PATCH 06/14] Add safer fallback for cache miss --- session/session.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/session/session.go b/session/session.go index a20e5f0abca4b..305fa54792e6a 100644 --- a/session/session.go +++ b/session/session.go @@ -976,7 +976,10 @@ func (s *session) GetGlobalSysVar(name string) (string, error) { // In which case it is safe to return the default, but we can also // update the cache for the future. logutil.BgLogger().Info("sysvar not in cache yet. sysvar cache may be stale", zap.String("name", name)) - return sv.Value, nil + sysVar, err = s.getTableValue(context.TODO(), mysql.GlobalVariablesTable, name) + if err != nil { + return sv.Value, nil + } } // Fetch mysql.tidb values if required if s.varFromTiDBTable(name) { From 3be42568ebeb33bd0c993ba486ad0a4fa1642545 Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Thu, 29 Apr 2021 10:02:24 -0600 Subject: [PATCH 07/14] Address PR feedback --- domain/sysvar_cache.go | 17 +++++++++++------ session/session.go | 5 ++++- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/domain/sysvar_cache.go b/domain/sysvar_cache.go index c1f77df462705..cf4db9034575e 100644 --- a/domain/sysvar_cache.go +++ b/domain/sysvar_cache.go @@ -43,23 +43,26 @@ func (do *Domain) GetSysVarCache() *SysVarCache { return &do.sysVarCache } -func (svc *SysVarCache) rebuildCacheIfNeeded(ctx sessionctx.Context) { +func (svc *SysVarCache) rebuildCacheIfNeeded(ctx sessionctx.Context) (err error) { svc.RLock() cacheNeedsRebuild := len(svc.session) == 0 || len(svc.global) == 0 svc.RUnlock() if cacheNeedsRebuild { logutil.BgLogger().Warn("sysvar cache is empty, triggering rebuild") - if err := svc.RebuildSysVarCache(ctx); err != nil { + if err = svc.RebuildSysVarCache(ctx); err != nil { logutil.BgLogger().Error("rebuilding sysvar cache failed", zap.Error(err)) } } + return err } // GetSessionCache gets a copy of the session sysvar cache. // The intention is to copy it directly to the systems[] map // on creating a new session. -func (svc *SysVarCache) GetSessionCache(ctx sessionctx.Context) map[string]string { - svc.rebuildCacheIfNeeded(ctx) +func (svc *SysVarCache) GetSessionCache(ctx sessionctx.Context) (map[string]string, error) { + if err := svc.rebuildCacheIfNeeded(ctx); err != nil { + return nil, err + } svc.RLock() defer svc.RUnlock() // Perform a deep copy since this will be assigned directly to the session @@ -67,12 +70,14 @@ func (svc *SysVarCache) GetSessionCache(ctx sessionctx.Context) map[string]strin for k, v := range svc.session { newMap[k] = v } - return newMap + return newMap, nil } // GetGlobalVar gets an individual global var from the sysvar cache. func (svc *SysVarCache) GetGlobalVar(ctx sessionctx.Context, name string) (string, error) { - svc.rebuildCacheIfNeeded(ctx) + if err := svc.rebuildCacheIfNeeded(ctx); err != nil { + return "", err + } svc.RLock() defer svc.RUnlock() diff --git a/session/session.go b/session/session.go index 305fa54792e6a..e5a64e4269c4d 100644 --- a/session/session.go +++ b/session/session.go @@ -2607,7 +2607,10 @@ func (s *session) loadCommonGlobalVariablesIfNeeded() error { // Deep copy sessionvar cache // Eventually this whole map will be applied to systems[], which is a MySQL behavior. - sessionCache := domain.GetDomain(s).GetSysVarCache().GetSessionCache(s) + sessionCache, err := domain.GetDomain(s).GetSysVarCache().GetSessionCache(s) + if err != nil { + return err + } for _, varName := range builtinGlobalVariable { // The item should be in the sessionCache, but due to a strange current behavior there are some Global-only // vars that are in builtinGlobalVariable. For compatibility we need to fall back to the Global cache on these items. From 5c9f3e049bc44c901095985f00fade3d8697378d Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Wed, 5 May 2021 21:06:29 -0600 Subject: [PATCH 08/14] Update Get to use function with fallback --- go.sum | 1 + session/session.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/go.sum b/go.sum index a4450303c0621..e9023baa2feb9 100644 --- a/go.sum +++ b/go.sum @@ -500,6 +500,7 @@ github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD github.com/sasha-s/go-deadlock v0.2.0/go.mod h1:StQn567HiB1fF2yJ44N9au7wOhrPS3iZqiDbRupzT10= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= +github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0= github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= github.com/shirou/gopsutil v2.19.10+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/shirou/gopsutil v3.21.2+incompatible h1:U+YvJfjCh6MslYlIAXvPtzhW3YZEtc9uncueUNpD/0A= diff --git a/session/session.go b/session/session.go index e5a64e4269c4d..a6d0bf4252a18 100644 --- a/session/session.go +++ b/session/session.go @@ -2618,7 +2618,7 @@ func (s *session) loadCommonGlobalVariablesIfNeeded() error { var varVal string var ok bool if varVal, ok = sessionCache[varName]; !ok { - varVal, err = domain.GetDomain(s).GetSysVarCache().GetGlobalVar(s, varName) + varVal, err = s.GetGlobalSysVar(varName) if err != nil { continue // skip variables that are not loaded. } From d88b9696cb8df7bbf17a4b99c52e0782722a14d4 Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Fri, 7 May 2021 13:12:59 -0600 Subject: [PATCH 09/14] Fix bad merge --- session/session.go | 2 +- sessionctx/variable/session.go | 8 +++++++ sessionctx/variable/sysvar.go | 43 +++++++++++++++++++++++++--------- 3 files changed, 41 insertions(+), 12 deletions(-) diff --git a/session/session.go b/session/session.go index 51e3c532e6279..c29b642fceb58 100644 --- a/session/session.go +++ b/session/session.go @@ -2654,7 +2654,7 @@ func (s *session) loadCommonGlobalVariablesIfNeeded() error { // `collation_server` is related to `character_set_server`, set `character_set_server` will also set `collation_server`. // We have to make sure we set the `collation_server` with right value. if _, ok := vars.GetSystemVar(varName); !ok || varName == variable.CollationServer { - err = vars.SetSystemVar(varName, varVal) + err = vars.SetSystemVarWithLooseValidation(varName, varVal) if err != nil { return err } diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 1554f3c429d65..f195838dd3082 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -1380,6 +1380,14 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { return sv.SetSessionFromHook(s, val) } +// SetSystemVarWithLooseValidation sets the value of a system variable for session scope. +// Validation functions are called, but scope validation is skipped. +func (s *SessionVars) SetSystemVarWithLooseValidation(name string, val string) error { + sv := GetSysVar(name) + val = sv.ValidateLoose(s, val, ScopeSession) + return sv.SetSessionFromHook(s, val) +} + // GetReadableTxnMode returns the session variable TxnMode but rewrites it to "OPTIMISTIC" when it's empty. func (s *SessionVars) GetReadableTxnMode() string { txnMode := s.TxnMode diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index c17238c9ae9c5..95562214ff5c8 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -185,7 +185,7 @@ func (sv *SysVar) HasGlobalScope() bool { func (sv *SysVar) Validate(vars *SessionVars, value string, scope ScopeFlag) (string, error) { // Normalize the value and apply validation based on type. // i.e. TypeBool converts 1/on/ON to ON. - normalizedValue, err := sv.validateFromType(vars, value, scope) + normalizedValue, err := sv.validateFromType(vars, value, scope, false) if err != nil { return normalizedValue, err } @@ -197,18 +197,19 @@ func (sv *SysVar) Validate(vars *SessionVars, value string, scope ScopeFlag) (st } // validateFromType provides automatic validation based on the SysVar's type -func (sv *SysVar) validateFromType(vars *SessionVars, value string, scope ScopeFlag) (string, error) { +func (sv *SysVar) validateFromType(vars *SessionVars, value string, scope ScopeFlag, skipScopeValidation bool) (string, error) { // Check that the scope is correct and return the appropriate error message. - if sv.ReadOnly || sv.Scope == ScopeNone { - return value, ErrIncorrectScope.FastGenByArgs(sv.Name, "read only") - } - if scope == ScopeGlobal && !sv.HasGlobalScope() { - return value, errLocalVariable.FastGenByArgs(sv.Name) - } - if scope == ScopeSession && !sv.HasSessionScope() { - return value, errGlobalVariable.FastGenByArgs(sv.Name) + if !skipScopeValidation { + if sv.ReadOnly || sv.Scope == ScopeNone { + return value, ErrIncorrectScope.FastGenByArgs(sv.Name, "read only") + } + if scope == ScopeGlobal && !sv.HasGlobalScope() { + return value, errLocalVariable.FastGenByArgs(sv.Name) + } + if scope == ScopeSession && !sv.HasSessionScope() { + return value, errGlobalVariable.FastGenByArgs(sv.Name) + } } - // The string "DEFAULT" is a special keyword in MySQL, which restores // the compiled sysvar value. In which case we can skip further validation. if strings.EqualFold(value, "DEFAULT") { @@ -240,6 +241,26 @@ func (sv *SysVar) validateFromType(vars *SessionVars, value string, scope ScopeF return value, nil // typeString } +// ValidateLoose normalizes values but can not return errors. +// It is used when reading values applied from other servers which are assumed to be safe. +// If there is an error it would cause an upgrade problem +func (sv *SysVar) ValidateLoose(vars *SessionVars, value string, scope ScopeFlag) string { + // Normalize the value and apply validation based on type. + // i.e. TypeBool converts 1/on/ON to ON. + normalizedValue, err := sv.validateFromType(vars, value, scope, true) + if err != nil { + return normalizedValue + } + // If type validation was successful, call the (optional) validation function + if sv.Validation != nil { + normalizedValue, err = sv.Validation(vars, normalizedValue, value, scope) + if err != nil { + return normalizedValue + } + } + return normalizedValue +} + const ( localDayTimeFormat = "15:04" // FullDayTimeFormat is the full format of analyze start time and end time. From 3d803f084ca8cfea68a86528249021a617175101 Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Fri, 7 May 2021 14:48:18 -0600 Subject: [PATCH 10/14] Fix bad merge --- session/session.go | 2 +- sessionctx/variable/session.go | 7 ++--- sessionctx/variable/sysvar.go | 47 ++++++++++++++++++---------------- 3 files changed, 30 insertions(+), 26 deletions(-) diff --git a/session/session.go b/session/session.go index c29b642fceb58..a4a38a565d4e2 100644 --- a/session/session.go +++ b/session/session.go @@ -2654,7 +2654,7 @@ func (s *session) loadCommonGlobalVariablesIfNeeded() error { // `collation_server` is related to `character_set_server`, set `character_set_server` will also set `collation_server`. // We have to make sure we set the `collation_server` with right value. if _, ok := vars.GetSystemVar(varName); !ok || varName == variable.CollationServer { - err = vars.SetSystemVarWithLooseValidation(varName, varVal) + err = vars.SetSystemVarWithRelaxedValidation(varName, varVal) if err != nil { return err } diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index f195838dd3082..5b8f30a03d2ca 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -1380,11 +1380,12 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { return sv.SetSessionFromHook(s, val) } -// SetSystemVarWithLooseValidation sets the value of a system variable for session scope. +// SetSystemVarWithRelaxedValidation sets the value of a system variable for session scope. // Validation functions are called, but scope validation is skipped. -func (s *SessionVars) SetSystemVarWithLooseValidation(name string, val string) error { +// Errors are not expected to be returned because this could cause upgrade issues. +func (s *SessionVars) SetSystemVarWithRelaxedValidation(name string, val string) error { sv := GetSysVar(name) - val = sv.ValidateLoose(s, val, ScopeSession) + val = sv.ValidateWithRelaxedValidation(s, val, ScopeSession) return sv.SetSessionFromHook(s, val) } diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 95562214ff5c8..a47a185e7f602 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -183,9 +183,13 @@ func (sv *SysVar) HasGlobalScope() bool { // Validate checks if system variable satisfies specific restriction. func (sv *SysVar) Validate(vars *SessionVars, value string, scope ScopeFlag) (string, error) { + // Check that the scope is correct first. + if err := sv.validateScope(scope); err != nil { + return value, err + } // Normalize the value and apply validation based on type. // i.e. TypeBool converts 1/on/ON to ON. - normalizedValue, err := sv.validateFromType(vars, value, scope, false) + normalizedValue, err := sv.validateFromType(vars, value, scope) if err != nil { return normalizedValue, err } @@ -197,19 +201,7 @@ func (sv *SysVar) Validate(vars *SessionVars, value string, scope ScopeFlag) (st } // validateFromType provides automatic validation based on the SysVar's type -func (sv *SysVar) validateFromType(vars *SessionVars, value string, scope ScopeFlag, skipScopeValidation bool) (string, error) { - // Check that the scope is correct and return the appropriate error message. - if !skipScopeValidation { - if sv.ReadOnly || sv.Scope == ScopeNone { - return value, ErrIncorrectScope.FastGenByArgs(sv.Name, "read only") - } - if scope == ScopeGlobal && !sv.HasGlobalScope() { - return value, errLocalVariable.FastGenByArgs(sv.Name) - } - if scope == ScopeSession && !sv.HasSessionScope() { - return value, errGlobalVariable.FastGenByArgs(sv.Name) - } - } +func (sv *SysVar) validateFromType(vars *SessionVars, value string, scope ScopeFlag) (string, error) { // The string "DEFAULT" is a special keyword in MySQL, which restores // the compiled sysvar value. In which case we can skip further validation. if strings.EqualFold(value, "DEFAULT") { @@ -241,17 +233,28 @@ func (sv *SysVar) validateFromType(vars *SessionVars, value string, scope ScopeF return value, nil // typeString } -// ValidateLoose normalizes values but can not return errors. -// It is used when reading values applied from other servers which are assumed to be safe. -// If there is an error it would cause an upgrade problem -func (sv *SysVar) ValidateLoose(vars *SessionVars, value string, scope ScopeFlag) string { - // Normalize the value and apply validation based on type. - // i.e. TypeBool converts 1/on/ON to ON. - normalizedValue, err := sv.validateFromType(vars, value, scope, true) +func (sv *SysVar) validateScope(scope ScopeFlag) error { + if sv.ReadOnly || sv.Scope == ScopeNone { + return ErrIncorrectScope.FastGenByArgs(sv.Name, "read only") + } + if scope == ScopeGlobal && !sv.HasGlobalScope() { + return errLocalVariable.FastGenByArgs(sv.Name) + } + if scope == ScopeSession && !sv.HasSessionScope() { + return errGlobalVariable.FastGenByArgs(sv.Name) + } + return nil +} + +// ValidateWithRelaxedValidation normalizes values but can not return errors. +// Normalization+validation needs to be applied when reading values because older versions of TiDB +// may be less sophisticated in normalizing values. But errors should be caught and handled, +// because otherwise there will be upgrade issues. +func (sv *SysVar) ValidateWithRelaxedValidation(vars *SessionVars, value string, scope ScopeFlag) string { + normalizedValue, err := sv.validateFromType(vars, value, scope) if err != nil { return normalizedValue } - // If type validation was successful, call the (optional) validation function if sv.Validation != nil { normalizedValue, err = sv.Validation(vars, normalizedValue, value, scope) if err != nil { From 144d11fb1c81eebde04a9af043e710712d9615d8 Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Sun, 9 May 2021 21:41:06 -0600 Subject: [PATCH 11/14] Remove space --- domain/domain.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index 0971a35d93ef6..c0a1aa8086b88 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -925,19 +925,16 @@ func (do *Domain) LoadPrivilegeLoop(ctx sessionctx.Context) error { // LoadSysVarCacheLoop create a goroutine loads sysvar cache in a loop, it // should be called only once in BootstrapSession. func (do *Domain) LoadSysVarCacheLoop(ctx sessionctx.Context) error { - err := do.sysVarCache.RebuildSysVarCache(ctx) if err != nil { return err } - var watchCh clientv3.WatchChan duration := 5 * time.Minute if do.etcdClient != nil { watchCh = do.etcdClient.Watch(context.Background(), sysVarCacheKey) duration = 10 * time.Minute } - do.wg.Add(1) go func() { defer func() { From 32290d9cbf4ab0835a5f01c149b40dd597fdd689 Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Tue, 11 May 2021 09:21:50 -0600 Subject: [PATCH 12/14] Address PR feedback --- go.mod | 2 +- go.sum | 5 +++-- session/session.go | 4 ---- 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/go.mod b/go.mod index bf927f9cc55ce..cd8becf5a757c 100644 --- a/go.mod +++ b/go.mod @@ -47,7 +47,7 @@ require ( github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 github.com/pingcap/kvproto v0.0.0-20210429093846-65f54a202d7e github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4 - github.com/pingcap/parser v0.0.0-20210427084954-8e8ed7927bde + github.com/pingcap/parser v0.0.0-20210508071014-cd9cd78e230c github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3 github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible github.com/pingcap/tipb v0.0.0-20210422074242-57dd881b81b1 diff --git a/go.sum b/go.sum index 74b4f623789b8..4e1030039a04f 100644 --- a/go.sum +++ b/go.sum @@ -443,8 +443,8 @@ github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIf github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4 h1:ERrF0fTuIOnwfGbt71Ji3DKbOEaP189tjym50u8gpC8= github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= -github.com/pingcap/parser v0.0.0-20210427084954-8e8ed7927bde h1:CcGOCE3kr8aYBy6rRcWWldidL1X5smQxV79nlnzOk+o= -github.com/pingcap/parser v0.0.0-20210427084954-8e8ed7927bde/go.mod h1:xZC8I7bug4GJ5KtHhgAikjTfU4kBv1Sbo3Pf1MZ6lVw= +github.com/pingcap/parser v0.0.0-20210508071014-cd9cd78e230c h1:GLFd+wBN7EsV6ad/tVGFCD37taOyzIMVs3SdiWZF18I= +github.com/pingcap/parser v0.0.0-20210508071014-cd9cd78e230c/go.mod h1:xZC8I7bug4GJ5KtHhgAikjTfU4kBv1Sbo3Pf1MZ6lVw= github.com/pingcap/sysutil v0.0.0-20200206130906-2bfa6dc40bcd/go.mod h1:EB/852NMQ+aRKioCpToQ94Wl7fktV+FNnxf3CX/TTXI= github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3 h1:A9KL9R+lWSVPH8IqUuH1QSTRJ5FGoY1bT2IcfPKsWD8= github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3/go.mod h1:tckvA041UWP+NqYzrJ3fMgC/Hw9wnmQ/tUkp/JaHly8= @@ -500,6 +500,7 @@ github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD github.com/sasha-s/go-deadlock v0.2.0/go.mod h1:StQn567HiB1fF2yJ44N9au7wOhrPS3iZqiDbRupzT10= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= +github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0= github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= github.com/shirou/gopsutil v2.19.10+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/shirou/gopsutil v3.21.2+incompatible h1:U+YvJfjCh6MslYlIAXvPtzhW3YZEtc9uncueUNpD/0A= diff --git a/session/session.go b/session/session.go index a4a38a565d4e2..083f619cd5591 100644 --- a/session/session.go +++ b/session/session.go @@ -2629,8 +2629,6 @@ func (s *session) loadCommonGlobalVariablesIfNeeded() error { return nil } - var err error - vars.CommonGlobalLoaded = true // Deep copy sessionvar cache @@ -2669,8 +2667,6 @@ func (s *session) loadCommonGlobalVariablesIfNeeded() error { } } } - - vars.CommonGlobalLoaded = true return nil } From cddb747fcfa4fc7748d4f1fb5b31fb3b4e122e13 Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Wed, 12 May 2021 21:34:19 -0600 Subject: [PATCH 13/14] Change automatic refresh interval to 30 seconds --- domain/domain.go | 10 ++++------ domain/sysvar_cache.go | 2 +- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/domain/domain.go b/domain/domain.go index c0a1aa8086b88..fa6c9bfb30061 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -922,18 +922,17 @@ func (do *Domain) LoadPrivilegeLoop(ctx sessionctx.Context) error { return nil } -// LoadSysVarCacheLoop create a goroutine loads sysvar cache in a loop, it -// should be called only once in BootstrapSession. +// LoadSysVarCacheLoop create a goroutine loads sysvar cache in a loop, +// it should be called only once in BootstrapSession. func (do *Domain) LoadSysVarCacheLoop(ctx sessionctx.Context) error { err := do.sysVarCache.RebuildSysVarCache(ctx) if err != nil { return err } var watchCh clientv3.WatchChan - duration := 5 * time.Minute + duration := 30 * time.Second if do.etcdClient != nil { watchCh = do.etcdClient.Watch(context.Background(), sysVarCacheKey) - duration = 10 * time.Minute } do.wg.Add(1) go func() { @@ -960,9 +959,8 @@ func (do *Domain) LoadSysVarCacheLoop(ctx sessionctx.Context) error { } continue } - count = 0 - logutil.BgLogger().Info("Rebuilding sysvar cache from etcd watch event.") + logutil.BgLogger().Debug("Rebuilding sysvar cache from etcd watch event.") err := do.sysVarCache.RebuildSysVarCache(ctx) metrics.LoadSysVarCacheCounter.WithLabelValues(metrics.RetLabel(err)).Inc() if err != nil { diff --git a/domain/sysvar_cache.go b/domain/sysvar_cache.go index cf4db9034575e..23c9688ea2f81 100644 --- a/domain/sysvar_cache.go +++ b/domain/sysvar_cache.go @@ -133,7 +133,7 @@ func (svc *SysVarCache) RebuildSysVarCache(ctx sessionctx.Context) error { checkEnableServerGlobalVar(sv.Name, sVal) } - logutil.BgLogger().Info("rebuilding sysvar cache") + logutil.BgLogger().Debug("rebuilding sysvar cache") svc.Lock() defer svc.Unlock() From 46d5b31fc906f2ee863cda172e783e49cd46fc99 Mon Sep 17 00:00:00 2001 From: Morgan Tocker Date: Thu, 13 May 2021 09:05:10 -0600 Subject: [PATCH 14/14] Make reading getsysvar list concurrency safe --- sessionctx/variable/sysvar.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index bcd5e3b719485..6121ad61552ac 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -509,11 +509,15 @@ func SetSysVar(name string, value string) { sysVars[name].Value = value } -// GetSysVars returns the sysVars list under a RWLock +// GetSysVars deep copies the sysVars list under a RWLock func GetSysVars() map[string]*SysVar { sysVarsLock.RLock() defer sysVarsLock.RUnlock() - return sysVars + copy := make(map[string]*SysVar, len(sysVars)) + for name, sv := range sysVars { + copy[name] = sv + } + return copy } // PluginVarNames is global plugin var names set.