diff --git a/Makefile b/Makefile index afa834416263..3c72acb53068 100644 --- a/Makefile +++ b/Makefile @@ -117,6 +117,10 @@ simulator: export GO111MODULE=on simulator: CGO_ENABLED=0 go build -o bin/pd-simulator tools/pd-simulator/main.go +regions-dump: export GO111MODULE=on +regions-dump: + CGO_ENABLED=0 go build -o bin/regions-dump tools/regions-dump/main.go + clean-test: rm -rf /tmp/test_pd* rm -rf /tmp/pd-tests* diff --git a/server/cache/cache.go b/pkg/cache/cache.go similarity index 100% rename from server/cache/cache.go rename to pkg/cache/cache.go diff --git a/server/cache/cache_test.go b/pkg/cache/cache_test.go similarity index 100% rename from server/cache/cache_test.go rename to pkg/cache/cache_test.go diff --git a/server/cache/fifo.go b/pkg/cache/fifo.go similarity index 100% rename from server/cache/fifo.go rename to pkg/cache/fifo.go diff --git a/server/cache/lru.go b/pkg/cache/lru.go similarity index 100% rename from server/cache/lru.go rename to pkg/cache/lru.go diff --git a/server/cache/ttl.go b/pkg/cache/ttl.go similarity index 100% rename from server/cache/ttl.go rename to pkg/cache/ttl.go diff --git a/server/cache/two_queue.go b/pkg/cache/two_queue.go similarity index 100% rename from server/cache/two_queue.go rename to pkg/cache/two_queue.go diff --git a/server/checker/merge_checker.go b/server/checker/merge_checker.go index b018294ef4d7..d91800fe796c 100644 --- a/server/checker/merge_checker.go +++ b/server/checker/merge_checker.go @@ -17,7 +17,7 @@ import ( "time" log "github.com/pingcap/log" - "github.com/pingcap/pd/server/cache" + "github.com/pingcap/pd/pkg/cache" "github.com/pingcap/pd/server/core" "github.com/pingcap/pd/server/namespace" "github.com/pingcap/pd/server/schedule" diff --git a/server/cluster.go b/server/cluster.go index 96887ab00dc9..3f6ab533b968 100644 --- a/server/cluster.go +++ b/server/cluster.go @@ -109,7 +109,7 @@ func (c *RaftCluster) isInitialized() bool { // yet. 
func (c *RaftCluster) loadBootstrapTime() (time.Time, error) { var t time.Time - data, err := c.s.kv.Load(c.s.kv.ClusterStatePath("raft_bootstrap_time")) + data, err := c.s.storage.Load(c.s.storage.ClusterStatePath("raft_bootstrap_time")) if err != nil { return t, err } @@ -128,7 +128,7 @@ func (c *RaftCluster) start() error { return nil } - cluster, err := loadClusterInfo(c.s.idAlloc, c.s.kv, c.s.scheduleOpt) + cluster, err := loadClusterInfo(c.s.idAlloc, c.s.storage, c.s.scheduleOpt) if err != nil { return err } @@ -555,7 +555,7 @@ func (c *RaftCluster) SetStoreWeight(storeID uint64, leaderWeight, regionWeight return core.NewStoreNotFoundErr(storeID) } - if err := c.s.kv.SaveStoreWeight(storeID, leaderWeight, regionWeight); err != nil { + if err := c.s.storage.SaveStoreWeight(storeID, leaderWeight, regionWeight); err != nil { return err } diff --git a/server/cluster_info.go b/server/cluster_info.go index c40028b8d04b..b0edebd91487 100644 --- a/server/cluster_info.go +++ b/server/cluster_info.go @@ -32,7 +32,7 @@ type clusterInfo struct { sync.RWMutex core *core.BasicCluster id core.IDAllocator - kv *core.KV + storage *core.Storage meta *metapb.Cluster opt *scheduleOption regionStats *statistics.RegionStatistics @@ -45,12 +45,12 @@ type clusterInfo struct { var defaultChangedRegionsLimit = 10000 -func newClusterInfo(id core.IDAllocator, opt *scheduleOption, kv *core.KV) *clusterInfo { +func newClusterInfo(id core.IDAllocator, opt *scheduleOption, storage *core.Storage) *clusterInfo { return &clusterInfo{ core: core.NewBasicCluster(), id: id, opt: opt, - kv: kv, + storage: storage, labelLevelStats: statistics.NewLabelLevelStatistics(), storesStats: statistics.NewStoresStats(), prepareChecker: newPrepareChecker(), @@ -60,11 +60,11 @@ func newClusterInfo(id core.IDAllocator, opt *scheduleOption, kv *core.KV) *clus } // Return nil if cluster is not bootstrapped. -func loadClusterInfo(id core.IDAllocator, kv *core.KV, opt *scheduleOption) (*clusterInfo, error) { - c := newClusterInfo(id, opt, kv) +func loadClusterInfo(id core.IDAllocator, storage *core.Storage, opt *scheduleOption) (*clusterInfo, error) { + c := newClusterInfo(id, opt, storage) c.meta = &metapb.Cluster{} - ok, err := kv.LoadMeta(c.meta) + ok, err := storage.LoadMeta(c.meta) if err != nil { return nil, err } @@ -73,7 +73,7 @@ func loadClusterInfo(id core.IDAllocator, kv *core.KV, opt *scheduleOption) (*cl } start := time.Now() - if err := kv.LoadStores(c.core.Stores); err != nil { + if err := storage.LoadStores(c.core.Stores); err != nil { return nil, err } log.Info("load stores", @@ -82,7 +82,7 @@ func loadClusterInfo(id core.IDAllocator, kv *core.KV, opt *scheduleOption) (*cl ) start = time.Now() - if err := kv.LoadRegions(c.core.Regions); err != nil { + if err := storage.LoadRegions(c.core.Regions); err != nil { return nil, err } log.Info("load regions", @@ -117,7 +117,7 @@ func (c *clusterInfo) OnStoreVersionChange() { // it will update the cluster version. 
if clusterVersion.LessThan(*minVersion) { c.opt.SetClusterVersion(*minVersion) - err := c.opt.persist(c.kv) + err := c.opt.persist(c.storage) if err != nil { log.Error("persist cluster version meet error", zap.Error(err)) } @@ -176,8 +176,8 @@ func (c *clusterInfo) putMeta(meta *metapb.Cluster) error { } func (c *clusterInfo) putMetaLocked(meta *metapb.Cluster) error { - if c.kv != nil { - if err := c.kv.SaveMeta(meta); err != nil { + if c.storage != nil { + if err := c.storage.SaveMeta(meta); err != nil { return err } } @@ -199,8 +199,8 @@ func (c *clusterInfo) putStore(store *core.StoreInfo) error { } func (c *clusterInfo) putStoreLocked(store *core.StoreInfo) error { - if c.kv != nil { - if err := c.kv.SaveStore(store.GetMeta()); err != nil { + if c.storage != nil { + if err := c.storage.SaveStore(store.GetMeta()); err != nil { return err } } @@ -216,8 +216,8 @@ func (c *clusterInfo) deleteStore(store *core.StoreInfo) error { } func (c *clusterInfo) deleteStoreLocked(store *core.StoreInfo) error { - if c.kv != nil { - if err := c.kv.DeleteStore(store.GetMeta()); err != nil { + if c.storage != nil { + if err := c.storage.DeleteStore(store.GetMeta()); err != nil { return err } } @@ -348,8 +348,8 @@ func (c *clusterInfo) putRegion(region *core.RegionInfo) error { } func (c *clusterInfo) putRegionLocked(region *core.RegionInfo) error { - if c.kv != nil { - if err := c.kv.SaveRegion(region.GetMeta()); err != nil { + if c.storage != nil { + if err := c.storage.SaveRegion(region.GetMeta()); err != nil { return err } } @@ -525,7 +525,7 @@ func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error { isReadUpdate, readItem := c.CheckReadStatus(region) c.RUnlock() - // Save to KV if meta is updated. + // Save to storage if meta is updated. // Save to cache if meta or leader is updated, or contains any down/pending peer. // Mark isNew if the region in cache does not have leader. var saveKV, saveCache, isNew bool @@ -589,11 +589,11 @@ func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error { } } - if saveKV && c.kv != nil { - if err := c.kv.SaveRegion(region.GetMeta()); err != nil { - // Not successfully saved to kv is not fatal, it only leads to longer warm-up + if saveKV && c.storage != nil { + if err := c.storage.SaveRegion(region.GetMeta()); err != nil { + // Not successfully saved to storage is not fatal, it only leads to longer warm-up // after restart. Here we only log the error then go on updating cache. 
- log.Error("fail to save region to kv", + log.Error("fail to save region to storage", zap.Uint64("region-id", region.GetID()), zap.Reflect("region-meta", core.HexRegionMeta(region.GetMeta())), zap.Error(err)) @@ -615,10 +615,10 @@ func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error { if saveCache { overlaps := c.core.Regions.SetRegion(region) - if c.kv != nil { + if c.storage != nil { for _, item := range overlaps { - if err := c.kv.DeleteRegion(item); err != nil { - log.Error("fail to delete region from kv", + if err := c.storage.DeleteRegion(item); err != nil { + log.Error("fail to delete region from storage", zap.Uint64("region-id", item.GetId()), zap.Reflect("region-meta", core.HexRegionMeta(item)), zap.Error(err)) diff --git a/server/cluster_info_test.go b/server/cluster_info_test.go index 1849213b7382..c85bd54ed5ed 100644 --- a/server/cluster_info_test.go +++ b/server/cluster_info_test.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/pd/pkg/mock/mockid" "github.com/pingcap/pd/server/core" + "github.com/pingcap/pd/server/kv" ) var _ = Suite(&testStoresInfoSuite{}) @@ -197,11 +198,11 @@ func checkRegion(c *C, a *core.RegionInfo, b *core.RegionInfo) { } } -func checkRegionsKV(c *C, kv *core.KV, regions []*core.RegionInfo) { - if kv != nil { +func checkRegionsKV(c *C, s *core.Storage, regions []*core.RegionInfo) { + if s != nil { for _, region := range regions { var meta metapb.Region - ok, err := kv.LoadRegion(region.GetID(), &meta) + ok, err := s.LoadRegion(region.GetID(), &meta) c.Assert(ok, IsTrue) c.Assert(err, IsNil) c.Assert(&meta, DeepEquals, region.GetMeta()) @@ -253,23 +254,23 @@ func (s *testClusterInfoSuite) TestLoadClusterInfo(c *C) { server, cleanup := mustRunTestServer(c) defer cleanup() - kv := server.kv + storage := server.storage _, opt, err := newTestScheduleConfig() c.Assert(err, IsNil) // Cluster is not bootstrapped. - cluster, err := loadClusterInfo(server.idAlloc, kv, opt) + cluster, err := loadClusterInfo(server.idAlloc, storage, opt) c.Assert(err, IsNil) c.Assert(cluster, IsNil) // Save meta, stores and regions. 
n := 10 meta := &metapb.Cluster{Id: 123} - c.Assert(kv.SaveMeta(meta), IsNil) - stores := mustSaveStores(c, kv, n) - regions := mustSaveRegions(c, kv, n) + c.Assert(storage.SaveMeta(meta), IsNil) + stores := mustSaveStores(c, storage, n) + regions := mustSaveRegions(c, storage, n) - cluster, err = loadClusterInfo(server.idAlloc, kv, opt) + cluster, err = loadClusterInfo(server.idAlloc, storage, opt) c.Assert(err, IsNil) c.Assert(cluster, NotNil) @@ -288,7 +289,7 @@ func (s *testClusterInfoSuite) TestLoadClusterInfo(c *C) { func (s *testClusterInfoSuite) TestStoreHeartbeat(c *C) { _, opt, err := newTestScheduleConfig() c.Assert(err, IsNil) - cluster := newClusterInfo(mockid.NewIDAllocator(), opt, core.NewKV(core.NewMemoryKV())) + cluster := newClusterInfo(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV())) n, np := uint64(3), uint64(3) stores := newTestStores(n) @@ -324,7 +325,7 @@ func (s *testClusterInfoSuite) TestStoreHeartbeat(c *C) { for _, store := range stores { tmp := &metapb.Store{} - ok, err := cluster.kv.LoadStore(store.GetID(), tmp) + ok, err := cluster.storage.LoadStore(store.GetID(), tmp) c.Assert(ok, IsTrue) c.Assert(err, IsNil) c.Assert(tmp, DeepEquals, store.GetMeta()) @@ -334,7 +335,7 @@ func (s *testClusterInfoSuite) TestStoreHeartbeat(c *C) { func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) { _, opt, err := newTestScheduleConfig() c.Assert(err, IsNil) - cluster := newClusterInfo(mockid.NewIDAllocator(), opt, core.NewKV(core.NewMemoryKV())) + cluster := newClusterInfo(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV())) n, np := uint64(3), uint64(3) @@ -349,25 +350,25 @@ func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) { // region does not exist. c.Assert(cluster.handleRegionHeartbeat(region), IsNil) checkRegions(c, cluster.core.Regions, regions[:i+1]) - checkRegionsKV(c, cluster.kv, regions[:i+1]) + checkRegionsKV(c, cluster.storage, regions[:i+1]) // region is the same, not updated. c.Assert(cluster.handleRegionHeartbeat(region), IsNil) checkRegions(c, cluster.core.Regions, regions[:i+1]) - checkRegionsKV(c, cluster.kv, regions[:i+1]) + checkRegionsKV(c, cluster.storage, regions[:i+1]) origin := region // region is updated. region = origin.Clone(core.WithIncVersion()) regions[i] = region c.Assert(cluster.handleRegionHeartbeat(region), IsNil) checkRegions(c, cluster.core.Regions, regions[:i+1]) - checkRegionsKV(c, cluster.kv, regions[:i+1]) + checkRegionsKV(c, cluster.storage, regions[:i+1]) // region is stale (Version). stale := origin.Clone(core.WithIncConfVer()) c.Assert(cluster.handleRegionHeartbeat(stale), NotNil) checkRegions(c, cluster.core.Regions, regions[:i+1]) - checkRegionsKV(c, cluster.kv, regions[:i+1]) + checkRegionsKV(c, cluster.storage, regions[:i+1]) // region is updated. region = origin.Clone( @@ -377,13 +378,13 @@ func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) { regions[i] = region c.Assert(cluster.handleRegionHeartbeat(region), IsNil) checkRegions(c, cluster.core.Regions, regions[:i+1]) - checkRegionsKV(c, cluster.kv, regions[:i+1]) + checkRegionsKV(c, cluster.storage, regions[:i+1]) // region is stale (ConfVer). stale = origin.Clone(core.WithIncConfVer()) c.Assert(cluster.handleRegionHeartbeat(stale), NotNil) checkRegions(c, cluster.core.Regions, regions[:i+1]) - checkRegionsKV(c, cluster.kv, regions[:i+1]) + checkRegionsKV(c, cluster.storage, regions[:i+1]) // Add a down peer. 
region = region.Clone(core.WithDownPeers([]*pdpb.PeerStats{ @@ -420,13 +421,13 @@ func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) { regions[i] = region c.Assert(cluster.handleRegionHeartbeat(region), IsNil) checkRegions(c, cluster.core.Regions, regions[:i+1]) - checkRegionsKV(c, cluster.kv, regions[:i+1]) + checkRegionsKV(c, cluster.storage, regions[:i+1]) // Add peers. region = origin regions[i] = region c.Assert(cluster.handleRegionHeartbeat(region), IsNil) checkRegions(c, cluster.core.Regions, regions[:i+1]) - checkRegionsKV(c, cluster.kv, regions[:i+1]) + checkRegionsKV(c, cluster.storage, regions[:i+1]) } regionCounts := make(map[uint64]int) @@ -463,11 +464,11 @@ func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) { c.Assert(store.GetRegionSize(), Equals, cluster.core.Regions.GetStoreRegionSize(store.GetID())) } - // Test with kv. - if kv := cluster.kv; kv != nil { + // Test with storage. + if storage := cluster.storage; storage != nil { for _, region := range regions { tmp := &metapb.Region{} - ok, err := kv.LoadRegion(region.GetID(), tmp) + ok, err := storage.LoadRegion(region.GetID(), tmp) c.Assert(ok, IsTrue) c.Assert(err, IsNil) c.Assert(tmp, DeepEquals, region.GetMeta()) @@ -482,15 +483,15 @@ func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) { ) c.Assert(cluster.handleRegionHeartbeat(overlapRegion), NotNil) region := &metapb.Region{} - ok, err := kv.LoadRegion(regions[n-1].GetID(), region) + ok, err := storage.LoadRegion(regions[n-1].GetID(), region) c.Assert(ok, IsTrue) c.Assert(err, IsNil) c.Assert(region, DeepEquals, regions[n-1].GetMeta()) - ok, err = kv.LoadRegion(regions[n-2].GetID(), region) + ok, err = storage.LoadRegion(regions[n-2].GetID(), region) c.Assert(ok, IsTrue) c.Assert(err, IsNil) c.Assert(region, DeepEquals, regions[n-2].GetMeta()) - ok, err = kv.LoadRegion(overlapRegion.GetID(), region) + ok, err = storage.LoadRegion(overlapRegion.GetID(), region) c.Assert(ok, IsFalse) c.Assert(err, IsNil) @@ -501,13 +502,13 @@ func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) { ) c.Assert(cluster.handleRegionHeartbeat(overlapRegion), IsNil) region = &metapb.Region{} - ok, err = kv.LoadRegion(regions[n-1].GetID(), region) + ok, err = storage.LoadRegion(regions[n-1].GetID(), region) c.Assert(ok, IsFalse) c.Assert(err, IsNil) - ok, err = kv.LoadRegion(regions[n-2].GetID(), region) + ok, err = storage.LoadRegion(regions[n-2].GetID(), region) c.Assert(ok, IsFalse) c.Assert(err, IsNil) - ok, err = kv.LoadRegion(overlapRegion.GetID(), region) + ok, err = storage.LoadRegion(overlapRegion.GetID(), region) c.Assert(ok, IsTrue) c.Assert(err, IsNil) c.Assert(region, DeepEquals, overlapRegion.GetMeta()) @@ -693,7 +694,7 @@ func (s *testClusterUtilSuite) TestCheckStaleRegion(c *C) { c.Assert(checkStaleRegion(region, origin), NotNil) } -func mustSaveStores(c *C, kv *core.KV, n int) []*metapb.Store { +func mustSaveStores(c *C, s *core.Storage, n int) []*metapb.Store { stores := make([]*metapb.Store, 0, n) for i := 0; i < n; i++ { store := &metapb.Store{Id: uint64(i)} @@ -701,13 +702,13 @@ func mustSaveStores(c *C, kv *core.KV, n int) []*metapb.Store { } for _, store := range stores { - c.Assert(kv.SaveStore(store), IsNil) + c.Assert(s.SaveStore(store), IsNil) } return stores } -func mustSaveRegions(c *C, kv *core.KV, n int) []*metapb.Region { +func mustSaveRegions(c *C, s *core.Storage, n int) []*metapb.Region { regions := make([]*metapb.Region, 0, n) for i := 0; i < n; i++ { region := newTestRegionMeta(uint64(i)) @@ -715,9 +716,9 @@ func mustSaveRegions(c *C, 
kv *core.KV, n int) []*metapb.Region { } for _, region := range regions { - c.Assert(kv.SaveRegion(region), IsNil) + c.Assert(s.SaveRegion(region), IsNil) } - c.Assert(kv.Flush(), IsNil) + c.Assert(s.Flush(), IsNil) return regions } diff --git a/server/cluster_test.go b/server/cluster_test.go index b70322b29381..90f9bfd71ec1 100644 --- a/server/cluster_test.go +++ b/server/cluster_test.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/pd/pkg/mock/mockid" "github.com/pingcap/pd/server/core" + "github.com/pingcap/pd/server/kv" "github.com/pkg/errors" "google.golang.org/grpc" ) @@ -47,7 +48,7 @@ type testClusterSuite struct { } type testErrorKV struct { - core.KVBase + kv.Base } func (kv *testErrorKV) Save(key, value string) error { @@ -502,7 +503,7 @@ func (s *testClusterSuite) TestConcurrentHandleRegion(c *C) { c.Assert(err, IsNil) s.svr.cluster.RLock() s.svr.cluster.cachedCluster.Lock() - s.svr.cluster.cachedCluster.kv = core.NewKV(core.NewMemoryKV()) + s.svr.cluster.cachedCluster.storage = core.NewStorage(kv.NewMemoryKV()) s.svr.cluster.cachedCluster.Unlock() s.svr.cluster.RUnlock() var stores []*metapb.Store @@ -592,7 +593,7 @@ type testGetStoresSuite struct { func (s *testGetStoresSuite) SetUpSuite(c *C) { _, opt, err := newTestScheduleConfig() c.Assert(err, IsNil) - s.cluster = newClusterInfo(mockid.NewIDAllocator(), opt, core.NewKV(core.NewMemoryKV())) + s.cluster = newClusterInfo(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV())) stores := newTestStores(200) @@ -656,8 +657,8 @@ func (s *testClusterSuite) TestSetScheduleOpt(c *C) { c.Assert(len(s.svr.scheduleOpt.loadLabelPropertyConfig()[typ]), Equals, 0) //PUT GET failed - oldKV := s.svr.kv - s.svr.kv = core.NewKV(&testErrorKV{}) + oldStorage := s.svr.storage + s.svr.storage = core.NewStorage(&testErrorKV{}) replicateCfg.MaxReplicas = 7 scheduleCfg.MaxSnapshotCount = 20 pdServerCfg.UseRegionStorage = false @@ -675,11 +676,11 @@ func (s *testClusterSuite) TestSetScheduleOpt(c *C) { c.Assert(len(s.svr.scheduleOpt.loadLabelPropertyConfig()[typ]), Equals, 0) //DELETE failed - s.svr.kv = oldKV + s.svr.storage = oldStorage c.Assert(s.svr.SetNamespaceConfig("testNS", nsConfig), IsNil) c.Assert(s.svr.SetReplicationConfig(*replicateCfg), IsNil) - s.svr.kv = core.NewKV(&testErrorKV{}) + s.svr.storage = core.NewStorage(&testErrorKV{}) c.Assert(s.svr.DeleteLabelProperty(typ, labelKey, labelValue), NotNil) c.Assert(s.svr.GetNamespaceConfig("testNS").LeaderScheduleLimit, Equals, uint64(200)) c.Assert(s.svr.DeleteNamespaceConfig("testNS"), NotNil) diff --git a/server/config_test.go b/server/config_test.go index ea09c4c4ea41..e2f21f084320 100644 --- a/server/config_test.go +++ b/server/config_test.go @@ -21,9 +21,9 @@ import ( "time" "github.com/BurntSushi/toml" - . 
"github.com/pingcap/check" "github.com/pingcap/pd/server/core" + "github.com/pingcap/pd/server/kv" ) var _ = Suite(&testConfigSuite{}) @@ -46,19 +46,19 @@ func (s *testConfigSuite) TestBadFormatJoinAddr(c *C) { func (s *testConfigSuite) TestReloadConfig(c *C) { _, opt, err := newTestScheduleConfig() c.Assert(err, IsNil) - kv := core.NewKV(core.NewMemoryKV()) + storage := core.NewStorage(kv.NewMemoryKV()) scheduleCfg := opt.load() scheduleCfg.MaxSnapshotCount = 10 opt.SetMaxReplicas(5) opt.loadPDServerConfig().UseRegionStorage = true - c.Assert(opt.persist(kv), IsNil) + c.Assert(opt.persist(storage), IsNil) // suppose we add a new default enable scheduler "adjacent-region" defaultSchedulers := []string{"balance-region", "balance-leader", "hot-region", "label", "adjacent-region"} _, newOpt, err := newTestScheduleConfig() c.Assert(err, IsNil) newOpt.AddSchedulerCfg("adjacent-region", []string{}) - c.Assert(newOpt.reload(kv), IsNil) + c.Assert(newOpt.reload(storage), IsNil) schedulers := newOpt.GetSchedulers() c.Assert(schedulers, HasLen, 5) c.Assert(newOpt.loadPDServerConfig().UseRegionStorage, IsTrue) diff --git a/server/coordinator.go b/server/coordinator.go index e1159d75d739..ea26049b941d 100644 --- a/server/coordinator.go +++ b/server/coordinator.go @@ -240,7 +240,7 @@ func (c *coordinator) run() { // Removes the invalid scheduler config and persist. scheduleCfg.Schedulers = scheduleCfg.Schedulers[:k] c.cluster.opt.store(scheduleCfg) - if err := c.cluster.opt.persist(c.cluster.kv); err != nil { + if err := c.cluster.opt.persist(c.cluster.storage); err != nil { log.Error("cannot persist schedule config", zap.Error(err)) } diff --git a/server/coordinator_test.go b/server/coordinator_test.go index b5a0517999ae..aa21d5c5028f 100644 --- a/server/coordinator_test.go +++ b/server/coordinator_test.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/pd/pkg/mock/mockid" "github.com/pingcap/pd/pkg/testutil" "github.com/pingcap/pd/server/core" + "github.com/pingcap/pd/server/kv" "github.com/pingcap/pd/server/namespace" "github.com/pingcap/pd/server/schedule" "github.com/pingcap/pd/server/schedulers" @@ -56,7 +57,7 @@ func newTestClusterInfo(opt *scheduleOption) *testClusterInfo { return &testClusterInfo{clusterInfo: newClusterInfo( mockid.NewIDAllocator(), opt, - core.NewKV(core.NewMemoryKV()), + core.NewStorage(kv.NewMemoryKV()), )} } @@ -622,7 +623,7 @@ func (s *testCoordinatorSuite) TestPersistScheduler(c *C) { c.Assert(co.removeScheduler("balance-hot-region-scheduler"), IsNil) c.Assert(co.removeScheduler("label-scheduler"), IsNil) c.Assert(co.schedulers, HasLen, 2) - c.Assert(co.cluster.opt.persist(co.cluster.kv), IsNil) + c.Assert(co.cluster.opt.persist(co.cluster.storage), IsNil) co.stop() co.wg.Wait() // make a new coordinator for testing @@ -634,7 +635,7 @@ func (s *testCoordinatorSuite) TestPersistScheduler(c *C) { // suppose we add a new default enable scheduler newOpt.AddSchedulerCfg("adjacent-region", []string{}) c.Assert(newOpt.GetSchedulers(), HasLen, 5) - c.Assert(newOpt.reload(co.cluster.kv), IsNil) + c.Assert(newOpt.reload(co.cluster.storage), IsNil) c.Assert(newOpt.GetSchedulers(), HasLen, 7) tc.clusterInfo.opt = newOpt @@ -646,7 +647,7 @@ func (s *testCoordinatorSuite) TestPersistScheduler(c *C) { // suppose restart PD again _, newOpt, err = newTestScheduleConfig() c.Assert(err, IsNil) - c.Assert(newOpt.reload(tc.kv), IsNil) + c.Assert(newOpt.reload(tc.storage), IsNil) tc.clusterInfo.opt = newOpt co = newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier) co.run() @@ 
-665,13 +666,13 @@ func (s *testCoordinatorSuite) TestPersistScheduler(c *C) { // the scheduler that is not enable by default will be completely deleted c.Assert(co.cluster.opt.GetSchedulers(), HasLen, 6) c.Assert(co.schedulers, HasLen, 4) - c.Assert(co.cluster.opt.persist(co.cluster.kv), IsNil) + c.Assert(co.cluster.opt.persist(co.cluster.storage), IsNil) co.stop() co.wg.Wait() _, newOpt, err = newTestScheduleConfig() c.Assert(err, IsNil) - c.Assert(newOpt.reload(co.cluster.kv), IsNil) + c.Assert(newOpt.reload(co.cluster.storage), IsNil) tc.clusterInfo.opt = newOpt co = newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier) @@ -1012,11 +1013,11 @@ func getHeartBeatStreams(c *C, tc *testClusterInfo) *heartbeatStreams { config := NewTestSingleConfig(c) svr, err := CreateServer(config, nil) c.Assert(err, IsNil) - kvBase := newEtcdKVBase(svr) + kvBase := kv.NewEtcdKVBase(svr.client, svr.rootPath) path := filepath.Join(svr.cfg.DataDir, "region-meta") - regionKV, err := core.NewRegionKV(path) + regionStorage, err := core.NewRegionStorage(path) c.Assert(err, IsNil) - svr.kv = core.NewKV(kvBase).SetRegionKV(regionKV) + svr.storage = core.NewStorage(kvBase).SetRegionStorage(regionStorage) cluster := newRaftCluster(svr, tc.getClusterID()) cluster.cachedCluster = tc.clusterInfo hbStreams := newHeartbeatStreams(tc.getClusterID(), cluster) diff --git a/server/core/kv.go b/server/core/kv.go deleted file mode 100644 index fb4071972188..000000000000 --- a/server/core/kv.go +++ /dev/null @@ -1,296 +0,0 @@ -// Copyright 2017 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package core - -import ( - "encoding/json" - "fmt" - "math" - "path" - "strconv" - "sync/atomic" - - "github.com/gogo/protobuf/proto" - "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pkg/errors" -) - -const ( - clusterPath = "raft" - configPath = "config" - schedulePath = "schedule" - gcPath = "gc" -) - -const ( - maxKVRangeLimit = 10000 - minKVRangeLimit = 100 -) - -// KV wraps all kv operations, keep it stateless. -type KV struct { - KVBase - regionKV *RegionKV - useRegionKV int32 -} - -// NewKV creates KV instance with KVBase. -func NewKV(base KVBase) *KV { - return &KV{ - KVBase: base, - } -} - -// SetRegionKV sets the region storage. -func (kv *KV) SetRegionKV(regionKV *RegionKV) *KV { - kv.regionKV = regionKV - return kv -} - -// GetRegionKV gets the region storage. -func (kv *KV) GetRegionKV() *RegionKV { - return kv.regionKV -} - -// SwitchToRegionStorage switches to the region storage. -func (kv *KV) SwitchToRegionStorage() { - atomic.StoreInt32(&kv.useRegionKV, 1) -} - -// SwitchToDefaultStorage switches to the to default storage. -func (kv *KV) SwitchToDefaultStorage() { - atomic.StoreInt32(&kv.useRegionKV, 0) -} - -func (kv *KV) storePath(storeID uint64) string { - return path.Join(clusterPath, "s", fmt.Sprintf("%020d", storeID)) -} - -func regionPath(regionID uint64) string { - return path.Join(clusterPath, "r", fmt.Sprintf("%020d", regionID)) -} - -// ClusterStatePath returns the path to save an option. 
-func (kv *KV) ClusterStatePath(option string) string { - return path.Join(clusterPath, "status", option) -} - -func (kv *KV) storeLeaderWeightPath(storeID uint64) string { - return path.Join(schedulePath, "store_weight", fmt.Sprintf("%020d", storeID), "leader") -} - -func (kv *KV) storeRegionWeightPath(storeID uint64) string { - return path.Join(schedulePath, "store_weight", fmt.Sprintf("%020d", storeID), "region") -} - -// LoadMeta loads cluster meta from KV store. -func (kv *KV) LoadMeta(meta *metapb.Cluster) (bool, error) { - return loadProto(kv.KVBase, clusterPath, meta) -} - -// SaveMeta save cluster meta to KV store. -func (kv *KV) SaveMeta(meta *metapb.Cluster) error { - return saveProto(kv.KVBase, clusterPath, meta) -} - -// LoadStore loads one store from KV. -func (kv *KV) LoadStore(storeID uint64, store *metapb.Store) (bool, error) { - return loadProto(kv.KVBase, kv.storePath(storeID), store) -} - -// SaveStore saves one store to KV. -func (kv *KV) SaveStore(store *metapb.Store) error { - return saveProto(kv.KVBase, kv.storePath(store.GetId()), store) -} - -// DeleteStore deletes one store from KV. -func (kv *KV) DeleteStore(store *metapb.Store) error { - return kv.Delete(kv.storePath(store.GetId())) -} - -// LoadRegion loads one regoin from KV. -func (kv *KV) LoadRegion(regionID uint64, region *metapb.Region) (bool, error) { - if atomic.LoadInt32(&kv.useRegionKV) > 0 { - return loadProto(kv.regionKV, regionPath(regionID), region) - } - return loadProto(kv.KVBase, regionPath(regionID), region) -} - -// LoadRegions loads all regions from KV to RegionsInfo. -func (kv *KV) LoadRegions(regions *RegionsInfo) error { - if atomic.LoadInt32(&kv.useRegionKV) > 0 { - return loadRegions(kv.regionKV, regions) - } - return loadRegions(kv.KVBase, regions) -} - -// SaveRegion saves one region to KV. -func (kv *KV) SaveRegion(region *metapb.Region) error { - if atomic.LoadInt32(&kv.useRegionKV) > 0 { - return kv.regionKV.SaveRegion(region) - } - return saveProto(kv.KVBase, regionPath(region.GetId()), region) -} - -// DeleteRegion deletes one region from KV. -func (kv *KV) DeleteRegion(region *metapb.Region) error { - if atomic.LoadInt32(&kv.useRegionKV) > 0 { - return deleteRegion(kv.regionKV, region) - } - return deleteRegion(kv.KVBase, region) -} - -// SaveConfig stores marshalable cfg to the configPath. -func (kv *KV) SaveConfig(cfg interface{}) error { - value, err := json.Marshal(cfg) - if err != nil { - return errors.WithStack(err) - } - return kv.Save(configPath, string(value)) -} - -// LoadConfig loads config from configPath then unmarshal it to cfg. -func (kv *KV) LoadConfig(cfg interface{}) (bool, error) { - value, err := kv.Load(configPath) - if err != nil { - return false, err - } - if value == "" { - return false, nil - } - err = json.Unmarshal([]byte(value), cfg) - if err != nil { - return false, errors.WithStack(err) - } - return true, nil -} - -// LoadStores loads all stores from KV to StoresInfo. 
-func (kv *KV) LoadStores(stores *StoresInfo) error { - nextID := uint64(0) - endKey := kv.storePath(math.MaxUint64) - for { - key := kv.storePath(nextID) - _, res, err := kv.LoadRange(key, endKey, minKVRangeLimit) - if err != nil { - return err - } - for _, s := range res { - store := &metapb.Store{} - if err := store.Unmarshal([]byte(s)); err != nil { - return errors.WithStack(err) - } - leaderWeight, err := kv.loadFloatWithDefaultValue(kv.storeLeaderWeightPath(store.GetId()), 1.0) - if err != nil { - return err - } - regionWeight, err := kv.loadFloatWithDefaultValue(kv.storeRegionWeightPath(store.GetId()), 1.0) - if err != nil { - return err - } - newStoreInfo := NewStoreInfo(store, SetLeaderWeight(leaderWeight), SetRegionWeight(regionWeight)) - - nextID = store.GetId() + 1 - stores.SetStore(newStoreInfo) - } - if len(res) < minKVRangeLimit { - return nil - } - } -} - -// SaveStoreWeight saves a store's leader and region weight to KV. -func (kv *KV) SaveStoreWeight(storeID uint64, leader, region float64) error { - leaderValue := strconv.FormatFloat(leader, 'f', -1, 64) - if err := kv.Save(kv.storeLeaderWeightPath(storeID), leaderValue); err != nil { - return err - } - regionValue := strconv.FormatFloat(region, 'f', -1, 64) - return kv.Save(kv.storeRegionWeightPath(storeID), regionValue) -} - -func (kv *KV) loadFloatWithDefaultValue(path string, def float64) (float64, error) { - res, err := kv.Load(path) - if err != nil { - return 0, err - } - if res == "" { - return def, nil - } - val, err := strconv.ParseFloat(res, 64) - if err != nil { - return 0, errors.WithStack(err) - } - return val, nil -} - -// Flush flushes the dirty region to storage. -func (kv *KV) Flush() error { - if kv.regionKV != nil { - return kv.regionKV.FlushRegion() - } - return nil -} - -// Close closes the kv. -func (kv *KV) Close() error { - if kv.regionKV != nil { - return kv.regionKV.Close() - } - return nil -} - -// SaveGCSafePoint saves new GC safe point to KV. -func (kv *KV) SaveGCSafePoint(safePoint uint64) error { - key := path.Join(gcPath, "safe_point") - value := strconv.FormatUint(safePoint, 16) - return kv.Save(key, value) -} - -// LoadGCSafePoint loads current GC safe point from KV. -func (kv *KV) LoadGCSafePoint() (uint64, error) { - key := path.Join(gcPath, "safe_point") - value, err := kv.Load(key) - if err != nil { - return 0, err - } - if value == "" { - return 0, nil - } - safePoint, err := strconv.ParseUint(value, 16, 64) - if err != nil { - return 0, err - } - return safePoint, nil -} - -func loadProto(kv KVBase, key string, msg proto.Message) (bool, error) { - value, err := kv.Load(key) - if err != nil { - return false, err - } - if value == "" { - return false, nil - } - err = proto.Unmarshal([]byte(value), msg) - return true, errors.WithStack(err) -} - -func saveProto(kv KVBase, key string, msg proto.Message) error { - value, err := proto.Marshal(msg) - if err != nil { - return errors.WithStack(err) - } - return kv.Save(key, string(value)) -} diff --git a/server/core/region_kv.go b/server/core/region_storage.go similarity index 62% rename from server/core/region_kv.go rename to server/core/region_storage.go index 06306edabffc..45dfe7c96673 100644 --- a/server/core/region_kv.go +++ b/server/core/region_storage.go @@ -21,15 +21,16 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" log "github.com/pingcap/log" + "github.com/pingcap/pd/server/kv" "github.com/pkg/errors" "go.uber.org/zap" ) var dirtyFlushTick = time.Second -// RegionKV is used to save regions. 
-type RegionKV struct { - *leveldbKV +// RegionStorage is used to save regions. +type RegionStorage struct { + *kv.LeveldbKV mu sync.RWMutex batchRegions map[string]*metapb.Region batchSize int @@ -41,21 +42,21 @@ type RegionKV struct { } const ( - //DefaultFlushRegionRate is the ttl to sync the regions to kv storage. + //DefaultFlushRegionRate is the ttl to sync the regions to region storage. defaultFlushRegionRate = 3 * time.Second - //DefaultBatchSize is the batch size to save the regions to kv storage. + //DefaultBatchSize is the batch size to save the regions to region storage. defaultBatchSize = 100 ) -// NewRegionKV returns a kv storage that is used to save regions. -func NewRegionKV(path string) (*RegionKV, error) { - levelDB, err := newLeveldbKV(path) +// NewRegionStorage returns a region storage that is used to save regions. +func NewRegionStorage(path string) (*RegionStorage, error) { + levelDB, err := kv.NewLeveldbKV(path) if err != nil { return nil, err } ctx, cancel := context.WithCancel(context.Background()) - kv := &RegionKV{ - leveldbKV: levelDB, + s := &RegionStorage{ + LeveldbKV: levelDB, batchSize: defaultBatchSize, flushRate: defaultFlushRegionRate, batchRegions: make(map[string]*metapb.Region, defaultBatchSize), @@ -63,11 +64,11 @@ func NewRegionKV(path string) (*RegionKV, error) { ctx: ctx, cancel: cancel, } - kv.backgroundFlush() - return kv, nil + s.backgroundFlush() + return s, nil } -func (kv *RegionKV) backgroundFlush() { +func (s *RegionStorage) backgroundFlush() { ticker := time.NewTicker(dirtyFlushTick) var ( isFlush bool @@ -78,35 +79,35 @@ func (kv *RegionKV) backgroundFlush() { for { select { case <-ticker.C: - kv.mu.RLock() - isFlush = kv.flushTime.Before(time.Now()) - kv.mu.RUnlock() + s.mu.RLock() + isFlush = s.flushTime.Before(time.Now()) + s.mu.RUnlock() if !isFlush { continue } - if err = kv.FlushRegion(); err != nil { + if err = s.FlushRegion(); err != nil { log.Error("flush regions meet error", zap.Error(err)) } - case <-kv.ctx.Done(): + case <-s.ctx.Done(): return } } }() } -// SaveRegion saves one region to KV. -func (kv *RegionKV) SaveRegion(region *metapb.Region) error { - kv.mu.Lock() - defer kv.mu.Unlock() - if kv.cacheSize < kv.batchSize-1 { - kv.batchRegions[regionPath(region.GetId())] = region - kv.cacheSize++ +// SaveRegion saves one region to storage. +func (s *RegionStorage) SaveRegion(region *metapb.Region) error { + s.mu.Lock() + defer s.mu.Unlock() + if s.cacheSize < s.batchSize-1 { + s.batchRegions[regionPath(region.GetId())] = region + s.cacheSize++ - kv.flushTime = time.Now().Add(kv.flushRate) + s.flushTime = time.Now().Add(s.flushRate) return nil } - kv.batchRegions[regionPath(region.GetId())] = region - err := kv.flush() + s.batchRegions[regionPath(region.GetId())] = region + err := s.flush() if err != nil { return err @@ -114,11 +115,11 @@ func (kv *RegionKV) SaveRegion(region *metapb.Region) error { return nil } -func deleteRegion(kv KVBase, region *metapb.Region) error { - return kv.Delete(regionPath(region.GetId())) +func deleteRegion(kv kv.Base, region *metapb.Region) error { + return kv.Remove(regionPath(region.GetId())) } -func loadRegions(kv KVBase, regions *RegionsInfo) error { +func loadRegions(kv kv.Base, regions *RegionsInfo) error { nextID := uint64(0) endKey := regionPath(math.MaxUint64) @@ -157,28 +158,28 @@ func loadRegions(kv KVBase, regions *RegionsInfo) error { } } -// FlushRegion saves the cache region to region kv storage. 
-func (kv *RegionKV) FlushRegion() error { - kv.mu.Lock() - defer kv.mu.Unlock() - return kv.flush() +// FlushRegion saves the cache region to region storage. +func (s *RegionStorage) FlushRegion() error { + s.mu.Lock() + defer s.mu.Unlock() + return s.flush() } -func (kv *RegionKV) flush() error { - if err := kv.SaveRegions(kv.batchRegions); err != nil { +func (s *RegionStorage) flush() error { + if err := s.SaveRegions(s.batchRegions); err != nil { return err } - kv.cacheSize = 0 - kv.batchRegions = make(map[string]*metapb.Region, kv.batchSize) + s.cacheSize = 0 + s.batchRegions = make(map[string]*metapb.Region, s.batchSize) return nil } // Close closes the kv. -func (kv *RegionKV) Close() error { - err := kv.FlushRegion() +func (s *RegionStorage) Close() error { + err := s.FlushRegion() if err != nil { log.Error("meet error before close the region storage", zap.Error(err)) } - kv.cancel() - return kv.db.Close() + s.cancel() + return errors.WithStack(s.LeveldbKV.Close()) } diff --git a/server/core/storage.go b/server/core/storage.go new file mode 100644 index 000000000000..3b266ecc8618 --- /dev/null +++ b/server/core/storage.go @@ -0,0 +1,297 @@ +// Copyright 2017 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package core + +import ( + "encoding/json" + "fmt" + "math" + "path" + "strconv" + "sync/atomic" + + "github.com/gogo/protobuf/proto" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/pd/server/kv" + "github.com/pkg/errors" +) + +const ( + clusterPath = "raft" + configPath = "config" + schedulePath = "schedule" + gcPath = "gc" +) + +const ( + maxKVRangeLimit = 10000 + minKVRangeLimit = 100 +) + +// Storage wraps all kv operations, keeping it stateless. +type Storage struct { + kv.Base + regionStorage *RegionStorage + useRegionStorage int32 +} + +// NewStorage creates a Storage instance with Base. +func NewStorage(base kv.Base) *Storage { + return &Storage{ + Base: base, + } +} + +// SetRegionStorage sets the region storage. +func (s *Storage) SetRegionStorage(regionStorage *RegionStorage) *Storage { + s.regionStorage = regionStorage + return s +} + +// GetRegionStorage gets the region storage. +func (s *Storage) GetRegionStorage() *RegionStorage { + return s.regionStorage +} + +// SwitchToRegionStorage switches to the region storage. +func (s *Storage) SwitchToRegionStorage() { + atomic.StoreInt32(&s.useRegionStorage, 1) +} + +// SwitchToDefaultStorage switches to the default storage. +func (s *Storage) SwitchToDefaultStorage() { + atomic.StoreInt32(&s.useRegionStorage, 0) +} + +func (s *Storage) storePath(storeID uint64) string { + return path.Join(clusterPath, "s", fmt.Sprintf("%020d", storeID)) +} + +func regionPath(regionID uint64) string { + return path.Join(clusterPath, "r", fmt.Sprintf("%020d", regionID)) +} + +// ClusterStatePath returns the path to save an option.
+func (s *Storage) ClusterStatePath(option string) string { + return path.Join(clusterPath, "status", option) +} + +func (s *Storage) storeLeaderWeightPath(storeID uint64) string { + return path.Join(schedulePath, "store_weight", fmt.Sprintf("%020d", storeID), "leader") +} + +func (s *Storage) storeRegionWeightPath(storeID uint64) string { + return path.Join(schedulePath, "store_weight", fmt.Sprintf("%020d", storeID), "region") +} + +// LoadMeta loads cluster meta from storage. +func (s *Storage) LoadMeta(meta *metapb.Cluster) (bool, error) { + return loadProto(s.Base, clusterPath, meta) +} + +// SaveMeta saves cluster meta to storage. +func (s *Storage) SaveMeta(meta *metapb.Cluster) error { + return saveProto(s.Base, clusterPath, meta) +} + +// LoadStore loads one store from storage. +func (s *Storage) LoadStore(storeID uint64, store *metapb.Store) (bool, error) { + return loadProto(s.Base, s.storePath(storeID), store) +} + +// SaveStore saves one store to storage. +func (s *Storage) SaveStore(store *metapb.Store) error { + return saveProto(s.Base, s.storePath(store.GetId()), store) +} + +// DeleteStore deletes one store from storage. +func (s *Storage) DeleteStore(store *metapb.Store) error { + return s.Remove(s.storePath(store.GetId())) +} + +// LoadRegion loads one region from storage. +func (s *Storage) LoadRegion(regionID uint64, region *metapb.Region) (bool, error) { + if atomic.LoadInt32(&s.useRegionStorage) > 0 { + return loadProto(s.regionStorage, regionPath(regionID), region) + } + return loadProto(s.Base, regionPath(regionID), region) +} + +// LoadRegions loads all regions from storage to RegionsInfo. +func (s *Storage) LoadRegions(regions *RegionsInfo) error { + if atomic.LoadInt32(&s.useRegionStorage) > 0 { + return loadRegions(s.regionStorage, regions) + } + return loadRegions(s.Base, regions) +} + +// SaveRegion saves one region to storage. +func (s *Storage) SaveRegion(region *metapb.Region) error { + if atomic.LoadInt32(&s.useRegionStorage) > 0 { + return s.regionStorage.SaveRegion(region) + } + return saveProto(s.Base, regionPath(region.GetId()), region) +} + +// DeleteRegion deletes one region from storage. +func (s *Storage) DeleteRegion(region *metapb.Region) error { + if atomic.LoadInt32(&s.useRegionStorage) > 0 { + return deleteRegion(s.regionStorage, region) + } + return deleteRegion(s.Base, region) +} + +// SaveConfig stores marshalable cfg to the configPath. +func (s *Storage) SaveConfig(cfg interface{}) error { + value, err := json.Marshal(cfg) + if err != nil { + return errors.WithStack(err) + } + return s.Save(configPath, string(value)) +} + +// LoadConfig loads config from configPath then unmarshals it to cfg. +func (s *Storage) LoadConfig(cfg interface{}) (bool, error) { + value, err := s.Load(configPath) + if err != nil { + return false, err + } + if value == "" { + return false, nil + } + err = json.Unmarshal([]byte(value), cfg) + if err != nil { + return false, errors.WithStack(err) + } + return true, nil +} + +// LoadStores loads all stores from storage to StoresInfo.
+func (s *Storage) LoadStores(stores *StoresInfo) error { + nextID := uint64(0) + endKey := s.storePath(math.MaxUint64) + for { + key := s.storePath(nextID) + _, res, err := s.LoadRange(key, endKey, minKVRangeLimit) + if err != nil { + return err + } + for _, str := range res { + store := &metapb.Store{} + if err := store.Unmarshal([]byte(str)); err != nil { + return errors.WithStack(err) + } + leaderWeight, err := s.loadFloatWithDefaultValue(s.storeLeaderWeightPath(store.GetId()), 1.0) + if err != nil { + return err + } + regionWeight, err := s.loadFloatWithDefaultValue(s.storeRegionWeightPath(store.GetId()), 1.0) + if err != nil { + return err + } + newStoreInfo := NewStoreInfo(store, SetLeaderWeight(leaderWeight), SetRegionWeight(regionWeight)) + + nextID = store.GetId() + 1 + stores.SetStore(newStoreInfo) + } + if len(res) < minKVRangeLimit { + return nil + } + } +} + +// SaveStoreWeight saves a store's leader and region weight to storage. +func (s *Storage) SaveStoreWeight(storeID uint64, leader, region float64) error { + leaderValue := strconv.FormatFloat(leader, 'f', -1, 64) + if err := s.Save(s.storeLeaderWeightPath(storeID), leaderValue); err != nil { + return err + } + regionValue := strconv.FormatFloat(region, 'f', -1, 64) + return s.Save(s.storeRegionWeightPath(storeID), regionValue) +} + +func (s *Storage) loadFloatWithDefaultValue(path string, def float64) (float64, error) { + res, err := s.Load(path) + if err != nil { + return 0, err + } + if res == "" { + return def, nil + } + val, err := strconv.ParseFloat(res, 64) + if err != nil { + return 0, errors.WithStack(err) + } + return val, nil +} + +// Flush flushes the dirty region to storage. +func (s *Storage) Flush() error { + if s.regionStorage != nil { + return s.regionStorage.FlushRegion() + } + return nil +} + +// Close closes the s. +func (s *Storage) Close() error { + if s.regionStorage != nil { + return s.regionStorage.Close() + } + return nil +} + +// SaveGCSafePoint saves new GC safe point to storage. +func (s *Storage) SaveGCSafePoint(safePoint uint64) error { + key := path.Join(gcPath, "safe_point") + value := strconv.FormatUint(safePoint, 16) + return s.Save(key, value) +} + +// LoadGCSafePoint loads current GC safe point from storage. +func (s *Storage) LoadGCSafePoint() (uint64, error) { + key := path.Join(gcPath, "safe_point") + value, err := s.Load(key) + if err != nil { + return 0, err + } + if value == "" { + return 0, nil + } + safePoint, err := strconv.ParseUint(value, 16, 64) + if err != nil { + return 0, err + } + return safePoint, nil +} + +func loadProto(s kv.Base, key string, msg proto.Message) (bool, error) { + value, err := s.Load(key) + if err != nil { + return false, err + } + if value == "" { + return false, nil + } + err = proto.Unmarshal([]byte(value), msg) + return true, errors.WithStack(err) +} + +func saveProto(s kv.Base, key string, msg proto.Message) error { + value, err := proto.Marshal(msg) + if err != nil { + return errors.WithStack(err) + } + return s.Save(key, string(value)) +} diff --git a/server/core/kv_test.go b/server/core/storage_test.go similarity index 69% rename from server/core/kv_test.go rename to server/core/storage_test.go index f671a3b0b1d6..b46a4146a812 100644 --- a/server/core/kv_test.go +++ b/server/core/storage_test.go @@ -19,6 +19,7 @@ import ( . 
"github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/pd/server/kv" "github.com/pkg/errors" ) @@ -28,51 +29,51 @@ type testKVSuite struct { } func (s *testKVSuite) TestBasic(c *C) { - kv := NewKV(NewMemoryKV()) + storage := NewStorage(kv.NewMemoryKV()) - c.Assert(kv.storePath(123), Equals, "raft/s/00000000000000000123") + c.Assert(storage.storePath(123), Equals, "raft/s/00000000000000000123") c.Assert(regionPath(123), Equals, "raft/r/00000000000000000123") meta := &metapb.Cluster{Id: 123} - ok, err := kv.LoadMeta(meta) + ok, err := storage.LoadMeta(meta) c.Assert(ok, IsFalse) c.Assert(err, IsNil) - c.Assert(kv.SaveMeta(meta), IsNil) + c.Assert(storage.SaveMeta(meta), IsNil) newMeta := &metapb.Cluster{} - ok, err = kv.LoadMeta(newMeta) + ok, err = storage.LoadMeta(newMeta) c.Assert(ok, IsTrue) c.Assert(err, IsNil) c.Assert(newMeta, DeepEquals, meta) store := &metapb.Store{Id: 123} - ok, err = kv.LoadStore(123, store) + ok, err = storage.LoadStore(123, store) c.Assert(ok, IsFalse) c.Assert(err, IsNil) - c.Assert(kv.SaveStore(store), IsNil) + c.Assert(storage.SaveStore(store), IsNil) newStore := &metapb.Store{} - ok, err = kv.LoadStore(123, newStore) + ok, err = storage.LoadStore(123, newStore) c.Assert(ok, IsTrue) c.Assert(err, IsNil) c.Assert(newStore, DeepEquals, store) region := &metapb.Region{Id: 123} - ok, err = kv.LoadRegion(123, region) + ok, err = storage.LoadRegion(123, region) c.Assert(ok, IsFalse) c.Assert(err, IsNil) - c.Assert(kv.SaveRegion(region), IsNil) + c.Assert(storage.SaveRegion(region), IsNil) newRegion := &metapb.Region{} - ok, err = kv.LoadRegion(123, newRegion) + ok, err = storage.LoadRegion(123, newRegion) c.Assert(ok, IsTrue) c.Assert(err, IsNil) c.Assert(newRegion, DeepEquals, region) - err = kv.DeleteRegion(region) + err = storage.DeleteRegion(region) c.Assert(err, IsNil) - ok, err = kv.LoadRegion(123, newRegion) + ok, err = storage.LoadRegion(123, newRegion) c.Assert(ok, IsFalse) c.Assert(err, IsNil) } -func mustSaveStores(c *C, kv *KV, n int) []*metapb.Store { +func mustSaveStores(c *C, s *Storage, n int) []*metapb.Store { stores := make([]*metapb.Store, 0, n) for i := 0; i < n; i++ { store := &metapb.Store{Id: uint64(i)} @@ -80,19 +81,19 @@ func mustSaveStores(c *C, kv *KV, n int) []*metapb.Store { } for _, store := range stores { - c.Assert(kv.SaveStore(store), IsNil) + c.Assert(s.SaveStore(store), IsNil) } return stores } func (s *testKVSuite) TestLoadStores(c *C) { - kv := NewKV(NewMemoryKV()) + storage := NewStorage(kv.NewMemoryKV()) cache := NewStoresInfo() n := 10 - stores := mustSaveStores(c, kv, n) - c.Assert(kv.LoadStores(cache), IsNil) + stores := mustSaveStores(c, storage, n) + c.Assert(storage.LoadStores(cache), IsNil) c.Assert(cache.GetStoreCount(), Equals, n) for _, store := range cache.GetMetaStores() { @@ -101,14 +102,14 @@ func (s *testKVSuite) TestLoadStores(c *C) { } func (s *testKVSuite) TestStoreWeight(c *C) { - kv := NewKV(NewMemoryKV()) + storage := NewStorage(kv.NewMemoryKV()) cache := NewStoresInfo() const n = 3 - mustSaveStores(c, kv, n) - c.Assert(kv.SaveStoreWeight(1, 2.0, 3.0), IsNil) - c.Assert(kv.SaveStoreWeight(2, 0.2, 0.3), IsNil) - c.Assert(kv.LoadStores(cache), IsNil) + mustSaveStores(c, storage, n) + c.Assert(storage.SaveStoreWeight(1, 2.0, 3.0), IsNil) + c.Assert(storage.SaveStoreWeight(2, 0.2, 0.3), IsNil) + c.Assert(storage.LoadStores(cache), IsNil) leaderWeights := []float64{1.0, 2.0, 0.2} regionWeights := []float64{1.0, 3.0, 0.3} for i := 0; i < n; i++ { @@ -117,7 +118,7 @@ func (s 
*testKVSuite) TestStoreWeight(c *C) { } } -func mustSaveRegions(c *C, kv *KV, n int) []*metapb.Region { +func mustSaveRegions(c *C, s *Storage, n int) []*metapb.Region { regions := make([]*metapb.Region, 0, n) for i := 0; i < n; i++ { region := newTestRegionMeta(uint64(i)) @@ -125,19 +126,19 @@ func mustSaveRegions(c *C, kv *KV, n int) []*metapb.Region { } for _, region := range regions { - c.Assert(kv.SaveRegion(region), IsNil) + c.Assert(s.SaveRegion(region), IsNil) } return regions } func (s *testKVSuite) TestLoadRegions(c *C) { - kv := NewKV(NewMemoryKV()) + storage := NewStorage(kv.NewMemoryKV()) cache := NewRegionsInfo() n := 10 - regions := mustSaveRegions(c, kv, n) - c.Assert(kv.LoadRegions(cache), IsNil) + regions := mustSaveRegions(c, storage, n) + c.Assert(storage.LoadRegions(cache), IsNil) c.Assert(cache.GetRegionCount(), Equals, n) for _, region := range cache.GetMetaRegions() { @@ -146,12 +147,12 @@ func (s *testKVSuite) TestLoadRegions(c *C) { } func (s *testKVSuite) TestLoadRegionsExceedRangeLimit(c *C) { - kv := NewKV(&KVWithMaxRangeLimit{KVBase: NewMemoryKV(), rangeLimit: 500}) + storage := NewStorage(&KVWithMaxRangeLimit{Base: kv.NewMemoryKV(), rangeLimit: 500}) cache := NewRegionsInfo() n := 1000 - regions := mustSaveRegions(c, kv, n) - c.Assert(kv.LoadRegions(cache), IsNil) + regions := mustSaveRegions(c, storage, n) + c.Assert(storage.LoadRegions(cache), IsNil) c.Assert(cache.GetRegionCount(), Equals, n) for _, region := range cache.GetMetaRegions() { c.Assert(region, DeepEquals, regions[region.GetId()]) @@ -159,23 +160,23 @@ func (s *testKVSuite) TestLoadRegionsExceedRangeLimit(c *C) { } func (s *testKVSuite) TestLoadGCSafePoint(c *C) { - kv := NewKV(NewMemoryKV()) + storage := NewStorage(kv.NewMemoryKV()) testData := []uint64{0, 1, 2, 233, 2333, 23333333333, math.MaxUint64} - r, e := kv.LoadGCSafePoint() + r, e := storage.LoadGCSafePoint() c.Assert(r, Equals, uint64(0)) c.Assert(e, IsNil) for _, safePoint := range testData { - err := kv.SaveGCSafePoint(safePoint) + err := storage.SaveGCSafePoint(safePoint) c.Assert(err, IsNil) - safePoint1, err := kv.LoadGCSafePoint() + safePoint1, err := storage.LoadGCSafePoint() c.Assert(err, IsNil) c.Assert(safePoint, Equals, safePoint1) } } type KVWithMaxRangeLimit struct { - KVBase + kv.Base rangeLimit int } @@ -183,7 +184,7 @@ func (kv *KVWithMaxRangeLimit) LoadRange(key, endKey string, limit int) ([]strin if limit > kv.rangeLimit { return nil, nil, errors.Errorf("limit %v exceed max rangeLimit %v", limit, kv.rangeLimit) } - return kv.KVBase.LoadRange(key, endKey, limit) + return kv.Base.LoadRange(key, endKey, limit) } func newTestRegionMeta(regionID uint64) *metapb.Region { diff --git a/server/grpc_service.go b/server/grpc_service.go index a5b249d4ef90..d03bb0bf676d 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -657,7 +657,7 @@ func (s *Server) GetGCSafePoint(ctx context.Context, request *pdpb.GetGCSafePoin return &pdpb.GetGCSafePointResponse{Header: s.notBootstrappedHeader()}, nil } - safePoint, err := s.kv.LoadGCSafePoint() + safePoint, err := s.storage.LoadGCSafePoint() if err != nil { return nil, err } @@ -688,7 +688,7 @@ func (s *Server) UpdateGCSafePoint(ctx context.Context, request *pdpb.UpdateGCSa return &pdpb.UpdateGCSafePointResponse{Header: s.notBootstrappedHeader()}, nil } - oldSafePoint, err := s.kv.LoadGCSafePoint() + oldSafePoint, err := s.storage.LoadGCSafePoint() if err != nil { return nil, err } @@ -697,7 +697,7 @@ func (s *Server) UpdateGCSafePoint(ctx context.Context, request 
*pdpb.UpdateGCSa // Only save the safe point if it's greater than the previous one if newSafePoint > oldSafePoint { - if err := s.kv.SaveGCSafePoint(newSafePoint); err != nil { + if err := s.storage.SaveGCSafePoint(newSafePoint); err != nil { return nil, err } log.Info("updated gc safe point", diff --git a/server/handler.go b/server/handler.go index 5c064f12a703..38f44f6c1116 100644 --- a/server/handler.go +++ b/server/handler.go @@ -183,7 +183,7 @@ func (h *Handler) AddScheduler(name string, args ...string) error { log.Info("create scheduler", zap.String("scheduler-name", s.GetName())) if err = c.addScheduler(s, args...); err != nil { log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), zap.Error(err)) - } else if err = h.opt.persist(c.cluster.kv); err != nil { + } else if err = h.opt.persist(c.cluster.storage); err != nil { log.Error("can not persist scheduler config", zap.Error(err)) } return err @@ -197,7 +197,7 @@ func (h *Handler) RemoveScheduler(name string) error { } if err = c.removeScheduler(name); err != nil { log.Error("can not remove scheduler", zap.String("scheduler-name", name), zap.Error(err)) - } else if err = h.opt.persist(c.cluster.kv); err != nil { + } else if err = h.opt.persist(c.cluster.storage); err != nil { log.Error("can not persist scheduler config", zap.Error(err)) } return err diff --git a/server/id.go b/server/id.go index 047d96a637b8..b4bf76b183fb 100644 --- a/server/id.go +++ b/server/id.go @@ -80,7 +80,7 @@ func (alloc *idAllocator) generate() (uint64, error) { end += allocStep value = uint64ToBytes(end) - resp, err := alloc.s.leaderTxn(cmp).Then(clientv3.OpPut(key, string(value))).Commit() + resp, err := alloc.s.LeaderTxn(cmp).Then(clientv3.OpPut(key, string(value))).Commit() if err != nil { return 0, err } diff --git a/server/etcd_kv.go b/server/kv/etcd_kv.go similarity index 51% rename from server/etcd_kv.go rename to server/kv/etcd_kv.go index 05bad301cbd2..1dee5bf2c43c 100644 --- a/server/etcd_kv.go +++ b/server/kv/etcd_kv.go @@ -11,9 +11,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -package server +package kv import ( + "context" "path" "strings" "time" @@ -28,6 +29,8 @@ import ( const ( kvRequestTimeout = time.Second * 10 kvSlowRequestTime = time.Second * 1 + requestTimeout = 10 * time.Second + slowRequestTime = 1 * time.Second ) var ( @@ -35,23 +38,22 @@ var ( ) type etcdKVBase struct { - server *Server client *clientv3.Client rootPath string } -func newEtcdKVBase(s *Server) *etcdKVBase { +// NewEtcdKVBase creates a new etcd kv. 
+func NewEtcdKVBase(client *clientv3.Client, rootPath string) *etcdKVBase { return &etcdKVBase{ - server: s, - client: s.client, - rootPath: s.rootPath, + client: client, + rootPath: rootPath, } } func (kv *etcdKVBase) Load(key string) (string, error) { key = path.Join(kv.rootPath, key) - resp, err := etcdutil.EtcdKVGet(kv.server.client, key) + resp, err := etcdutil.EtcdKVGet(kv.client, key) if err != nil { return "", err } @@ -69,7 +71,7 @@ func (kv *etcdKVBase) LoadRange(key, endKey string, limit int) ([]string, []stri withRange := clientv3.WithRange(endKey) withLimit := clientv3.WithLimit(int64(limit)) - resp, err := etcdutil.EtcdKVGet(kv.server.client, key, withRange, withLimit) + resp, err := etcdutil.EtcdKVGet(kv.client, key, withRange, withLimit) if err != nil { return nil, nil, err } @@ -85,7 +87,8 @@ func (kv *etcdKVBase) LoadRange(key, endKey string, limit int) ([]string, []stri func (kv *etcdKVBase) Save(key, value string) error { key = path.Join(kv.rootPath, key) - resp, err := kv.server.leaderTxn().Then(clientv3.OpPut(key, value)).Commit() + txn := NewSlowLogTxn(kv.client) + resp, err := txn.Then(clientv3.OpPut(key, value)).Commit() if err != nil { log.Error("save to etcd meet error", zap.Error(err)) return errors.WithStack(err) @@ -96,12 +99,13 @@ func (kv *etcdKVBase) Save(key, value string) error { return nil } -func (kv *etcdKVBase) Delete(key string) error { +func (kv *etcdKVBase) Remove(key string) error { key = path.Join(kv.rootPath, key) - resp, err := kv.server.leaderTxn().Then(clientv3.OpDelete(key)).Commit() + txn := NewSlowLogTxn(kv.client) + resp, err := txn.Then(clientv3.OpDelete(key)).Commit() if err != nil { - log.Error("delete from etcd meet error", zap.Error(err)) + log.Error("remove from etcd meet error", zap.Error(err)) return errors.WithStack(err) } if !resp.Succeeded { @@ -109,3 +113,60 @@ func (kv *etcdKVBase) Delete(key string) error { } return nil } + +// SlowLogTxn wraps etcd transaction and log slow one. +type SlowLogTxn struct { + clientv3.Txn + cancel context.CancelFunc +} + +// NewSlowLogTxn create a SlowLogTxn. +func NewSlowLogTxn(client *clientv3.Client) clientv3.Txn { + ctx, cancel := context.WithTimeout(client.Ctx(), requestTimeout) + return &SlowLogTxn{ + Txn: client.Txn(ctx), + cancel: cancel, + } +} + +// If takes a list of comparison. If all comparisons passed in succeed, +// the operations passed into Then() will be executed. Or the operations +// passed into Else() will be executed. +func (t *SlowLogTxn) If(cs ...clientv3.Cmp) clientv3.Txn { + return &SlowLogTxn{ + Txn: t.Txn.If(cs...), + cancel: t.cancel, + } +} + +// Then takes a list of operations. The Ops list will be executed, if the +// comparisons passed in If() succeed. +func (t *SlowLogTxn) Then(ops ...clientv3.Op) clientv3.Txn { + return &SlowLogTxn{ + Txn: t.Txn.Then(ops...), + cancel: t.cancel, + } +} + +// Commit implements Txn Commit interface. 
+func (t *SlowLogTxn) Commit() (*clientv3.TxnResponse, error) { + start := time.Now() + resp, err := t.Txn.Commit() + t.cancel() + + cost := time.Since(start) + if cost > slowRequestTime { + log.Warn("txn runs too slow", + zap.Error(err), + zap.Reflect("response", resp), + zap.Duration("cost", cost)) + } + label := "success" + if err != nil { + label = "failed" + } + txnCounter.WithLabelValues(label).Inc() + txnDuration.WithLabelValues(label).Observe(cost.Seconds()) + + return resp, errors.WithStack(err) +} diff --git a/server/etcd_kv_test.go b/server/kv/etcd_kv_test.go similarity index 55% rename from server/etcd_kv_test.go rename to server/kv/etcd_kv_test.go index f588b030368f..ad47c3412c23 100644 --- a/server/etcd_kv_test.go +++ b/server/kv/etcd_kv_test.go @@ -11,18 +11,44 @@ // See the License for the specific language governing permissions and // limitations under the License. -package server +package kv -import . "github.com/pingcap/check" +import ( + "fmt" + "io/ioutil" + "net/url" + "os" + "path" + "strconv" + "testing" + + . "github.com/pingcap/check" + "github.com/pingcap/pd/pkg/tempurl" + "go.etcd.io/etcd/clientv3" + "go.etcd.io/etcd/embed" +) + +func TestKV(t *testing.T) { + TestingT(t) +} type testEtcdKVSuite struct{} var _ = Suite(&testEtcdKVSuite{}) func (s *testEtcdKVSuite) TestEtcdKV(c *C) { - server, cleanup := mustRunTestServer(c) - defer cleanup() - kv := server.kv.KVBase + cfg := newTestSingleConfig() + etcd, err := embed.StartEtcd(cfg) + c.Assert(err, IsNil) + + ep := cfg.LCUrls[0].String() + client, err := clientv3.New(clientv3.Config{ + Endpoints: []string{ep}, + }) + c.Assert(err, IsNil) + rootPath := path.Join("/pd", strconv.FormatUint(100, 10)) + + kv := NewEtcdKVBase(client, rootPath) keys := []string{"test/key1", "test/key2", "test/key3", "test/key4", "test/key5"} vals := []string{"val1", "val2", "val3", "val4", "val5"} @@ -56,8 +82,37 @@ func (s *testEtcdKVSuite) TestEtcdKV(c *C) { v, err = kv.Load(keys[1]) c.Assert(err, IsNil) c.Assert(v, Equals, "val2") - c.Assert(kv.Delete(keys[1]), IsNil) + c.Assert(kv.Remove(keys[1]), IsNil) v, err = kv.Load(keys[1]) c.Assert(err, IsNil) c.Assert(v, Equals, "") + + etcd.Close() + cleanConfig(cfg) +} + +func newTestSingleConfig() *embed.Config { + cfg := embed.NewConfig() + cfg.Name = "test_etcd" + cfg.Dir, _ = ioutil.TempDir("/tmp", "test_etcd") + cfg.WalDir = "" + cfg.Logger = "zap" + cfg.LogOutputs = []string{"stdout"} + + pu, _ := url.Parse(tempurl.Alloc()) + cfg.LPUrls = []url.URL{*pu} + cfg.APUrls = cfg.LPUrls + cu, _ := url.Parse(tempurl.Alloc()) + cfg.LCUrls = []url.URL{*cu} + cfg.ACUrls = cfg.LCUrls + + cfg.StrictReconfigCheck = false + cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, &cfg.LPUrls[0]) + cfg.ClusterState = embed.ClusterStateFlagNew + return cfg +} + +func cleanConfig(cfg *embed.Config) { + // Clean data directory + os.RemoveAll(cfg.Dir) } diff --git a/server/kv/kv.go b/server/kv/kv.go new file mode 100644 index 000000000000..768eb7036f19 --- /dev/null +++ b/server/kv/kv.go @@ -0,0 +1,22 @@ +// Copyright 2017 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package kv + +// Base is an abstract interface for load/save pd cluster data. +type Base interface { + Load(key string) (string, error) + LoadRange(key, endKey string, limit int) (keys []string, values []string, err error) + Save(key, value string) error + Remove(key string) error +} diff --git a/server/core/levedb_kv.go b/server/kv/levedb_kv.go similarity index 58% rename from server/core/levedb_kv.go rename to server/kv/levedb_kv.go index b9123ee1ecf5..7b5d72ecc39c 100644 --- a/server/core/levedb_kv.go +++ b/server/kv/levedb_kv.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package core +package kv import ( "github.com/gogo/protobuf/proto" @@ -21,29 +21,32 @@ import ( "github.com/syndtr/goleveldb/leveldb/util" ) -type leveldbKV struct { - db *leveldb.DB +// LeveldbKV is a kv store using leveldb. +type LeveldbKV struct { + *leveldb.DB } -// newLeveldbKV is used to store regions information. -func newLeveldbKV(path string) (*leveldbKV, error) { +// NewLeveldbKV is used to store regions information. +func NewLeveldbKV(path string) (*LeveldbKV, error) { db, err := leveldb.OpenFile(path, nil) if err != nil { return nil, errors.WithStack(err) } - return &leveldbKV{db: db}, nil + return &LeveldbKV{db}, nil } -func (kv *leveldbKV) Load(key string) (string, error) { - v, err := kv.db.Get([]byte(key), nil) +// Load gets a value for a given key. +func (kv *LeveldbKV) Load(key string) (string, error) { + v, err := kv.Get([]byte(key), nil) if err != nil { return "", errors.WithStack(err) } return string(v), err } -func (kv *leveldbKV) LoadRange(startKey, endKey string, limit int) ([]string, []string, error) { - iter := kv.db.NewIterator(&util.Range{Start: []byte(startKey), Limit: []byte(endKey)}, nil) +// LoadRange gets a range of value for a given key range. +func (kv *LeveldbKV) LoadRange(startKey, endKey string, limit int) ([]string, []string, error) { + iter := kv.NewIterator(&util.Range{Start: []byte(startKey), Limit: []byte(endKey)}, nil) keys := make([]string, 0, limit) values := make([]string, 0, limit) count := 0 @@ -59,15 +62,18 @@ func (kv *leveldbKV) LoadRange(startKey, endKey string, limit int) ([]string, [] return keys, values, nil } -func (kv *leveldbKV) Save(key, value string) error { - return errors.WithStack(kv.db.Put([]byte(key), []byte(value), nil)) +// Save stores a key-value pair. +func (kv *LeveldbKV) Save(key, value string) error { + return errors.WithStack(kv.Put([]byte(key), []byte(value), nil)) } -func (kv *leveldbKV) Delete(key string) error { - return errors.WithStack(kv.db.Delete([]byte(key), nil)) +// Remove deletes a key-value pair for a given key. +func (kv *LeveldbKV) Remove(key string) error { + return errors.WithStack(kv.Delete([]byte(key), nil)) } -func (kv *leveldbKV) SaveRegions(regions map[string]*metapb.Region) error { +// SaveRegions stores some regions. 
+func (kv *LeveldbKV) SaveRegions(regions map[string]*metapb.Region) error { batch := new(leveldb.Batch) for key, r := range regions { @@ -77,9 +83,5 @@ func (kv *leveldbKV) SaveRegions(regions map[string]*metapb.Region) error { } batch.Put([]byte(key), value) } - return errors.WithStack(kv.db.Write(batch, nil)) -} - -func (kv *leveldbKV) Close() error { - return errors.WithStack(kv.db.Close()) + return errors.WithStack(kv.Write(batch, nil)) } diff --git a/server/core/kv_base.go b/server/kv/mem_kv.go similarity index 83% rename from server/core/kv_base.go rename to server/kv/mem_kv.go index a6a637ba2288..a32d3bc5b845 100644 --- a/server/core/kv_base.go +++ b/server/kv/mem_kv.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package core +package kv import ( "sync" @@ -19,21 +19,13 @@ import ( "github.com/google/btree" ) -// KVBase is an abstract interface for load/save pd cluster data. -type KVBase interface { - Load(key string) (string, error) - LoadRange(key, endKey string, limit int) (keys []string, values []string, err error) - Save(key, value string) error - Delete(key string) error -} - type memoryKV struct { sync.RWMutex tree *btree.BTree } // NewMemoryKV returns an in-memory kvBase for testing. -func NewMemoryKV() KVBase { +func NewMemoryKV() Base { return &memoryKV{ tree: btree.New(2), } @@ -77,7 +69,7 @@ func (kv *memoryKV) Save(key, value string) error { return nil } -func (kv *memoryKV) Delete(key string) error { +func (kv *memoryKV) Remove(key string) error { kv.Lock() defer kv.Unlock() diff --git a/server/kv/metrics.go b/server/kv/metrics.go new file mode 100644 index 000000000000..3ce2313f1c50 --- /dev/null +++ b/server/kv/metrics.go @@ -0,0 +1,40 @@ +// Copyright 2016 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package kv + +import "github.com/prometheus/client_golang/prometheus" + +var ( + txnCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "pd", + Subsystem: "txn", + Name: "txns_count", + Help: "Counter of txns.", + }, []string{"result"}) + + txnDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "pd", + Subsystem: "txn", + Name: "handle_txns_duration_seconds", + Help: "Bucketed histogram of processing time (s) of handled txns.", + Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13), + }, []string{"result"}) +) + +func init() { + prometheus.MustRegister(txnCounter) + prometheus.MustRegister(txnDuration) +} diff --git a/server/leader.go b/server/leader.go index e5e39d365541..3747df01b391 100644 --- a/server/leader.go +++ b/server/leader.go @@ -25,6 +25,7 @@ import ( log "github.com/pingcap/log" "github.com/pingcap/pd/pkg/etcdutil" "github.com/pingcap/pd/pkg/logutil" + "github.com/pingcap/pd/server/kv" "github.com/pkg/errors" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/mvcc/mvccpb" @@ -235,7 +236,7 @@ func (s *Server) campaignLeader() error { leaderKey := s.getLeaderPath() // The leader key must not exist, so the CreateRevision is 0. - resp, err := s.txn(). 
+ resp, err := kv.NewSlowLogTxn(s.client). If(clientv3.Compare(clientv3.CreateRevision(leaderKey), "=", 0)). Then(clientv3.OpPut(leaderKey, s.memberValue, clientv3.WithLease(leaseResp.ID))). Commit() @@ -390,7 +391,7 @@ func (s *Server) ResignLeader(nextLeader string) error { func (s *Server) deleteLeaderKey() error { // delete leader itself and let others start a new election again. leaderKey := s.getLeaderPath() - resp, err := s.leaderTxn().Then(clientv3.OpDelete(leaderKey)).Commit() + resp, err := s.LeaderTxn().Then(clientv3.OpDelete(leaderKey)).Commit() if err != nil { return errors.WithStack(err) } @@ -406,15 +407,15 @@ func (s *Server) leaderCmp() clientv3.Cmp { } func (s *Server) reloadConfigFromKV() error { - err := s.scheduleOpt.reload(s.kv) + err := s.scheduleOpt.reload(s.storage) if err != nil { return err } if s.scheduleOpt.loadPDServerConfig().UseRegionStorage { - s.kv.SwitchToRegionStorage() + s.storage.SwitchToRegionStorage() log.Info("server enable region storage") } else { - s.kv.SwitchToDefaultStorage() + s.storage.SwitchToDefaultStorage() log.Info("server disable region storage") } return nil diff --git a/server/metrics.go b/server/metrics.go index 44cf929b5da0..bac0a882bbb2 100644 --- a/server/metrics.go +++ b/server/metrics.go @@ -126,8 +126,6 @@ var ( ) func init() { - prometheus.MustRegister(txnCounter) - prometheus.MustRegister(txnDuration) prometheus.MustRegister(timeJumpBackCounter) prometheus.MustRegister(schedulerStatusGauge) prometheus.MustRegister(regionHeartbeatCounter) diff --git a/server/namespace/namespace.go b/server/namespace/namespace.go index 07f654e6a1e5..a719df9469d4 100644 --- a/server/namespace/namespace.go +++ b/server/namespace/namespace.go @@ -92,7 +92,7 @@ func (c defaultClassifier) IsStoreIDExist(storeID uint64) bool { } // CreateClassifierFunc is for creating namespace classifier. -type CreateClassifierFunc func(*core.KV, core.IDAllocator) (Classifier, error) +type CreateClassifierFunc func(*core.Storage, core.IDAllocator) (Classifier, error) var classifierMap = make(map[string]CreateClassifierFunc) @@ -106,16 +106,16 @@ func RegisterClassifier(name string, createFn CreateClassifierFunc) { } // CreateClassifier creates a namespace classifier with registered creator func. 
-func CreateClassifier(name string, kv *core.KV, idAlloc core.IDAllocator) (Classifier, error) { +func CreateClassifier(name string, storage *core.Storage, idAlloc core.IDAllocator) (Classifier, error) { fn, ok := classifierMap[name] if !ok { return nil, errors.Errorf("create func of %v is not registered", name) } - return fn(kv, idAlloc) + return fn(storage, idAlloc) } func init() { - RegisterClassifier("default", func(*core.KV, core.IDAllocator) (Classifier, error) { + RegisterClassifier("default", func(*core.Storage, core.IDAllocator) (Classifier, error) { return DefaultClassifier, nil }) } diff --git a/server/option.go b/server/option.go index 301e64cd3da4..d519328dfa4e 100644 --- a/server/option.go +++ b/server/option.go @@ -312,7 +312,7 @@ func (o *scheduleOption) loadPDServerConfig() *PDServerConfig { return o.pdServerConfig.Load().(*PDServerConfig) } -func (o *scheduleOption) persist(kv *core.KV) error { +func (o *scheduleOption) persist(storage *core.Storage) error { namespaces := o.loadNSConfig() cfg := &Config{ @@ -323,11 +323,11 @@ func (o *scheduleOption) persist(kv *core.KV) error { ClusterVersion: o.loadClusterVersion(), PDServerCfg: *o.loadPDServerConfig(), } - err := kv.SaveConfig(cfg) + err := storage.SaveConfig(cfg) return err } -func (o *scheduleOption) reload(kv *core.KV) error { +func (o *scheduleOption) reload(storage *core.Storage) error { namespaces := o.loadNSConfig() cfg := &Config{ @@ -338,7 +338,7 @@ func (o *scheduleOption) reload(kv *core.KV) error { ClusterVersion: o.loadClusterVersion(), PDServerCfg: *o.loadPDServerConfig(), } - isExist, err := kv.LoadConfig(cfg) + isExist, err := storage.LoadConfig(cfg) if err != nil { return err } diff --git a/server/region_syncer/history_buffer.go b/server/region_syncer/history_buffer.go index f5fa3a594fee..bd36f9e7720f 100644 --- a/server/region_syncer/history_buffer.go +++ b/server/region_syncer/history_buffer.go @@ -19,6 +19,7 @@ import ( log "github.com/pingcap/log" "github.com/pingcap/pd/server/core" + "github.com/pingcap/pd/server/kv" "go.uber.org/zap" ) @@ -34,11 +35,11 @@ type historyBuffer struct { head int tail int size int - kv core.KVBase + kv kv.Base flushCount int } -func newHistoryBuffer(size int, kv core.KVBase) *historyBuffer { +func newHistoryBuffer(size int, kv kv.Base) *historyBuffer { // use an empty space to simplify operation size++ if size < 2 { diff --git a/server/region_syncer/history_buffer_test.go b/server/region_syncer/history_buffer_test.go index c365997af101..b08332f11638 100644 --- a/server/region_syncer/history_buffer_test.go +++ b/server/region_syncer/history_buffer_test.go @@ -19,6 +19,7 @@ import ( . 
"github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/pd/server/core" + "github.com/pingcap/pd/server/kv" ) var _ = Suite(&testHistoryBuffer{}) @@ -36,7 +37,7 @@ func (t *testHistoryBuffer) TestBufferSize(c *C) { } // size equals 1 - h := newHistoryBuffer(1, core.NewMemoryKV()) + h := newHistoryBuffer(1, kv.NewMemoryKV()) c.Assert(h.len(), Equals, 0) for _, r := range regions { h.Record(r) @@ -46,7 +47,7 @@ func (t *testHistoryBuffer) TestBufferSize(c *C) { c.Assert(h.get(99), IsNil) // size equals 2 - h = newHistoryBuffer(2, core.NewMemoryKV()) + h = newHistoryBuffer(2, kv.NewMemoryKV()) for _, r := range regions { h.Record(r) } @@ -55,9 +56,9 @@ func (t *testHistoryBuffer) TestBufferSize(c *C) { c.Assert(h.get(99), Equals, regions[h.nextIndex()-2]) c.Assert(h.get(98), IsNil) - // size eqauls 100 - kv := core.NewMemoryKV() - h1 := newHistoryBuffer(100, kv) + // size equals 100 + kvMem := kv.NewMemoryKV() + h1 := newHistoryBuffer(100, kvMem) for i := 0; i < 6; i++ { h1.Record(regions[i]) } @@ -66,7 +67,7 @@ func (t *testHistoryBuffer) TestBufferSize(c *C) { h1.persist() // restart the buffer - h2 := newHistoryBuffer(100, kv) + h2 := newHistoryBuffer(100, kvMem) c.Assert(h2.nextIndex(), Equals, uint64(6)) c.Assert(h2.firstIndex(), Equals, uint64(6)) c.Assert(h2.get(h.nextIndex()-1), IsNil) diff --git a/server/region_syncer/server.go b/server/region_syncer/server.go index 377c53fa7a16..caba7c77a5c3 100644 --- a/server/region_syncer/server.go +++ b/server/region_syncer/server.go @@ -56,7 +56,7 @@ type Server interface { ClusterID() uint64 GetMemberInfo() *pdpb.Member GetLeader() *pdpb.Member - GetStorage() *core.KV + GetStorage() *core.Storage Name() string GetMetaRegions() []*metapb.Region } @@ -84,7 +84,7 @@ func NewRegionSyncer(s Server) *RegionSyncer { streams: make(map[string]ServerStream), server: s, closed: make(chan struct{}), - history: newHistoryBuffer(defaultHistoryBufferSize, s.GetStorage().GetRegionKV()), + history: newHistoryBuffer(defaultHistoryBufferSize, s.GetStorage().GetRegionStorage()), limit: ratelimit.NewBucketWithRate(defaultBucketRate, defaultBucketCapacity), } } diff --git a/server/schedule/filters.go b/server/schedule/filters.go index 928110eccf8b..502ac81e7158 100644 --- a/server/schedule/filters.go +++ b/server/schedule/filters.go @@ -16,7 +16,7 @@ package schedule import ( "fmt" - "github.com/pingcap/pd/server/cache" + "github.com/pingcap/pd/pkg/cache" "github.com/pingcap/pd/server/core" "github.com/pingcap/pd/server/namespace" ) @@ -444,3 +444,59 @@ func (f StoreStateFilter) filterMoveRegion(opt Options, store *core.StoreInfo) b } return false } + +// BlacklistType the type of BlackListStore Filter. +type BlacklistType int + +// some flags about blacklist type. +const ( + // blacklist associated with the source. + BlacklistSource BlacklistType = 1 << iota + // blacklist associated with the target. + BlacklistTarget +) + +// BlacklistStoreFilter filters the store according to the blacklist. +type BlacklistStoreFilter struct { + blacklist map[uint64]struct{} + flag BlacklistType +} + +// NewBlacklistStoreFilter creates a blacklist filter. +func NewBlacklistStoreFilter(typ BlacklistType) *BlacklistStoreFilter { + return &BlacklistStoreFilter{ + blacklist: make(map[uint64]struct{}), + flag: typ, + } +} + +// Type implements the Filter. +func (f *BlacklistStoreFilter) Type() string { + return "blacklist-store-filter" +} + +// FilterSource implements the Filter. 
+func (f *BlacklistStoreFilter) FilterSource(opt Options, store *core.StoreInfo) bool { + if f.flag&BlacklistSource != BlacklistSource { + return false + } + return f.filter(store) +} + +// Add adds the store to the blacklist. +func (f *BlacklistStoreFilter) Add(storeID uint64) { + f.blacklist[storeID] = struct{}{} +} + +// FilterTarget implements the Filter. +func (f *BlacklistStoreFilter) FilterTarget(opt Options, store *core.StoreInfo) bool { + if f.flag&BlacklistTarget != BlacklistTarget { + return false + } + return f.filter(store) +} + +func (f *BlacklistStoreFilter) filter(store *core.StoreInfo) bool { + _, ok := f.blacklist[store.GetID()] + return ok +} diff --git a/server/schedule/operator_controller.go b/server/schedule/operator_controller.go index 2e3259cca484..9ada499c8d71 100644 --- a/server/schedule/operator_controller.go +++ b/server/schedule/operator_controller.go @@ -26,7 +26,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" log "github.com/pingcap/log" - "github.com/pingcap/pd/server/cache" + "github.com/pingcap/pd/pkg/cache" "github.com/pingcap/pd/server/core" "go.uber.org/zap" ) diff --git a/server/schedule/selector.go b/server/schedule/selector.go index 0530f106c13a..315c0519cdea 100644 --- a/server/schedule/selector.go +++ b/server/schedule/selector.go @@ -36,10 +36,11 @@ func NewBalanceSelector(kind core.ResourceKind, filters []Filter) *BalanceSelect // SelectSource selects the store that can pass all filters and has the maximal // resource score. -func (s *BalanceSelector) SelectSource(opt Options, stores []*core.StoreInfo) *core.StoreInfo { +func (s *BalanceSelector) SelectSource(opt Options, stores []*core.StoreInfo, filters ...Filter) *core.StoreInfo { + filters = append(filters, s.filters...) var result *core.StoreInfo for _, store := range stores { - if FilterSource(opt, store, s.filters) { + if FilterSource(opt, store, filters) { continue } if result == nil || diff --git a/server/schedule/waiting_operator.go b/server/schedule/waiting_operator.go index 3b03db6b145f..6f5d7efd8e03 100644 --- a/server/schedule/waiting_operator.go +++ b/server/schedule/waiting_operator.go @@ -90,7 +90,7 @@ func (b *RandBuckets) GetOperator() []*Operator { var res []*Operator res = append(res, bucket.ops[0]) // Merge operation has two operators, and thus it should be handled specifically. - if bucket.ops[0].Desc() == "merge-region" { + if bucket.ops[0].Kind()&OpMerge != 0 { res = append(res, bucket.ops[1]) bucket.ops = bucket.ops[2:] } else { diff --git a/server/schedule/waiting_operator_test.go b/server/schedule/waiting_operator_test.go index 2a542a57a1db..2cdac8db7c6e 100644 --- a/server/schedule/waiting_operator_test.go +++ b/server/schedule/waiting_operator_test.go @@ -58,9 +58,11 @@ func (s *testWaitingOperatorSuite) TestListOperator(c *C) { func (s *testWaitingOperatorSuite) TestRandomBucketsWithMergeRegion(c *C) { rb := NewRandBuckets() + descs := []string{"merge-region", "admin-merge-region", "random-merge"} for j := 0; j < 100; j++ { // adds operators - op := NewOperator("merge-region", uint64(1), &metapb.RegionEpoch{}, OpRegion|OpMerge, []OperatorStep{ + desc := descs[j%3] + op := NewOperator(desc, uint64(1), &metapb.RegionEpoch{}, OpRegion|OpMerge, []OperatorStep{ MergeRegion{ FromRegion: &metapb.Region{ Id: 1, @@ -75,7 +77,7 @@ func (s *testWaitingOperatorSuite) TestRandomBucketsWithMergeRegion(c *C) { }, }...) 
rb.PutOperator(op) - op = NewOperator("merge-region", uint64(2), &metapb.RegionEpoch{}, OpRegion|OpMerge, []OperatorStep{ + op = NewOperator(desc, uint64(2), &metapb.RegionEpoch{}, OpRegion|OpMerge, []OperatorStep{ MergeRegion{ FromRegion: &metapb.Region{ Id: 1, diff --git a/server/schedulers/balance_leader.go b/server/schedulers/balance_leader.go index 62698dc3a695..8ecfbb2e5a88 100644 --- a/server/schedulers/balance_leader.go +++ b/server/schedulers/balance_leader.go @@ -17,7 +17,7 @@ import ( "strconv" log "github.com/pingcap/log" - "github.com/pingcap/pd/server/cache" + "github.com/pingcap/pd/pkg/cache" "github.com/pingcap/pd/server/core" "github.com/pingcap/pd/server/schedule" "go.uber.org/zap" diff --git a/server/schedulers/balance_region.go b/server/schedulers/balance_region.go index ecae36ce3fc8..1c521fb0d4c6 100644 --- a/server/schedulers/balance_region.go +++ b/server/schedulers/balance_region.go @@ -14,11 +14,12 @@ package schedulers import ( + "fmt" "strconv" + "time" "github.com/pingcap/kvproto/pkg/metapb" log "github.com/pingcap/log" - "github.com/pingcap/pd/server/cache" "github.com/pingcap/pd/server/checker" "github.com/pingcap/pd/server/core" "github.com/pingcap/pd/server/schedule" @@ -31,36 +32,45 @@ func init() { }) } -// balanceRegionRetryLimit is the limit to retry schedule for selected store. -const balanceRegionRetryLimit = 10 +const ( + // balanceRegionRetryLimit is the limit to retry schedule for selected store. + balanceRegionRetryLimit = 10 + hitsStoreTTL = 5 * time.Minute + // The scheduler selects the same source or source-target for a long time + // and do not create an operator will trigger the hit filter. the + // calculation of this time is as follows: + // ScheduleIntervalFactor default is 1.3 , and MinScheduleInterval is 10ms, + // the total time spend t = a1 * (1-pow(q,n)) / (1 - q), where a1 = 10, + // q = 1.3, and n = 30, so t = 87299ms ≈ 87s. + hitsStoreCountThreshold = 30 * balanceRegionRetryLimit + balanceRegionName = "balance-region-scheduler" +) type balanceRegionScheduler struct { *baseScheduler selector *schedule.BalanceSelector - taintStores *cache.TTLUint64 opController *schedule.OperatorController + hitsCounter *hitsStoreBuilder } // newBalanceRegionScheduler creates a scheduler that tends to keep regions on // each store balanced. func newBalanceRegionScheduler(opController *schedule.OperatorController) schedule.Scheduler { - taintStores := newTaintCache() filters := []schedule.Filter{ schedule.StoreStateFilter{MoveRegion: true}, - schedule.NewCacheFilter(taintStores), } base := newBaseScheduler(opController) s := &balanceRegionScheduler{ baseScheduler: base, selector: schedule.NewBalanceSelector(core.RegionKind, filters), - taintStores: taintStores, opController: opController, + hitsCounter: newHitsStoreBuilder(hitsStoreTTL, hitsStoreCountThreshold), } return s } func (s *balanceRegionScheduler) GetName() string { - return "balance-region-scheduler" + return balanceRegionName } func (s *balanceRegionScheduler) GetType() string { @@ -73,11 +83,11 @@ func (s *balanceRegionScheduler) IsScheduleAllowed(cluster schedule.Cluster) boo func (s *balanceRegionScheduler) Schedule(cluster schedule.Cluster) []*schedule.Operator { schedulerCounter.WithLabelValues(s.GetName(), "schedule").Inc() - stores := cluster.GetStores() // source is the store with highest region score in the list that can be selected as balance source. 
- source := s.selector.SelectSource(cluster, stores) + filter := s.hitsCounter.buildSourceFilter(cluster) + source := s.selector.SelectSource(cluster, stores, filter) if source == nil { schedulerCounter.WithLabelValues(s.GetName(), "no_store").Inc() // Unlike the balanceLeaderScheduler, we don't need to clear the taintCache @@ -93,7 +103,6 @@ func (s *balanceRegionScheduler) Schedule(cluster schedule.Cluster) []*schedule. balanceRegionCounter.WithLabelValues("source_store", sourceAddress, sourceLabel).Inc() opInfluence := s.opController.GetOpInfluence(cluster) - var hasPotentialTarget bool for i := 0; i < balanceRegionRetryLimit; i++ { // Priority picks the region that has a pending peer. // Pending region may means the disk is overload, remove the pending region firstly. @@ -108,6 +117,7 @@ func (s *balanceRegionScheduler) Schedule(cluster schedule.Cluster) []*schedule. } if region == nil { schedulerCounter.WithLabelValues(s.GetName(), "no_region").Inc() + s.hitsCounter.put(source, nil) continue } log.Debug("select region", zap.String("scheduler", s.GetName()), zap.Uint64("region-id", region.GetID())) @@ -116,6 +126,7 @@ func (s *balanceRegionScheduler) Schedule(cluster schedule.Cluster) []*schedule. if len(region.GetPeers()) != cluster.GetMaxReplicas() { log.Debug("region has abnormal replica count", zap.String("scheduler", s.GetName()), zap.Uint64("region-id", region.GetID())) schedulerCounter.WithLabelValues(s.GetName(), "abnormal_replica").Inc() + s.hitsCounter.put(source, nil) continue } @@ -123,28 +134,16 @@ func (s *balanceRegionScheduler) Schedule(cluster schedule.Cluster) []*schedule. if cluster.IsRegionHot(region.GetID()) { log.Debug("region is hot", zap.String("scheduler", s.GetName()), zap.Uint64("region-id", region.GetID())) schedulerCounter.WithLabelValues(s.GetName(), "region_hot").Inc() + s.hitsCounter.put(source, nil) continue } - if !s.hasPotentialTarget(cluster, region, source, opInfluence) { - continue - } - hasPotentialTarget = true - oldPeer := region.GetStorePeer(sourceID) if op := s.transferPeer(cluster, region, oldPeer, opInfluence); op != nil { schedulerCounter.WithLabelValues(s.GetName(), "new_operator").Inc() return []*schedule.Operator{op} } } - - if !hasPotentialTarget { - // If no potential target store can be found for the selected store, ignore it for a while. 
- log.Debug("no operator created for selected store", zap.String("scheduler", s.GetName()), zap.Uint64("store-id", sourceID)) - balanceRegionCounter.WithLabelValues("add_taint", sourceAddress, sourceLabel).Inc() - s.taintStores.Put(sourceID) - } - return nil } @@ -154,11 +153,12 @@ func (s *balanceRegionScheduler) transferPeer(cluster schedule.Cluster, region * stores := cluster.GetRegionStores(region) source := cluster.GetStore(oldPeer.GetStoreId()) scoreGuard := schedule.NewDistinctScoreFilter(cluster.GetLocationLabels(), stores, source) - + hitsFilter := s.hitsCounter.buildTargetFilter(cluster, source) checker := checker.NewReplicaChecker(cluster, nil) - storeID, _ := checker.SelectBestReplacementStore(region, oldPeer, scoreGuard) + storeID, _ := checker.SelectBestReplacementStore(region, oldPeer, scoreGuard, hitsFilter) if storeID == 0 { schedulerCounter.WithLabelValues(s.GetName(), "no_replacement").Inc() + s.hitsCounter.put(source, nil) return nil } @@ -177,6 +177,7 @@ func (s *balanceRegionScheduler) transferPeer(cluster schedule.Cluster, region * zap.Int64("target-influence", opInfluence.GetStoreInfluence(targetID).ResourceSize(core.RegionKind)), zap.Int64("average-region-size", cluster.GetAverageRegionSize())) schedulerCounter.WithLabelValues(s.GetName(), "skip").Inc() + s.hitsCounter.put(source, target) return nil } @@ -190,54 +191,103 @@ func (s *balanceRegionScheduler) transferPeer(cluster schedule.Cluster, region * schedulerCounter.WithLabelValues(s.GetName(), "create_operator_fail").Inc() return nil } + s.hitsCounter.remove(source, target) + s.hitsCounter.remove(source, nil) sourceLabel := strconv.FormatUint(sourceID, 10) targetLabel := strconv.FormatUint(targetID, 10) balanceRegionCounter.WithLabelValues("move_peer", source.GetAddress()+"-out", sourceLabel).Inc() balanceRegionCounter.WithLabelValues("move_peer", target.GetAddress()+"-in", targetLabel).Inc() - // only for testing balanceRegionCounter.WithLabelValues("direction", "from_to", sourceLabel+"-"+targetLabel).Inc() return op } -// hasPotentialTarget is used to determine whether the specified sourceStore -// cannot find a matching targetStore in the long term. -// The main factor for judgment includes StoreState, DistinctScore, and -// ResourceScore, while excludes factors such as ServerBusy, too many snapshot, -// which may recover soon. 
-func (s *balanceRegionScheduler) hasPotentialTarget(cluster schedule.Cluster, region *core.RegionInfo, source *core.StoreInfo, opInfluence schedule.OpInfluence) bool { - filters := []schedule.Filter{ - schedule.NewExcludedFilter(nil, region.GetStoreIds()), - schedule.NewDistinctScoreFilter(cluster.GetLocationLabels(), cluster.GetRegionStores(region), source), +type record struct { + lastTime time.Time + count int +} +type hitsStoreBuilder struct { + hits map[string]*record + ttl time.Duration + threshold int +} + +func newHitsStoreBuilder(ttl time.Duration, threshold int) *hitsStoreBuilder { + return &hitsStoreBuilder{ + hits: make(map[string]*record), + ttl: ttl, + threshold: threshold, } +} - for _, store := range cluster.GetStores() { - if schedule.FilterTarget(cluster, store, filters) { - log.Debug("skip target store by filters", - zap.String("scheduler", s.GetName()), - zap.Uint64("region", region.GetID()), - zap.Uint64("source", source.GetID()), - zap.Uint64("target", store.GetID())) - continue - } - if !store.IsUp() || store.DownTime() > cluster.GetMaxStoreDownTime() { - log.Debug("skip target store by status", - zap.String("scheduler", s.GetName()), - zap.Uint64("region", region.GetID()), - zap.Uint64("source", source.GetID()), - zap.Uint64("target", store.GetID()), - zap.Bool("isup", store.IsUp()), - zap.Duration("downtime", store.DownTime())) - continue +func (h *hitsStoreBuilder) getKey(source, target *core.StoreInfo) string { + if source == nil { + return "" + } + key := fmt.Sprintf("s%d", source.GetID()) + if target != nil { + key = fmt.Sprintf("%s->t%d", key, target.GetID()) + } + return key +} + +func (h *hitsStoreBuilder) filter(source, target *core.StoreInfo) bool { + key := h.getKey(source, target) + if key == "" { + return false + } + if item, ok := h.hits[key]; ok { + if time.Since(item.lastTime) > h.ttl { + delete(h.hits, key) } - if !shouldBalance(cluster, source, store, region, core.RegionKind, opInfluence) { - log.Debug("skip target store for it should not balance", - zap.String("scheduler", s.GetName()), - zap.Uint64("region", region.GetID()), - zap.Uint64("source", source.GetID()), - zap.Uint64("target", store.GetID())) - continue + if time.Since(item.lastTime) <= h.ttl && item.count >= h.threshold { + log.Debug("skip the the store", zap.String("scheduler", balanceRegionName), zap.String("filter-key", key)) + return true } - return true } return false } + +func (h *hitsStoreBuilder) remove(source, target *core.StoreInfo) { + key := h.getKey(source, target) + if _, ok := h.hits[key]; ok && key != "" { + delete(h.hits, key) + } +} + +func (h *hitsStoreBuilder) put(source, target *core.StoreInfo) { + key := h.getKey(source, target) + if key == "" { + return + } + if item, ok := h.hits[key]; ok { + if time.Since(item.lastTime) >= h.ttl { + item.count = 0 + } else { + item.count++ + } + item.lastTime = time.Now() + } else { + item := &record{lastTime: time.Now()} + h.hits[key] = item + } +} + +func (h *hitsStoreBuilder) buildSourceFilter(cluster schedule.Cluster) schedule.Filter { + filter := schedule.NewBlacklistStoreFilter(schedule.BlacklistSource) + for _, source := range cluster.GetStores() { + if h.filter(source, nil) { + filter.Add(source.GetID()) + } + } + return filter +} + +func (h *hitsStoreBuilder) buildTargetFilter(cluster schedule.Cluster, source *core.StoreInfo) schedule.Filter { + filter := schedule.NewBlacklistStoreFilter(schedule.BlacklistTarget) + for _, target := range cluster.GetStores() { + if h.filter(source, target) { + 
filter.Add(target.GetID()) + } + } + return filter +} diff --git a/server/schedulers/balance_test.go b/server/schedulers/balance_test.go index 82ed7d0db4e1..b199d49f74f6 100644 --- a/server/schedulers/balance_test.go +++ b/server/schedulers/balance_test.go @@ -330,7 +330,6 @@ func (s *testBalanceRegionSchedulerSuite) TestBalance(c *C) { sb, err := schedule.CreateScheduler("balance-region", oc) c.Assert(err, IsNil) - cache := sb.(*balanceRegionScheduler).taintStores opt.SetMaxReplicas(1) // Add stores 1,2,3,4. @@ -345,14 +344,13 @@ func (s *testBalanceRegionSchedulerSuite) TestBalance(c *C) { // Test stateFilter. tc.SetStoreOffline(1) tc.UpdateRegionCount(2, 6) - cache.Remove(4) + // When store 1 is offline, it will be filtered, // store 2 becomes the store with least regions. testutil.CheckTransferPeerWithLeaderTransfer(c, sb.Schedule(tc)[0], schedule.OpBalance, 4, 2) opt.SetMaxReplicas(3) c.Assert(sb.Schedule(tc), IsNil) - cache.Clear() opt.SetMaxReplicas(1) c.Assert(sb.Schedule(tc), NotNil) } @@ -367,8 +365,6 @@ func (s *testBalanceRegionSchedulerSuite) TestReplicas3(c *C) { sb, err := schedule.CreateScheduler("balance-region", oc) c.Assert(err, IsNil) - cache := sb.(*balanceRegionScheduler).taintStores - // Store 1 has the largest region score, so the balancer try to replace peer in store 1. tc.AddLabelsStore(1, 16, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"}) tc.AddLabelsStore(2, 15, map[string]string{"zone": "z1", "rack": "r2", "host": "h1"}) @@ -378,22 +374,21 @@ func (s *testBalanceRegionSchedulerSuite) TestReplicas3(c *C) { // This schedule try to replace peer in store 1, but we have no other stores, // so store 1 will be set in the cache and skipped next schedule. c.Assert(sb.Schedule(tc), IsNil) - c.Assert(cache.Exists(1), IsTrue) + for i := 0; i <= hitsStoreCountThreshold/balanceRegionRetryLimit; i++ { + sb.Schedule(tc) + } + hit := sb.(*balanceRegionScheduler).hitsCounter + c.Assert(hit.buildSourceFilter(tc).FilterSource(tc, tc.GetStore(1)), IsTrue) + c.Assert(hit.buildSourceFilter(tc).FilterSource(tc, tc.GetStore(2)), IsFalse) + c.Assert(hit.buildSourceFilter(tc).FilterSource(tc, tc.GetStore(3)), IsFalse) // Store 4 has smaller region score than store 2. tc.AddLabelsStore(4, 2, map[string]string{"zone": "z1", "rack": "r2", "host": "h1"}) testutil.CheckTransferPeer(c, sb.Schedule(tc)[0], schedule.OpBalance, 2, 4) - // If store 4 is busy, no operator will be created. - tc.SetStoreBusy(4, true) - c.Assert(sb.Schedule(tc), IsNil) - // Since busy is a short term state, store2 will not be added to taint cache. - c.Assert(cache.Exists(2), IsFalse) - tc.SetStoreBusy(4, false) - // Store 5 has smaller region score than store 1. tc.AddLabelsStore(5, 2, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"}) - cache.Remove(1) // Delete store 1 from cache, or it will be skipped. + hit.remove(tc.GetStore(1), nil) testutil.CheckTransferPeer(c, sb.Schedule(tc)[0], schedule.OpBalance, 1, 5) // Store 6 has smaller region score than store 5. 
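Note on the loops added above: TestReplicas3 now calls Schedule hitsStoreCountThreshold/balanceRegionRetryLimit times so the new hitsStoreBuilder accumulates enough hits to blacklist store 1, replacing the old taint cache. The ~87s figure quoted in the balance_region.go const comment is just the sum of a geometric series of schedule intervals; a standalone sketch checking that arithmetic (constants copied from that comment, not part of this patch):

package main

import (
	"fmt"
	"math"
)

func main() {
	// First interval (MinScheduleInterval) in ms, growth factor
	// (ScheduleIntervalFactor) and number of rounds, per the comment.
	a1, q, n := 10.0, 1.3, 30.0
	total := a1 * (1 - math.Pow(q, n)) / (1 - q)
	fmt.Printf("time before the hit filter triggers: ~%.0fms (~%.0fs)\n", total, total/1000)
	// Prints ~87300ms, i.e. the ~87s mentioned in the comment.
}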
@@ -417,9 +412,12 @@ func (s *testBalanceRegionSchedulerSuite) TestReplicas3(c *C) { tc.SetStoreDown(5) tc.SetStoreDown(6) tc.SetStoreDown(7) - c.Assert(sb.Schedule(tc), IsNil) - c.Assert(cache.Exists(1), IsTrue) - cache.Remove(1) + tc.SetStoreDown(8) + for i := 0; i <= hitsStoreCountThreshold/balanceRegionRetryLimit; i++ { + c.Assert(sb.Schedule(tc), IsNil) + } + c.Assert(hit.buildSourceFilter(tc).FilterSource(tc, tc.GetStore(1)), IsTrue) + hit.remove(tc.GetStore(1), nil) // Store 9 has different zone with other stores but larger region score than store 1. tc.AddLabelsStore(9, 20, map[string]string{"zone": "z2", "rack": "r1", "host": "h1"}) @@ -464,6 +462,65 @@ func (s *testBalanceRegionSchedulerSuite) TestReplicas5(c *C) { testutil.CheckTransferPeer(c, sb.Schedule(tc)[0], schedule.OpBalance, 11, 6) } +// TestBalance2 for cornor case 1: +// 11 regions distributed across 5 stores. +//| region_id | leader_store | follower_store | follower_store | +//|-----------|--------------|----------------|----------------| +//| 1 | 1 | 2 | 3 | +//| 2 | 1 | 2 | 3 | +//| 3 | 1 | 2 | 3 | +//| 4 | 1 | 2 | 3 | +//| 5 | 1 | 2 | 3 | +//| 6 | 1 | 2 | 3 | +//| 7 | 1 | 2 | 4 | +//| 8 | 1 | 2 | 4 | +//| 9 | 1 | 2 | 4 | +//| 10 | 1 | 4 | 5 | +//| 11 | 1 | 4 | 5 | +// and the space of last store 5 if very small, about 5 * regionsize +// the source region is more likely distributed in store[1, 2, 3]. +func (s *testBalanceRegionSchedulerSuite) TestBalance1(c *C) { + opt := mockoption.NewScheduleOptions() + tc := mockcluster.NewCluster(opt) + oc := schedule.NewOperatorController(nil, nil) + + opt.TolerantSizeRatio = 1 + + sb, err := schedule.CreateScheduler("balance-region", oc) + c.Assert(err, IsNil) + + tc.AddRegionStore(1, 11) + tc.AddRegionStore(2, 9) + tc.AddRegionStore(3, 6) + tc.AddRegionStore(4, 5) + tc.AddRegionStore(5, 2) + tc.AddLeaderRegion(1, 1, 2, 3) + tc.AddLeaderRegion(2, 1, 2, 3) + + c.Assert(sb.Schedule(tc)[0], NotNil) + // if the space of store 5 is normal, we can balance region to store 5 + testutil.CheckTransferPeer(c, sb.Schedule(tc)[0], schedule.OpBalance, 1, 5) + + // the used size of store 5 reach (highSpace, lowSpace) + origin := tc.GetStore(5) + stats := origin.GetStoreStats() + stats.Capacity = 50 + stats.Available = 28 + stats.UsedSize = 20 + store5 := origin.Clone(core.SetStoreStats(stats)) + tc.PutStore(store5) + + // the scheduler always pick store 1 as source store, + // and store 5 as target store, but cannot pass `shouldBalance`. 
+ c.Assert(sb.Schedule(tc), IsNil) + // hits the store many times + for i := 0; i < 1000; i++ { + sb.Schedule(tc) + } + // now filter the store 5, and can transfer store 1 to store 4 + testutil.CheckTransferPeer(c, sb.Schedule(tc)[0], schedule.OpBalance, 1, 4) +} + func (s *testBalanceRegionSchedulerSuite) TestStoreWeight(c *C) { opt := mockoption.NewScheduleOptions() tc := mockcluster.NewCluster(opt) diff --git a/server/schedulers/utils.go b/server/schedulers/utils.go index 37ae6ce7c6e1..a21ac5fed5d7 100644 --- a/server/schedulers/utils.go +++ b/server/schedulers/utils.go @@ -17,7 +17,7 @@ import ( "time" "github.com/montanaflynn/stats" - "github.com/pingcap/pd/server/cache" + "github.com/pingcap/pd/pkg/cache" "github.com/pingcap/pd/server/core" "github.com/pingcap/pd/server/schedule" ) diff --git a/server/server.go b/server/server.go index 0462214f5207..646eadac22ad 100644 --- a/server/server.go +++ b/server/server.go @@ -34,6 +34,7 @@ import ( "github.com/pingcap/pd/pkg/etcdutil" "github.com/pingcap/pd/pkg/logutil" "github.com/pingcap/pd/server/core" + "github.com/pingcap/pd/server/kv" "github.com/pingcap/pd/server/namespace" "github.com/pkg/errors" "go.etcd.io/etcd/clientv3" @@ -89,8 +90,8 @@ type Server struct { // store, region and peer, because we just need // a unique ID. idAlloc *idAllocator - // for kv operation. - kv *core.KV + // for storage operation. + storage *core.Storage // for namespace. classifier namespace.Classifier // for raft cluster @@ -220,16 +221,16 @@ func (s *Server) startServer() error { s.member, s.memberValue = s.memberInfo() s.idAlloc = &idAllocator{s: s} - kvBase := newEtcdKVBase(s) + kvBase := kv.NewEtcdKVBase(s.client, s.rootPath) path := filepath.Join(s.cfg.DataDir, "region-meta") - regionKV, err := core.NewRegionKV(path) + regionStorage, err := core.NewRegionStorage(path) if err != nil { return err } - s.kv = core.NewKV(kvBase).SetRegionKV(regionKV) + s.storage = core.NewStorage(kvBase).SetRegionStorage(regionStorage) s.cluster = newRaftCluster(s, s.clusterID) s.hbStreams = newHeartbeatStreams(s.clusterID, s.cluster) - if s.classifier, err = namespace.CreateClassifier(s.cfg.NamespaceClassifier, s.kv, s.idAlloc); err != nil { + if s.classifier, err = namespace.CreateClassifier(s.cfg.NamespaceClassifier, s.storage, s.idAlloc); err != nil { return err } @@ -276,8 +277,8 @@ func (s *Server) Close() { if s.hbStreams != nil { s.hbStreams.Close() } - if err := s.kv.Close(); err != nil { - log.Error("close kv meet error", zap.Error(err)) + if err := s.storage.Close(); err != nil { + log.Error("close storage meet error", zap.Error(err)) } log.Info("close server") @@ -406,7 +407,7 @@ func (s *Server) bootstrapCluster(req *pdpb.BootstrapRequest) (*pdpb.BootstrapRe // TODO: we must figure out a better way to handle bootstrap failed, maybe intervene manually. 
bootstrapCmp := clientv3.Compare(clientv3.CreateRevision(clusterRootPath), "=", 0) - resp, err := s.txn().If(bootstrapCmp).Then(ops...).Commit() + resp, err := kv.NewSlowLogTxn(s.client).If(bootstrapCmp).Then(ops...).Commit() if err != nil { return nil, errors.WithStack(err) } @@ -416,11 +417,11 @@ func (s *Server) bootstrapCluster(req *pdpb.BootstrapRequest) (*pdpb.BootstrapRe } log.Info("bootstrap cluster ok", zap.Uint64("cluster-id", clusterID)) - err = s.kv.SaveRegion(req.GetRegion()) + err = s.storage.SaveRegion(req.GetRegion()) if err != nil { log.Warn("save the bootstrap region failed", zap.Error(err)) } - err = s.kv.Flush() + err = s.storage.Flush() if err != nil { log.Warn("flush the bootstrap region failed", zap.Error(err)) } @@ -469,8 +470,8 @@ func (s *Server) GetClient() *clientv3.Client { } // GetStorage returns the backend storage of server. -func (s *Server) GetStorage() *core.KV { - return s.kv +func (s *Server) GetStorage() *core.Storage { + return s.storage } // ID returns the unique etcd ID for this server in etcd cluster. @@ -493,16 +494,11 @@ func (s *Server) GetClassifier() namespace.Classifier { return s.classifier } -// txn returns an etcd client transaction wrapper. -// The wrapper will set a request timeout to the context and log slow transactions. -func (s *Server) txn() clientv3.Txn { - return newSlowLogTxn(s.client) -} - -// leaderTxn returns txn() with a leader comparison to guarantee that +// LeaderTxn returns txn() with a leader comparison to guarantee that // the transaction can be executed only if the server is leader. -func (s *Server) leaderTxn(cs ...clientv3.Cmp) clientv3.Txn { - return s.txn().If(append(cs, s.leaderCmp())...) +func (s *Server) LeaderTxn(cs ...clientv3.Cmp) clientv3.Txn { + txn := kv.NewSlowLogTxn(s.client) + return txn.If(append(cs, s.leaderCmp())...) } // GetConfig gets the config information. 
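With txn() and leaderTxn() removed from Server, callers now build transactions through kv.NewSlowLogTxn and the exported LeaderTxn, which appends the leader comparison so a write only commits while this member still owns the leader key. A minimal sketch of that guard using only the stock clientv3 API (the helper function and its 10s timeout are illustrative, mirroring the patch rather than quoting it):

package example

import (
	"context"
	"time"

	"go.etcd.io/etcd/clientv3"
)

// putIfStillLeader writes key=value only if leaderKey still holds memberValue,
// roughly the comparison that LeaderTxn layers on top of the slow-log transaction.
func putIfStillLeader(client *clientv3.Client, leaderKey, memberValue, key, value string) (bool, error) {
	// Same 10s bound as requestTimeout in server/kv.
	ctx, cancel := context.WithTimeout(client.Ctx(), 10*time.Second)
	defer cancel()
	resp, err := client.Txn(ctx).
		If(clientv3.Compare(clientv3.Value(leaderKey), "=", memberValue)).
		Then(clientv3.OpPut(key, value)).
		Commit()
	if err != nil {
		return false, err
	}
	return resp.Succeeded, nil
}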
@@ -533,7 +529,7 @@ func (s *Server) SetScheduleConfig(cfg ScheduleConfig) error { } old := s.scheduleOpt.load() s.scheduleOpt.store(&cfg) - if err := s.scheduleOpt.persist(s.kv); err != nil { + if err := s.scheduleOpt.persist(s.storage); err != nil { s.scheduleOpt.store(old) log.Error("failed to update schedule config", zap.Reflect("new", cfg), @@ -559,7 +555,7 @@ func (s *Server) SetReplicationConfig(cfg ReplicationConfig) error { } old := s.scheduleOpt.rep.load() s.scheduleOpt.rep.store(&cfg) - if err := s.scheduleOpt.persist(s.kv); err != nil { + if err := s.scheduleOpt.persist(s.storage); err != nil { s.scheduleOpt.rep.store(old) log.Error("failed to update replication config", zap.Reflect("new", cfg), @@ -575,7 +571,7 @@ func (s *Server) SetReplicationConfig(cfg ReplicationConfig) error { func (s *Server) SetPDServerConfig(cfg PDServerConfig) error { old := s.scheduleOpt.loadPDServerConfig() s.scheduleOpt.pdServerConfig.Store(&cfg) - if err := s.scheduleOpt.persist(s.kv); err != nil { + if err := s.scheduleOpt.persist(s.storage); err != nil { s.scheduleOpt.pdServerConfig.Store(old) log.Error("failed to update PDServer config", zap.Reflect("new", cfg), @@ -617,7 +613,7 @@ func (s *Server) SetNamespaceConfig(name string, cfg NamespaceConfig) error { if n, ok := s.scheduleOpt.getNS(name); ok { old := n.load() n.store(&cfg) - if err := s.scheduleOpt.persist(s.kv); err != nil { + if err := s.scheduleOpt.persist(s.storage); err != nil { s.scheduleOpt.ns.Store(name, newNamespaceOption(old)) log.Error("failed to update namespace config", zap.String("name", name), @@ -629,7 +625,7 @@ func (s *Server) SetNamespaceConfig(name string, cfg NamespaceConfig) error { log.Info("namespace config is updated", zap.String("name", name), zap.Reflect("new", cfg), zap.Reflect("old", old)) } else { s.scheduleOpt.ns.Store(name, newNamespaceOption(&cfg)) - if err := s.scheduleOpt.persist(s.kv); err != nil { + if err := s.scheduleOpt.persist(s.storage); err != nil { s.scheduleOpt.ns.Delete(name) log.Error("failed to add namespace config", zap.String("name", name), @@ -647,7 +643,7 @@ func (s *Server) DeleteNamespaceConfig(name string) error { if n, ok := s.scheduleOpt.getNS(name); ok { cfg := n.load() s.scheduleOpt.ns.Delete(name) - if err := s.scheduleOpt.persist(s.kv); err != nil { + if err := s.scheduleOpt.persist(s.storage); err != nil { s.scheduleOpt.ns.Store(name, newNamespaceOption(cfg)) log.Error("failed to delete namespace config", zap.String("name", name), @@ -662,7 +658,7 @@ func (s *Server) DeleteNamespaceConfig(name string) error { // SetLabelProperty inserts a label property config. func (s *Server) SetLabelProperty(typ, labelKey, labelValue string) error { s.scheduleOpt.SetLabelProperty(typ, labelKey, labelValue) - err := s.scheduleOpt.persist(s.kv) + err := s.scheduleOpt.persist(s.storage) if err != nil { s.scheduleOpt.DeleteLabelProperty(typ, labelKey, labelValue) log.Error("failed to update label property config", @@ -680,7 +676,7 @@ func (s *Server) SetLabelProperty(typ, labelKey, labelValue string) error { // DeleteLabelProperty deletes a label property config. 
func (s *Server) DeleteLabelProperty(typ, labelKey, labelValue string) error { s.scheduleOpt.DeleteLabelProperty(typ, labelKey, labelValue) - err := s.scheduleOpt.persist(s.kv) + err := s.scheduleOpt.persist(s.storage) if err != nil { s.scheduleOpt.SetLabelProperty(typ, labelKey, labelValue) log.Error("failed to delete label property config", @@ -708,7 +704,7 @@ func (s *Server) SetClusterVersion(v string) error { } old := s.scheduleOpt.loadClusterVersion() s.scheduleOpt.SetClusterVersion(*version) - err = s.scheduleOpt.persist(s.kv) + err = s.scheduleOpt.persist(s.storage) if err != nil { s.scheduleOpt.SetClusterVersion(old) log.Error("failed to update cluster version", @@ -784,7 +780,7 @@ func (s *Server) getMemberLeaderPriorityPath(id uint64) string { // SetMemberLeaderPriority saves a member's priority to be elected as the etcd leader. func (s *Server) SetMemberLeaderPriority(id uint64, priority int) error { key := s.getMemberLeaderPriorityPath(id) - res, err := s.leaderTxn().Then(clientv3.OpPut(key, strconv.Itoa(priority))).Commit() + res, err := s.LeaderTxn().Then(clientv3.OpPut(key, strconv.Itoa(priority))).Commit() if err != nil { return errors.WithStack(err) } @@ -797,7 +793,7 @@ func (s *Server) SetMemberLeaderPriority(id uint64, priority int) error { // DeleteMemberLeaderPriority removes a member's priority config. func (s *Server) DeleteMemberLeaderPriority(id uint64) error { key := s.getMemberLeaderPriorityPath(id) - res, err := s.leaderTxn().Then(clientv3.OpDelete(key)).Commit() + res, err := s.LeaderTxn().Then(clientv3.OpDelete(key)).Commit() if err != nil { return errors.WithStack(err) } diff --git a/server/statistics/hot_cache.go b/server/statistics/hot_cache.go index d851cb1b1450..fb3075edd826 100644 --- a/server/statistics/hot_cache.go +++ b/server/statistics/hot_cache.go @@ -17,7 +17,7 @@ import ( "math/rand" "time" - "github.com/pingcap/pd/server/cache" + "github.com/pingcap/pd/pkg/cache" "github.com/pingcap/pd/server/core" ) diff --git a/server/tso.go b/server/tso.go index 17d2ebaad185..d81d2390a098 100644 --- a/server/tso.go +++ b/server/tso.go @@ -63,7 +63,7 @@ func (s *Server) saveTimestamp(ts time.Time) error { data := uint64ToBytes(uint64(ts.UnixNano())) key := s.getTimestampPath() - resp, err := s.leaderTxn().Then(clientv3.OpPut(key, string(data))).Commit() + resp, err := s.LeaderTxn().Then(clientv3.OpPut(key, string(data))).Commit() if err != nil { return errors.WithStack(err) } diff --git a/server/util.go b/server/util.go index 24347b5d5e40..88ceb64b399b 100644 --- a/server/util.go +++ b/server/util.go @@ -182,57 +182,6 @@ func uint64ToBytes(v uint64) []byte { return b } -// slowLogTxn wraps etcd transaction and log slow one. -type slowLogTxn struct { - clientv3.Txn - cancel context.CancelFunc -} - -func newSlowLogTxn(client *clientv3.Client) clientv3.Txn { - ctx, cancel := context.WithTimeout(client.Ctx(), requestTimeout) - return &slowLogTxn{ - Txn: client.Txn(ctx), - cancel: cancel, - } -} - -func (t *slowLogTxn) If(cs ...clientv3.Cmp) clientv3.Txn { - return &slowLogTxn{ - Txn: t.Txn.If(cs...), - cancel: t.cancel, - } -} - -func (t *slowLogTxn) Then(ops ...clientv3.Op) clientv3.Txn { - return &slowLogTxn{ - Txn: t.Txn.Then(ops...), - cancel: t.cancel, - } -} - -// Commit implements Txn Commit interface. 
-func (t *slowLogTxn) Commit() (*clientv3.TxnResponse, error) { - start := time.Now() - resp, err := t.Txn.Commit() - t.cancel() - - cost := time.Since(start) - if cost > slowRequestTime { - log.Warn("txn runs too slow", - zap.Error(err), - zap.Reflect("response", resp), - zap.Duration("cost", cost)) - } - label := "success" - if err != nil { - label = "failed" - } - txnCounter.WithLabelValues(label).Inc() - txnDuration.WithLabelValues(label).Observe(cost.Seconds()) - - return resp, errors.WithStack(err) -} - // GetMembers return a slice of Members. func GetMembers(etcdClient *clientv3.Client) ([]*pdpb.Member, error) { listResp, err := etcdutil.ListEtcdMembers(etcdClient) diff --git a/table/namespace_classifier.go b/table/namespace_classifier.go index c6f6c3f32d45..7b44501a6514 100644 --- a/table/namespace_classifier.go +++ b/table/namespace_classifier.go @@ -92,7 +92,7 @@ func (ns *Namespace) AddStoreID(storeID uint64) { type tableNamespaceClassifier struct { sync.RWMutex nsInfo *namespacesInfo - kv *core.KV + storage *core.Storage idAlloc core.IDAllocator http.Handler } @@ -101,14 +101,14 @@ const kvRangeLimit = 1000 // NewTableNamespaceClassifier creates a new namespace classifier that // classifies stores and regions by table range. -func NewTableNamespaceClassifier(kv *core.KV, idAlloc core.IDAllocator) (namespace.Classifier, error) { +func NewTableNamespaceClassifier(storage *core.Storage, idAlloc core.IDAllocator) (namespace.Classifier, error) { nsInfo := newNamespacesInfo() - if err := nsInfo.loadNamespaces(kv, kvRangeLimit); err != nil { + if err := nsInfo.loadNamespaces(storage, kvRangeLimit); err != nil { return nil, err } c := &tableNamespaceClassifier{ nsInfo: nsInfo, - kv: kv, + storage: storage, idAlloc: idAlloc, } c.Handler = newTableClassifierHandler(c) @@ -331,13 +331,13 @@ func (c *tableNamespaceClassifier) RemoveNamespaceStoreID(name string, storeID u return c.putNamespaceLocked(n) } -// ReloadNamespaces reloads ns info from kv storage +// ReloadNamespaces reloads ns info from storage. 
func (c *tableNamespaceClassifier) ReloadNamespaces() error { nsInfo := newNamespacesInfo() c.Lock() defer c.Unlock() - if err := nsInfo.loadNamespaces(c.kv, kvRangeLimit); err != nil { + if err := nsInfo.loadNamespaces(c.storage, kvRangeLimit); err != nil { return err } @@ -346,8 +346,8 @@ func (c *tableNamespaceClassifier) ReloadNamespaces() error { } func (c *tableNamespaceClassifier) putNamespaceLocked(ns *Namespace) error { - if c.kv != nil { - if err := c.nsInfo.saveNamespace(c.kv, ns); err != nil { + if c.storage != nil { + if err := c.nsInfo.saveNamespace(c.storage, ns); err != nil { return err } } @@ -425,16 +425,16 @@ func (namespaceInfo *namespacesInfo) namespacePath(nsID uint64) string { return path.Join("namespace", fmt.Sprintf("%20d", nsID)) } -func (namespaceInfo *namespacesInfo) saveNamespace(kv *core.KV, ns *Namespace) error { +func (namespaceInfo *namespacesInfo) saveNamespace(storage *core.Storage, ns *Namespace) error { value, err := json.Marshal(ns) if err != nil { return errors.WithStack(err) } - err = kv.Save(namespaceInfo.namespacePath(ns.GetID()), string(value)) + err = storage.Save(namespaceInfo.namespacePath(ns.GetID()), string(value)) return err } -func (namespaceInfo *namespacesInfo) loadNamespaces(kv *core.KV, rangeLimit int) error { +func (namespaceInfo *namespacesInfo) loadNamespaces(storage *core.Storage, rangeLimit int) error { start := time.Now() nextID := uint64(0) @@ -442,7 +442,7 @@ func (namespaceInfo *namespacesInfo) loadNamespaces(kv *core.KV, rangeLimit int) for { key := namespaceInfo.namespacePath(nextID) - _, res, err := kv.LoadRange(key, endKey, rangeLimit) + _, res, err := storage.LoadRange(key, endKey, rangeLimit) if err != nil { return err } diff --git a/table/namespace_classifier_test.go b/table/namespace_classifier_test.go index fac70219f275..3de72aa86c70 100644 --- a/table/namespace_classifier_test.go +++ b/table/namespace_classifier_test.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/pd/pkg/mock/mockid" "github.com/pingcap/pd/server/core" + "github.com/pingcap/pd/server/kv" ) var _ = Suite(&testTableNamespaceSuite{}) @@ -40,8 +41,8 @@ type testTableNamespaceSuite struct { } func (s *testTableNamespaceSuite) newClassifier(c *C) *tableNamespaceClassifier { - kv := core.NewKV(core.NewMemoryKV()) - classifier, err := NewTableNamespaceClassifier(kv, mockid.NewIDAllocator()) + memStorage := core.NewStorage(kv.NewMemoryKV()) + classifier, err := NewTableNamespaceClassifier(memStorage, mockid.NewIDAllocator()) c.Assert(err, IsNil) tableClassifier := classifier.(*tableNamespaceClassifier) testNamespace1 := Namespace{ @@ -135,8 +136,8 @@ func (s *testTableNamespaceSuite) TestTableNameSpaceGetRegionNamespace(c *C) { } func (s *testTableNamespaceSuite) TestNamespaceOperation(c *C) { - kv := core.NewKV(core.NewMemoryKV()) - classifier, err := NewTableNamespaceClassifier(kv, mockid.NewIDAllocator()) + memStorage := core.NewStorage(kv.NewMemoryKV()) + classifier, err := NewTableNamespaceClassifier(memStorage, mockid.NewIDAllocator()) c.Assert(err, IsNil) tableClassifier := classifier.(*tableNamespaceClassifier) nsInfo := tableClassifier.nsInfo diff --git a/tests/server/region_syncer/region_syncer_test.go b/tests/server/region_syncer/region_syncer_test.go index b72c147e6ff4..ab6af0b5bcba 100644 --- a/tests/server/region_syncer/region_syncer_test.go +++ b/tests/server/region_syncer/region_syncer_test.go @@ -80,7 +80,7 @@ func (s *serverTestSuite) TestRegionSyncer(c *C) { err = rc.HandleRegionHeartbeat(region) 
c.Assert(err, IsNil) } - // ensure flush to region kv + // ensure flush to region storage time.Sleep(3 * time.Second) err = leaderServer.Stop() c.Assert(err, IsNil) @@ -125,7 +125,7 @@ func (s *serverTestSuite) TestFullSyncWithAddMember(c *C) { err = rc.HandleRegionHeartbeat(region) c.Assert(err, IsNil) } - // ensure flush to region kv + // ensure flush to region storage time.Sleep(3 * time.Second) // restart pd1 err = leaderServer.Stop() diff --git a/tools/pd-ctl/pdctl/command/config_command.go b/tools/pd-ctl/pdctl/command/config_command.go index d7905f5d7308..6cf2505a5758 100644 --- a/tools/pd-ctl/pdctl/command/config_command.go +++ b/tools/pd-ctl/pdctl/command/config_command.go @@ -270,11 +270,8 @@ func postConfigDataWithPath(cmd *cobra.Command, key, value, path string) error { if err != nil { return err } - req, err := getRequest(cmd, path, http.MethodPost, "application/json", bytes.NewBuffer(reqData)) - if err != nil { - return err - } - _, err = dail(req) + _, err = doRequest(cmd, path, http.MethodPost, + WithBody("application/json", bytes.NewBuffer(reqData))) if err != nil { return err } diff --git a/tools/pd-ctl/pdctl/command/global.go b/tools/pd-ctl/pdctl/command/global.go index 2082748fb9e8..733455258fe6 100644 --- a/tools/pd-ctl/pdctl/command/global.go +++ b/tools/pd-ctl/pdctl/command/global.go @@ -55,69 +55,110 @@ func InitHTTPSClient(CAPath, CertPath, KeyPath string) error { return nil } -func getRequest(cmd *cobra.Command, prefix string, method string, bodyType string, body io.Reader) (*http.Request, error) { - if method == "" { - method = http.MethodGet +type bodyOption struct { + contentType string + body io.Reader +} + +// BodyOption sets the type and content of the body +type BodyOption func(*bodyOption) + +// WithBody returns a BodyOption +func WithBody(contentType string, body io.Reader) BodyOption { + return func(bo *bodyOption) { + bo.contentType = contentType + bo.body = body } - url := getAddressFromCmd(cmd, prefix) - req, err := http.NewRequest(method, url, body) - if err != nil { - return nil, err +} +func doRequest(cmd *cobra.Command, prefix string, method string, + opts ...BodyOption) (string, error) { + b := &bodyOption{} + for _, o := range opts { + o(b) } - req.Header.Set("Content-Type", bodyType) - return req, err + var resp string + var err error + tryURLs(cmd, func(endpoint string) error { + url := endpoint + "/" + prefix + if method == "" { + method = http.MethodGet + } + var req *http.Request + req, err = http.NewRequest(method, url, b.body) + if err != nil { + return err + } + if b.contentType != "" { + req.Header.Set("Content-Type", b.contentType) + } + // the resp would be returned by the outer function + resp, err = dail(req) + if err != nil { + return err + } + return nil + }) + return resp, err } func dail(req *http.Request) (string, error) { - var res string - reps, err := dialClient.Do(req) + resp, err := dialClient.Do(req) if err != nil { - return res, err - } - defer reps.Body.Close() - if reps.StatusCode != http.StatusOK { - return res, genResponseError(reps) + return "", err } - - r, err := ioutil.ReadAll(reps.Body) - if err != nil { - return res, err + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + var msg []byte + msg, err = ioutil.ReadAll(resp.Body) + if err != nil { + return "", err + } + return fmt.Sprintf("[%d] %s", resp.StatusCode, msg), nil } - res = string(r) - return res, nil -} -func doRequest(cmd *cobra.Command, prefix string, method string) (string, error) { - req, err := getRequest(cmd, prefix, method, "", 
diff --git a/tools/pd-ctl/pdctl/command/log_command.go b/tools/pd-ctl/pdctl/command/log_command.go
index c1e070e8f1e6..86e10b44cce0 100644
--- a/tools/pd-ctl/pdctl/command/log_command.go
+++ b/tools/pd-ctl/pdctl/command/log_command.go
@@ -47,12 +47,8 @@ func logCommandFunc(cmd *cobra.Command, args []string) {
 		cmd.Printf("Failed to set log level: %s\n", err)
 		return
 	}
-	req, err := getRequest(cmd, logPrefix, http.MethodPost, "application/json", bytes.NewBuffer(data))
-	if err != nil {
-		cmd.Printf("Failed to set log level: %s\n", err)
-		return
-	}
-	_, err = dail(req)
+	_, err = doRequest(cmd, logPrefix, http.MethodPost,
+		WithBody("application/json", bytes.NewBuffer(data)))
 	if err != nil {
 		cmd.Printf("Failed to set log level: %s\n", err)
 		return
diff --git a/tools/pd-ctl/pdctl/command/member_command.go b/tools/pd-ctl/pdctl/command/member_command.go
index 891e72a012f5..df0d2eb29deb 100644
--- a/tools/pd-ctl/pdctl/command/member_command.go
+++ b/tools/pd-ctl/pdctl/command/member_command.go
@@ -171,12 +171,7 @@ func setLeaderPriorityFunc(cmd *cobra.Command, args []string) {
 	}
 	data := map[string]interface{}{"leader-priority": priority}
 	reqData, _ := json.Marshal(data)
-	req, err := getRequest(cmd, prefix, http.MethodPost, "application/json", bytes.NewBuffer(reqData))
-	if err != nil {
-		cmd.Printf("failed to set leader priority: %v\n", err)
-		return
-	}
-	_, err = dail(req)
+	_, err = doRequest(cmd, prefix, http.MethodPost, WithBody("application/json", bytes.NewBuffer(reqData)))
 	if err != nil {
 		cmd.Printf("failed to set leader priority: %v\n", err)
 		return
diff --git a/tools/regions-dump/main.go b/tools/regions-dump/main.go
new file mode 100644
index 000000000000..47aa3136181a
--- /dev/null
+++ b/tools/regions-dump/main.go
@@ -0,0 +1,144 @@
+package main
+
+import (
+	"bufio"
+	"flag"
+	"fmt"
+	"math"
+	"os"
+	"path"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/pingcap/kvproto/pkg/metapb"
+	"github.com/pingcap/pd/pkg/etcdutil"
+	"github.com/pingcap/pd/server/core"
+	"github.com/pkg/errors"
+	"go.etcd.io/etcd/clientv3"
+	"go.etcd.io/etcd/pkg/transport"
+)
+
+var (
+	clusterID = flag.Uint64("cluster-id", 0, "please make the cluster ID match the TiKV cluster")
+	endpoints = flag.String("endpoints", "http://127.0.0.1:2379", "endpoint URLs")
+	startID   = flag.Uint64("start-id", 0, "the ID of the start region")
+	endID     = flag.Uint64("end-id", 0, "the ID of the last region")
+	filePath  = flag.String("file", "regions.dump", "the dump file path and name")
+	caPath    = flag.String("cacert", "", "path of file that contains list of trusted SSL CAs.")
+	certPath  = flag.String("cert", "", "path of file that contains X509 certificate in PEM format.")
+	keyPath   = flag.String("key", "", "path of file that contains X509 key in PEM format.")
+)
+
+const (
+	requestTimeout = 10 * time.Second
+	etcdTimeout    = 1200 * time.Second
+
+	pdRootPath      = "/pd"
+	pdClusterIDPath = "/pd/cluster_id"
+	maxKVRangeLimit = 10000
+	minKVRangeLimit = 100
+)
+
+var (
+	rootPath = ""
+)
+
+func checkErr(err error) {
+	if err != nil {
+		fmt.Println(err.Error())
+		os.Exit(1)
+	}
+}
+
+func main() {
+	flag.Parse()
+	if *endID != 0 && *endID < *startID {
+		checkErr(errors.New("the end ID should be greater than or equal to the start ID"))
+	}
+	rootPath = path.Join(pdRootPath, strconv.FormatUint(*clusterID, 10))
+	f, err := os.Create(*filePath)
+	checkErr(err)
+	defer f.Close()
+
+	urls := strings.Split(*endpoints, ",")
+
+	tlsInfo := transport.TLSInfo{
+		CertFile:      *certPath,
+		KeyFile:       *keyPath,
+		TrustedCAFile: *caPath,
+	}
+	tlsConfig, err := tlsInfo.ClientConfig()
+	checkErr(err)
+
+	client, err := clientv3.New(clientv3.Config{
+		Endpoints:   urls,
+		DialTimeout: etcdTimeout,
+		TLS:         tlsConfig,
+	})
+	checkErr(err)
+
+	err = loadRegions(client, f)
+	checkErr(err)
+	fmt.Println("successful!")
+}
+
+func regionPath(regionID uint64) string {
+	return path.Join("raft", "r", fmt.Sprintf("%020d", regionID))
+}
+
+func loadRegions(client *clientv3.Client, f *os.File) error {
+	nextID := uint64(*startID)
+	endKey := regionPath(math.MaxUint64)
+	if *endID != 0 {
+		endKey = regionPath(*endID)
+	}
+	w := bufio.NewWriter(f)
+	defer w.Flush()
+	// Since the region key may be very long, using a larger rangeLimit will cause
+	// the message packet to exceed the grpc message size limit (4MB). Here we use
+	// a variable rangeLimit to work around.
+	rangeLimit := 10000
+	for {
+		startKey := regionPath(nextID)
+		_, res, err := loadRange(client, startKey, endKey, rangeLimit)
+		if err != nil {
+			if rangeLimit /= 2; rangeLimit >= minKVRangeLimit {
+				continue
+			}
+			return err
+		}
+
+		for _, s := range res {
+			region := &metapb.Region{}
+			if err := region.Unmarshal([]byte(s)); err != nil {
+				return errors.WithStack(err)
+			}
+			nextID = region.GetId() + 1
+			fmt.Fprintln(w, core.HexRegionMeta(region))
+		}
+
+		if len(res) < rangeLimit {
+			return nil
+		}
+	}
+}
+
+func loadRange(client *clientv3.Client, key, endKey string, limit int) ([]string, []string, error) {
+	key = path.Join(rootPath, key)
+	endKey = path.Join(rootPath, endKey)
+
+	withRange := clientv3.WithRange(endKey)
+	withLimit := clientv3.WithLimit(int64(limit))
+	resp, err := etcdutil.EtcdKVGet(client, key, withRange, withLimit)
+	if err != nil {
+		return nil, nil, err
+	}
+	keys := make([]string, 0, len(resp.Kvs))
+	values := make([]string, 0, len(resp.Kvs))
+	for _, item := range resp.Kvs {
+		keys = append(keys, strings.TrimPrefix(strings.TrimPrefix(string(item.Key), rootPath), "/"))
+		values = append(values, string(item.Value))
+	}
+	return keys, values, nil
+}
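The shrinking rangeLimit retry in loadRegions is the part worth calling out: when a scan fails (for example because the reply would exceed the 4MB gRPC message limit), the page size is halved and the scan is retried, giving up once the limit drops below minKVRangeLimit. A standalone sketch of the same pattern, not part of the patch, with scan standing in for loadRange and a fabricated failure condition:

// Sketch only: illustrates the halve-on-error paging loop used by regions-dump.
package main

import (
	"errors"
	"fmt"
)

const (
	maxRangeLimit = 10000
	minRangeLimit = 100
)

// scan is a stand-in for a paged read such as loadRange; here it simply
// fails whenever it is asked for too many entries at once.
func scan(limit int) ([]string, error) {
	if limit > 1000 {
		return nil, errors.New("message larger than max")
	}
	return make([]string, limit), nil
}

func main() {
	rangeLimit := maxRangeLimit
	for {
		res, err := scan(rangeLimit)
		if err != nil {
			// Same shape as the tool: halve the page size and retry while it is usable.
			if rangeLimit /= 2; rangeLimit >= minRangeLimit {
				continue
			}
			fmt.Println("give up:", err)
			return
		}
		fmt.Printf("scanned %d entries with limit %d\n", len(res), rangeLimit)
		return
	}
}

Because nextID advances past the last region decoded from each successful page, a retry after shrinking resumes where the previous page stopped rather than rereading from the start.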