Skip to content

Commit e0699a6

Browse files
authored
planner: use logical cores as default fine grained stream count (#55544) (#55575)
close #55543
1 parent b932aeb commit e0699a6

File tree

2 files changed

+20
-17
lines changed

2 files changed

+20
-17
lines changed

pkg/planner/core/optimizer.go

+15-12
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import (
1919
"context"
2020
"fmt"
2121
"math"
22-
"runtime"
2322
"slices"
2423
"strconv"
2524
"time"
@@ -121,6 +120,8 @@ var optRuleList = []logicalOptRule{
121120
&resolveExpand{},
122121
}
123122

123+
const initialMaxCores uint64 = 10000
124+
124125
// Interaction Rule List
125126
/* The interaction rule will be trigger when it satisfies following conditions:
126127
1. The related rule has been trigger and changed the plan
@@ -797,9 +798,7 @@ func (h *fineGrainedShuffleHelper) updateTarget(t shuffleTarget, p *basePhysical
797798
h.plans = append(h.plans, p)
798799
}
799800

800-
// calculateTiFlashStreamCountUsingMinLogicalCores uses minimal logical cpu cores among tiflash servers, and divide by 2
801-
// return false, 0 if any err happens
802-
func calculateTiFlashStreamCountUsingMinLogicalCores(ctx context.Context, sctx PlanContext, serversInfo []infoschema.ServerInfo) (bool, uint64) {
801+
func getTiFlashServerMinLogicalCores(ctx context.Context, sctx PlanContext, serversInfo []infoschema.ServerInfo) (bool, uint64) {
803802
failpoint.Inject("mockTiFlashStreamCountUsingMinLogicalCores", func(val failpoint.Value) {
804803
intVal, err := strconv.Atoi(val.(string))
805804
if err == nil {
@@ -812,7 +811,6 @@ func calculateTiFlashStreamCountUsingMinLogicalCores(ctx context.Context, sctx P
812811
if err != nil {
813812
return false, 0
814813
}
815-
var initialMaxCores uint64 = 10000
816814
var minLogicalCores = initialMaxCores // set to a large enough value here
817815
for _, row := range rows {
818816
if row[4].GetString() == "cpu-logical-cores" {
@@ -823,14 +821,19 @@ func calculateTiFlashStreamCountUsingMinLogicalCores(ctx context.Context, sctx P
823821
}
824822
}
825823
// No need to check len(serersInfo) == serverCount here, since missing some servers' info won't affect the correctness
826-
if minLogicalCores > 1 && minLogicalCores != initialMaxCores {
827-
if runtime.GOARCH == "amd64" {
828-
// In most x86-64 platforms, `Thread(s) per core` is 2
829-
return true, minLogicalCores / 2
830-
}
831-
// ARM cpus don't implement Hyper-threading.
824+
return true, minLogicalCores
825+
}
826+
827+
// calculateTiFlashStreamCountUsingMinLogicalCores uses minimal logical cpu cores among tiflash servers
828+
// return false, 0 if any err happens
829+
func calculateTiFlashStreamCountUsingMinLogicalCores(ctx context.Context, sctx PlanContext, serversInfo []infoschema.ServerInfo) (bool, uint64) {
830+
valid, minLogicalCores := getTiFlashServerMinLogicalCores(ctx, sctx, serversInfo)
831+
if !valid {
832+
return false, 0
833+
}
834+
if minLogicalCores != initialMaxCores {
835+
// use logical core number as the stream count, the same as TiFlash's default max_threads: https://github.com/pingcap/tiflash/blob/v7.5.0/dbms/src/Interpreters/SettingsCommon.h#L166
832836
return true, minLogicalCores
833-
// Other platforms are too rare to consider
834837
}
835838

836839
return false, 0

pkg/planner/core/optimizer_test.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ func TestHandleFineGrainedShuffle(t *testing.T) {
304304
require.NoError(t, failpoint.Enable(fpName, fpExpr))
305305
defer func() { require.NoError(t, failpoint.Disable(fpName)) }()
306306
fpName2 := "github.com/pingcap/tidb/pkg/planner/core/mockTiFlashStreamCountUsingMinLogicalCores"
307-
require.NoError(t, failpoint.Enable(fpName2, `return("8")`))
307+
require.NoError(t, failpoint.Enable(fpName2, `return("16")`))
308308
sctx.GetSessionVars().TiFlashFineGrainedShuffleStreamCount = 0
309309

310310
col0 := &expression.Column{
@@ -329,7 +329,7 @@ func TestHandleFineGrainedShuffle(t *testing.T) {
329329
recv.children = []PhysicalPlan{hashSender}
330330
hashSender.children = []PhysicalPlan{tableScan}
331331
tableScan.Schema().Columns = append(tableScan.Schema().Columns, col0)
332-
start(hashAgg, 8, 3, 0)
332+
start(hashAgg, 16, 3, 0)
333333

334334
// Join(x) <- ExchangeReceiver <- ExchangeSender
335335
// <- ExchangeReceiver <- ExchangeSender
@@ -357,9 +357,9 @@ func TestHandleFineGrainedShuffle(t *testing.T) {
357357
hashSender1.HashCols = partitionCols
358358
tableScan1.Schema().Columns = append(tableScan1.Schema().Columns, col0)
359359
handleFineGrainedShuffle(nil, sctx.GetPlanCtx(), tableReader)
360-
require.Equal(t, uint64(8), hashJoin.TiFlashFineGrainedShuffleStreamCount)
361-
require.Equal(t, uint64(8), recv1.TiFlashFineGrainedShuffleStreamCount)
362-
require.Equal(t, uint64(8), hashSender1.TiFlashFineGrainedShuffleStreamCount)
360+
require.Equal(t, uint64(16), hashJoin.TiFlashFineGrainedShuffleStreamCount)
361+
require.Equal(t, uint64(16), recv1.TiFlashFineGrainedShuffleStreamCount)
362+
require.Equal(t, uint64(16), hashSender1.TiFlashFineGrainedShuffleStreamCount)
363363
require.Equal(t, uint64(0), recv.TiFlashFineGrainedShuffleStreamCount)
364364
require.Equal(t, uint64(0), hashSender.TiFlashFineGrainedShuffleStreamCount)
365365
clear(plans)

0 commit comments

Comments
 (0)