Skip to content

Commit

Permalink
copr: add paging API for streaming-like process (#29612)
Browse files Browse the repository at this point in the history
  • Loading branch information
you06 authored Nov 30, 2021
1 parent 0c8614a commit 37e0dac
Show file tree
Hide file tree
Showing 7 changed files with 328 additions and 40 deletions.
6 changes: 6 additions & 0 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,12 @@ func (builder *RequestBuilder) SetStreaming(streaming bool) *RequestBuilder {
return builder
}

// SetPaging sets "Paging" flag for "kv.Request".
func (builder *RequestBuilder) SetPaging(paging bool) *RequestBuilder {
builder.Request.Paging = paging
return builder
}

// SetConcurrency sets "Concurrency" for "kv.Request".
func (builder *RequestBuilder) SetConcurrency(concurrency int) *RequestBuilder {
builder.Request.Concurrency = concurrency
Expand Down
2 changes: 2 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,8 @@ type Request struct {
MatchStoreLabels []*metapb.StoreLabel
// ResourceGroupTagger indicates the kv request task group tagger.
ResourceGroupTagger tikvrpc.ResourceGroupTagger
// Paging indicates whether the request is a paging request.
Paging bool
}

const (
Expand Down
3 changes: 3 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -960,6 +960,9 @@ type SessionVars struct {

// Rng stores the rand_seed1 and rand_seed2 for Rand() function
Rng *utilMath.MysqlRng

// EnablePaging indicates whether enable paging in coprocessor requests.
EnablePaging bool
}

// InitStatementContext initializes a StatementContext, the object is reused to reduce allocation.
Expand Down
4 changes: 4 additions & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -1873,6 +1873,10 @@ var defaultSysVars = []*SysVar{
}, GetSession: func(s *SessionVars) (string, error) {
return "0", nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBEnablePaging, Value: Off, Type: TypeBool, Hidden: true, skipInit: true, SetSession: func(s *SessionVars, val string) error {
s.EnablePaging = TiDBOptOn(val)
return nil
}},
}

func collectAllowFuncName4ExpressionIndex() string {
Expand Down
3 changes: 3 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,9 @@ const (

// TiDBReadStaleness indicates the staleness duration for following statement
TiDBReadStaleness = "tidb_read_staleness"

// TiDBEnablePaging indicates whether paging is enabled in coprocessor requests.
TiDBEnablePaging = "tidb_enable_paging"
)

// TiDB system variable names that both in session and global scope.
Expand Down
178 changes: 138 additions & 40 deletions store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,18 @@ const (
copNextMaxBackoff = 20000
)

// A paging request may be separated into multi requests if there are more data than a page.
// The paging size grows from min to max, it's not well tuned yet.
// e.g. a paging request scans over range (r1, r200), it requires 64 rows in the first batch,
// if it's not drained, then the paging size grows, the new range is calculated like (r100, r200), then send a request again.
// Compare with the common unary request, paging request allows early access of data, it offers a streaming-like way processing data.
// TODO: may make the paging parameters configurable.
const (
minPagingSize uint64 = 64
maxPagingSize = minPagingSize * 128
pagingSizeGrow uint64 = 2
)

// CopClient is coprocessor client.
type CopClient struct {
kv.RequestTypeSupportedChecker
Expand All @@ -78,6 +90,9 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables interfa
logutil.BgLogger().Debug("send batch requests")
return c.sendBatch(ctx, req, vars)
}
if req.Streaming && req.Paging {
return copErrorResponse{errors.New("streaming and paging are both on")}
}
ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTs)
bo := backoff.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars)
ranges := NewKeyRanges(req.KeyRanges)
Expand Down Expand Up @@ -115,6 +130,13 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables interfa
// 2*it.concurrency to avoid deadlock in the unit test caused by the `MustExec` or `Exec`
capacity = it.concurrency * 2
}
// in streaming or paging request, a request will be returned in multi batches,
// enlarge the channel size to avoid the request blocked by buffer full.
if req.Streaming || req.Paging {
if capacity < 2048 {
capacity = 2048
}
}
it.respChan = make(chan *copResponse, capacity)
it.sendRate = util.NewRateLimit(it.concurrency)
}
Expand All @@ -140,7 +162,9 @@ type copTask struct {
cmdType tikvrpc.CmdType
storeType kv.StoreType

eventCb trxevents.EventCallback
eventCb trxevents.EventCallback
paging bool
pagingSize uint64
}

