Skip to content

Commit

Permalink
pdctl, api: support hex format keys (#1309)
Browse files Browse the repository at this point in the history
pdctl, api: support hex format keys

Signed-off-by: disksing <i@disksing.com>
  • Loading branch information
disksing authored Nov 6, 2018
1 parent fc912d3 commit 9ac87dd
Show file tree
Hide file tree
Showing 9 changed files with 89 additions and 74 deletions.
6 changes: 2 additions & 4 deletions server/api/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@ package api

import (
"container/heap"
"fmt"
"net/http"
"strconv"
"strings"

"github.com/gorilla/mux"
"github.com/pingcap/kvproto/pkg/metapb"
Expand Down Expand Up @@ -50,8 +48,8 @@ func newRegionInfo(r *core.RegionInfo) *regionInfo {
}
return &regionInfo{
ID: r.GetID(),
StartKey: strings.Trim(fmt.Sprintf("%q", r.GetStartKey()), "\""),
EndKey: strings.Trim(fmt.Sprintf("%q", r.GetEndKey()), "\""),
StartKey: string(core.HexRegionKey(r.GetStartKey())),
EndKey: string(core.HexRegionKey(r.GetEndKey())),
RegionEpoch: r.GetRegionEpoch(),
Peers: r.GetPeers(),
Leader: r.GetLeader(),
Expand Down
6 changes: 3 additions & 3 deletions server/cluster_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error {
// Mark isNew if the region in cache does not have leader.
var saveKV, saveCache, isNew bool
if origin == nil {
log.Debugf("[region %d] Insert new region {%v}", region.GetID(), region)
log.Debugf("[region %d] Insert new region {%v}", region.GetID(), core.HexRegionMeta(region.GetMeta()))
saveKV, saveCache, isNew = true, true, true
} else {
r := region.GetRegionEpoch()
Expand Down Expand Up @@ -517,7 +517,7 @@ func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error {
if err := c.kv.SaveRegion(region.GetMeta()); err != nil {
// Not successfully saved to kv is not fatal, it only leads to longer warm-up
// after restart. Here we only log the error then go on updating cache.
log.Errorf("[region %d] fail to save region %v: %v", region.GetID(), region, err)
log.Errorf("[region %d] fail to save region %v: %v", region.GetID(), core.HexRegionMeta(region.GetMeta()), err)
}
select {
case c.changedRegions <- region:
Expand All @@ -539,7 +539,7 @@ func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error {
if c.kv != nil {
for _, item := range overlaps {
if err := c.kv.DeleteRegion(item); err != nil {
log.Errorf("[region %d] fail to delete region %v: %v", item.GetId(), item, err)
log.Errorf("[region %d] fail to delete region %v: %v", item.GetId(), core.HexRegionMeta(item), err)
}
}
}
Expand Down
19 changes: 11 additions & 8 deletions server/cluster_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package server

import (
"bytes"
"fmt"

"github.com/gogo/protobuf/proto"
"github.com/pingcap/kvproto/pkg/metapb"
Expand All @@ -35,8 +34,8 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error {

// If the region peer count is 0, then we should not handle this.
if len(region.GetPeers()) == 0 {
log.Warnf("invalid region, zero region peer count - %v", region)
return errors.Errorf("invalid region, zero region peer count - %v", region)
log.Warnf("invalid region, zero region peer count: %v", core.HexRegionMeta(region.GetMeta()))
return errors.Errorf("invalid region, zero region peer count: %v", core.HexRegionMeta(region.GetMeta()))
}

c.coordinator.opController.Dispatch(region)
Expand Down Expand Up @@ -80,7 +79,7 @@ func (c *RaftCluster) validRequestRegion(reqRegion *metapb.Region) error {
startKey := reqRegion.GetStartKey()
region, _ := c.GetRegionByKey(startKey)
if region == nil {
return errors.Errorf("region not found, request region: %v", reqRegion)
return errors.Errorf("region not found, request region: %v", core.HexRegionMeta(reqRegion))
}
// If the request epoch is less than current region epoch, then returns an error.
reqRegionEpoch := reqRegion.GetRegionEpoch()
Expand Down Expand Up @@ -170,28 +169,32 @@ func (c *RaftCluster) handleReportSplit(request *pdpb.ReportSplitRequest) (*pdpb

err := c.checkSplitRegion(left, right)
if err != nil {
log.Warnf("report split region is invalid - %v, %v", request, fmt.Sprintf("%+v", err))
log.Warnf("report split region is invalid: %v, %v, %v", core.HexRegionMeta(left), core.HexRegionMeta(right), err)
return nil, err
}

// Build origin region by using left and right.
originRegion := proto.Clone(right).(*metapb.Region)
originRegion.RegionEpoch = nil
originRegion.StartKey = left.GetStartKey()
log.Infof("[region %d] region split, generate new region: %v", originRegion.GetId(), left)
log.Infof("[region %d] region split, generate new region: %v", originRegion.GetId(), core.HexRegionMeta(left))
return &pdpb.ReportSplitResponse{}, nil
}

func (c *RaftCluster) handleBatchReportSplit(request *pdpb.ReportBatchSplitRequest) (*pdpb.ReportBatchSplitResponse, error) {
regions := request.GetRegions()
hexRegionMetas := make([]*metapb.Region, len(regions))
for i, region := range regions {
hexRegionMetas[i] = core.HexRegionMeta(region)
}

err := c.checkSplitRegions(regions)
if err != nil {
log.Warnf("report batch split region is invalid - %v, %v", request, fmt.Sprintf("%+v", err))
log.Warnf("report batch split region is invalid: %v, %v", hexRegionMetas, err)
return nil, err
}
last := len(regions) - 1
originRegion := proto.Clone(regions[last]).(*metapb.Region)
log.Infof("[region %d] region split, generate %d new regions: %v", originRegion.GetId(), last, regions[:last])
log.Infof("[region %d] region split, generate %d new regions: %v", originRegion.GetId(), last, hexRegionMetas[:last])
return &pdpb.ReportBatchSplitResponse{}, nil
}
31 changes: 23 additions & 8 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package core

import (
"bytes"
"encoding/hex"
"fmt"
"math/rand"
"reflect"
Expand Down Expand Up @@ -875,19 +876,33 @@ func DiffRegionPeersInfo(origin *RegionInfo, other *RegionInfo) string {
func DiffRegionKeyInfo(origin *RegionInfo, other *RegionInfo) string {
var ret []string
if !bytes.Equal(origin.meta.StartKey, other.meta.StartKey) {
originKey := &metapb.Region{StartKey: origin.meta.StartKey}
otherKey := &metapb.Region{StartKey: other.meta.StartKey}
ret = append(ret, fmt.Sprintf("StartKey Changed:{%s} -> {%s}", originKey, otherKey))
ret = append(ret, fmt.Sprintf("StartKey Changed:%s -> %s", HexRegionKey(origin.meta.StartKey), HexRegionKey(other.meta.StartKey)))
} else {
ret = append(ret, fmt.Sprintf("StartKey:{%s}", &metapb.Region{StartKey: origin.meta.StartKey}))
ret = append(ret, fmt.Sprintf("StartKey:%s", HexRegionKey(origin.meta.StartKey)))
}
if !bytes.Equal(origin.meta.EndKey, other.meta.EndKey) {
originKey := &metapb.Region{EndKey: origin.meta.EndKey}
otherKey := &metapb.Region{EndKey: other.meta.EndKey}
ret = append(ret, fmt.Sprintf("EndKey Changed:{%s} -> {%s}", originKey, otherKey))
ret = append(ret, fmt.Sprintf("EndKey Changed:%s -> %s", HexRegionKey(origin.meta.EndKey), HexRegionKey(other.meta.EndKey)))
} else {
ret = append(ret, fmt.Sprintf("EndKey:{%s}", &metapb.Region{EndKey: origin.meta.EndKey}))
ret = append(ret, fmt.Sprintf("EndKey:%s", HexRegionKey(origin.meta.EndKey)))
}

return strings.Join(ret, ", ")
}

// HexRegionKey converts region key to hex format. Used for formating region in
// logs.
func HexRegionKey(key []byte) []byte {
return []byte(strings.ToUpper(hex.EncodeToString(key)))
}

// HexRegionMeta converts a region meta's keys to hex format. Used for formating
// region in logs.
func HexRegionMeta(meta *metapb.Region) *metapb.Region {
if meta == nil {
return nil
}
meta = proto.Clone(meta).(*metapb.Region)
meta.StartKey = HexRegionKey(meta.StartKey)
meta.EndKey = HexRegionKey(meta.EndKey)
return meta
}
6 changes: 3 additions & 3 deletions server/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,14 @@ func (*testRegionKey) TestRegionKey(c *C) {
expect string
}{
{`"t\x80\x00\x00\x00\x00\x00\x00\xff!_r\x80\x00\x00\x00\x00\xff\x02\u007fY\x00\x00\x00\x00\x00\xfa"`,
`"t\200\000\000\000\000\000\000\377!_r\200\000\000\000\000\377\002\177Y\000\000\000\000\000\372"`},
`7480000000000000FF215F728000000000FF027F590000000000FA`},
{"\"\\x80\\x00\\x00\\x00\\x00\\x00\\x00\\xff\\x05\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\xf8\"",
`"\200\000\000\000\000\000\000\377\005\000\000\000\000\000\000\000\370"`},
`80000000000000FF0500000000000000F8`},
}
for _, t := range testCase {
got, err := strconv.Unquote(t.key)
c.Assert(err, IsNil)
s := fmt.Sprintln(&metapb.Region{StartKey: []byte(got)})
s := fmt.Sprintln(HexRegionMeta(&metapb.Region{StartKey: []byte(got)}))
c.Assert(strings.Contains(s, t.expect), IsTrue)

// start key changed
Expand Down
2 changes: 1 addition & 1 deletion server/core/region_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (t *regionTree) update(region *metapb.Region) []*metapb.Region {
})

for _, item := range overlaps {
log.Debugf("[region %d] delete region {%v}, cause overlapping with region {%v}", item.GetId(), item, region)
log.Debugf("[region %d] delete region %v, cause overlapping with region %v", item.GetId(), HexRegionMeta(item), HexRegionMeta(region))
t.tree.Delete(&regionItem{item})
}

Expand Down
2 changes: 1 addition & 1 deletion server/schedule/merge_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (m *MergeChecker) Check(region *core.RegionInfo) []*Operator {
}

checkerCounter.WithLabelValues("merge_checker", "new_operator").Inc()
log.Debugf("try to merge region {%v} into region {%v}", region, target)
log.Debugf("try to merge region %v into region %v", core.HexRegionMeta(region.GetMeta()), core.HexRegionMeta(target.GetMeta()))
ops, err := CreateMergeRegionOperator("merge-region", m.cluster, region, target, OpMerge)
if err != nil {
return nil
Expand Down
74 changes: 31 additions & 43 deletions tools/pd-ctl/pdctl/command/region_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package command

import (
"bytes"
"encoding/hex"
"fmt"
"io"
"net/http"
Expand All @@ -23,7 +24,9 @@ import (
"strconv"
"strings"

"github.com/pkg/errors"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
)

var (
Expand Down Expand Up @@ -202,11 +205,11 @@ func showRegionTopSizeCommandFunc(cmd *cobra.Command, args []string) {
// NewRegionWithKeyCommand return a region with key subcommand of regionCmd
func NewRegionWithKeyCommand() *cobra.Command {
r := &cobra.Command{
Use: "key [--format=raw|encode] <key>",
Use: "key [--format=raw|encode|hex] <key>",
Short: "show the region with key",
Run: showRegionWithTableCommandFunc,
}
r.Flags().String("format", "raw", "the key format")
r.Flags().String("format", "hex", "the key format")
return r
}

Expand All @@ -215,27 +218,11 @@ func showRegionWithTableCommandFunc(cmd *cobra.Command, args []string) {
fmt.Println(cmd.UsageString())
return
}

var (
key string
err error
)

format := cmd.Flags().Lookup("format").Value.String()
switch format {
case "raw":
key = args[0]
case "encode":
key, err = decodeKey(args[0])
if err != nil {
fmt.Println("Error: ", err)
return
}
default:
fmt.Println("Error: unknown format")
key, err := parseKey(cmd.Flags(), args[0])
if err != nil {
fmt.Println("Error: ", err)
return
}

key = url.QueryEscape(key)
prefix := regionKeyPrefix + "/" + key
r, err := doRequest(cmd, prefix, http.MethodGet)
Expand All @@ -246,14 +233,30 @@ func showRegionWithTableCommandFunc(cmd *cobra.Command, args []string) {
fmt.Println(r)
}

func parseKey(flags *pflag.FlagSet, key string) (string, error) {
switch flags.Lookup("format").Value.String() {
case "raw":
return key, nil
case "encode":
return decodeKey(key)
case "hex":
key, err := hex.DecodeString(key)
if err != nil {
return "", errors.WithStack(err)
}
return string(key), nil
}
return "", errors.New("unknown format")
}

func decodeKey(text string) (string, error) {
var buf []byte
r := bytes.NewBuffer([]byte(text))
for {
c, err := r.ReadByte()
if err != nil {
if err != io.EOF {
return "", err
return "", errors.WithStack(err)
}
break
}
Expand All @@ -279,7 +282,7 @@ func decodeKey(text string) (string, error) {
n = append(n, r.Next(2)...)
_, err := fmt.Sscanf(string(n), "%03o", &c)
if err != nil {
return "", err
return "", errors.WithStack(err)
}
buf = append(buf, c)
}
Expand All @@ -290,12 +293,12 @@ func decodeKey(text string) (string, error) {
// NewRegionsWithStartKeyCommand returns regions from startkey subcommand of regionCmd.
func NewRegionsWithStartKeyCommand() *cobra.Command {
r := &cobra.Command{
Use: "startkey [--format=raw|encode] <key> <limit>",
Use: "startkey [--format=raw|encode|hex] <key> <limit>",
Short: "show regions from start key",
Run: showRegionsFromStartKeyCommandFunc,
}

r.Flags().String("format", "raw", "the key format")
r.Flags().String("format", "hex", "the key format")
return r
}

Expand All @@ -304,24 +307,9 @@ func showRegionsFromStartKeyCommandFunc(cmd *cobra.Command, args []string) {
fmt.Println(cmd.UsageString())
return
}

var (
key string
err error
)

format := cmd.Flags().Lookup("format").Value.String()
switch format {
case "raw":
key = args[0]
case "encode":
key, err = decodeKey(args[0])
if err != nil {
fmt.Println("Error: ", err)
return
}
default:
fmt.Println("Error: unknown format")
key, err := parseKey(cmd.Flags(), args[0])
if err != nil {
fmt.Println("Error: ", err)
return
}
key = url.QueryEscape(key)
Expand Down
17 changes: 14 additions & 3 deletions tools/pd-ctl/pdctl/command/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,11 @@ func addSchedulerCommandFunc(cmd *cobra.Command, args []string) {
// NewScatterRangeSchedulerCommand returns a command to add a scatter-range-scheduler.
func NewScatterRangeSchedulerCommand() *cobra.Command {
c := &cobra.Command{
Use: "scatter-range <start_key> <end_key> <range_name>",
Use: "scatter-range [--format=raw|encode|hex] <start_key> <end_key> <range_name>",
Short: "add a scheduler to scatter range",
Run: addSchedulerForScatterRangeCommandFunc,
}
c.Flags().String("format", "hex", "the key format")
return c
}

Expand All @@ -216,11 +217,21 @@ func addSchedulerForScatterRangeCommandFunc(cmd *cobra.Command, args []string) {
fmt.Println(cmd.UsageString())
return
}
startKey, err := parseKey(cmd.Flags(), args[0])
if err != nil {
fmt.Println("Error: ", err)
return
}
endKey, err := parseKey(cmd.Flags(), args[1])
if err != nil {
fmt.Println("Error: ", err)
return
}

input := make(map[string]interface{})
input["name"] = cmd.Name()
input["start_key"] = url.QueryEscape(args[0])
input["end_key"] = url.QueryEscape(args[1])
input["start_key"] = url.QueryEscape(startKey)
input["end_key"] = url.QueryEscape(endKey)
input["range_name"] = args[2]
postJSON(cmd, schedulersPrefix, input)
}
Expand Down

0 comments on commit 9ac87dd

Please sign in to comment.