Skip to content

Commit

Permalink
Revert "remove lite worker"
Browse files Browse the repository at this point in the history
This reverts commit 0854b6e.

Signed-off-by: crazycs520 <crazycs520@gmail.com>
  • Loading branch information
crazycs520 committed Dec 11, 2024
1 parent 53010ee commit 0339fd2
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 11 deletions.
8 changes: 1 addition & 7 deletions pkg/store/copr/copr_test/coprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ func TestCopQuery(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
// Test tidb_distsql_scan_concurrency for partition table.
// Test for https://github.com/pingcap/tidb/pull/57522#discussion_r1875515863
tk.MustExec("create table t1 (id int key, b int, c int, index idx_b(b)) partition by hash(id) partitions 10;")
for i := 0; i < 10; i++ {
tk.MustExec(fmt.Sprintf("insert into t1 values (%v, %v, %v)", i, i, i))
Expand All @@ -307,10 +307,4 @@ func TestCopQuery(t *testing.T) {
tk.MustQueryWithContext(ctx, "select sum(c) from t1 use index (idx_b) where b < 10;")
cancel()
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/store/mockstore/unistore/unistoreRPCSlowCop"))

// Test query after split region.
tk.MustExec("create table t2 (id int key, b int, c int, index idx_b(b));")
tk.MustExec("insert into t2 select * from t1")
tk.MustQuery("split table t2 by (0), (1), (2), (3), (4), (5), (6), (7), (8), (9), (10);").Check(testkit.Rows("11 1"))
tk.MustQuery("select sum(c) from t2 use index (idx_b) where b < 10;")
}
76 changes: 72 additions & 4 deletions pkg/store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables any, op
if ctx.Value(util.RUDetailsCtxKey) == nil {
ctx = context.WithValue(ctx, util.RUDetailsCtxKey, util.NewRUDetails())
}
it.open(ctx)
it.open(ctx, option.TryCopLiteWorker)
return it
}

Expand Down Expand Up @@ -678,7 +678,9 @@ type copIterator struct {
req *kv.Request
concurrency int
smallTaskConcurrency int
finishCh chan struct{}
// liteWorker uses to send cop request without start new goroutine, it is only work when tasks count is 1, and used to improve the performance of small cop query.
liteWorker *liteCopIteratorWorker
finishCh chan struct{}

// If keepOrder, results are stored in copTask.respChan, read them out one by one.
tasks []*copTask
Expand Down Expand Up @@ -719,6 +721,13 @@ type copIterator struct {
stats *copIteratorRuntimeStats
}

type liteCopIteratorWorker struct {
// ctx contains some info(such as rpc interceptor(WithSQLKvExecCounterInterceptor)), it is used for handle cop task later.
ctx context.Context
worker *copIteratorWorker
batchCopRespList []*copResponse
}

// copIteratorWorker receives tasks from copIteratorTaskSender, handles tasks and sends the copResponse to respChan.
type copIteratorWorker struct {
taskCh <-chan *copTask
Expand Down Expand Up @@ -857,7 +866,14 @@ func (worker *copIteratorWorker) run(ctx context.Context) {
}

// open starts workers and sender goroutines.
func (it *copIterator) open(ctx context.Context) {
func (it *copIterator) open(ctx context.Context, tryCopLiteWorker *uint32) {
if len(it.tasks) == 1 && tryCopLiteWorker != nil && atomic.CompareAndSwapUint32(tryCopLiteWorker, 0, 1) {
it.liteWorker = &liteCopIteratorWorker{
ctx: ctx,
worker: newCopIteratorWorker(it, nil),
}
return
}
taskCh := make(chan *copTask, 1)
it.wg.Add(it.concurrency + it.smallTaskConcurrency)
var smallTaskCh chan *copTask
Expand Down Expand Up @@ -1081,7 +1097,15 @@ func (it *copIterator) Next(ctx context.Context) (kv.ResultSubset, error) {
// Otherwise all responses are returned from a single channel.

failpoint.InjectCall("CtxCancelBeforeReceive", ctx)
if it.respChan != nil {
if it.liteWorker != nil {
resp = it.liteWorker.liteSendReq(ctx, it)
if resp == nil {
it.actionOnExceed.close()
return nil, nil
}
it.actionOnExceed.destroyTokenIfNeeded(func() {})
memTrackerConsumeResp(it.memTracker, resp)
} else if it.respChan != nil {
// Get next fetched resp from chan
resp, ok, closed = it.recvFromRespCh(ctx, it.respChan)
if !ok || closed {
Expand Down Expand Up @@ -1130,6 +1154,50 @@ func (it *copIterator) Next(ctx context.Context) (kv.ResultSubset, error) {
return resp, nil
}

func (w *liteCopIteratorWorker) liteSendReq(ctx context.Context, it *copIterator) (resp *copResponse) {
defer func() {
r := recover()
if r != nil {
logutil.Logger(ctx).Error("copIteratorWork meet panic",
zap.Any("r", r),
zap.Stack("stack trace"))
resp = &copResponse{err: util2.GetRecoverError(r)}
}
}()

if len(w.batchCopRespList) > 0 {
resp := w.batchCopRespList[0]
w.batchCopRespList = w.batchCopRespList[1:]
return resp
}

worker := w.worker
backoffermap := make(map[uint64]*Backoffer)
for len(it.tasks) > 0 {
curTask := it.tasks[0]
bo := chooseBackoffer(w.ctx, backoffermap, curTask, worker)
result, err := worker.handleTaskOnce(bo, curTask)
if err != nil {
resp = &copResponse{err: errors.Trace(err)}
worker.checkRespOOM(resp, true)
return resp
}

if result != nil && len(result.batchRespList) > 0 {
it.tasks = append(result.remains, it.tasks[1:]...)
} else {
it.tasks = it.tasks[1:]
}
if result != nil && result.resp != nil {
resp = result.resp
worker.checkRespOOM(resp, true)
w.batchCopRespList = result.batchRespList
return resp
}
}
return nil
}

// HasUnconsumedCopRuntimeStats indicate whether has unconsumed CopRuntimeStats.
type HasUnconsumedCopRuntimeStats interface {
// CollectUnconsumedCopRuntimeStats returns unconsumed CopRuntimeStats.
Expand Down

0 comments on commit 0339fd2

Please sign in to comment.