Skip to content

Commit

Permalink
store/tikv_driver: move gcworker into tikv_driver (#22698)
Browse files Browse the repository at this point in the history
Co-authored-by: disksing <i@disksing.com>
Co-authored-by: ti-srebot <66930949+ti-srebot@users.noreply.github.com>
  • Loading branch information
3 people authored Feb 5, 2021
1 parent 006e2fc commit 6483b7d
Show file tree
Hide file tree
Showing 11 changed files with 52 additions and 54 deletions.
15 changes: 14 additions & 1 deletion store/gcworker/gc_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type GCWorker struct {
}

// NewGCWorker creates a GCWorker instance.
func NewGCWorker(store tikv.Storage, pdClient pd.Client) (tikv.GCHandler, error) {
func NewGCWorker(store tikv.Storage, pdClient pd.Client) (GCHandler, error) {
ver, err := store.CurrentVersion(oracle.GlobalTxnScope)
if err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -2183,3 +2183,16 @@ func (s *mergeLockScanner) physicalScanLocksForStore(ctx context.Context, safePo

return nil
}

// GCHandler runs garbage collection job.
type GCHandler interface {
// Start starts the GCHandler.
Start()

// Close closes the GCHandler.
Close()
}

// NewGCHandlerFunc creates a new GCHandler.
// To enable real GC, we should assign the function to `gcworker.NewGCWorker`.
var NewGCHandlerFunc func(storage tikv.Storage, pdClient pd.Client) (GCHandler, error)
3 changes: 1 addition & 2 deletions store/gcworker/gc_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,7 @@ type testGCWorkerSuite struct {
var _ = SerialSuites(&testGCWorkerSuite{})

func (s *testGCWorkerSuite) SetUpTest(c *C) {
tikv.NewGCHandlerFunc = NewGCWorker

NewGCHandlerFunc = NewGCWorker
hijackClient := func(client tikv.Client) tikv.Client {
s.client = &testGCWorkerClient{
Client: client,
Expand Down
4 changes: 4 additions & 0 deletions store/mockstore/unistore.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,7 @@ func (s *mockStorage) EtcdAddrs() ([]string, error) {
func (s *mockStorage) TLSConfig() *tls.Config {
return nil
}

func (s *mockStorage) StartGCWorker() error {
return nil
}
2 changes: 1 addition & 1 deletion store/tikv/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (s *testCommitterSuite) SetUpTest(c *C) {
client := mocktikv.NewRPCClient(cluster, mvccStore)
pdCli := &CodecPDClient{mocktikv.NewPDClient(cluster)}
spkv := NewMockSafePointKV()
store, err := NewKVStore("mocktikv-store", pdCli, spkv, client, false, nil)
store, err := NewKVStore("mocktikv-store", pdCli, spkv, client, nil)
store.EnableTxnLocalLatches(1024000)
c.Assert(err, IsNil)

Expand Down
17 changes: 0 additions & 17 deletions store/tikv/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/tikv/pd/client"
)

// Storage represent the kv.Storage runs on TiKV.
Expand All @@ -41,9 +40,6 @@ type Storage interface {
// UpdateSPCache updates the cache of safe point.
UpdateSPCache(cachedSP uint64, cachedTime time.Time)

// GetGCHandler gets the GCHandler.
GetGCHandler() GCHandler

// SetOracle sets the Oracle.
SetOracle(oracle oracle.Oracle)

Expand All @@ -56,16 +52,3 @@ type Storage interface {
// Closed returns the closed channel.
Closed() <-chan struct{}
}

// GCHandler runs garbage collection job.
type GCHandler interface {
// Start starts the GCHandler.
Start()

// Close closes the GCHandler.
Close()
}

// NewGCHandlerFunc creates a new GCHandler.
// To enable real GC, we should assign the function to `gcworker.NewGCWorker`.
var NewGCHandlerFunc func(storage Storage, pdClient pd.Client) (GCHandler, error)
30 changes: 2 additions & 28 deletions store/tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,8 @@ type KVStore struct {
coprCache *coprCache
lockResolver *LockResolver
txnLatches *latch.LatchesScheduler
gcWorker GCHandler

mock bool
enableGC bool
mock bool

kv SafePointKV
safePoint uint64
Expand Down Expand Up @@ -112,7 +110,7 @@ func (s *KVStore) CheckVisibility(startTime uint64) error {
}

// NewKVStore creates a new TiKV store instance.
func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, client Client, enableGC bool, coprCacheConfig *config.CoprocessorCache) (*KVStore, error) {
func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, client Client, coprCacheConfig *config.CoprocessorCache) (*KVStore, error) {
o, err := oracles.NewPdOracle(pdClient, time.Duration(oracleUpdateInterval)*time.Millisecond)
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -133,7 +131,6 @@ func NewKVStore(uuid string, pdClient pd.Client, spkv SafePointKV, client Client
memCache: kv.NewCacheDB(),
}
store.lockResolver = newLockResolver(store)
store.enableGC = enableGC

coprCache, err := newCoprCache(coprCacheConfig)
if err != nil {
Expand All @@ -157,21 +154,6 @@ func (s *KVStore) IsLatchEnabled() bool {
return s.txnLatches != nil
}

// StartGCWorker starts GC worker, it's called in BootstrapSession, don't call this function more than once.
func (s *KVStore) StartGCWorker() error {
if !s.enableGC || NewGCHandlerFunc == nil {
return nil
}

gcWorker, err := NewGCHandlerFunc(s, s.pdClient)
if err != nil {
return errors.Trace(err)
}
gcWorker.Start()
s.gcWorker = gcWorker
return nil
}

func (s *KVStore) runSafePointChecker() {
d := gcSafePointUpdateInterval
for {
Expand Down Expand Up @@ -237,9 +219,6 @@ func (s *KVStore) GetSnapshot(ver kv.Version) kv.Snapshot {
func (s *KVStore) Close() error {
s.oracle.Close()
s.pdClient.Close()
if s.gcWorker != nil {
s.gcWorker.Close()
}

close(s.closed)
if err := s.client.Close(); err != nil {
Expand Down Expand Up @@ -382,11 +361,6 @@ func (s *KVStore) GetLockResolver() *LockResolver {
return s.lockResolver
}

// GetGCHandler returns the GC worker instance.
func (s *KVStore) GetGCHandler() GCHandler {
return s.gcWorker
}

// Closed returns a channel that indicates if the store is closed.
func (s *KVStore) Closed() <-chan struct{} {
return s.closed
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/lock_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func NewLockResolver(etcdAddrs []string, security config.Security, opts ...pd.Cl
return nil, errors.Trace(err)
}

s, err := NewKVStore(uuid, &CodecPDClient{pdCli}, spkv, NewRPCClient(security), false, nil)
s, err := NewKVStore(uuid, &CodecPDClient{pdCli}, spkv, NewRPCClient(security), nil)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func NewTestTiKVStore(client Client, pdClient pd.Client, clientHijack func(Clien
// Make sure the uuid is unique.
uid := uuid.New().String()
spkv := NewMockSafePointKV()
tikvStore, err := NewKVStore(uid, pdCli, spkv, client, false, &config.GetGlobalConfig().TiKVClient.CoprCache)
tikvStore, err := NewKVStore(uid, pdCli, spkv, client, &config.GetGlobalConfig().TiKVClient.CoprCache)

if txnLocalLatches > 0 {
tikvStore.EnableTxnLocalLatches(txnLocalLatches)
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/ticlient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func NewTestStore(c *C) *KVStore {
c.Assert(err, IsNil)
spKV, err := NewEtcdSafePointKV(addrs, tlsConfig)
c.Assert(err, IsNil)
store, err := NewKVStore("test-store", &CodecPDClient{Client: pdClient}, spKV, NewRPCClient(securityConfig), false, nil)
store, err := NewKVStore("test-store", &CodecPDClient{Client: pdClient}, spKV, NewRPCClient(securityConfig), nil)
c.Assert(err, IsNil)
err = clearStorage(store)
c.Assert(err, IsNil)
Expand Down
27 changes: 26 additions & 1 deletion store/tikv_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/gcworker"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/config"
"github.com/pingcap/tidb/util/execdetails"
Expand Down Expand Up @@ -147,7 +148,8 @@ func (d TiKVDriver) OpenWithOptions(path string, options ...DriverOption) (kv.St
}

coprCacheConfig := &config.GetGlobalConfig().TiKVClient.CoprCache
s, err := tikv.NewKVStore(uuid, &tikv.CodecPDClient{Client: pdCli}, spkv, tikv.NewRPCClient(d.security), !disableGC, coprCacheConfig)
pdClient := tikv.CodecPDClient{Client: pdCli}
s, err := tikv.NewKVStore(uuid, &pdClient, spkv, tikv.NewRPCClient(d.security), coprCacheConfig)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -159,6 +161,8 @@ func (d TiKVDriver) OpenWithOptions(path string, options ...DriverOption) (kv.St
KVStore: s,
etcdAddrs: etcdAddrs,
tlsConfig: tlsConfig,
pdClient: &pdClient,
enableGC: !disableGC,
}

mc.cache[uuid] = store
Expand All @@ -169,6 +173,9 @@ type tikvStore struct {
*tikv.KVStore
etcdAddrs []string
tlsConfig *tls.Config
pdClient pd.Client
enableGC bool
gcWorker gcworker.GCHandler
}

var (
Expand Down Expand Up @@ -223,10 +230,28 @@ func (s *tikvStore) TLSConfig() *tls.Config {
return s.tlsConfig
}

// StartGCWorker starts GC worker, it's called in BootstrapSession, don't call this function more than once.
func (s *tikvStore) StartGCWorker() error {
if !s.enableGC || gcworker.NewGCHandlerFunc == nil {
return nil
}

gcWorker, err := gcworker.NewGCHandlerFunc(s, s.pdClient)
if err != nil {
return errors.Trace(err)
}
gcWorker.Start()
s.gcWorker = gcWorker
return nil
}

// Close and unregister the store.
func (s *tikvStore) Close() error {
mc.Lock()
defer mc.Unlock()
delete(mc.cache, s.UUID())
if s.gcWorker != nil {
s.gcWorker.Close()
}
return s.KVStore.Close()
}
2 changes: 1 addition & 1 deletion tidb-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func setHeapProfileTracker() {
func registerStores() {
err := kvstore.Register("tikv", kvstore.TiKVDriver{})
terror.MustNil(err)
tikv.NewGCHandlerFunc = gcworker.NewGCWorker
gcworker.NewGCHandlerFunc = gcworker.NewGCWorker
err = kvstore.Register("mocktikv", mockstore.MockTiKVDriver{})
terror.MustNil(err)
err = kvstore.Register("unistore", mockstore.EmbedUnistoreDriver{})
Expand Down

0 comments on commit 6483b7d

Please sign in to comment.