Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: some picks to 3.0 #1663

Merged
merged 4 commits into from
Aug 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion server/api/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type testOperatorSuite struct {
}

func (s *testOperatorSuite) SetUpSuite(c *C) {
s.svr, s.cleanup = mustNewServer(c)
s.svr, s.cleanup = mustNewServer(c, func(cfg *server.Config) { cfg.Replication.MaxReplicas = 1 })
mustWaitLeader(c, []*server.Server{s.svr})

addr := s.svr.GetAddr()
Expand Down Expand Up @@ -104,7 +104,32 @@ func (s *testOperatorSuite) TestAddRemovePeer(c *C) {
c.Assert(err, NotNil)
err = postJSON(fmt.Sprintf("%s/operators", s.urlPrefix), []byte(`{"name":"transfer-region", "region_id": 1, "to_store_ids": [1, 2, 3]}`))
c.Assert(err, NotNil)
}

func (s *testOperatorSuite) TestMergeRegionOperator(c *C) {
r1 := newTestRegionInfo(10, 1, []byte(""), []byte("b"), core.SetWrittenBytes(1000), core.SetReadBytes(1000), core.SetRegionConfVer(1), core.SetRegionVersion(1))
mustRegionHeartbeat(c, s.svr, r1)
r2 := newTestRegionInfo(20, 1, []byte("b"), []byte("c"), core.SetWrittenBytes(2000), core.SetReadBytes(0), core.SetRegionConfVer(2), core.SetRegionVersion(3))
mustRegionHeartbeat(c, s.svr, r2)
r3 := newTestRegionInfo(30, 1, []byte("c"), []byte(""), core.SetWrittenBytes(500), core.SetReadBytes(800), core.SetRegionConfVer(3), core.SetRegionVersion(2))
mustRegionHeartbeat(c, s.svr, r3)

err := postJSON(fmt.Sprintf("%s/operators", s.urlPrefix), []byte(`{"name":"merge-region", "source_region_id": 10, "target_region_id": 20}`))
c.Assert(err, IsNil)

s.svr.GetHandler().RemoveOperator(10)
s.svr.GetHandler().RemoveOperator(20)
err = postJSON(fmt.Sprintf("%s/operators", s.urlPrefix), []byte(`{"name":"merge-region", "source_region_id": 20, "target_region_id": 10}`))
c.Assert(err, IsNil)
s.svr.GetHandler().RemoveOperator(10)
s.svr.GetHandler().RemoveOperator(20)
err = postJSON(fmt.Sprintf("%s/operators", s.urlPrefix), []byte(`{"name":"merge-region", "source_region_id": 10, "target_region_id": 30}`))
c.Assert(err, NotNil)
c.Assert(strings.Contains(err.Error(), "not adjacent"), IsTrue)
err = postJSON(fmt.Sprintf("%s/operators", s.urlPrefix), []byte(`{"name":"merge-region", "source_region_id": 30, "target_region_id": 10}`))

c.Assert(strings.Contains(err.Error(), "not adjacent"), IsTrue)
c.Assert(err, NotNil)
}

func mustPutStore(c *C, svr *server.Server, id uint64, state metapb.StoreState, labels []*metapb.StoreLabel) {
Expand Down
24 changes: 22 additions & 2 deletions server/api/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,20 +269,40 @@ func (s *testGetRegionSuite) TestRegionKey(c *C) {
func (s *testGetRegionSuite) TestScanRegionByKey(c *C) {
r1 := newTestRegionInfo(2, 1, []byte("a"), []byte("b"))
r2 := newTestRegionInfo(3, 1, []byte("b"), []byte("c"))
r3 := newTestRegionInfo(4, 2, []byte("c"), []byte("d"))
r3 := newTestRegionInfo(4, 2, []byte("c"), []byte("e"))
r4 := newTestRegionInfo(5, 2, []byte("x"), []byte("z"))
r := newTestRegionInfo(99, 1, []byte{0xFF, 0xFF, 0xAA}, []byte{0xFF, 0xFF, 0xCC}, core.SetWrittenBytes(500), core.SetReadBytes(800), core.SetRegionConfVer(3), core.SetRegionVersion(2))
mustRegionHeartbeat(c, s.svr, r1)
mustRegionHeartbeat(c, s.svr, r2)
mustRegionHeartbeat(c, s.svr, r3)
mustRegionHeartbeat(c, s.svr, r4)
mustRegionHeartbeat(c, s.svr, r)

url := fmt.Sprintf("%s/regions/key?key=%s", s.urlPrefix, "b")
regionIds := []uint64{3, 4, 99}
regionIds := []uint64{3, 4, 5, 99}
regions := &RegionsInfo{}
err := readJSONWithURL(url, regions)
c.Assert(err, IsNil)
c.Assert(len(regionIds), Equals, regions.Count)
for i, v := range regionIds {
c.Assert(v, Equals, regions.Regions[i].ID)
}
url = fmt.Sprintf("%s/regions/key?key=%s", s.urlPrefix, "d")
regionIds = []uint64{4, 5, 99}
regions = &RegionsInfo{}
err = readJSONWithURL(url, regions)
c.Assert(err, IsNil)
c.Assert(len(regionIds), Equals, regions.Count)
for i, v := range regionIds {
c.Assert(v, Equals, regions.Regions[i].ID)
}
url = fmt.Sprintf("%s/regions/key?key=%s", s.urlPrefix, "g")
regionIds = []uint64{5, 99}
regions = &RegionsInfo{}
err = readJSONWithURL(url, regions)
c.Assert(err, IsNil)
c.Assert(len(regionIds), Equals, regions.Count)
for i, v := range regionIds {
c.Assert(v, Equals, regions.Regions[i].ID)
}
}
11 changes: 10 additions & 1 deletion server/api/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,14 @@ func (s *testStatsSuite) TestRegionStats(c *C) {
c.Assert(err, IsNil)
c.Assert(stats, DeepEquals, statsAll)

args := fmt.Sprintf("?start_key=%s&end_key=%s", url.QueryEscape("\x01\x02"), url.QueryEscape("xyz\x00\x00"))
res, err = http.Get(statsURL + args)
c.Assert(err, IsNil)
stats = &statistics.RegionStats{}
err = apiutil.ReadJSON(res.Body, stats)
c.Assert(err, IsNil)
c.Assert(stats, DeepEquals, statsAll)

stats23 := &statistics.RegionStats{
Count: 2,
EmptyCount: 1,
Expand All @@ -159,7 +167,8 @@ func (s *testStatsSuite) TestRegionStats(c *C) {
StorePeerSize: map[uint64]int64{1: 201, 4: 200, 5: 201},
StorePeerKeys: map[uint64]int64{1: 151, 4: 150, 5: 151},
}
args := fmt.Sprintf("?start_key=%s&end_key=%s", url.QueryEscape("\x01\x02"), url.QueryEscape("xyz\x00\x00"))

args = fmt.Sprintf("?start_key=%s&end_key=%s", url.QueryEscape("a"), url.QueryEscape("x"))
res, err = http.Get(statsURL + args)
c.Assert(err, IsNil)
stats = &statistics.RegionStats{}
Expand Down
10 changes: 6 additions & 4 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -693,7 +693,8 @@ func (r *RegionsInfo) GetFollower(storeID uint64, regionID uint64) *RegionInfo {
return r.followers[storeID].Get(regionID)
}

// ScanRange scans region with start key, until number greater than limit.
// ScanRange scans from the first region containing or behind start key,
// until number greater than limit.
func (r *RegionsInfo) ScanRange(startKey []byte, limit int) []*RegionInfo {
res := make([]*RegionInfo, 0, limit)
r.tree.scanRange(startKey, func(metaRegion *metapb.Region) bool {
Expand All @@ -703,11 +704,11 @@ func (r *RegionsInfo) ScanRange(startKey []byte, limit int) []*RegionInfo {
return res
}

// ScanRangeWithEndKey scans region with start key and end key.
// ScanRangeWithEndKey scans regions intersecting [start key, end key).
func (r *RegionsInfo) ScanRangeWithEndKey(startKey, endKey []byte) []*RegionInfo {
var regions []*RegionInfo
r.tree.scanRange(startKey, func(meta *metapb.Region) bool {
if len(endKey) > 0 && (len(meta.EndKey) == 0 || bytes.Compare(meta.EndKey, endKey) >= 0) {
if len(endKey) > 0 && bytes.Compare(meta.StartKey, endKey) >= 0 {
return false
}
if region := r.GetRegion(meta.GetId()); region != nil {
Expand All @@ -718,7 +719,8 @@ func (r *RegionsInfo) ScanRangeWithEndKey(startKey, endKey []byte) []*RegionInfo
return regions
}

// ScanRangeWithIterator scans region with start key, until iterator returns false.
// ScanRangeWithIterator scans from the first region containing or behind start key,
// until iterator returns false.
func (r *RegionsInfo) ScanRangeWithIterator(startKey []byte, iterator func(metaRegion *metapb.Region) bool) {
r.tree.scanRange(startKey, iterator)
}
Expand Down
9 changes: 8 additions & 1 deletion server/core/region_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,15 @@ func (t *regionTree) find(region *metapb.Region) *regionItem {
return result
}

// scanRage scans from the first region containing or behind the start key
// until f return false
func (t *regionTree) scanRange(startKey []byte, f func(*metapb.Region) bool) {
startItem := &regionItem{region: &metapb.Region{StartKey: startKey}}
region := &metapb.Region{StartKey: startKey}
// find if there is a region with key range [s, d), s < startKey < d
startItem := t.find(region)
if startItem == nil {
startItem = &regionItem{region: &metapb.Region{StartKey: startKey}}
}
t.tree.AscendGreaterOrEqual(startItem, func(item btree.Item) bool {
return f(item.(*regionItem).region)
})
Expand Down
4 changes: 2 additions & 2 deletions server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -618,8 +618,8 @@ func (h *Handler) AddMergeRegionOperator(regionID uint64, targetID uint64) error
}

// for the case first region (start key is nil) with the last region (end key is nil) but not adjacent
if (bytes.Equal(region.GetStartKey(), target.GetEndKey()) || len(region.GetStartKey()) == 0) &&
(bytes.Equal(region.GetEndKey(), target.GetStartKey()) || len(region.GetEndKey()) == 0) {
if (!bytes.Equal(region.GetStartKey(), target.GetEndKey()) || len(region.GetStartKey()) == 0) &&
(!bytes.Equal(region.GetEndKey(), target.GetStartKey()) || len(region.GetEndKey()) == 0) {
return ErrRegionNotAdjacent
}

Expand Down
64 changes: 48 additions & 16 deletions server/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import (
"os"
"path"
"strings"
"time"

"github.com/pingcap/failpoint"
log "github.com/pingcap/log"
"github.com/pingcap/pd/pkg/etcdutil"
"github.com/pkg/errors"
Expand All @@ -35,6 +37,9 @@ const (
privateDirMode = 0700
)

// listMemberRetryTimes is the retry times of list member.
var listMemberRetryTimes = 20

// PrepareJoinCluster sends MemberAdd command to PD cluster,
// and returns the initial configuration of the PD cluster.
//
Expand Down Expand Up @@ -137,31 +142,58 @@ func PrepareJoinCluster(cfg *Config) error {
return errors.New("missing data or join a duplicated pd")
}

var addResp *clientv3.MemberAddResponse

failpoint.Inject("add-member-failed", func() {
listMemberRetryTimes = 2
failpoint.Goto("LabelSkipAddMember")
})
// - A new PD joins an existing cluster.
// - A deleted PD joins to previous cluster.
addResp, err := etcdutil.AddEtcdMember(client, []string{cfg.AdvertisePeerUrls})
if err != nil {
return err
{
// First adds member through the API
addResp, err = etcdutil.AddEtcdMember(client, []string{cfg.AdvertisePeerUrls})
if err != nil {
return err
}
}
failpoint.Label("LabelSkipAddMember")

listResp, err = etcdutil.ListEtcdMembers(client)
if err != nil {
return err
}
var (
pds []string
listSucc bool
)

pds := []string{}
for _, memb := range listResp.Members {
n := memb.Name
if memb.ID == addResp.Member.ID {
n = cfg.Name
for i := 0; i < listMemberRetryTimes; i++ {
listResp, err = etcdutil.ListEtcdMembers(client)
if err != nil {
return err
}
if len(n) == 0 {
return errors.New("there is a member that has not joined successfully")

pds = []string{}
for _, memb := range listResp.Members {
n := memb.Name
if addResp != nil && memb.ID == addResp.Member.ID {
n = cfg.Name
listSucc = true
}
if len(n) == 0 {
return errors.New("there is a member that has not joined successfully")
}
for _, m := range memb.PeerURLs {
pds = append(pds, fmt.Sprintf("%s=%s", n, m))
}
}
for _, m := range memb.PeerURLs {
pds = append(pds, fmt.Sprintf("%s=%s", n, m))

if listSucc {
break
}
time.Sleep(500 * time.Millisecond)
}
if !listSucc {
return errors.Errorf("join failed, adds the new member %s may failed", cfg.Name)
}

initialCluster = strings.Join(pds, ",")
cfg.InitialCluster = initialCluster
cfg.InitialClusterState = embed.ClusterStateFlagExisting
Expand Down
2 changes: 1 addition & 1 deletion server/statistics/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (s *RegionStats) Observe(r *core.RegionInfo) {
}
}

// GetRegionStats scans regions that inside range [startKey, endKey) and sums up
// GetRegionStats scans regions that intersect range [startKey, endKey) and sums up
// their statistics.
func GetRegionStats(r *core.RegionsInfo, startKey, endKey []byte) *RegionStats {
stats := newRegionStats()
Expand Down
53 changes: 53 additions & 0 deletions tests/server/join/join_fail/join_fail_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package join_fail_test

import (
"strings"
"testing"

. "github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/pingcap/pd/server"
"github.com/pingcap/pd/tests"
)

func Test(t *testing.T) {
TestingT(t)
}

var _ = Suite(&serverTestSuite{})

type serverTestSuite struct{}

func (s *serverTestSuite) SetUpSuite(c *C) {
server.EnableZap = true
}

func (s *serverTestSuite) TestFailedPDJoinInStep1(c *C) {
cluster, err := tests.NewTestCluster(1)
c.Assert(err, IsNil)
defer cluster.Destroy()

err = cluster.RunInitialServers()
c.Assert(err, IsNil)
cluster.WaitLeader()

// Join the second PD.
c.Assert(failpoint.Enable("github.com/pingcap/pd/server/add-member-failed", `return`), IsNil)
_, err = cluster.Join()
c.Assert(err, NotNil)
c.Assert(strings.Contains(err.Error(), "join failed"), IsTrue)
c.Assert(failpoint.Disable("github.com/pingcap/pd/server/add-member-failed"), IsNil)
}
2 changes: 1 addition & 1 deletion tools/pd-ctl/pdctl/command/store_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func showAllLimitCommandFunc(cmd *cobra.Command, args []string) {
}

func removeTombStoneCommandFunc(cmd *cobra.Command, args []string) {
prefix := fmt.Sprintf(path.Join(storesPrefix, "remove-tombstone"), "")
prefix := path.Join(storesPrefix, "remove-tombstone")
_, err := doRequest(cmd, prefix, http.MethodDelete)
if err != nil {
cmd.Printf("Failed to remove tombstone store %s \n", err)
Expand Down