diff --git a/server/api/operator_test.go b/server/api/operator_test.go index 2c6528ecc4d..c02375097f0 100644 --- a/server/api/operator_test.go +++ b/server/api/operator_test.go @@ -36,7 +36,7 @@ type testOperatorSuite struct { } func (s *testOperatorSuite) SetUpSuite(c *C) { - s.svr, s.cleanup = mustNewServer(c) + s.svr, s.cleanup = mustNewServer(c, func(cfg *server.Config) { cfg.Replication.MaxReplicas = 1 }) mustWaitLeader(c, []*server.Server{s.svr}) addr := s.svr.GetAddr() @@ -104,7 +104,32 @@ func (s *testOperatorSuite) TestAddRemovePeer(c *C) { c.Assert(err, NotNil) err = postJSON(fmt.Sprintf("%s/operators", s.urlPrefix), []byte(`{"name":"transfer-region", "region_id": 1, "to_store_ids": [1, 2, 3]}`)) c.Assert(err, NotNil) +} + +func (s *testOperatorSuite) TestMergeRegionOperator(c *C) { + r1 := newTestRegionInfo(10, 1, []byte(""), []byte("b"), core.SetWrittenBytes(1000), core.SetReadBytes(1000), core.SetRegionConfVer(1), core.SetRegionVersion(1)) + mustRegionHeartbeat(c, s.svr, r1) + r2 := newTestRegionInfo(20, 1, []byte("b"), []byte("c"), core.SetWrittenBytes(2000), core.SetReadBytes(0), core.SetRegionConfVer(2), core.SetRegionVersion(3)) + mustRegionHeartbeat(c, s.svr, r2) + r3 := newTestRegionInfo(30, 1, []byte("c"), []byte(""), core.SetWrittenBytes(500), core.SetReadBytes(800), core.SetRegionConfVer(3), core.SetRegionVersion(2)) + mustRegionHeartbeat(c, s.svr, r3) + + err := postJSON(fmt.Sprintf("%s/operators", s.urlPrefix), []byte(`{"name":"merge-region", "source_region_id": 10, "target_region_id": 20}`)) + c.Assert(err, IsNil) + s.svr.GetHandler().RemoveOperator(10) + s.svr.GetHandler().RemoveOperator(20) + err = postJSON(fmt.Sprintf("%s/operators", s.urlPrefix), []byte(`{"name":"merge-region", "source_region_id": 20, "target_region_id": 10}`)) + c.Assert(err, IsNil) + s.svr.GetHandler().RemoveOperator(10) + s.svr.GetHandler().RemoveOperator(20) + err = postJSON(fmt.Sprintf("%s/operators", s.urlPrefix), []byte(`{"name":"merge-region", "source_region_id": 10, "target_region_id": 30}`)) + c.Assert(err, NotNil) + c.Assert(strings.Contains(err.Error(), "not adjacent"), IsTrue) + err = postJSON(fmt.Sprintf("%s/operators", s.urlPrefix), []byte(`{"name":"merge-region", "source_region_id": 30, "target_region_id": 10}`)) + + c.Assert(strings.Contains(err.Error(), "not adjacent"), IsTrue) + c.Assert(err, NotNil) } func mustPutStore(c *C, svr *server.Server, id uint64, state metapb.StoreState, labels []*metapb.StoreLabel) { diff --git a/server/api/region_test.go b/server/api/region_test.go index 8204d0a0243..b13ba137c47 100644 --- a/server/api/region_test.go +++ b/server/api/region_test.go @@ -269,15 +269,17 @@ func (s *testGetRegionSuite) TestRegionKey(c *C) { func (s *testGetRegionSuite) TestScanRegionByKey(c *C) { r1 := newTestRegionInfo(2, 1, []byte("a"), []byte("b")) r2 := newTestRegionInfo(3, 1, []byte("b"), []byte("c")) - r3 := newTestRegionInfo(4, 2, []byte("c"), []byte("d")) + r3 := newTestRegionInfo(4, 2, []byte("c"), []byte("e")) + r4 := newTestRegionInfo(5, 2, []byte("x"), []byte("z")) r := newTestRegionInfo(99, 1, []byte{0xFF, 0xFF, 0xAA}, []byte{0xFF, 0xFF, 0xCC}, core.SetWrittenBytes(500), core.SetReadBytes(800), core.SetRegionConfVer(3), core.SetRegionVersion(2)) mustRegionHeartbeat(c, s.svr, r1) mustRegionHeartbeat(c, s.svr, r2) mustRegionHeartbeat(c, s.svr, r3) + mustRegionHeartbeat(c, s.svr, r4) mustRegionHeartbeat(c, s.svr, r) url := fmt.Sprintf("%s/regions/key?key=%s", s.urlPrefix, "b") - regionIds := []uint64{3, 4, 99} + regionIds := []uint64{3, 4, 5, 99} regions := &RegionsInfo{} err := readJSONWithURL(url, regions) c.Assert(err, IsNil) @@ -285,4 +287,22 @@ func (s *testGetRegionSuite) TestScanRegionByKey(c *C) { for i, v := range regionIds { c.Assert(v, Equals, regions.Regions[i].ID) } + url = fmt.Sprintf("%s/regions/key?key=%s", s.urlPrefix, "d") + regionIds = []uint64{4, 5, 99} + regions = &RegionsInfo{} + err = readJSONWithURL(url, regions) + c.Assert(err, IsNil) + c.Assert(len(regionIds), Equals, regions.Count) + for i, v := range regionIds { + c.Assert(v, Equals, regions.Regions[i].ID) + } + url = fmt.Sprintf("%s/regions/key?key=%s", s.urlPrefix, "g") + regionIds = []uint64{5, 99} + regions = &RegionsInfo{} + err = readJSONWithURL(url, regions) + c.Assert(err, IsNil) + c.Assert(len(regionIds), Equals, regions.Count) + for i, v := range regionIds { + c.Assert(v, Equals, regions.Regions[i].ID) + } } diff --git a/server/api/stats_test.go b/server/api/stats_test.go index f4fdd3d963d..6d922eb3664 100644 --- a/server/api/stats_test.go +++ b/server/api/stats_test.go @@ -147,6 +147,14 @@ func (s *testStatsSuite) TestRegionStats(c *C) { c.Assert(err, IsNil) c.Assert(stats, DeepEquals, statsAll) + args := fmt.Sprintf("?start_key=%s&end_key=%s", url.QueryEscape("\x01\x02"), url.QueryEscape("xyz\x00\x00")) + res, err = http.Get(statsURL + args) + c.Assert(err, IsNil) + stats = &statistics.RegionStats{} + err = apiutil.ReadJSON(res.Body, stats) + c.Assert(err, IsNil) + c.Assert(stats, DeepEquals, statsAll) + stats23 := &statistics.RegionStats{ Count: 2, EmptyCount: 1, @@ -159,7 +167,8 @@ func (s *testStatsSuite) TestRegionStats(c *C) { StorePeerSize: map[uint64]int64{1: 201, 4: 200, 5: 201}, StorePeerKeys: map[uint64]int64{1: 151, 4: 150, 5: 151}, } - args := fmt.Sprintf("?start_key=%s&end_key=%s", url.QueryEscape("\x01\x02"), url.QueryEscape("xyz\x00\x00")) + + args = fmt.Sprintf("?start_key=%s&end_key=%s", url.QueryEscape("a"), url.QueryEscape("x")) res, err = http.Get(statsURL + args) c.Assert(err, IsNil) stats = &statistics.RegionStats{} diff --git a/server/core/region.go b/server/core/region.go index f35777d932f..e04719411d5 100644 --- a/server/core/region.go +++ b/server/core/region.go @@ -693,7 +693,8 @@ func (r *RegionsInfo) GetFollower(storeID uint64, regionID uint64) *RegionInfo { return r.followers[storeID].Get(regionID) } -// ScanRange scans region with start key, until number greater than limit. +// ScanRange scans from the first region containing or behind start key, +// until number greater than limit. func (r *RegionsInfo) ScanRange(startKey []byte, limit int) []*RegionInfo { res := make([]*RegionInfo, 0, limit) r.tree.scanRange(startKey, func(metaRegion *metapb.Region) bool { @@ -703,11 +704,11 @@ func (r *RegionsInfo) ScanRange(startKey []byte, limit int) []*RegionInfo { return res } -// ScanRangeWithEndKey scans region with start key and end key. +// ScanRangeWithEndKey scans regions intersecting [start key, end key). func (r *RegionsInfo) ScanRangeWithEndKey(startKey, endKey []byte) []*RegionInfo { var regions []*RegionInfo r.tree.scanRange(startKey, func(meta *metapb.Region) bool { - if len(endKey) > 0 && (len(meta.EndKey) == 0 || bytes.Compare(meta.EndKey, endKey) >= 0) { + if len(endKey) > 0 && bytes.Compare(meta.StartKey, endKey) >= 0 { return false } if region := r.GetRegion(meta.GetId()); region != nil { @@ -718,7 +719,8 @@ func (r *RegionsInfo) ScanRangeWithEndKey(startKey, endKey []byte) []*RegionInfo return regions } -// ScanRangeWithIterator scans region with start key, until iterator returns false. +// ScanRangeWithIterator scans from the first region containing or behind start key, +// until iterator returns false. func (r *RegionsInfo) ScanRangeWithIterator(startKey []byte, iterator func(metaRegion *metapb.Region) bool) { r.tree.scanRange(startKey, iterator) } diff --git a/server/core/region_tree.go b/server/core/region_tree.go index d46132fcbe8..ce673c3f361 100644 --- a/server/core/region_tree.go +++ b/server/core/region_tree.go @@ -159,8 +159,15 @@ func (t *regionTree) find(region *metapb.Region) *regionItem { return result } +// scanRage scans from the first region containing or behind the start key +// until f return false func (t *regionTree) scanRange(startKey []byte, f func(*metapb.Region) bool) { - startItem := ®ionItem{region: &metapb.Region{StartKey: startKey}} + region := &metapb.Region{StartKey: startKey} + // find if there is a region with key range [s, d), s < startKey < d + startItem := t.find(region) + if startItem == nil { + startItem = ®ionItem{region: &metapb.Region{StartKey: startKey}} + } t.tree.AscendGreaterOrEqual(startItem, func(item btree.Item) bool { return f(item.(*regionItem).region) }) diff --git a/server/handler.go b/server/handler.go index 5c064f12a70..dd9fb92b21c 100644 --- a/server/handler.go +++ b/server/handler.go @@ -618,8 +618,8 @@ func (h *Handler) AddMergeRegionOperator(regionID uint64, targetID uint64) error } // for the case first region (start key is nil) with the last region (end key is nil) but not adjacent - if (bytes.Equal(region.GetStartKey(), target.GetEndKey()) || len(region.GetStartKey()) == 0) && - (bytes.Equal(region.GetEndKey(), target.GetStartKey()) || len(region.GetEndKey()) == 0) { + if (!bytes.Equal(region.GetStartKey(), target.GetEndKey()) || len(region.GetStartKey()) == 0) && + (!bytes.Equal(region.GetEndKey(), target.GetStartKey()) || len(region.GetEndKey()) == 0) { return ErrRegionNotAdjacent } diff --git a/server/join.go b/server/join.go index a4b4d3ebfa5..744dd4af978 100644 --- a/server/join.go +++ b/server/join.go @@ -19,7 +19,9 @@ import ( "os" "path" "strings" + "time" + "github.com/pingcap/failpoint" log "github.com/pingcap/log" "github.com/pingcap/pd/pkg/etcdutil" "github.com/pkg/errors" @@ -35,6 +37,9 @@ const ( privateDirMode = 0700 ) +// listMemberRetryTimes is the retry times of list member. +var listMemberRetryTimes = 20 + // PrepareJoinCluster sends MemberAdd command to PD cluster, // and returns the initial configuration of the PD cluster. // @@ -137,31 +142,58 @@ func PrepareJoinCluster(cfg *Config) error { return errors.New("missing data or join a duplicated pd") } + var addResp *clientv3.MemberAddResponse + + failpoint.Inject("add-member-failed", func() { + listMemberRetryTimes = 2 + failpoint.Goto("LabelSkipAddMember") + }) // - A new PD joins an existing cluster. // - A deleted PD joins to previous cluster. - addResp, err := etcdutil.AddEtcdMember(client, []string{cfg.AdvertisePeerUrls}) - if err != nil { - return err + { + // First adds member through the API + addResp, err = etcdutil.AddEtcdMember(client, []string{cfg.AdvertisePeerUrls}) + if err != nil { + return err + } } + failpoint.Label("LabelSkipAddMember") - listResp, err = etcdutil.ListEtcdMembers(client) - if err != nil { - return err - } + var ( + pds []string + listSucc bool + ) - pds := []string{} - for _, memb := range listResp.Members { - n := memb.Name - if memb.ID == addResp.Member.ID { - n = cfg.Name + for i := 0; i < listMemberRetryTimes; i++ { + listResp, err = etcdutil.ListEtcdMembers(client) + if err != nil { + return err } - if len(n) == 0 { - return errors.New("there is a member that has not joined successfully") + + pds = []string{} + for _, memb := range listResp.Members { + n := memb.Name + if addResp != nil && memb.ID == addResp.Member.ID { + n = cfg.Name + listSucc = true + } + if len(n) == 0 { + return errors.New("there is a member that has not joined successfully") + } + for _, m := range memb.PeerURLs { + pds = append(pds, fmt.Sprintf("%s=%s", n, m)) + } } - for _, m := range memb.PeerURLs { - pds = append(pds, fmt.Sprintf("%s=%s", n, m)) + + if listSucc { + break } + time.Sleep(500 * time.Millisecond) } + if !listSucc { + return errors.Errorf("join failed, adds the new member %s may failed", cfg.Name) + } + initialCluster = strings.Join(pds, ",") cfg.InitialCluster = initialCluster cfg.InitialClusterState = embed.ClusterStateFlagExisting diff --git a/server/statistics/region.go b/server/statistics/region.go index 1eb63ced8b8..6b0d34b7e4f 100644 --- a/server/statistics/region.go +++ b/server/statistics/region.go @@ -113,7 +113,7 @@ func (s *RegionStats) Observe(r *core.RegionInfo) { } } -// GetRegionStats scans regions that inside range [startKey, endKey) and sums up +// GetRegionStats scans regions that intersect range [startKey, endKey) and sums up // their statistics. func GetRegionStats(r *core.RegionsInfo, startKey, endKey []byte) *RegionStats { stats := newRegionStats() diff --git a/tests/server/join/join_fail/join_fail_test.go b/tests/server/join/join_fail/join_fail_test.go new file mode 100644 index 00000000000..f4df51c3d5d --- /dev/null +++ b/tests/server/join/join_fail/join_fail_test.go @@ -0,0 +1,53 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package join_fail_test + +import ( + "strings" + "testing" + + . "github.com/pingcap/check" + "github.com/pingcap/failpoint" + "github.com/pingcap/pd/server" + "github.com/pingcap/pd/tests" +) + +func Test(t *testing.T) { + TestingT(t) +} + +var _ = Suite(&serverTestSuite{}) + +type serverTestSuite struct{} + +func (s *serverTestSuite) SetUpSuite(c *C) { + server.EnableZap = true +} + +func (s *serverTestSuite) TestFailedPDJoinInStep1(c *C) { + cluster, err := tests.NewTestCluster(1) + c.Assert(err, IsNil) + defer cluster.Destroy() + + err = cluster.RunInitialServers() + c.Assert(err, IsNil) + cluster.WaitLeader() + + // Join the second PD. + c.Assert(failpoint.Enable("github.com/pingcap/pd/server/add-member-failed", `return`), IsNil) + _, err = cluster.Join() + c.Assert(err, NotNil) + c.Assert(strings.Contains(err.Error(), "join failed"), IsTrue) + c.Assert(failpoint.Disable("github.com/pingcap/pd/server/add-member-failed"), IsNil) +} diff --git a/tools/pd-ctl/pdctl/command/store_command.go b/tools/pd-ctl/pdctl/command/store_command.go index f1c29adfe26..fedc3d212ea 100644 --- a/tools/pd-ctl/pdctl/command/store_command.go +++ b/tools/pd-ctl/pdctl/command/store_command.go @@ -257,7 +257,7 @@ func showAllLimitCommandFunc(cmd *cobra.Command, args []string) { } func removeTombStoneCommandFunc(cmd *cobra.Command, args []string) { - prefix := fmt.Sprintf(path.Join(storesPrefix, "remove-tombstone"), "") + prefix := path.Join(storesPrefix, "remove-tombstone") _, err := doRequest(cmd, prefix, http.MethodDelete) if err != nil { cmd.Printf("Failed to remove tombstone store %s \n", err)