Skip to content

Commit

Permalink
Merge branch 'master' into fix-log-master
Browse files Browse the repository at this point in the history
  • Loading branch information
nolouch authored Jul 15, 2019
2 parents e5d44e8 + 75a1f9f commit 3f03d91
Show file tree
Hide file tree
Showing 56 changed files with 1,274 additions and 804 deletions.
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ simulator: export GO111MODULE=on
simulator:
CGO_ENABLED=0 go build -o bin/pd-simulator tools/pd-simulator/main.go

regions-dump: export GO111MODULE=on
regions-dump:
CGO_ENABLED=0 go build -o bin/regions-dump tools/regions-dump/main.go

clean-test:
rm -rf /tmp/test_pd*
rm -rf /tmp/pd-tests*
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion server/checker/merge_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"time"

log "github.com/pingcap/log"
"github.com/pingcap/pd/server/cache"
"github.com/pingcap/pd/pkg/cache"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/namespace"
"github.com/pingcap/pd/server/schedule"
Expand Down
6 changes: 3 additions & 3 deletions server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (c *RaftCluster) isInitialized() bool {
// yet.
func (c *RaftCluster) loadBootstrapTime() (time.Time, error) {
var t time.Time
data, err := c.s.kv.Load(c.s.kv.ClusterStatePath("raft_bootstrap_time"))
data, err := c.s.storage.Load(c.s.storage.ClusterStatePath("raft_bootstrap_time"))
if err != nil {
return t, err
}
Expand All @@ -128,7 +128,7 @@ func (c *RaftCluster) start() error {
return nil
}

cluster, err := loadClusterInfo(c.s.idAlloc, c.s.kv, c.s.scheduleOpt)
cluster, err := loadClusterInfo(c.s.idAlloc, c.s.storage, c.s.scheduleOpt)
if err != nil {
return err
}
Expand Down Expand Up @@ -555,7 +555,7 @@ func (c *RaftCluster) SetStoreWeight(storeID uint64, leaderWeight, regionWeight
return core.NewStoreNotFoundErr(storeID)
}

if err := c.s.kv.SaveStoreWeight(storeID, leaderWeight, regionWeight); err != nil {
if err := c.s.storage.SaveStoreWeight(storeID, leaderWeight, regionWeight); err != nil {
return err
}

Expand Down
50 changes: 25 additions & 25 deletions server/cluster_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type clusterInfo struct {
sync.RWMutex
core *core.BasicCluster
id core.IDAllocator
kv *core.KV
storage *core.Storage
meta *metapb.Cluster
opt *scheduleOption
regionStats *statistics.RegionStatistics
Expand All @@ -45,12 +45,12 @@ type clusterInfo struct {

var defaultChangedRegionsLimit = 10000

func newClusterInfo(id core.IDAllocator, opt *scheduleOption, kv *core.KV) *clusterInfo {
func newClusterInfo(id core.IDAllocator, opt *scheduleOption, storage *core.Storage) *clusterInfo {
return &clusterInfo{
core: core.NewBasicCluster(),
id: id,
opt: opt,
kv: kv,
storage: storage,
labelLevelStats: statistics.NewLabelLevelStatistics(),
storesStats: statistics.NewStoresStats(),
prepareChecker: newPrepareChecker(),
Expand All @@ -60,11 +60,11 @@ func newClusterInfo(id core.IDAllocator, opt *scheduleOption, kv *core.KV) *clus
}

// Return nil if cluster is not bootstrapped.
func loadClusterInfo(id core.IDAllocator, kv *core.KV, opt *scheduleOption) (*clusterInfo, error) {
c := newClusterInfo(id, opt, kv)
func loadClusterInfo(id core.IDAllocator, storage *core.Storage, opt *scheduleOption) (*clusterInfo, error) {
c := newClusterInfo(id, opt, storage)

c.meta = &metapb.Cluster{}
ok, err := kv.LoadMeta(c.meta)
ok, err := storage.LoadMeta(c.meta)
if err != nil {
return nil, err
}
Expand All @@ -73,7 +73,7 @@ func loadClusterInfo(id core.IDAllocator, kv *core.KV, opt *scheduleOption) (*cl
}

start := time.Now()
if err := kv.LoadStores(c.core.Stores); err != nil {
if err := storage.LoadStores(c.core.Stores); err != nil {
return nil, err
}
log.Info("load stores",
Expand All @@ -82,7 +82,7 @@ func loadClusterInfo(id core.IDAllocator, kv *core.KV, opt *scheduleOption) (*cl
)

start = time.Now()
if err := kv.LoadRegions(c.core.Regions); err != nil {
if err := storage.LoadRegions(c.core.Regions); err != nil {
return nil, err
}
log.Info("load regions",
Expand Down Expand Up @@ -117,7 +117,7 @@ func (c *clusterInfo) OnStoreVersionChange() {
// it will update the cluster version.
if clusterVersion.LessThan(*minVersion) {
c.opt.SetClusterVersion(*minVersion)
err := c.opt.persist(c.kv)
err := c.opt.persist(c.storage)
if err != nil {
log.Error("persist cluster version meet error", zap.Error(err))
}
Expand Down Expand Up @@ -176,8 +176,8 @@ func (c *clusterInfo) putMeta(meta *metapb.Cluster) error {
}

func (c *clusterInfo) putMetaLocked(meta *metapb.Cluster) error {
if c.kv != nil {
if err := c.kv.SaveMeta(meta); err != nil {
if c.storage != nil {
if err := c.storage.SaveMeta(meta); err != nil {
return err
}
}
Expand All @@ -199,8 +199,8 @@ func (c *clusterInfo) putStore(store *core.StoreInfo) error {
}

func (c *clusterInfo) putStoreLocked(store *core.StoreInfo) error {
if c.kv != nil {
if err := c.kv.SaveStore(store.GetMeta()); err != nil {
if c.storage != nil {
if err := c.storage.SaveStore(store.GetMeta()); err != nil {
return err
}
}
Expand All @@ -216,8 +216,8 @@ func (c *clusterInfo) deleteStore(store *core.StoreInfo) error {
}

func (c *clusterInfo) deleteStoreLocked(store *core.StoreInfo) error {
if c.kv != nil {
if err := c.kv.DeleteStore(store.GetMeta()); err != nil {
if c.storage != nil {
if err := c.storage.DeleteStore(store.GetMeta()); err != nil {
return err
}
}
Expand Down Expand Up @@ -348,8 +348,8 @@ func (c *clusterInfo) putRegion(region *core.RegionInfo) error {
}

func (c *clusterInfo) putRegionLocked(region *core.RegionInfo) error {
if c.kv != nil {
if err := c.kv.SaveRegion(region.GetMeta()); err != nil {
if c.storage != nil {
if err := c.storage.SaveRegion(region.GetMeta()); err != nil {
return err
}
}
Expand Down Expand Up @@ -525,7 +525,7 @@ func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error {
isReadUpdate, readItem := c.CheckReadStatus(region)
c.RUnlock()

// Save to KV if meta is updated.
// Save to storage if meta is updated.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
// Mark isNew if the region in cache does not have leader.
var saveKV, saveCache, isNew bool
Expand Down Expand Up @@ -589,11 +589,11 @@ func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error {
}
}

if saveKV && c.kv != nil {
if err := c.kv.SaveRegion(region.GetMeta()); err != nil {
// Not successfully saved to kv is not fatal, it only leads to longer warm-up
if saveKV && c.storage != nil {
if err := c.storage.SaveRegion(region.GetMeta()); err != nil {
// Not successfully saved to storage is not fatal, it only leads to longer warm-up
// after restart. Here we only log the error then go on updating cache.
log.Error("fail to save region to kv",
log.Error("fail to save region to storage",
zap.Uint64("region-id", region.GetID()),
zap.Reflect("region-meta", core.HexRegionMeta(region.GetMeta())),
zap.Error(err))
Expand All @@ -615,10 +615,10 @@ func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error {

if saveCache {
overlaps := c.core.Regions.SetRegion(region)
if c.kv != nil {
if c.storage != nil {
for _, item := range overlaps {
if err := c.kv.DeleteRegion(item); err != nil {
log.Error("fail to delete region from kv",
if err := c.storage.DeleteRegion(item); err != nil {
log.Error("fail to delete region from storage",
zap.Uint64("region-id", item.GetId()),
zap.Reflect("region-meta", core.HexRegionMeta(item)),
zap.Error(err))
Expand Down
Loading

0 comments on commit 3f03d91

Please sign in to comment.