From b6767528b88dca7f8a3b31148e891f75627a898c Mon Sep 17 00:00:00 2001 From: ShuNing Date: Thu, 11 Jul 2019 16:29:41 +0800 Subject: [PATCH 1/6] tools: add regions dumper (#1631) * tools: add regions dumper Signed-off-by: nolouch * address comments Signed-off-by: nolouch --- Makefile | 4 ++ tools/regions-dump/main.go | 144 +++++++++++++++++++++++++++++++++++++ 2 files changed, 148 insertions(+) create mode 100644 tools/regions-dump/main.go diff --git a/Makefile b/Makefile index afa834416263..3c72acb53068 100644 --- a/Makefile +++ b/Makefile @@ -117,6 +117,10 @@ simulator: export GO111MODULE=on simulator: CGO_ENABLED=0 go build -o bin/pd-simulator tools/pd-simulator/main.go +regions-dump: export GO111MODULE=on +regions-dump: + CGO_ENABLED=0 go build -o bin/regions-dump tools/regions-dump/main.go + clean-test: rm -rf /tmp/test_pd* rm -rf /tmp/pd-tests* diff --git a/tools/regions-dump/main.go b/tools/regions-dump/main.go new file mode 100644 index 000000000000..47aa3136181a --- /dev/null +++ b/tools/regions-dump/main.go @@ -0,0 +1,144 @@ +package main + +import ( + "bufio" + "flag" + "fmt" + "math" + "os" + "path" + "strconv" + "strings" + "time" + + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/pd/pkg/etcdutil" + "github.com/pingcap/pd/server/core" + "github.com/pkg/errors" + "go.etcd.io/etcd/clientv3" + "go.etcd.io/etcd/pkg/transport" +) + +var ( + clusterID = flag.Uint64("cluster-id", 0, "please make cluster ID match with tikv") + endpoints = flag.String("endpoints", "http://127.0.0.1:2379", "endpoints urls") + startID = flag.Uint64("start-id", 0, "the id of the start region") + endID = flag.Uint64("end-id", 0, "the id of the last region") + filePath = flag.String("file", "regions.dump", "the dump file path and name") + caPath = flag.String("cacert", "", "path of file that contains list of trusted SSL CAs.") + certPath = flag.String("cert", "", "path of file that contains X509 certificate in PEM format..") + keyPath = flag.String("key", "", "path of file that contains X509 key in PEM format.") +) + +const ( + requestTimeout = 10 * time.Second + etcdTimeout = 1200 * time.Second + + pdRootPath = "/pd" + pdClusterIDPath = "/pd/cluster_id" + maxKVRangeLimit = 10000 + minKVRangeLimit = 100 +) + +var ( + rootPath = "" +) + +func checkErr(err error) { + if err != nil { + fmt.Println(err.Error()) + os.Exit(1) + } +} + +func main() { + flag.Parse() + if *endID != 0 && *endID < *startID { + checkErr(errors.New("The end id should great or equal than start id")) + } + rootPath = path.Join(pdRootPath, strconv.FormatUint(*clusterID, 10)) + f, err := os.Create(*filePath) + checkErr(err) + defer f.Close() + + urls := strings.Split(*endpoints, ",") + + tlsInfo := transport.TLSInfo{ + CertFile: *certPath, + KeyFile: *keyPath, + TrustedCAFile: *caPath, + } + tlsConfig, err := tlsInfo.ClientConfig() + checkErr(err) + + client, err := clientv3.New(clientv3.Config{ + Endpoints: urls, + DialTimeout: etcdTimeout, + TLS: tlsConfig, + }) + checkErr(err) + + err = loadRegions(client, f) + checkErr(err) + fmt.Println("successful!") +} + +func regionPath(regionID uint64) string { + return path.Join("raft", "r", fmt.Sprintf("%020d", regionID)) +} + +func loadRegions(client *clientv3.Client, f *os.File) error { + nextID := uint64(*startID) + endKey := regionPath(math.MaxUint64) + if *endID != 0 { + endKey = regionPath(*endID) + } + w := bufio.NewWriter(f) + defer w.Flush() + // Since the region key may be very long, using a larger rangeLimit will cause + // the message packet to exceed the grpc message 
size limit (4MB). Here we use + // a variable rangeLimit to work around. + rangeLimit := 10000 + for { + startKey := regionPath(nextID) + _, res, err := loadRange(client, startKey, endKey, rangeLimit) + if err != nil { + if rangeLimit /= 2; rangeLimit >= minKVRangeLimit { + continue + } + return err + } + + for _, s := range res { + region := &metapb.Region{} + if err := region.Unmarshal([]byte(s)); err != nil { + return errors.WithStack(err) + } + nextID = region.GetId() + 1 + fmt.Fprintln(w, core.HexRegionMeta(region)) + } + + if len(res) < rangeLimit { + return nil + } + } +} + +func loadRange(client *clientv3.Client, key, endKey string, limit int) ([]string, []string, error) { + key = path.Join(rootPath, key) + endKey = path.Join(rootPath, endKey) + + withRange := clientv3.WithRange(endKey) + withLimit := clientv3.WithLimit(int64(limit)) + resp, err := etcdutil.EtcdKVGet(client, key, withRange, withLimit) + if err != nil { + return nil, nil, err + } + keys := make([]string, 0, len(resp.Kvs)) + values := make([]string, 0, len(resp.Kvs)) + for _, item := range resp.Kvs { + keys = append(keys, strings.TrimPrefix(strings.TrimPrefix(string(item.Key), rootPath), "/")) + values = append(values, string(item.Value)) + } + return keys, values, nil +} From 84e3ca63155e2a3bf1d820c00242b9a748b32399 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Thu, 11 Jul 2019 17:05:34 +0800 Subject: [PATCH 2/6] server: use an individual package for kv (#1622) * use an individual package for kv Signed-off-by: Ryan Leung * rename all kv related namings to storage in core package Signed-off-by: Ryan Leung --- server/cluster.go | 6 +- server/cluster_info.go | 50 +-- server/cluster_info_test.go | 69 ++-- server/cluster_test.go | 15 +- server/config_test.go | 8 +- server/coordinator.go | 2 +- server/coordinator_test.go | 19 +- server/core/kv.go | 296 ----------------- .../core/{region_kv.go => region_storage.go} | 89 +++--- server/core/storage.go | 297 ++++++++++++++++++ server/core/{kv_test.go => storage_test.go} | 75 ++--- server/grpc_service.go | 6 +- server/handler.go | 4 +- server/id.go | 2 +- server/{ => kv}/etcd_kv.go | 85 ++++- server/{ => kv}/etcd_kv_test.go | 67 +++- server/kv/kv.go | 22 ++ server/{core => kv}/levedb_kv.go | 42 +-- server/{core/kv_base.go => kv/mem_kv.go} | 14 +- server/kv/metrics.go | 40 +++ server/leader.go | 11 +- server/metrics.go | 2 - server/namespace/namespace.go | 8 +- server/option.go | 8 +- server/region_syncer/history_buffer.go | 5 +- server/region_syncer/history_buffer_test.go | 13 +- server/region_syncer/server.go | 4 +- server/server.go | 62 ++-- server/tso.go | 2 +- server/util.go | 51 --- table/namespace_classifier.go | 24 +- table/namespace_classifier_test.go | 9 +- .../region_syncer/region_syncer_test.go | 4 +- 33 files changed, 768 insertions(+), 643 deletions(-) delete mode 100644 server/core/kv.go rename server/core/{region_kv.go => region_storage.go} (62%) create mode 100644 server/core/storage.go rename server/core/{kv_test.go => storage_test.go} (69%) rename server/{ => kv}/etcd_kv.go (51%) rename server/{ => kv}/etcd_kv_test.go (55%) create mode 100644 server/kv/kv.go rename server/{core => kv}/levedb_kv.go (58%) rename server/{core/kv_base.go => kv/mem_kv.go} (83%) create mode 100644 server/kv/metrics.go diff --git a/server/cluster.go b/server/cluster.go index 96887ab00dc9..3f6ab533b968 100644 --- a/server/cluster.go +++ b/server/cluster.go @@ -109,7 +109,7 @@ func (c *RaftCluster) isInitialized() bool { // yet. 
func (c *RaftCluster) loadBootstrapTime() (time.Time, error) { var t time.Time - data, err := c.s.kv.Load(c.s.kv.ClusterStatePath("raft_bootstrap_time")) + data, err := c.s.storage.Load(c.s.storage.ClusterStatePath("raft_bootstrap_time")) if err != nil { return t, err } @@ -128,7 +128,7 @@ func (c *RaftCluster) start() error { return nil } - cluster, err := loadClusterInfo(c.s.idAlloc, c.s.kv, c.s.scheduleOpt) + cluster, err := loadClusterInfo(c.s.idAlloc, c.s.storage, c.s.scheduleOpt) if err != nil { return err } @@ -555,7 +555,7 @@ func (c *RaftCluster) SetStoreWeight(storeID uint64, leaderWeight, regionWeight return core.NewStoreNotFoundErr(storeID) } - if err := c.s.kv.SaveStoreWeight(storeID, leaderWeight, regionWeight); err != nil { + if err := c.s.storage.SaveStoreWeight(storeID, leaderWeight, regionWeight); err != nil { return err } diff --git a/server/cluster_info.go b/server/cluster_info.go index c40028b8d04b..b0edebd91487 100644 --- a/server/cluster_info.go +++ b/server/cluster_info.go @@ -32,7 +32,7 @@ type clusterInfo struct { sync.RWMutex core *core.BasicCluster id core.IDAllocator - kv *core.KV + storage *core.Storage meta *metapb.Cluster opt *scheduleOption regionStats *statistics.RegionStatistics @@ -45,12 +45,12 @@ type clusterInfo struct { var defaultChangedRegionsLimit = 10000 -func newClusterInfo(id core.IDAllocator, opt *scheduleOption, kv *core.KV) *clusterInfo { +func newClusterInfo(id core.IDAllocator, opt *scheduleOption, storage *core.Storage) *clusterInfo { return &clusterInfo{ core: core.NewBasicCluster(), id: id, opt: opt, - kv: kv, + storage: storage, labelLevelStats: statistics.NewLabelLevelStatistics(), storesStats: statistics.NewStoresStats(), prepareChecker: newPrepareChecker(), @@ -60,11 +60,11 @@ func newClusterInfo(id core.IDAllocator, opt *scheduleOption, kv *core.KV) *clus } // Return nil if cluster is not bootstrapped. -func loadClusterInfo(id core.IDAllocator, kv *core.KV, opt *scheduleOption) (*clusterInfo, error) { - c := newClusterInfo(id, opt, kv) +func loadClusterInfo(id core.IDAllocator, storage *core.Storage, opt *scheduleOption) (*clusterInfo, error) { + c := newClusterInfo(id, opt, storage) c.meta = &metapb.Cluster{} - ok, err := kv.LoadMeta(c.meta) + ok, err := storage.LoadMeta(c.meta) if err != nil { return nil, err } @@ -73,7 +73,7 @@ func loadClusterInfo(id core.IDAllocator, kv *core.KV, opt *scheduleOption) (*cl } start := time.Now() - if err := kv.LoadStores(c.core.Stores); err != nil { + if err := storage.LoadStores(c.core.Stores); err != nil { return nil, err } log.Info("load stores", @@ -82,7 +82,7 @@ func loadClusterInfo(id core.IDAllocator, kv *core.KV, opt *scheduleOption) (*cl ) start = time.Now() - if err := kv.LoadRegions(c.core.Regions); err != nil { + if err := storage.LoadRegions(c.core.Regions); err != nil { return nil, err } log.Info("load regions", @@ -117,7 +117,7 @@ func (c *clusterInfo) OnStoreVersionChange() { // it will update the cluster version. 
if clusterVersion.LessThan(*minVersion) { c.opt.SetClusterVersion(*minVersion) - err := c.opt.persist(c.kv) + err := c.opt.persist(c.storage) if err != nil { log.Error("persist cluster version meet error", zap.Error(err)) } @@ -176,8 +176,8 @@ func (c *clusterInfo) putMeta(meta *metapb.Cluster) error { } func (c *clusterInfo) putMetaLocked(meta *metapb.Cluster) error { - if c.kv != nil { - if err := c.kv.SaveMeta(meta); err != nil { + if c.storage != nil { + if err := c.storage.SaveMeta(meta); err != nil { return err } } @@ -199,8 +199,8 @@ func (c *clusterInfo) putStore(store *core.StoreInfo) error { } func (c *clusterInfo) putStoreLocked(store *core.StoreInfo) error { - if c.kv != nil { - if err := c.kv.SaveStore(store.GetMeta()); err != nil { + if c.storage != nil { + if err := c.storage.SaveStore(store.GetMeta()); err != nil { return err } } @@ -216,8 +216,8 @@ func (c *clusterInfo) deleteStore(store *core.StoreInfo) error { } func (c *clusterInfo) deleteStoreLocked(store *core.StoreInfo) error { - if c.kv != nil { - if err := c.kv.DeleteStore(store.GetMeta()); err != nil { + if c.storage != nil { + if err := c.storage.DeleteStore(store.GetMeta()); err != nil { return err } } @@ -348,8 +348,8 @@ func (c *clusterInfo) putRegion(region *core.RegionInfo) error { } func (c *clusterInfo) putRegionLocked(region *core.RegionInfo) error { - if c.kv != nil { - if err := c.kv.SaveRegion(region.GetMeta()); err != nil { + if c.storage != nil { + if err := c.storage.SaveRegion(region.GetMeta()); err != nil { return err } } @@ -525,7 +525,7 @@ func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error { isReadUpdate, readItem := c.CheckReadStatus(region) c.RUnlock() - // Save to KV if meta is updated. + // Save to storage if meta is updated. // Save to cache if meta or leader is updated, or contains any down/pending peer. // Mark isNew if the region in cache does not have leader. var saveKV, saveCache, isNew bool @@ -589,11 +589,11 @@ func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error { } } - if saveKV && c.kv != nil { - if err := c.kv.SaveRegion(region.GetMeta()); err != nil { - // Not successfully saved to kv is not fatal, it only leads to longer warm-up + if saveKV && c.storage != nil { + if err := c.storage.SaveRegion(region.GetMeta()); err != nil { + // Not successfully saved to storage is not fatal, it only leads to longer warm-up // after restart. Here we only log the error then go on updating cache. 
- log.Error("fail to save region to kv", + log.Error("fail to save region to storage", zap.Uint64("region-id", region.GetID()), zap.Reflect("region-meta", core.HexRegionMeta(region.GetMeta())), zap.Error(err)) @@ -615,10 +615,10 @@ func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error { if saveCache { overlaps := c.core.Regions.SetRegion(region) - if c.kv != nil { + if c.storage != nil { for _, item := range overlaps { - if err := c.kv.DeleteRegion(item); err != nil { - log.Error("fail to delete region from kv", + if err := c.storage.DeleteRegion(item); err != nil { + log.Error("fail to delete region from storage", zap.Uint64("region-id", item.GetId()), zap.Reflect("region-meta", core.HexRegionMeta(item)), zap.Error(err)) diff --git a/server/cluster_info_test.go b/server/cluster_info_test.go index 1849213b7382..c85bd54ed5ed 100644 --- a/server/cluster_info_test.go +++ b/server/cluster_info_test.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/pd/pkg/mock/mockid" "github.com/pingcap/pd/server/core" + "github.com/pingcap/pd/server/kv" ) var _ = Suite(&testStoresInfoSuite{}) @@ -197,11 +198,11 @@ func checkRegion(c *C, a *core.RegionInfo, b *core.RegionInfo) { } } -func checkRegionsKV(c *C, kv *core.KV, regions []*core.RegionInfo) { - if kv != nil { +func checkRegionsKV(c *C, s *core.Storage, regions []*core.RegionInfo) { + if s != nil { for _, region := range regions { var meta metapb.Region - ok, err := kv.LoadRegion(region.GetID(), &meta) + ok, err := s.LoadRegion(region.GetID(), &meta) c.Assert(ok, IsTrue) c.Assert(err, IsNil) c.Assert(&meta, DeepEquals, region.GetMeta()) @@ -253,23 +254,23 @@ func (s *testClusterInfoSuite) TestLoadClusterInfo(c *C) { server, cleanup := mustRunTestServer(c) defer cleanup() - kv := server.kv + storage := server.storage _, opt, err := newTestScheduleConfig() c.Assert(err, IsNil) // Cluster is not bootstrapped. - cluster, err := loadClusterInfo(server.idAlloc, kv, opt) + cluster, err := loadClusterInfo(server.idAlloc, storage, opt) c.Assert(err, IsNil) c.Assert(cluster, IsNil) // Save meta, stores and regions. 
n := 10 meta := &metapb.Cluster{Id: 123} - c.Assert(kv.SaveMeta(meta), IsNil) - stores := mustSaveStores(c, kv, n) - regions := mustSaveRegions(c, kv, n) + c.Assert(storage.SaveMeta(meta), IsNil) + stores := mustSaveStores(c, storage, n) + regions := mustSaveRegions(c, storage, n) - cluster, err = loadClusterInfo(server.idAlloc, kv, opt) + cluster, err = loadClusterInfo(server.idAlloc, storage, opt) c.Assert(err, IsNil) c.Assert(cluster, NotNil) @@ -288,7 +289,7 @@ func (s *testClusterInfoSuite) TestLoadClusterInfo(c *C) { func (s *testClusterInfoSuite) TestStoreHeartbeat(c *C) { _, opt, err := newTestScheduleConfig() c.Assert(err, IsNil) - cluster := newClusterInfo(mockid.NewIDAllocator(), opt, core.NewKV(core.NewMemoryKV())) + cluster := newClusterInfo(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV())) n, np := uint64(3), uint64(3) stores := newTestStores(n) @@ -324,7 +325,7 @@ func (s *testClusterInfoSuite) TestStoreHeartbeat(c *C) { for _, store := range stores { tmp := &metapb.Store{} - ok, err := cluster.kv.LoadStore(store.GetID(), tmp) + ok, err := cluster.storage.LoadStore(store.GetID(), tmp) c.Assert(ok, IsTrue) c.Assert(err, IsNil) c.Assert(tmp, DeepEquals, store.GetMeta()) @@ -334,7 +335,7 @@ func (s *testClusterInfoSuite) TestStoreHeartbeat(c *C) { func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) { _, opt, err := newTestScheduleConfig() c.Assert(err, IsNil) - cluster := newClusterInfo(mockid.NewIDAllocator(), opt, core.NewKV(core.NewMemoryKV())) + cluster := newClusterInfo(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV())) n, np := uint64(3), uint64(3) @@ -349,25 +350,25 @@ func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) { // region does not exist. c.Assert(cluster.handleRegionHeartbeat(region), IsNil) checkRegions(c, cluster.core.Regions, regions[:i+1]) - checkRegionsKV(c, cluster.kv, regions[:i+1]) + checkRegionsKV(c, cluster.storage, regions[:i+1]) // region is the same, not updated. c.Assert(cluster.handleRegionHeartbeat(region), IsNil) checkRegions(c, cluster.core.Regions, regions[:i+1]) - checkRegionsKV(c, cluster.kv, regions[:i+1]) + checkRegionsKV(c, cluster.storage, regions[:i+1]) origin := region // region is updated. region = origin.Clone(core.WithIncVersion()) regions[i] = region c.Assert(cluster.handleRegionHeartbeat(region), IsNil) checkRegions(c, cluster.core.Regions, regions[:i+1]) - checkRegionsKV(c, cluster.kv, regions[:i+1]) + checkRegionsKV(c, cluster.storage, regions[:i+1]) // region is stale (Version). stale := origin.Clone(core.WithIncConfVer()) c.Assert(cluster.handleRegionHeartbeat(stale), NotNil) checkRegions(c, cluster.core.Regions, regions[:i+1]) - checkRegionsKV(c, cluster.kv, regions[:i+1]) + checkRegionsKV(c, cluster.storage, regions[:i+1]) // region is updated. region = origin.Clone( @@ -377,13 +378,13 @@ func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) { regions[i] = region c.Assert(cluster.handleRegionHeartbeat(region), IsNil) checkRegions(c, cluster.core.Regions, regions[:i+1]) - checkRegionsKV(c, cluster.kv, regions[:i+1]) + checkRegionsKV(c, cluster.storage, regions[:i+1]) // region is stale (ConfVer). stale = origin.Clone(core.WithIncConfVer()) c.Assert(cluster.handleRegionHeartbeat(stale), NotNil) checkRegions(c, cluster.core.Regions, regions[:i+1]) - checkRegionsKV(c, cluster.kv, regions[:i+1]) + checkRegionsKV(c, cluster.storage, regions[:i+1]) // Add a down peer. 
region = region.Clone(core.WithDownPeers([]*pdpb.PeerStats{ @@ -420,13 +421,13 @@ func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) { regions[i] = region c.Assert(cluster.handleRegionHeartbeat(region), IsNil) checkRegions(c, cluster.core.Regions, regions[:i+1]) - checkRegionsKV(c, cluster.kv, regions[:i+1]) + checkRegionsKV(c, cluster.storage, regions[:i+1]) // Add peers. region = origin regions[i] = region c.Assert(cluster.handleRegionHeartbeat(region), IsNil) checkRegions(c, cluster.core.Regions, regions[:i+1]) - checkRegionsKV(c, cluster.kv, regions[:i+1]) + checkRegionsKV(c, cluster.storage, regions[:i+1]) } regionCounts := make(map[uint64]int) @@ -463,11 +464,11 @@ func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) { c.Assert(store.GetRegionSize(), Equals, cluster.core.Regions.GetStoreRegionSize(store.GetID())) } - // Test with kv. - if kv := cluster.kv; kv != nil { + // Test with storage. + if storage := cluster.storage; storage != nil { for _, region := range regions { tmp := &metapb.Region{} - ok, err := kv.LoadRegion(region.GetID(), tmp) + ok, err := storage.LoadRegion(region.GetID(), tmp) c.Assert(ok, IsTrue) c.Assert(err, IsNil) c.Assert(tmp, DeepEquals, region.GetMeta()) @@ -482,15 +483,15 @@ func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) { ) c.Assert(cluster.handleRegionHeartbeat(overlapRegion), NotNil) region := &metapb.Region{} - ok, err := kv.LoadRegion(regions[n-1].GetID(), region) + ok, err := storage.LoadRegion(regions[n-1].GetID(), region) c.Assert(ok, IsTrue) c.Assert(err, IsNil) c.Assert(region, DeepEquals, regions[n-1].GetMeta()) - ok, err = kv.LoadRegion(regions[n-2].GetID(), region) + ok, err = storage.LoadRegion(regions[n-2].GetID(), region) c.Assert(ok, IsTrue) c.Assert(err, IsNil) c.Assert(region, DeepEquals, regions[n-2].GetMeta()) - ok, err = kv.LoadRegion(overlapRegion.GetID(), region) + ok, err = storage.LoadRegion(overlapRegion.GetID(), region) c.Assert(ok, IsFalse) c.Assert(err, IsNil) @@ -501,13 +502,13 @@ func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) { ) c.Assert(cluster.handleRegionHeartbeat(overlapRegion), IsNil) region = &metapb.Region{} - ok, err = kv.LoadRegion(regions[n-1].GetID(), region) + ok, err = storage.LoadRegion(regions[n-1].GetID(), region) c.Assert(ok, IsFalse) c.Assert(err, IsNil) - ok, err = kv.LoadRegion(regions[n-2].GetID(), region) + ok, err = storage.LoadRegion(regions[n-2].GetID(), region) c.Assert(ok, IsFalse) c.Assert(err, IsNil) - ok, err = kv.LoadRegion(overlapRegion.GetID(), region) + ok, err = storage.LoadRegion(overlapRegion.GetID(), region) c.Assert(ok, IsTrue) c.Assert(err, IsNil) c.Assert(region, DeepEquals, overlapRegion.GetMeta()) @@ -693,7 +694,7 @@ func (s *testClusterUtilSuite) TestCheckStaleRegion(c *C) { c.Assert(checkStaleRegion(region, origin), NotNil) } -func mustSaveStores(c *C, kv *core.KV, n int) []*metapb.Store { +func mustSaveStores(c *C, s *core.Storage, n int) []*metapb.Store { stores := make([]*metapb.Store, 0, n) for i := 0; i < n; i++ { store := &metapb.Store{Id: uint64(i)} @@ -701,13 +702,13 @@ func mustSaveStores(c *C, kv *core.KV, n int) []*metapb.Store { } for _, store := range stores { - c.Assert(kv.SaveStore(store), IsNil) + c.Assert(s.SaveStore(store), IsNil) } return stores } -func mustSaveRegions(c *C, kv *core.KV, n int) []*metapb.Region { +func mustSaveRegions(c *C, s *core.Storage, n int) []*metapb.Region { regions := make([]*metapb.Region, 0, n) for i := 0; i < n; i++ { region := newTestRegionMeta(uint64(i)) @@ -715,9 +716,9 @@ func mustSaveRegions(c *C, 
kv *core.KV, n int) []*metapb.Region { } for _, region := range regions { - c.Assert(kv.SaveRegion(region), IsNil) + c.Assert(s.SaveRegion(region), IsNil) } - c.Assert(kv.Flush(), IsNil) + c.Assert(s.Flush(), IsNil) return regions } diff --git a/server/cluster_test.go b/server/cluster_test.go index b70322b29381..90f9bfd71ec1 100644 --- a/server/cluster_test.go +++ b/server/cluster_test.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/pd/pkg/mock/mockid" "github.com/pingcap/pd/server/core" + "github.com/pingcap/pd/server/kv" "github.com/pkg/errors" "google.golang.org/grpc" ) @@ -47,7 +48,7 @@ type testClusterSuite struct { } type testErrorKV struct { - core.KVBase + kv.Base } func (kv *testErrorKV) Save(key, value string) error { @@ -502,7 +503,7 @@ func (s *testClusterSuite) TestConcurrentHandleRegion(c *C) { c.Assert(err, IsNil) s.svr.cluster.RLock() s.svr.cluster.cachedCluster.Lock() - s.svr.cluster.cachedCluster.kv = core.NewKV(core.NewMemoryKV()) + s.svr.cluster.cachedCluster.storage = core.NewStorage(kv.NewMemoryKV()) s.svr.cluster.cachedCluster.Unlock() s.svr.cluster.RUnlock() var stores []*metapb.Store @@ -592,7 +593,7 @@ type testGetStoresSuite struct { func (s *testGetStoresSuite) SetUpSuite(c *C) { _, opt, err := newTestScheduleConfig() c.Assert(err, IsNil) - s.cluster = newClusterInfo(mockid.NewIDAllocator(), opt, core.NewKV(core.NewMemoryKV())) + s.cluster = newClusterInfo(mockid.NewIDAllocator(), opt, core.NewStorage(kv.NewMemoryKV())) stores := newTestStores(200) @@ -656,8 +657,8 @@ func (s *testClusterSuite) TestSetScheduleOpt(c *C) { c.Assert(len(s.svr.scheduleOpt.loadLabelPropertyConfig()[typ]), Equals, 0) //PUT GET failed - oldKV := s.svr.kv - s.svr.kv = core.NewKV(&testErrorKV{}) + oldStorage := s.svr.storage + s.svr.storage = core.NewStorage(&testErrorKV{}) replicateCfg.MaxReplicas = 7 scheduleCfg.MaxSnapshotCount = 20 pdServerCfg.UseRegionStorage = false @@ -675,11 +676,11 @@ func (s *testClusterSuite) TestSetScheduleOpt(c *C) { c.Assert(len(s.svr.scheduleOpt.loadLabelPropertyConfig()[typ]), Equals, 0) //DELETE failed - s.svr.kv = oldKV + s.svr.storage = oldStorage c.Assert(s.svr.SetNamespaceConfig("testNS", nsConfig), IsNil) c.Assert(s.svr.SetReplicationConfig(*replicateCfg), IsNil) - s.svr.kv = core.NewKV(&testErrorKV{}) + s.svr.storage = core.NewStorage(&testErrorKV{}) c.Assert(s.svr.DeleteLabelProperty(typ, labelKey, labelValue), NotNil) c.Assert(s.svr.GetNamespaceConfig("testNS").LeaderScheduleLimit, Equals, uint64(200)) c.Assert(s.svr.DeleteNamespaceConfig("testNS"), NotNil) diff --git a/server/config_test.go b/server/config_test.go index ea09c4c4ea41..e2f21f084320 100644 --- a/server/config_test.go +++ b/server/config_test.go @@ -21,9 +21,9 @@ import ( "time" "github.com/BurntSushi/toml" - . 
"github.com/pingcap/check" "github.com/pingcap/pd/server/core" + "github.com/pingcap/pd/server/kv" ) var _ = Suite(&testConfigSuite{}) @@ -46,19 +46,19 @@ func (s *testConfigSuite) TestBadFormatJoinAddr(c *C) { func (s *testConfigSuite) TestReloadConfig(c *C) { _, opt, err := newTestScheduleConfig() c.Assert(err, IsNil) - kv := core.NewKV(core.NewMemoryKV()) + storage := core.NewStorage(kv.NewMemoryKV()) scheduleCfg := opt.load() scheduleCfg.MaxSnapshotCount = 10 opt.SetMaxReplicas(5) opt.loadPDServerConfig().UseRegionStorage = true - c.Assert(opt.persist(kv), IsNil) + c.Assert(opt.persist(storage), IsNil) // suppose we add a new default enable scheduler "adjacent-region" defaultSchedulers := []string{"balance-region", "balance-leader", "hot-region", "label", "adjacent-region"} _, newOpt, err := newTestScheduleConfig() c.Assert(err, IsNil) newOpt.AddSchedulerCfg("adjacent-region", []string{}) - c.Assert(newOpt.reload(kv), IsNil) + c.Assert(newOpt.reload(storage), IsNil) schedulers := newOpt.GetSchedulers() c.Assert(schedulers, HasLen, 5) c.Assert(newOpt.loadPDServerConfig().UseRegionStorage, IsTrue) diff --git a/server/coordinator.go b/server/coordinator.go index e1159d75d739..ea26049b941d 100644 --- a/server/coordinator.go +++ b/server/coordinator.go @@ -240,7 +240,7 @@ func (c *coordinator) run() { // Removes the invalid scheduler config and persist. scheduleCfg.Schedulers = scheduleCfg.Schedulers[:k] c.cluster.opt.store(scheduleCfg) - if err := c.cluster.opt.persist(c.cluster.kv); err != nil { + if err := c.cluster.opt.persist(c.cluster.storage); err != nil { log.Error("cannot persist schedule config", zap.Error(err)) } diff --git a/server/coordinator_test.go b/server/coordinator_test.go index b5a0517999ae..aa21d5c5028f 100644 --- a/server/coordinator_test.go +++ b/server/coordinator_test.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/pd/pkg/mock/mockid" "github.com/pingcap/pd/pkg/testutil" "github.com/pingcap/pd/server/core" + "github.com/pingcap/pd/server/kv" "github.com/pingcap/pd/server/namespace" "github.com/pingcap/pd/server/schedule" "github.com/pingcap/pd/server/schedulers" @@ -56,7 +57,7 @@ func newTestClusterInfo(opt *scheduleOption) *testClusterInfo { return &testClusterInfo{clusterInfo: newClusterInfo( mockid.NewIDAllocator(), opt, - core.NewKV(core.NewMemoryKV()), + core.NewStorage(kv.NewMemoryKV()), )} } @@ -622,7 +623,7 @@ func (s *testCoordinatorSuite) TestPersistScheduler(c *C) { c.Assert(co.removeScheduler("balance-hot-region-scheduler"), IsNil) c.Assert(co.removeScheduler("label-scheduler"), IsNil) c.Assert(co.schedulers, HasLen, 2) - c.Assert(co.cluster.opt.persist(co.cluster.kv), IsNil) + c.Assert(co.cluster.opt.persist(co.cluster.storage), IsNil) co.stop() co.wg.Wait() // make a new coordinator for testing @@ -634,7 +635,7 @@ func (s *testCoordinatorSuite) TestPersistScheduler(c *C) { // suppose we add a new default enable scheduler newOpt.AddSchedulerCfg("adjacent-region", []string{}) c.Assert(newOpt.GetSchedulers(), HasLen, 5) - c.Assert(newOpt.reload(co.cluster.kv), IsNil) + c.Assert(newOpt.reload(co.cluster.storage), IsNil) c.Assert(newOpt.GetSchedulers(), HasLen, 7) tc.clusterInfo.opt = newOpt @@ -646,7 +647,7 @@ func (s *testCoordinatorSuite) TestPersistScheduler(c *C) { // suppose restart PD again _, newOpt, err = newTestScheduleConfig() c.Assert(err, IsNil) - c.Assert(newOpt.reload(tc.kv), IsNil) + c.Assert(newOpt.reload(tc.storage), IsNil) tc.clusterInfo.opt = newOpt co = newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier) co.run() @@ 
-665,13 +666,13 @@ func (s *testCoordinatorSuite) TestPersistScheduler(c *C) { // the scheduler that is not enable by default will be completely deleted c.Assert(co.cluster.opt.GetSchedulers(), HasLen, 6) c.Assert(co.schedulers, HasLen, 4) - c.Assert(co.cluster.opt.persist(co.cluster.kv), IsNil) + c.Assert(co.cluster.opt.persist(co.cluster.storage), IsNil) co.stop() co.wg.Wait() _, newOpt, err = newTestScheduleConfig() c.Assert(err, IsNil) - c.Assert(newOpt.reload(co.cluster.kv), IsNil) + c.Assert(newOpt.reload(co.cluster.storage), IsNil) tc.clusterInfo.opt = newOpt co = newCoordinator(tc.clusterInfo, hbStreams, namespace.DefaultClassifier) @@ -1012,11 +1013,11 @@ func getHeartBeatStreams(c *C, tc *testClusterInfo) *heartbeatStreams { config := NewTestSingleConfig(c) svr, err := CreateServer(config, nil) c.Assert(err, IsNil) - kvBase := newEtcdKVBase(svr) + kvBase := kv.NewEtcdKVBase(svr.client, svr.rootPath) path := filepath.Join(svr.cfg.DataDir, "region-meta") - regionKV, err := core.NewRegionKV(path) + regionStorage, err := core.NewRegionStorage(path) c.Assert(err, IsNil) - svr.kv = core.NewKV(kvBase).SetRegionKV(regionKV) + svr.storage = core.NewStorage(kvBase).SetRegionStorage(regionStorage) cluster := newRaftCluster(svr, tc.getClusterID()) cluster.cachedCluster = tc.clusterInfo hbStreams := newHeartbeatStreams(tc.getClusterID(), cluster) diff --git a/server/core/kv.go b/server/core/kv.go deleted file mode 100644 index fb4071972188..000000000000 --- a/server/core/kv.go +++ /dev/null @@ -1,296 +0,0 @@ -// Copyright 2017 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package core - -import ( - "encoding/json" - "fmt" - "math" - "path" - "strconv" - "sync/atomic" - - "github.com/gogo/protobuf/proto" - "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pkg/errors" -) - -const ( - clusterPath = "raft" - configPath = "config" - schedulePath = "schedule" - gcPath = "gc" -) - -const ( - maxKVRangeLimit = 10000 - minKVRangeLimit = 100 -) - -// KV wraps all kv operations, keep it stateless. -type KV struct { - KVBase - regionKV *RegionKV - useRegionKV int32 -} - -// NewKV creates KV instance with KVBase. -func NewKV(base KVBase) *KV { - return &KV{ - KVBase: base, - } -} - -// SetRegionKV sets the region storage. -func (kv *KV) SetRegionKV(regionKV *RegionKV) *KV { - kv.regionKV = regionKV - return kv -} - -// GetRegionKV gets the region storage. -func (kv *KV) GetRegionKV() *RegionKV { - return kv.regionKV -} - -// SwitchToRegionStorage switches to the region storage. -func (kv *KV) SwitchToRegionStorage() { - atomic.StoreInt32(&kv.useRegionKV, 1) -} - -// SwitchToDefaultStorage switches to the to default storage. -func (kv *KV) SwitchToDefaultStorage() { - atomic.StoreInt32(&kv.useRegionKV, 0) -} - -func (kv *KV) storePath(storeID uint64) string { - return path.Join(clusterPath, "s", fmt.Sprintf("%020d", storeID)) -} - -func regionPath(regionID uint64) string { - return path.Join(clusterPath, "r", fmt.Sprintf("%020d", regionID)) -} - -// ClusterStatePath returns the path to save an option. 
-func (kv *KV) ClusterStatePath(option string) string { - return path.Join(clusterPath, "status", option) -} - -func (kv *KV) storeLeaderWeightPath(storeID uint64) string { - return path.Join(schedulePath, "store_weight", fmt.Sprintf("%020d", storeID), "leader") -} - -func (kv *KV) storeRegionWeightPath(storeID uint64) string { - return path.Join(schedulePath, "store_weight", fmt.Sprintf("%020d", storeID), "region") -} - -// LoadMeta loads cluster meta from KV store. -func (kv *KV) LoadMeta(meta *metapb.Cluster) (bool, error) { - return loadProto(kv.KVBase, clusterPath, meta) -} - -// SaveMeta save cluster meta to KV store. -func (kv *KV) SaveMeta(meta *metapb.Cluster) error { - return saveProto(kv.KVBase, clusterPath, meta) -} - -// LoadStore loads one store from KV. -func (kv *KV) LoadStore(storeID uint64, store *metapb.Store) (bool, error) { - return loadProto(kv.KVBase, kv.storePath(storeID), store) -} - -// SaveStore saves one store to KV. -func (kv *KV) SaveStore(store *metapb.Store) error { - return saveProto(kv.KVBase, kv.storePath(store.GetId()), store) -} - -// DeleteStore deletes one store from KV. -func (kv *KV) DeleteStore(store *metapb.Store) error { - return kv.Delete(kv.storePath(store.GetId())) -} - -// LoadRegion loads one regoin from KV. -func (kv *KV) LoadRegion(regionID uint64, region *metapb.Region) (bool, error) { - if atomic.LoadInt32(&kv.useRegionKV) > 0 { - return loadProto(kv.regionKV, regionPath(regionID), region) - } - return loadProto(kv.KVBase, regionPath(regionID), region) -} - -// LoadRegions loads all regions from KV to RegionsInfo. -func (kv *KV) LoadRegions(regions *RegionsInfo) error { - if atomic.LoadInt32(&kv.useRegionKV) > 0 { - return loadRegions(kv.regionKV, regions) - } - return loadRegions(kv.KVBase, regions) -} - -// SaveRegion saves one region to KV. -func (kv *KV) SaveRegion(region *metapb.Region) error { - if atomic.LoadInt32(&kv.useRegionKV) > 0 { - return kv.regionKV.SaveRegion(region) - } - return saveProto(kv.KVBase, regionPath(region.GetId()), region) -} - -// DeleteRegion deletes one region from KV. -func (kv *KV) DeleteRegion(region *metapb.Region) error { - if atomic.LoadInt32(&kv.useRegionKV) > 0 { - return deleteRegion(kv.regionKV, region) - } - return deleteRegion(kv.KVBase, region) -} - -// SaveConfig stores marshalable cfg to the configPath. -func (kv *KV) SaveConfig(cfg interface{}) error { - value, err := json.Marshal(cfg) - if err != nil { - return errors.WithStack(err) - } - return kv.Save(configPath, string(value)) -} - -// LoadConfig loads config from configPath then unmarshal it to cfg. -func (kv *KV) LoadConfig(cfg interface{}) (bool, error) { - value, err := kv.Load(configPath) - if err != nil { - return false, err - } - if value == "" { - return false, nil - } - err = json.Unmarshal([]byte(value), cfg) - if err != nil { - return false, errors.WithStack(err) - } - return true, nil -} - -// LoadStores loads all stores from KV to StoresInfo. 
-func (kv *KV) LoadStores(stores *StoresInfo) error { - nextID := uint64(0) - endKey := kv.storePath(math.MaxUint64) - for { - key := kv.storePath(nextID) - _, res, err := kv.LoadRange(key, endKey, minKVRangeLimit) - if err != nil { - return err - } - for _, s := range res { - store := &metapb.Store{} - if err := store.Unmarshal([]byte(s)); err != nil { - return errors.WithStack(err) - } - leaderWeight, err := kv.loadFloatWithDefaultValue(kv.storeLeaderWeightPath(store.GetId()), 1.0) - if err != nil { - return err - } - regionWeight, err := kv.loadFloatWithDefaultValue(kv.storeRegionWeightPath(store.GetId()), 1.0) - if err != nil { - return err - } - newStoreInfo := NewStoreInfo(store, SetLeaderWeight(leaderWeight), SetRegionWeight(regionWeight)) - - nextID = store.GetId() + 1 - stores.SetStore(newStoreInfo) - } - if len(res) < minKVRangeLimit { - return nil - } - } -} - -// SaveStoreWeight saves a store's leader and region weight to KV. -func (kv *KV) SaveStoreWeight(storeID uint64, leader, region float64) error { - leaderValue := strconv.FormatFloat(leader, 'f', -1, 64) - if err := kv.Save(kv.storeLeaderWeightPath(storeID), leaderValue); err != nil { - return err - } - regionValue := strconv.FormatFloat(region, 'f', -1, 64) - return kv.Save(kv.storeRegionWeightPath(storeID), regionValue) -} - -func (kv *KV) loadFloatWithDefaultValue(path string, def float64) (float64, error) { - res, err := kv.Load(path) - if err != nil { - return 0, err - } - if res == "" { - return def, nil - } - val, err := strconv.ParseFloat(res, 64) - if err != nil { - return 0, errors.WithStack(err) - } - return val, nil -} - -// Flush flushes the dirty region to storage. -func (kv *KV) Flush() error { - if kv.regionKV != nil { - return kv.regionKV.FlushRegion() - } - return nil -} - -// Close closes the kv. -func (kv *KV) Close() error { - if kv.regionKV != nil { - return kv.regionKV.Close() - } - return nil -} - -// SaveGCSafePoint saves new GC safe point to KV. -func (kv *KV) SaveGCSafePoint(safePoint uint64) error { - key := path.Join(gcPath, "safe_point") - value := strconv.FormatUint(safePoint, 16) - return kv.Save(key, value) -} - -// LoadGCSafePoint loads current GC safe point from KV. -func (kv *KV) LoadGCSafePoint() (uint64, error) { - key := path.Join(gcPath, "safe_point") - value, err := kv.Load(key) - if err != nil { - return 0, err - } - if value == "" { - return 0, nil - } - safePoint, err := strconv.ParseUint(value, 16, 64) - if err != nil { - return 0, err - } - return safePoint, nil -} - -func loadProto(kv KVBase, key string, msg proto.Message) (bool, error) { - value, err := kv.Load(key) - if err != nil { - return false, err - } - if value == "" { - return false, nil - } - err = proto.Unmarshal([]byte(value), msg) - return true, errors.WithStack(err) -} - -func saveProto(kv KVBase, key string, msg proto.Message) error { - value, err := proto.Marshal(msg) - if err != nil { - return errors.WithStack(err) - } - return kv.Save(key, string(value)) -} diff --git a/server/core/region_kv.go b/server/core/region_storage.go similarity index 62% rename from server/core/region_kv.go rename to server/core/region_storage.go index 06306edabffc..45dfe7c96673 100644 --- a/server/core/region_kv.go +++ b/server/core/region_storage.go @@ -21,15 +21,16 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" log "github.com/pingcap/log" + "github.com/pingcap/pd/server/kv" "github.com/pkg/errors" "go.uber.org/zap" ) var dirtyFlushTick = time.Second -// RegionKV is used to save regions. 
-type RegionKV struct { - *leveldbKV +// RegionStorage is used to save regions. +type RegionStorage struct { + *kv.LeveldbKV mu sync.RWMutex batchRegions map[string]*metapb.Region batchSize int @@ -41,21 +42,21 @@ type RegionKV struct { } const ( - //DefaultFlushRegionRate is the ttl to sync the regions to kv storage. + //DefaultFlushRegionRate is the ttl to sync the regions to region storage. defaultFlushRegionRate = 3 * time.Second - //DefaultBatchSize is the batch size to save the regions to kv storage. + //DefaultBatchSize is the batch size to save the regions to region storage. defaultBatchSize = 100 ) -// NewRegionKV returns a kv storage that is used to save regions. -func NewRegionKV(path string) (*RegionKV, error) { - levelDB, err := newLeveldbKV(path) +// NewRegionStorage returns a region storage that is used to save regions. +func NewRegionStorage(path string) (*RegionStorage, error) { + levelDB, err := kv.NewLeveldbKV(path) if err != nil { return nil, err } ctx, cancel := context.WithCancel(context.Background()) - kv := &RegionKV{ - leveldbKV: levelDB, + s := &RegionStorage{ + LeveldbKV: levelDB, batchSize: defaultBatchSize, flushRate: defaultFlushRegionRate, batchRegions: make(map[string]*metapb.Region, defaultBatchSize), @@ -63,11 +64,11 @@ func NewRegionKV(path string) (*RegionKV, error) { ctx: ctx, cancel: cancel, } - kv.backgroundFlush() - return kv, nil + s.backgroundFlush() + return s, nil } -func (kv *RegionKV) backgroundFlush() { +func (s *RegionStorage) backgroundFlush() { ticker := time.NewTicker(dirtyFlushTick) var ( isFlush bool @@ -78,35 +79,35 @@ func (kv *RegionKV) backgroundFlush() { for { select { case <-ticker.C: - kv.mu.RLock() - isFlush = kv.flushTime.Before(time.Now()) - kv.mu.RUnlock() + s.mu.RLock() + isFlush = s.flushTime.Before(time.Now()) + s.mu.RUnlock() if !isFlush { continue } - if err = kv.FlushRegion(); err != nil { + if err = s.FlushRegion(); err != nil { log.Error("flush regions meet error", zap.Error(err)) } - case <-kv.ctx.Done(): + case <-s.ctx.Done(): return } } }() } -// SaveRegion saves one region to KV. -func (kv *RegionKV) SaveRegion(region *metapb.Region) error { - kv.mu.Lock() - defer kv.mu.Unlock() - if kv.cacheSize < kv.batchSize-1 { - kv.batchRegions[regionPath(region.GetId())] = region - kv.cacheSize++ +// SaveRegion saves one region to storage. +func (s *RegionStorage) SaveRegion(region *metapb.Region) error { + s.mu.Lock() + defer s.mu.Unlock() + if s.cacheSize < s.batchSize-1 { + s.batchRegions[regionPath(region.GetId())] = region + s.cacheSize++ - kv.flushTime = time.Now().Add(kv.flushRate) + s.flushTime = time.Now().Add(s.flushRate) return nil } - kv.batchRegions[regionPath(region.GetId())] = region - err := kv.flush() + s.batchRegions[regionPath(region.GetId())] = region + err := s.flush() if err != nil { return err @@ -114,11 +115,11 @@ func (kv *RegionKV) SaveRegion(region *metapb.Region) error { return nil } -func deleteRegion(kv KVBase, region *metapb.Region) error { - return kv.Delete(regionPath(region.GetId())) +func deleteRegion(kv kv.Base, region *metapb.Region) error { + return kv.Remove(regionPath(region.GetId())) } -func loadRegions(kv KVBase, regions *RegionsInfo) error { +func loadRegions(kv kv.Base, regions *RegionsInfo) error { nextID := uint64(0) endKey := regionPath(math.MaxUint64) @@ -157,28 +158,28 @@ func loadRegions(kv KVBase, regions *RegionsInfo) error { } } -// FlushRegion saves the cache region to region kv storage. 
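// Illustrative sketch (not part of this patch), assuming the RegionStorage API
// introduced just above and below: SaveRegion only buffers a region until the
// batch reaches defaultBatchSize or the background flusher's deadline passes,
// while FlushRegion forces the buffered batch into LevelDB. The path used here
// is hypothetical.
package main

import (
	"log"

	"github.com/pingcap/kvproto/pkg/metapb"
	"github.com/pingcap/pd/server/core"
)

func main() {
	s, err := core.NewRegionStorage("/tmp/pd/region-meta") // hypothetical path
	if err != nil {
		log.Fatal(err)
	}
	// Close flushes any dirty regions before closing the underlying LevelDB.
	defer s.Close()

	// Buffered write; it is not necessarily on disk yet.
	if err := s.SaveRegion(&metapb.Region{Id: 1}); err != nil {
		log.Fatal(err)
	}
	// Force the buffered batch to disk immediately.
	if err := s.FlushRegion(); err != nil {
		log.Fatal(err)
	}
}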
-func (kv *RegionKV) FlushRegion() error { - kv.mu.Lock() - defer kv.mu.Unlock() - return kv.flush() +// FlushRegion saves the cache region to region storage. +func (s *RegionStorage) FlushRegion() error { + s.mu.Lock() + defer s.mu.Unlock() + return s.flush() } -func (kv *RegionKV) flush() error { - if err := kv.SaveRegions(kv.batchRegions); err != nil { +func (s *RegionStorage) flush() error { + if err := s.SaveRegions(s.batchRegions); err != nil { return err } - kv.cacheSize = 0 - kv.batchRegions = make(map[string]*metapb.Region, kv.batchSize) + s.cacheSize = 0 + s.batchRegions = make(map[string]*metapb.Region, s.batchSize) return nil } // Close closes the kv. -func (kv *RegionKV) Close() error { - err := kv.FlushRegion() +func (s *RegionStorage) Close() error { + err := s.FlushRegion() if err != nil { log.Error("meet error before close the region storage", zap.Error(err)) } - kv.cancel() - return kv.db.Close() + s.cancel() + return errors.WithStack(s.LeveldbKV.Close()) } diff --git a/server/core/storage.go b/server/core/storage.go new file mode 100644 index 000000000000..3b266ecc8618 --- /dev/null +++ b/server/core/storage.go @@ -0,0 +1,297 @@ +// Copyright 2017 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package core + +import ( + "encoding/json" + "fmt" + "math" + "path" + "strconv" + "sync/atomic" + + "github.com/gogo/protobuf/proto" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/pd/server/kv" + "github.com/pkg/errors" +) + +const ( + clusterPath = "raft" + configPath = "config" + schedulePath = "schedule" + gcPath = "gc" +) + +const ( + maxKVRangeLimit = 10000 + minKVRangeLimit = 100 +) + +// Storage wraps all kv operations, keep it stateless. +type Storage struct { + kv.Base + regionStorage *RegionStorage + useRegionStorage int32 +} + +// NewStorage creates Storage instance with Base. +func NewStorage(base kv.Base) *Storage { + return &Storage{ + Base: base, + } +} + +// SetRegionStorage sets the region storage. +func (s *Storage) SetRegionStorage(regionStorage *RegionStorage) *Storage { + s.regionStorage = regionStorage + return s +} + +// GetRegionStorage gets the region storage. +func (s *Storage) GetRegionStorage() *RegionStorage { + return s.regionStorage +} + +// SwitchToRegionStorage switches to the region storage. +func (s *Storage) SwitchToRegionStorage() { + atomic.StoreInt32(&s.useRegionStorage, 1) +} + +// SwitchToDefaultStorage switches to the to default storage. +func (s *Storage) SwitchToDefaultStorage() { + atomic.StoreInt32(&s.useRegionStorage, 0) +} + +func (s *Storage) storePath(storeID uint64) string { + return path.Join(clusterPath, "s", fmt.Sprintf("%020d", storeID)) +} + +func regionPath(regionID uint64) string { + return path.Join(clusterPath, "r", fmt.Sprintf("%020d", regionID)) +} + +// ClusterStatePath returns the path to save an option. 
+func (s *Storage) ClusterStatePath(option string) string { + return path.Join(clusterPath, "status", option) +} + +func (s *Storage) storeLeaderWeightPath(storeID uint64) string { + return path.Join(schedulePath, "store_weight", fmt.Sprintf("%020d", storeID), "leader") +} + +func (s *Storage) storeRegionWeightPath(storeID uint64) string { + return path.Join(schedulePath, "store_weight", fmt.Sprintf("%020d", storeID), "region") +} + +// LoadMeta loads cluster meta from storage. +func (s *Storage) LoadMeta(meta *metapb.Cluster) (bool, error) { + return loadProto(s.Base, clusterPath, meta) +} + +// SaveMeta save cluster meta to storage. +func (s *Storage) SaveMeta(meta *metapb.Cluster) error { + return saveProto(s.Base, clusterPath, meta) +} + +// LoadStore loads one store from storage. +func (s *Storage) LoadStore(storeID uint64, store *metapb.Store) (bool, error) { + return loadProto(s.Base, s.storePath(storeID), store) +} + +// SaveStore saves one store to storage. +func (s *Storage) SaveStore(store *metapb.Store) error { + return saveProto(s.Base, s.storePath(store.GetId()), store) +} + +// DeleteStore deletes one store from storage. +func (s *Storage) DeleteStore(store *metapb.Store) error { + return s.Remove(s.storePath(store.GetId())) +} + +// LoadRegion loads one regoin from storage. +func (s *Storage) LoadRegion(regionID uint64, region *metapb.Region) (bool, error) { + if atomic.LoadInt32(&s.useRegionStorage) > 0 { + return loadProto(s.regionStorage, regionPath(regionID), region) + } + return loadProto(s.Base, regionPath(regionID), region) +} + +// LoadRegions loads all regions from storage to RegionsInfo. +func (s *Storage) LoadRegions(regions *RegionsInfo) error { + if atomic.LoadInt32(&s.useRegionStorage) > 0 { + return loadRegions(s.regionStorage, regions) + } + return loadRegions(s.Base, regions) +} + +// SaveRegion saves one region to storage. +func (s *Storage) SaveRegion(region *metapb.Region) error { + if atomic.LoadInt32(&s.useRegionStorage) > 0 { + return s.regionStorage.SaveRegion(region) + } + return saveProto(s.Base, regionPath(region.GetId()), region) +} + +// DeleteRegion deletes one region from storage. +func (s *Storage) DeleteRegion(region *metapb.Region) error { + if atomic.LoadInt32(&s.useRegionStorage) > 0 { + return deleteRegion(s.regionStorage, region) + } + return deleteRegion(s.Base, region) +} + +// SaveConfig stores marshalable cfg to the configPath. +func (s *Storage) SaveConfig(cfg interface{}) error { + value, err := json.Marshal(cfg) + if err != nil { + return errors.WithStack(err) + } + return s.Save(configPath, string(value)) +} + +// LoadConfig loads config from configPath then unmarshal it to cfg. +func (s *Storage) LoadConfig(cfg interface{}) (bool, error) { + value, err := s.Load(configPath) + if err != nil { + return false, err + } + if value == "" { + return false, nil + } + err = json.Unmarshal([]byte(value), cfg) + if err != nil { + return false, errors.WithStack(err) + } + return true, nil +} + +// LoadStores loads all stores from storage to StoresInfo. 
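// Illustrative sketch (not part of this patch): the renamed core.Storage wraps
// any kv.Base, exactly as the updated tests do with the in-memory backend.
// SetRegionStorage/SwitchToRegionStorage (above) redirect region reads and
// writes to a LevelDB-backed RegionStorage instead of the default backend.
package main

import (
	"log"

	"github.com/pingcap/kvproto/pkg/metapb"
	"github.com/pingcap/pd/server/core"
	"github.com/pingcap/pd/server/kv"
)

func main() {
	storage := core.NewStorage(kv.NewMemoryKV())

	if err := storage.SaveRegion(&metapb.Region{Id: 123}); err != nil {
		log.Fatal(err)
	}

	loaded := &metapb.Region{}
	ok, err := storage.LoadRegion(123, loaded)
	if err != nil || !ok {
		log.Fatalf("load region: ok=%v, err=%v", ok, err)
	}
	log.Printf("region %d round-tripped through core.Storage", loaded.GetId())
}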
+func (s *Storage) LoadStores(stores *StoresInfo) error { + nextID := uint64(0) + endKey := s.storePath(math.MaxUint64) + for { + key := s.storePath(nextID) + _, res, err := s.LoadRange(key, endKey, minKVRangeLimit) + if err != nil { + return err + } + for _, str := range res { + store := &metapb.Store{} + if err := store.Unmarshal([]byte(str)); err != nil { + return errors.WithStack(err) + } + leaderWeight, err := s.loadFloatWithDefaultValue(s.storeLeaderWeightPath(store.GetId()), 1.0) + if err != nil { + return err + } + regionWeight, err := s.loadFloatWithDefaultValue(s.storeRegionWeightPath(store.GetId()), 1.0) + if err != nil { + return err + } + newStoreInfo := NewStoreInfo(store, SetLeaderWeight(leaderWeight), SetRegionWeight(regionWeight)) + + nextID = store.GetId() + 1 + stores.SetStore(newStoreInfo) + } + if len(res) < minKVRangeLimit { + return nil + } + } +} + +// SaveStoreWeight saves a store's leader and region weight to storage. +func (s *Storage) SaveStoreWeight(storeID uint64, leader, region float64) error { + leaderValue := strconv.FormatFloat(leader, 'f', -1, 64) + if err := s.Save(s.storeLeaderWeightPath(storeID), leaderValue); err != nil { + return err + } + regionValue := strconv.FormatFloat(region, 'f', -1, 64) + return s.Save(s.storeRegionWeightPath(storeID), regionValue) +} + +func (s *Storage) loadFloatWithDefaultValue(path string, def float64) (float64, error) { + res, err := s.Load(path) + if err != nil { + return 0, err + } + if res == "" { + return def, nil + } + val, err := strconv.ParseFloat(res, 64) + if err != nil { + return 0, errors.WithStack(err) + } + return val, nil +} + +// Flush flushes the dirty region to storage. +func (s *Storage) Flush() error { + if s.regionStorage != nil { + return s.regionStorage.FlushRegion() + } + return nil +} + +// Close closes the s. +func (s *Storage) Close() error { + if s.regionStorage != nil { + return s.regionStorage.Close() + } + return nil +} + +// SaveGCSafePoint saves new GC safe point to storage. +func (s *Storage) SaveGCSafePoint(safePoint uint64) error { + key := path.Join(gcPath, "safe_point") + value := strconv.FormatUint(safePoint, 16) + return s.Save(key, value) +} + +// LoadGCSafePoint loads current GC safe point from storage. +func (s *Storage) LoadGCSafePoint() (uint64, error) { + key := path.Join(gcPath, "safe_point") + value, err := s.Load(key) + if err != nil { + return 0, err + } + if value == "" { + return 0, nil + } + safePoint, err := strconv.ParseUint(value, 16, 64) + if err != nil { + return 0, err + } + return safePoint, nil +} + +func loadProto(s kv.Base, key string, msg proto.Message) (bool, error) { + value, err := s.Load(key) + if err != nil { + return false, err + } + if value == "" { + return false, nil + } + err = proto.Unmarshal([]byte(value), msg) + return true, errors.WithStack(err) +} + +func saveProto(s kv.Base, key string, msg proto.Message) error { + value, err := proto.Marshal(msg) + if err != nil { + return errors.WithStack(err) + } + return s.Save(key, string(value)) +} diff --git a/server/core/kv_test.go b/server/core/storage_test.go similarity index 69% rename from server/core/kv_test.go rename to server/core/storage_test.go index f671a3b0b1d6..b46a4146a812 100644 --- a/server/core/kv_test.go +++ b/server/core/storage_test.go @@ -19,6 +19,7 @@ import ( . 
"github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/pd/server/kv" "github.com/pkg/errors" ) @@ -28,51 +29,51 @@ type testKVSuite struct { } func (s *testKVSuite) TestBasic(c *C) { - kv := NewKV(NewMemoryKV()) + storage := NewStorage(kv.NewMemoryKV()) - c.Assert(kv.storePath(123), Equals, "raft/s/00000000000000000123") + c.Assert(storage.storePath(123), Equals, "raft/s/00000000000000000123") c.Assert(regionPath(123), Equals, "raft/r/00000000000000000123") meta := &metapb.Cluster{Id: 123} - ok, err := kv.LoadMeta(meta) + ok, err := storage.LoadMeta(meta) c.Assert(ok, IsFalse) c.Assert(err, IsNil) - c.Assert(kv.SaveMeta(meta), IsNil) + c.Assert(storage.SaveMeta(meta), IsNil) newMeta := &metapb.Cluster{} - ok, err = kv.LoadMeta(newMeta) + ok, err = storage.LoadMeta(newMeta) c.Assert(ok, IsTrue) c.Assert(err, IsNil) c.Assert(newMeta, DeepEquals, meta) store := &metapb.Store{Id: 123} - ok, err = kv.LoadStore(123, store) + ok, err = storage.LoadStore(123, store) c.Assert(ok, IsFalse) c.Assert(err, IsNil) - c.Assert(kv.SaveStore(store), IsNil) + c.Assert(storage.SaveStore(store), IsNil) newStore := &metapb.Store{} - ok, err = kv.LoadStore(123, newStore) + ok, err = storage.LoadStore(123, newStore) c.Assert(ok, IsTrue) c.Assert(err, IsNil) c.Assert(newStore, DeepEquals, store) region := &metapb.Region{Id: 123} - ok, err = kv.LoadRegion(123, region) + ok, err = storage.LoadRegion(123, region) c.Assert(ok, IsFalse) c.Assert(err, IsNil) - c.Assert(kv.SaveRegion(region), IsNil) + c.Assert(storage.SaveRegion(region), IsNil) newRegion := &metapb.Region{} - ok, err = kv.LoadRegion(123, newRegion) + ok, err = storage.LoadRegion(123, newRegion) c.Assert(ok, IsTrue) c.Assert(err, IsNil) c.Assert(newRegion, DeepEquals, region) - err = kv.DeleteRegion(region) + err = storage.DeleteRegion(region) c.Assert(err, IsNil) - ok, err = kv.LoadRegion(123, newRegion) + ok, err = storage.LoadRegion(123, newRegion) c.Assert(ok, IsFalse) c.Assert(err, IsNil) } -func mustSaveStores(c *C, kv *KV, n int) []*metapb.Store { +func mustSaveStores(c *C, s *Storage, n int) []*metapb.Store { stores := make([]*metapb.Store, 0, n) for i := 0; i < n; i++ { store := &metapb.Store{Id: uint64(i)} @@ -80,19 +81,19 @@ func mustSaveStores(c *C, kv *KV, n int) []*metapb.Store { } for _, store := range stores { - c.Assert(kv.SaveStore(store), IsNil) + c.Assert(s.SaveStore(store), IsNil) } return stores } func (s *testKVSuite) TestLoadStores(c *C) { - kv := NewKV(NewMemoryKV()) + storage := NewStorage(kv.NewMemoryKV()) cache := NewStoresInfo() n := 10 - stores := mustSaveStores(c, kv, n) - c.Assert(kv.LoadStores(cache), IsNil) + stores := mustSaveStores(c, storage, n) + c.Assert(storage.LoadStores(cache), IsNil) c.Assert(cache.GetStoreCount(), Equals, n) for _, store := range cache.GetMetaStores() { @@ -101,14 +102,14 @@ func (s *testKVSuite) TestLoadStores(c *C) { } func (s *testKVSuite) TestStoreWeight(c *C) { - kv := NewKV(NewMemoryKV()) + storage := NewStorage(kv.NewMemoryKV()) cache := NewStoresInfo() const n = 3 - mustSaveStores(c, kv, n) - c.Assert(kv.SaveStoreWeight(1, 2.0, 3.0), IsNil) - c.Assert(kv.SaveStoreWeight(2, 0.2, 0.3), IsNil) - c.Assert(kv.LoadStores(cache), IsNil) + mustSaveStores(c, storage, n) + c.Assert(storage.SaveStoreWeight(1, 2.0, 3.0), IsNil) + c.Assert(storage.SaveStoreWeight(2, 0.2, 0.3), IsNil) + c.Assert(storage.LoadStores(cache), IsNil) leaderWeights := []float64{1.0, 2.0, 0.2} regionWeights := []float64{1.0, 3.0, 0.3} for i := 0; i < n; i++ { @@ -117,7 +118,7 @@ func (s 
*testKVSuite) TestStoreWeight(c *C) { } } -func mustSaveRegions(c *C, kv *KV, n int) []*metapb.Region { +func mustSaveRegions(c *C, s *Storage, n int) []*metapb.Region { regions := make([]*metapb.Region, 0, n) for i := 0; i < n; i++ { region := newTestRegionMeta(uint64(i)) @@ -125,19 +126,19 @@ func mustSaveRegions(c *C, kv *KV, n int) []*metapb.Region { } for _, region := range regions { - c.Assert(kv.SaveRegion(region), IsNil) + c.Assert(s.SaveRegion(region), IsNil) } return regions } func (s *testKVSuite) TestLoadRegions(c *C) { - kv := NewKV(NewMemoryKV()) + storage := NewStorage(kv.NewMemoryKV()) cache := NewRegionsInfo() n := 10 - regions := mustSaveRegions(c, kv, n) - c.Assert(kv.LoadRegions(cache), IsNil) + regions := mustSaveRegions(c, storage, n) + c.Assert(storage.LoadRegions(cache), IsNil) c.Assert(cache.GetRegionCount(), Equals, n) for _, region := range cache.GetMetaRegions() { @@ -146,12 +147,12 @@ func (s *testKVSuite) TestLoadRegions(c *C) { } func (s *testKVSuite) TestLoadRegionsExceedRangeLimit(c *C) { - kv := NewKV(&KVWithMaxRangeLimit{KVBase: NewMemoryKV(), rangeLimit: 500}) + storage := NewStorage(&KVWithMaxRangeLimit{Base: kv.NewMemoryKV(), rangeLimit: 500}) cache := NewRegionsInfo() n := 1000 - regions := mustSaveRegions(c, kv, n) - c.Assert(kv.LoadRegions(cache), IsNil) + regions := mustSaveRegions(c, storage, n) + c.Assert(storage.LoadRegions(cache), IsNil) c.Assert(cache.GetRegionCount(), Equals, n) for _, region := range cache.GetMetaRegions() { c.Assert(region, DeepEquals, regions[region.GetId()]) @@ -159,23 +160,23 @@ func (s *testKVSuite) TestLoadRegionsExceedRangeLimit(c *C) { } func (s *testKVSuite) TestLoadGCSafePoint(c *C) { - kv := NewKV(NewMemoryKV()) + storage := NewStorage(kv.NewMemoryKV()) testData := []uint64{0, 1, 2, 233, 2333, 23333333333, math.MaxUint64} - r, e := kv.LoadGCSafePoint() + r, e := storage.LoadGCSafePoint() c.Assert(r, Equals, uint64(0)) c.Assert(e, IsNil) for _, safePoint := range testData { - err := kv.SaveGCSafePoint(safePoint) + err := storage.SaveGCSafePoint(safePoint) c.Assert(err, IsNil) - safePoint1, err := kv.LoadGCSafePoint() + safePoint1, err := storage.LoadGCSafePoint() c.Assert(err, IsNil) c.Assert(safePoint, Equals, safePoint1) } } type KVWithMaxRangeLimit struct { - KVBase + kv.Base rangeLimit int } @@ -183,7 +184,7 @@ func (kv *KVWithMaxRangeLimit) LoadRange(key, endKey string, limit int) ([]strin if limit > kv.rangeLimit { return nil, nil, errors.Errorf("limit %v exceed max rangeLimit %v", limit, kv.rangeLimit) } - return kv.KVBase.LoadRange(key, endKey, limit) + return kv.Base.LoadRange(key, endKey, limit) } func newTestRegionMeta(regionID uint64) *metapb.Region { diff --git a/server/grpc_service.go b/server/grpc_service.go index a5b249d4ef90..d03bb0bf676d 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -657,7 +657,7 @@ func (s *Server) GetGCSafePoint(ctx context.Context, request *pdpb.GetGCSafePoin return &pdpb.GetGCSafePointResponse{Header: s.notBootstrappedHeader()}, nil } - safePoint, err := s.kv.LoadGCSafePoint() + safePoint, err := s.storage.LoadGCSafePoint() if err != nil { return nil, err } @@ -688,7 +688,7 @@ func (s *Server) UpdateGCSafePoint(ctx context.Context, request *pdpb.UpdateGCSa return &pdpb.UpdateGCSafePointResponse{Header: s.notBootstrappedHeader()}, nil } - oldSafePoint, err := s.kv.LoadGCSafePoint() + oldSafePoint, err := s.storage.LoadGCSafePoint() if err != nil { return nil, err } @@ -697,7 +697,7 @@ func (s *Server) UpdateGCSafePoint(ctx context.Context, request 
*pdpb.UpdateGCSa // Only save the safe point if it's greater than the previous one if newSafePoint > oldSafePoint { - if err := s.kv.SaveGCSafePoint(newSafePoint); err != nil { + if err := s.storage.SaveGCSafePoint(newSafePoint); err != nil { return nil, err } log.Info("updated gc safe point", diff --git a/server/handler.go b/server/handler.go index 5c064f12a703..38f44f6c1116 100644 --- a/server/handler.go +++ b/server/handler.go @@ -183,7 +183,7 @@ func (h *Handler) AddScheduler(name string, args ...string) error { log.Info("create scheduler", zap.String("scheduler-name", s.GetName())) if err = c.addScheduler(s, args...); err != nil { log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), zap.Error(err)) - } else if err = h.opt.persist(c.cluster.kv); err != nil { + } else if err = h.opt.persist(c.cluster.storage); err != nil { log.Error("can not persist scheduler config", zap.Error(err)) } return err @@ -197,7 +197,7 @@ func (h *Handler) RemoveScheduler(name string) error { } if err = c.removeScheduler(name); err != nil { log.Error("can not remove scheduler", zap.String("scheduler-name", name), zap.Error(err)) - } else if err = h.opt.persist(c.cluster.kv); err != nil { + } else if err = h.opt.persist(c.cluster.storage); err != nil { log.Error("can not persist scheduler config", zap.Error(err)) } return err diff --git a/server/id.go b/server/id.go index 047d96a637b8..b4bf76b183fb 100644 --- a/server/id.go +++ b/server/id.go @@ -80,7 +80,7 @@ func (alloc *idAllocator) generate() (uint64, error) { end += allocStep value = uint64ToBytes(end) - resp, err := alloc.s.leaderTxn(cmp).Then(clientv3.OpPut(key, string(value))).Commit() + resp, err := alloc.s.LeaderTxn(cmp).Then(clientv3.OpPut(key, string(value))).Commit() if err != nil { return 0, err } diff --git a/server/etcd_kv.go b/server/kv/etcd_kv.go similarity index 51% rename from server/etcd_kv.go rename to server/kv/etcd_kv.go index 05bad301cbd2..1dee5bf2c43c 100644 --- a/server/etcd_kv.go +++ b/server/kv/etcd_kv.go @@ -11,9 +11,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -package server +package kv import ( + "context" "path" "strings" "time" @@ -28,6 +29,8 @@ import ( const ( kvRequestTimeout = time.Second * 10 kvSlowRequestTime = time.Second * 1 + requestTimeout = 10 * time.Second + slowRequestTime = 1 * time.Second ) var ( @@ -35,23 +38,22 @@ var ( ) type etcdKVBase struct { - server *Server client *clientv3.Client rootPath string } -func newEtcdKVBase(s *Server) *etcdKVBase { +// NewEtcdKVBase creates a new etcd kv. 
+func NewEtcdKVBase(client *clientv3.Client, rootPath string) *etcdKVBase { return &etcdKVBase{ - server: s, - client: s.client, - rootPath: s.rootPath, + client: client, + rootPath: rootPath, } } func (kv *etcdKVBase) Load(key string) (string, error) { key = path.Join(kv.rootPath, key) - resp, err := etcdutil.EtcdKVGet(kv.server.client, key) + resp, err := etcdutil.EtcdKVGet(kv.client, key) if err != nil { return "", err } @@ -69,7 +71,7 @@ func (kv *etcdKVBase) LoadRange(key, endKey string, limit int) ([]string, []stri withRange := clientv3.WithRange(endKey) withLimit := clientv3.WithLimit(int64(limit)) - resp, err := etcdutil.EtcdKVGet(kv.server.client, key, withRange, withLimit) + resp, err := etcdutil.EtcdKVGet(kv.client, key, withRange, withLimit) if err != nil { return nil, nil, err } @@ -85,7 +87,8 @@ func (kv *etcdKVBase) LoadRange(key, endKey string, limit int) ([]string, []stri func (kv *etcdKVBase) Save(key, value string) error { key = path.Join(kv.rootPath, key) - resp, err := kv.server.leaderTxn().Then(clientv3.OpPut(key, value)).Commit() + txn := NewSlowLogTxn(kv.client) + resp, err := txn.Then(clientv3.OpPut(key, value)).Commit() if err != nil { log.Error("save to etcd meet error", zap.Error(err)) return errors.WithStack(err) @@ -96,12 +99,13 @@ func (kv *etcdKVBase) Save(key, value string) error { return nil } -func (kv *etcdKVBase) Delete(key string) error { +func (kv *etcdKVBase) Remove(key string) error { key = path.Join(kv.rootPath, key) - resp, err := kv.server.leaderTxn().Then(clientv3.OpDelete(key)).Commit() + txn := NewSlowLogTxn(kv.client) + resp, err := txn.Then(clientv3.OpDelete(key)).Commit() if err != nil { - log.Error("delete from etcd meet error", zap.Error(err)) + log.Error("remove from etcd meet error", zap.Error(err)) return errors.WithStack(err) } if !resp.Succeeded { @@ -109,3 +113,60 @@ func (kv *etcdKVBase) Delete(key string) error { } return nil } + +// SlowLogTxn wraps etcd transaction and log slow one. +type SlowLogTxn struct { + clientv3.Txn + cancel context.CancelFunc +} + +// NewSlowLogTxn create a SlowLogTxn. +func NewSlowLogTxn(client *clientv3.Client) clientv3.Txn { + ctx, cancel := context.WithTimeout(client.Ctx(), requestTimeout) + return &SlowLogTxn{ + Txn: client.Txn(ctx), + cancel: cancel, + } +} + +// If takes a list of comparison. If all comparisons passed in succeed, +// the operations passed into Then() will be executed. Or the operations +// passed into Else() will be executed. +func (t *SlowLogTxn) If(cs ...clientv3.Cmp) clientv3.Txn { + return &SlowLogTxn{ + Txn: t.Txn.If(cs...), + cancel: t.cancel, + } +} + +// Then takes a list of operations. The Ops list will be executed, if the +// comparisons passed in If() succeed. +func (t *SlowLogTxn) Then(ops ...clientv3.Op) clientv3.Txn { + return &SlowLogTxn{ + Txn: t.Txn.Then(ops...), + cancel: t.cancel, + } +} + +// Commit implements Txn Commit interface. 
+func (t *SlowLogTxn) Commit() (*clientv3.TxnResponse, error) { + start := time.Now() + resp, err := t.Txn.Commit() + t.cancel() + + cost := time.Since(start) + if cost > slowRequestTime { + log.Warn("txn runs too slow", + zap.Error(err), + zap.Reflect("response", resp), + zap.Duration("cost", cost)) + } + label := "success" + if err != nil { + label = "failed" + } + txnCounter.WithLabelValues(label).Inc() + txnDuration.WithLabelValues(label).Observe(cost.Seconds()) + + return resp, errors.WithStack(err) +} diff --git a/server/etcd_kv_test.go b/server/kv/etcd_kv_test.go similarity index 55% rename from server/etcd_kv_test.go rename to server/kv/etcd_kv_test.go index f588b030368f..ad47c3412c23 100644 --- a/server/etcd_kv_test.go +++ b/server/kv/etcd_kv_test.go @@ -11,18 +11,44 @@ // See the License for the specific language governing permissions and // limitations under the License. -package server +package kv -import . "github.com/pingcap/check" +import ( + "fmt" + "io/ioutil" + "net/url" + "os" + "path" + "strconv" + "testing" + + . "github.com/pingcap/check" + "github.com/pingcap/pd/pkg/tempurl" + "go.etcd.io/etcd/clientv3" + "go.etcd.io/etcd/embed" +) + +func TestKV(t *testing.T) { + TestingT(t) +} type testEtcdKVSuite struct{} var _ = Suite(&testEtcdKVSuite{}) func (s *testEtcdKVSuite) TestEtcdKV(c *C) { - server, cleanup := mustRunTestServer(c) - defer cleanup() - kv := server.kv.KVBase + cfg := newTestSingleConfig() + etcd, err := embed.StartEtcd(cfg) + c.Assert(err, IsNil) + + ep := cfg.LCUrls[0].String() + client, err := clientv3.New(clientv3.Config{ + Endpoints: []string{ep}, + }) + c.Assert(err, IsNil) + rootPath := path.Join("/pd", strconv.FormatUint(100, 10)) + + kv := NewEtcdKVBase(client, rootPath) keys := []string{"test/key1", "test/key2", "test/key3", "test/key4", "test/key5"} vals := []string{"val1", "val2", "val3", "val4", "val5"} @@ -56,8 +82,37 @@ func (s *testEtcdKVSuite) TestEtcdKV(c *C) { v, err = kv.Load(keys[1]) c.Assert(err, IsNil) c.Assert(v, Equals, "val2") - c.Assert(kv.Delete(keys[1]), IsNil) + c.Assert(kv.Remove(keys[1]), IsNil) v, err = kv.Load(keys[1]) c.Assert(err, IsNil) c.Assert(v, Equals, "") + + etcd.Close() + cleanConfig(cfg) +} + +func newTestSingleConfig() *embed.Config { + cfg := embed.NewConfig() + cfg.Name = "test_etcd" + cfg.Dir, _ = ioutil.TempDir("/tmp", "test_etcd") + cfg.WalDir = "" + cfg.Logger = "zap" + cfg.LogOutputs = []string{"stdout"} + + pu, _ := url.Parse(tempurl.Alloc()) + cfg.LPUrls = []url.URL{*pu} + cfg.APUrls = cfg.LPUrls + cu, _ := url.Parse(tempurl.Alloc()) + cfg.LCUrls = []url.URL{*cu} + cfg.ACUrls = cfg.LCUrls + + cfg.StrictReconfigCheck = false + cfg.InitialCluster = fmt.Sprintf("%s=%s", cfg.Name, &cfg.LPUrls[0]) + cfg.ClusterState = embed.ClusterStateFlagNew + return cfg +} + +func cleanConfig(cfg *embed.Config) { + // Clean data directory + os.RemoveAll(cfg.Dir) } diff --git a/server/kv/kv.go b/server/kv/kv.go new file mode 100644 index 000000000000..768eb7036f19 --- /dev/null +++ b/server/kv/kv.go @@ -0,0 +1,22 @@ +// Copyright 2017 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package kv + +// Base is an abstract interface for load/save pd cluster data. +type Base interface { + Load(key string) (string, error) + LoadRange(key, endKey string, limit int) (keys []string, values []string, err error) + Save(key, value string) error + Remove(key string) error +} diff --git a/server/core/levedb_kv.go b/server/kv/levedb_kv.go similarity index 58% rename from server/core/levedb_kv.go rename to server/kv/levedb_kv.go index b9123ee1ecf5..7b5d72ecc39c 100644 --- a/server/core/levedb_kv.go +++ b/server/kv/levedb_kv.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package core +package kv import ( "github.com/gogo/protobuf/proto" @@ -21,29 +21,32 @@ import ( "github.com/syndtr/goleveldb/leveldb/util" ) -type leveldbKV struct { - db *leveldb.DB +// LeveldbKV is a kv store using leveldb. +type LeveldbKV struct { + *leveldb.DB } -// newLeveldbKV is used to store regions information. -func newLeveldbKV(path string) (*leveldbKV, error) { +// NewLeveldbKV is used to store regions information. +func NewLeveldbKV(path string) (*LeveldbKV, error) { db, err := leveldb.OpenFile(path, nil) if err != nil { return nil, errors.WithStack(err) } - return &leveldbKV{db: db}, nil + return &LeveldbKV{db}, nil } -func (kv *leveldbKV) Load(key string) (string, error) { - v, err := kv.db.Get([]byte(key), nil) +// Load gets a value for a given key. +func (kv *LeveldbKV) Load(key string) (string, error) { + v, err := kv.Get([]byte(key), nil) if err != nil { return "", errors.WithStack(err) } return string(v), err } -func (kv *leveldbKV) LoadRange(startKey, endKey string, limit int) ([]string, []string, error) { - iter := kv.db.NewIterator(&util.Range{Start: []byte(startKey), Limit: []byte(endKey)}, nil) +// LoadRange gets a range of value for a given key range. +func (kv *LeveldbKV) LoadRange(startKey, endKey string, limit int) ([]string, []string, error) { + iter := kv.NewIterator(&util.Range{Start: []byte(startKey), Limit: []byte(endKey)}, nil) keys := make([]string, 0, limit) values := make([]string, 0, limit) count := 0 @@ -59,15 +62,18 @@ func (kv *leveldbKV) LoadRange(startKey, endKey string, limit int) ([]string, [] return keys, values, nil } -func (kv *leveldbKV) Save(key, value string) error { - return errors.WithStack(kv.db.Put([]byte(key), []byte(value), nil)) +// Save stores a key-value pair. +func (kv *LeveldbKV) Save(key, value string) error { + return errors.WithStack(kv.Put([]byte(key), []byte(value), nil)) } -func (kv *leveldbKV) Delete(key string) error { - return errors.WithStack(kv.db.Delete([]byte(key), nil)) +// Remove deletes a key-value pair for a given key. +func (kv *LeveldbKV) Remove(key string) error { + return errors.WithStack(kv.Delete([]byte(key), nil)) } -func (kv *leveldbKV) SaveRegions(regions map[string]*metapb.Region) error { +// SaveRegions stores some regions. 
+func (kv *LeveldbKV) SaveRegions(regions map[string]*metapb.Region) error { batch := new(leveldb.Batch) for key, r := range regions { @@ -77,9 +83,5 @@ func (kv *leveldbKV) SaveRegions(regions map[string]*metapb.Region) error { } batch.Put([]byte(key), value) } - return errors.WithStack(kv.db.Write(batch, nil)) -} - -func (kv *leveldbKV) Close() error { - return errors.WithStack(kv.db.Close()) + return errors.WithStack(kv.Write(batch, nil)) } diff --git a/server/core/kv_base.go b/server/kv/mem_kv.go similarity index 83% rename from server/core/kv_base.go rename to server/kv/mem_kv.go index a6a637ba2288..a32d3bc5b845 100644 --- a/server/core/kv_base.go +++ b/server/kv/mem_kv.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package core +package kv import ( "sync" @@ -19,21 +19,13 @@ import ( "github.com/google/btree" ) -// KVBase is an abstract interface for load/save pd cluster data. -type KVBase interface { - Load(key string) (string, error) - LoadRange(key, endKey string, limit int) (keys []string, values []string, err error) - Save(key, value string) error - Delete(key string) error -} - type memoryKV struct { sync.RWMutex tree *btree.BTree } // NewMemoryKV returns an in-memory kvBase for testing. -func NewMemoryKV() KVBase { +func NewMemoryKV() Base { return &memoryKV{ tree: btree.New(2), } @@ -77,7 +69,7 @@ func (kv *memoryKV) Save(key, value string) error { return nil } -func (kv *memoryKV) Delete(key string) error { +func (kv *memoryKV) Remove(key string) error { kv.Lock() defer kv.Unlock() diff --git a/server/kv/metrics.go b/server/kv/metrics.go new file mode 100644 index 000000000000..3ce2313f1c50 --- /dev/null +++ b/server/kv/metrics.go @@ -0,0 +1,40 @@ +// Copyright 2016 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package kv + +import "github.com/prometheus/client_golang/prometheus" + +var ( + txnCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "pd", + Subsystem: "txn", + Name: "txns_count", + Help: "Counter of txns.", + }, []string{"result"}) + + txnDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "pd", + Subsystem: "txn", + Name: "handle_txns_duration_seconds", + Help: "Bucketed histogram of processing time (s) of handled txns.", + Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13), + }, []string{"result"}) +) + +func init() { + prometheus.MustRegister(txnCounter) + prometheus.MustRegister(txnDuration) +} diff --git a/server/leader.go b/server/leader.go index e5e39d365541..3747df01b391 100644 --- a/server/leader.go +++ b/server/leader.go @@ -25,6 +25,7 @@ import ( log "github.com/pingcap/log" "github.com/pingcap/pd/pkg/etcdutil" "github.com/pingcap/pd/pkg/logutil" + "github.com/pingcap/pd/server/kv" "github.com/pkg/errors" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/mvcc/mvccpb" @@ -235,7 +236,7 @@ func (s *Server) campaignLeader() error { leaderKey := s.getLeaderPath() // The leader key must not exist, so the CreateRevision is 0. - resp, err := s.txn(). 
+ resp, err := kv.NewSlowLogTxn(s.client). If(clientv3.Compare(clientv3.CreateRevision(leaderKey), "=", 0)). Then(clientv3.OpPut(leaderKey, s.memberValue, clientv3.WithLease(leaseResp.ID))). Commit() @@ -390,7 +391,7 @@ func (s *Server) ResignLeader(nextLeader string) error { func (s *Server) deleteLeaderKey() error { // delete leader itself and let others start a new election again. leaderKey := s.getLeaderPath() - resp, err := s.leaderTxn().Then(clientv3.OpDelete(leaderKey)).Commit() + resp, err := s.LeaderTxn().Then(clientv3.OpDelete(leaderKey)).Commit() if err != nil { return errors.WithStack(err) } @@ -406,15 +407,15 @@ func (s *Server) leaderCmp() clientv3.Cmp { } func (s *Server) reloadConfigFromKV() error { - err := s.scheduleOpt.reload(s.kv) + err := s.scheduleOpt.reload(s.storage) if err != nil { return err } if s.scheduleOpt.loadPDServerConfig().UseRegionStorage { - s.kv.SwitchToRegionStorage() + s.storage.SwitchToRegionStorage() log.Info("server enable region storage") } else { - s.kv.SwitchToDefaultStorage() + s.storage.SwitchToDefaultStorage() log.Info("server disable region storage") } return nil diff --git a/server/metrics.go b/server/metrics.go index 44cf929b5da0..bac0a882bbb2 100644 --- a/server/metrics.go +++ b/server/metrics.go @@ -126,8 +126,6 @@ var ( ) func init() { - prometheus.MustRegister(txnCounter) - prometheus.MustRegister(txnDuration) prometheus.MustRegister(timeJumpBackCounter) prometheus.MustRegister(schedulerStatusGauge) prometheus.MustRegister(regionHeartbeatCounter) diff --git a/server/namespace/namespace.go b/server/namespace/namespace.go index 07f654e6a1e5..a719df9469d4 100644 --- a/server/namespace/namespace.go +++ b/server/namespace/namespace.go @@ -92,7 +92,7 @@ func (c defaultClassifier) IsStoreIDExist(storeID uint64) bool { } // CreateClassifierFunc is for creating namespace classifier. -type CreateClassifierFunc func(*core.KV, core.IDAllocator) (Classifier, error) +type CreateClassifierFunc func(*core.Storage, core.IDAllocator) (Classifier, error) var classifierMap = make(map[string]CreateClassifierFunc) @@ -106,16 +106,16 @@ func RegisterClassifier(name string, createFn CreateClassifierFunc) { } // CreateClassifier creates a namespace classifier with registered creator func. 
-func CreateClassifier(name string, kv *core.KV, idAlloc core.IDAllocator) (Classifier, error) { +func CreateClassifier(name string, storage *core.Storage, idAlloc core.IDAllocator) (Classifier, error) { fn, ok := classifierMap[name] if !ok { return nil, errors.Errorf("create func of %v is not registered", name) } - return fn(kv, idAlloc) + return fn(storage, idAlloc) } func init() { - RegisterClassifier("default", func(*core.KV, core.IDAllocator) (Classifier, error) { + RegisterClassifier("default", func(*core.Storage, core.IDAllocator) (Classifier, error) { return DefaultClassifier, nil }) } diff --git a/server/option.go b/server/option.go index 301e64cd3da4..d519328dfa4e 100644 --- a/server/option.go +++ b/server/option.go @@ -312,7 +312,7 @@ func (o *scheduleOption) loadPDServerConfig() *PDServerConfig { return o.pdServerConfig.Load().(*PDServerConfig) } -func (o *scheduleOption) persist(kv *core.KV) error { +func (o *scheduleOption) persist(storage *core.Storage) error { namespaces := o.loadNSConfig() cfg := &Config{ @@ -323,11 +323,11 @@ func (o *scheduleOption) persist(kv *core.KV) error { ClusterVersion: o.loadClusterVersion(), PDServerCfg: *o.loadPDServerConfig(), } - err := kv.SaveConfig(cfg) + err := storage.SaveConfig(cfg) return err } -func (o *scheduleOption) reload(kv *core.KV) error { +func (o *scheduleOption) reload(storage *core.Storage) error { namespaces := o.loadNSConfig() cfg := &Config{ @@ -338,7 +338,7 @@ func (o *scheduleOption) reload(kv *core.KV) error { ClusterVersion: o.loadClusterVersion(), PDServerCfg: *o.loadPDServerConfig(), } - isExist, err := kv.LoadConfig(cfg) + isExist, err := storage.LoadConfig(cfg) if err != nil { return err } diff --git a/server/region_syncer/history_buffer.go b/server/region_syncer/history_buffer.go index f5fa3a594fee..bd36f9e7720f 100644 --- a/server/region_syncer/history_buffer.go +++ b/server/region_syncer/history_buffer.go @@ -19,6 +19,7 @@ import ( log "github.com/pingcap/log" "github.com/pingcap/pd/server/core" + "github.com/pingcap/pd/server/kv" "go.uber.org/zap" ) @@ -34,11 +35,11 @@ type historyBuffer struct { head int tail int size int - kv core.KVBase + kv kv.Base flushCount int } -func newHistoryBuffer(size int, kv core.KVBase) *historyBuffer { +func newHistoryBuffer(size int, kv kv.Base) *historyBuffer { // use an empty space to simplify operation size++ if size < 2 { diff --git a/server/region_syncer/history_buffer_test.go b/server/region_syncer/history_buffer_test.go index c365997af101..b08332f11638 100644 --- a/server/region_syncer/history_buffer_test.go +++ b/server/region_syncer/history_buffer_test.go @@ -19,6 +19,7 @@ import ( . 
"github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/pd/server/core" + "github.com/pingcap/pd/server/kv" ) var _ = Suite(&testHistoryBuffer{}) @@ -36,7 +37,7 @@ func (t *testHistoryBuffer) TestBufferSize(c *C) { } // size equals 1 - h := newHistoryBuffer(1, core.NewMemoryKV()) + h := newHistoryBuffer(1, kv.NewMemoryKV()) c.Assert(h.len(), Equals, 0) for _, r := range regions { h.Record(r) @@ -46,7 +47,7 @@ func (t *testHistoryBuffer) TestBufferSize(c *C) { c.Assert(h.get(99), IsNil) // size equals 2 - h = newHistoryBuffer(2, core.NewMemoryKV()) + h = newHistoryBuffer(2, kv.NewMemoryKV()) for _, r := range regions { h.Record(r) } @@ -55,9 +56,9 @@ func (t *testHistoryBuffer) TestBufferSize(c *C) { c.Assert(h.get(99), Equals, regions[h.nextIndex()-2]) c.Assert(h.get(98), IsNil) - // size eqauls 100 - kv := core.NewMemoryKV() - h1 := newHistoryBuffer(100, kv) + // size equals 100 + kvMem := kv.NewMemoryKV() + h1 := newHistoryBuffer(100, kvMem) for i := 0; i < 6; i++ { h1.Record(regions[i]) } @@ -66,7 +67,7 @@ func (t *testHistoryBuffer) TestBufferSize(c *C) { h1.persist() // restart the buffer - h2 := newHistoryBuffer(100, kv) + h2 := newHistoryBuffer(100, kvMem) c.Assert(h2.nextIndex(), Equals, uint64(6)) c.Assert(h2.firstIndex(), Equals, uint64(6)) c.Assert(h2.get(h.nextIndex()-1), IsNil) diff --git a/server/region_syncer/server.go b/server/region_syncer/server.go index 377c53fa7a16..caba7c77a5c3 100644 --- a/server/region_syncer/server.go +++ b/server/region_syncer/server.go @@ -56,7 +56,7 @@ type Server interface { ClusterID() uint64 GetMemberInfo() *pdpb.Member GetLeader() *pdpb.Member - GetStorage() *core.KV + GetStorage() *core.Storage Name() string GetMetaRegions() []*metapb.Region } @@ -84,7 +84,7 @@ func NewRegionSyncer(s Server) *RegionSyncer { streams: make(map[string]ServerStream), server: s, closed: make(chan struct{}), - history: newHistoryBuffer(defaultHistoryBufferSize, s.GetStorage().GetRegionKV()), + history: newHistoryBuffer(defaultHistoryBufferSize, s.GetStorage().GetRegionStorage()), limit: ratelimit.NewBucketWithRate(defaultBucketRate, defaultBucketCapacity), } } diff --git a/server/server.go b/server/server.go index 0462214f5207..646eadac22ad 100644 --- a/server/server.go +++ b/server/server.go @@ -34,6 +34,7 @@ import ( "github.com/pingcap/pd/pkg/etcdutil" "github.com/pingcap/pd/pkg/logutil" "github.com/pingcap/pd/server/core" + "github.com/pingcap/pd/server/kv" "github.com/pingcap/pd/server/namespace" "github.com/pkg/errors" "go.etcd.io/etcd/clientv3" @@ -89,8 +90,8 @@ type Server struct { // store, region and peer, because we just need // a unique ID. idAlloc *idAllocator - // for kv operation. - kv *core.KV + // for storage operation. + storage *core.Storage // for namespace. 
classifier namespace.Classifier // for raft cluster @@ -220,16 +221,16 @@ func (s *Server) startServer() error { s.member, s.memberValue = s.memberInfo() s.idAlloc = &idAllocator{s: s} - kvBase := newEtcdKVBase(s) + kvBase := kv.NewEtcdKVBase(s.client, s.rootPath) path := filepath.Join(s.cfg.DataDir, "region-meta") - regionKV, err := core.NewRegionKV(path) + regionStorage, err := core.NewRegionStorage(path) if err != nil { return err } - s.kv = core.NewKV(kvBase).SetRegionKV(regionKV) + s.storage = core.NewStorage(kvBase).SetRegionStorage(regionStorage) s.cluster = newRaftCluster(s, s.clusterID) s.hbStreams = newHeartbeatStreams(s.clusterID, s.cluster) - if s.classifier, err = namespace.CreateClassifier(s.cfg.NamespaceClassifier, s.kv, s.idAlloc); err != nil { + if s.classifier, err = namespace.CreateClassifier(s.cfg.NamespaceClassifier, s.storage, s.idAlloc); err != nil { return err } @@ -276,8 +277,8 @@ func (s *Server) Close() { if s.hbStreams != nil { s.hbStreams.Close() } - if err := s.kv.Close(); err != nil { - log.Error("close kv meet error", zap.Error(err)) + if err := s.storage.Close(); err != nil { + log.Error("close storage meet error", zap.Error(err)) } log.Info("close server") @@ -406,7 +407,7 @@ func (s *Server) bootstrapCluster(req *pdpb.BootstrapRequest) (*pdpb.BootstrapRe // TODO: we must figure out a better way to handle bootstrap failed, maybe intervene manually. bootstrapCmp := clientv3.Compare(clientv3.CreateRevision(clusterRootPath), "=", 0) - resp, err := s.txn().If(bootstrapCmp).Then(ops...).Commit() + resp, err := kv.NewSlowLogTxn(s.client).If(bootstrapCmp).Then(ops...).Commit() if err != nil { return nil, errors.WithStack(err) } @@ -416,11 +417,11 @@ func (s *Server) bootstrapCluster(req *pdpb.BootstrapRequest) (*pdpb.BootstrapRe } log.Info("bootstrap cluster ok", zap.Uint64("cluster-id", clusterID)) - err = s.kv.SaveRegion(req.GetRegion()) + err = s.storage.SaveRegion(req.GetRegion()) if err != nil { log.Warn("save the bootstrap region failed", zap.Error(err)) } - err = s.kv.Flush() + err = s.storage.Flush() if err != nil { log.Warn("flush the bootstrap region failed", zap.Error(err)) } @@ -469,8 +470,8 @@ func (s *Server) GetClient() *clientv3.Client { } // GetStorage returns the backend storage of server. -func (s *Server) GetStorage() *core.KV { - return s.kv +func (s *Server) GetStorage() *core.Storage { + return s.storage } // ID returns the unique etcd ID for this server in etcd cluster. @@ -493,16 +494,11 @@ func (s *Server) GetClassifier() namespace.Classifier { return s.classifier } -// txn returns an etcd client transaction wrapper. -// The wrapper will set a request timeout to the context and log slow transactions. -func (s *Server) txn() clientv3.Txn { - return newSlowLogTxn(s.client) -} - -// leaderTxn returns txn() with a leader comparison to guarantee that +// LeaderTxn returns txn() with a leader comparison to guarantee that // the transaction can be executed only if the server is leader. -func (s *Server) leaderTxn(cs ...clientv3.Cmp) clientv3.Txn { - return s.txn().If(append(cs, s.leaderCmp())...) +func (s *Server) LeaderTxn(cs ...clientv3.Cmp) clientv3.Txn { + txn := kv.NewSlowLogTxn(s.client) + return txn.If(append(cs, s.leaderCmp())...) } // GetConfig gets the config information. 
@@ -533,7 +529,7 @@ func (s *Server) SetScheduleConfig(cfg ScheduleConfig) error { } old := s.scheduleOpt.load() s.scheduleOpt.store(&cfg) - if err := s.scheduleOpt.persist(s.kv); err != nil { + if err := s.scheduleOpt.persist(s.storage); err != nil { s.scheduleOpt.store(old) log.Error("failed to update schedule config", zap.Reflect("new", cfg), @@ -559,7 +555,7 @@ func (s *Server) SetReplicationConfig(cfg ReplicationConfig) error { } old := s.scheduleOpt.rep.load() s.scheduleOpt.rep.store(&cfg) - if err := s.scheduleOpt.persist(s.kv); err != nil { + if err := s.scheduleOpt.persist(s.storage); err != nil { s.scheduleOpt.rep.store(old) log.Error("failed to update replication config", zap.Reflect("new", cfg), @@ -575,7 +571,7 @@ func (s *Server) SetReplicationConfig(cfg ReplicationConfig) error { func (s *Server) SetPDServerConfig(cfg PDServerConfig) error { old := s.scheduleOpt.loadPDServerConfig() s.scheduleOpt.pdServerConfig.Store(&cfg) - if err := s.scheduleOpt.persist(s.kv); err != nil { + if err := s.scheduleOpt.persist(s.storage); err != nil { s.scheduleOpt.pdServerConfig.Store(old) log.Error("failed to update PDServer config", zap.Reflect("new", cfg), @@ -617,7 +613,7 @@ func (s *Server) SetNamespaceConfig(name string, cfg NamespaceConfig) error { if n, ok := s.scheduleOpt.getNS(name); ok { old := n.load() n.store(&cfg) - if err := s.scheduleOpt.persist(s.kv); err != nil { + if err := s.scheduleOpt.persist(s.storage); err != nil { s.scheduleOpt.ns.Store(name, newNamespaceOption(old)) log.Error("failed to update namespace config", zap.String("name", name), @@ -629,7 +625,7 @@ func (s *Server) SetNamespaceConfig(name string, cfg NamespaceConfig) error { log.Info("namespace config is updated", zap.String("name", name), zap.Reflect("new", cfg), zap.Reflect("old", old)) } else { s.scheduleOpt.ns.Store(name, newNamespaceOption(&cfg)) - if err := s.scheduleOpt.persist(s.kv); err != nil { + if err := s.scheduleOpt.persist(s.storage); err != nil { s.scheduleOpt.ns.Delete(name) log.Error("failed to add namespace config", zap.String("name", name), @@ -647,7 +643,7 @@ func (s *Server) DeleteNamespaceConfig(name string) error { if n, ok := s.scheduleOpt.getNS(name); ok { cfg := n.load() s.scheduleOpt.ns.Delete(name) - if err := s.scheduleOpt.persist(s.kv); err != nil { + if err := s.scheduleOpt.persist(s.storage); err != nil { s.scheduleOpt.ns.Store(name, newNamespaceOption(cfg)) log.Error("failed to delete namespace config", zap.String("name", name), @@ -662,7 +658,7 @@ func (s *Server) DeleteNamespaceConfig(name string) error { // SetLabelProperty inserts a label property config. func (s *Server) SetLabelProperty(typ, labelKey, labelValue string) error { s.scheduleOpt.SetLabelProperty(typ, labelKey, labelValue) - err := s.scheduleOpt.persist(s.kv) + err := s.scheduleOpt.persist(s.storage) if err != nil { s.scheduleOpt.DeleteLabelProperty(typ, labelKey, labelValue) log.Error("failed to update label property config", @@ -680,7 +676,7 @@ func (s *Server) SetLabelProperty(typ, labelKey, labelValue string) error { // DeleteLabelProperty deletes a label property config. 
func (s *Server) DeleteLabelProperty(typ, labelKey, labelValue string) error { s.scheduleOpt.DeleteLabelProperty(typ, labelKey, labelValue) - err := s.scheduleOpt.persist(s.kv) + err := s.scheduleOpt.persist(s.storage) if err != nil { s.scheduleOpt.SetLabelProperty(typ, labelKey, labelValue) log.Error("failed to delete label property config", @@ -708,7 +704,7 @@ func (s *Server) SetClusterVersion(v string) error { } old := s.scheduleOpt.loadClusterVersion() s.scheduleOpt.SetClusterVersion(*version) - err = s.scheduleOpt.persist(s.kv) + err = s.scheduleOpt.persist(s.storage) if err != nil { s.scheduleOpt.SetClusterVersion(old) log.Error("failed to update cluster version", @@ -784,7 +780,7 @@ func (s *Server) getMemberLeaderPriorityPath(id uint64) string { // SetMemberLeaderPriority saves a member's priority to be elected as the etcd leader. func (s *Server) SetMemberLeaderPriority(id uint64, priority int) error { key := s.getMemberLeaderPriorityPath(id) - res, err := s.leaderTxn().Then(clientv3.OpPut(key, strconv.Itoa(priority))).Commit() + res, err := s.LeaderTxn().Then(clientv3.OpPut(key, strconv.Itoa(priority))).Commit() if err != nil { return errors.WithStack(err) } @@ -797,7 +793,7 @@ func (s *Server) SetMemberLeaderPriority(id uint64, priority int) error { // DeleteMemberLeaderPriority removes a member's priority config. func (s *Server) DeleteMemberLeaderPriority(id uint64) error { key := s.getMemberLeaderPriorityPath(id) - res, err := s.leaderTxn().Then(clientv3.OpDelete(key)).Commit() + res, err := s.LeaderTxn().Then(clientv3.OpDelete(key)).Commit() if err != nil { return errors.WithStack(err) } diff --git a/server/tso.go b/server/tso.go index 17d2ebaad185..d81d2390a098 100644 --- a/server/tso.go +++ b/server/tso.go @@ -63,7 +63,7 @@ func (s *Server) saveTimestamp(ts time.Time) error { data := uint64ToBytes(uint64(ts.UnixNano())) key := s.getTimestampPath() - resp, err := s.leaderTxn().Then(clientv3.OpPut(key, string(data))).Commit() + resp, err := s.LeaderTxn().Then(clientv3.OpPut(key, string(data))).Commit() if err != nil { return errors.WithStack(err) } diff --git a/server/util.go b/server/util.go index 24347b5d5e40..88ceb64b399b 100644 --- a/server/util.go +++ b/server/util.go @@ -182,57 +182,6 @@ func uint64ToBytes(v uint64) []byte { return b } -// slowLogTxn wraps etcd transaction and log slow one. -type slowLogTxn struct { - clientv3.Txn - cancel context.CancelFunc -} - -func newSlowLogTxn(client *clientv3.Client) clientv3.Txn { - ctx, cancel := context.WithTimeout(client.Ctx(), requestTimeout) - return &slowLogTxn{ - Txn: client.Txn(ctx), - cancel: cancel, - } -} - -func (t *slowLogTxn) If(cs ...clientv3.Cmp) clientv3.Txn { - return &slowLogTxn{ - Txn: t.Txn.If(cs...), - cancel: t.cancel, - } -} - -func (t *slowLogTxn) Then(ops ...clientv3.Op) clientv3.Txn { - return &slowLogTxn{ - Txn: t.Txn.Then(ops...), - cancel: t.cancel, - } -} - -// Commit implements Txn Commit interface. -func (t *slowLogTxn) Commit() (*clientv3.TxnResponse, error) { - start := time.Now() - resp, err := t.Txn.Commit() - t.cancel() - - cost := time.Since(start) - if cost > slowRequestTime { - log.Warn("txn runs too slow", - zap.Error(err), - zap.Reflect("response", resp), - zap.Duration("cost", cost)) - } - label := "success" - if err != nil { - label = "failed" - } - txnCounter.WithLabelValues(label).Inc() - txnDuration.WithLabelValues(label).Observe(cost.Seconds()) - - return resp, errors.WithStack(err) -} - // GetMembers return a slice of Members. 
func GetMembers(etcdClient *clientv3.Client) ([]*pdpb.Member, error) { listResp, err := etcdutil.ListEtcdMembers(etcdClient) diff --git a/table/namespace_classifier.go b/table/namespace_classifier.go index c6f6c3f32d45..7b44501a6514 100644 --- a/table/namespace_classifier.go +++ b/table/namespace_classifier.go @@ -92,7 +92,7 @@ func (ns *Namespace) AddStoreID(storeID uint64) { type tableNamespaceClassifier struct { sync.RWMutex nsInfo *namespacesInfo - kv *core.KV + storage *core.Storage idAlloc core.IDAllocator http.Handler } @@ -101,14 +101,14 @@ const kvRangeLimit = 1000 // NewTableNamespaceClassifier creates a new namespace classifier that // classifies stores and regions by table range. -func NewTableNamespaceClassifier(kv *core.KV, idAlloc core.IDAllocator) (namespace.Classifier, error) { +func NewTableNamespaceClassifier(storage *core.Storage, idAlloc core.IDAllocator) (namespace.Classifier, error) { nsInfo := newNamespacesInfo() - if err := nsInfo.loadNamespaces(kv, kvRangeLimit); err != nil { + if err := nsInfo.loadNamespaces(storage, kvRangeLimit); err != nil { return nil, err } c := &tableNamespaceClassifier{ nsInfo: nsInfo, - kv: kv, + storage: storage, idAlloc: idAlloc, } c.Handler = newTableClassifierHandler(c) @@ -331,13 +331,13 @@ func (c *tableNamespaceClassifier) RemoveNamespaceStoreID(name string, storeID u return c.putNamespaceLocked(n) } -// ReloadNamespaces reloads ns info from kv storage +// ReloadNamespaces reloads ns info from storage. func (c *tableNamespaceClassifier) ReloadNamespaces() error { nsInfo := newNamespacesInfo() c.Lock() defer c.Unlock() - if err := nsInfo.loadNamespaces(c.kv, kvRangeLimit); err != nil { + if err := nsInfo.loadNamespaces(c.storage, kvRangeLimit); err != nil { return err } @@ -346,8 +346,8 @@ func (c *tableNamespaceClassifier) ReloadNamespaces() error { } func (c *tableNamespaceClassifier) putNamespaceLocked(ns *Namespace) error { - if c.kv != nil { - if err := c.nsInfo.saveNamespace(c.kv, ns); err != nil { + if c.storage != nil { + if err := c.nsInfo.saveNamespace(c.storage, ns); err != nil { return err } } @@ -425,16 +425,16 @@ func (namespaceInfo *namespacesInfo) namespacePath(nsID uint64) string { return path.Join("namespace", fmt.Sprintf("%20d", nsID)) } -func (namespaceInfo *namespacesInfo) saveNamespace(kv *core.KV, ns *Namespace) error { +func (namespaceInfo *namespacesInfo) saveNamespace(storage *core.Storage, ns *Namespace) error { value, err := json.Marshal(ns) if err != nil { return errors.WithStack(err) } - err = kv.Save(namespaceInfo.namespacePath(ns.GetID()), string(value)) + err = storage.Save(namespaceInfo.namespacePath(ns.GetID()), string(value)) return err } -func (namespaceInfo *namespacesInfo) loadNamespaces(kv *core.KV, rangeLimit int) error { +func (namespaceInfo *namespacesInfo) loadNamespaces(storage *core.Storage, rangeLimit int) error { start := time.Now() nextID := uint64(0) @@ -442,7 +442,7 @@ func (namespaceInfo *namespacesInfo) loadNamespaces(kv *core.KV, rangeLimit int) for { key := namespaceInfo.namespacePath(nextID) - _, res, err := kv.LoadRange(key, endKey, rangeLimit) + _, res, err := storage.LoadRange(key, endKey, rangeLimit) if err != nil { return err } diff --git a/table/namespace_classifier_test.go b/table/namespace_classifier_test.go index fac70219f275..3de72aa86c70 100644 --- a/table/namespace_classifier_test.go +++ b/table/namespace_classifier_test.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/pd/pkg/mock/mockid" "github.com/pingcap/pd/server/core" + 
"github.com/pingcap/pd/server/kv" ) var _ = Suite(&testTableNamespaceSuite{}) @@ -40,8 +41,8 @@ type testTableNamespaceSuite struct { } func (s *testTableNamespaceSuite) newClassifier(c *C) *tableNamespaceClassifier { - kv := core.NewKV(core.NewMemoryKV()) - classifier, err := NewTableNamespaceClassifier(kv, mockid.NewIDAllocator()) + memStorage := core.NewStorage(kv.NewMemoryKV()) + classifier, err := NewTableNamespaceClassifier(memStorage, mockid.NewIDAllocator()) c.Assert(err, IsNil) tableClassifier := classifier.(*tableNamespaceClassifier) testNamespace1 := Namespace{ @@ -135,8 +136,8 @@ func (s *testTableNamespaceSuite) TestTableNameSpaceGetRegionNamespace(c *C) { } func (s *testTableNamespaceSuite) TestNamespaceOperation(c *C) { - kv := core.NewKV(core.NewMemoryKV()) - classifier, err := NewTableNamespaceClassifier(kv, mockid.NewIDAllocator()) + memStorage := core.NewStorage(kv.NewMemoryKV()) + classifier, err := NewTableNamespaceClassifier(memStorage, mockid.NewIDAllocator()) c.Assert(err, IsNil) tableClassifier := classifier.(*tableNamespaceClassifier) nsInfo := tableClassifier.nsInfo diff --git a/tests/server/region_syncer/region_syncer_test.go b/tests/server/region_syncer/region_syncer_test.go index b72c147e6ff4..ab6af0b5bcba 100644 --- a/tests/server/region_syncer/region_syncer_test.go +++ b/tests/server/region_syncer/region_syncer_test.go @@ -80,7 +80,7 @@ func (s *serverTestSuite) TestRegionSyncer(c *C) { err = rc.HandleRegionHeartbeat(region) c.Assert(err, IsNil) } - // ensure flush to region kv + // ensure flush to region storage time.Sleep(3 * time.Second) err = leaderServer.Stop() c.Assert(err, IsNil) @@ -125,7 +125,7 @@ func (s *serverTestSuite) TestFullSyncWithAddMember(c *C) { err = rc.HandleRegionHeartbeat(region) c.Assert(err, IsNil) } - // ensure flush to region kv + // ensure flush to region storage time.Sleep(3 * time.Second) // restart pd1 err = leaderServer.Stop() From d53e6ea2df44edd42f8d97ee65fef2a7f51eed6a Mon Sep 17 00:00:00 2001 From: ShuNing Date: Thu, 11 Jul 2019 18:34:02 +0800 Subject: [PATCH 3/6] scheduler: let balance region do not always select same source and target (#1442) * scheduler: let balance region do not always select same source and target Signed-off-by: nolouch --- server/schedule/filters.go | 56 +++++++++ server/schedule/selector.go | 5 +- server/schedulers/balance_region.go | 174 ++++++++++++++++++---------- server/schedulers/balance_test.go | 91 ++++++++++++--- 4 files changed, 245 insertions(+), 81 deletions(-) diff --git a/server/schedule/filters.go b/server/schedule/filters.go index 928110eccf8b..ac91db1a8062 100644 --- a/server/schedule/filters.go +++ b/server/schedule/filters.go @@ -444,3 +444,59 @@ func (f StoreStateFilter) filterMoveRegion(opt Options, store *core.StoreInfo) b } return false } + +// BlacklistType the type of BlackListStore Filter. +type BlacklistType int + +// some flags about blacklist type. +const ( + // blacklist associated with the source. + BlacklistSource BlacklistType = 1 << iota + // blacklist associated with the target. + BlacklistTarget +) + +// BlacklistStoreFilter filters the store according to the blacklist. +type BlacklistStoreFilter struct { + blacklist map[uint64]struct{} + flag BlacklistType +} + +// NewBlacklistStoreFilter creates a blacklist filter. +func NewBlacklistStoreFilter(typ BlacklistType) *BlacklistStoreFilter { + return &BlacklistStoreFilter{ + blacklist: make(map[uint64]struct{}), + flag: typ, + } +} + +// Type implements the Filter. 
+func (f *BlacklistStoreFilter) Type() string {
+	return "blacklist-store-filter"
+}
+
+// FilterSource implements the Filter.
+func (f *BlacklistStoreFilter) FilterSource(opt Options, store *core.StoreInfo) bool {
+	if f.flag&BlacklistSource != BlacklistSource {
+		return false
+	}
+	return f.filter(store)
+}
+
+// Add adds the store to the blacklist.
+func (f *BlacklistStoreFilter) Add(storeID uint64) {
+	f.blacklist[storeID] = struct{}{}
+}
+
+// FilterTarget implements the Filter.
+func (f *BlacklistStoreFilter) FilterTarget(opt Options, store *core.StoreInfo) bool {
+	if f.flag&BlacklistTarget != BlacklistTarget {
+		return false
+	}
+	return f.filter(store)
+}
+
+func (f *BlacklistStoreFilter) filter(store *core.StoreInfo) bool {
+	_, ok := f.blacklist[store.GetID()]
+	return ok
+}
diff --git a/server/schedule/selector.go b/server/schedule/selector.go
index 0530f106c13a..315c0519cdea 100644
--- a/server/schedule/selector.go
+++ b/server/schedule/selector.go
@@ -36,10 +36,11 @@ func NewBalanceSelector(kind core.ResourceKind, filters []Filter) *BalanceSelect
 
 // SelectSource selects the store that can pass all filters and has the maximal
 // resource score.
-func (s *BalanceSelector) SelectSource(opt Options, stores []*core.StoreInfo) *core.StoreInfo {
+func (s *BalanceSelector) SelectSource(opt Options, stores []*core.StoreInfo, filters ...Filter) *core.StoreInfo {
+	filters = append(filters, s.filters...)
 	var result *core.StoreInfo
 	for _, store := range stores {
-		if FilterSource(opt, store, s.filters) {
+		if FilterSource(opt, store, filters) {
 			continue
 		}
 		if result == nil ||
diff --git a/server/schedulers/balance_region.go b/server/schedulers/balance_region.go
index ecae36ce3fc8..1c521fb0d4c6 100644
--- a/server/schedulers/balance_region.go
+++ b/server/schedulers/balance_region.go
@@ -14,11 +14,12 @@
 package schedulers
 
 import (
+	"fmt"
 	"strconv"
+	"time"
 
 	"github.com/pingcap/kvproto/pkg/metapb"
 	log "github.com/pingcap/log"
-	"github.com/pingcap/pd/server/cache"
 	"github.com/pingcap/pd/server/checker"
 	"github.com/pingcap/pd/server/core"
 	"github.com/pingcap/pd/server/schedule"
@@ -31,36 +32,45 @@ func init() {
 	})
 }
 
-// balanceRegionRetryLimit is the limit to retry schedule for selected store.
-const balanceRegionRetryLimit = 10
+const (
+	// balanceRegionRetryLimit is the limit to retry schedule for selected store.
+	balanceRegionRetryLimit = 10
+	hitsStoreTTL            = 5 * time.Minute
+	// If the scheduler keeps selecting the same source or source-target pair
+	// for a long time without creating an operator, the hit filter is
+	// triggered. The time is calculated as follows:
+	// ScheduleIntervalFactor defaults to 1.3 and MinScheduleInterval is 10ms,
+	// so the total time spent is t = a1 * (1-pow(q,n)) / (1-q), where a1 = 10,
+	// q = 1.3, and n = 30, which gives t = 87299ms ≈ 87s.
+	hitsStoreCountThreshold = 30 * balanceRegionRetryLimit
+	balanceRegionName       = "balance-region-scheduler"
+)
 
 type balanceRegionScheduler struct {
 	*baseScheduler
 	selector      *schedule.BalanceSelector
-	taintStores   *cache.TTLUint64
 	opController  *schedule.OperatorController
+	hitsCounter   *hitsStoreBuilder
 }
 
 // newBalanceRegionScheduler creates a scheduler that tends to keep regions on
 // each store balanced.
func newBalanceRegionScheduler(opController *schedule.OperatorController) schedule.Scheduler { - taintStores := newTaintCache() filters := []schedule.Filter{ schedule.StoreStateFilter{MoveRegion: true}, - schedule.NewCacheFilter(taintStores), } base := newBaseScheduler(opController) s := &balanceRegionScheduler{ baseScheduler: base, selector: schedule.NewBalanceSelector(core.RegionKind, filters), - taintStores: taintStores, opController: opController, + hitsCounter: newHitsStoreBuilder(hitsStoreTTL, hitsStoreCountThreshold), } return s } func (s *balanceRegionScheduler) GetName() string { - return "balance-region-scheduler" + return balanceRegionName } func (s *balanceRegionScheduler) GetType() string { @@ -73,11 +83,11 @@ func (s *balanceRegionScheduler) IsScheduleAllowed(cluster schedule.Cluster) boo func (s *balanceRegionScheduler) Schedule(cluster schedule.Cluster) []*schedule.Operator { schedulerCounter.WithLabelValues(s.GetName(), "schedule").Inc() - stores := cluster.GetStores() // source is the store with highest region score in the list that can be selected as balance source. - source := s.selector.SelectSource(cluster, stores) + filter := s.hitsCounter.buildSourceFilter(cluster) + source := s.selector.SelectSource(cluster, stores, filter) if source == nil { schedulerCounter.WithLabelValues(s.GetName(), "no_store").Inc() // Unlike the balanceLeaderScheduler, we don't need to clear the taintCache @@ -93,7 +103,6 @@ func (s *balanceRegionScheduler) Schedule(cluster schedule.Cluster) []*schedule. balanceRegionCounter.WithLabelValues("source_store", sourceAddress, sourceLabel).Inc() opInfluence := s.opController.GetOpInfluence(cluster) - var hasPotentialTarget bool for i := 0; i < balanceRegionRetryLimit; i++ { // Priority picks the region that has a pending peer. // Pending region may means the disk is overload, remove the pending region firstly. @@ -108,6 +117,7 @@ func (s *balanceRegionScheduler) Schedule(cluster schedule.Cluster) []*schedule. } if region == nil { schedulerCounter.WithLabelValues(s.GetName(), "no_region").Inc() + s.hitsCounter.put(source, nil) continue } log.Debug("select region", zap.String("scheduler", s.GetName()), zap.Uint64("region-id", region.GetID())) @@ -116,6 +126,7 @@ func (s *balanceRegionScheduler) Schedule(cluster schedule.Cluster) []*schedule. if len(region.GetPeers()) != cluster.GetMaxReplicas() { log.Debug("region has abnormal replica count", zap.String("scheduler", s.GetName()), zap.Uint64("region-id", region.GetID())) schedulerCounter.WithLabelValues(s.GetName(), "abnormal_replica").Inc() + s.hitsCounter.put(source, nil) continue } @@ -123,28 +134,16 @@ func (s *balanceRegionScheduler) Schedule(cluster schedule.Cluster) []*schedule. if cluster.IsRegionHot(region.GetID()) { log.Debug("region is hot", zap.String("scheduler", s.GetName()), zap.Uint64("region-id", region.GetID())) schedulerCounter.WithLabelValues(s.GetName(), "region_hot").Inc() + s.hitsCounter.put(source, nil) continue } - if !s.hasPotentialTarget(cluster, region, source, opInfluence) { - continue - } - hasPotentialTarget = true - oldPeer := region.GetStorePeer(sourceID) if op := s.transferPeer(cluster, region, oldPeer, opInfluence); op != nil { schedulerCounter.WithLabelValues(s.GetName(), "new_operator").Inc() return []*schedule.Operator{op} } } - - if !hasPotentialTarget { - // If no potential target store can be found for the selected store, ignore it for a while. 
- log.Debug("no operator created for selected store", zap.String("scheduler", s.GetName()), zap.Uint64("store-id", sourceID)) - balanceRegionCounter.WithLabelValues("add_taint", sourceAddress, sourceLabel).Inc() - s.taintStores.Put(sourceID) - } - return nil } @@ -154,11 +153,12 @@ func (s *balanceRegionScheduler) transferPeer(cluster schedule.Cluster, region * stores := cluster.GetRegionStores(region) source := cluster.GetStore(oldPeer.GetStoreId()) scoreGuard := schedule.NewDistinctScoreFilter(cluster.GetLocationLabels(), stores, source) - + hitsFilter := s.hitsCounter.buildTargetFilter(cluster, source) checker := checker.NewReplicaChecker(cluster, nil) - storeID, _ := checker.SelectBestReplacementStore(region, oldPeer, scoreGuard) + storeID, _ := checker.SelectBestReplacementStore(region, oldPeer, scoreGuard, hitsFilter) if storeID == 0 { schedulerCounter.WithLabelValues(s.GetName(), "no_replacement").Inc() + s.hitsCounter.put(source, nil) return nil } @@ -177,6 +177,7 @@ func (s *balanceRegionScheduler) transferPeer(cluster schedule.Cluster, region * zap.Int64("target-influence", opInfluence.GetStoreInfluence(targetID).ResourceSize(core.RegionKind)), zap.Int64("average-region-size", cluster.GetAverageRegionSize())) schedulerCounter.WithLabelValues(s.GetName(), "skip").Inc() + s.hitsCounter.put(source, target) return nil } @@ -190,54 +191,103 @@ func (s *balanceRegionScheduler) transferPeer(cluster schedule.Cluster, region * schedulerCounter.WithLabelValues(s.GetName(), "create_operator_fail").Inc() return nil } + s.hitsCounter.remove(source, target) + s.hitsCounter.remove(source, nil) sourceLabel := strconv.FormatUint(sourceID, 10) targetLabel := strconv.FormatUint(targetID, 10) balanceRegionCounter.WithLabelValues("move_peer", source.GetAddress()+"-out", sourceLabel).Inc() balanceRegionCounter.WithLabelValues("move_peer", target.GetAddress()+"-in", targetLabel).Inc() - // only for testing balanceRegionCounter.WithLabelValues("direction", "from_to", sourceLabel+"-"+targetLabel).Inc() return op } -// hasPotentialTarget is used to determine whether the specified sourceStore -// cannot find a matching targetStore in the long term. -// The main factor for judgment includes StoreState, DistinctScore, and -// ResourceScore, while excludes factors such as ServerBusy, too many snapshot, -// which may recover soon. 
-func (s *balanceRegionScheduler) hasPotentialTarget(cluster schedule.Cluster, region *core.RegionInfo, source *core.StoreInfo, opInfluence schedule.OpInfluence) bool { - filters := []schedule.Filter{ - schedule.NewExcludedFilter(nil, region.GetStoreIds()), - schedule.NewDistinctScoreFilter(cluster.GetLocationLabels(), cluster.GetRegionStores(region), source), +type record struct { + lastTime time.Time + count int +} +type hitsStoreBuilder struct { + hits map[string]*record + ttl time.Duration + threshold int +} + +func newHitsStoreBuilder(ttl time.Duration, threshold int) *hitsStoreBuilder { + return &hitsStoreBuilder{ + hits: make(map[string]*record), + ttl: ttl, + threshold: threshold, } +} - for _, store := range cluster.GetStores() { - if schedule.FilterTarget(cluster, store, filters) { - log.Debug("skip target store by filters", - zap.String("scheduler", s.GetName()), - zap.Uint64("region", region.GetID()), - zap.Uint64("source", source.GetID()), - zap.Uint64("target", store.GetID())) - continue - } - if !store.IsUp() || store.DownTime() > cluster.GetMaxStoreDownTime() { - log.Debug("skip target store by status", - zap.String("scheduler", s.GetName()), - zap.Uint64("region", region.GetID()), - zap.Uint64("source", source.GetID()), - zap.Uint64("target", store.GetID()), - zap.Bool("isup", store.IsUp()), - zap.Duration("downtime", store.DownTime())) - continue +func (h *hitsStoreBuilder) getKey(source, target *core.StoreInfo) string { + if source == nil { + return "" + } + key := fmt.Sprintf("s%d", source.GetID()) + if target != nil { + key = fmt.Sprintf("%s->t%d", key, target.GetID()) + } + return key +} + +func (h *hitsStoreBuilder) filter(source, target *core.StoreInfo) bool { + key := h.getKey(source, target) + if key == "" { + return false + } + if item, ok := h.hits[key]; ok { + if time.Since(item.lastTime) > h.ttl { + delete(h.hits, key) } - if !shouldBalance(cluster, source, store, region, core.RegionKind, opInfluence) { - log.Debug("skip target store for it should not balance", - zap.String("scheduler", s.GetName()), - zap.Uint64("region", region.GetID()), - zap.Uint64("source", source.GetID()), - zap.Uint64("target", store.GetID())) - continue + if time.Since(item.lastTime) <= h.ttl && item.count >= h.threshold { + log.Debug("skip the the store", zap.String("scheduler", balanceRegionName), zap.String("filter-key", key)) + return true } - return true } return false } + +func (h *hitsStoreBuilder) remove(source, target *core.StoreInfo) { + key := h.getKey(source, target) + if _, ok := h.hits[key]; ok && key != "" { + delete(h.hits, key) + } +} + +func (h *hitsStoreBuilder) put(source, target *core.StoreInfo) { + key := h.getKey(source, target) + if key == "" { + return + } + if item, ok := h.hits[key]; ok { + if time.Since(item.lastTime) >= h.ttl { + item.count = 0 + } else { + item.count++ + } + item.lastTime = time.Now() + } else { + item := &record{lastTime: time.Now()} + h.hits[key] = item + } +} + +func (h *hitsStoreBuilder) buildSourceFilter(cluster schedule.Cluster) schedule.Filter { + filter := schedule.NewBlacklistStoreFilter(schedule.BlacklistSource) + for _, source := range cluster.GetStores() { + if h.filter(source, nil) { + filter.Add(source.GetID()) + } + } + return filter +} + +func (h *hitsStoreBuilder) buildTargetFilter(cluster schedule.Cluster, source *core.StoreInfo) schedule.Filter { + filter := schedule.NewBlacklistStoreFilter(schedule.BlacklistTarget) + for _, target := range cluster.GetStores() { + if h.filter(source, target) { + 
filter.Add(target.GetID()) + } + } + return filter +} diff --git a/server/schedulers/balance_test.go b/server/schedulers/balance_test.go index 82ed7d0db4e1..b199d49f74f6 100644 --- a/server/schedulers/balance_test.go +++ b/server/schedulers/balance_test.go @@ -330,7 +330,6 @@ func (s *testBalanceRegionSchedulerSuite) TestBalance(c *C) { sb, err := schedule.CreateScheduler("balance-region", oc) c.Assert(err, IsNil) - cache := sb.(*balanceRegionScheduler).taintStores opt.SetMaxReplicas(1) // Add stores 1,2,3,4. @@ -345,14 +344,13 @@ func (s *testBalanceRegionSchedulerSuite) TestBalance(c *C) { // Test stateFilter. tc.SetStoreOffline(1) tc.UpdateRegionCount(2, 6) - cache.Remove(4) + // When store 1 is offline, it will be filtered, // store 2 becomes the store with least regions. testutil.CheckTransferPeerWithLeaderTransfer(c, sb.Schedule(tc)[0], schedule.OpBalance, 4, 2) opt.SetMaxReplicas(3) c.Assert(sb.Schedule(tc), IsNil) - cache.Clear() opt.SetMaxReplicas(1) c.Assert(sb.Schedule(tc), NotNil) } @@ -367,8 +365,6 @@ func (s *testBalanceRegionSchedulerSuite) TestReplicas3(c *C) { sb, err := schedule.CreateScheduler("balance-region", oc) c.Assert(err, IsNil) - cache := sb.(*balanceRegionScheduler).taintStores - // Store 1 has the largest region score, so the balancer try to replace peer in store 1. tc.AddLabelsStore(1, 16, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"}) tc.AddLabelsStore(2, 15, map[string]string{"zone": "z1", "rack": "r2", "host": "h1"}) @@ -378,22 +374,21 @@ func (s *testBalanceRegionSchedulerSuite) TestReplicas3(c *C) { // This schedule try to replace peer in store 1, but we have no other stores, // so store 1 will be set in the cache and skipped next schedule. c.Assert(sb.Schedule(tc), IsNil) - c.Assert(cache.Exists(1), IsTrue) + for i := 0; i <= hitsStoreCountThreshold/balanceRegionRetryLimit; i++ { + sb.Schedule(tc) + } + hit := sb.(*balanceRegionScheduler).hitsCounter + c.Assert(hit.buildSourceFilter(tc).FilterSource(tc, tc.GetStore(1)), IsTrue) + c.Assert(hit.buildSourceFilter(tc).FilterSource(tc, tc.GetStore(2)), IsFalse) + c.Assert(hit.buildSourceFilter(tc).FilterSource(tc, tc.GetStore(3)), IsFalse) // Store 4 has smaller region score than store 2. tc.AddLabelsStore(4, 2, map[string]string{"zone": "z1", "rack": "r2", "host": "h1"}) testutil.CheckTransferPeer(c, sb.Schedule(tc)[0], schedule.OpBalance, 2, 4) - // If store 4 is busy, no operator will be created. - tc.SetStoreBusy(4, true) - c.Assert(sb.Schedule(tc), IsNil) - // Since busy is a short term state, store2 will not be added to taint cache. - c.Assert(cache.Exists(2), IsFalse) - tc.SetStoreBusy(4, false) - // Store 5 has smaller region score than store 1. tc.AddLabelsStore(5, 2, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"}) - cache.Remove(1) // Delete store 1 from cache, or it will be skipped. + hit.remove(tc.GetStore(1), nil) testutil.CheckTransferPeer(c, sb.Schedule(tc)[0], schedule.OpBalance, 1, 5) // Store 6 has smaller region score than store 5. 
@@ -417,9 +412,12 @@ func (s *testBalanceRegionSchedulerSuite) TestReplicas3(c *C) {
 	tc.SetStoreDown(5)
 	tc.SetStoreDown(6)
 	tc.SetStoreDown(7)
-	c.Assert(sb.Schedule(tc), IsNil)
-	c.Assert(cache.Exists(1), IsTrue)
-	cache.Remove(1)
+	tc.SetStoreDown(8)
+	for i := 0; i <= hitsStoreCountThreshold/balanceRegionRetryLimit; i++ {
+		c.Assert(sb.Schedule(tc), IsNil)
+	}
+	c.Assert(hit.buildSourceFilter(tc).FilterSource(tc, tc.GetStore(1)), IsTrue)
+	hit.remove(tc.GetStore(1), nil)
 
 	// Store 9 has different zone with other stores but larger region score than store 1.
 	tc.AddLabelsStore(9, 20, map[string]string{"zone": "z2", "rack": "r1", "host": "h1"})
@@ -464,6 +462,65 @@ func (s *testBalanceRegionSchedulerSuite) TestReplicas5(c *C) {
 	testutil.CheckTransferPeer(c, sb.Schedule(tc)[0], schedule.OpBalance, 11, 6)
 }
 
+// TestBalance1 for corner case 1:
+// 11 regions distributed across 5 stores.
+//| region_id | leader_store | follower_store | follower_store |
+//|-----------|--------------|----------------|----------------|
+//|     1     |      1       |       2        |       3        |
+//|     2     |      1       |       2        |       3        |
+//|     3     |      1       |       2        |       3        |
+//|     4     |      1       |       2        |       3        |
+//|     5     |      1       |       2        |       3        |
+//|     6     |      1       |       2        |       3        |
+//|     7     |      1       |       2        |       4        |
+//|     8     |      1       |       2        |       4        |
+//|     9     |      1       |       2        |       4        |
+//|    10     |      1       |       4        |       5        |
+//|    11     |      1       |       4        |       5        |
+// The space of the last store 5 is very small, about 5 * regionsize,
+// so the source region is more likely to be distributed on stores [1, 2, 3].
+func (s *testBalanceRegionSchedulerSuite) TestBalance1(c *C) {
+	opt := mockoption.NewScheduleOptions()
+	tc := mockcluster.NewCluster(opt)
+	oc := schedule.NewOperatorController(nil, nil)
+
+	opt.TolerantSizeRatio = 1
+
+	sb, err := schedule.CreateScheduler("balance-region", oc)
+	c.Assert(err, IsNil)
+
+	tc.AddRegionStore(1, 11)
+	tc.AddRegionStore(2, 9)
+	tc.AddRegionStore(3, 6)
+	tc.AddRegionStore(4, 5)
+	tc.AddRegionStore(5, 2)
+	tc.AddLeaderRegion(1, 1, 2, 3)
+	tc.AddLeaderRegion(2, 1, 2, 3)
+
+	c.Assert(sb.Schedule(tc)[0], NotNil)
+	// if the space of store 5 is normal, we can balance regions to store 5
+	testutil.CheckTransferPeer(c, sb.Schedule(tc)[0], schedule.OpBalance, 1, 5)
+
+	// the used size of store 5 reaches (highSpace, lowSpace)
+	origin := tc.GetStore(5)
+	stats := origin.GetStoreStats()
+	stats.Capacity = 50
+	stats.Available = 28
+	stats.UsedSize = 20
+	store5 := origin.Clone(core.SetStoreStats(stats))
+	tc.PutStore(store5)
+
+	// the scheduler always picks store 1 as the source store,
+	// and store 5 as the target store, but it cannot pass `shouldBalance`.
+ c.Assert(sb.Schedule(tc), IsNil) + // hit the stores many times so that store 5 will be filtered + for i := 0; i < 1000; i++ { + sb.Schedule(tc) + } + // now store 5 is filtered, so a peer can be transferred from store 1 to store 4 + testutil.CheckTransferPeer(c, sb.Schedule(tc)[0], schedule.OpBalance, 1, 4) +} + func (s *testBalanceRegionSchedulerSuite) TestStoreWeight(c *C) { opt := mockoption.NewScheduleOptions() tc := mockcluster.NewCluster(opt) From 6ee7fda429ff6b924c1ee19ec00f4c9201322a4d Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Thu, 11 Jul 2019 21:41:46 +0800 Subject: [PATCH 4/6] schedule: fix merge related operators in waiting operators (#1633) * fix random merge in waiting operators Signed-off-by: Ryan Leung --- server/schedule/waiting_operator.go | 2 +- server/schedule/waiting_operator_test.go | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/server/schedule/waiting_operator.go b/server/schedule/waiting_operator.go index 3b03db6b145f..6f5d7efd8e03 100644 --- a/server/schedule/waiting_operator.go +++ b/server/schedule/waiting_operator.go @@ -90,7 +90,7 @@ func (b *RandBuckets) GetOperator() []*Operator { var res []*Operator res = append(res, bucket.ops[0]) // Merge operation has two operators, and thus it should be handled specifically. - if bucket.ops[0].Desc() == "merge-region" { + if bucket.ops[0].Kind()&OpMerge != 0 { res = append(res, bucket.ops[1]) bucket.ops = bucket.ops[2:] } else { diff --git a/server/schedule/waiting_operator_test.go b/server/schedule/waiting_operator_test.go index 2a542a57a1db..2cdac8db7c6e 100644 --- a/server/schedule/waiting_operator_test.go +++ b/server/schedule/waiting_operator_test.go @@ -58,9 +58,11 @@ func (s *testWaitingOperatorSuite) TestListOperator(c *C) { func (s *testWaitingOperatorSuite) TestRandomBucketsWithMergeRegion(c *C) { rb := NewRandBuckets() + descs := []string{"merge-region", "admin-merge-region", "random-merge"} for j := 0; j < 100; j++ { // adds operators - op := NewOperator("merge-region", uint64(1), &metapb.RegionEpoch{}, OpRegion|OpMerge, []OperatorStep{ + desc := descs[j%3] + op := NewOperator(desc, uint64(1), &metapb.RegionEpoch{}, OpRegion|OpMerge, []OperatorStep{ MergeRegion{ FromRegion: &metapb.Region{ Id: 1, @@ -75,7 +77,7 @@ func (s *testWaitingOperatorSuite) TestRandomBucketsWithMergeRegion(c *C) { }, }...)
rb.PutOperator(op) - op = NewOperator("merge-region", uint64(2), &metapb.RegionEpoch{}, OpRegion|OpMerge, []OperatorStep{ + op = NewOperator(desc, uint64(2), &metapb.RegionEpoch{}, OpRegion|OpMerge, []OperatorStep{ MergeRegion{ FromRegion: &metapb.Region{ Id: 1, From 9129b6666ded6a833d65c3ca5cc5ffcc67a7fb28 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Fri, 12 Jul 2019 11:29:38 +0800 Subject: [PATCH 5/6] move cache package to pkg (#1632) Signed-off-by: Ryan Leung --- {server => pkg}/cache/cache.go | 0 {server => pkg}/cache/cache_test.go | 0 {server => pkg}/cache/fifo.go | 0 {server => pkg}/cache/lru.go | 0 {server => pkg}/cache/ttl.go | 0 {server => pkg}/cache/two_queue.go | 0 server/checker/merge_checker.go | 2 +- server/schedule/filters.go | 2 +- server/schedule/operator_controller.go | 2 +- server/schedulers/balance_leader.go | 2 +- server/schedulers/utils.go | 2 +- server/statistics/hot_cache.go | 2 +- 12 files changed, 6 insertions(+), 6 deletions(-) rename {server => pkg}/cache/cache.go (100%) rename {server => pkg}/cache/cache_test.go (100%) rename {server => pkg}/cache/fifo.go (100%) rename {server => pkg}/cache/lru.go (100%) rename {server => pkg}/cache/ttl.go (100%) rename {server => pkg}/cache/two_queue.go (100%) diff --git a/server/cache/cache.go b/pkg/cache/cache.go similarity index 100% rename from server/cache/cache.go rename to pkg/cache/cache.go diff --git a/server/cache/cache_test.go b/pkg/cache/cache_test.go similarity index 100% rename from server/cache/cache_test.go rename to pkg/cache/cache_test.go diff --git a/server/cache/fifo.go b/pkg/cache/fifo.go similarity index 100% rename from server/cache/fifo.go rename to pkg/cache/fifo.go diff --git a/server/cache/lru.go b/pkg/cache/lru.go similarity index 100% rename from server/cache/lru.go rename to pkg/cache/lru.go diff --git a/server/cache/ttl.go b/pkg/cache/ttl.go similarity index 100% rename from server/cache/ttl.go rename to pkg/cache/ttl.go diff --git a/server/cache/two_queue.go b/pkg/cache/two_queue.go similarity index 100% rename from server/cache/two_queue.go rename to pkg/cache/two_queue.go diff --git a/server/checker/merge_checker.go b/server/checker/merge_checker.go index b018294ef4d7..d91800fe796c 100644 --- a/server/checker/merge_checker.go +++ b/server/checker/merge_checker.go @@ -17,7 +17,7 @@ import ( "time" log "github.com/pingcap/log" - "github.com/pingcap/pd/server/cache" + "github.com/pingcap/pd/pkg/cache" "github.com/pingcap/pd/server/core" "github.com/pingcap/pd/server/namespace" "github.com/pingcap/pd/server/schedule" diff --git a/server/schedule/filters.go b/server/schedule/filters.go index ac91db1a8062..502ac81e7158 100644 --- a/server/schedule/filters.go +++ b/server/schedule/filters.go @@ -16,7 +16,7 @@ package schedule import ( "fmt" - "github.com/pingcap/pd/server/cache" + "github.com/pingcap/pd/pkg/cache" "github.com/pingcap/pd/server/core" "github.com/pingcap/pd/server/namespace" ) diff --git a/server/schedule/operator_controller.go b/server/schedule/operator_controller.go index 2e3259cca484..9ada499c8d71 100644 --- a/server/schedule/operator_controller.go +++ b/server/schedule/operator_controller.go @@ -26,7 +26,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" log "github.com/pingcap/log" - "github.com/pingcap/pd/server/cache" + "github.com/pingcap/pd/pkg/cache" "github.com/pingcap/pd/server/core" "go.uber.org/zap" ) diff --git a/server/schedulers/balance_leader.go b/server/schedulers/balance_leader.go index 62698dc3a695..8ecfbb2e5a88 100644 
--- a/server/schedulers/balance_leader.go +++ b/server/schedulers/balance_leader.go @@ -17,7 +17,7 @@ import ( "strconv" log "github.com/pingcap/log" - "github.com/pingcap/pd/server/cache" + "github.com/pingcap/pd/pkg/cache" "github.com/pingcap/pd/server/core" "github.com/pingcap/pd/server/schedule" "go.uber.org/zap" diff --git a/server/schedulers/utils.go b/server/schedulers/utils.go index 37ae6ce7c6e1..a21ac5fed5d7 100644 --- a/server/schedulers/utils.go +++ b/server/schedulers/utils.go @@ -17,7 +17,7 @@ import ( "time" "github.com/montanaflynn/stats" - "github.com/pingcap/pd/server/cache" + "github.com/pingcap/pd/pkg/cache" "github.com/pingcap/pd/server/core" "github.com/pingcap/pd/server/schedule" ) diff --git a/server/statistics/hot_cache.go b/server/statistics/hot_cache.go index d851cb1b1450..fb3075edd826 100644 --- a/server/statistics/hot_cache.go +++ b/server/statistics/hot_cache.go @@ -17,7 +17,7 @@ import ( "math/rand" "time" - "github.com/pingcap/pd/server/cache" + "github.com/pingcap/pd/pkg/cache" "github.com/pingcap/pd/server/core" ) From 75a1f9f3062b2a60b027e889321072c30ec9681b Mon Sep 17 00:00:00 2001 From: Shafreeck Sea Date: Fri, 12 Jul 2019 12:49:14 +0800 Subject: [PATCH 6/6] pdctl: support to specify multiple pd addrs separated by commas (#1629) * pdctl: support to specify multiple pd addrs separated by commas Signed-off-by: Shafreeck Sea Closes #1611 Example: $ pd-ctl -u 'http://192.168.10.1:2379,http://192.168.10.2:2379' pd-ctl prefers to stick to a healthy endpoint until it gets an error; it then chooses the next endpoint in a round-robin manner and keeps using it until the next error. Note that if the current endpoint cannot fail fast, getting a result may still be slow even though the next endpoint is healthy, because we keep waiting for the current endpoint until the request times out.
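For illustration, the failover strategy described above amounts to trying each endpoint in order and sticking with the first one that answers; the actual implementation is the tryURLs/doRequest helpers in the diff below. What follows is a minimal, self-contained Go sketch of the idea only: the tryEndpoints helper and the /pd/api/v1/members probe path are illustrative assumptions, not part of this patch.

package main

import (
	"fmt"
	"net/http"
	"strings"
	"time"
)

// tryEndpoints runs f against each endpoint in turn and stops at the first
// success, so a healthy endpoint keeps being reused while a dead one costs
// at most one bounded timeout. (Illustrative sketch, not pd-ctl code.)
func tryEndpoints(addrs string, f func(endpoint string) error) error {
	var lastErr error
	for _, ep := range strings.Split(addrs, ",") {
		if !strings.HasPrefix(ep, "http") {
			ep = "http://" + ep
		}
		if lastErr = f(ep); lastErr == nil {
			return nil
		}
	}
	return fmt.Errorf("no endpoint is available, last error: %v", lastErr)
}

func main() {
	client := &http.Client{Timeout: 3 * time.Second}
	err := tryEndpoints("http://192.168.10.1:2379,http://192.168.10.2:2379", func(endpoint string) error {
		// /pd/api/v1/members is only an example path used as a probe here.
		resp, err := client.Get(endpoint + "/pd/api/v1/members")
		if err != nil {
			return err
		}
		defer resp.Body.Close()
		fmt.Println(endpoint, "answered with status", resp.Status)
		return nil
	})
	if err != nil {
		fmt.Println(err)
	}
}

The client-side timeout is what bounds the "cannot fail fast" case mentioned above: without it, a hanging endpoint would block the whole loop instead of letting the next endpoint be tried.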
--- tools/pd-ctl/pdctl/command/config_command.go | 7 +- tools/pd-ctl/pdctl/command/global.go | 151 ++++++++++++------- tools/pd-ctl/pdctl/command/log_command.go | 8 +- tools/pd-ctl/pdctl/command/member_command.go | 7 +- 4 files changed, 102 insertions(+), 71 deletions(-) diff --git a/tools/pd-ctl/pdctl/command/config_command.go b/tools/pd-ctl/pdctl/command/config_command.go index d7905f5d7308..6cf2505a5758 100644 --- a/tools/pd-ctl/pdctl/command/config_command.go +++ b/tools/pd-ctl/pdctl/command/config_command.go @@ -270,11 +270,8 @@ func postConfigDataWithPath(cmd *cobra.Command, key, value, path string) error { if err != nil { return err } - req, err := getRequest(cmd, path, http.MethodPost, "application/json", bytes.NewBuffer(reqData)) - if err != nil { - return err - } - _, err = dail(req) + _, err = doRequest(cmd, path, http.MethodPost, + WithBody("application/json", bytes.NewBuffer(reqData))) if err != nil { return err } diff --git a/tools/pd-ctl/pdctl/command/global.go b/tools/pd-ctl/pdctl/command/global.go index 2082748fb9e8..733455258fe6 100644 --- a/tools/pd-ctl/pdctl/command/global.go +++ b/tools/pd-ctl/pdctl/command/global.go @@ -55,69 +55,110 @@ func InitHTTPSClient(CAPath, CertPath, KeyPath string) error { return nil } -func getRequest(cmd *cobra.Command, prefix string, method string, bodyType string, body io.Reader) (*http.Request, error) { - if method == "" { - method = http.MethodGet +type bodyOption struct { + contentType string + body io.Reader +} + +// BodyOption sets the type and content of the body +type BodyOption func(*bodyOption) + +// WithBody returns a BodyOption +func WithBody(contentType string, body io.Reader) BodyOption { + return func(bo *bodyOption) { + bo.contentType = contentType + bo.body = body } - url := getAddressFromCmd(cmd, prefix) - req, err := http.NewRequest(method, url, body) - if err != nil { - return nil, err +} +func doRequest(cmd *cobra.Command, prefix string, method string, + opts ...BodyOption) (string, error) { + b := &bodyOption{} + for _, o := range opts { + o(b) } - req.Header.Set("Content-Type", bodyType) - return req, err + var resp string + var err error + tryURLs(cmd, func(endpoint string) error { + url := endpoint + "/" + prefix + if method == "" { + method = http.MethodGet + } + var req *http.Request + req, err = http.NewRequest(method, url, b.body) + if err != nil { + return err + } + if b.contentType != "" { + req.Header.Set("Content-Type", b.contentType) + } + // the resp would be returned by the outer function + resp, err = dail(req) + if err != nil { + return err + } + return nil + }) + return resp, err } func dail(req *http.Request) (string, error) { - var res string - reps, err := dialClient.Do(req) + resp, err := dialClient.Do(req) if err != nil { - return res, err - } - defer reps.Body.Close() - if reps.StatusCode != http.StatusOK { - return res, genResponseError(reps) + return "", err } - - r, err := ioutil.ReadAll(reps.Body) - if err != nil { - return res, err + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + var msg []byte + msg, err = ioutil.ReadAll(resp.Body) + if err != nil { + return "", err + } + return fmt.Sprintf("[%d] %s", resp.StatusCode, msg), nil } - res = string(r) - return res, nil -} -func doRequest(cmd *cobra.Command, prefix string, method string) (string, error) { - req, err := getRequest(cmd, prefix, method, "", nil) + content, err := ioutil.ReadAll(resp.Body) if err != nil { return "", err } - return dail(req) + return string(content), nil } -func genResponseError(r *http.Response) 
error { - res, _ := ioutil.ReadAll(r.Body) - return errors.Errorf("[%d] %s", r.StatusCode, res) -} +// DoFunc receives an endpoint to which a request can be issued +type DoFunc func(endpoint string) error -func getAddressFromCmd(cmd *cobra.Command, prefix string) string { - p, err := cmd.Flags().GetString("pd") +// tryURLs issues requests to each URL and tries the next one if there +// is an error +func tryURLs(cmd *cobra.Command, f DoFunc) { + addrs, err := cmd.Flags().GetString("pd") if err != nil { - cmd.Println("get pd address error, should set flag with '-u'") + cmd.Println("get pd address failed, should set flag with '-u'") os.Exit(1) } + endpoints := strings.Split(addrs, ",") + for _, endpoint := range endpoints { + var u *url.URL + u, err = url.Parse(endpoint) + if err != nil { + cmd.Println("address format is wrong, should like 'http://127.0.0.1:2379' or '127.0.0.1:2379'") + os.Exit(1) + } + // tolerate some schemes that users may pass: the TiKV SDK + // uses 'tikv' as the scheme, and it would be confusing if pdctl + // did not support it + if u.Scheme == "" || u.Scheme == "pd" || u.Scheme == "tikv" { + u.Scheme = "http" + } - if p != "" && !strings.HasPrefix(p, "http") { - p = "http://" + p + endpoint = u.String() + err = f(endpoint) + if err != nil { + continue + } + break } - - u, err := url.Parse(p) if err != nil { - cmd.Println("address format is wrong, should like 'http://127.0.0.1:2379' or '127.0.0.1:2379'") + cmd.Println("after trying all endpoints, no endpoint is available, the last error we met:", err) } - - s := fmt.Sprintf("%s/%s", u, prefix) - return s } func printResponseError(r *http.Response) error { @@ -133,21 +174,23 @@ func postJSON(cmd *cobra.Command, prefix string, input map[string]interface{}) { return } - url := getAddressFromCmd(cmd, prefix) - r, err := dialClient.Post(url, "application/json", bytes.NewBuffer(data)) - if err != nil { - cmd.Println(err) - return - } - defer r.Body.Close() - - if r.StatusCode != http.StatusOK { - if err := printResponseError(r); err != nil { - cmd.Println(err) + tryURLs(cmd, func(endpoint string) error { + url := endpoint + "/" + prefix + r, err := dialClient.Post(url, "application/json", bytes.NewBuffer(data)) + if err != nil { + return err + } + defer r.Body.Close() + if r.StatusCode != http.StatusOK { + if err := printResponseError(r); err != nil { + cmd.Println(err) + } + return nil } - } else { cmd.Println("success!") - } + return nil + }) + } // UsageTemplate will used to generate a help information diff --git a/tools/pd-ctl/pdctl/command/log_command.go b/tools/pd-ctl/pdctl/command/log_command.go index c1e070e8f1e6..86e10b44cce0 100644 --- a/tools/pd-ctl/pdctl/command/log_command.go +++ b/tools/pd-ctl/pdctl/command/log_command.go @@ -47,12 +47,8 @@ func logCommandFunc(cmd *cobra.Command, args []string) { cmd.Printf("Failed to set log level: %s\n", err) return } - req, err := getRequest(cmd, logPrefix, http.MethodPost, "application/json", bytes.NewBuffer(data)) - if err != nil { - cmd.Printf("Failed to set log level: %s\n", err) - return - } - _, err = dail(req) + _, err = doRequest(cmd, logPrefix, http.MethodPost, + WithBody("application/json", bytes.NewBuffer(data))) if err != nil { cmd.Printf("Failed to set log level: %s\n", err) return diff --git a/tools/pd-ctl/pdctl/command/member_command.go b/tools/pd-ctl/pdctl/command/member_command.go index 891e72a012f5..df0d2eb29deb 100644 --- a/tools/pd-ctl/pdctl/command/member_command.go +++ b/tools/pd-ctl/pdctl/command/member_command.go @@ -171,12 +171,7 @@ func
setLeaderPriorityFunc(cmd *cobra.Command, args []string) { } data := map[string]interface{}{"leader-priority": priority} reqData, _ := json.Marshal(data) - req, err := getRequest(cmd, prefix, http.MethodPost, "application/json", bytes.NewBuffer(reqData)) - if err != nil { - cmd.Printf("failed to set leader priority: %v\n", err) - return - } - _, err = dail(req) + _, err = doRequest(cmd, prefix, http.MethodPost, WithBody("application/json", bytes.NewBuffer(reqData))) if err != nil { cmd.Printf("failed to set leader priority: %v\n", err) return