-
Notifications
You must be signed in to change notification settings - Fork 5.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
store/tikv: pipe the split and scatter task #23357
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
@@ -19,6 +19,7 @@ import ( | |||||||||
"fmt" | ||||||||||
"math" | ||||||||||
"sync/atomic" | ||||||||||
"time" | ||||||||||
|
||||||||||
"github.com/pingcap/errors" | ||||||||||
"github.com/pingcap/kvproto/pkg/kvrpcpb" | ||||||||||
|
@@ -67,11 +68,11 @@ func (s *KVStore) splitBatchRegionsReq(bo *Backoffer, keys [][]byte, scatter boo | |||||||||
return resp.resp, errors.Trace(resp.err) | ||||||||||
} | ||||||||||
ch := make(chan singleBatchResp, len(batches)) | ||||||||||
// step 1: split all regions in each batch | ||||||||||
for _, batch1 := range batches { | ||||||||||
go func(b batch) { | ||||||||||
backoffer, cancel := bo.Fork() | ||||||||||
defer cancel() | ||||||||||
|
||||||||||
util.WithRecovery(func() { | ||||||||||
select { | ||||||||||
case ch <- s.batchSendSingleRegion(backoffer, b, scatter, tableID): | ||||||||||
|
@@ -83,9 +84,42 @@ func (s *KVStore) splitBatchRegionsReq(bo *Backoffer, keys [][]byte, scatter boo | |||||||||
ch <- singleBatchResp{err: errors.Errorf("%v", r)} | ||||||||||
} | ||||||||||
}) | ||||||||||
|
||||||||||
}(batch1) | ||||||||||
} | ||||||||||
|
||||||||||
// step 2: scatter all regions in each batch | ||||||||||
if scatter { | ||||||||||
splitResps := make([]singleBatchResp, 0, len(batches)) | ||||||||||
for i := 0; i < len(batches); i++ { | ||||||||||
r := <-ch | ||||||||||
splitResps = append(splitResps, r) | ||||||||||
} | ||||||||||
for _, r := range splitResps { | ||||||||||
resp := r | ||||||||||
if resp.err != nil { | ||||||||||
ch <- resp | ||||||||||
continue | ||||||||||
} | ||||||||||
go func() { | ||||||||||
backoffer, cancel := bo.Fork() | ||||||||||
defer cancel() | ||||||||||
util.WithRecovery(func() { | ||||||||||
select { | ||||||||||
case ch <- s.scatterRegionsInBatchSplitResp(backoffer, tableID, resp): | ||||||||||
case <-bo.ctx.Done(): | ||||||||||
ch <- singleBatchResp{err: bo.ctx.Err()} | ||||||||||
} | ||||||||||
}, func(r interface{}) { | ||||||||||
if r != nil { | ||||||||||
ch <- singleBatchResp{err: errors.Errorf("%v", r)} | ||||||||||
} | ||||||||||
}) | ||||||||||
|
||||||||||
}() | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
srResp := &kvrpcpb.SplitRegionResponse{Regions: make([]*metapb.Region, 0, len(keys)*2)} | ||||||||||
for i := 0; i < len(batches); i++ { | ||||||||||
batchResp := <-ch | ||||||||||
|
@@ -114,7 +148,7 @@ func (s *KVStore) batchSendSingleRegion(bo *Backoffer, batch batch, scatter bool | |||||||||
} | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
start := time.Now() | ||||||||||
req := tikvrpc.NewRequest(tikvrpc.CmdSplitRegion, &kvrpcpb.SplitRegionRequest{ | ||||||||||
SplitKeys: batch.keys, | ||||||||||
}, kvrpcpb.Context{ | ||||||||||
|
@@ -124,7 +158,7 @@ func (s *KVStore) batchSendSingleRegion(bo *Backoffer, batch batch, scatter bool | |||||||||
sender := NewRegionRequestSender(s.regionCache, s.client) | ||||||||||
resp, err := sender.SendReq(bo, req, batch.regionID, readTimeoutShort) | ||||||||||
|
||||||||||
batchResp := singleBatchResp{resp: resp} | ||||||||||
batchResp := singleBatchResp{regionID: batch.regionID, resp: resp} | ||||||||||
if err != nil { | ||||||||||
batchResp.err = errors.Trace(err) | ||||||||||
return batchResp | ||||||||||
|
@@ -138,6 +172,7 @@ func (s *KVStore) batchSendSingleRegion(bo *Backoffer, batch batch, scatter bool | |||||||||
err := bo.Backoff(BoRegionMiss, errors.New(regionErr.String())) | ||||||||||
if err != nil { | ||||||||||
batchResp.err = errors.Trace(err) | ||||||||||
logutil.BgLogger().Error("batch split regions request failed", zap.Duration("cost-time", time.Since(start)), zap.Error(err)) | ||||||||||
return batchResp | ||||||||||
} | ||||||||||
resp, err = s.splitBatchRegionsReq(bo, batch.keys, scatter, tableID) | ||||||||||
|
@@ -157,29 +192,41 @@ func (s *KVStore) batchSendSingleRegion(bo *Backoffer, batch batch, scatter bool | |||||||||
if len(spResp.Regions) > 0 { | ||||||||||
newRegionLeft = logutil.Hex(spResp.Regions[0]).String() | ||||||||||
} | ||||||||||
logutil.BgLogger().Info("batch split regions complete", | ||||||||||
logutil.BgLogger().Info("batch split regions, split regions request complete", | ||||||||||
zap.Uint64("batch region ID", batch.regionID.id), | ||||||||||
zap.Duration("cost-time", time.Since(start)), | ||||||||||
zap.Stringer("first at", kv.Key(batch.keys[0])), | ||||||||||
zap.String("first new region left", newRegionLeft), | ||||||||||
zap.Int("new region count", len(spResp.Regions))) | ||||||||||
|
||||||||||
if !scatter { | ||||||||||
return batchResp | ||||||||||
} | ||||||||||
return batchResp | ||||||||||
} | ||||||||||
|
||||||||||
for i, r := range spResp.Regions { | ||||||||||
func (s *KVStore) scatterRegionsInBatchSplitResp(bo *Backoffer, tableID *int64, batchResp singleBatchResp) singleBatchResp { | ||||||||||
var err error | ||||||||||
start := time.Now() | ||||||||||
spResp := batchResp.resp.Resp.(*kvrpcpb.SplitRegionResponse) | ||||||||||
for _, r := range spResp.Regions { | ||||||||||
tid := int64(0) | ||||||||||
if tableID != nil { | ||||||||||
tid = *tableID | ||||||||||
} | ||||||||||
if err = s.scatterRegion(bo, r.Id, tableID); err == nil { | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So it's still using tidb/store/tikv/split_region.go Line 217 in 579421f
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I mean #22788 . There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The current region scatter may not work in a specific case which described in tikv/pd#3422 , I think it's not related to the api. |
||||||||||
logutil.BgLogger().Info("batch split regions, scatter region complete", | ||||||||||
zap.Uint64("batch region ID", batch.regionID.id), | ||||||||||
zap.Stringer("at", kv.Key(batch.keys[i])), | ||||||||||
zap.Stringer("new region left", logutil.Hex(r))) | ||||||||||
logutil.BgLogger().Info("batch split regions, scatter single region complete", | ||||||||||
zap.Uint64("batch region ID", batchResp.regionID.id), | ||||||||||
zap.Uint64("scatter region ID", r.GetId()), | ||||||||||
zap.Stringer("at", kv.Key(r.EndKey)), | ||||||||||
zap.Stringer("new region left", logutil.Hex(r)), | ||||||||||
zap.Int64("table ID", tid)) | ||||||||||
continue | ||||||||||
} | ||||||||||
|
||||||||||
logutil.BgLogger().Info("batch split regions, scatter region failed", | ||||||||||
zap.Uint64("batch region ID", batch.regionID.id), | ||||||||||
zap.Stringer("at", kv.Key(batch.keys[i])), | ||||||||||
zap.Uint64("batch region ID", batchResp.regionID.id), | ||||||||||
zap.Stringer("at", kv.Key(r.EndKey)), | ||||||||||
zap.Uint64("scatter region ID", r.GetId()), | ||||||||||
zap.Stringer("new region left", logutil.Hex(r)), | ||||||||||
zap.Int64("table ID", tid), | ||||||||||
zap.Error(err)) | ||||||||||
if batchResp.err == nil { | ||||||||||
batchResp.err = err | ||||||||||
|
@@ -188,6 +235,15 @@ func (s *KVStore) batchSendSingleRegion(bo *Backoffer, batch batch, scatter bool | |||||||||
break | ||||||||||
} | ||||||||||
} | ||||||||||
tid := int64(0) | ||||||||||
if tableID != nil { | ||||||||||
tid = *tableID | ||||||||||
} | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
logutil.BgLogger().Info("batch split regions, scatter all region complete", | ||||||||||
zap.Uint64("batch region ID", batchResp.regionID.id), | ||||||||||
zap.Duration("cost-time", time.Since(start)), | ||||||||||
zap.Int64("table ID", tid)) | ||||||||||
|
||||||||||
return batchResp | ||||||||||
} | ||||||||||
|
||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.