diff --git a/executor/memtable_reader.go b/executor/memtable_reader.go index 4d6e5112bfefb..3c1530ed714cf 100644 --- a/executor/memtable_reader.go +++ b/executor/memtable_reader.go @@ -29,7 +29,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/diagnosticspb" - "github.com/pingcap/log" "github.com/pingcap/sysutil" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/infoschema" @@ -47,7 +46,6 @@ import ( "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/pdapi" "github.com/pingcap/tidb/util/set" - "go.uber.org/zap" "golang.org/x/exp/slices" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -177,7 +175,7 @@ func fetchClusterConfig(sctx sessionctx.Context, nodeTypes, nodeAddrs set.String if err != nil { return nil, err } - serversInfo = filterClusterServerInfo(serversInfo, nodeTypes, nodeAddrs) + serversInfo = infoschema.FilterClusterServerInfo(serversInfo, nodeTypes, nodeAddrs) //nolint: prealloc var finalRows [][]types.Datum wg := sync.WaitGroup{} @@ -310,108 +308,12 @@ func (e *clusterServerInfoRetriever) retrieve(ctx context.Context, sctx sessionc return nil, nil } e.retrieved = true - serversInfo, err := infoschema.GetClusterServerInfo(sctx) if err != nil { return nil, err } - serversInfo = filterClusterServerInfo(serversInfo, e.extractor.NodeTypes, e.extractor.Instances) - - type result struct { - idx int - rows [][]types.Datum - err error - } - wg := sync.WaitGroup{} - ch := make(chan result, len(serversInfo)) - infoTp := e.serverInfoType - finalRows := make([][]types.Datum, 0, len(serversInfo)*10) - for i, srv := range serversInfo { - address := srv.Address - remote := address - if srv.ServerType == "tidb" { - remote = srv.StatusAddr - } - wg.Add(1) - go func(index int, remote, address, serverTP string) { - util.WithRecovery(func() { - defer wg.Done() - items, err := getServerInfoByGRPC(ctx, remote, infoTp) - if err != nil { - ch <- result{idx: index, err: err} - return - } - partRows := serverInfoItemToRows(items, serverTP, address) - ch <- result{idx: index, rows: partRows} - }, nil) - }(i, remote, address, srv.ServerType) - } - wg.Wait() - close(ch) - // Keep the original order to make the result more stable - var results []result //nolint: prealloc - for result := range ch { - if result.err != nil { - sctx.GetSessionVars().StmtCtx.AppendWarning(result.err) - continue - } - results = append(results, result) - } - slices.SortFunc(results, func(i, j result) bool { return i.idx < j.idx }) - for _, result := range results { - finalRows = append(finalRows, result.rows...) 
- } - return finalRows, nil -} - -func serverInfoItemToRows(items []*diagnosticspb.ServerInfoItem, tp, addr string) [][]types.Datum { - rows := make([][]types.Datum, 0, len(items)) - for _, v := range items { - for _, item := range v.Pairs { - row := types.MakeDatums( - tp, - addr, - v.Tp, - v.Name, - item.Key, - item.Value, - ) - rows = append(rows, row) - } - } - return rows -} - -func getServerInfoByGRPC(ctx context.Context, address string, tp diagnosticspb.ServerInfoType) ([]*diagnosticspb.ServerInfoItem, error) { - opt := grpc.WithInsecure() - security := config.GetGlobalConfig().Security - if len(security.ClusterSSLCA) != 0 { - clusterSecurity := security.ClusterSecurity() - tlsConfig, err := clusterSecurity.ToTLSConfig() - if err != nil { - return nil, errors.Trace(err) - } - opt = grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)) - } - conn, err := grpc.Dial(address, opt) - if err != nil { - return nil, err - } - defer func() { - err := conn.Close() - if err != nil { - log.Error("close grpc connection error", zap.Error(err)) - } - }() - - cli := diagnosticspb.NewDiagnosticsClient(conn) - ctx, cancel := context.WithTimeout(ctx, time.Second*10) - defer cancel() - r, err := cli.ServerInfo(ctx, &diagnosticspb.ServerInfoRequest{Tp: tp}) - if err != nil { - return nil, err - } - return r.Items, nil + serversInfo = infoschema.FilterClusterServerInfo(serversInfo, e.extractor.NodeTypes, e.extractor.Instances) + return infoschema.FetchClusterServerInfoWithoutPrivilegeCheck(ctx, sctx, serversInfo, e.serverInfoType, true) } func parseFailpointServerInfo(s string) []infoschema.ServerInfo { @@ -428,28 +330,6 @@ func parseFailpointServerInfo(s string) []infoschema.ServerInfo { return serversInfo } -func filterClusterServerInfo(serversInfo []infoschema.ServerInfo, nodeTypes, addresses set.StringSet) []infoschema.ServerInfo { - if len(nodeTypes) == 0 && len(addresses) == 0 { - return serversInfo - } - - filterServers := make([]infoschema.ServerInfo, 0, len(serversInfo)) - for _, srv := range serversInfo { - // Skip some node type which has been filtered in WHERE clause - // e.g: SELECT * FROM cluster_config WHERE type='tikv' - if len(nodeTypes) > 0 && !nodeTypes.Exist(srv.ServerType) { - continue - } - // Skip some node address which has been filtered in WHERE clause - // e.g: SELECT * FROM cluster_config WHERE address='192.16.8.12:2379' - if len(addresses) > 0 && !addresses.Exist(srv.Address) { - continue - } - filterServers = append(filterServers, srv) - } - return filterServers -} - type clusterLogRetriever struct { isDrained bool retrieving bool @@ -515,7 +395,7 @@ func (e *clusterLogRetriever) initialize(ctx context.Context, sctx sessionctx.Co instances := e.extractor.Instances nodeTypes := e.extractor.NodeTypes - serversInfo = filterClusterServerInfo(serversInfo, nodeTypes, instances) + serversInfo = infoschema.FilterClusterServerInfo(serversInfo, nodeTypes, instances) var levels = make([]diagnosticspb.LogLevel, 0, len(e.extractor.LogLevels)) for l := range e.extractor.LogLevels { diff --git a/executor/set_config.go b/executor/set_config.go index 531a61acdc4a0..b508b55eb2fc7 100644 --- a/executor/set_config.go +++ b/executor/set_config.go @@ -102,7 +102,7 @@ func (s *SetConfigExec) Next(ctx context.Context, req *chunk.Chunk) error { if s.p.Instance != "" { nodeAddrs.Insert(s.p.Instance) } - serversInfo = filterClusterServerInfo(serversInfo, nodeTypes, nodeAddrs) + serversInfo = infoschema.FilterClusterServerInfo(serversInfo, nodeTypes, nodeAddrs) if s.p.Instance != "" && 
len(serversInfo) == 0 { return errors.Errorf("instance %v is not found in this cluster", s.p.Instance) } diff --git a/infoschema/BUILD.bazel b/infoschema/BUILD.bazel index 37fd2bc423dc9..a92608a8df1b3 100644 --- a/infoschema/BUILD.bazel +++ b/infoschema/BUILD.bazel @@ -50,8 +50,12 @@ go_library( "@com_github_ngaut_pools//:pools", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", + "@com_github_pingcap_kvproto//pkg/diagnosticspb", "@com_github_pingcap_kvproto//pkg/metapb", + "@com_github_pingcap_log//:log", "@com_github_tikv_client_go_v2//tikv", + "@org_golang_google_grpc//:grpc", + "@org_golang_google_grpc//credentials", "@org_golang_x_exp//slices", "@org_uber_go_zap//:zap", ], diff --git a/infoschema/tables.go b/infoschema/tables.go index 0f036ab2c042c..0e16f1c3b3eb1 100644 --- a/infoschema/tables.go +++ b/infoschema/tables.go @@ -22,10 +22,14 @@ import ( "net/http" "strconv" "strings" + "sync" + "time" "github.com/pingcap/errors" "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/diagnosticspb" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/log" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl/placement" "github.com/pingcap/tidb/domain/infosync" @@ -47,9 +51,13 @@ import ( "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/pdapi" "github.com/pingcap/tidb/util/sem" + "github.com/pingcap/tidb/util/set" "github.com/pingcap/tidb/util/stmtsummary" "github.com/tikv/client-go/v2/tikv" "go.uber.org/zap" + "golang.org/x/exp/slices" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" ) const ( @@ -2227,3 +2235,142 @@ func (vt *VirtualTable) GetPhysicalID() int64 { func (vt *VirtualTable) Type() table.Type { return table.VirtualTable } + +// GetTiFlashServerInfo returns all TiFlash server infos +func GetTiFlashServerInfo(sctx sessionctx.Context) ([]ServerInfo, error) { + if config.GetGlobalConfig().DisaggregatedTiFlash { + return nil, table.ErrUnsupportedOp + } + serversInfo, err := GetStoreServerInfo(sctx) + if err != nil { + return nil, err + } + serversInfo = FilterClusterServerInfo(serversInfo, set.NewStringSet(kv.TiFlash.Name()), set.NewStringSet()) + return serversInfo, nil +} + +// FetchClusterServerInfoWithoutPrivilegeCheck fetches cluster server information +func FetchClusterServerInfoWithoutPrivilegeCheck(ctx context.Context, sctx sessionctx.Context, serversInfo []ServerInfo, serverInfoType diagnosticspb.ServerInfoType, recordWarningInStmtCtx bool) ([][]types.Datum, error) { + type result struct { + idx int + rows [][]types.Datum + err error + } + wg := sync.WaitGroup{} + ch := make(chan result, len(serversInfo)) + infoTp := serverInfoType + finalRows := make([][]types.Datum, 0, len(serversInfo)*10) + for i, srv := range serversInfo { + address := srv.Address + remote := address + if srv.ServerType == "tidb" { + remote = srv.StatusAddr + } + wg.Add(1) + go func(index int, remote, address, serverTP string) { + util.WithRecovery(func() { + defer wg.Done() + items, err := getServerInfoByGRPC(ctx, remote, infoTp) + if err != nil { + ch <- result{idx: index, err: err} + return + } + partRows := serverInfoItemToRows(items, serverTP, address) + ch <- result{idx: index, rows: partRows} + }, nil) + }(i, remote, address, srv.ServerType) + } + wg.Wait() + close(ch) + // Keep the original order to make the result more stable + var results []result //nolint: prealloc + for result := range ch { + if result.err != nil { + if recordWarningInStmtCtx { + 
sctx.GetSessionVars().StmtCtx.AppendWarning(result.err) + } else { + log.Warn(result.err.Error()) + } + continue + } + results = append(results, result) + } + slices.SortFunc(results, func(i, j result) bool { return i.idx < j.idx }) + for _, result := range results { + finalRows = append(finalRows, result.rows...) + } + return finalRows, nil +} + +func serverInfoItemToRows(items []*diagnosticspb.ServerInfoItem, tp, addr string) [][]types.Datum { + rows := make([][]types.Datum, 0, len(items)) + for _, v := range items { + for _, item := range v.Pairs { + row := types.MakeDatums( + tp, + addr, + v.Tp, + v.Name, + item.Key, + item.Value, + ) + rows = append(rows, row) + } + } + return rows +} + +func getServerInfoByGRPC(ctx context.Context, address string, tp diagnosticspb.ServerInfoType) ([]*diagnosticspb.ServerInfoItem, error) { + opt := grpc.WithInsecure() + security := config.GetGlobalConfig().Security + if len(security.ClusterSSLCA) != 0 { + clusterSecurity := security.ClusterSecurity() + tlsConfig, err := clusterSecurity.ToTLSConfig() + if err != nil { + return nil, errors.Trace(err) + } + opt = grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)) + } + conn, err := grpc.Dial(address, opt) + if err != nil { + return nil, err + } + defer func() { + err := conn.Close() + if err != nil { + log.Error("close grpc connection error", zap.Error(err)) + } + }() + + cli := diagnosticspb.NewDiagnosticsClient(conn) + ctx, cancel := context.WithTimeout(ctx, time.Second*10) + defer cancel() + r, err := cli.ServerInfo(ctx, &diagnosticspb.ServerInfoRequest{Tp: tp}) + if err != nil { + return nil, err + } + return r.Items, nil +} + +// FilterClusterServerInfo filters serversInfo by nodeTypes and addresses +func FilterClusterServerInfo(serversInfo []ServerInfo, nodeTypes, addresses set.StringSet) []ServerInfo { + if len(nodeTypes) == 0 && len(addresses) == 0 { + return serversInfo + } + + filterServers := make([]ServerInfo, 0, len(serversInfo)) + for _, srv := range serversInfo { + // Skip some node type which has been filtered in WHERE clause + // e.g: SELECT * FROM cluster_config WHERE type='tikv' + if len(nodeTypes) > 0 && !nodeTypes.Exist(srv.ServerType) { + continue + } + // Skip some node address which has been filtered in WHERE clause + // e.g: SELECT * FROM cluster_config WHERE address='192.16.8.12:2379' + if len(addresses) > 0 && !addresses.Exist(srv.Address) { + continue + } + filterServers = append(filterServers, srv) + } + return filterServers +} diff --git a/planner/core/BUILD.bazel b/planner/core/BUILD.bazel index b7f37923547be..3afbdf3b8a0bc 100644 --- a/planner/core/BUILD.bazel +++ b/planner/core/BUILD.bazel @@ -146,6 +146,7 @@ go_library( "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/coprocessor", + "@com_github_pingcap_kvproto//pkg/diagnosticspb", "@com_github_pingcap_tipb//go-tipb", "@com_github_tikv_client_go_v2//kv", "@com_github_tikv_client_go_v2//tikv", diff --git a/planner/core/explain.go b/planner/core/explain.go index 16140495de3e7..ade44533eb360 100644 --- a/planner/core/explain.go +++ b/planner/core/explain.go @@ -385,6 +385,9 @@ func (p *basePhysicalAgg) explainInfo(normalized bool) string { builder.WriteString(", ") } } + if p.TiFlashFineGrainedShuffleStreamCount > 0 { + builder.WriteString(fmt.Sprintf(", stream_count: %d", p.TiFlashFineGrainedShuffleStreamCount)) + } return builder.String() } @@ -543,6 +546,9 @@ func (p *PhysicalHashJoin) explainInfo(normalized bool) string { 
buffer.WriteString(", other cond:") buffer.Write(sortedExplainExpressionList(p.OtherConditions)) } + if p.TiFlashFineGrainedShuffleStreamCount > 0 { + buffer.WriteString(fmt.Sprintf(", stream_count: %d", p.TiFlashFineGrainedShuffleStreamCount)) + } return buffer.String() } diff --git a/planner/core/integration_test.go b/planner/core/integration_test.go index 864127022f9a7..459d113d97f2c 100644 --- a/planner/core/integration_test.go +++ b/planner/core/integration_test.go @@ -4914,6 +4914,64 @@ func TestMppJoinExchangeColumnPrune(t *testing.T) { } } +func TestMppFineGrainedJoinAndAgg(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("drop table if exists tt") + tk.MustExec("create table t (c1 int, c2 int, c3 int NOT NULL, c4 int NOT NULL, c5 int)") + tk.MustExec("create table tt (b1 int)") + tk.MustExec("analyze table t") + tk.MustExec("analyze table tt") + + instances := []string{ + "tiflash,127.0.0.1:3933,127.0.0.1:7777,,", + "tikv,127.0.0.1:11080,127.0.0.1:10080,,", + } + fpName := "github.com/pingcap/tidb/infoschema/mockStoreServerInfo" + fpExpr := `return("` + strings.Join(instances, ";") + `")` + require.NoError(t, failpoint.Enable(fpName, fpExpr)) + defer func() { require.NoError(t, failpoint.Disable(fpName)) }() + fpName2 := "github.com/pingcap/tidb/planner/core/mockTiFlashStreamCountUsingMinLogicalCores" + require.NoError(t, failpoint.Enable(fpName2, `return("8")`)) + defer func() { require.NoError(t, failpoint.Disable(fpName2)) }() + + // Create virtual tiflash replica info. + dom := domain.GetDomain(tk.Session()) + is := dom.InfoSchema() + db, exists := is.SchemaByName(model.NewCIStr("test")) + require.True(t, exists) + for _, tblInfo := range db.Tables { + if tblInfo.Name.L == "t" || tblInfo.Name.L == "tt" { + tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{ + Count: 1, + Available: true, + } + } + } + + tk.MustExec("set @@tidb_allow_mpp=1;") + tk.MustExec("set @@session.tidb_broadcast_join_threshold_size = 1") + tk.MustExec("set @@session.tidb_broadcast_join_threshold_count = 1") + + var input []string + var output []struct { + SQL string + Plan []string + } + integrationSuiteData := core.GetIntegrationSuiteData() + integrationSuiteData.LoadTestCases(t, &input, &output) + for i, tt := range input { + testdata.OnRecord(func() { + output[i].SQL = tt + output[i].Plan = testdata.ConvertRowsToStrings(tk.MustQuery(tt).Rows()) + }) + res := tk.MustQuery(tt) + res.Check(testkit.Rows(output[i].Plan...)) + } +} + func TestMppAggTopNWithJoin(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) diff --git a/planner/core/optimizer.go b/planner/core/optimizer.go index 1a3e1ea5ab821..9ea2fdf89c006 100644 --- a/planner/core/optimizer.go +++ b/planner/core/optimizer.go @@ -18,8 +18,11 @@ import ( "context" "fmt" "math" + "strconv" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/diagnosticspb" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/expression/aggregation" @@ -297,7 +300,7 @@ func DoOptimize(ctx context.Context, sctx sessionctx.Context, flag uint64, logic if err != nil { return nil, 0, err } - finalPlan, err := postOptimize(sctx, physical) + finalPlan, err := postOptimize(ctx, sctx, physical) if err != nil { return nil, 0, err } @@ -377,7 +380,7 @@ func mergeContinuousSelections(p PhysicalPlan) { } } -func postOptimize(sctx 
sessionctx.Context, plan PhysicalPlan) (PhysicalPlan, error) { +func postOptimize(ctx context.Context, sctx sessionctx.Context, plan PhysicalPlan) (PhysicalPlan, error) { // some cases from update optimize will require avoiding projection elimination. // see comments ahead of call of DoOptimize in function of buildUpdate(). err := prunePhysicalColumns(sctx, plan) @@ -389,7 +392,7 @@ func postOptimize(sctx sessionctx.Context, plan PhysicalPlan) (PhysicalPlan, err mergeContinuousSelections(plan) plan = eliminateUnionScanAndLock(sctx, plan) plan = enableParallelApply(sctx, plan) - handleFineGrainedShuffle(sctx, plan) + handleFineGrainedShuffle(ctx, sctx, plan) checkPlanCacheable(sctx, plan) propagateProbeParents(plan, nil) countStarRewrite(plan) @@ -652,8 +655,8 @@ func rewriteTableScanAndAggArgs(physicalTableScan *PhysicalTableScan, aggFuncs [ // TiFlashFineGrainedShuffleStreamCount: // < 0: fine grained shuffle is disabled. // > 0: use TiFlashFineGrainedShuffleStreamCount as stream count. -// == 0: use TiFlashMaxThreads as stream count when it's greater than 0. Otherwise use DefStreamCountWhenMaxThreadsNotSet. -func handleFineGrainedShuffle(sctx sessionctx.Context, plan PhysicalPlan) { +// == 0: use TiFlashMaxThreads as stream count when it's greater than 0. Otherwise set status as uninitialized. +func handleFineGrainedShuffle(ctx context.Context, sctx sessionctx.Context, plan PhysicalPlan) { streamCount := sctx.GetSessionVars().TiFlashFineGrainedShuffleStreamCount if streamCount < 0 { return @@ -661,22 +664,27 @@ func handleFineGrainedShuffle(sctx sessionctx.Context, plan PhysicalPlan) { if streamCount == 0 { if sctx.GetSessionVars().TiFlashMaxThreads > 0 { streamCount = sctx.GetSessionVars().TiFlashMaxThreads - } else { - streamCount = variable.DefStreamCountWhenMaxThreadsNotSet } } - setupFineGrainedShuffle(uint64(streamCount), plan) + // use two separate cluster info to avoid grpc calls cost + tiflashServerCountInfo := tiflashClusterInfo{unInitialized, 0} + streamCountInfo := tiflashClusterInfo{unInitialized, 0} + if streamCount != 0 { + streamCountInfo.itemStatus = initialized + streamCountInfo.itemValue = uint64(streamCount) + } + setupFineGrainedShuffle(ctx, sctx, &streamCountInfo, &tiflashServerCountInfo, plan) } -func setupFineGrainedShuffle(streamCount uint64, plan PhysicalPlan) { +func setupFineGrainedShuffle(ctx context.Context, sctx sessionctx.Context, streamCountInfo *tiflashClusterInfo, tiflashServerCountInfo *tiflashClusterInfo, plan PhysicalPlan) { if tableReader, ok := plan.(*PhysicalTableReader); ok { if _, isExchangeSender := tableReader.tablePlan.(*PhysicalExchangeSender); isExchangeSender { helper := fineGrainedShuffleHelper{shuffleTarget: unknown, plans: make([]*basePhysicalPlan, 1)} - setupFineGrainedShuffleInternal(tableReader.tablePlan, &helper, streamCount) + setupFineGrainedShuffleInternal(ctx, sctx, tableReader.tablePlan, &helper, streamCountInfo, tiflashServerCountInfo) } } else { for _, child := range plan.Children() { - setupFineGrainedShuffle(streamCount, child) + setupFineGrainedShuffle(ctx, sctx, streamCountInfo, tiflashServerCountInfo, child) } } } @@ -687,16 +695,32 @@ const ( unknown shuffleTarget = iota window joinBuild + hashAgg ) type fineGrainedShuffleHelper struct { shuffleTarget shuffleTarget plans []*basePhysicalPlan + joinKeysCount int +} + +type tiflashClusterInfoStatus uint8 + +const ( + unInitialized tiflashClusterInfoStatus = iota + initialized + failed +) + +type tiflashClusterInfo struct { + itemStatus tiflashClusterInfoStatus + itemValue 
uint64 } func (h *fineGrainedShuffleHelper) clear() { h.shuffleTarget = unknown h.plans = h.plans[:0] + h.joinKeysCount = 0 } func (h *fineGrainedShuffleHelper) updateTarget(t shuffleTarget, p *basePhysicalPlan) { @@ -704,14 +728,153 @@ func (h *fineGrainedShuffleHelper) updateTarget(t shuffleTarget, p *basePhysical h.plans = append(h.plans, p) } -func setupFineGrainedShuffleInternal(plan PhysicalPlan, helper *fineGrainedShuffleHelper, streamCount uint64) { +// calculateTiFlashStreamCountUsingMinLogicalCores uses minimal logical cpu cores among tiflash servers, and divide by 2 +// return false, 0 if any err happens +func calculateTiFlashStreamCountUsingMinLogicalCores(ctx context.Context, sctx sessionctx.Context, serversInfo []infoschema.ServerInfo) (bool, uint64) { + failpoint.Inject("mockTiFlashStreamCountUsingMinLogicalCores", func(val failpoint.Value) { + intVal, err := strconv.Atoi(val.(string)) + if err == nil { + failpoint.Return(true, uint64(intVal)) + } else { + failpoint.Return(false, 0) + } + }) + rows, err := infoschema.FetchClusterServerInfoWithoutPrivilegeCheck(ctx, sctx, serversInfo, diagnosticspb.ServerInfoType_HardwareInfo, false) + if err != nil { + return false, 0 + } + var initialMaxCores uint64 = 10000 + var minLogicalCores uint64 = initialMaxCores // set to a large enough value here + for _, row := range rows { + if row[4].GetString() == "cpu-logical-cores" { + logicalCpus, err := strconv.Atoi(row[5].GetString()) + if err == nil && logicalCpus > 0 && uint64(logicalCpus) < minLogicalCores { + minLogicalCores = uint64(logicalCpus) + } + } + } + // No need to check len(serersInfo) == serverCount here, since missing some servers' info won't affect the correctness + if minLogicalCores > 1 && minLogicalCores != initialMaxCores { + return true, minLogicalCores / 2 + } + + return false, 0 +} + +func checkFineGrainedShuffleForJoinAgg(ctx context.Context, sctx sessionctx.Context, streamCountInfo *tiflashClusterInfo, tiflashServerCountInfo *tiflashClusterInfo, exchangeColCount int, splitLimit uint64) (applyFlag bool, streamCount uint64) { + switch (*streamCountInfo).itemStatus { + case unInitialized: + streamCount = 4 // assume 8c node in cluster as minimal, stream count is 8 / 2 = 4 + case initialized: + streamCount = (*streamCountInfo).itemValue + case failed: + return false, 0 // probably won't reach this path + } + + var tiflashServerCount uint64 = 0 + switch (*tiflashServerCountInfo).itemStatus { + case unInitialized: + serversInfo, err := infoschema.GetTiFlashServerInfo(sctx) + if err != nil { + (*tiflashServerCountInfo).itemStatus = failed + (*tiflashServerCountInfo).itemValue = 0 + if (*streamCountInfo).itemStatus == unInitialized { + setDefaultStreamCount(streamCountInfo) + } + return false, 0 + } + tiflashServerCount = uint64(len(serversInfo)) + (*tiflashServerCountInfo).itemStatus = initialized + (*tiflashServerCountInfo).itemValue = tiflashServerCount + case initialized: + tiflashServerCount = (*tiflashServerCountInfo).itemValue + case failed: + return false, 0 + } + + // if already exceeds splitLimit, no need to fetch actual logical cores + if tiflashServerCount*uint64(exchangeColCount)*streamCount > splitLimit { + return false, 0 + } + + // if streamCount already initialized, and can pass splitLimit check + if (*streamCountInfo).itemStatus == initialized { + return true, streamCount + } + + serversInfo, err := infoschema.GetTiFlashServerInfo(sctx) + if err != nil { + (*tiflashServerCountInfo).itemStatus = failed + (*tiflashServerCountInfo).itemValue = 0 + return 
false, 0 + } + flag, temStreamCount := calculateTiFlashStreamCountUsingMinLogicalCores(ctx, sctx, serversInfo) + if !flag { + setDefaultStreamCount(streamCountInfo) + (*tiflashServerCountInfo).itemStatus = failed + return false, 0 + } + streamCount = temStreamCount + (*streamCountInfo).itemStatus = initialized + (*streamCountInfo).itemValue = streamCount + applyFlag = tiflashServerCount*uint64(exchangeColCount)*streamCount <= splitLimit + return applyFlag, streamCount +} + +func inferFineGrainedShuffleStreamCountForWindow(ctx context.Context, sctx sessionctx.Context, streamCountInfo *tiflashClusterInfo, tiflashServerCountInfo *tiflashClusterInfo) (streamCount uint64) { + switch (*streamCountInfo).itemStatus { + case unInitialized: + if (*tiflashServerCountInfo).itemStatus == failed { + setDefaultStreamCount(streamCountInfo) + streamCount = (*streamCountInfo).itemValue + break + } + + serversInfo, err := infoschema.GetTiFlashServerInfo(sctx) + if err != nil { + setDefaultStreamCount(streamCountInfo) + streamCount = (*streamCountInfo).itemValue + (*tiflashServerCountInfo).itemStatus = failed + break + } + + if (*tiflashServerCountInfo).itemStatus == unInitialized { + (*tiflashServerCountInfo).itemStatus = initialized + (*tiflashServerCountInfo).itemValue = uint64(len(serversInfo)) + } + + flag, temStreamCount := calculateTiFlashStreamCountUsingMinLogicalCores(ctx, sctx, serversInfo) + if !flag { + setDefaultStreamCount(streamCountInfo) + streamCount = (*streamCountInfo).itemValue + (*tiflashServerCountInfo).itemStatus = failed + break + } + streamCount = temStreamCount + (*streamCountInfo).itemStatus = initialized + (*streamCountInfo).itemValue = streamCount + case initialized: + streamCount = (*streamCountInfo).itemValue + case failed: + setDefaultStreamCount(streamCountInfo) + streamCount = (*streamCountInfo).itemValue + } + return streamCount +} + +func setDefaultStreamCount(streamCountInfo *tiflashClusterInfo) { + (*streamCountInfo).itemStatus = initialized + (*streamCountInfo).itemValue = variable.DefStreamCountWhenMaxThreadsNotSet +} + +func setupFineGrainedShuffleInternal(ctx context.Context, sctx sessionctx.Context, plan PhysicalPlan, helper *fineGrainedShuffleHelper, streamCountInfo *tiflashClusterInfo, tiflashServerCountInfo *tiflashClusterInfo) { switch x := plan.(type) { case *PhysicalWindow: // Do not clear the plans because window executor will keep the data partition. // For non hash partition window function, there will be a passthrough ExchangeSender to collect data, // which will break data partition. helper.updateTarget(window, &x.basePhysicalPlan) - setupFineGrainedShuffleInternal(x.children[0], helper, streamCount) + setupFineGrainedShuffleInternal(ctx, sctx, x.children[0], helper, streamCountInfo, tiflashServerCountInfo) case *PhysicalSort: if x.IsPartialSort { // Partial sort will keep the data partition. @@ -720,58 +883,85 @@ func setupFineGrainedShuffleInternal(plan PhysicalPlan, helper *fineGrainedShuff // Global sort will break the data partition. 
helper.clear() } - setupFineGrainedShuffleInternal(x.children[0], helper, streamCount) + setupFineGrainedShuffleInternal(ctx, sctx, x.children[0], helper, streamCountInfo, tiflashServerCountInfo) case *PhysicalSelection: helper.plans = append(helper.plans, &x.basePhysicalPlan) - setupFineGrainedShuffleInternal(x.children[0], helper, streamCount) + setupFineGrainedShuffleInternal(ctx, sctx, x.children[0], helper, streamCountInfo, tiflashServerCountInfo) case *PhysicalProjection: helper.plans = append(helper.plans, &x.basePhysicalPlan) - setupFineGrainedShuffleInternal(x.children[0], helper, streamCount) + setupFineGrainedShuffleInternal(ctx, sctx, x.children[0], helper, streamCountInfo, tiflashServerCountInfo) case *PhysicalExchangeReceiver: helper.plans = append(helper.plans, &x.basePhysicalPlan) - setupFineGrainedShuffleInternal(x.children[0], helper, streamCount) + setupFineGrainedShuffleInternal(ctx, sctx, x.children[0], helper, streamCountInfo, tiflashServerCountInfo) case *PhysicalHashAgg: - // HashAgg is not implemented for now. - helper.clear() - setupFineGrainedShuffleInternal(x.children[0], helper, streamCount) + // Todo: allow hash aggregation's output still benefits from fine grained shuffle + aggHelper := fineGrainedShuffleHelper{shuffleTarget: hashAgg, plans: []*basePhysicalPlan{}} + aggHelper.plans = append(aggHelper.plans, &x.basePhysicalPlan) + setupFineGrainedShuffleInternal(ctx, sctx, x.children[0], &aggHelper, streamCountInfo, tiflashServerCountInfo) case *PhysicalHashJoin: child0 := x.children[0] child1 := x.children[1] - if x.InnerChildIdx == 0 { - // Child0 is build side. - child0Helper := fineGrainedShuffleHelper{shuffleTarget: joinBuild, plans: []*basePhysicalPlan{}} - setupFineGrainedShuffleInternal(child0, &child0Helper, streamCount) - - // HashJoin is not implemented for now. - helper.clear() - setupFineGrainedShuffleInternal(child1, helper, streamCount) - } else { + buildChild := child0 + probChild := child1 + joinKeys := x.LeftJoinKeys + if x.InnerChildIdx != 0 { // Child1 is build side. - child1Helper := fineGrainedShuffleHelper{shuffleTarget: joinBuild, plans: []*basePhysicalPlan{}} - setupFineGrainedShuffleInternal(child1, &child1Helper, streamCount) - - // HashJoin is not implemented for now. - helper.clear() - setupFineGrainedShuffleInternal(child0, helper, streamCount) + buildChild = child1 + joinKeys = x.RightJoinKeys + probChild = child0 } + if len(joinKeys) > 0 { // Not cross join + buildHelper := fineGrainedShuffleHelper{shuffleTarget: joinBuild, plans: []*basePhysicalPlan{}} + buildHelper.plans = append(buildHelper.plans, &x.basePhysicalPlan) + buildHelper.joinKeysCount = len(joinKeys) + setupFineGrainedShuffleInternal(ctx, sctx, buildChild, &buildHelper, streamCountInfo, tiflashServerCountInfo) + } else { + buildHelper := fineGrainedShuffleHelper{shuffleTarget: unknown, plans: []*basePhysicalPlan{}} + setupFineGrainedShuffleInternal(ctx, sctx, buildChild, &buildHelper, streamCountInfo, tiflashServerCountInfo) + } + // don't apply fine grained shuffle for probe side + helper.clear() + setupFineGrainedShuffleInternal(ctx, sctx, probChild, helper, streamCountInfo, tiflashServerCountInfo) case *PhysicalExchangeSender: if x.ExchangeType == tipb.ExchangeType_Hash { - if helper.shuffleTarget == window { - // Set up stream count for all plans based on shuffle target type. - // Currently, only enable fine grained shuffle if the shuffle target is window. + // Set up stream count for all plans based on shuffle target type. 
+ var exchangeColCount = x.Schema().Len() + switch helper.shuffleTarget { + case window: + streamCount := inferFineGrainedShuffleStreamCountForWindow(ctx, sctx, streamCountInfo, tiflashServerCountInfo) x.TiFlashFineGrainedShuffleStreamCount = streamCount for _, p := range helper.plans { p.TiFlashFineGrainedShuffleStreamCount = streamCount } + case hashAgg: + applyFlag, streamCount := checkFineGrainedShuffleForJoinAgg(ctx, sctx, streamCountInfo, tiflashServerCountInfo, exchangeColCount, 1200) // 1200: performance test result + if applyFlag { + x.TiFlashFineGrainedShuffleStreamCount = streamCount + for _, p := range helper.plans { + p.TiFlashFineGrainedShuffleStreamCount = streamCount + } + } + case joinBuild: + // Support hashJoin only when shuffle hash keys equals to join keys due to tiflash implementations + if len(x.HashCols) != helper.joinKeysCount { + break + } + applyFlag, streamCount := checkFineGrainedShuffleForJoinAgg(ctx, sctx, streamCountInfo, tiflashServerCountInfo, exchangeColCount, 600) // 600: performance test result + if applyFlag { + x.TiFlashFineGrainedShuffleStreamCount = streamCount + for _, p := range helper.plans { + p.TiFlashFineGrainedShuffleStreamCount = streamCount + } + } } } // exchange sender will break the data partition. helper.clear() - setupFineGrainedShuffleInternal(x.children[0], helper, streamCount) + setupFineGrainedShuffleInternal(ctx, sctx, x.children[0], helper, streamCountInfo, tiflashServerCountInfo) default: for _, child := range x.Children() { childHelper := fineGrainedShuffleHelper{shuffleTarget: unknown, plans: []*basePhysicalPlan{}} - setupFineGrainedShuffleInternal(child, &childHelper, streamCount) + setupFineGrainedShuffleInternal(ctx, sctx, child, &childHelper, streamCountInfo, tiflashServerCountInfo) } } } diff --git a/planner/core/optimizer_test.go b/planner/core/optimizer_test.go index 0f9dc0a4050c4..59afdf9e9afba 100644 --- a/planner/core/optimizer_test.go +++ b/planner/core/optimizer_test.go @@ -16,8 +16,10 @@ package core import ( "reflect" + "strings" "testing" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/mysql" @@ -163,7 +165,7 @@ func TestHandleFineGrainedShuffle(t *testing.T) { sctx.GetSessionVars().TiFlashFineGrainedShuffleStreamCount = expStreamCount start := func(p PhysicalPlan, expStreamCount int64, expChildCount int, curChildCount int) { - handleFineGrainedShuffle(sctx, tableReader) + handleFineGrainedShuffle(nil, sctx, tableReader) check(p, expStreamCount, expChildCount, curChildCount) clear(plans) } @@ -289,6 +291,111 @@ func TestHandleFineGrainedShuffle(t *testing.T) { tableScan1 = &PhysicalTableScan{} hashSender1.children = []PhysicalPlan{tableScan1} start(partWindow, expStreamCount, 3, 0) + + instances := []string{ + "tiflash,127.0.0.1:3933,127.0.0.1:7777,,", + "tikv,127.0.0.1:11080,127.0.0.1:10080,,", + } + fpName := "github.com/pingcap/tidb/infoschema/mockStoreServerInfo" + fpExpr := `return("` + strings.Join(instances, ";") + `")` + require.NoError(t, failpoint.Enable(fpName, fpExpr)) + defer func() { require.NoError(t, failpoint.Disable(fpName)) }() + fpName2 := "github.com/pingcap/tidb/planner/core/mockTiFlashStreamCountUsingMinLogicalCores" + require.NoError(t, failpoint.Enable(fpName2, `return("8")`)) + sctx.GetSessionVars().TiFlashFineGrainedShuffleStreamCount = 0 + + col0 := &expression.Column{ + UniqueID: sctx.GetSessionVars().AllocPlanColumnID(), + RetType: types.NewFieldType(mysql.TypeLonglong), + } + cond, err := 
expression.NewFunction(sctx, ast.EQ, types.NewFieldType(mysql.TypeTiny), col0, col0) + require.True(t, err == nil) + sf, isSF := cond.(*expression.ScalarFunction) + require.True(t, isSF) + var partitionCols = make([]*property.MPPPartitionColumn, 0, 1) + partitionCols = append(partitionCols, &property.MPPPartitionColumn{ + Col: col0, + CollateID: property.GetCollateIDByNameForPartition(col0.GetType().GetCollate()), + }) + + // HashAgg(x) <- ExchangeReceiver <- ExchangeSender + tableReader.tablePlan = passSender + hashAgg = &PhysicalHashAgg{} + passSender.children = []PhysicalPlan{hashAgg} + hashAgg.children = []PhysicalPlan{recv} + recv.children = []PhysicalPlan{hashSender} + hashSender.children = []PhysicalPlan{tableScan} + tableScan.Schema().Columns = append(tableScan.Schema().Columns, col0) + start(hashAgg, 8, 3, 0) + + // Join(x) <- ExchangeReceiver <- ExchangeSender + // <- ExchangeReceiver <- ExchangeSender + tableReader.tablePlan = passSender + hashJoin = &PhysicalHashJoin{} + hashJoin.EqualConditions = append(hashJoin.EqualConditions, sf) + hashJoin.RightJoinKeys = append(hashJoin.RightJoinKeys, col0) + hashJoin.InnerChildIdx = 1 + passSender.children = []PhysicalPlan{hashJoin} + recv = &PhysicalExchangeReceiver{} + recv1 = &PhysicalExchangeReceiver{} + tableScan = &PhysicalTableScan{} + tableScan1 = &PhysicalTableScan{} + hashSender = &PhysicalExchangeSender{ + ExchangeType: tipb.ExchangeType_Hash, + } + hashSender1 = &PhysicalExchangeSender{ + ExchangeType: tipb.ExchangeType_Hash, + } + hashJoin.children = []PhysicalPlan{recv, recv1} + recv.children = []PhysicalPlan{hashSender} + recv1.children = []PhysicalPlan{hashSender1} + hashSender.children = []PhysicalPlan{tableScan} + hashSender1.children = []PhysicalPlan{tableScan1} + hashSender1.HashCols = partitionCols + tableScan1.Schema().Columns = append(tableScan1.Schema().Columns, col0) + handleFineGrainedShuffle(nil, sctx, tableReader) + require.Equal(t, uint64(8), hashJoin.TiFlashFineGrainedShuffleStreamCount) + require.Equal(t, uint64(8), recv1.TiFlashFineGrainedShuffleStreamCount) + require.Equal(t, uint64(8), hashSender1.TiFlashFineGrainedShuffleStreamCount) + require.Equal(t, uint64(0), recv.TiFlashFineGrainedShuffleStreamCount) + require.Equal(t, uint64(0), hashSender.TiFlashFineGrainedShuffleStreamCount) + clear(plans) + + require.NoError(t, failpoint.Disable(fpName2)) + require.NoError(t, failpoint.Enable(fpName2, `return("8000")`)) + // HashAgg(x) <- ExchangeReceiver <- ExchangeSender, exceed splitLimit + tableReader.tablePlan = passSender + hashAgg = &PhysicalHashAgg{} + passSender.children = []PhysicalPlan{hashAgg} + hashAgg.children = []PhysicalPlan{recv} + recv.children = []PhysicalPlan{hashSender} + hashSender.children = []PhysicalPlan{tableScan} + tableScan.Schema().Columns = append(tableScan.Schema().Columns, col0) + start(hashAgg, 0, 3, 0) + + // exceed splitLimit + // Join(x) <- ExchangeReceiver <- ExchangeSender + // <- ExchangeReceiver <- ExchangeSender + tableReader.tablePlan = passSender + hashJoin = &PhysicalHashJoin{} + hashJoin.EqualConditions = append(hashJoin.EqualConditions, sf) + hashJoin.LeftJoinKeys = append(hashJoin.LeftJoinKeys, col0) + hashJoin.InnerChildIdx = 1 + passSender.children = []PhysicalPlan{hashJoin} + recv1 = &PhysicalExchangeReceiver{} + tableScan1 = &PhysicalTableScan{} + hashSender1 = &PhysicalExchangeSender{ + ExchangeType: tipb.ExchangeType_Hash, + } + hashJoin.children = []PhysicalPlan{recv, recv1} + recv.children = []PhysicalPlan{hashSender} + recv1.children = 
[]PhysicalPlan{hashSender1} + hashSender.children = []PhysicalPlan{tableScan} + hashSender1.children = []PhysicalPlan{tableScan1} + hashSender1.HashCols = partitionCols + tableScan1.Schema().Columns = append(tableScan1.Schema().Columns, col0) + start(hashJoin, 0, 3, 0) + require.NoError(t, failpoint.Disable(fpName2)) } // Test for core.prunePhysicalColumns() diff --git a/planner/core/plan_to_pb.go b/planner/core/plan_to_pb.go index 71a7f11c09a65..922b61975ab3c 100644 --- a/planner/core/plan_to_pb.go +++ b/planner/core/plan_to_pb.go @@ -60,7 +60,13 @@ func (p *PhysicalHashAgg) ToPB(ctx sessionctx.Context, storeType kv.StoreType) ( } executorID = p.ExplainID().String() } - return &tipb.Executor{Tp: tipb.ExecType_TypeAggregation, Aggregation: aggExec, ExecutorId: &executorID}, nil + return &tipb.Executor{ + Tp: tipb.ExecType_TypeAggregation, + Aggregation: aggExec, + ExecutorId: &executorID, + FineGrainedShuffleStreamCount: p.TiFlashFineGrainedShuffleStreamCount, + FineGrainedShuffleBatchSize: ctx.GetSessionVars().TiFlashFineGrainedShuffleBatchSize, + }, nil } // ToPB implements PhysicalPlan ToPB interface. @@ -492,7 +498,13 @@ func (p *PhysicalHashJoin) ToPB(ctx sessionctx.Context, storeType kv.StoreType) } executorID := p.ExplainID().String() - return &tipb.Executor{Tp: tipb.ExecType_TypeJoin, Join: join, ExecutorId: &executorID}, nil + return &tipb.Executor{ + Tp: tipb.ExecType_TypeJoin, + Join: join, + ExecutorId: &executorID, + FineGrainedShuffleStreamCount: p.TiFlashFineGrainedShuffleStreamCount, + FineGrainedShuffleBatchSize: ctx.GetSessionVars().TiFlashFineGrainedShuffleBatchSize, + }, nil } // ToPB converts FrameBound to tipb structure. diff --git a/planner/core/testdata/integration_suite_in.json b/planner/core/testdata/integration_suite_in.json index df990e52c65eb..0e668ce3df8ba 100644 --- a/planner/core/testdata/integration_suite_in.json +++ b/planner/core/testdata/integration_suite_in.json @@ -960,6 +960,13 @@ "desc format = 'brief' select * from tt t1 where exists (select * from t t2 where t1.b1 = t2.c3 and t2.c1 < t2.c2)" ] }, + { + "name": "TestMppFineGrainedJoinAndAgg", + "cases": [ + "desc format = 'brief' select * from tt t1 where exists (select * from t t2 where t1.b1 = t2.c3 and t2.c1 < t2.c2)", + "desc format = 'brief' select count(*) from tt group by b1" + ] + }, { "name": "TestPushDownAggForMPP", "cases": [ diff --git a/planner/core/testdata/integration_suite_out.json b/planner/core/testdata/integration_suite_out.json index c1dbae406a922..40d8402497569 100644 --- a/planner/core/testdata/integration_suite_out.json +++ b/planner/core/testdata/integration_suite_out.json @@ -7524,6 +7524,41 @@ } ] }, + { + "Name": "TestMppFineGrainedJoinAndAgg", + "Cases": [ + { + "SQL": "desc format = 'brief' select * from tt t1 where exists (select * from t t2 where t1.b1 = t2.c3 and t2.c1 < t2.c2)", + "Plan": [ + "TableReader 7992.00 root data:ExchangeSender", + "└─ExchangeSender 7992.00 mpp[tiflash] ExchangeType: PassThrough", + " └─HashJoin 7992.00 mpp[tiflash] semi join, equal:[eq(test.tt.b1, test.t.c3)], stream_count: 8", + " ├─ExchangeReceiver(Build) 8000.00 mpp[tiflash] stream_count: 8", + " │ └─ExchangeSender 8000.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.t.c3, collate: binary], stream_count: 8", + " │ └─Projection 8000.00 mpp[tiflash] test.t.c3", + " │ └─Selection 8000.00 mpp[tiflash] lt(test.t.c1, test.t.c2)", + " │ └─TableFullScan 10000.00 mpp[tiflash] table:t2 keep order:false, stats:pseudo", + " └─ExchangeReceiver(Probe) 9990.00 mpp[tiflash] ", + " 
└─ExchangeSender 9990.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.tt.b1, collate: binary]", + " └─Selection 9990.00 mpp[tiflash] not(isnull(test.tt.b1))", + " └─TableFullScan 10000.00 mpp[tiflash] table:t1 keep order:false, stats:pseudo" + ] + }, + { + "SQL": "desc format = 'brief' select count(*) from tt group by b1", + "Plan": [ + "TableReader 8000.00 root data:ExchangeSender", + "└─ExchangeSender 8000.00 mpp[tiflash] ExchangeType: PassThrough", + " └─Projection 8000.00 mpp[tiflash] Column#3", + " └─HashAgg 8000.00 mpp[tiflash] group by:test.tt.b1, funcs:sum(Column#7)->Column#3, stream_count: 8", + " └─ExchangeReceiver 8000.00 mpp[tiflash] stream_count: 8", + " └─ExchangeSender 8000.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.tt.b1, collate: binary], stream_count: 8", + " └─HashAgg 8000.00 mpp[tiflash] group by:test.tt.b1, funcs:count(1)->Column#7", + " └─TableFullScan 10000.00 mpp[tiflash] table:tt keep order:false, stats:pseudo" + ] + } + ] + }, { "Name": "TestPushDownAggForMPP", "Cases": [ diff --git a/planner/core/testdata/window_push_down_suite_out.json b/planner/core/testdata/window_push_down_suite_out.json index 0b587eb83ce39..395c9e4ff61b6 100644 --- a/planner/core/testdata/window_push_down_suite_out.json +++ b/planner/core/testdata/window_push_down_suite_out.json @@ -415,7 +415,7 @@ " └─HashAgg_43 1.00 mpp[tiflash] funcs:count(distinct test.employee.empid)->Column#9", " └─ExchangeReceiver_45 1.00 mpp[tiflash] ", " └─ExchangeSender_44 1.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.employee.empid, collate: binary]", - " └─HashAgg_41 1.00 mpp[tiflash] group by:test.employee.empid, ", + " └─HashAgg_41 1.00 mpp[tiflash] group by:test.employee.empid, , stream_count: 8", " └─Window_27 10000.00 mpp[tiflash] row_number()->Column#6 over(partition by test.employee.deptid rows between current row and current row), stream_count: 8", " └─Sort_18 10000.00 mpp[tiflash] test.employee.deptid, stream_count: 8", " └─ExchangeReceiver_17 10000.00 mpp[tiflash] stream_count: 8", @@ -447,7 +447,7 @@ " └─HashAgg_50 10000.00 mpp[tiflash] group by:Column#6, funcs:sum(Column#10)->Column#7", " └─ExchangeReceiver_52 10000.00 mpp[tiflash] ", " └─ExchangeSender_51 10000.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: Column#6, collate: binary]", - " └─HashAgg_47 10000.00 mpp[tiflash] group by:Column#6, funcs:count(test.employee.empid)->Column#10", + " └─HashAgg_47 10000.00 mpp[tiflash] group by:Column#6, funcs:count(test.employee.empid)->Column#10, stream_count: 8", " └─Window_36 10000.00 mpp[tiflash] row_number()->Column#6 over(partition by test.employee.deptid rows between current row and current row), stream_count: 8", " └─Sort_21 10000.00 mpp[tiflash] test.employee.deptid, stream_count: 8", " └─ExchangeReceiver_20 10000.00 mpp[tiflash] stream_count: 8",
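
For reference, the sizing rule this patch applies to hash-join build sides and hash aggregations can be reduced to two small functions. The sketch below is a minimal, self-contained restatement of that arithmetic, not the patch's code: the stream count is the smallest `cpu-logical-cores` value reported by any TiFlash node, divided by two, and fine-grained shuffle is only applied when `tiflashServerCount * exchangeColumnCount * streamCount` stays within the split limit (600 for joins, 1200 for aggregations, both taken from the performance-test comments in the patch). The `defaultStreamCount` value of 8 is an assumption standing in for `variable.DefStreamCountWhenMaxThreadsNotSet`.

```go
package main

import "fmt"

// defaultStreamCount stands in for variable.DefStreamCountWhenMaxThreadsNotSet;
// the concrete value 8 is an assumption for this sketch.
const defaultStreamCount = 8

// streamCountFromMinLogicalCores mirrors calculateTiFlashStreamCountUsingMinLogicalCores:
// take the smallest logical-core count reported by any TiFlash node and halve it.
// It returns false when no usable value (greater than 1) is available.
func streamCountFromMinLogicalCores(logicalCoresPerNode []uint64) (bool, uint64) {
	const sentinel uint64 = 10000 // "large enough" initial minimum, as in the patch
	minCores := sentinel
	for _, c := range logicalCoresPerNode {
		if c > 0 && c < minCores {
			minCores = c
		}
	}
	if minCores > 1 && minCores != sentinel {
		return true, minCores / 2
	}
	return false, 0
}

// withinSplitLimit mirrors the guard in checkFineGrainedShuffleForJoinAgg: the
// product of TiFlash server count, exchange column count and stream count must
// not exceed the limit (600 for hash join, 1200 for hash aggregation).
func withinSplitLimit(tiflashServers, exchangeCols, streamCount, splitLimit uint64) bool {
	return tiflashServers*exchangeCols*streamCount <= splitLimit
}

func main() {
	ok, streamCount := streamCountFromMinLogicalCores([]uint64{16, 8}) // two TiFlash nodes
	if !ok {
		streamCount = defaultStreamCount
	}
	fmt.Println(streamCount)                               // 4
	fmt.Println(withinSplitLimit(2, 4, streamCount, 600))  // true: the join can use fine-grained shuffle
	fmt.Println(withinSplitLimit(2, 4, streamCount, 1200)) // true: so can the aggregation
}
```

This is also why the failpoint `mockTiFlashStreamCountUsingMinLogicalCores` returns "8" in the tests: half of that gives the `stream_count: 8` seen in the expected plans only because the failpoint short-circuits the division and injects the final value directly.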
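
The cluster-info helpers that previously lived in `executor/memtable_reader.go` are now exported from `infoschema`, so planner code can fetch hardware information without a privilege check. The sketch below illustrates the new call pattern under the same assumptions as the patch's call sites; `fetchTiFlashHardwareRows` is a hypothetical wrapper, not part of the patch, and `infoschema.GetTiFlashServerInfo` bundles the first two steps while additionally rejecting disaggregated TiFlash deployments.

```go
package core // hypothetical placement, mirroring the planner-side call sites in this patch

import (
	"context"

	"github.com/pingcap/kvproto/pkg/diagnosticspb"
	"github.com/pingcap/tidb/infoschema"
	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/sessionctx"
	"github.com/pingcap/tidb/types"
	"github.com/pingcap/tidb/util/set"
)

// fetchTiFlashHardwareRows is a hypothetical helper showing how the exported
// infoschema functions compose after this change.
func fetchTiFlashHardwareRows(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error) {
	// 1. List every store known to the cluster.
	serversInfo, err := infoschema.GetStoreServerInfo(sctx)
	if err != nil {
		return nil, err
	}
	// 2. Keep only TiFlash nodes; an empty address set means "no address filter".
	serversInfo = infoschema.FilterClusterServerInfo(serversInfo,
		set.NewStringSet(kv.TiFlash.Name()), set.NewStringSet())
	// 3. Fetch hardware info over gRPC. Passing false logs fetch errors instead of
	//    attaching them to the statement context, as the planner caller does here.
	return infoschema.FetchClusterServerInfoWithoutPrivilegeCheck(
		ctx, sctx, serversInfo, diagnosticspb.ServerInfoType_HardwareInfo, false)
}
```

The `recordWarningInStmtCtx` flag is the only behavioral knob added during the move: executor callers keep the old warning behavior by passing true, while the planner passes false so plan optimization never mutates statement-context warnings.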