func (r *copTask) String() string {
Expand Down Expand Up @@ -168,6 +192,14 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv
if err != nil {
return nil, errors.Trace(err)
}
// Channel buffer is 2 for handling region split.
// In a common case, two region split tasks will not be blocked.
chanSize := 2
// in streaming or paging request, a request will be returned in multi batches,
// enlarge the channel size to avoid the request blocked by buffer full.
if req.Streaming || req.Paging {
chanSize = 128
}

var tasks []*copTask
for _, loc := range locs {
Expand All @@ -176,15 +208,21 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv
rLen := loc.Ranges.Len()
for i := 0; i < rLen; {
nextI := mathutil.Min(i+rangesPerTask, rLen)
// If this is a paging request, we set the paging size to minPagingSize,
// the size will grow every round.
pagingSize := uint64(0)
if req.Paging {
pagingSize = minPagingSize
}
tasks = append(tasks, &copTask{
region: loc.Location.Region,
ranges: loc.Ranges.Slice(i, nextI),
// Channel buffer is 2 for handling region split.
// In a common case, two region split tasks will not be blocked.
respChan: make(chan *copResponse, 2),
cmdType: cmdType,
storeType: req.StoreType,
eventCb: eventCb,
region: loc.Location.Region,
ranges: loc.Ranges.Slice(i, nextI),
respChan: make(chan *copResponse, chanSize),
cmdType: cmdType,
storeType: req.StoreType,
eventCb: eventCb,
paging: req.Paging,
pagingSize: pagingSize,
})
i = nextI
}
Expand Down Expand Up @@ -386,13 +424,8 @@ func (worker *copIteratorWorker) run(ctx context.Context) {
worker.sendToRespCh(finCopResp, worker.respChan, false)
}
close(task.respChan)
if worker.vars != nil && worker.vars.Killed != nil && atomic.LoadUint32(worker.vars.Killed) == 1 {
return
}
select {
case <-worker.finishCh:
if worker.finished() {
return
default:
}
}
}
Expand Down Expand Up @@ -648,11 +681,9 @@ func (worker *copIteratorWorker) handleTask(ctx context.Context, task *copTask,
worker.sendToRespCh(resp, respCh, true)
return
}
// test whether the ctx is cancelled
if vars := bo.GetVars(); vars != nil && vars.Killed != nil && atomic.LoadUint32(vars.Killed) == 1 {
return
if worker.finished() {
break
}

if len(tasks) > 0 {
remainTasks = append(tasks, remainTasks[1:]...)
} else {
Expand All @@ -674,19 +705,21 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
})

copReq := coprocessor.Request{
Tp: worker.req.Tp,
StartTs: worker.req.StartTs,
Data: worker.req.Data,
Ranges: task.ranges.ToPBRanges(),
SchemaVer: worker.req.SchemaVar,
Tp: worker.req.Tp,
StartTs: worker.req.StartTs,
Data: worker.req.Data,
Ranges: task.ranges.ToPBRanges(),
SchemaVer: worker.req.SchemaVar,
PagingSize: task.pagingSize,
}

var cacheKey []byte = nil
var cacheValue *coprCacheValue = nil
var cacheKey []byte
var cacheValue *coprCacheValue

// TODO: cache paging copr
// If there are many ranges, it is very likely to be a TableLookupRequest. They are not worth to cache since
// computing is not the main cost. Ignore such requests directly to avoid slowly building the cache key.
if task.cmdType == tikvrpc.CmdCop && worker.store.coprCache != nil && worker.req.Cacheable && worker.store.coprCache.CheckRequestAdmission(len(copReq.Ranges)) {
if task.cmdType == tikvrpc.CmdCop && !task.paging && worker.store.coprCache != nil && worker.req.Cacheable && worker.store.coprCache.CheckRequestAdmission(len(copReq.Ranges)) {
cKey, err := coprCacheBuildKey(&copReq)
if err == nil {
cacheKey = cKey
Expand Down Expand Up @@ -753,6 +786,10 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
return worker.handleCopStreamResult(bo, rpcCtx, resp.Resp.(*tikvrpc.CopStreamResponse), task, ch, costTime)
}

if worker.req.Paging {
return worker.handleCopPagingResult(bo, rpcCtx, &copResponse{pbResp: resp.Resp.(*coprocessor.Response)}, task, ch, costTime)
}

// Handles the response for non-streaming copTask.
return worker.handleCopResponse(bo, rpcCtx, &copResponse{pbResp: resp.Resp.(*coprocessor.Response)}, cacheKey, cacheValue, task, ch, nil, costTime)
}
Expand Down Expand Up @@ -862,14 +899,38 @@ func (worker *copIteratorWorker) handleCopStreamResult(bo *Backoffer, rpcCtx *ti
} else {
logutil.BgLogger().Info("stream unknown error", zap.Error(err))
}
return worker.buildCopTasksFromRemain(bo, lastRange, task)
task.ranges = worker.calculateRemain(task.ranges, lastRange, worker.req.Desc)
return []*copTask{task}, nil
}
if resp.Range != nil {
lastRange = resp.Range
}
}
}

