executor/split: return split result when doing split region and refine split timeout logic (#11259) #11484

Merged
merged 5 commits on Jul 31, 2019
Changes from 1 commit
2 changes: 1 addition & 1 deletion ddl/split_region.go
@@ -121,7 +121,7 @@ func splitIndexRegion(store kv.SplitableStore, tblInfo *model.TableInfo, scatter

func waitScatterRegionFinish(store kv.SplitableStore, regionIDs ...uint64) {
for _, regionID := range regionIDs {
err := store.WaitScatterRegionFinish(regionID)
err := store.WaitScatterRegionFinish(regionID, 0)
if err != nil {
logutil.Logger(context.Background()).Warn("[ddl] wait scatter region failed", zap.Uint64("regionID", regionID), zap.Error(err))
}
5 changes: 3 additions & 2 deletions executor/builder.go
@@ -1265,8 +1265,9 @@ func (b *executorBuilder) buildUnionAll(v *plannercore.PhysicalUnionAll) Executo
}

func (b *executorBuilder) buildSplitRegion(v *plannercore.SplitRegion) Executor {
base := newBaseExecutor(b.ctx, nil, v.ExplainID())
base.initCap = chunk.ZeroCapacity
base := newBaseExecutor(b.ctx, v.Schema(), v.ExplainID())
base.initCap = 1
base.maxChunkSize = 1
if v.IndexInfo != nil {
return &SplitIndexRegionExec{
baseExecutor: base,
19 changes: 6 additions & 13 deletions executor/executor_test.go
@@ -1976,16 +1976,9 @@ func (s *testSuite) TestSplitRegionTimeout(c *C) {
tk.MustExec("create table t(a varchar(100),b int, index idx1(b,a))")
tk.MustExec(`split table t index idx1 by (10000,"abcd"),(10000000);`)
tk.MustExec(`set @@tidb_wait_split_region_timeout=1`)
_, err := tk.Exec(`split table t between (0) and (10000) regions 10`)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "split region timeout(1s)")
// The result 0 0 means that 0 regions were split and 0 regions finished scattering before the timeout.
tk.MustQuery(`split table t between (0) and (10000) regions 10`).Check(testkit.Rows("0 0"))
c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/mockSplitRegionTimeout"), IsNil)

c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockScatterRegionTimeout", `return(true)`), IsNil)
_, err = tk.Exec(`split table t between (0) and (10000) regions 10`)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "wait split region scatter timeout(1s)")
c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/mockScatterRegionTimeout"), IsNil)
}

func (s *testSuite) TestRow(c *C) {
@@ -3952,7 +3945,7 @@ func (s *testSuite) TestShowTableRegion(c *C) {

// Test show table regions.
tk.MustExec(`split table t_regions1 by (0)`)
tk.MustExec(`split table t_regions between (-10000) and (10000) regions 4;`)
tk.MustQuery(`split table t_regions between (-10000) and (10000) regions 4;`).Check(testkit.Rows("3 1"))
re := tk.MustQuery("show table t_regions regions")
rows := re.Rows()
// Table t_regions should have 4 regions now.
@@ -3967,7 +3960,7 @@ func (s *testSuite) TestShowTableRegion(c *C) {
c.Assert(rows[3][1], Equals, fmt.Sprintf("t_%d_r_5000", tbl.Meta().ID))

// Test show table index regions.
tk.MustExec(`split table t_regions index idx between (-1000) and (1000) regions 4;`)
tk.MustQuery(`split table t_regions index idx between (-1000) and (1000) regions 4;`).Check(testkit.Rows("4 1"))
re = tk.MustQuery("show table t_regions index idx regions")
rows = re.Rows()
// The index `idx` of table t_regions should have 4 regions now.
@@ -3997,7 +3990,7 @@ func (s *testSuite) TestShowTableRegion(c *C) {

// Test show table regions.
tk.MustExec(`set @@session.tidb_wait_split_region_finish=1;`)
tk.MustExec(`split table t_regions between (0) and (10000) regions 4;`)
tk.MustQuery(`split table t_regions by (2500),(5000),(7500);`).Check(testkit.Rows("3 1"))
re = tk.MustQuery("show table t_regions regions")
rows = re.Rows()
// Table t_regions should have 4 regions now.
@@ -4010,7 +4003,7 @@ func (s *testSuite) TestShowTableRegion(c *C) {
c.Assert(rows[3][1], Equals, fmt.Sprintf("t_%d_r_7500", tbl.Meta().ID))

// Test show table index regions.
tk.MustExec(`split table t_regions index idx between (0) and (1000) regions 4;`)
tk.MustQuery(`split table t_regions index idx by (250),(500),(750);`).Check(testkit.Rows("4 1"))
re = tk.MustQuery("show table t_regions index idx regions")
rows = re.Rows()
// The index `idx` of table t_regions should have 4 regions now.
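
The expected rows in the tests above ("0 0", "3 1", "4 1") are the new two-column result of SPLIT TABLE: the number of regions that were actually split and the fraction of them that finished scattering in time (built later in executor/split.go by appendSplitRegionResultToChunk). A small standalone sketch of that arithmetic, using a hypothetical helper name for illustration only:

```go
package main

import "fmt"

// splitResultRow mirrors the arithmetic behind the SPLIT TABLE result row:
// the first value is the number of newly split regions, the second is the
// fraction of them that finished scattering before the timeout.
func splitResultRow(totalRegions, finishScatterNum int) (int64, float64) {
	ratio := 0.0
	if totalRegions > 0 && finishScatterNum > 0 {
		ratio = float64(finishScatterNum) / float64(totalRegions)
	}
	return int64(totalRegions), ratio
}

func main() {
	total, ratio := splitResultRow(3, 3) // all 3 new regions finished scattering
	fmt.Println(total, ratio)            // 3 1  (matches the "3 1" rows above)

	total, ratio = splitResultRow(0, 0) // timed out before any region was split
	fmt.Println(total, ratio)           // 0 0
}
```
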
153 changes: 112 additions & 41 deletions executor/split.go
100644 → 100755
@@ -28,6 +28,7 @@ import (
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
@@ -48,10 +49,37 @@ type SplitIndexRegionExec struct {
upper []types.Datum
num int
valueLists [][]types.Datum

done bool
splitRegionResult
}

type splitRegionResult struct {
splitRegions int
finishScatterNum int
}

// Open implements the Executor Open interface.
func (e *SplitIndexRegionExec) Open(ctx context.Context) error {
return e.splitIndexRegion(ctx)
}

// Next implements the Executor Next interface.
func (e *SplitIndexRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error {
func (e *SplitIndexRegionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
if e.done {
return nil
}
appendSplitRegionResultToChunk(chk, e.splitRegions, e.finishScatterNum)
e.done = true
return nil
}

// checkScatterRegionFinishBackOff is the backoff time (in milliseconds) used to check whether a region has finished scattering after the split region timeout.
const checkScatterRegionFinishBackOff = 50

// splitIndexRegion is used to split index regions.
func (e *SplitIndexRegionExec) splitIndexRegion(ctx context.Context) error {
store := e.ctx.GetStore()
s, ok := store.(kv.SplitableStore)
if !ok {
@@ -62,10 +90,15 @@ func (e *SplitIndexRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error {
return err
}

start := time.Now()
ctxWithTimeout, cancel := context.WithTimeout(ctx, e.ctx.GetSessionVars().GetSplitRegionTimeout())
defer cancel()
regionIDs := make([]uint64, 0, len(splitIdxKeys))
for _, idxKey := range splitIdxKeys {
if isCtxDone(ctxWithTimeout) {
break
}

regionID, err := s.SplitRegion(idxKey, true)
if err != nil {
logutil.Logger(context.Background()).Warn("split table index region failed",
@@ -74,28 +107,17 @@ func (e *SplitIndexRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error {
zap.Error(err))
continue
}
if regionID == 0 {
continue
}
regionIDs = append(regionIDs, regionID)

if isCtxDone(ctxWithTimeout) {
return errors.Errorf("wait split region timeout(%v)", e.ctx.GetSessionVars().GetSplitRegionTimeout())
}
}
e.splitRegions = len(regionIDs)
if !e.ctx.GetSessionVars().WaitSplitRegionFinish {
return nil
}
for _, regionID := range regionIDs {
err := s.WaitScatterRegionFinish(regionID)
if err != nil {
logutil.Logger(context.Background()).Warn("wait scatter region failed",
zap.Uint64("regionID", regionID),
zap.String("table", e.tableInfo.Name.L),
zap.String("index", e.indexInfo.Name.L),
zap.Error(err))
}
if isCtxDone(ctxWithTimeout) {
return errors.Errorf("wait split region timeout(%v)", e.ctx.GetSessionVars().GetSplitRegionTimeout())
}
}
e.finishScatterNum = waitScatterRegionFinish(ctxWithTimeout, e.ctx, start, s, regionIDs, e.tableInfo.Name.L, e.indexInfo.Name.L)
return nil
}
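
splitIndexRegion above bounds its work with a context derived from tidb_wait_split_region_timeout and checks it between SplitRegion calls, breaking out instead of returning an error so the work done so far can still be reported. A minimal, self-contained sketch of that pattern follows; the body of isCtxDone is collapsed in this diff, so the implementation shown is the standard select-on-Done idiom and should be read as an assumption:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// isCtxDone reports whether the context has been canceled or its deadline has
// passed; this mirrors the helper used by the split executors.
func isCtxDone(ctx context.Context) bool {
	select {
	case <-ctx.Done():
		return true
	default:
		return false
	}
}

func main() {
	// Stand-in for the split-region timeout budget (tidb_wait_split_region_timeout).
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Millisecond)
	defer cancel()

	splitRegions := 0
	for i := 0; i < 10; i++ {
		// Break out of the loop instead of returning an error, so the
		// regions split so far can still be counted and reported.
		if isCtxDone(ctx) {
			break
		}
		time.Sleep(10 * time.Millisecond) // stand-in for one SplitRegion call
		splitRegions++
	}
	fmt.Println("regions split before timeout:", splitRegions)
}
```
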

@@ -225,16 +247,35 @@ type SplitTableRegionExec struct {
upper types.Datum
num int
valueLists [][]types.Datum

done bool
splitRegionResult
}

// Open implements the Executor Open interface.
func (e *SplitTableRegionExec) Open(ctx context.Context) error {
return e.splitTableRegion(ctx)
}

// Next implements the Executor Next interface.
func (e *SplitTableRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error {
func (e *SplitTableRegionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
if e.done {
return nil
}
appendSplitRegionResultToChunk(chk, e.splitRegions, e.finishScatterNum)
e.done = true
return nil
}

func (e *SplitTableRegionExec) splitTableRegion(ctx context.Context) error {
store := e.ctx.GetStore()
s, ok := store.(kv.SplitableStore)
if !ok {
return nil
}

start := time.Now()
ctxWithTimeout, cancel := context.WithTimeout(ctx, e.ctx.GetSessionVars().GetSplitRegionTimeout())
defer cancel()

@@ -244,48 +285,78 @@ func (e *SplitTableRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error {
}
regionIDs := make([]uint64, 0, len(splitKeys))
for _, key := range splitKeys {
failpoint.Inject("mockSplitRegionTimeout", func(val failpoint.Value) {
if val.(bool) {
time.Sleep(time.Second*1 + time.Millisecond*10)
}
})
if isCtxDone(ctxWithTimeout) {
break
}
regionID, err := s.SplitRegion(key, true)
if err != nil {
logutil.Logger(context.Background()).Warn("split table region failed",
zap.String("table", e.tableInfo.Name.L),
zap.Error(err))
continue
}
if regionID == 0 {
continue
}
regionIDs = append(regionIDs, regionID)

failpoint.Inject("mockSplitRegionTimeout", func(val failpoint.Value) {
if val.(bool) {
time.Sleep(time.Second * 1)
}
})

if isCtxDone(ctxWithTimeout) {
return errors.Errorf("split region timeout(%v)", e.ctx.GetSessionVars().GetSplitRegionTimeout())
}
}
e.splitRegions = len(regionIDs)
if !e.ctx.GetSessionVars().WaitSplitRegionFinish {
return nil
}

e.finishScatterNum = waitScatterRegionFinish(ctxWithTimeout, e.ctx, start, s, regionIDs, e.tableInfo.Name.L, "")
return nil
}

func waitScatterRegionFinish(ctxWithTimeout context.Context, sctx sessionctx.Context, startTime time.Time, store kv.SplitableStore, regionIDs []uint64, tableName, indexName string) int {
remainMillisecond := 0
finishScatterNum := 0
for _, regionID := range regionIDs {
err := s.WaitScatterRegionFinish(regionID)
if err != nil {
logutil.Logger(context.Background()).Warn("wait scatter region failed",
zap.Uint64("regionID", regionID),
zap.String("table", e.tableInfo.Name.L),
zap.Error(err))
if isCtxDone(ctxWithTimeout) {
// Do not break here; keep checking whether the remaining regions have finished scattering, with a very short backoff time.
// Consider this situation: Regions 1, 2, and 3 are to be split.
// Region 1 times out before scattering finishes, while Regions 2 and 3 have already finished scattering.
// In this case, we should report that 2 regions, instead of 0, have finished scattering.
remainMillisecond = checkScatterRegionFinishBackOff
} else {
remainMillisecond = int((sctx.GetSessionVars().GetSplitRegionTimeout().Seconds() - time.Since(startTime).Seconds()) * 1000)
}

failpoint.Inject("mockScatterRegionTimeout", func(val failpoint.Value) {
if val.(bool) {
time.Sleep(time.Second * 1)
err := store.WaitScatterRegionFinish(regionID, remainMillisecond)
if err == nil {
finishScatterNum++
} else {
if len(indexName) == 0 {
logutil.Logger(context.Background()).Warn("wait scatter region failed",
zap.Uint64("regionID", regionID),
zap.String("table", tableName),
zap.Error(err))
} else {
logutil.Logger(context.Background()).Warn("wait scatter region failed",
zap.Uint64("regionID", regionID),
zap.String("table", tableName),
zap.String("index", indexName),
zap.Error(err))
}
})

if isCtxDone(ctxWithTimeout) {
return errors.Errorf("wait split region scatter timeout(%v)", e.ctx.GetSessionVars().GetSplitRegionTimeout())
}
}
return nil
return finishScatterNum
}

func appendSplitRegionResultToChunk(chk *chunk.Chunk, totalRegions, finishScatterNum int) {
chk.AppendInt64(0, int64(totalRegions))
if finishScatterNum > 0 && totalRegions > 0 {
chk.AppendFloat64(1, float64(finishScatterNum)/float64(totalRegions))
} else {
chk.AppendFloat64(1, float64(0))
}
}

func isCtxDone(ctx context.Context) bool {
2 changes: 1 addition & 1 deletion kv/kv.go
@@ -299,6 +299,6 @@ type Iterator interface {
// SplitableStore is the kv store which supports splitting regions.
type SplitableStore interface {
SplitRegion(splitKey Key, scatter bool) (regionID uint64, err error)
WaitScatterRegionFinish(regionID uint64) error
WaitScatterRegionFinish(regionID uint64, backOff int) error
CheckRegionInScattering(regionID uint64) (bool, error)
}
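
For illustration only, here is a sketch of what an implementation of the revised interface could look like: a hypothetical in-memory mock (not part of this PR), with kv.Key inlined so the snippet stands alone.

```go
package main

// Key stands in for kv.Key so the sketch is self-contained.
type Key []byte

// SplitableStore is the interface as revised above: WaitScatterRegionFinish
// now takes a backoff budget in milliseconds, where a non-positive value
// means "use the store's default backoff".
type SplitableStore interface {
	SplitRegion(splitKey Key, scatter bool) (regionID uint64, err error)
	WaitScatterRegionFinish(regionID uint64, backOff int) error
	CheckRegionInScattering(regionID uint64) (bool, error)
}

// mockStore is a hypothetical no-op implementation, e.g. for unit tests.
type mockStore struct{ nextID uint64 }

func (s *mockStore) SplitRegion(splitKey Key, scatter bool) (uint64, error) {
	s.nextID++
	return s.nextID, nil
}

func (s *mockStore) WaitScatterRegionFinish(regionID uint64, backOff int) error {
	// A real store would poll PD until the scatter operator finishes or the
	// backoff budget runs out; the mock succeeds immediately.
	return nil
}

func (s *mockStore) CheckRegionInScattering(regionID uint64) (bool, error) {
	return false, nil
}

// Compile-time check that mockStore satisfies the interface.
var _ SplitableStore = (*mockStore)(nil)

func main() {}
```
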
9 changes: 9 additions & 0 deletions planner/core/planbuilder.go
@@ -999,6 +999,13 @@ func buildTableRegionsSchema() *expression.Schema {
return schema
}

func buildSplitRegionsSchema() *expression.Schema {
schema := expression.NewSchema(make([]*expression.Column, 0, 2)...)
schema.Append(buildColumn("", "TOTAL_SPLIT_REGION", mysql.TypeLonglong, 4))
schema.Append(buildColumn("", "SCATTER_FINISH_RATIO", mysql.TypeDouble, 8))
return schema
}

func buildShowDDLJobQueriesFields() *expression.Schema {
schema := expression.NewSchema(make([]*expression.Column, 0, 1)...)
schema.Append(buildColumn("", "QUERY", mysql.TypeVarchar, 256))
@@ -1687,6 +1694,7 @@ func (b *PlanBuilder) buildSplitIndexRegion(node *ast.SplitRegionStmt) (Plan, er
TableInfo: tblInfo,
IndexInfo: indexInfo,
}
p.SetSchema(buildSplitRegionsSchema())
// Split index regions by user specified value lists.
if len(node.SplitOpt.ValueLists) > 0 {
indexValues := make([][]types.Datum, 0, len(node.SplitOpt.ValueLists))
@@ -1801,6 +1809,7 @@ func (b *PlanBuilder) buildSplitTableRegion(node *ast.SplitRegionStmt) (Plan, er
p := &SplitRegion{
TableInfo: tblInfo,
}
p.SetSchema(buildSplitRegionsSchema())
if len(node.SplitOpt.ValueLists) > 0 {
values := make([][]types.Datum, 0, len(node.SplitOpt.ValueLists))
for i, valuesItem := range node.SplitOpt.ValueLists {
3 changes: 2 additions & 1 deletion store/mockstore/mocktikv/cluster.go
@@ -319,12 +319,13 @@ func (c *Cluster) Split(regionID, newRegionID uint64, key []byte, peerIDs []uint
}

// SplitRaw splits a Region at the key (not encoded) and creates a new Region.
func (c *Cluster) SplitRaw(regionID, newRegionID uint64, rawKey []byte, peerIDs []uint64, leaderPeerID uint64) {
func (c *Cluster) SplitRaw(regionID, newRegionID uint64, rawKey []byte, peerIDs []uint64, leaderPeerID uint64) *Region {
c.Lock()
defer c.Unlock()

newRegion := c.regions[regionID].split(newRegionID, rawKey, peerIDs, leaderPeerID)
c.regions[newRegionID] = newRegion
return newRegion
}

// Merge merges 2 regions, their key ranges should be adjacent.
4 changes: 2 additions & 2 deletions store/mockstore/mocktikv/rpc.go
@@ -597,8 +597,8 @@ func (h *rpcHandler) handleSplitRegion(req *kvrpcpb.SplitRegionRequest) *kvrpcpb
return &kvrpcpb.SplitRegionResponse{}
}
newRegionID, newPeerIDs := h.cluster.AllocID(), h.cluster.AllocIDs(len(region.Peers))
h.cluster.SplitRaw(region.GetId(), newRegionID, key, newPeerIDs, newPeerIDs[0])
return &kvrpcpb.SplitRegionResponse{}
newRegion := h.cluster.SplitRaw(region.GetId(), newRegionID, key, newPeerIDs, newPeerIDs[0])
return &kvrpcpb.SplitRegionResponse{Left: newRegion.Meta}
}

// RPCClient sends kv RPC calls to mock cluster. RPCClient mocks the behavior of
9 changes: 7 additions & 2 deletions store/tikv/split_region.go
@@ -104,10 +104,15 @@ func (s *tikvStore) scatterRegion(regionID uint64) error {
}

// WaitScatterRegionFinish implements SplitableStore interface.
func (s *tikvStore) WaitScatterRegionFinish(regionID uint64) error {
// backOff is the backoff time, in milliseconds, for waiting for the region to finish scattering.
// If backOff <= 0, the default wait-scatter backoff time is used.
func (s *tikvStore) WaitScatterRegionFinish(regionID uint64, backOff int) error {
logutil.Logger(context.Background()).Info("wait scatter region",
zap.Uint64("regionID", regionID))
bo := NewBackoffer(context.Background(), waitScatterRegionFinishBackoff)
if backOff <= 0 {
backOff = waitScatterRegionFinishBackoff
}
bo := NewBackoffer(context.Background(), backOff)
logFreq := 0
for {
resp, err := s.pdClient.GetOperator(context.Background(), regionID)
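
The executor derives the backOff argument from however much of the tidb_wait_split_region_timeout budget remains. Below is a simplified, self-contained sketch of that computation; remainingBackoffMs is a hypothetical helper, and the 50 ms floor stands in for checkScatterRegionFinishBackOff.

```go
package main

import (
	"fmt"
	"time"
)

// remainingBackoffMs returns how much of the split-region timeout budget is
// left, in milliseconds. Once the budget is exhausted, a small 50 ms floor is
// used so each remaining region still gets a quick scatter-finish check.
func remainingBackoffMs(timeout time.Duration, start time.Time) int {
	remain := int((timeout.Seconds() - time.Since(start).Seconds()) * 1000)
	if remain <= 0 {
		remain = 50
	}
	return remain
}

func main() {
	start := time.Now().Add(-200 * time.Millisecond) // pretend 200 ms were spent splitting
	fmt.Println(remainingBackoffMs(time.Second, start)) // roughly 800
}
```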