diff --git a/config/config.go b/config/config.go
index c4408bc4a2e09..d52ad3731e31b 100644
--- a/config/config.go
+++ b/config/config.go
@@ -404,6 +404,7 @@ type Performance struct {
 	MaxMemory             uint64  `toml:"max-memory" json:"max-memory"`
 	ServerMemoryQuota     uint64  `toml:"server-memory-quota" json:"server-memory-quota"`
 	MemoryUsageAlarmRatio float64 `toml:"memory-usage-alarm-ratio" json:"memory-usage-alarm-ratio"`
+<<<<<<< HEAD
 	StatsLease            string  `toml:"stats-lease" json:"stats-lease"`
 	StmtCountLimit        uint    `toml:"stmt-count-limit" json:"stmt-count-limit"`
 	FeedbackProbability   float64 `toml:"feedback-probability" json:"feedback-probability"`
@@ -423,6 +424,17 @@ type Performance struct {
 	IndexUsageSyncLease   string  `toml:"index-usage-sync-lease" json:"index-usage-sync-lease"`
 	GOGC                  int     `toml:"gogc" json:"gogc"`
 	EnforceMPP            bool    `toml:"enforce-mpp" json:"enforce-mpp"`
+=======
+
+	EnableLoadFMSketch bool `toml:"enable-load-fmsketch" json:"enable-load-fmsketch"`
+
+	LiteInitStats bool `toml:"lite-init-stats" json:"lite-init-stats"`
+
+	// If ForceInitStats is true, when tidb starts up, it doesn't provide service until init stats is finished.
+	// If ForceInitStats is false, tidb can provide service before init stats is finished. Note that during the period
+	// of init stats the optimizer may make bad decisions due to pseudo stats.
+	ForceInitStats bool `toml:"force-init-stats" json:"force-init-stats"`
+>>>>>>> 50dd8b40f1c (*: provide a option to wait for init stats to finish before providing service during startup (#43381))
 }
 
 // PlanCache is the PlanCache section of the config.
@@ -617,9 +629,25 @@ var defaultConf = Config{
 		MaxTxnTTL:          defTiKVCfg.MaxTxnTTL, // 1hour
 		MemProfileInterval: "1m",
 		// TODO: set indexUsageSyncLease to 60s.
+<<<<<<< HEAD
 		IndexUsageSyncLease: "0s",
 		GOGC:                100,
 		EnforceMPP:          false,
+=======
+		IndexUsageSyncLease:               "0s",
+		GOGC:                              100,
+		EnforceMPP:                        false,
+		PlanReplayerGCLease:               "10m",
+		StatsLoadConcurrency:              5,
+		StatsLoadQueueSize:                1000,
+		AnalyzePartitionConcurrencyQuota:  16,
+		PlanReplayerDumpWorkerConcurrency: 1,
+		EnableStatsCacheMemQuota:          false,
+		RunAutoAnalyze:                    true,
+		EnableLoadFMSketch:                false,
+		LiteInitStats:                     false,
+		ForceInitStats:                    false,
+>>>>>>> 50dd8b40f1c (*: provide a option to wait for init stats to finish before providing service during startup (#43381))
 	},
 	ProxyProtocol: ProxyProtocol{
 		Networks: "",
diff --git a/config/config.toml.example b/config/config.toml.example
index 6ac986bb95807..8e4e846a79074 100644
--- a/config/config.toml.example
+++ b/config/config.toml.example
@@ -299,6 +299,15 @@ mem-profile-interval = "1m"
 # If you find the CPU used by GC is too high or GC is too frequent and impact your business you can increase this value.
 gogc = 100
 
+<<<<<<< HEAD
+=======
+# Whether to use the lite mode of init stats.
+lite-init-stats = false
+
+# Whether to wait for init stats to finish before providing service during startup
+force-init-stats = false
+
+>>>>>>> 50dd8b40f1c (*: provide a option to wait for init stats to finish before providing service during startup (#43381))
 [proxy-protocol]
 # PROXY protocol acceptable client networks.
 # Empty string means disable PROXY protocol, * means all networks.
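The new TOML keys map onto the `Performance` struct fields through the struct tags shown above. For illustration, a minimal standalone sketch (not TiDB code; only the two field names and tags are taken from the diff, the rest is assumed) of how such a boolean option round-trips from a config file via `github.com/BurntSushi/toml`:

```go
package main

import (
	"fmt"

	"github.com/BurntSushi/toml"
)

// Performance mirrors just the two new fields from the diff above.
type Performance struct {
	LiteInitStats  bool `toml:"lite-init-stats"`
	ForceInitStats bool `toml:"force-init-stats"`
}

type Config struct {
	Performance Performance `toml:"performance"`
}

func main() {
	const data = `
[performance]
lite-init-stats = false
force-init-stats = true
`
	var cfg Config
	if _, err := toml.Decode(data, &cfg); err != nil {
		panic(err)
	}
	// force-init-stats = true asks the server to hold off serving
	// traffic until the initial statistics load has completed.
	fmt.Println(cfg.Performance.ForceInitStats) // true
}
```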
diff --git a/config/config_test.go b/config/config_test.go
index 7201f5fb42f1e..a0124449c76d6 100644
--- a/config/config_test.go
+++ b/config/config_test.go
@@ -203,6 +203,14 @@ stores-refresh-interval = 30
 enable-forwarding = true
 [performance]
 txn-total-size-limit=2000
+<<<<<<< HEAD
+=======
+tcp-no-delay = false
+enable-load-fmsketch = true
+plan-replayer-dump-worker-concurrency = 1
+lite-init-stats = true
+force-init-stats = true
+>>>>>>> 50dd8b40f1c (*: provide a option to wait for init stats to finish before providing service during startup (#43381))
 [tikv-client]
 commit-timeout="41s"
 max-batch-size=128
@@ -246,6 +254,7 @@ spilled-file-encryption-method = "plaintext"
 
 	c.Assert(conf.Performance.TxnTotalSizeLimit, Equals, uint64(2000))
 	c.Assert(conf.AlterPrimaryKey, Equals, true)
+<<<<<<< HEAD
 	c.Assert(conf.TiKVClient.CommitTimeout, Equals, "41s")
 	c.Assert(conf.TiKVClient.AsyncCommit.KeysLimit, Equals, uint(123))
 	c.Assert(conf.TiKVClient.AsyncCommit.TotalKeySizeLimit, Equals, uint64(1024))
@@ -282,6 +291,51 @@ spilled-file-encryption-method = "plaintext"
 	c.Assert(conf.EnableEnumLengthLimit, Equals, false)
 	c.Assert(conf.EnableForwarding, Equals, true)
 	c.Assert(conf.StoresRefreshInterval, Equals, uint64(30))
+=======
+	require.Equal(t, "41s", conf.TiKVClient.CommitTimeout)
+	require.Equal(t, uint(123), conf.TiKVClient.AsyncCommit.KeysLimit)
+	require.Equal(t, uint64(1024), conf.TiKVClient.AsyncCommit.TotalKeySizeLimit)
+	require.Equal(t, uint(128), conf.TiKVClient.MaxBatchSize)
+	require.Equal(t, uint(6000), conf.TiKVClient.RegionCacheTTL)
+	require.Equal(t, int64(0), conf.TiKVClient.StoreLimit)
+	require.Equal(t, int64(8192), conf.TiKVClient.TTLRefreshedTxnSize)
+	require.Equal(t, uint(1000), conf.TokenLimit)
+	require.True(t, conf.EnableTableLock)
+	require.Equal(t, uint64(5), conf.DelayCleanTableLock)
+	require.Equal(t, uint64(10000), conf.SplitRegionMaxNum)
+	require.True(t, conf.RepairMode)
+	require.Equal(t, uint64(16), conf.TiKVClient.ResolveLockLiteThreshold)
+	require.Equal(t, uint32(200), conf.Instance.MaxConnections)
+	require.Equal(t, uint32(10), conf.TiDBMaxReuseChunk)
+	require.Equal(t, uint32(20), conf.TiDBMaxReuseColumn)
+	require.Equal(t, []string{"tiflash"}, conf.IsolationRead.Engines)
+	require.Equal(t, 3080, conf.MaxIndexLength)
+	require.Equal(t, 70, conf.IndexLimit)
+	require.Equal(t, uint32(4000), conf.TableColumnCountLimit)
+	require.True(t, conf.SkipRegisterToDashboard)
+	require.Equal(t, 3, len(conf.Labels))
+	require.Equal(t, "bar", conf.Labels["foo"])
+	require.Equal(t, "abc", conf.Labels["group"])
+	require.Equal(t, "dc-1", conf.Labels["zone"])
+	require.Equal(t, SpilledFileEncryptionMethodPlaintext, conf.Security.SpilledFileEncryptionMethod)
+	require.True(t, conf.DeprecateIntegerDisplayWidth)
+	require.False(t, conf.EnableEnumLengthLimit)
+	require.True(t, conf.EnableForwarding)
+	require.Equal(t, uint64(30), conf.StoresRefreshInterval)
+	require.Equal(t, uint(123), conf.PessimisticTxn.DeadlockHistoryCapacity)
+	require.True(t, conf.PessimisticTxn.DeadlockHistoryCollectRetryable)
+	require.True(t, conf.PessimisticTxn.PessimisticAutoCommit.Load())
+	require.Equal(t, "127.0.0.1:10100", conf.TopSQL.ReceiverAddress)
+	require.True(t, conf.Experimental.AllowsExpressionIndex)
+	require.Equal(t, uint(20), conf.Status.GRPCKeepAliveTime)
+	require.Equal(t, uint(10), conf.Status.GRPCKeepAliveTimeout)
+	require.Equal(t, uint(2048), conf.Status.GRPCConcurrentStreams)
+	require.Equal(t, 10240, conf.Status.GRPCInitialWindowSize)
+	require.Equal(t, 40960, conf.Status.GRPCMaxSendMsgSize)
+	require.True(t, conf.Performance.EnableLoadFMSketch)
+	require.True(t, conf.Performance.LiteInitStats)
+	require.True(t, conf.Performance.ForceInitStats)
+>>>>>>> 50dd8b40f1c (*: provide a option to wait for init stats to finish before providing service during startup (#43381))
 
 	_, err = f.WriteString(`
 [log.file]
diff --git a/domain/domain.go b/domain/domain.go
index 793c78497fc05..ae363cd0d3eec 100644
--- a/domain/domain.go
+++ b/domain/domain.go
@@ -1160,6 +1160,7 @@ func (do *Domain) newOwnerManager(prompt, ownerKey string) owner.Manager {
 	return statsOwner
 }
 
+<<<<<<< HEAD
 func (do *Domain) loadStatsWorker() {
 	defer util.Recover(metrics.LabelDomain, "loadStatsWorker", nil, false)
 	lease := do.statsLease
@@ -1172,7 +1173,13 @@ func (do *Domain) loadStatsWorker() {
 		do.wg.Done()
 		logutil.BgLogger().Info("loadStatsWorker exited.")
 	}()
+=======
+func (do *Domain) initStats() {
+>>>>>>> 50dd8b40f1c (*: provide a option to wait for init stats to finish before providing service during startup (#43381))
 	statsHandle := do.StatsHandle()
+	defer func() {
+		close(statsHandle.InitStatsDone)
+	}()
 	t := time.Now()
 	err := statsHandle.InitStats(do.InfoSchema())
 	if err != nil {
@@ -1180,6 +1187,24 @@
 	} else {
 		logutil.BgLogger().Info("init stats info time", zap.Duration("take time", time.Since(t)))
 	}
+}
+
+func (do *Domain) loadStatsWorker() {
+	defer util.Recover(metrics.LabelDomain, "loadStatsWorker", nil, false)
+	lease := do.statsLease
+	if lease == 0 {
+		lease = 3 * time.Second
+	}
+	loadTicker := time.NewTicker(lease)
+	updStatsHealthyTicker := time.NewTicker(20 * lease)
+	defer func() {
+		loadTicker.Stop()
+		updStatsHealthyTicker.Stop()
+		logutil.BgLogger().Info("loadStatsWorker exited.")
+	}()
+	do.initStats()
+	statsHandle := do.StatsHandle()
+	var err error
 	for {
 		select {
 		case <-loadTicker.C:
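The domain.go change splits the one-shot statistics bootstrap out of `loadStatsWorker` into `initStats`, which signals completion by closing `InitStatsDone` in a `defer` so that waiters are released even when `InitStats` returns an error. A stripped-down, runnable sketch of the same shape (only `initStats`/`loadStatsWorker` come from the diff; the `domain` type, `quit` channel, tickers, and timings are illustrative stand-ins):

```go
package main

import (
	"fmt"
	"time"
)

type domain struct {
	initStatsDone chan struct{}
}

func (do *domain) initStats() {
	// Close in a defer so waiters are released even if loading fails.
	defer close(do.initStatsDone)
	time.Sleep(20 * time.Millisecond) // stand-in for statsHandle.InitStats(infoSchema)
}

func (do *domain) loadStatsWorker(quit <-chan struct{}) {
	loadTicker := time.NewTicker(50 * time.Millisecond)
	defer loadTicker.Stop()
	do.initStats() // one-shot bootstrap before the periodic loop
	for {
		select {
		case <-loadTicker.C:
			// stand-in for the periodic stats-update work
		case <-quit:
			return
		}
	}
}

func main() {
	do := &domain{initStatsDone: make(chan struct{})}
	quit := make(chan struct{})
	go do.loadStatsWorker(quit)

	<-do.initStatsDone // what main.go does when force-init-stats is on
	fmt.Println("init stats done; server may start")
	close(quit)
}
```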
diff --git a/statistics/handle/handle.go b/statistics/handle/handle.go
index 7e8c23716bd8c..6b959709e1259 100644
--- a/statistics/handle/handle.go
+++ b/statistics/handle/handle.go
@@ -106,6 +106,287 @@ type Handle struct {
 
 	// idxUsageListHead contains all the index usage collectors required by session.
 	idxUsageListHead *SessionIndexUsageCollector
+<<<<<<< HEAD
+=======
+
+	// StatsLoad is used to load stats concurrently
+	StatsLoad StatsLoad
+
+	// sysProcTracker is used to track sys process like analyze
+	sysProcTracker sessionctx.SysProcTracker
+	// serverIDGetter is used to get server ID for generating auto analyze ID.
+	serverIDGetter func() uint64
+	// tableLocked used to store locked tables
+	tableLocked []int64
+
+	InitStatsDone chan struct{}
+}
+
+// GetTableLockedAndClearForTest for unit test only
+func (h *Handle) GetTableLockedAndClearForTest() []int64 {
+	tableLocked := h.tableLocked
+	h.tableLocked = make([]int64, 0)
+	return tableLocked
+}
+
+// LoadLockedTables load locked tables from store
+func (h *Handle) LoadLockedTables() error {
+	h.mu.Lock()
+	defer h.mu.Unlock()
+	ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
+	rows, _, err := h.execRestrictedSQL(ctx, "select table_id from mysql.stats_table_locked")
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	h.tableLocked = make([]int64, len(rows))
+	for i, row := range rows {
+		h.tableLocked[i] = row.GetInt64(0)
+	}
+
+	return nil
+}
+
+// AddLockedTables add locked tables id to store
+func (h *Handle) AddLockedTables(tids []int64, pids []int64, tables []*ast.TableName) (string, error) {
+	h.mu.Lock()
+	defer h.mu.Unlock()
+
+	ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
+
+	exec := h.mu.ctx.(sqlexec.SQLExecutor)
+
+	_, err := exec.ExecuteInternal(ctx, "begin pessimistic")
+	if err != nil {
+		return "", err
+	}
+
+	//load tables to check duplicate when insert
+	rows, _, err := h.execRestrictedSQL(ctx, "select table_id from mysql.stats_table_locked")
+	if err != nil {
+		return "", err
+	}
+
+	dupTables := make([]string, 0)
+	tableLocked := make([]int64, 0)
+	for _, row := range rows {
+		tableLocked = append(tableLocked, row.GetInt64(0))
+	}
+
+	strTids := fmt.Sprintf("%v", tids)
+	logutil.BgLogger().Info("[stats] lock table ", zap.String("tableIDs", strTids))
+	for i, tid := range tids {
+		_, err = exec.ExecuteInternal(ctx, "insert into mysql.stats_table_locked(table_id) select %? from dual where not exists(select table_id from mysql.stats_table_locked where table_id = %?)", tid, tid)
+		if err != nil {
+			logutil.BgLogger().Error("[stats] error occurred when insert mysql.stats_table_locked ", zap.Error(err))
+			return "", err
+		}
+		// update handle
+		if !isTableLocked(tableLocked, tid) {
+			tableLocked = append(tableLocked, tid)
+		} else {
+			dupTables = append(dupTables, tables[i].Schema.L+"."+tables[i].Name.L)
+		}
+	}
+
+	//insert related partitions while don't warning duplicate partitions
+	for _, tid := range pids {
+		_, err = exec.ExecuteInternal(ctx, "insert into mysql.stats_table_locked(table_id) select %? from dual where not exists(select table_id from mysql.stats_table_locked where table_id = %?)", tid, tid)
+		if err != nil {
+			logutil.BgLogger().Error("[stats] error occurred when insert mysql.stats_table_locked ", zap.Error(err))
+			return "", err
+		}
+		if !isTableLocked(tableLocked, tid) {
+			tableLocked = append(tableLocked, tid)
+		}
+	}
+
+	err = finishTransaction(ctx, exec, err)
+	if err != nil {
+		return "", err
+	}
+	// update handle.tableLocked after transaction success, if txn failed, tableLocked won't be updated
+	h.tableLocked = tableLocked
+
+	if len(dupTables) > 0 {
+		tables := dupTables[0]
+		for i, table := range dupTables {
+			if i == 0 {
+				continue
+			}
+			tables += ", " + table
+		}
+		var msg string
+		if len(tids) > 1 {
+			if len(tids) > len(dupTables) {
+				msg = "skip locking locked tables: " + tables + ", other tables locked successfully"
+			} else {
+				msg = "skip locking locked tables: " + tables
+			}
+		} else {
+			msg = "skip locking locked table: " + tables
+		}
+		return msg, err
+	}
+	return "", err
+}
+
+// getStatsDeltaFromTableLocked get count, modify_count and version for the given table from mysql.stats_table_locked.
+func (h *Handle) getStatsDeltaFromTableLocked(ctx context.Context, tableID int64) (int64, int64, uint64, error) {
+	rows, _, err := h.execRestrictedSQL(ctx, "select count, modify_count, version from mysql.stats_table_locked where table_id = %?", tableID)
+	if err != nil {
+		return 0, 0, 0, err
+	}
+
+	if len(rows) == 0 {
+		return 0, 0, 0, nil
+	}
+	count := rows[0].GetInt64(0)
+	modifyCount := rows[0].GetInt64(1)
+	version := rows[0].GetUint64(2)
+	return count, modifyCount, version, nil
+}
+
+// RemoveLockedTables remove tables from table locked array
+func (h *Handle) RemoveLockedTables(tids []int64, pids []int64, tables []*ast.TableName) (string, error) {
+	h.mu.Lock()
+	defer h.mu.Unlock()
+
+	ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
+
+	exec := h.mu.ctx.(sqlexec.SQLExecutor)
+	_, err := exec.ExecuteInternal(ctx, "begin pessimistic")
+	if err != nil {
+		return "", err
+	}
+
+	//load tables to check unlock the unlock table
+	rows, _, err := h.execRestrictedSQL(ctx, "select table_id from mysql.stats_table_locked")
+	if err != nil {
+		return "", err
+	}
+
+	nonlockedTables := make([]string, 0)
+	tableLocked := make([]int64, 0)
+	for _, row := range rows {
+		tableLocked = append(tableLocked, row.GetInt64(0))
+	}
+
+	strTids := fmt.Sprintf("%v", tids)
+	logutil.BgLogger().Info("[stats] unlock table ", zap.String("tableIDs", strTids))
+	for i, tid := range tids {
+		// get stats delta during table locked
+		count, modifyCount, version, err := h.getStatsDeltaFromTableLocked(ctx, tid)
+		if err != nil {
+			logutil.BgLogger().Error("[stats] error occurred when getStatsDeltaFromTableLocked", zap.Error(err))
+			return "", err
+		}
+		// update stats_meta with stats delta
+		_, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version = %?, count = count + %?, modify_count = modify_count + %? where table_id = %?", version, count, modifyCount, tid)
+		if err != nil {
+			logutil.BgLogger().Error("[stats] error occurred when update mysql.stats_meta", zap.Error(err))
+			return "", err
+		}
+
+		_, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_table_locked where table_id = %?", tid)
+		if err != nil {
+			logutil.BgLogger().Error("[stats] error occurred when delete from mysql.stats_table_locked ", zap.Error(err))
+			return "", err
+		}
+		var exist bool
+		exist, tableLocked = removeIfTableLocked(tableLocked, tid)
+		if !exist {
+			nonlockedTables = append(nonlockedTables, tables[i].Schema.L+"."+tables[i].Name.L)
+		}
+	}
+	//delete related partitions while don't warning delete empty partitions
+	for _, tid := range pids {
+		// get stats delta during table locked
+		count, modifyCount, version, err := h.getStatsDeltaFromTableLocked(ctx, tid)
+		if err != nil {
+			logutil.BgLogger().Error("[stats] error occurred when getStatsDeltaFromTableLocked", zap.Error(err))
+			return "", err
+		}
+		// update stats_meta with stats delta
+		_, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version = %?, count = count + %?, modify_count = modify_count + %? where table_id = %?", version, count, modifyCount, tid)
+		if err != nil {
+			logutil.BgLogger().Error("[stats] error occurred when update mysql.stats_meta", zap.Error(err))
+			return "", err
+		}
+
+		_, err = exec.ExecuteInternal(ctx, "delete from mysql.stats_table_locked where table_id = %?", tid)
+		if err != nil {
+			logutil.BgLogger().Error("[stats] error occurred when delete from mysql.stats_table_locked ", zap.Error(err))
+			return "", err
+		}
+		_, tableLocked = removeIfTableLocked(tableLocked, tid)
+	}
+
+	err = finishTransaction(ctx, exec, err)
+	if err != nil {
+		return "", err
+	}
+	// update handle.tableLocked after transaction success, if txn failed, tableLocked won't be updated
+	h.tableLocked = tableLocked
+
+	if len(nonlockedTables) > 0 {
+		tables := nonlockedTables[0]
+		for i, table := range nonlockedTables {
+			if i == 0 {
+				continue
+			}
+			tables += ", " + table
+		}
+		var msg string
+		if len(tids) > 1 {
+			if len(tids) > len(nonlockedTables) {
+				msg = "skip unlocking non-locked tables: " + tables + ", other tables unlocked successfully"
+			} else {
+				msg = "skip unlocking non-locked tables: " + tables
+			}
+		} else {
+			msg = "skip unlocking non-locked table: " + tables
+		}
+		return msg, err
+	}
+	return "", err
+}
+
+// IsTableLocked check whether table is locked in handle with Handle.Mutex
+func (h *Handle) IsTableLocked(tableID int64) bool {
+	h.mu.RLock()
+	defer h.mu.RUnlock()
+	return h.isTableLocked(tableID)
+}
+
+// IsTableLocked check whether table is locked in handle without Handle.Mutex
+func (h *Handle) isTableLocked(tableID int64) bool {
+	return isTableLocked(h.tableLocked, tableID)
+}
+
+// isTableLocked check whether table is locked
+func isTableLocked(tableLocked []int64, tableID int64) bool {
+	return lockTableIndexOf(tableLocked, tableID) > -1
+}
+
+// lockTableIndexOf get the locked table's index in the array
+func lockTableIndexOf(tableLocked []int64, tableID int64) int {
+	for idx, id := range tableLocked {
+		if id == tableID {
+			return idx
+		}
+	}
+	return -1
+}
+
+// removeIfTableLocked try to remove the table from table locked array
+func removeIfTableLocked(tableLocked []int64, tableID int64) (bool, []int64) {
+	idx := lockTableIndexOf(tableLocked, tableID)
+	if idx > -1 {
+		tableLocked = append(tableLocked[:idx], tableLocked[idx+1:]...)
+	}
+	return idx > -1, tableLocked
+>>>>>>> 50dd8b40f1c (*: provide a option to wait for init stats to finish before providing service during startup (#43381))
 }
 
 func (h *Handle) withRestrictedSQLExecutor(ctx context.Context, fn func(context.Context, sqlexec.RestrictedSQLExecutor) ([]chunk.Row, []*ast.ResultField, error)) ([]chunk.Row, []*ast.ResultField, error) {
@@ -191,6 +472,12 @@ func NewHandle(ctx sessionctx.Context, lease time.Duration, pool sessionPool) (*
 		feedback:         statistics.NewQueryFeedbackMap(),
 		idxUsageListHead: &SessionIndexUsageCollector{mapper: make(indexUsageMap)},
 		pool:             pool,
+<<<<<<< HEAD
+=======
+		sysProcTracker: tracker,
+		serverIDGetter: serverIDGetter,
+		InitStatsDone:  make(chan struct{}),
+>>>>>>> 50dd8b40f1c (*: provide a option to wait for init stats to finish before providing service during startup (#43381))
 	}
 	handle.lease.Store(lease)
 	handle.pool = pool
diff --git a/tidb-server/main.go b/tidb-server/main.go
index 9c8a30da3258d..0c1bc7e85617e 100644
--- a/tidb-server/main.go
+++ b/tidb-server/main.go
@@ -185,11 +185,44 @@ func main() {
 	printInfo()
 	setupBinlogClient()
 	setupMetrics()
+<<<<<<< HEAD
 	createStoreAndDomain()
 	createServer()
 	signal.SetupSignalHandler(serverShutdown)
 	runServer()
 	cleanup()
+=======
+
+	keyspaceName := keyspace.GetKeyspaceNameBySettings()
+
+	resourcemanager.InstanceResourceManager.Start()
+	storage, dom := createStoreAndDomain(keyspaceName)
+	svr := createServer(storage, dom)
+	err = driver.TrySetupGlobalResourceController(context.Background(), dom.ServerID(), storage)
+	if err != nil {
+		logutil.BgLogger().Warn("failed to setup global resource controller", zap.Error(err))
+	}
+
+	// Register error API is not thread-safe, the caller MUST NOT register errors after initialization.
+	// To prevent misuse, set a flag to indicate that register new error will panic immediately.
+	// For regression of issue like https://github.com/pingcap/tidb/issues/28190
+	terror.RegisterFinish()
+
+	exited := make(chan struct{})
+	signal.SetupSignalHandler(func(graceful bool) {
+		svr.Close()
+		cleanup(svr, storage, dom, graceful)
+		cpuprofile.StopCPUProfiler()
+		resourcemanager.InstanceResourceManager.Stop()
+		close(exited)
+	})
+	topsql.SetupTopSQL()
+	if config.GetGlobalConfig().Performance.ForceInitStats {
+		<-dom.StatsHandle().InitStatsDone
+	}
+	terror.MustNil(svr.Run())
+	<-exited
+>>>>>>> 50dd8b40f1c (*: provide a option to wait for init stats to finish before providing service during startup (#43381))
 
 	syncLog()
 }
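The net effect of the main.go change: when `Performance.ForceInitStats` is on, startup blocks on the handle's `InitStatsDone` channel between server construction and `svr.Run()`, so no client connection can observe plans built from pseudo stats. A condensed sketch of that gate (all names here are stand-ins for the real config/domain/server wiring, as noted in the comments):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Stand-ins: forceInitStats for config.GetGlobalConfig().Performance.ForceInitStats,
	// initStatsDone for dom.StatsHandle().InitStatsDone.
	forceInitStats := true
	initStatsDone := make(chan struct{})

	go func() { // stand-in for the domain's initStats goroutine
		time.Sleep(50 * time.Millisecond)
		close(initStatsDone)
	}()

	if forceInitStats {
		// Block here so the server loop below never sees pseudo stats.
		<-initStatsDone
	}
	fmt.Println("serving") // stand-in for terror.MustNil(svr.Run())
}
```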