diff --git a/store/tikv/gcworker/gc_worker.go b/store/tikv/gcworker/gc_worker.go index 5fa52281438be..d0f89c87c893c 100644 --- a/store/tikv/gcworker/gc_worker.go +++ b/store/tikv/gcworker/gc_worker.go @@ -96,6 +96,9 @@ func (w *GCWorker) Close() { } const ( + booleanTrue = "true" + booleanFalse = "false" + gcWorkerTickInterval = time.Minute gcJobLogTickInterval = time.Minute * 10 gcWorkerLease = time.Minute * 2 @@ -120,29 +123,31 @@ const ( gcScanLockLimit = tikv.ResolvedCacheSize / 2 gcEnableKey = "tikv_gc_enable" - gcEnableValue = "true" - gcDisableValue = "false" gcDefaultEnableValue = true gcModeKey = "tikv_gc_mode" gcModeCentral = "central" gcModeDistributed = "distributed" gcModeDefault = gcModeDistributed + + gcAutoConcurrencyKey = "tikv_gc_auto_concurrency" + gcDefaultAutoConcurrency = true ) var gcSafePointCacheInterval = tikv.GcSafePointCacheInterval var gcVariableComments = map[string]string{ - gcLeaderUUIDKey: "Current GC worker leader UUID. (DO NOT EDIT)", - gcLeaderDescKey: "Host name and pid of current GC leader. (DO NOT EDIT)", - gcLeaderLeaseKey: "Current GC worker leader lease. (DO NOT EDIT)", - gcLastRunTimeKey: "The time when last GC starts. (DO NOT EDIT)", - gcRunIntervalKey: "GC run interval, at least 10m, in Go format.", - gcLifeTimeKey: "All versions within life time will not be collected by GC, at least 10m, in Go format.", - gcSafePointKey: "All versions after safe point can be accessed. (DO NOT EDIT)", - gcConcurrencyKey: "[DEPRECATED] How many goroutines used to do GC parallel, [1, 128], default 2", - gcEnableKey: "Current GC enable status", - gcModeKey: "Mode of GC, \"central\" or \"distributed\"", + gcLeaderUUIDKey: "Current GC worker leader UUID. (DO NOT EDIT)", + gcLeaderDescKey: "Host name and pid of current GC leader. (DO NOT EDIT)", + gcLeaderLeaseKey: "Current GC worker leader lease. (DO NOT EDIT)", + gcLastRunTimeKey: "The time when last GC starts. 
(DO NOT EDIT)", + gcRunIntervalKey: "GC run interval, at least 10m, in Go format.", + gcLifeTimeKey: "All versions within life time will not be collected by GC, at least 10m, in Go format.", + gcSafePointKey: "All versions after safe point can be accessed. (DO NOT EDIT)", + gcConcurrencyKey: "How many goroutines used to do GC parallel, [1, 128], default 2", + gcEnableKey: "Current GC enable status", + gcModeKey: "Mode of GC, \"central\" or \"distributed\"", + gcAutoConcurrencyKey: "Let TiDB pick the concurrency automatically. If set false, tikv_gc_concurrency will be used", } func (w *GCWorker) start(ctx context.Context, wg *sync.WaitGroup) { @@ -258,26 +263,12 @@ func (w *GCWorker) leaderTick(ctx context.Context) error { return nil } - stores, err := w.getUpStores(ctx) - concurrency := len(stores) + concurrency, err := w.getGCConcurrency(ctx) if err != nil { - logutil.Logger(ctx).Error("[gc worker] failed to get up stores to calculate concurrency.", + logutil.Logger(ctx).Info("[gc worker] failed to get gc concurrency.", zap.String("uuid", w.uuid), zap.Error(err)) - - concurrency, err = w.loadGCConcurrencyWithDefault() - if err != nil { - logutil.Logger(ctx).Error("[gc worker] failed to load gc concurrency. 
use default value.", - zap.String("uuid", w.uuid), - zap.Error(err)) - concurrency = gcDefaultConcurrency - } - } - - if concurrency == 0 { - logutil.Logger(ctx).Error("[gc worker] no store is up", - zap.String("uuid", w.uuid)) - return errors.New("[gc worker] no store is up") + return errors.Trace(err) } w.gcIsRunning = true @@ -359,19 +350,68 @@ func (w *GCWorker) getOracleTime() (time.Time, error) { } func (w *GCWorker) checkGCEnable() (bool, error) { - str, err := w.loadValueFromSysTable(gcEnableKey) + return w.loadBooleanWithDefault(gcEnableKey, gcDefaultEnableValue) +} + +func (w *GCWorker) checkUseAutoConcurrency() (bool, error) { + return w.loadBooleanWithDefault(gcAutoConcurrencyKey, gcDefaultAutoConcurrency) +} + +func (w *GCWorker) loadBooleanWithDefault(key string, defaultValue bool) (bool, error) { + str, err := w.loadValueFromSysTable(key) if err != nil { return false, errors.Trace(err) } if str == "" { - // Save default value for gc enable key. The default value is always true. + // The key has no value yet: save the given default for it and return that default. - err = w.saveValueToSysTable(gcEnableKey, gcEnableValue) + defaultValueStr := booleanFalse + if defaultValue { + defaultValueStr = booleanTrue + } + err = w.saveValueToSysTable(key, defaultValueStr) + if err != nil { + return defaultValue, errors.Trace(err) + } + return defaultValue, nil + } + return strings.EqualFold(str, booleanTrue), nil +} + +func (w *GCWorker) getGCConcurrency(ctx context.Context) (int, error) { + useAutoConcurrency, err := w.checkUseAutoConcurrency() + if err != nil { + logutil.Logger(ctx).Error("[gc worker] failed to load config gc_auto_concurrency. use default value.", + zap.String("uuid", w.uuid), + zap.Error(err)) + useAutoConcurrency = gcDefaultAutoConcurrency + } + if !useAutoConcurrency { + return w.loadGCConcurrencyWithDefault() + } + + stores, err := w.getUpStores(ctx) + concurrency := len(stores) + if err != nil { + logutil.Logger(ctx).Error("[gc worker] failed to get up stores to calculate concurrency. 
use config.", + zap.String("uuid", w.uuid), + zap.Error(err)) + + concurrency, err = w.loadGCConcurrencyWithDefault() if err != nil { - return gcDefaultEnableValue, errors.Trace(err) + logutil.Logger(ctx).Error("[gc worker] failed to load gc concurrency from config. use default value.", + zap.String("uuid", w.uuid), + zap.Error(err)) + concurrency = gcDefaultConcurrency } - return gcDefaultEnableValue, nil } - return strings.EqualFold(str, gcEnableValue), nil + + if concurrency == 0 { + logutil.Logger(ctx).Error("[gc worker] no store is up", + zap.String("uuid", w.uuid)) + return 0, errors.New("[gc worker] no store is up") + } + + return concurrency, nil } func (w *GCWorker) checkGCInterval(now time.Time) (bool, error) { diff --git a/store/tikv/gcworker/gc_worker_test.go b/store/tikv/gcworker/gc_worker_test.go index 99e5de1d83d54..58b1b5cea08b3 100644 --- a/store/tikv/gcworker/gc_worker_test.go +++ b/store/tikv/gcworker/gc_worker_test.go @@ -37,6 +37,7 @@ func TestT(t *testing.T) { type testGCWorkerSuite struct { store tikv.Storage + cluster *mocktikv.Cluster oracle *mockoracle.MockOracle gcWorker *GCWorker dom *domain.Domain @@ -47,9 +48,9 @@ var _ = Suite(&testGCWorkerSuite{}) func (s *testGCWorkerSuite) SetUpTest(c *C) { tikv.NewGCHandlerFunc = NewGCWorker - cluster := mocktikv.NewCluster() - mocktikv.BootstrapWithSingleStore(cluster) - store, err := mockstore.NewMockTikvStore(mockstore.WithCluster(cluster)) + s.cluster = mocktikv.NewCluster() + mocktikv.BootstrapWithSingleStore(s.cluster) + store, err := mockstore.NewMockTikvStore(mockstore.WithCluster(s.cluster)) s.store = store.(tikv.Storage) c.Assert(err, IsNil) @@ -58,7 +59,7 @@ func (s *testGCWorkerSuite) SetUpTest(c *C) { s.dom, err = session.BootstrapSession(s.store) c.Assert(err, IsNil) - gcWorker, err := NewGCWorker(s.store, mocktikv.NewPDClient(cluster)) + gcWorker, err := NewGCWorker(s.store, mocktikv.NewPDClient(s.cluster)) c.Assert(err, IsNil) gcWorker.Start() gcWorker.Close() @@ -154,16 +155,28 @@ 
func (s *testGCWorkerSuite) TestPrepareGC(c *C) { // Change GC enable status. s.oracle.AddOffset(time.Minute * 40) - err = s.gcWorker.saveValueToSysTable(gcEnableKey, gcDisableValue) + err = s.gcWorker.saveValueToSysTable(gcEnableKey, booleanFalse) c.Assert(err, IsNil) ok, _, err = s.gcWorker.prepare() c.Assert(err, IsNil) c.Assert(ok, IsFalse) - err = s.gcWorker.saveValueToSysTable(gcEnableKey, gcEnableValue) + err = s.gcWorker.saveValueToSysTable(gcEnableKey, booleanTrue) c.Assert(err, IsNil) ok, _, err = s.gcWorker.prepare() c.Assert(err, IsNil) c.Assert(ok, IsTrue) + + // Change auto concurrency + err = s.gcWorker.saveValueToSysTable(gcAutoConcurrencyKey, booleanFalse) + c.Assert(err, IsNil) + useAutoConcurrency, err := s.gcWorker.checkUseAutoConcurrency() + c.Assert(err, IsNil) + c.Assert(useAutoConcurrency, IsFalse) + err = s.gcWorker.saveValueToSysTable(gcAutoConcurrencyKey, booleanTrue) + c.Assert(err, IsNil) + useAutoConcurrency, err = s.gcWorker.checkUseAutoConcurrency() + c.Assert(err, IsNil) + c.Assert(useAutoConcurrency, IsTrue) } func (s *testGCWorkerSuite) TestDoGCForOneRegion(c *C) { @@ -199,6 +212,28 @@ func (s *testGCWorkerSuite) TestDoGCForOneRegion(c *C) { c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/tikvStoreSendReqResult"), IsNil) } +func (s *testGCWorkerSuite) TestGetGCConcurrency(c *C) { + // Pick a concurrency that is not equal to the number of stores. 
+ concurrencyConfig := 25 + c.Assert(concurrencyConfig, Not(Equals), len(s.cluster.GetAllStores())) + err := s.gcWorker.saveValueToSysTable(gcConcurrencyKey, strconv.Itoa(concurrencyConfig)) + c.Assert(err, IsNil) + + ctx := context.Background() + + err = s.gcWorker.saveValueToSysTable(gcAutoConcurrencyKey, booleanFalse) + c.Assert(err, IsNil) + concurrency, err := s.gcWorker.getGCConcurrency(ctx) + c.Assert(err, IsNil) + c.Assert(concurrency, Equals, concurrencyConfig) + + err = s.gcWorker.saveValueToSysTable(gcAutoConcurrencyKey, booleanTrue) + c.Assert(err, IsNil) + concurrency, err = s.gcWorker.getGCConcurrency(ctx) + c.Assert(err, IsNil) + c.Assert(concurrency, Equals, len(s.cluster.GetAllStores())) +} + func (s *testGCWorkerSuite) TestDoGC(c *C) { var err error ctx := context.Background() @@ -236,17 +271,20 @@ func (s *testGCWorkerSuite) TestCheckGCMode(c *C) { c.Assert(err, IsNil) c.Assert(str, Equals, gcModeDistributed) - s.gcWorker.saveValueToSysTable(gcModeKey, gcModeCentral) + err = s.gcWorker.saveValueToSysTable(gcModeKey, gcModeCentral) + c.Assert(err, IsNil) useDistributedGC, err = s.gcWorker.checkUseDistributedGC() c.Assert(err, IsNil) c.Assert(useDistributedGC, Equals, false) - s.gcWorker.saveValueToSysTable(gcModeKey, gcModeDistributed) + err = s.gcWorker.saveValueToSysTable(gcModeKey, gcModeDistributed) + c.Assert(err, IsNil) useDistributedGC, err = s.gcWorker.checkUseDistributedGC() c.Assert(err, IsNil) c.Assert(useDistributedGC, Equals, true) - s.gcWorker.saveValueToSysTable(gcModeKey, "invalid_mode") + err = s.gcWorker.saveValueToSysTable(gcModeKey, "invalid_mode") + c.Assert(err, IsNil) useDistributedGC, err = s.gcWorker.checkUseDistributedGC() c.Assert(err, IsNil) c.Assert(useDistributedGC, Equals, true)