func (worker *copIteratorWorker) handleCopPagingResult(bo *Backoffer, rpcCtx *tikv.RPCContext, resp *copResponse, task *copTask, ch chan<- *copResponse, costTime time.Duration) ([]*copTask, error) {
remainedTasks, err := worker.handleCopResponse(bo, rpcCtx, resp, nil, nil, task, ch, nil, costTime)
if err != nil || len(remainedTasks) != 0 {
// If there is region error or lock error, keep the paging size and retry.
for _, remainedTask := range remainedTasks {
remainedTask.pagingSize = task.pagingSize
}
return remainedTasks, errors.Trace(err)
}
pagingRange := resp.pbResp.Range
// only paging requests need to calculate the next ranges
if pagingRange == nil {
return nil, errors.New("lastRange in paging should not be nil")
}
// calculate next ranges and grow the paging size
task.ranges = worker.calculateRemain(task.ranges, pagingRange, worker.req.Desc)
if task.ranges.Len() == 0 {
return nil, nil
}
task.pagingSize = growPagingSize(task.pagingSize)
return []*copTask{task}, nil
}

// handleCopResponse checks coprocessor Response for region split and lock,
// returns more tasks when that happens, or handles the response if no error.
// if we're handling streaming coprocessor response, lastRange is the range of last
Expand Down Expand Up @@ -909,7 +970,10 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R
return nil, errors.Trace(err)
}
}
return worker.buildCopTasksFromRemain(bo, lastRange, task)
if worker.req.Streaming {
task.ranges = worker.calculateRetry(task.ranges, lastRange, worker.req.Desc)
}
return []*copTask{task}, nil
}
if otherErr := resp.pbResp.GetOtherError(); otherErr != "" {
err := errors.Errorf("other error: %s", otherErr)
Expand Down Expand Up @@ -1037,30 +1101,56 @@ func (worker *copIteratorWorker) handleTiDBSendReqErr(err error, task *copTask,
return nil
}

func (worker *copIteratorWorker) buildCopTasksFromRemain(bo *Backoffer, lastRange *coprocessor.KeyRange, task *copTask) ([]*copTask, error) {
remainedRanges := task.ranges
if worker.req.Streaming && lastRange != nil {
remainedRanges = worker.calculateRemain(task.ranges, lastRange, worker.req.Desc)
// calculateRetry splits the input ranges into two, and take one of them according to desc flag.
// It's used in streaming API, to calculate which range is consumed and what needs to be retry.
// For example:
// ranges: [r1 --> r2) [r3 --> r4)
// split: [s1 --> s2)
// In normal scan order, all data before s1 is consumed, so the retry ranges should be [s1 --> r2) [r3 --> r4)
// In reverse scan order, all data after s2 is consumed, so the retry ranges should be [r1 --> r2) [r3 --> s2)
func (worker *copIteratorWorker) calculateRetry(ranges *KeyRanges, split *coprocessor.KeyRange, desc bool) *KeyRanges {
if split == nil {
return ranges
}
if desc {
left, _ := ranges.Split(split.End)
return left
}
return buildCopTasks(bo, worker.store.GetRegionCache(), remainedRanges, worker.req, task.eventCb)
_, right := ranges.Split(split.Start)
return right
}

// calculateRemain splits the input ranges into two, and take one of them according to desc flag.
// It's used in streaming API, to calculate which range is consumed and what needs to be retry.
// calculateRemain calculates the remain ranges to be processed, it's used in streaming and paging API.
// For example:
// ranges: [r1 --> r2) [r3 --> r4)
// split: [s1 --> s2)
// In normal scan order, all data before s1 is consumed, so the remain ranges should be [s1 --> r2) [r3 --> r4)
// In reverse scan order, all data after s2 is consumed, so the remain ranges should be [r1 --> r2) [r3 --> s2)
// In normal scan order, all data before s2 is consumed, so the remained ranges should be [s2 --> r4)
// In reverse scan order, all data after s1 is consumed, so the remained ranges should be [r1 --> s1)
func (worker *copIteratorWorker) calculateRemain(ranges *KeyRanges, split *coprocessor.KeyRange, desc bool) *KeyRanges {
if split == nil {
return ranges
}
if desc {
left, _ := ranges.Split(split.End)
left, _ := ranges.Split(split.Start)
return left
}
_, right := ranges.Split(split.Start)
_, right := ranges.Split(split.End)
return right
}

// finished checks the flags and finished channel, it tells whether the worker is finished.
func (worker *copIteratorWorker) finished() bool {
if worker.vars != nil && worker.vars.Killed != nil && atomic.LoadUint32(worker.vars.Killed) == 1 {
return true
}
select {
case <-worker.finishCh:
return true
default:
return false
}
}

func (it *copIterator) Close() error {
if atomic.CompareAndSwapUint32(&it.closed, 0, 1) {
close(it.finishCh)
Expand Down Expand Up @@ -1241,3 +1331,11 @@ func isolationLevelToPB(level kv.IsoLevel) kvrpcpb.IsolationLevel {
return kvrpcpb.IsolationLevel_SI
}
}

func growPagingSize(size uint64) uint64 {
size *= pagingSizeGrow
if size > maxPagingSize {
return maxPagingSize
}
return size
}
Loading

0 comments on commit 37e0dac

Please sign in to comment.