Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
nolouch committed Sep 25, 2018
1 parent 308cdde commit cb41f3d
Show file tree
Hide file tree
Showing 15 changed files with 445 additions and 194 deletions.
7 changes: 5 additions & 2 deletions pkg/integration_test/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,14 @@ type testCluster struct {
servers map[string]*testServer
}

func newTestCluster(initialServerCount int) (*testCluster, error) {
// ConfigOption is used to define customize settings in test.
type ConfigOption func(conf *server.Config)

func newTestCluster(initialServerCount int, opts ...ConfigOption) (*testCluster, error) {
config := newClusterConfig(initialServerCount)
servers := make(map[string]*testServer)
for _, conf := range config.InitialServers {
serverConf, err := conf.Generate()
serverConf, err := conf.Generate(opts...)
if err != nil {
return nil, err
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/integration_test/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func newServerConfig(name string, cc *clusterConfig, join bool) *serverConfig {
}
}

func (c *serverConfig) Generate() (*server.Config, error) {
func (c *serverConfig) Generate(opts ...ConfigOption) (*server.Config, error) {
arguments := []string{
"--name=" + c.Name,
"--data-dir=" + c.DataDir,
Expand All @@ -65,6 +65,9 @@ func (c *serverConfig) Generate() (*server.Config, error) {
if err != nil {
return nil, err
}
for _, opt := range opts {
opt(cfg)
}
return cfg, nil
}

Expand Down
77 changes: 77 additions & 0 deletions pkg/integration_test/region_syncer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package integration

import (
"fmt"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"

"github.com/pingcap/pd/server"
"github.com/pingcap/pd/server/core"
)

type idAllocator struct {
id uint64
}

func (alloc *idAllocator) Alloc() uint64 {
alloc.id++
return alloc.id
}

func (s *integrationTestSuite) TestRegionSyncer(c *C) {
c.Parallel()
cluster, err := newTestCluster(3, func(conf *server.Config) { conf.PDServerCfg.EnableRegionStorage = true })
c.Assert(err, IsNil)

err = cluster.RunInitialServers()
c.Assert(err, IsNil)
cluster.WaitLeader()
leaderServer := cluster.GetServer(cluster.GetLeader())
s.bootstrapCluster(leaderServer, c)
rc := leaderServer.server.GetRaftCluster()
c.Assert(rc, NotNil)
regionLen := 110
id := &idAllocator{}
regions := make([]*core.RegionInfo, 0, regionLen)
for i := 0; i < regionLen; i++ {
r := &metapb.Region{
Id: id.Alloc(),
RegionEpoch: &metapb.RegionEpoch{
ConfVer: 1,
Version: 1,
},
StartKey: []byte{byte(i)},
EndKey: []byte{byte(i + 1)},
Peers: []*metapb.Peer{{Id: id.Alloc(), StoreId: uint64(0)}},
}
regions = append(regions, core.NewRegionInfo(r, r.Peers[0]))
}
for _, region := range regions {
rc.HandleRegionHeartbeat(region)
}
// ensure flush to region kv
time.Sleep(3 * time.Second)
leaderServer.Stop()
leader := cluster.WaitLeader()
fmt.Println("leader:", leader)
leaderServer = cluster.GetServer(cluster.GetLeader())
c.Assert(leaderServer, NotNil)
loadRegions := leaderServer.server.GetRaftCluster().GetRegions()
c.Assert(len(loadRegions), Equals, regionLen)
cluster.Destroy()
}
5 changes: 3 additions & 2 deletions server/cluster_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,8 +519,9 @@ func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error {
// after restart. Here we only log the error then go on updating cache.
log.Errorf("[region %d] fail to save region %v: %v", region.GetID(), region, err)
}
if c.opt.loadPDServerConfig().EnableRegionStorage {
c.changedRegions <- region
select {
case c.changedRegions <- region:
default:
}
}
if !isWriteUpdate && !isReadUpdate && !saveCache && !isNew {
Expand Down
2 changes: 1 addition & 1 deletion server/cluster_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,7 @@ func mustSaveRegions(c *C, kv *core.KV, n int) []*metapb.Region {
for _, region := range regions {
c.Assert(kv.SaveRegion(region), IsNil)
}
c.Assert(kv.FlushRegion(), IsNil)
c.Assert(kv.Flush(), IsNil)

return regions
}
2 changes: 1 addition & 1 deletion server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ func (s SecurityConfig) ToTLSConfig() (*tls.Config, error) {

// PDServerConfig is the configuration for pd server.
type PDServerConfig struct {
// EnableRegionStorage enables the independent region storage.
// EnableRegionStorage enables the independent region storage.
EnableRegionStorage bool `toml:"enable-region-storage" json:"enable-region-storage"`
}

Expand Down
Loading

0 comments on commit cb41f3d

Please sign in to comment.