Skip to content

Commit

Permalink
remove lite worker
Browse files Browse the repository at this point in the history
Signed-off-by: crazycs520 <crazycs520@gmail.com>
  • Loading branch information
crazycs520 committed Dec 11, 2024
1 parent c652cf9 commit 0854b6e
Showing 1 changed file with 3 additions and 69 deletions.
72 changes: 3 additions & 69 deletions pkg/store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables any, op
if ctx.Value(util.RUDetailsCtxKey) == nil {
ctx = context.WithValue(ctx, util.RUDetailsCtxKey, util.NewRUDetails())
}
it.open(ctx, option.TryCopLiteWorker)
it.open(ctx)
return it
}

Expand Down Expand Up @@ -678,7 +678,6 @@ type copIterator struct {
req *kv.Request
concurrency int
smallTaskConcurrency int
liteWorker *liteCopIteratorWorker
finishCh chan struct{}

// If keepOrder, results are stored in copTask.respChan, read them out one by one.
Expand Down Expand Up @@ -720,13 +719,6 @@ type copIterator struct {
stats *copIteratorRuntimeStats
}

type liteCopIteratorWorker struct {
// ctx contains some info(such as rpc interceptor(WithSQLKvExecCounterInterceptor)), it is used for handle cop task later.
ctx context.Context
worker *copIteratorWorker
batchCopRespList []*copResponse
}

// copIteratorWorker receives tasks from copIteratorTaskSender, handles tasks and sends the copResponse to respChan.
type copIteratorWorker struct {
taskCh <-chan *copTask
Expand Down Expand Up @@ -865,14 +857,7 @@ func (worker *copIteratorWorker) run(ctx context.Context) {
}

// open starts workers and sender goroutines.
func (it *copIterator) open(ctx context.Context, tryCopLiteWorker *uint32) {
//if len(it.tasks) == 1 && tryCopLiteWorker != nil && atomic.CompareAndSwapUint32(tryCopLiteWorker, 0, 1) {
// it.liteWorker = &liteCopIteratorWorker{
// ctx: ctx,
// worker: newCopIteratorWorker(it, nil),
// }
// return
//}
func (it *copIterator) open(ctx context.Context) {
taskCh := make(chan *copTask, 1)
it.wg.Add(it.concurrency + it.smallTaskConcurrency)
var smallTaskCh chan *copTask
Expand Down Expand Up @@ -1096,15 +1081,7 @@ func (it *copIterator) Next(ctx context.Context) (kv.ResultSubset, error) {
// Otherwise all responses are returned from a single channel.

failpoint.InjectCall("CtxCancelBeforeReceive", ctx)
if it.liteWorker != nil {
resp = it.liteSendReq(ctx)
if resp == nil {
it.actionOnExceed.close()
return nil, nil
}
it.actionOnExceed.destroyTokenIfNeeded(func() {})
memTrackerConsumeResp(it.memTracker, resp)
} else if it.respChan != nil {
if it.respChan != nil {
// Get next fetched resp from chan
resp, ok, closed = it.recvFromRespCh(ctx, it.respChan)
if !ok || closed {
Expand Down Expand Up @@ -1153,49 +1130,6 @@ func (it *copIterator) Next(ctx context.Context) (kv.ResultSubset, error) {
return resp, nil
}

func (it *copIterator) liteSendReq(ctx context.Context) (resp *copResponse) {
defer func() {
r := recover()
if r != nil {
logutil.Logger(ctx).Error("copIteratorWork meet panic",
zap.Any("r", r),
zap.Stack("stack trace"))
resp = &copResponse{err: util2.GetRecoverError(r)}
}
}()

if len(it.liteWorker.batchCopRespList) > 0 {
resp := it.liteWorker.batchCopRespList[0]
it.liteWorker.batchCopRespList = it.liteWorker.batchCopRespList[1:]
return resp
}

worker := it.liteWorker.worker
backoffermap := make(map[uint64]*Backoffer)
for len(it.tasks) > 0 {
curTask := it.tasks[0]
bo := chooseBackoffer(it.liteWorker.ctx, backoffermap, curTask, worker)
taskResp, err := worker.handleTaskOnce(bo, curTask)
if err != nil || taskResp == nil {
resp = &copResponse{err: errors.Trace(err)}
worker.checkRespOOM(resp, true)
return resp
}
if len(taskResp.batchRespList) > 0 {
it.tasks = append(taskResp.remains, it.tasks[1:]...)
} else {
it.tasks = it.tasks[1:]
}
resp = taskResp.resp
if resp != nil {
worker.checkRespOOM(resp, true)
it.liteWorker.batchCopRespList = taskResp.batchRespList
return resp
}
}
return nil
}

// HasUnconsumedCopRuntimeStats indicate whether has unconsumed CopRuntimeStats.
type HasUnconsumedCopRuntimeStats interface {
// CollectUnconsumedCopRuntimeStats returns unconsumed CopRuntimeStats.
Expand Down

0 comments on commit 0854b6e

Please sign in to comment.