Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
nolouch committed Sep 24, 2018
1 parent d48d655 commit 92da196
Show file tree
Hide file tree
Showing 9 changed files with 87 additions and 78 deletions.
27 changes: 5 additions & 22 deletions server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"sync"
"time"

"github.com/gogo/protobuf/proto"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/pd/pkg/error_code"
Expand Down Expand Up @@ -58,7 +57,6 @@ type RaftCluster struct {

wg sync.WaitGroup
quit chan struct{}
clients map[string]pdpb.PDClient
regionSyncer *regionSyncer
}

Expand All @@ -73,7 +71,6 @@ func newRaftCluster(s *Server, clusterID uint64) *RaftCluster {
running: false,
clusterID: clusterID,
clusterRoot: s.getClusterRootPath(),
clients: make(map[string]pdpb.PDClient),
regionSyncer: newRegionSyncer(s),
}
}
Expand Down Expand Up @@ -134,36 +131,22 @@ func (c *RaftCluster) runSyncRegions() {
defer c.wg.Done()
var requests []*metapb.Region
// grpc has limit on message size
maxBatchSize := 100
for {
select {
case <-c.quit:
return
case first := <-c.cachedCluster.getChangedRegions():
requests = append(requests, first.GetMeta())
pending := len(c.cachedCluster.getChangedRegions())
for i := 0; i < pending && i < maxBatchSize; i++ {
for i := 0; i < pending && i < maxSyncRegionBatchSize; i++ {
region := <-c.cachedCluster.getChangedRegions()
requests = append(requests, region.GetMeta())
}
msg := &pdpb.MetaRegions{
Count: uint32(len(requests)),
Regions: requests}
data, err := proto.Marshal(msg)
if err != nil {
log.Errorf("Report regions meet error: %s", err)
continue
}
req := &pdpb.SyncRegionResponse{
Header: &pdpb.ResponseHeader{ClusterId: c.s.clusterID},
Data: data,
}
for _, stream := range c.regionSyncer.streams {
err := stream.Send(req)
if err != nil {
log.Errorf("Report regions meet error: %s", err)
}
regions := &pdpb.SyncRegionResponse{
Header: &pdpb.ResponseHeader{ClusterId: c.s.clusterID},
Regions: requests,
}
c.regionSyncer.broadcast(regions)
}
requests = requests[:0]
}
Expand Down
2 changes: 1 addition & 1 deletion server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ func (s SecurityConfig) ToTLSConfig() (*tls.Config, error) {

// PDServerConfig is the configuration for pd server.
type PDServerConfig struct {
// EnableRegionStorage enable the independent region storage.
// EnableRegionStorage enables the independent region storage.
EnableRegionStorage bool `toml:"enable-region-storage" json:"enable-region-storage"`
}

Expand Down
23 changes: 14 additions & 9 deletions server/core/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)

const (
Expand All @@ -41,13 +42,13 @@ const (
)

const (
//DefaultSyncRegionTTL is the ttl to sync the regions to kv storage.
DefaultSyncRegionTTL = 3 * time.Second
//DefaultSyncRegionRate is the ttl to sync the regions to kv storage.
DefaultSyncRegionRate = 3 * time.Second
//DefaultBatchSize is the batch size to save the regions to kv storage.
DefaultBatchSize = 100
)

// WithLevelDBKV store the regions information in levelDB.
// WithLevelDBKV stores the regions information in levelDB.
func WithLevelDBKV(path string, batchSize int, ttl time.Duration) func(*KV) {
return func(kv *KV) {
levelDB, err := NewLeveldbKV(path)
Expand All @@ -60,7 +61,7 @@ func WithLevelDBKV(path string, batchSize int, ttl time.Duration) func(*KV) {
kv.ttl = ttl
kv.batchSize = batchSize
kv.flushTime = time.Now().Add(ttl)
kv.doGC()
kv.backgroundFlush()
}
}

Expand Down Expand Up @@ -156,7 +157,6 @@ func (kv *KV) SaveRegion(region *metapb.Region) error {
kv.cacheSize = 0
kv.batchRegions = make(map[string]*metapb.Region, kv.batchSize)
return nil
//return saveProto(kv.regionKV, kv.regionPath(region.GetId()), region)
}
return saveProto(kv.KVBase, kv.regionPath(region.GetId()), region)
}
Expand Down Expand Up @@ -351,10 +351,13 @@ func (kv *KV) Close() error {
return nil
}

func (kv *KV) doGC() {
func (kv *KV) backgroundFlush() {
tick := time.NewTicker(dirtyFlushTick)
defer tick.Stop()
var isFlush bool
var (
isFlush bool
err error
)
go func() {
for {
<-tick.C
Expand All @@ -364,12 +367,14 @@ func (kv *KV) doGC() {
if !isFlush {
continue
}
kv.FlushRegion()
if err = kv.FlushRegion(); err != nil {
log.Info("flush regions error: ", err)
}
}
}()
}

// FlushRegion save the cache region to region kv storage.
// FlushRegion saves the cache region to region kv storage.
func (kv *KV) FlushRegion() error {
if kv.regionKV != nil {
kv.mu.Lock()
Expand Down
2 changes: 1 addition & 1 deletion server/core/kv_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type KVBase interface {
Delete(key string) error
}

// RegionKV used to save region metadata.
// RegionKV is used to save region metadata.
type RegionKV interface {
KVBase
SaveRegions(regions map[string]*metapb.Region) error
Expand Down
11 changes: 3 additions & 8 deletions server/core/levedb_kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type leveldbKV struct {
db *leveldb.DB
}

// NewLeveldbKV to store regions information.
// NewLeveldbKV is used to store regions information.
func NewLeveldbKV(path string) (RegionKV, error) {
db, err := leveldb.OpenFile(path, nil)
if err != nil {
Expand All @@ -42,12 +42,8 @@ func (kv *leveldbKV) Load(key string) (string, error) {
return string(v), err
}

//LoadRange(key, endKey string, limit int) ([]string, error)
//Save(key, value string) error
//Delete(key string) error

func (kv *leveldbKV) LoadRange(key, endKey string, limit int) ([]string, error) {
iter := kv.db.NewIterator(&util.Range{Start: []byte(key), Limit: []byte(endKey)}, nil)
func (kv *leveldbKV) LoadRange(startKey, endKey string, limit int) ([]string, error) {
iter := kv.db.NewIterator(&util.Range{Start: []byte(startKey), Limit: []byte(endKey)}, nil)
values := make([]string, 0, limit)
count := 0
for iter.Next() {
Expand All @@ -58,7 +54,6 @@ func (kv *leveldbKV) LoadRange(key, endKey string, limit int) ([]string, error)
count++
}
iter.Release()
//iter.Error()
return values, nil
}

Expand Down
10 changes: 3 additions & 7 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,13 +623,9 @@ func (s *Server) SyncRegions(stream pdpb.PD_SyncRegionsServer) error {
if request.GetHeader().GetClusterId() != s.clusterID {
return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", s.clusterID, request.GetHeader().GetClusterId())
}
switch request.GetTp() {
case 1:
log.Infof("Establish sync region stream with %s [%s]", request.GetMember().GetName(), request.GetMember().GetClientUrls()[0])
if s.cluster.regionSyncer != nil {
s.cluster.regionSyncer.bindStream(request.GetMember().GetName(), stream)
}
default:
log.Infof("Establish sync region stream with %s [%s]", request.GetMember().GetName(), request.GetMember().GetClientUrls()[0])
if s.cluster.regionSyncer != nil {
s.cluster.regionSyncer.bindStream(request.GetMember().GetName(), stream)
}
}
}
Expand Down
8 changes: 2 additions & 6 deletions server/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,12 +303,8 @@ func (s *Server) watchLeader(leader *pdpb.Member) {
log.Error("reload config failed:", err)
return
}
err = s.cluster.regionSyncer.statSyncerWithLeader(leader.GetClientUrls()[0])
if err != nil {
log.Errorf("Syncer with leader meet with error %s", err)
return
}
defer s.cluster.regionSyncer.stopSyncerWithLeader()
s.cluster.regionSyncer.startSyncWithLeader(leader.GetClientUrls()[0])
defer s.cluster.regionSyncer.stopSyncWithLeader()
for {
rch := watcher.Watch(ctx, s.getLeaderPath())
for wresp := range rch {
Expand Down
68 changes: 48 additions & 20 deletions server/region_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,20 @@ import (
"context"
"net/url"
"sync"
"time"

"github.com/pingcap/kvproto/pkg/pdpb"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
)

const (
msgSize = 8 * 1024 * 1024
maxSyncRegionBatchSize = 100
)

type regionSyncer struct {
sync.Mutex
sync.RWMutex
streams map[string]pdpb.PD_SyncRegionsServer
ctx context.Context
cancel context.CancelFunc
Expand All @@ -45,7 +51,18 @@ func (s *regionSyncer) bindStream(name string, stream pdpb.PD_SyncRegionsServer)
s.Unlock()
}

func (s *regionSyncer) stopSyncerWithLeader() {
func (s *regionSyncer) broadcast(regions *pdpb.SyncRegionResponse) {
s.Lock()
for _, sender := range s.streams {
err := sender.Send(regions)
if err != nil {
log.Error("region syncer send data meet error:", err)
}
}
s.Unlock()
}

func (s *regionSyncer) stopSyncWithLeader() {
if s.cancel == nil {
return
}
Expand All @@ -54,50 +71,61 @@ func (s *regionSyncer) stopSyncerWithLeader() {
s.wg.Wait()
}

func (s *regionSyncer) statSyncerWithLeader(addr string) error {
func (s *regionSyncer) establish(addr string) (pdpb.PD_SyncRegionsClient, error) {
if s.cancel != nil {
s.stopSyncerWithLeader()
s.stopSyncWithLeader()
}
u, err := url.Parse(addr)
if err != nil {
return err
return nil, err
}

cc, err := grpc.Dial(u.Host, grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(8*1024*1024)))
cc, err := grpc.Dial(u.Host, grpc.WithInsecure(), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(msgSize)))
if err != nil {
return err
return nil, err
}

s.ctx, s.cancel = context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(context.Background())
client, err := pdpb.NewPDClient(cc).SyncRegions(s.ctx)
if err != nil {
return err
cancel()
return nil, err
}
err = client.Send(&pdpb.SyncRegionRequest{
Header: &pdpb.RequestHeader{ClusterId: s.server.clusterID},
Member: s.server.member,
Tp: 1,
})
if err != nil {
return err
cancel()
return nil, err
}
s.ctx, s.cancel = ctx, cancel
return client, nil
}

log.Infof("%s start sync with leader %s", s.server.member.GetName(), s.server.GetLeader().GetName())
func (s *regionSyncer) startSyncWithLeader(addr string) {
s.wg.Add(1)
go func() {
defer s.wg.Done()
for {
resp, err := client.Recv()
// establish client
client, err := s.establish(addr)
if err != nil {
log.Error("region sync with leader meet error:", err)
return
log.Errorf("%s failed to establish sync stream with leader %s: %s", s.server.member.GetName(), s.server.GetLeader().GetName(), err)
time.Sleep(time.Second)
continue
}
metas := &pdpb.MetaRegions{}
metas.Unmarshal(resp.Data)
for i := uint32(0); i < metas.GetCount(); i++ {
s.server.kv.SaveRegion(metas.Regions[i])
log.Infof("%s start sync with leader %s", s.server.member.GetName(), s.server.GetLeader().GetName())
for {
resp, err := client.Recv()
if err != nil {
log.Error("region sync with leader meet error:", err)
break
}
for _, r := range resp.GetRegions() {
s.server.kv.SaveRegion(r)
}
}
}
}()
return nil
}
14 changes: 10 additions & 4 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,8 @@ func (s *Server) startServer() error {

s.idAlloc = &idAllocator{s: s}
kvBase := newEtcdKVBase(s)
path := filepath.Join(s.cfg.DataDir, "leveldb")
s.kv = core.NewKV(kvBase, core.WithLevelDBKV(path, core.DefaultBatchSize, core.DefaultSyncRegionTTL))
path := filepath.Join(s.cfg.DataDir, "region-meta")
s.kv = core.NewKV(kvBase, core.WithLevelDBKV(path, core.DefaultBatchSize, core.DefaultSyncRegionRate))
s.cluster = newRaftCluster(s, s.clusterID)
s.hbStreams = newHeartbeatStreams(s.clusterID)
if s.classifier, err = namespace.CreateClassifier(s.cfg.NamespaceClassifier, s.kv, s.idAlloc); err != nil {
Expand Down Expand Up @@ -393,8 +393,14 @@ func (s *Server) bootstrapCluster(req *pdpb.BootstrapRequest) (*pdpb.BootstrapRe
}

log.Infof("bootstrap cluster %d ok", clusterID)
s.kv.SaveRegion(req.GetRegion())
s.kv.FlushRegion()
err = s.kv.SaveRegion(req.GetRegion())
if err != nil {
log.Warnf("save the bootstrap region faild: %s", err)
}
err = s.kv.FlushRegion()
if err != nil {
log.Warnf("flush the bootstrap region faild: %s", err)
}
if err := s.cluster.start(); err != nil {
return nil, err
}
Expand Down

0 comments on commit 92da196

Please sign in to comment.