Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store/copr: set upper limit for extra concurrency #41135

Merged
merged 12 commits into from
Feb 7, 2023
16 changes: 12 additions & 4 deletions store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ const (
copNextMaxBackoff = 20000
CopSmallTaskRow = 32 // 32 is the initial batch size of TiKV
smallTaskSigma = 0.5
smallConcPerCore = 20
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any prior experience or experimental data behind this choice?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the PR itself, this number seems a bit conservative to me. Maybe ok if this is combined with coprocessor batch.

Copy link
Contributor Author

@you06 you06 Feb 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a solution for massive table lookup tasks, it is conservative. However, if we count the goroutines (on a 16-core instance there can be 20 × 16 = 320 goroutines per cop iterator, and there are 5 table workers in one index lookup executor), it is not a small number. The remaining speedup might come from coprocessor batch.

)

// CopClient is coprocessor client.
Expand Down Expand Up @@ -200,7 +201,7 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars
}
if tryRowHint {
var smallTasks int
smallTasks, it.smallTaskConcurrency = smallTaskConcurrency(tasks)
smallTasks, it.smallTaskConcurrency = smallTaskConcurrency(tasks, c.store.numcpu)
if len(tasks)-smallTasks < it.concurrency {
it.concurrency = len(tasks) - smallTasks
}
Expand Down Expand Up @@ -580,7 +581,7 @@ func isSmallTask(task *copTask) bool {

// smallTaskConcurrency counts the small tasks in tasks and returns that count
// together with the extra concurrency to use for them. The extra concurrency
// is capped at smallConcPerCore * numcpu.
func smallTaskConcurrency(tasks []*copTask) (int, int) {
func smallTaskConcurrency(tasks []*copTask, numcpu int) (int, int) {
res := 0
for _, task := range tasks {
if isSmallTask(task) {
Expand All @@ -592,8 +593,15 @@ func smallTaskConcurrency(tasks []*copTask) (int, int) {
}
// Calculate the extra concurrency for small tasks
// extra concurrency = tasks / (1 + sigma * sqrt(log(tasks ^ 2)))
extraConc := float64(res) / (1 + smallTaskSigma*math.Sqrt(2*math.Log(float64(res))))
return res, int(extraConc)
extraConc := int(float64(res) / (1 + smallTaskSigma*math.Sqrt(2*math.Log(float64(res)))))
if numcpu <= 0 {
numcpu = 1
}
smallTaskConcurrencyLimit := smallConcPerCore * numcpu
if extraConc > smallTaskConcurrencyLimit {
extraConc = smallTaskConcurrencyLimit
}
return res, extraConc
}

type copIterator struct {
Expand Down
23 changes: 20 additions & 3 deletions store/copr/coprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,7 @@ func TestBasicSmallTaskConc(t *testing.T) {
require.True(t, isSmallTask(&copTask{RowCountHint: 6}))
require.True(t, isSmallTask(&copTask{RowCountHint: CopSmallTaskRow}))
require.False(t, isSmallTask(&copTask{RowCountHint: CopSmallTaskRow + 1}))
_, conc := smallTaskConcurrency([]*copTask{})
_, conc := smallTaskConcurrency([]*copTask{}, 16)
require.GreaterOrEqual(t, conc, 0)
}

Expand Down Expand Up @@ -734,7 +734,7 @@ func TestBuildCopTasksWithRowCountHint(t *testing.T) {
require.Equal(t, tasks[2].RowCountHint, 3)
// task[3] ["t"-"x", "y"-"z"]
require.Equal(t, tasks[3].RowCountHint, 3+CopSmallTaskRow)
_, conc := smallTaskConcurrency(tasks)
_, conc := smallTaskConcurrency(tasks, 16)
require.Equal(t, conc, 1)

ranges = buildCopRanges("a", "c", "d", "e", "h", "x", "y", "z")
Expand All @@ -753,7 +753,7 @@ func TestBuildCopTasksWithRowCountHint(t *testing.T) {
require.Equal(t, tasks[2].RowCountHint, 3)
// task[3] ["t"-"x", "y"-"z"]
require.Equal(t, tasks[3].RowCountHint, 6)
_, conc = smallTaskConcurrency(tasks)
_, conc = smallTaskConcurrency(tasks, 16)
require.Equal(t, conc, 2)

// cross-region long range
Expand All @@ -774,3 +774,20 @@ func TestBuildCopTasksWithRowCountHint(t *testing.T) {
// task[3] ["t"-"z"]
require.Equal(t, tasks[3].RowCountHint, 10)
}

// TestSmallTaskConcurrencyLimit verifies that the extra concurrency returned
// by smallTaskConcurrency is capped at smallConcPerCore for a single core,
// and that passing 0 as numcpu produces the same cap instead of a zero limit.
func TestSmallTaskConcurrencyLimit(t *testing.T) {
	total := 1000
	// Every task carries RowCountHint 1; the assertions below expect all of
	// them to be counted as small tasks.
	tasks := make([]*copTask, total)
	for idx := range tasks {
		tasks[idx] = &copTask{RowCountHint: 1}
	}
	// numcpu == 1 exercises the cap itself; numcpu == 0 exercises the
	// handling of a zero value.
	for _, numcpu := range []int{1, 0} {
		gotCount, gotConc := smallTaskConcurrency(tasks, numcpu)
		require.Equal(t, smallConcPerCore, gotConc)
		require.Equal(t, total, gotCount)
	}
}
3 changes: 3 additions & 0 deletions store/copr/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package copr
import (
"context"
"math/rand"
"runtime"
"sync/atomic"
"time"

Expand Down Expand Up @@ -76,6 +77,7 @@ type Store struct {
*kvStore
coprCache *coprCache
replicaReadSeed uint32
numcpu int
}

// NewStore creates a new store instance.
Expand All @@ -90,6 +92,7 @@ func NewStore(s *tikv.KVStore, coprCacheConfig *config.CoprocessorCache) (*Store
kvStore: &kvStore{store: s},
coprCache: coprCache,
replicaReadSeed: rand.Uint32(),
numcpu: runtime.GOMAXPROCS(0),
}, nil
}

Expand Down