Skip to content

Commit

Permalink
store/tikv/gc_worker: Add config to specify gc concurrency manually (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
MyonKeminta authored and jackysp committed Jun 12, 2019
1 parent dabeb0a commit c5c2737
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 43 deletions.
108 changes: 74 additions & 34 deletions store/tikv/gcworker/gc_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ func (w *GCWorker) Close() {
}

const (
booleanTrue = "true"
booleanFalse = "false"

gcWorkerTickInterval = time.Minute
gcJobLogTickInterval = time.Minute * 10
gcWorkerLease = time.Minute * 2
Expand All @@ -120,29 +123,31 @@ const (
gcScanLockLimit = tikv.ResolvedCacheSize / 2

gcEnableKey = "tikv_gc_enable"
gcEnableValue = "true"
gcDisableValue = "false"
gcDefaultEnableValue = true

gcModeKey = "tikv_gc_mode"
gcModeCentral = "central"
gcModeDistributed = "distributed"
gcModeDefault = gcModeDistributed

gcAutoConcurrencyKey = "tikv_gc_auto_concurrency"
gcDefaultAutoConcurrency = true
)

// gcSafePointCacheInterval is initialized from tikv.GcSafePointCacheInterval.
// NOTE(review): presumably declared as a variable rather than a constant so
// that it can be overridden (e.g. by tests) — confirm against its users.
var gcSafePointCacheInterval = tikv.GcSafePointCacheInterval

// gcVariableComments maps each GC-related system variable key to the
// human-readable description kept alongside its value.
//
// Every key must appear exactly once: Go rejects duplicate constant keys in a
// map literal. tikv_gc_concurrency is not deprecated; it is the value used
// when tikv_gc_auto_concurrency is set to false.
var gcVariableComments = map[string]string{
	gcLeaderUUIDKey:      "Current GC worker leader UUID. (DO NOT EDIT)",
	gcLeaderDescKey:      "Host name and pid of current GC leader. (DO NOT EDIT)",
	gcLeaderLeaseKey:     "Current GC worker leader lease. (DO NOT EDIT)",
	gcLastRunTimeKey:     "The time when last GC starts. (DO NOT EDIT)",
	gcRunIntervalKey:     "GC run interval, at least 10m, in Go format.",
	gcLifeTimeKey:        "All versions within life time will not be collected by GC, at least 10m, in Go format.",
	gcSafePointKey:       "All versions after safe point can be accessed. (DO NOT EDIT)",
	gcConcurrencyKey:     "How many goroutines used to do GC parallel, [1, 128], default 2",
	gcEnableKey:          "Current GC enable status",
	gcModeKey:            "Mode of GC, \"central\" or \"distributed\"",
	gcAutoConcurrencyKey: "Let TiDB pick the concurrency automatically. If set false, tikv_gc_concurrency will be used",
}

func (w *GCWorker) start(ctx context.Context, wg *sync.WaitGroup) {
Expand Down Expand Up @@ -258,26 +263,12 @@ func (w *GCWorker) leaderTick(ctx context.Context) error {
return nil
}

stores, err := w.getUpStores(ctx)
concurrency := len(stores)
concurrency, err := w.getGCConcurrency(ctx)
if err != nil {
logutil.Logger(ctx).Error("[gc worker] failed to get up stores to calculate concurrency.",
logutil.Logger(ctx).Info("[gc worker] failed to get gc concurrency.",
zap.String("uuid", w.uuid),
zap.Error(err))

concurrency, err = w.loadGCConcurrencyWithDefault()
if err != nil {
logutil.Logger(ctx).Error("[gc worker] failed to load gc concurrency. use default value.",
zap.String("uuid", w.uuid),
zap.Error(err))
concurrency = gcDefaultConcurrency
}
}

if concurrency == 0 {
logutil.Logger(ctx).Error("[gc worker] no store is up",
zap.String("uuid", w.uuid))
return errors.New("[gc worker] no store is up")
return errors.Trace(err)
}

w.gcIsRunning = true
Expand Down Expand Up @@ -359,19 +350,68 @@ func (w *GCWorker) getOracleTime() (time.Time, error) {
}

// checkGCEnable reports whether GC is enabled according to the gcEnableKey
// system variable. The lookup, including persisting gcDefaultEnableValue when
// the key has no value yet, is delegated to loadBooleanWithDefault.
func (w *GCWorker) checkGCEnable() (bool, error) {
	return w.loadBooleanWithDefault(gcEnableKey, gcDefaultEnableValue)
}

// checkUseAutoConcurrency reads the gcAutoConcurrencyKey system variable and
// reports whether the GC concurrency should be picked automatically. When the
// key has no value, loadBooleanWithDefault falls back to
// gcDefaultAutoConcurrency.
func (w *GCWorker) checkUseAutoConcurrency() (bool, error) {
	auto, err := w.loadBooleanWithDefault(gcAutoConcurrencyKey, gcDefaultAutoConcurrency)
	return auto, err
}

// loadBooleanWithDefault loads the boolean system variable stored under key.
//
// If the stored value is empty, defaultValue is written back under the same
// key (as booleanTrue or booleanFalse) and returned. Only the requested key is
// written: reading one variable must not modify another, such as gcEnableKey.
//
// A non-empty stored value is interpreted as true only when it equals
// booleanTrue ignoring case; any other text yields false.
//
// Returns (false, err) when the value cannot be loaded and
// (defaultValue, err) when persisting the default fails.
func (w *GCWorker) loadBooleanWithDefault(key string, defaultValue bool) (bool, error) {
	str, err := w.loadValueFromSysTable(key)
	if err != nil {
		return false, errors.Trace(err)
	}
	if str == "" {
		// Nothing stored yet: persist the default so the variable is present
		// with an explicit value from now on.
		defaultValueStr := booleanFalse
		if defaultValue {
			defaultValueStr = booleanTrue
		}
		err = w.saveValueToSysTable(key, defaultValueStr)
		if err != nil {
			return defaultValue, errors.Trace(err)
		}
		return defaultValue, nil
	}
	return strings.EqualFold(str, booleanTrue), nil
}

// getGCConcurrency decides how many concurrent workers a GC round should use.
//
// Resolution order:
//  1. If auto concurrency is disabled, the configured value from
//     loadGCConcurrencyWithDefault is returned as-is (together with its error).
//  2. Otherwise the number of stores returned by getUpStores is used.
//  3. If getUpStores fails, the configured value is used instead; if that
//     cannot be loaded either, gcDefaultConcurrency is used.
//
// A failure to read the auto-concurrency switch is logged and treated as
// gcDefaultAutoConcurrency rather than aborting.
//
// Returns an error when the resulting concurrency is zero (no store is up).
func (w *GCWorker) getGCConcurrency(ctx context.Context) (int, error) {
	useAutoConcurrency, err := w.checkUseAutoConcurrency()
	if err != nil {
		logutil.Logger(ctx).Error("[gc worker] failed to load config gc_auto_concurrency. use default value.",
			zap.String("uuid", w.uuid),
			zap.Error(err))
		useAutoConcurrency = gcDefaultAutoConcurrency
	}
	if !useAutoConcurrency {
		return w.loadGCConcurrencyWithDefault()
	}

	stores, err := w.getUpStores(ctx)
	concurrency := len(stores)
	if err != nil {
		logutil.Logger(ctx).Error("[gc worker] failed to get up stores to calculate concurrency. use config.",
			zap.String("uuid", w.uuid),
			zap.Error(err))

		// Fall back to the manually configured value, then to the built-in
		// default, so a store-listing failure does not block GC.
		concurrency, err = w.loadGCConcurrencyWithDefault()
		if err != nil {
			logutil.Logger(ctx).Error("[gc worker] failed to load gc concurrency from config. use default value.",
				zap.String("uuid", w.uuid),
				zap.Error(err))
			concurrency = gcDefaultConcurrency
		}
	}

	if concurrency == 0 {
		logutil.Logger(ctx).Error("[gc worker] no store is up",
			zap.String("uuid", w.uuid))
		return 0, errors.New("[gc worker] no store is up")
	}

	return concurrency, nil
}

func (w *GCWorker) checkGCInterval(now time.Time) (bool, error) {
Expand Down
56 changes: 47 additions & 9 deletions store/tikv/gcworker/gc_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func TestT(t *testing.T) {

type testGCWorkerSuite struct {
store tikv.Storage
cluster *mocktikv.Cluster
oracle *mockoracle.MockOracle
gcWorker *GCWorker
dom *domain.Domain
Expand All @@ -47,9 +48,9 @@ var _ = Suite(&testGCWorkerSuite{})
func (s *testGCWorkerSuite) SetUpTest(c *C) {
tikv.NewGCHandlerFunc = NewGCWorker

cluster := mocktikv.NewCluster()
mocktikv.BootstrapWithSingleStore(cluster)
store, err := mockstore.NewMockTikvStore(mockstore.WithCluster(cluster))
s.cluster = mocktikv.NewCluster()
mocktikv.BootstrapWithSingleStore(s.cluster)
store, err := mockstore.NewMockTikvStore(mockstore.WithCluster(s.cluster))

s.store = store.(tikv.Storage)
c.Assert(err, IsNil)
Expand All @@ -58,7 +59,7 @@ func (s *testGCWorkerSuite) SetUpTest(c *C) {
s.dom, err = session.BootstrapSession(s.store)
c.Assert(err, IsNil)

gcWorker, err := NewGCWorker(s.store, mocktikv.NewPDClient(cluster))
gcWorker, err := NewGCWorker(s.store, mocktikv.NewPDClient(s.cluster))
c.Assert(err, IsNil)
gcWorker.Start()
gcWorker.Close()
Expand Down Expand Up @@ -154,16 +155,28 @@ func (s *testGCWorkerSuite) TestPrepareGC(c *C) {

// Change GC enable status.
s.oracle.AddOffset(time.Minute * 40)
err = s.gcWorker.saveValueToSysTable(gcEnableKey, gcDisableValue)
err = s.gcWorker.saveValueToSysTable(gcEnableKey, booleanFalse)
c.Assert(err, IsNil)
ok, _, err = s.gcWorker.prepare()
c.Assert(err, IsNil)
c.Assert(ok, IsFalse)
err = s.gcWorker.saveValueToSysTable(gcEnableKey, gcEnableValue)
err = s.gcWorker.saveValueToSysTable(gcEnableKey, booleanTrue)
c.Assert(err, IsNil)
ok, _, err = s.gcWorker.prepare()
c.Assert(err, IsNil)
c.Assert(ok, IsTrue)

// Change auto concurrency
err = s.gcWorker.saveValueToSysTable(gcAutoConcurrencyKey, booleanFalse)
c.Assert(err, IsNil)
useAutoConcurrency, err := s.gcWorker.checkUseAutoConcurrency()
c.Assert(err, IsNil)
c.Assert(useAutoConcurrency, IsFalse)
err = s.gcWorker.saveValueToSysTable(gcAutoConcurrencyKey, booleanTrue)
c.Assert(err, IsNil)
useAutoConcurrency, err = s.gcWorker.checkUseAutoConcurrency()
c.Assert(err, IsNil)
c.Assert(useAutoConcurrency, IsTrue)
}

func (s *testGCWorkerSuite) TestDoGCForOneRegion(c *C) {
Expand Down Expand Up @@ -199,6 +212,28 @@ func (s *testGCWorkerSuite) TestDoGCForOneRegion(c *C) {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/tikvStoreSendReqResult"), IsNil)
}

// TestGetGCConcurrency checks that getGCConcurrency returns the configured
// concurrency when auto concurrency is switched off, and the number of stores
// in the mock cluster when it is switched on.
func (s *testGCWorkerSuite) TestGetGCConcurrency(c *C) {
	ctx := context.Background()

	// setAuto stores the given value for the auto-concurrency switch.
	setAuto := func(value string) {
		saveErr := s.gcWorker.saveValueToSysTable(gcAutoConcurrencyKey, value)
		c.Assert(saveErr, IsNil)
	}

	// The manual setting must differ from the store count so that the two
	// code paths produce distinguishable results.
	manual := 25
	c.Assert(manual, Not(Equals), len(s.cluster.GetAllStores()))
	err := s.gcWorker.saveValueToSysTable(gcConcurrencyKey, strconv.Itoa(manual))
	c.Assert(err, IsNil)

	// Auto concurrency off: the configured value is used.
	setAuto(booleanFalse)
	got, err := s.gcWorker.getGCConcurrency(ctx)
	c.Assert(err, IsNil)
	c.Assert(got, Equals, manual)

	// Auto concurrency on: the store count is used.
	setAuto(booleanTrue)
	got, err = s.gcWorker.getGCConcurrency(ctx)
	c.Assert(err, IsNil)
	c.Assert(got, Equals, len(s.cluster.GetAllStores()))
}

func (s *testGCWorkerSuite) TestDoGC(c *C) {
var err error
ctx := context.Background()
Expand Down Expand Up @@ -236,17 +271,20 @@ func (s *testGCWorkerSuite) TestCheckGCMode(c *C) {
c.Assert(err, IsNil)
c.Assert(str, Equals, gcModeDistributed)

s.gcWorker.saveValueToSysTable(gcModeKey, gcModeCentral)
err = s.gcWorker.saveValueToSysTable(gcModeKey, gcModeCentral)
c.Assert(err, IsNil)
useDistributedGC, err = s.gcWorker.checkUseDistributedGC()
c.Assert(err, IsNil)
c.Assert(useDistributedGC, Equals, false)

s.gcWorker.saveValueToSysTable(gcModeKey, gcModeDistributed)
err = s.gcWorker.saveValueToSysTable(gcModeKey, gcModeDistributed)
c.Assert(err, IsNil)
useDistributedGC, err = s.gcWorker.checkUseDistributedGC()
c.Assert(err, IsNil)
c.Assert(useDistributedGC, Equals, true)

s.gcWorker.saveValueToSysTable(gcModeKey, "invalid_mode")
err = s.gcWorker.saveValueToSysTable(gcModeKey, "invalid_mode")
c.Assert(err, IsNil)
useDistributedGC, err = s.gcWorker.checkUseDistributedGC()
c.Assert(err, IsNil)
c.Assert(useDistributedGC, Equals, true)
Expand Down

0 comments on commit c5c2737

Please sign in to comment.