Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/remove-clone' into improve-regio…
Browse files Browse the repository at this point in the history
…n-clone
  • Loading branch information
nolouch committed Sep 5, 2018
2 parents 198293b + acf87a2 commit 20d0b80
Show file tree
Hide file tree
Showing 26 changed files with 497 additions and 120 deletions.
10 changes: 5 additions & 5 deletions pd-client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,26 +559,26 @@ type TSFuture interface {
Wait() (int64, int64, error)
}

func (req *tsoRequest) Wait() (int64, int64, error) {
func (req *tsoRequest) Wait() (physical int64, logical int64, err error) {
// If tso command duration is observed very high, the reason could be it
// takes too long for Wait() be called.
cmdDuration.WithLabelValues("tso_async_wait").Observe(time.Since(req.start).Seconds())
select {
case err := <-req.done:
case err = <-req.done:
defer tsoReqPool.Put(req)
if err != nil {
cmdFailedDuration.WithLabelValues("tso").Observe(time.Since(req.start).Seconds())
return 0, 0, errors.WithStack(err)
}
physical, logical := req.physical, req.logical
physical, logical = req.physical, req.logical
cmdDuration.WithLabelValues("tso").Observe(time.Since(req.start).Seconds())
return physical, logical, err
return
case <-req.ctx.Done():
return 0, 0, errors.WithStack(req.ctx.Err())
}
}

func (c *client) GetTS(ctx context.Context) (int64, int64, error) {
func (c *client) GetTS(ctx context.Context) (physical int64, logical int64, err error) {
resp := c.GetTSAsync(ctx)
return resp.Wait()
}
Expand Down
83 changes: 80 additions & 3 deletions pdctl/command/region_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,18 @@ import (

var (
regionsPrefix = "pd/api/v1/regions"
regionsStorePrefix = "pd/api/v1/regions/store"
regionsCheckPrefix = "pd/api/v1/regions/check"
regionsWriteflowPrefix = "pd/api/v1/regions/writeflow"
regionsReadflowPrefix = "pd/api/v1/regions/readflow"
regionsConfVerPrefix = "pd/api/v1/regions/confver"
regionsVersionPrefix = "pd/api/v1/regions/version"
regionsSiblingPrefix = "pd/api/v1/regions/sibling"
regionIDPrefix = "pd/api/v1/region/id"
regionKeyPrefix = "pd/api/v1/region/key"
)

// NewRegionCommand return a region subcommand of rootCmd
// NewRegionCommand returns a region subcommand of rootCmd
func NewRegionCommand() *cobra.Command {
r := &cobra.Command{
Use: `region <region_id> [-jq="<query string>"]`,
Expand All @@ -44,6 +47,7 @@ func NewRegionCommand() *cobra.Command {
r.AddCommand(NewRegionWithKeyCommand())
r.AddCommand(NewRegionWithCheckCommand())
r.AddCommand(NewRegionWithSiblingCommand())
r.AddCommand(NewRegionWithStoreCommand())

topRead := &cobra.Command{
Use: "topread <limit>",
Expand All @@ -58,6 +62,20 @@ func NewRegionCommand() *cobra.Command {
Run: showRegionTopWriteCommandFunc,
}
r.AddCommand(topWrite)

topConfVer := &cobra.Command{
Use: "topconfver <limit>",
Short: "show regions with top conf version",
Run: showRegionTopConfVerCommandFunc,
}
r.AddCommand(topConfVer)

topVersion := &cobra.Command{
Use: "topversion <limit>",
Short: "show regions with top version",
Run: showRegionTopVersionCommandFunc,
}
r.AddCommand(topVersion)
r.Flags().String("jq", "", "jq query")

return r
Expand Down Expand Up @@ -119,6 +137,40 @@ func showRegionTopReadCommandFunc(cmd *cobra.Command, args []string) {
fmt.Println(r)
}

func showRegionTopConfVerCommandFunc(cmd *cobra.Command, args []string) {
prefix := regionsConfVerPrefix
if len(args) == 1 {
if _, err := strconv.Atoi(args[0]); err != nil {
fmt.Println("limit should be a number")
return
}
prefix += "?limit=" + args[0]
}
r, err := doRequest(cmd, prefix, http.MethodGet)
if err != nil {
fmt.Printf("Failed to get regions: %s\n", err)
return
}
fmt.Println(r)
}

func showRegionTopVersionCommandFunc(cmd *cobra.Command, args []string) {
prefix := regionsVersionPrefix
if len(args) == 1 {
if _, err := strconv.Atoi(args[0]); err != nil {
fmt.Println("limit should be a number")
return
}
prefix += "?limit=" + args[0]
}
r, err := doRequest(cmd, prefix, http.MethodGet)
if err != nil {
fmt.Printf("Failed to get regions: %s\n", err)
return
}
fmt.Println(r)
}

// NewRegionWithKeyCommand return a region with key subcommand of regionCmd
func NewRegionWithKeyCommand() *cobra.Command {
r := &cobra.Command{
Expand Down Expand Up @@ -188,7 +240,7 @@ func decodeProtobufText(text string) (string, error) {
return string(buf), nil
}

// NewRegionWithCheckCommand return a region with check subcommand of regionCmd
// NewRegionWithCheckCommand returns a region with check subcommand of regionCmd
func NewRegionWithCheckCommand() *cobra.Command {
r := &cobra.Command{
Use: "check [miss-peer|extra-peer|down-peer|pending-peer|incorrect-ns]",
Expand All @@ -213,7 +265,7 @@ func showRegionWithCheckCommandFunc(cmd *cobra.Command, args []string) {
fmt.Println(r)
}

// NewRegionWithSiblingCommand return a region with check subcommand of regionCmd
// NewRegionWithSiblingCommand returns a region with sibling subcommand of regionCmd
func NewRegionWithSiblingCommand() *cobra.Command {
r := &cobra.Command{
Use: "sibling <region_id>",
Expand All @@ -238,6 +290,31 @@ func showRegionWithSiblingCommandFunc(cmd *cobra.Command, args []string) {
fmt.Println(r)
}

// NewRegionWithStoreCommand returns regions with store subcommand of regionCmd
func NewRegionWithStoreCommand() *cobra.Command {
r := &cobra.Command{
Use: "store <store_id>",
Short: "show the regions of a specific store",
Run: showRegionWithStoreCommandFunc,
}
return r
}

func showRegionWithStoreCommandFunc(cmd *cobra.Command, args []string) {
if len(args) != 1 {
fmt.Println(cmd.UsageString())
return
}
storeID := args[0]
prefix := regionsStorePrefix + "/" + storeID
r, err := doRequest(cmd, prefix, http.MethodGet)
if err != nil {
fmt.Printf("Failed to get regions with the given storeID: %s\n", err)
return
}
fmt.Println(r)
}

func printWithJQFilter(data, filter string) {
cmd := exec.Command("jq", "-c", filter)
stdin, err := cmd.StdinPipe()
Expand Down
17 changes: 9 additions & 8 deletions pkg/faketikv/cases/cases.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,15 @@ func (a *idAllocator) nextID() uint64 {

// ConfMap is a mapping of the cases to the their corresponding initialize functions.
var ConfMap = map[string]func() *Conf{
"balance-leader": newBalanceLeader,
"add-nodes": newAddNodes,
"add-nodes-dynamic": newAddNodesDynamic,
"delete-nodes": newDeleteNodes,
"region-split": newRegionSplit,
"region-merge": newRegionMerge,
"hot-read": newHotRead,
"hot-write": newHotWrite,
"balance-leader": newBalanceLeader,
"add-nodes": newAddNodes,
"add-nodes-dynamic": newAddNodesDynamic,
"delete-nodes": newDeleteNodes,
"region-split": newRegionSplit,
"region-merge": newRegionMerge,
"hot-read": newHotRead,
"hot-write": newHotWrite,
"makeup-down-replicas": newMakeupDownReplicas,
}

// NewConf creates a config to initialize simulator cluster.
Expand Down
2 changes: 1 addition & 1 deletion pkg/faketikv/cases/delete_nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func newDeleteNodes() *Conf {
numNodes := 8
e := &DeleteNodesInner{}
e.Step = func(tick int64) uint64 {
if tick%100 == 0 && numNodes > 7 {
if numNodes > 7 && tick%100 == 0 {
idx := rand.Intn(numNodes)
numNodes--
nodeID := ids[idx]
Expand Down
93 changes: 93 additions & 0 deletions pkg/faketikv/cases/makeup_down_replica.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// // http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package cases

import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/pd/pkg/faketikv/simutil"
"github.com/pingcap/pd/server/core"
)

func newMakeupDownReplicas() *Conf {
var conf Conf
var id idAllocator

for i := 1; i <= 4; i++ {
conf.Stores = append(conf.Stores, &Store{
ID: id.nextID(),
Status: metapb.StoreState_Up,
Capacity: 1 * TB,
Available: 900 * GB,
Version: "2.1.0",
})
}

for i := 0; i < 400; i++ {
peers := []*metapb.Peer{
{Id: id.nextID(), StoreId: uint64(i)%4 + 1},
{Id: id.nextID(), StoreId: uint64(i+1)%4 + 1},
{Id: id.nextID(), StoreId: uint64(i+2)%4 + 1},
}
conf.Regions = append(conf.Regions, Region{
ID: id.nextID(),
Peers: peers,
Leader: peers[0],
Size: 96 * MB,
Keys: 960000,
})
}
conf.MaxID = id.maxID

numNodes := 4
down := false
e := &DeleteNodesInner{}
e.Step = func(tick int64) uint64 {
if numNodes > 3 && tick%100 == 0 {
numNodes--
return uint64(1)
}
if tick == 300 {
down = true
}
return 0
}
conf.Events = []EventInner{e}

conf.Checker = func(regions *core.RegionsInfo) bool {
sum := 0
regionCounts := make([]int, 0, 3)
for i := 1; i <= 4; i++ {
regionCount := regions.GetStoreRegionCount(uint64(i))
if i != 1 {
regionCounts = append(regionCounts, regionCount)
}
sum += regionCount
}
simutil.Logger.Infof("region counts: %v", regionCounts)

if down && sum < 1200 {
// only need to print once
down = false
simutil.Logger.Error("making up replicas don't start immediately")
return false
}

for _, regionCount := range regionCounts {
if regionCount != 400 {
return false
}
}
return true
}
return &conf
}
46 changes: 46 additions & 0 deletions server/api/api.raml
Original file line number Diff line number Diff line change
Expand Up @@ -831,6 +831,38 @@ types:
description: The input is invalid.
500:
description: PD server failed to proceed the request.
/confver:
get:
description: List regions with the largest conf version.
queryParameters:
limit?:
type: integer
default: 16
responses:
200:
body:
application/json:
type: Regions
400:
description: The input is invalid.
500:
description: PD server failed to proceed the request.
/version:
get:
description: List regions with the largest version.
queryParameters:
limit?:
type: integer
default: 16
responses:
200:
body:
application/json:
type: Regions
400:
description: The input is invalid.
500:
description: PD server failed to proceed the request.
/check/{filter}:
uriParameters:
filter:
Expand Down Expand Up @@ -861,6 +893,20 @@ types:
description: The region does not exist.
500:
description: PD server failed to proceed the request.
/store/{id}:
uriParameters:
id: integer
get:
description: List all regions of a specific store.
responses:
200:
body:
application/json:
type: Regions
400:
description: The input is invalid.
500:
description: PD server failed to proceed the request.

/schedulers:
description: Running schedulers.
Expand Down
8 changes: 4 additions & 4 deletions server/api/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,15 @@ func newMemberHandler(svr *server.Server, rd *render.Render) *memberHandler {
}

func (h *memberHandler) ListMembers(w http.ResponseWriter, r *http.Request) {
members, err := h.listMembers()
members, err := h.getMembers()
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
h.rd.JSON(w, http.StatusOK, members)
}

func (h *memberHandler) listMembers() (*pdpb.GetMembersResponse, error) {
func (h *memberHandler) getMembers() (*pdpb.GetMembersResponse, error) {
req := &pdpb.GetMembersRequest{Header: &pdpb.RequestHeader{ClusterId: h.svr.ClusterID()}}
members, err := h.svr.GetMembers(context.Background(), req)
if err != nil {
Expand All @@ -68,7 +68,7 @@ func (h *memberHandler) listMembers() (*pdpb.GetMembersResponse, error) {
}
m.LeaderPriority = int32(leaderPriority)
}
return members, errors.WithStack(err)
return members, nil
}

func (h *memberHandler) DeleteByName(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -134,7 +134,7 @@ func (h *memberHandler) DeleteByID(w http.ResponseWriter, r *http.Request) {
}

func (h *memberHandler) SetMemberPropertyByName(w http.ResponseWriter, r *http.Request) {
members, membersErr := h.listMembers()
members, membersErr := h.getMembers()
if membersErr != nil {
h.rd.JSON(w, http.StatusInternalServerError, membersErr.Error())
return
Expand Down
Loading

0 comments on commit 20d0b80

Please sign in to comment.