diff --git a/store/gcworker/gc_worker.go b/store/gcworker/gc_worker.go index 43f50c9422748..5ac74f9e3340b 100644 --- a/store/gcworker/gc_worker.go +++ b/store/gcworker/gc_worker.go @@ -69,7 +69,7 @@ type GCWorker struct { } // NewGCWorker creates a GCWorker instance. -func NewGCWorker(store tikv.Storage, pdClient pd.Client) (tikv.GCHandler, error) { +func NewGCWorker(store tikv.Storage, pdClient pd.Client) (GCHandler, error) { ver, err := store.CurrentVersion(oracle.GlobalTxnScope) if err != nil { return nil, errors.Trace(err) @@ -2183,3 +2183,16 @@ func (s *mergeLockScanner) physicalScanLocksForStore(ctx context.Context, safePo return nil } + +// GCHandler runs garbage collection job. +type GCHandler interface { + // Start starts the GCHandler. + Start() + + // Close closes the GCHandler. + Close() +} + +// NewGCHandlerFunc creates a new GCHandler. +// To enable real GC, we should assign the function to `gcworker.NewGCWorker`. +var NewGCHandlerFunc func(storage tikv.Storage, pdClient pd.Client) (GCHandler, error) diff --git a/store/gcworker/gc_worker_test.go b/store/gcworker/gc_worker_test.go index 0660aae14c8c9..6e91d2a80483a 100644 --- a/store/gcworker/gc_worker_test.go +++ b/store/gcworker/gc_worker_test.go @@ -69,8 +69,7 @@ type testGCWorkerSuite struct { var _ = SerialSuites(&testGCWorkerSuite{}) func (s *testGCWorkerSuite) SetUpTest(c *C) { - tikv.NewGCHandlerFunc = NewGCWorker - + NewGCHandlerFunc = NewGCWorker hijackClient := func(client tikv.Client) tikv.Client { s.client = &testGCWorkerClient{ Client: client, diff --git a/store/mockstore/unistore.go b/store/mockstore/unistore.go index 8a72fc5858265..6f559b16e9a0f 100644 --- a/store/mockstore/unistore.go +++ b/store/mockstore/unistore.go @@ -52,3 +52,7 @@ func (s *mockStorage) EtcdAddrs() ([]string, error) { func (s *mockStorage) TLSConfig() *tls.Config { return nil } + +func (s *mockStorage) StartGCWorker() error { + return nil +} diff --git a/store/tikv/2pc_test.go b/store/tikv/2pc_test.go index 8bfb02471863e..d2a68259ecdc2 100644 --- a/store/tikv/2pc_test.go +++ b/store/tikv/2pc_test.go @@ -61,7 +61,7 @@ func (s *testCommitterSuite) SetUpTest(c *C) { client := mocktikv.NewRPCClient(cluster, mvccStore) pdCli := &CodecPDClient{mocktikv.NewPDClient(cluster)} spkv := NewMockSafePointKV() - store, err := NewKVStore("mocktikv-store", pdCli, spkv, client, false, nil) + store, err := NewKVStore("mocktikv-store", pdCli, spkv, client, nil) store.EnableTxnLocalLatches(1024000) c.Assert(err, IsNil) diff --git a/store/tikv/interface.go b/store/tikv/interface.go index 426a73a235981..0cff79c13b800 100644 --- a/store/tikv/interface.go +++ b/store/tikv/interface.go @@ -19,7 +19,6 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/store/tikv/tikvrpc" - "github.com/tikv/pd/client" ) // Storage represent the kv.Storage runs on TiKV. @@ -41,9 +40,6 @@ type Storage interface { // UpdateSPCache updates the cache of safe point. UpdateSPCache(cachedSP uint64, cachedTime time.Time) - // GetGCHandler gets the GCHandler. - GetGCHandler() GCHandler - // SetOracle sets the Oracle. SetOracle(oracle oracle.Oracle) @@ -56,16 +52,3 @@ type Storage interface { // Closed returns the closed channel. Closed() <-chan struct{} } - -// GCHandler runs garbage collection job. -type GCHandler interface { - // Start starts the GCHandler. - Start() - - // Close closes the GCHandler. - Close() -} - -// NewGCHandlerFunc creates a new GCHandler. -// To enable real GC, we should assign the function to `gcworker.NewGCWorker`. -var NewGCHandlerFunc func(storage Storage, pdClient pd.Client) (GCHandler, error) diff --git a/store/tikv/kv.go b/store/tikv/kv.go index 691f6ccc214bf..1461cf792bb81 100644 --- a/store/tikv/kv.go +++ b/store/tikv/kv.go @@ -67,10 +67,8 @@ type KVStore struct { coprCache *coprCache lockResolver *LockResolver txnLatches *latch.LatchesScheduler - gcWorker GCHandler - mock bool - enableGC bool + mock bool kv SafePointKV safePoint uint64 @@ -112,7 +110,7 @@ func (s *KVStore) CheckVisibility(startTime uint64) error { } // NewKVStore creates a new TiKV store instance. -func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, client Client, enableGC bool, coprCacheConfig *config.CoprocessorCache) (*KVStore, error) { +func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, client Client, coprCacheConfig *config.CoprocessorCache) (*KVStore, error) { o, err := oracles.NewPdOracle(pdClient, time.Duration(oracleUpdateInterval)*time.Millisecond) if err != nil { return nil, errors.Trace(err) @@ -133,7 +131,6 @@ func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, client Client memCache: kv.NewCacheDB(), } store.lockResolver = newLockResolver(store) - store.enableGC = enableGC coprCache, err := newCoprCache(coprCacheConfig) if err != nil { @@ -157,21 +154,6 @@ func (s *KVStore) IsLatchEnabled() bool { return s.txnLatches != nil } -// StartGCWorker starts GC worker, it's called in BootstrapSession, don't call this function more than once. -func (s *KVStore) StartGCWorker() error { - if !s.enableGC || NewGCHandlerFunc == nil { - return nil - } - - gcWorker, err := NewGCHandlerFunc(s, s.pdClient) - if err != nil { - return errors.Trace(err) - } - gcWorker.Start() - s.gcWorker = gcWorker - return nil -} - func (s *KVStore) runSafePointChecker() { d := gcSafePointUpdateInterval for { @@ -237,9 +219,6 @@ func (s *KVStore) GetSnapshot(ver kv.Version) kv.Snapshot { func (s *KVStore) Close() error { s.oracle.Close() s.pdClient.Close() - if s.gcWorker != nil { - s.gcWorker.Close() - } close(s.closed) if err := s.client.Close(); err != nil { @@ -382,11 +361,6 @@ func (s *KVStore) GetLockResolver() *LockResolver { return s.lockResolver } -// GetGCHandler returns the GC worker instance. -func (s *KVStore) GetGCHandler() GCHandler { - return s.gcWorker -} - // Closed returns a channel that indicates if the store is closed. func (s *KVStore) Closed() <-chan struct{} { return s.closed diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index 97c114b95fb91..6cd2f596df2a4 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -93,7 +93,7 @@ func NewLockResolver(etcdAddrs []string, security config.Security, opts ...pd.Cl return nil, errors.Trace(err) } - s, err := NewKVStore(uuid, &CodecPDClient{pdCli}, spkv, NewRPCClient(security), false, nil) + s, err := NewKVStore(uuid, &CodecPDClient{pdCli}, spkv, NewRPCClient(security), nil) if err != nil { return nil, errors.Trace(err) } diff --git a/store/tikv/test_util.go b/store/tikv/test_util.go index 00223a9148211..e8bd0e76a3e4b 100644 --- a/store/tikv/test_util.go +++ b/store/tikv/test_util.go @@ -34,7 +34,7 @@ func NewTestTiKVStore(client Client, pdClient pd.Client, clientHijack func(Clien // Make sure the uuid is unique. uid := uuid.New().String() spkv := NewMockSafePointKV() - tikvStore, err := NewKVStore(uid, pdCli, spkv, client, false, &config.GetGlobalConfig().TiKVClient.CoprCache) + tikvStore, err := NewKVStore(uid, pdCli, spkv, client, &config.GetGlobalConfig().TiKVClient.CoprCache) if txnLocalLatches > 0 { tikvStore.EnableTxnLocalLatches(txnLocalLatches) diff --git a/store/tikv/ticlient_test.go b/store/tikv/ticlient_test.go index f07454526ecc4..55993f87f8600 100644 --- a/store/tikv/ticlient_test.go +++ b/store/tikv/ticlient_test.go @@ -51,7 +51,7 @@ func NewTestStore(c *C) *KVStore { c.Assert(err, IsNil) spKV, err := NewEtcdSafePointKV(addrs, tlsConfig) c.Assert(err, IsNil) - store, err := NewKVStore("test-store", &CodecPDClient{Client: pdClient}, spKV, NewRPCClient(securityConfig), false, nil) + store, err := NewKVStore("test-store", &CodecPDClient{Client: pdClient}, spKV, NewRPCClient(securityConfig), nil) c.Assert(err, IsNil) err = clearStorage(store) c.Assert(err, IsNil) diff --git a/store/tikv_driver.go b/store/tikv_driver.go index d74c410265cbb..e149b9cf119a3 100644 --- a/store/tikv_driver.go +++ b/store/tikv_driver.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/store/gcworker" "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/config" "github.com/pingcap/tidb/util/execdetails" @@ -147,7 +148,8 @@ func (d TiKVDriver) OpenWithOptions(path string, options ...DriverOption) (kv.St } coprCacheConfig := &config.GetGlobalConfig().TiKVClient.CoprCache - s, err := tikv.NewKVStore(uuid, &tikv.CodecPDClient{Client: pdCli}, spkv, tikv.NewRPCClient(d.security), !disableGC, coprCacheConfig) + pdClient := tikv.CodecPDClient{Client: pdCli} + s, err := tikv.NewKVStore(uuid, &pdClient, spkv, tikv.NewRPCClient(d.security), coprCacheConfig) if err != nil { return nil, errors.Trace(err) } @@ -159,6 +161,8 @@ func (d TiKVDriver) OpenWithOptions(path string, options ...DriverOption) (kv.St KVStore: s, etcdAddrs: etcdAddrs, tlsConfig: tlsConfig, + pdClient: &pdClient, + enableGC: !disableGC, } mc.cache[uuid] = store @@ -169,6 +173,9 @@ type tikvStore struct { *tikv.KVStore etcdAddrs []string tlsConfig *tls.Config + pdClient pd.Client + enableGC bool + gcWorker gcworker.GCHandler } var ( @@ -223,10 +230,28 @@ func (s *tikvStore) TLSConfig() *tls.Config { return s.tlsConfig } +// StartGCWorker starts GC worker, it's called in BootstrapSession, don't call this function more than once. +func (s *tikvStore) StartGCWorker() error { + if !s.enableGC || gcworker.NewGCHandlerFunc == nil { + return nil + } + + gcWorker, err := gcworker.NewGCHandlerFunc(s, s.pdClient) + if err != nil { + return errors.Trace(err) + } + gcWorker.Start() + s.gcWorker = gcWorker + return nil +} + // Close and unregister the store. func (s *tikvStore) Close() error { mc.Lock() defer mc.Unlock() delete(mc.cache, s.UUID()) + if s.gcWorker != nil { + s.gcWorker.Close() + } return s.KVStore.Close() } diff --git a/tidb-server/main.go b/tidb-server/main.go index df9834a9b5777..e30b36f02ae14 100644 --- a/tidb-server/main.go +++ b/tidb-server/main.go @@ -249,7 +249,7 @@ func setHeapProfileTracker() { func registerStores() { err := kvstore.Register("tikv", kvstore.TiKVDriver{}) terror.MustNil(err) - tikv.NewGCHandlerFunc = gcworker.NewGCWorker + gcworker.NewGCHandlerFunc = gcworker.NewGCWorker err = kvstore.Register("mocktikv", mockstore.MockTiKVDriver{}) terror.MustNil(err) err = kvstore.Register("unistore", mockstore.EmbedUnistoreDriver{})