diff --git a/cmd/simulator/case1.toml b/cmd/simulator/case1.toml
deleted file mode 100644
index 3b20bad3a14..00000000000
--- a/cmd/simulator/case1.toml
+++ /dev/null
@@ -1,2 +0,0 @@
-node-number = 4
-region-number = 20
diff --git a/cmd/simulator/main.go b/cmd/simulator/main.go
index 33e8d4ee2f4..b0a249780b3 100644
--- a/cmd/simulator/main.go
+++ b/cmd/simulator/main.go
@@ -14,6 +14,7 @@
 package main
 
 import (
+    "flag"
     "os"
     "os/signal"
     "syscall"
@@ -30,13 +31,17 @@ import (
     _ "github.com/pingcap/pd/table"
 )
 
+var confName = flag.String("conf", "", "config name")
+
 func main() {
+    flag.Parse()
+
     _, local, clean := NewSingleServer()
     err := local.Run()
     if err != nil {
         log.Fatal("run server error:", err)
     }
-    driver := faketikv.NewDriver(local.GetAddr())
+    driver := faketikv.NewDriver(local.GetAddr(), *confName)
     err = driver.Prepare()
     if err != nil {
         log.Fatal("simulator prepare error:", err)
@@ -49,16 +54,26 @@ func main() {
         syscall.SIGTERM,
         syscall.SIGQUIT)
 
+    simResult := "FAIL"
+
+EXIT:
     for {
         select {
         case <-tick.C:
             driver.Tick()
+            if driver.Check() {
+                simResult = "OK"
+                break EXIT
+            }
         case <-sc:
-            driver.Stop()
-            clean()
-            return
+            break EXIT
         }
     }
+
+    driver.Stop()
+    clean()
+
+    log.Infof("Simulation finish. Conf: %s, TotalTick: %d, Result: %s", *confName, driver.TickCount(), simResult)
 }
 
 // NewSingleServer creates a pd server for simulator.
diff --git a/pkg/faketikv/case/case1.toml b/pkg/faketikv/case/case1.toml
deleted file mode 100644
index 3b20bad3a14..00000000000
--- a/pkg/faketikv/case/case1.toml
+++ /dev/null
@@ -1,2 +0,0 @@
-node-number = 4
-region-number = 20
diff --git a/pkg/faketikv/cases/balance_leader.go b/pkg/faketikv/cases/balance_leader.go
new file mode 100644
index 00000000000..2ab379180a3
--- /dev/null
+++ b/pkg/faketikv/cases/balance_leader.go
@@ -0,0 +1,62 @@
+// Copyright 2017 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package cases
+
+import (
+    "github.com/pingcap/kvproto/pkg/metapb"
+    "github.com/pingcap/pd/server/core"
+    log "github.com/sirupsen/logrus"
+)
+
+func newBalanceLeader() *Conf {
+    var conf Conf
+
+    for i := 1; i <= 3; i++ {
+        conf.Stores = append(conf.Stores, Store{
+            ID:        uint64(i),
+            Status:    metapb.StoreState_Up,
+            Capacity:  10 * gb,
+            Available: 9 * gb,
+        })
+    }
+
+    var id idAllocator
+    id.setMaxID(3)
+    for i := 0; i < 1000; i++ {
+        peers := []*metapb.Peer{
+            {Id: id.nextID(), StoreId: 1},
+            {Id: id.nextID(), StoreId: 2},
+            {Id: id.nextID(), StoreId: 3},
+        }
+        conf.Regions = append(conf.Regions, Region{
+            ID:     id.nextID(),
+            Peers:  peers,
+            Leader: peers[0],
+            Size:   96 * mb,
+        })
+    }
+    conf.MaxID = id.maxID
+
+    conf.Checker = func(regions *core.RegionsInfo) bool {
+        count1 := regions.GetStoreLeaderCount(1)
+        count2 := regions.GetStoreLeaderCount(2)
+        count3 := regions.GetStoreLeaderCount(3)
+        log.Infof("leader counts: %v %v %v", count1, count2, count3)
+
+        return count1 <= 350 &&
+            count2 >= 300 &&
+            count3 >= 300
+    }
+    return &conf
+}
diff --git a/pkg/faketikv/cases/cases.go b/pkg/faketikv/cases/cases.go
new file mode 100644
index 00000000000..32b658ad53a
--- /dev/null
+++ b/pkg/faketikv/cases/cases.go
@@ -0,0 +1,78 @@
+// Copyright 2017 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package cases
+
+import (
+    "github.com/pingcap/kvproto/pkg/metapb"
+    "github.com/pingcap/pd/server/core"
+)
+
+// Store is the config to simulate tikv.
+type Store struct {
+    ID           uint64
+    Status       metapb.StoreState
+    Labels       []metapb.StoreLabel
+    Capacity     uint64
+    Available    uint64
+    LeaderWeight float32
+    RegionWeight float32
+}
+
+// Region is the config to simulate a region.
+type Region struct {
+    ID     uint64
+    Peers  []*metapb.Peer
+    Leader *metapb.Peer
+    Size   int64
+}
+
+// Conf represents a test suite for simulator.
+type Conf struct {
+    Stores  []Store
+    Regions []Region
+    MaxID   uint64
+
+    Checker func(*core.RegionsInfo) bool // To check the schedule is finished.
+}
+
+const (
+    kb = 1024
+    mb = kb * 1024
+    gb = mb * 1024
+)
+
+type idAllocator struct {
+    maxID uint64
+}
+
+func (a *idAllocator) nextID() uint64 {
+    a.maxID++
+    return a.maxID
+}
+
+func (a *idAllocator) setMaxID(id uint64) {
+    a.maxID = id
+}
+
+var confMap = map[string]func() *Conf{
+    "balance-leader": newBalanceLeader,
+}
+
+// NewConf creates a config to initialize simulator cluster.
+func NewConf(name string) *Conf {
+    if f, ok := confMap[name]; ok {
+        return f()
+    }
+    return nil
+}
diff --git a/pkg/faketikv/cluster.go b/pkg/faketikv/cluster.go
index 343d57b3df5..ec168b0f9b6 100644
--- a/pkg/faketikv/cluster.go
+++ b/pkg/faketikv/cluster.go
@@ -14,7 +14,13 @@
 package faketikv
 
 import (
+    "fmt"
+    "math/rand"
+    "sort"
+
+    "github.com/juju/errors"
     "github.com/pingcap/kvproto/pkg/metapb"
+    "github.com/pingcap/pd/pkg/faketikv/cases"
     "github.com/pingcap/pd/server/core"
     log "github.com/sirupsen/logrus"
 )
@@ -22,15 +28,63 @@ import (
 // ClusterInfo records all cluster information.
 type ClusterInfo struct {
     *core.RegionsInfo
-    Nodes       map[uint64]*Node
-    firstRegion *core.RegionInfo
+    Nodes map[uint64]*Node
+}
+
+// NewClusterInfo creates the initialized cluster with config.
+func NewClusterInfo(pdAddr string, conf *cases.Conf) (*ClusterInfo, error) {
+    cluster := &ClusterInfo{
+        RegionsInfo: core.NewRegionsInfo(),
+        Nodes:       make(map[uint64]*Node),
+    }
+
+    for _, store := range conf.Stores {
+        node, err := NewNode(store.ID, fmt.Sprintf("mock://tikv-%d", store.ID), pdAddr)
+        if err != nil {
+            return nil, errors.Trace(err)
+        }
+        node.clusterInfo = cluster
+        cluster.Nodes[store.ID] = node
+    }
+
+    splitKeys := generateKeys(len(conf.Regions) - 1)
+    for i, region := range conf.Regions {
+        meta := &metapb.Region{
+            Id:          region.ID,
+            Peers:       region.Peers,
+            RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 1},
+        }
+        if i > 0 {
+            meta.StartKey = []byte(splitKeys[i-1])
+        }
+        if i < len(conf.Regions)-1 {
+            meta.EndKey = []byte(splitKeys[i])
+        }
+        regionInfo := core.NewRegionInfo(meta, region.Leader)
+        regionInfo.ApproximateSize = region.Size
+        cluster.RegionsInfo.SetRegion(regionInfo)
+    }
+
+    return cluster, nil
 }
 
-// GetBootstrapInfo returns first region and its leader store.
-func (c *ClusterInfo) GetBootstrapInfo() (*metapb.Store, *metapb.Region) {
-    storeID := c.firstRegion.Leader.GetStoreId()
-    store := c.Nodes[storeID]
-    return store.Store, c.firstRegion.Region
+// GetBootstrapInfo returns a valid bootstrap store and region.
+func (c *ClusterInfo) GetBootstrapInfo() (*metapb.Store, *metapb.Region, error) {
+    region := c.RegionsInfo.RandRegion()
+    if region == nil {
+        return nil, nil, errors.New("no region found for bootstrap")
+    }
+    if region.Leader == nil {
+        return nil, nil, errors.New("bootstrap region has no leader")
+    }
+    store := c.Nodes[region.Leader.GetStoreId()]
+    if store == nil {
+        return nil, nil, errors.Errorf("bootstrap store %v not found", region.Leader.GetStoreId())
+    }
+    region.StartKey, region.EndKey = []byte(""), []byte("")
+    region.RegionEpoch = &metapb.RegionEpoch{}
+    region.Peers = []*metapb.Peer{region.Leader}
+    return store.Store, region.Region, nil
 }
 
 func (c *ClusterInfo) nodeHealth(storeID uint64) bool {
@@ -42,7 +96,7 @@ func (c *ClusterInfo) nodeHealth(storeID uint64) bool {
     return n.GetState() == Up
 }
 
-func (c ClusterInfo) electNewLeader(region *core.RegionInfo) *metapb.Peer {
+func (c *ClusterInfo) electNewLeader(region *core.RegionInfo) *metapb.Peer {
     var (
         unhealth         int
         newLeaderStoreID uint64
@@ -103,3 +157,28 @@ func (c *ClusterInfo) AddTask(task Task) {
         n.AddTask(task)
     }
 }
+
+const (
+    // 26^10 ~= 1.4e+14, should be enough.
+    keyChars = "abcdefghijklmnopqrstuvwxyz"
+    keyLen   = 10
+)
+
+// generate ordered, unique strings.
+func generateKeys(size int) []string {
+    m := make(map[string]struct{}, size)
+    for len(m) < size {
+        k := make([]byte, keyLen)
+        for i := range k {
+            k[i] = keyChars[rand.Intn(len(keyChars))]
+        }
+        m[string(k)] = struct{}{}
+    }
+
+    v := make([]string, 0, size)
+    for k := range m {
+        v = append(v, k)
+    }
+    sort.Sort(sort.StringSlice(v))
+    return v
+}
diff --git a/pkg/faketikv/drive.go b/pkg/faketikv/drive.go
index 2b2f2af2412..fd1df941564 100644
--- a/pkg/faketikv/drive.go
+++ b/pkg/faketikv/drive.go
@@ -17,39 +17,70 @@ import (
     "context"
     "fmt"
 
+    "github.com/juju/errors"
+    "github.com/pingcap/pd/pkg/faketikv/cases"
     log "github.com/sirupsen/logrus"
 )
 
 // Driver promotes the cluster status change.
 type Driver struct {
-    clusterInfo *ClusterInfo
     addr        string
+    confName    string
+    conf        *cases.Conf
+    clusterInfo *ClusterInfo
     client      Client
+    tickCount   int64
 }
 
 // NewDriver returns a driver.
-func NewDriver(addr string) *Driver {
-    return &Driver{addr: addr}
+func NewDriver(addr string, confName string) *Driver {
+    return &Driver{
+        addr:     addr,
+        confName: confName,
+    }
 }
 
 // Prepare initializes cluster information, bootstraps cluster and starts nodes.
-func (c *Driver) Prepare() error {
-    initCase := NewTiltCase()
-    // TODO: initialize accoring config
-    clusterInfo := initCase.Init(c.addr, "./case1.toml")
-    c.clusterInfo = clusterInfo
-    store, region := clusterInfo.GetBootstrapInfo()
-    c.client = clusterInfo.Nodes[store.GetId()].client
+func (d *Driver) Prepare() error {
+    d.conf = cases.NewConf(d.confName)
+    if d.conf == nil {
+        return errors.Errorf("failed to create conf %s", d.confName)
+    }
+
+    clusterInfo, err := NewClusterInfo(d.addr, d.conf)
+    if err != nil {
+        return errors.Trace(err)
+    }
+    d.clusterInfo = clusterInfo
+
+    // Bootstrap.
+    store, region, err := clusterInfo.GetBootstrapInfo()
+    if err != nil {
+        return errors.Trace(err)
+    }
+    d.client = clusterInfo.Nodes[store.GetId()].client
     ctx, cancel := context.WithTimeout(context.Background(), pdTimeout)
-    err := c.client.Bootstrap(ctx, store, region)
+    err = d.client.Bootstrap(ctx, store, region)
     cancel()
     if err != nil {
         log.Fatal("bootstrapped error: ", err)
     } else {
         log.Info("Bootstrap sucess")
     }
-    for _, n := range c.clusterInfo.Nodes {
+
+    // Setup alloc id.
+    for {
+        id, err := d.client.AllocID(context.Background())
+        if err != nil {
+            return errors.Trace(err)
+        }
+        if id > d.conf.MaxID {
+            break
+        }
+    }
+
+    for _, n := range d.clusterInfo.Nodes {
         err := n.Start()
         if err != nil {
             return err
@@ -59,23 +90,34 @@ func (c *Driver) Prepare() error {
 }
 
 // Tick invokes nodes' Tick.
-func (c *Driver) Tick() {
-    for _, n := range c.clusterInfo.Nodes {
+func (d *Driver) Tick() {
+    d.tickCount++
+    for _, n := range d.clusterInfo.Nodes {
         n.Tick()
     }
 }
 
+// Check checks if the simulation is completed.
+func (d *Driver) Check() bool {
+    return d.conf.Checker(d.clusterInfo.RegionsInfo)
+}
+
 // Stop stops all nodes.
-func (c *Driver) Stop() {
-    for _, n := range c.clusterInfo.Nodes {
+func (d *Driver) Stop() {
+    for _, n := range d.clusterInfo.Nodes {
         n.Stop()
     }
 }
 
+// TickCount returns the simulation's tick count.
+func (d *Driver) TickCount() int64 {
+    return d.tickCount
+}
+
 // AddNode adds new node.
-func (c *Driver) AddNode() {
-    id, err := c.client.AllocID(context.Background())
-    n, err := NewNode(id, fmt.Sprintf("mock://tikv-%d", id), c.addr)
+func (d *Driver) AddNode() {
+    id, err := d.client.AllocID(context.Background())
+    n, err := NewNode(id, fmt.Sprintf("mock://tikv-%d", id), d.addr)
     if err != nil {
         log.Info("Add node failed:", err)
         return
@@ -85,9 +127,9 @@ func (c *Driver) AddNode() {
         log.Info("Start node failed:", err)
         return
     }
-    n.clusterInfo = c.clusterInfo
-    c.clusterInfo.Nodes[n.Id] = n
+    n.clusterInfo = d.clusterInfo
+    d.clusterInfo.Nodes[n.Id] = n
 }
 
 // DeleteNode deletes a node.
-func (c *Driver) DeleteNode() {}
+func (d *Driver) DeleteNode() {}
diff --git a/pkg/faketikv/initializer.go b/pkg/faketikv/initializer.go
deleted file mode 100644
index 0493e16d277..00000000000
--- a/pkg/faketikv/initializer.go
+++ /dev/null
@@ -1,141 +0,0 @@
-// Copyright 2017 PingCAP, Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package faketikv
-
-import (
-    "context"
-    "fmt"
-
-    "github.com/BurntSushi/toml"
-    "github.com/pingcap/kvproto/pkg/metapb"
-    "github.com/pingcap/pd/server/core"
-    log "github.com/sirupsen/logrus"
-)
-
-type localAlloc struct {
-    id uint64
-}
-
-func (l *localAlloc) AllocID() (uint64, error) {
-    l.id++
-    return l.id, nil
-}
-
-// Initializer defines an Init interface.
-// we can implement different case to initialize cluster.
-type Initializer interface {
-    Init(args ...string) *ClusterInfo
-}
-
-// TiltCase will initailize cluster with all regions distributed in 3 node.
-type TiltCase struct {
-    NodeNumber   int `toml:"node-number" json:"node-number"`
-    RegionNumber int `toml:"region-number" json:"region-number"`
-    alloc        *localAlloc
-}
-
-// NewTiltCase returns tiltCase.
-func NewTiltCase() *TiltCase {
-    return &TiltCase{alloc: &localAlloc{}}
-}
-
-// Init implement Initializer.
-func (c *TiltCase) Init(addr string, args ...string) *ClusterInfo {
-    path := args[0]
-    err := c.parser(path)
-    if err != nil {
-        log.Fatal("initalize failed: ", err)
-    }
-    nodes := make(map[uint64]*Node)
-    regions := core.NewRegionsInfo()
-    var ids []uint64
-    for i := 0; i < c.NodeNumber; i++ {
-        id, err1 := c.alloc.AllocID()
-        if err1 != nil {
-            log.Fatal("alloc failed", err1)
-        }
-        node, err1 := NewNode(id, fmt.Sprintf("mock://tikv-%d", id), addr)
-        if err != nil {
-            log.Fatal("New node failed", err1)
-        }
-        nodes[id] = node
-        if len(ids) < 3 {
-            ids = append(ids, id)
-        }
-    }
-    var firstRegion *core.RegionInfo
-    for i := 0; i < c.RegionNumber; i++ {
-        start := i * 1000
-        region := c.genRegion(ids, start)
-        regions.SetRegion(region)
-        if i == 0 {
-            firstRegion = region.Clone()
-            firstRegion.StartKey = []byte("")
-            firstRegion.EndKey = []byte("")
-            firstRegion.Peers = firstRegion.Peers[:1]
-        }
-    }
-    // TODO: remove this
-    client := nodes[firstRegion.Leader.GetStoreId()].client
-    for i := 0; i < c.NodeNumber+c.RegionNumber+10; i++ {
-        _, err = client.AllocID(context.Background())
-        if err != nil {
-            log.Fatal("initalize failed when alloc ID: ", err)
-        }
-    }
-
-    cluster := &ClusterInfo{
-        regions,
-        nodes,
-        firstRegion,
-    }
-    for _, n := range nodes {
-        n.clusterInfo = cluster
-    }
-    return cluster
-}
-
-func (c *TiltCase) parser(path string) error {
-    _, err := toml.DecodeFile(path, c)
-    return err
-}
-
-func (c *TiltCase) genRegion(ids []uint64, start int) *core.RegionInfo {
-    if len(ids) == 0 {
-        return nil
-    }
-    regionID, _ := c.alloc.AllocID()
-    peers := make([]*metapb.Peer, 0, len(ids))
-    for _, storeID := range ids {
-        id, err := c.alloc.AllocID()
-        if err != nil {
-            log.Fatal("initalize failed when alloc ID: ", err)
-        }
-        peer := &metapb.Peer{
-            Id:      id,
-            StoreId: storeID,
-        }
-        peers = append(peers, peer)
-    }
-    regionMeta := &metapb.Region{
-        Id:          regionID,
-        StartKey:    []byte(fmt.Sprintf("zt_%020d", start)),
-        EndKey:      []byte(fmt.Sprintf("zt_%020d", start+1000)),
-        Peers:       peers,
-        RegionEpoch: &metapb.RegionEpoch{},
-    }
-    region := core.NewRegionInfo(regionMeta, peers[0])
-    region.ApproximateSize = 96 * 1000 * 1000
-    return region
-}
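For readers who want to add their own scenario on top of this change, the sketch below shows how another case could plug into the confMap registry introduced in pkg/faketikv/cases/cases.go. It only illustrates the API added by this diff: the case name "balance-leader-5", the five-store layout, and the checker thresholds are hypothetical and not part of the change.

package cases

import (
    "github.com/pingcap/kvproto/pkg/metapb"
    "github.com/pingcap/pd/server/core"
)

// newBalanceLeader5 is a hypothetical five-store variant of newBalanceLeader:
// store 1 starts with every leader and the checker waits for them to spread out.
func newBalanceLeader5() *Conf {
    var conf Conf

    // Five healthy stores with plenty of free space.
    for i := 1; i <= 5; i++ {
        conf.Stores = append(conf.Stores, Store{
            ID:        uint64(i),
            Status:    metapb.StoreState_Up,
            Capacity:  10 * gb,
            Available: 9 * gb,
        })
    }

    // 1000 regions, all replicated on stores 1-3 with the leader on store 1.
    var id idAllocator
    id.setMaxID(5)
    for i := 0; i < 1000; i++ {
        peers := []*metapb.Peer{
            {Id: id.nextID(), StoreId: 1},
            {Id: id.nextID(), StoreId: 2},
            {Id: id.nextID(), StoreId: 3},
        }
        conf.Regions = append(conf.Regions, Region{
            ID:     id.nextID(),
            Peers:  peers,
            Leader: peers[0],
            Size:   96 * mb,
        })
    }
    // MaxID lets Driver.Prepare advance the PD ID allocator past every ID used above.
    conf.MaxID = id.maxID

    // The simulation ends once leaders look reasonably balanced; the
    // thresholds here are illustrative guesses, not tuned values.
    conf.Checker = func(regions *core.RegionsInfo) bool {
        return regions.GetStoreLeaderCount(1) <= 400 &&
            regions.GetStoreLeaderCount(2) >= 250 &&
            regions.GetStoreLeaderCount(3) >= 250
    }
    return &conf
}

Registering such a case would be a one-line addition to confMap ("balance-leader-5": newBalanceLeader5), after which it could be selected through the flag added in cmd/simulator/main.go, e.g. -conf balance-leader-5.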