Skip to content

Commit

Permalink
*: support a region divided into multiple regions (#11739)
Browse files Browse the repository at this point in the history
  • Loading branch information
zimulala authored Sep 3, 2019
1 parent 06fc28f commit f9d8541
Show file tree
Hide file tree
Showing 15 changed files with 250 additions and 134 deletions.
40 changes: 22 additions & 18 deletions ddl/split_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package ddl

import (
"context"

"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/tablecodec"
Expand Down Expand Up @@ -68,19 +70,20 @@ func splitPreSplitedTable(store kv.SplitableStore, tbInfo *model.TableInfo, scat
// And the max _tidb_rowid is 9223372036854775807, it won't be negative number.

// Split table region.
regionIDs := make([]uint64, 0, 1<<(tbInfo.PreSplitRegions)+len(tbInfo.Indices))
step := int64(1 << (tbInfo.ShardRowIDBits - tbInfo.PreSplitRegions))
max := int64(1 << tbInfo.ShardRowIDBits)
splitTableKeys := make([][]byte, 0, 1<<(tbInfo.PreSplitRegions))
for p := int64(step); p < max; p += step {
recordID := p << (64 - tbInfo.ShardRowIDBits - 1)
recordPrefix := tablecodec.GenTableRecordPrefix(tbInfo.ID)
key := tablecodec.EncodeRecordKey(recordPrefix, recordID)
regionID, err := store.SplitRegion(key, scatter)
if err != nil {
logutil.BgLogger().Warn("[ddl] pre split table region failed", zap.Int64("recordID", recordID), zap.Error(err))
} else {
regionIDs = append(regionIDs, regionID)
}
splitTableKeys = append(splitTableKeys, key)
}
var err error
regionIDs, err := store.SplitRegions(context.Background(), splitTableKeys, scatter)
if err != nil {
logutil.BgLogger().Warn("[ddl] pre split some table regions failed",
zap.Stringer("table", tbInfo.Name), zap.Int("successful region count", len(regionIDs)), zap.Error(err))
}
regionIDs = append(regionIDs, splitIndexRegion(store, tbInfo, scatter)...)
if scatter {
Expand All @@ -90,26 +93,27 @@ func splitPreSplitedTable(store kv.SplitableStore, tbInfo *model.TableInfo, scat

func splitRecordRegion(store kv.SplitableStore, tableID int64, scatter bool) uint64 {
tableStartKey := tablecodec.GenTablePrefix(tableID)
regionID, err := store.SplitRegion(tableStartKey, scatter)
regionIDs, err := store.SplitRegions(context.Background(), [][]byte{tableStartKey}, scatter)
if err != nil {
// It will be automatically split by TiKV later.
logutil.BgLogger().Warn("[ddl] split table region failed", zap.Error(err))
}
return regionID
if len(regionIDs) == 1 {
return regionIDs[0]
}
return 0
}

func splitIndexRegion(store kv.SplitableStore, tblInfo *model.TableInfo, scatter bool) []uint64 {
regionIDs := make([]uint64, 0, len(tblInfo.Indices))
splitKeys := make([][]byte, 0, len(tblInfo.Indices))
for _, idx := range tblInfo.Indices {
indexPrefix := tablecodec.EncodeTableIndexPrefix(tblInfo.ID, idx.ID)
regionID, err := store.SplitRegion(indexPrefix, scatter)
if err != nil {
logutil.BgLogger().Warn("[ddl] pre split table index region failed",
zap.Stringer("table", tblInfo.Name),
zap.Stringer("index", idx.Name),
zap.Error(err))
}
regionIDs = append(regionIDs, regionID)
splitKeys = append(splitKeys, indexPrefix)
}
regionIDs, err := store.SplitRegions(context.Background(), splitKeys, scatter)
if err != nil {
logutil.BgLogger().Warn("[ddl] pre split some table index regions failed",
zap.Stringer("table", tblInfo.Name), zap.Int("successful region count", len(regionIDs)), zap.Error(err))
}
return regionIDs
}
Expand Down
13 changes: 9 additions & 4 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2153,7 +2153,7 @@ func (s *testSuiteP1) TestBatchPointGetRepeatableRead(c *C) {
}

func (s *testSuite4) TestSplitRegionTimeout(c *C) {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockSplitRegionTimeout", `return(true)`), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/MockSplitRegionTimeout", `return(true)`), IsNil)
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
Expand All @@ -2162,7 +2162,7 @@ func (s *testSuite4) TestSplitRegionTimeout(c *C) {
tk.MustExec(`set @@tidb_wait_split_region_timeout=1`)
// result 0 0 means split 0 region and 0 region finish scatter regions before timeout.
tk.MustQuery(`split table t between (0) and (10000) regions 10`).Check(testkit.Rows("0 0"))
c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/mockSplitRegionTimeout"), IsNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/MockSplitRegionTimeout"), IsNil)
}

func (s *testSuiteP1) TestRow(c *C) {
Expand Down Expand Up @@ -4050,7 +4050,7 @@ func (s *testSuiteP1) TestReadPartitionedTable(c *C) {
func (s *testSuiteP1) TestSplitRegion(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("drop table if exists t, t1")
tk.MustExec("create table t(a varchar(100),b int, index idx1(b,a))")
tk.MustExec(`split table t index idx1 by (10000,"abcd"),(10000000);`)
_, err := tk.Exec(`split table t index idx1 by ("abcd");`)
Expand Down Expand Up @@ -4127,8 +4127,13 @@ func (s *testSuiteP1) TestSplitRegion(c *C) {
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "Split table `t` region step value should more than 1000, step 10 is invalid")

// Test split region by syntax
// Test split region by syntax.
tk.MustExec(`split table t by (0),(1000),(1000000)`)

// Test split region twice to test for multiple batch split region requests.
tk.MustExec("create table t1(a int, b int)")
tk.MustQuery("split table t1 between(0) and (10000) regions 10;").Check(testkit.Rows("9 1"))
tk.MustQuery("split table t1 between(10) and (10010) regions 5;").Check(testkit.Rows("4 1"))
}

func (s *testSuite) TestShowTableRegion(c *C) {
Expand Down
61 changes: 19 additions & 42 deletions executor/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"github.com/cznic/mathutil"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
Expand Down Expand Up @@ -94,27 +93,18 @@ func (e *SplitIndexRegionExec) splitIndexRegion(ctx context.Context) error {
start := time.Now()
ctxWithTimeout, cancel := context.WithTimeout(ctx, e.ctx.GetSessionVars().GetSplitRegionTimeout())
defer cancel()
regionIDs := make([]uint64, 0, len(splitIdxKeys))
for _, idxKey := range splitIdxKeys {
if isCtxDone(ctxWithTimeout) {
break
}

regionID, err := s.SplitRegion(idxKey, true)
if err != nil {
logutil.BgLogger().Warn("split table index region failed",
zap.String("table", e.tableInfo.Name.L),
zap.String("index", e.indexInfo.Name.L),
zap.Error(err))
continue
}
if regionID == 0 {
continue
}
regionIDs = append(regionIDs, regionID)

regionIDs, err := s.SplitRegions(context.Background(), splitIdxKeys, true)
if err != nil {
logutil.BgLogger().Warn("split table index region failed",
zap.String("table", e.tableInfo.Name.L),
zap.String("index", e.indexInfo.Name.L),
zap.Error(err))
}
e.splitRegions = len(regionIDs)
if e.splitRegions == 0 {
return nil
}

if !e.ctx.GetSessionVars().WaitSplitRegionFinish {
return nil
}
Expand Down Expand Up @@ -294,30 +284,17 @@ func (e *SplitTableRegionExec) splitTableRegion(ctx context.Context) error {
if err != nil {
return err
}
regionIDs := make([]uint64, 0, len(splitKeys))
for _, key := range splitKeys {
failpoint.Inject("mockSplitRegionTimeout", func(val failpoint.Value) {
if val.(bool) {
time.Sleep(time.Second*1 + time.Millisecond*10)
}
})
if isCtxDone(ctxWithTimeout) {
break
}
regionID, err := s.SplitRegion(key, true)
if err != nil {
logutil.BgLogger().Warn("split table region failed",
zap.String("table", e.tableInfo.Name.L),
zap.Error(err))
continue
}
if regionID == 0 {
continue
}
regionIDs = append(regionIDs, regionID)

regionIDs, err := s.SplitRegions(ctxWithTimeout, splitKeys, true)
if err != nil {
logutil.BgLogger().Warn("split table region failed",
zap.String("table", e.tableInfo.Name.L),
zap.Error(err))
}
e.splitRegions = len(regionIDs)
if e.splitRegions == 0 {
return nil
}

if !e.ctx.GetSessionVars().WaitSplitRegionFinish {
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ require (
github.com/pingcap/errors v0.11.4
github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20190724165112-ec9df5f208a7
github.com/pingcap/kvproto v0.0.0-20190821201150-798d27658fae
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596
github.com/pingcap/parser v0.0.0-20190902030720-275a827cf4e3
github.com/pingcap/pd v0.0.0-20190712044914-75a1f9f3062b
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c/go.mod h1:DNS3Qg
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20190516013202-4cf58ad90b6c/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20190724165112-ec9df5f208a7 h1:BMrtxXqQeZ9y27LN/V3PHA/tSyDWHK+90VLYaymrXQE=
github.com/pingcap/kvproto v0.0.0-20190724165112-ec9df5f208a7/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20190821201150-798d27658fae h1:WR4d5ga8zXT+QDWYFzzyA+PJMMszR0kQxyYMh6dvHPg=
github.com/pingcap/kvproto v0.0.0-20190821201150-798d27658fae/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7/go.mod h1:xsfkWVaFVV5B8e1K9seWfyJWFrIhbtUTAD8NV1Pq3+w=
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 h1:t2OQTpPJnrPDGlvA+3FwJptMTt6MEPdzK1Wt99oaefQ=
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
Expand Down
2 changes: 1 addition & 1 deletion kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ type Iterator interface {

// SplitableStore is the kv store which supports split regions.
type SplitableStore interface {
SplitRegion(splitKey Key, scatter bool) (regionID uint64, err error)
SplitRegions(ctx context.Context, splitKey [][]byte, scatter bool) (regionID []uint64, err error)
WaitScatterRegionFinish(regionID uint64, backOff int) error
CheckRegionInScattering(regionID uint64) (bool, error)
}
29 changes: 19 additions & 10 deletions store/mockstore/mocktikv/rpc.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -604,16 +604,25 @@ func (h *rpcHandler) handleKvRawScan(req *kvrpcpb.RawScanRequest) *kvrpcpb.RawSc
}

func (h *rpcHandler) handleSplitRegion(req *kvrpcpb.SplitRegionRequest) *kvrpcpb.SplitRegionResponse {
key := NewMvccKey(req.GetSplitKey())
region, _ := h.cluster.GetRegionByKey(key)
if bytes.Equal(region.GetStartKey(), key) {
return &kvrpcpb.SplitRegionResponse{}
}
newRegionID, newPeerIDs := h.cluster.AllocID(), h.cluster.AllocIDs(len(region.Peers))
newRegion := h.cluster.SplitRaw(region.GetId(), newRegionID, key, newPeerIDs, newPeerIDs[0])
// The mocktikv should return a deep copy of meta info to avoid data race
metaCloned := proto.Clone(newRegion.Meta)
return &kvrpcpb.SplitRegionResponse{Left: metaCloned.(*metapb.Region)}
keys := req.GetSplitKeys()
resp := &kvrpcpb.SplitRegionResponse{Regions: make([]*metapb.Region, 0, len(keys)+1)}
for i, key := range keys {
k := NewMvccKey(key)
region, _ := h.cluster.GetRegionByKey(k)
if bytes.Equal(region.GetStartKey(), key) {
continue
}
if i == 0 {
// Set the leftmost region.
resp.Regions = append(resp.Regions, region)
}
newRegionID, newPeerIDs := h.cluster.AllocID(), h.cluster.AllocIDs(len(region.Peers))
newRegion := h.cluster.SplitRaw(region.GetId(), newRegionID, k, newPeerIDs, newPeerIDs[0])
// The mocktikv should return a deep copy of meta info to avoid data race
metaCloned := proto.Clone(newRegion.Meta)
resp.Regions = append(resp.Regions, metaCloned.(*metapb.Region))
}
return resp
}

// RPCClient sends kv RPC calls to mock cluster. RPCClient mocks the behavior of
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA
if len(keys) == 0 {
return nil
}
groups, firstRegion, err := c.store.regionCache.GroupKeysByRegion(bo, keys)
groups, firstRegion, err := c.store.regionCache.GroupKeysByRegion(bo, keys, nil)
if err != nil {
return errors.Trace(err)
}
Expand Down
1 change: 1 addition & 0 deletions store/tikv/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ const (
deleteRangeOneRegionMaxBackoff = 100000
rawkvMaxBackoff = 20000
splitRegionBackoff = 20000
maxSplitRegionsBackoff = 120000
scatterRegionBackoff = 20000
waitScatterRegionFinishBackoff = 120000
locateRegionMaxBackoff = 20000
Expand Down
4 changes: 2 additions & 2 deletions store/tikv/rawkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ func (c *RawKVClient) sendReq(key []byte, req *tikvrpc.Request, reverse bool) (*
}

func (c *RawKVClient) sendBatchReq(bo *Backoffer, keys [][]byte, cmdType tikvrpc.CmdType) (*tikvrpc.Response, error) { // split the keys
groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys)
groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys, nil)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -544,7 +544,7 @@ func (c *RawKVClient) sendBatchPut(bo *Backoffer, keys, values [][]byte) error {
for i, key := range keys {
keyToValue[string(key)] = values[i]
}
groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys)
groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys, nil)
if err != nil {
return errors.Trace(err)
}
Expand Down
6 changes: 5 additions & 1 deletion store/tikv/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,8 @@ func (c *RegionCache) LocateRegionByID(bo *Backoffer, regionID uint64) (*KeyLoca
// GroupKeysByRegion separates keys into groups by their belonging Regions.
// Specially it also returns the first key's region which may be used as the
// 'PrimaryLockKey' and should be committed ahead of others.
func (c *RegionCache) GroupKeysByRegion(bo *Backoffer, keys [][]byte) (map[RegionVerID][][]byte, RegionVerID, error) {
// filter is used to filter some unwanted keys.
func (c *RegionCache) GroupKeysByRegion(bo *Backoffer, keys [][]byte, filter func(key, regionStartKey []byte) bool) (map[RegionVerID][][]byte, RegionVerID, error) {
groups := make(map[RegionVerID][][]byte)
var first RegionVerID
var lastLoc *KeyLocation
Expand All @@ -484,6 +485,9 @@ func (c *RegionCache) GroupKeysByRegion(bo *Backoffer, keys [][]byte) (map[Regio
if err != nil {
return nil, first, errors.Trace(err)
}
if filter != nil && filter(k, lastLoc.StartKey) {
continue
}
}
id := lastLoc.Region
if i == 0 {
Expand Down
4 changes: 2 additions & 2 deletions store/tikv/scan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,12 @@ func (s *testScanSuite) TestScan(c *C) {
c.Assert(err, IsNil)

if rowNum > 123 {
_, err = s.store.SplitRegion(encodeKey(s.prefix, s08d("key", 123)), false)
_, err = s.store.SplitRegions(context.Background(), [][]byte{encodeKey(s.prefix, s08d("key", 123))}, false)
c.Assert(err, IsNil)
}

if rowNum > 456 {
_, err = s.store.SplitRegion(encodeKey(s.prefix, s08d("key", 456)), false)
_, err = s.store.SplitRegions(context.Background(), [][]byte{encodeKey(s.prefix, s08d("key", 456))}, false)
c.Assert(err, IsNil)
}

Expand Down
2 changes: 1 addition & 1 deletion store/tikv/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (s *tikvSnapshot) BatchGet(ctx context.Context, keys []kv.Key) (map[string]
}

func (s *tikvSnapshot) batchGetKeysByRegions(bo *Backoffer, keys [][]byte, collectF func(k, v []byte)) error {
groups, _, err := s.store.regionCache.GroupKeysByRegion(bo, keys)
groups, _, err := s.store.regionCache.GroupKeysByRegion(bo, keys, nil)
if err != nil {
return errors.Trace(err)
}
Expand Down
Loading

0 comments on commit f9d8541

Please sign in to comment.