Skip to content

Commit

Permalink
*: adjust delta schema count and add metrics (pingcap#11625) (pingcap…
Browse files Browse the repository at this point in the history
  • Loading branch information
zimulala committed Oct 10, 2019
1 parent 42087af commit 7c56347
Show file tree
Hide file tree
Showing 10 changed files with 189 additions and 7 deletions.
69 changes: 62 additions & 7 deletions domain/schema_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"sync"
"time"

"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
Expand Down Expand Up @@ -73,7 +75,7 @@ func NewSchemaValidator(lease time.Duration) SchemaValidator {
return &schemaValidator{
isStarted: true,
lease: lease,
deltaSchemaInfos: make([]deltaSchemaInfo, 0, maxNumberOfDiffsToLoad),
deltaSchemaInfos: make([]deltaSchemaInfo, 0, variable.DefTiDBMaxDeltaSchemaCount),
}
}

Expand All @@ -86,26 +88,29 @@ func (s *schemaValidator) IsStarted() bool {

func (s *schemaValidator) Stop() {
logutil.Logger(context.Background()).Info("the schema validator stops")
metrics.LoadSchemaCounter.WithLabelValues(metrics.SchemaValidatorStop).Inc()
s.mux.Lock()
defer s.mux.Unlock()
s.isStarted = false
s.latestSchemaVer = 0
s.deltaSchemaInfos = make([]deltaSchemaInfo, 0, maxNumberOfDiffsToLoad)
s.deltaSchemaInfos = s.deltaSchemaInfos[:0]
}

func (s *schemaValidator) Restart() {
logutil.Logger(context.Background()).Info("the schema validator restarts")
metrics.LoadSchemaCounter.WithLabelValues(metrics.SchemaValidatorRestart).Inc()
s.mux.Lock()
defer s.mux.Unlock()
s.isStarted = true
}

func (s *schemaValidator) Reset() {
metrics.LoadSchemaCounter.WithLabelValues(metrics.SchemaValidatorReset).Inc()
s.mux.Lock()
defer s.mux.Unlock()
s.isStarted = true
s.latestSchemaVer = 0
s.deltaSchemaInfos = make([]deltaSchemaInfo, 0, maxNumberOfDiffsToLoad)
s.deltaSchemaInfos = s.deltaSchemaInfos[:0]
}

func (s *schemaValidator) Update(leaseGrantTS uint64, oldVer, currVer int64, changedTableIDs []int64) {
Expand Down Expand Up @@ -147,13 +152,17 @@ func hasRelatedTableID(relatedTableIDs, updateTableIDs []int64) bool {
// NOTE, this function should be called under lock!
func (s *schemaValidator) isRelatedTablesChanged(currVer int64, tableIDs []int64) bool {
if len(s.deltaSchemaInfos) == 0 {
metrics.LoadSchemaCounter.WithLabelValues(metrics.SchemaValidatorCacheEmpty).Inc()
logutil.Logger(context.Background()).Info("schema change history is empty", zap.Int64("currVer", currVer))
return true
}
newerDeltas := s.findNewerDeltas(currVer)
if len(newerDeltas) == len(s.deltaSchemaInfos) {
logutil.Logger(context.Background()).Info("the schema version is much older than the latest version", zap.Int64("currVer", currVer),
zap.Int64("latestSchemaVer", s.latestSchemaVer))
metrics.LoadSchemaCounter.WithLabelValues(metrics.SchemaValidatorCacheMiss).Inc()
logutil.Logger(context.Background()).Info("the schema version is much older than the latest version",
zap.Int64("currVer", currVer),
zap.Int64("latestSchemaVer", s.latestSchemaVer),
zap.Reflect("deltas", newerDeltas))
return true
}
for _, item := range newerDeltas {
Expand Down Expand Up @@ -209,8 +218,54 @@ func (s *schemaValidator) Check(txnTS uint64, schemaVer int64, relatedTableIDs [
}

func (s *schemaValidator) enqueue(schemaVersion int64, relatedTableIDs []int64) {
s.deltaSchemaInfos = append(s.deltaSchemaInfos, deltaSchemaInfo{schemaVersion, relatedTableIDs})
if len(s.deltaSchemaInfos) > maxNumberOfDiffsToLoad {
maxCnt := int(variable.GetMaxDeltaSchemaCount())
if maxCnt <= 0 {
logutil.Logger(context.Background()).Info("the schema validator enqueue", zap.Int("delta max count", maxCnt))
return
}

delta := deltaSchemaInfo{schemaVersion, relatedTableIDs}
if len(s.deltaSchemaInfos) == 0 {
s.deltaSchemaInfos = append(s.deltaSchemaInfos, delta)
return
}

lastOffset := len(s.deltaSchemaInfos) - 1
// The first item we needn't to merge, because we hope to cover more versions.
if lastOffset != 0 && ids(s.deltaSchemaInfos[lastOffset].relatedTableIDs).containIn(delta.relatedTableIDs) {
s.deltaSchemaInfos[lastOffset] = delta
} else {
s.deltaSchemaInfos = append(s.deltaSchemaInfos, delta)
}

if len(s.deltaSchemaInfos) > maxCnt {
logutil.Logger(context.Background()).Info("the schema validator enqueue, queue is too long",
zap.Int("delta max count", maxCnt), zap.Int64("remove schema version", s.deltaSchemaInfos[0].schemaVersion))
s.deltaSchemaInfos = s.deltaSchemaInfos[1:]
}
}

type ids []int64

// containIn is checks if a is included in b.
func (a ids) containIn(b []int64) bool {
if len(a) > len(b) {
return false
}

var isEqual bool
for _, i := range a {
isEqual = false
for _, j := range b {
if i == j {
isEqual = true
break
}
}
if !isEqual {
return false
}
}

return true
}
59 changes: 59 additions & 0 deletions domain/schema_validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"time"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/util/testleak"
)
Expand Down Expand Up @@ -141,3 +142,61 @@ func serverFunc(lease time.Duration, requireLease chan leaseGrantItem, oracleCh
}
}
}

func (*testSuite) TestEnqueue(c *C) {
lease := 10 * time.Millisecond
originalCnt := variable.GetMaxDeltaSchemaCount()
defer variable.SetMaxDeltaSchemaCount(originalCnt)

validator := NewSchemaValidator(lease).(*schemaValidator)
c.Assert(validator.IsStarted(), IsTrue)
// maxCnt is 0.
variable.SetMaxDeltaSchemaCount(0)
validator.enqueue(1, []int64{11})
c.Assert(validator.deltaSchemaInfos, HasLen, 0)

// maxCnt is 10.
variable.SetMaxDeltaSchemaCount(10)
ds := []deltaSchemaInfo{
{0, []int64{1}},
{1, []int64{1}},
{2, []int64{1}},
{3, []int64{2, 2}},
{4, []int64{2}},
{5, []int64{1, 4}},
{6, []int64{1, 4}},
{7, []int64{3, 1, 3}},
{8, []int64{1, 2, 3}},
{9, []int64{1, 2, 3}},
}
for _, d := range ds {
validator.enqueue(d.schemaVersion, d.relatedTableIDs)
}
validator.enqueue(10, []int64{1})
ret := []deltaSchemaInfo{
{0, []int64{1}},
{2, []int64{1}},
{3, []int64{2, 2}},
{4, []int64{2}},
{6, []int64{1, 4}},
{9, []int64{1, 2, 3}},
{10, []int64{1}},
}
c.Assert(validator.deltaSchemaInfos, DeepEquals, ret)
// The Items' relatedTableIDs have different order.
validator.enqueue(11, []int64{1, 2, 3, 4})
validator.enqueue(12, []int64{4, 1, 2, 3, 1})
validator.enqueue(13, []int64{4, 1, 3, 2, 5})
ret[len(ret)-1] = deltaSchemaInfo{13, []int64{4, 1, 3, 2, 5}}
c.Assert(validator.deltaSchemaInfos, DeepEquals, ret)
// The length of deltaSchemaInfos is greater then maxCnt.
validator.enqueue(14, []int64{1})
validator.enqueue(15, []int64{2})
validator.enqueue(16, []int64{3})
validator.enqueue(17, []int64{4})
ret = append(ret, deltaSchemaInfo{14, []int64{1}})
ret = append(ret, deltaSchemaInfo{15, []int64{2}})
ret = append(ret, deltaSchemaInfo{16, []int64{3}})
ret = append(ret, deltaSchemaInfo{17, []int64{4}})
c.Assert(validator.deltaSchemaInfos, DeepEquals, ret[1:])
}
29 changes: 29 additions & 0 deletions executor/seqtest/seq_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/parser"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor"
Expand Down Expand Up @@ -1050,3 +1051,31 @@ func (s *seqTestSuite) TestAutoIDInRetry(c *C) {
tk.MustExec("insert into t values ()")
tk.MustQuery(`select * from t`).Check(testkit.Rows("1", "2", "3", "4", "5"))
}

func (s *seqTestSuite) TestMaxDeltaSchemaCount(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
c.Assert(variable.GetMaxDeltaSchemaCount(), Equals, int64(variable.DefTiDBMaxDeltaSchemaCount))
gvc := domain.GetDomain(tk.Se).GetGlobalVarsCache()
gvc.Disable()

tk.MustExec("set @@global.tidb_max_delta_schema_count= -1")
tk.MustQuery("show warnings;").Check(testkit.Rows("Warning 1292 Truncated incorrect tidb_max_delta_schema_count value: '-1'"))
// Make sure a new session will load global variables.
tk.Se = nil
tk.MustExec("use test")
c.Assert(variable.GetMaxDeltaSchemaCount(), Equals, int64(100))
tk.MustExec(fmt.Sprintf("set @@global.tidb_max_delta_schema_count= %v", uint64(math.MaxInt64)))
tk.MustQuery("show warnings;").Check(testkit.Rows(fmt.Sprintf("Warning 1292 Truncated incorrect tidb_max_delta_schema_count value: '%d'", uint64(math.MaxInt64))))
tk.Se = nil
tk.MustExec("use test")
c.Assert(variable.GetMaxDeltaSchemaCount(), Equals, int64(16384))
_, err := tk.Exec("set @@global.tidb_max_delta_schema_count= invalid_val")
c.Assert(terror.ErrorEqual(err, variable.ErrWrongTypeForVar), IsTrue, Commentf("err %v", err))

tk.MustExec("set @@global.tidb_max_delta_schema_count= 2048")
tk.Se = nil
tk.MustExec("use test")
c.Assert(variable.GetMaxDeltaSchemaCount(), Equals, int64(2048))
tk.MustQuery("select @@global.tidb_max_delta_schema_count").Check(testkit.Rows("2048"))
}
15 changes: 15 additions & 0 deletions metrics/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
)

// Metrics for the domain package.
var (
// LoadSchemaCounter records the counter of load schema.
LoadSchemaCounter = prometheus.NewCounterVec(
Expand Down Expand Up @@ -45,4 +46,18 @@ var (
Name: "load_privilege_total",
Help: "Counter of load privilege",
}, []string{LblType})

SchemaValidatorStop = "stop"
SchemaValidatorRestart = "restart"
SchemaValidatorReset = "reset"
SchemaValidatorCacheEmpty = "cache_empty"
SchemaValidatorCacheMiss = "cache_miss"
// HandleSchemaValidate records the counter of handling schema validate.
HandleSchemaValidate = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "tidb",
Subsystem: "domain",
Name: "handle_schema_validate",
Help: "Counter of handle schema validate",
}, []string{LblType})
)
1 change: 1 addition & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,4 +150,5 @@ func RegisterMetrics() {
prometheus.MustRegister(TiKVBatchClientUnavailable)
prometheus.MustRegister(TiKVRangeTaskStats)
prometheus.MustRegister(TiKVRangeTaskPushDuration)
prometheus.MustRegister(HandleSchemaValidate)
}
1 change: 1 addition & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1691,6 +1691,7 @@ var builtinGlobalVariable = []string{
variable.TiDBEnableWindowFunction,
variable.TiDBEnableFastAnalyze,
variable.TiDBExpensiveQueryTimeThreshold,
variable.TiDBMaxDeltaSchemaCount,
}

var (
Expand Down
3 changes: 3 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -827,6 +827,9 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
}
case TiDBLowResolutionTSO:
s.LowResolutionTSO = TiDBOptOn(val)
// It's a global variable, but it also wants to be cached in server.
case TiDBMaxDeltaSchemaCount:
SetMaxDeltaSchemaCount(tidbOptInt64(val, DefTiDBMaxDeltaSchemaCount))
}
s.systems[name] = val
return nil
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -691,6 +691,7 @@ var defaultSysVars = []*SysVar{
{ScopeGlobal, TiDBDDLReorgBatchSize, strconv.Itoa(DefTiDBDDLReorgBatchSize)},
{ScopeGlobal, TiDBDDLErrorCountLimit, strconv.Itoa(DefTiDBDDLErrorCountLimit)},
{ScopeSession, TiDBDDLReorgPriority, "PRIORITY_LOW"},
{ScopeGlobal, TiDBMaxDeltaSchemaCount, strconv.Itoa(DefTiDBMaxDeltaSchemaCount)},
{ScopeSession, TiDBForcePriority, mysql.Priority2Str[DefTiDBForcePriority]},
{ScopeSession, TiDBEnableRadixJoin, BoolToIntStr(DefTiDBUseRadixJoin)},
{ScopeGlobal | ScopeSession, TiDBOptJoinReorderThreshold, strconv.Itoa(DefTiDBOptJoinReorderThreshold)},
Expand Down
6 changes: 6 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,10 @@ const (
// It can be: PRIORITY_LOW, PRIORITY_NORMAL, PRIORITY_HIGH
TiDBDDLReorgPriority = "tidb_ddl_reorg_priority"

// tidb_max_delta_schema_count defines the max length of deltaSchemaInfos.
// deltaSchemaInfos is a queue that maintains the history of schema changes.
TiDBMaxDeltaSchemaCount = "tidb_max_delta_schema_count"

// tidb_scatter_region will scatter the regions for DDLs when it is ON.
TiDBScatterRegion = "tidb_scatter_region"

Expand Down Expand Up @@ -330,6 +334,7 @@ const (
DefTiDBDDLReorgWorkerCount = 4
DefTiDBDDLReorgBatchSize = 256
DefTiDBDDLErrorCountLimit = 512
DefTiDBMaxDeltaSchemaCount = 1024
DefTiDBHashAggPartialConcurrency = 4
DefTiDBHashAggFinalConcurrency = 4
DefTiDBForcePriority = mysql.NoPriority
Expand All @@ -352,6 +357,7 @@ var (
maxDDLReorgWorkerCount int32 = 128
ddlReorgBatchSize int32 = DefTiDBDDLReorgBatchSize
ddlErrorCountlimit int64 = DefTiDBDDLErrorCountLimit
maxDeltaSchemaCount int64 = DefTiDBMaxDeltaSchemaCount
// Export for testing.
MaxDDLReorgBatchSize int32 = 10240
MinDDLReorgBatchSize int32 = 32
Expand Down
12 changes: 12 additions & 0 deletions sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,16 @@ func GetDDLErrorCountLimit() int64 {
return atomic.LoadInt64(&ddlErrorCountlimit)
}

// SetMaxDeltaSchemaCount sets maxDeltaSchemaCount size.
func SetMaxDeltaSchemaCount(cnt int64) {
atomic.StoreInt64(&maxDeltaSchemaCount, cnt)
}

// GetMaxDeltaSchemaCount gets maxDeltaSchemaCount size.
func GetMaxDeltaSchemaCount() int64 {
return atomic.LoadInt64(&maxDeltaSchemaCount)
}

// GetSessionSystemVar gets a system variable.
// If it is a session only variable, use the default value defined in code.
// Returns error if there is no such variable.
Expand Down Expand Up @@ -320,6 +330,8 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
return checkUInt64SystemVar(name, value, 0, 4294967295, vars)
case OldPasswords:
return checkUInt64SystemVar(name, value, 0, 2, vars)
case TiDBMaxDeltaSchemaCount:
return checkInt64SystemVar(name, value, 100, 16384, vars)
case SessionTrackGtids:
if strings.EqualFold(value, "OFF") || value == "0" {
return "OFF", nil
Expand Down

0 comments on commit 7c56347

Please sign in to comment.