From b382b4d2e0aee859bf1f21de68f66b45356abcc6 Mon Sep 17 00:00:00 2001 From: yibin Date: Wed, 21 Dec 2022 20:55:50 +0800 Subject: [PATCH 01/15] Save tem changes Signed-off-by: yibin --- executor/memtable_reader.go | 79 +------------------- executor/set_config.go | 2 +- infoschema/tables.go | 145 ++++++++++++++++++++++++++++++++++++ planner/core/explain.go | 6 ++ planner/core/optimizer.go | 140 ++++++++++++++++++++++++---------- planner/core/plan_to_pb.go | 16 +++- 6 files changed, 269 insertions(+), 119 deletions(-) diff --git a/executor/memtable_reader.go b/executor/memtable_reader.go index 4d6e5112bfefb..77eff1588b62b 100644 --- a/executor/memtable_reader.go +++ b/executor/memtable_reader.go @@ -177,7 +177,7 @@ func fetchClusterConfig(sctx sessionctx.Context, nodeTypes, nodeAddrs set.String if err != nil { return nil, err } - serversInfo = filterClusterServerInfo(serversInfo, nodeTypes, nodeAddrs) + serversInfo = infoschema.FilterClusterServerInfo(serversInfo, nodeTypes, nodeAddrs) //nolint: prealloc var finalRows [][]types.Datum wg := sync.WaitGroup{} @@ -310,58 +310,7 @@ func (e *clusterServerInfoRetriever) retrieve(ctx context.Context, sctx sessionc return nil, nil } e.retrieved = true - - serversInfo, err := infoschema.GetClusterServerInfo(sctx) - if err != nil { - return nil, err - } - serversInfo = filterClusterServerInfo(serversInfo, e.extractor.NodeTypes, e.extractor.Instances) - - type result struct { - idx int - rows [][]types.Datum - err error - } - wg := sync.WaitGroup{} - ch := make(chan result, len(serversInfo)) - infoTp := e.serverInfoType - finalRows := make([][]types.Datum, 0, len(serversInfo)*10) - for i, srv := range serversInfo { - address := srv.Address - remote := address - if srv.ServerType == "tidb" { - remote = srv.StatusAddr - } - wg.Add(1) - go func(index int, remote, address, serverTP string) { - util.WithRecovery(func() { - defer wg.Done() - items, err := getServerInfoByGRPC(ctx, remote, infoTp) - if err != nil { - ch <- result{idx: index, err: err} - return - } - partRows := serverInfoItemToRows(items, serverTP, address) - ch <- result{idx: index, rows: partRows} - }, nil) - }(i, remote, address, srv.ServerType) - } - wg.Wait() - close(ch) - // Keep the original order to make the result more stable - var results []result //nolint: prealloc - for result := range ch { - if result.err != nil { - sctx.GetSessionVars().StmtCtx.AppendWarning(result.err) - continue - } - results = append(results, result) - } - slices.SortFunc(results, func(i, j result) bool { return i.idx < j.idx }) - for _, result := range results { - finalRows = append(finalRows, result.rows...) 
- } - return finalRows, nil + return infoschema.FetchClusterServerInfoWithoutPrivilegeCheck(ctx, sctx, e.serverInfoType, e.extractor.NodeTypes, e.extractor.Instances) } func serverInfoItemToRows(items []*diagnosticspb.ServerInfoItem, tp, addr string) [][]types.Datum { @@ -428,28 +377,6 @@ func parseFailpointServerInfo(s string) []infoschema.ServerInfo { return serversInfo } -func filterClusterServerInfo(serversInfo []infoschema.ServerInfo, nodeTypes, addresses set.StringSet) []infoschema.ServerInfo { - if len(nodeTypes) == 0 && len(addresses) == 0 { - return serversInfo - } - - filterServers := make([]infoschema.ServerInfo, 0, len(serversInfo)) - for _, srv := range serversInfo { - // Skip some node type which has been filtered in WHERE clause - // e.g: SELECT * FROM cluster_config WHERE type='tikv' - if len(nodeTypes) > 0 && !nodeTypes.Exist(srv.ServerType) { - continue - } - // Skip some node address which has been filtered in WHERE clause - // e.g: SELECT * FROM cluster_config WHERE address='192.16.8.12:2379' - if len(addresses) > 0 && !addresses.Exist(srv.Address) { - continue - } - filterServers = append(filterServers, srv) - } - return filterServers -} - type clusterLogRetriever struct { isDrained bool retrieving bool @@ -515,7 +442,7 @@ func (e *clusterLogRetriever) initialize(ctx context.Context, sctx sessionctx.Co instances := e.extractor.Instances nodeTypes := e.extractor.NodeTypes - serversInfo = filterClusterServerInfo(serversInfo, nodeTypes, instances) + serversInfo = infoschema.FilterClusterServerInfo(serversInfo, nodeTypes, instances) var levels = make([]diagnosticspb.LogLevel, 0, len(e.extractor.LogLevels)) for l := range e.extractor.LogLevels { diff --git a/executor/set_config.go b/executor/set_config.go index 531a61acdc4a0..b508b55eb2fc7 100644 --- a/executor/set_config.go +++ b/executor/set_config.go @@ -102,7 +102,7 @@ func (s *SetConfigExec) Next(ctx context.Context, req *chunk.Chunk) error { if s.p.Instance != "" { nodeAddrs.Insert(s.p.Instance) } - serversInfo = filterClusterServerInfo(serversInfo, nodeTypes, nodeAddrs) + serversInfo = infoschema.FilterClusterServerInfo(serversInfo, nodeTypes, nodeAddrs) if s.p.Instance != "" && len(serversInfo) == 0 { return errors.Errorf("instance %v is not found in this cluster", s.p.Instance) } diff --git a/infoschema/tables.go b/infoschema/tables.go index 2f481d4e004b5..9a4d066d97335 100644 --- a/infoschema/tables.go +++ b/infoschema/tables.go @@ -18,10 +18,18 @@ import ( "context" "encoding/json" "fmt" + "github.com/pingcap/kvproto/pkg/diagnosticspb" + "github.com/pingcap/log" + "github.com/pingcap/tidb/util/set" + "golang.org/x/exp/slices" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" "net" "net/http" "strconv" "strings" + "sync" + "time" "github.com/pingcap/errors" "github.com/pingcap/failpoint" @@ -2226,3 +2234,140 @@ func (vt *VirtualTable) GetPhysicalID() int64 { func (vt *VirtualTable) Type() table.Type { return table.VirtualTable } + +func GetTiFlashServerInfo(sctx sessionctx.Context) ([]ServerInfo, error) { + serversInfo, err := GetStoreServerInfo(sctx) + if err != nil { + return nil, err + } + serversInfo = FilterClusterServerInfo(serversInfo, set.NewStringSet("tiflash"), set.NewStringSet()) + return serversInfo, nil +} + +// FetchClusterServerInfoWithoutPrivilegeCheck implements the memTableRetriever interface +func FetchClusterServerInfoWithoutPrivilegeCheck(ctx context.Context, sctx sessionctx.Context, serverInfoType diagnosticspb.ServerInfoType, nodeTypes set.StringSet, instances set.StringSet) 
([][]types.Datum, error) { + serversInfo, err := GetClusterServerInfo(sctx) + if err != nil { + return nil, err + } + serversInfo = FilterClusterServerInfo(serversInfo, nodeTypes, instances) + + type result struct { + idx int + rows [][]types.Datum + err error + } + wg := sync.WaitGroup{} + ch := make(chan result, len(serversInfo)) + infoTp := serverInfoType + finalRows := make([][]types.Datum, 0, len(serversInfo)*10) + for i, srv := range serversInfo { + address := srv.Address + remote := address + if srv.ServerType == "tidb" { + remote = srv.StatusAddr + } + wg.Add(1) + go func(index int, remote, address, serverTP string) { + util.WithRecovery(func() { + defer wg.Done() + items, err := getServerInfoByGRPC(ctx, remote, infoTp) + if err != nil { + ch <- result{idx: index, err: err} + return + } + partRows := serverInfoItemToRows(items, serverTP, address) + ch <- result{idx: index, rows: partRows} + }, nil) + }(i, remote, address, srv.ServerType) + } + wg.Wait() + close(ch) + // Keep the original order to make the result more stable + var results []result //nolint: prealloc + for result := range ch { + if result.err != nil { + sctx.GetSessionVars().StmtCtx.AppendWarning(result.err) + continue + } + results = append(results, result) + } + slices.SortFunc(results, func(i, j result) bool { return i.idx < j.idx }) + for _, result := range results { + finalRows = append(finalRows, result.rows...) + } + return finalRows, nil +} + +func serverInfoItemToRows(items []*diagnosticspb.ServerInfoItem, tp, addr string) [][]types.Datum { + rows := make([][]types.Datum, 0, len(items)) + for _, v := range items { + for _, item := range v.Pairs { + row := types.MakeDatums( + tp, + addr, + v.Tp, + v.Name, + item.Key, + item.Value, + ) + rows = append(rows, row) + } + } + return rows +} + +func getServerInfoByGRPC(ctx context.Context, address string, tp diagnosticspb.ServerInfoType) ([]*diagnosticspb.ServerInfoItem, error) { + opt := grpc.WithInsecure() + security := config.GetGlobalConfig().Security + if len(security.ClusterSSLCA) != 0 { + clusterSecurity := security.ClusterSecurity() + tlsConfig, err := clusterSecurity.ToTLSConfig() + if err != nil { + return nil, errors.Trace(err) + } + opt = grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)) + } + conn, err := grpc.Dial(address, opt) + if err != nil { + return nil, err + } + defer func() { + err := conn.Close() + if err != nil { + log.Error("close grpc connection error", zap.Error(err)) + } + }() + + cli := diagnosticspb.NewDiagnosticsClient(conn) + ctx, cancel := context.WithTimeout(ctx, time.Second*10) + defer cancel() + r, err := cli.ServerInfo(ctx, &diagnosticspb.ServerInfoRequest{Tp: tp}) + if err != nil { + return nil, err + } + return r.Items, nil +} + +// FilterClusterServerInfo filters serversInfo by nodeTypes and addresses +func FilterClusterServerInfo(serversInfo []ServerInfo, nodeTypes, addresses set.StringSet) []ServerInfo { + if len(nodeTypes) == 0 && len(addresses) == 0 { + return serversInfo + } + + filterServers := make([]ServerInfo, 0, len(serversInfo)) + for _, srv := range serversInfo { + // Skip some node type which has been filtered in WHERE clause + // e.g: SELECT * FROM cluster_config WHERE type='tikv' + if len(nodeTypes) > 0 && !nodeTypes.Exist(srv.ServerType) { + continue + } + // Skip some node address which has been filtered in WHERE clause + // e.g: SELECT * FROM cluster_config WHERE address='192.16.8.12:2379' + if len(addresses) > 0 && !addresses.Exist(srv.Address) { + continue + } + filterServers = 
append(filterServers, srv) + } + return filterServers +} diff --git a/planner/core/explain.go b/planner/core/explain.go index 16140495de3e7..ade44533eb360 100644 --- a/planner/core/explain.go +++ b/planner/core/explain.go @@ -385,6 +385,9 @@ func (p *basePhysicalAgg) explainInfo(normalized bool) string { builder.WriteString(", ") } } + if p.TiFlashFineGrainedShuffleStreamCount > 0 { + builder.WriteString(fmt.Sprintf(", stream_count: %d", p.TiFlashFineGrainedShuffleStreamCount)) + } return builder.String() } @@ -543,6 +546,9 @@ func (p *PhysicalHashJoin) explainInfo(normalized bool) string { buffer.WriteString(", other cond:") buffer.Write(sortedExplainExpressionList(p.OtherConditions)) } + if p.TiFlashFineGrainedShuffleStreamCount > 0 { + buffer.WriteString(fmt.Sprintf(", stream_count: %d", p.TiFlashFineGrainedShuffleStreamCount)) + } return buffer.String() } diff --git a/planner/core/optimizer.go b/planner/core/optimizer.go index 1064ea529b2d0..8118e9e2ce850 100644 --- a/planner/core/optimizer.go +++ b/planner/core/optimizer.go @@ -17,7 +17,10 @@ package core import ( "context" "fmt" + "github.com/pingcap/kvproto/pkg/diagnosticspb" + "github.com/pingcap/log" "math" + "strconv" "github.com/pingcap/errors" "github.com/pingcap/tidb/config" @@ -297,7 +300,7 @@ func DoOptimize(ctx context.Context, sctx sessionctx.Context, flag uint64, logic if err != nil { return nil, 0, err } - finalPlan, err := postOptimize(sctx, physical) + finalPlan, err := postOptimize(ctx, sctx, physical) if err != nil { return nil, 0, err } @@ -377,7 +380,7 @@ func mergeContinuousSelections(p PhysicalPlan) { } } -func postOptimize(sctx sessionctx.Context, plan PhysicalPlan) (PhysicalPlan, error) { +func postOptimize(ctx context.Context, sctx sessionctx.Context, plan PhysicalPlan) (PhysicalPlan, error) { // some cases from update optimize will require avoiding projection elimination. // see comments ahead of call of DoOptimize in function of buildUpdate(). err := prunePhysicalColumns(sctx, plan) @@ -389,7 +392,7 @@ func postOptimize(sctx sessionctx.Context, plan PhysicalPlan) (PhysicalPlan, err mergeContinuousSelections(plan) plan = eliminateUnionScanAndLock(sctx, plan) plan = enableParallelApply(sctx, plan) - handleFineGrainedShuffle(sctx, plan) + handleFineGrainedShuffle(ctx, sctx, plan) checkPlanCacheable(sctx, plan) propagateProbeParents(plan, nil) countStarRewrite(plan) @@ -653,7 +656,7 @@ func rewriteTableScanAndAggArgs(physicalTableScan *PhysicalTableScan, aggFuncs [ // < 0: fine grained shuffle is disabled. // > 0: use TiFlashFineGrainedShuffleStreamCount as stream count. // == 0: use TiFlashMaxThreads as stream count when it's greater than 0. Otherwise use DefStreamCountWhenMaxThreadsNotSet. 
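// A hypothetical helper (not part of this change), shown only to illustrate how the
// three cases above resolve to a concrete stream count:
//
//	func resolveStreamCount(fineGrainedCount, maxThreads, defaultCount int64) (enabled bool, count int64) {
//		switch {
//		case fineGrainedCount < 0:
//			return false, 0 // fine grained shuffle disabled
//		case fineGrainedCount > 0:
//			return true, fineGrainedCount // explicit stream count wins
//		case maxThreads > 0:
//			return true, maxThreads // fall back to TiFlashMaxThreads
//		default:
//			return true, defaultCount // DefStreamCountWhenMaxThreadsNotSet
//		}
//	}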
-func handleFineGrainedShuffle(sctx sessionctx.Context, plan PhysicalPlan) { +func handleFineGrainedShuffle(ctx context.Context, sctx sessionctx.Context, plan PhysicalPlan) { streamCount := sctx.GetSessionVars().TiFlashFineGrainedShuffleStreamCount if streamCount < 0 { return @@ -661,22 +664,22 @@ func handleFineGrainedShuffle(sctx sessionctx.Context, plan PhysicalPlan) { if streamCount == 0 { if sctx.GetSessionVars().TiFlashMaxThreads > 0 { streamCount = sctx.GetSessionVars().TiFlashMaxThreads - } else { - streamCount = variable.DefStreamCountWhenMaxThreadsNotSet } } - setupFineGrainedShuffle(uint64(streamCount), plan) + var tiflashServerCount uint64 = 0 + var streamCountUInt = uint64(streamCount) + setupFineGrainedShuffle(ctx, sctx, &streamCountUInt, &tiflashServerCount, plan) } -func setupFineGrainedShuffle(streamCount uint64, plan PhysicalPlan) { +func setupFineGrainedShuffle(ctx context.Context, sctx sessionctx.Context, streamCount * uint64, tiflashServerCount * uint64, plan PhysicalPlan) { if tableReader, ok := plan.(*PhysicalTableReader); ok { if _, isExchangeSender := tableReader.tablePlan.(*PhysicalExchangeSender); isExchangeSender { helper := fineGrainedShuffleHelper{shuffleTarget: unknown, plans: make([]*basePhysicalPlan, 1)} - setupFineGrainedShuffleInternal(tableReader.tablePlan, &helper, streamCount) + setupFineGrainedShuffleInternal(ctx, sctx, tableReader.tablePlan, &helper, streamCount, tiflashServerCount) } } else { for _, child := range plan.Children() { - setupFineGrainedShuffle(streamCount, child) + setupFineGrainedShuffle(ctx, sctx, streamCount, tiflashServerCount, child) } } } @@ -687,16 +690,19 @@ const ( unknown shuffleTarget = iota window joinBuild + hashAgg ) type fineGrainedShuffleHelper struct { shuffleTarget shuffleTarget plans []*basePhysicalPlan + joinKeysCount int } func (h *fineGrainedShuffleHelper) clear() { h.shuffleTarget = unknown h.plans = h.plans[:0] + h.joinKeysCount = 0 } func (h *fineGrainedShuffleHelper) updateTarget(t shuffleTarget, p *basePhysicalPlan) { @@ -704,14 +710,36 @@ func (h *fineGrainedShuffleHelper) updateTarget(t shuffleTarget, p *basePhysical h.plans = append(h.plans, p) } -func setupFineGrainedShuffleInternal(plan PhysicalPlan, helper *fineGrainedShuffleHelper, streamCount uint64) { +// getTiFlashServerInfo returns server count and minimal logical cpus among servers +// return 0,0 if any err happens +func getTiFlashServerInfo(ctx context.Context, sctx sessionctx.Context) (int, int) { + rows, err := infoschema.FetchClusterServerInfoWithoutPrivilegeCheck(ctx, sctx, diagnosticspb.ServerInfoType_HardwareInfo, set.NewStringSet("tiflash"), set.NewStringSet()) + if err != nil { + return 0, 0 + } + var min_logical_cores = 1000 + var server_count = 0 + for _, row := range rows { + if row[4].GetString() == "cpu-logical-cores" { + server_count++ + logical_cpus, err := strconv.Atoi(row[5].GetString()) + if err == nil && logical_cpus < min_logical_cores { + min_logical_cores = logical_cpus + } + } + } + log.Error(fmt.Sprintf("Cluster Hardware info: %d, server_count: %d, min logical_cores: %d", len(rows), server_count, min_logical_cores)) + return server_count, min_logical_cores +} + +func setupFineGrainedShuffleInternal(ctx context.Context, sctx sessionctx.Context, plan PhysicalPlan, helper *fineGrainedShuffleHelper, streamCount * uint64, tiflashServerCount * uint64) { switch x := plan.(type) { case *PhysicalWindow: // Do not clear the plans because window executor will keep the data partition. 
// For non hash partition window function, there will be a passthrough ExchangeSender to collect data, // which will break data partition. helper.updateTarget(window, &x.basePhysicalPlan) - setupFineGrainedShuffleInternal(x.children[0], helper, streamCount) + setupFineGrainedShuffleInternal(ctx, sctx, x.children[0], helper, streamCount, tiflashServerCount) case *PhysicalSort: if x.IsPartialSort { // Partial sort will keep the data partition. @@ -720,58 +748,90 @@ func setupFineGrainedShuffleInternal(plan PhysicalPlan, helper *fineGrainedShuff // Global sort will break the data partition. helper.clear() } - setupFineGrainedShuffleInternal(x.children[0], helper, streamCount) + setupFineGrainedShuffleInternal(ctx, sctx, x.children[0], helper, streamCount, tiflashServerCount) case *PhysicalSelection: helper.plans = append(helper.plans, &x.basePhysicalPlan) - setupFineGrainedShuffleInternal(x.children[0], helper, streamCount) + setupFineGrainedShuffleInternal(ctx, sctx, x.children[0], helper, streamCount, tiflashServerCount) case *PhysicalProjection: helper.plans = append(helper.plans, &x.basePhysicalPlan) - setupFineGrainedShuffleInternal(x.children[0], helper, streamCount) + setupFineGrainedShuffleInternal(ctx, sctx, x.children[0], helper, streamCount, tiflashServerCount) case *PhysicalExchangeReceiver: helper.plans = append(helper.plans, &x.basePhysicalPlan) - setupFineGrainedShuffleInternal(x.children[0], helper, streamCount) + setupFineGrainedShuffleInternal(ctx, sctx, x.children[0], helper, streamCount, tiflashServerCount) case *PhysicalHashAgg: - // HashAgg is not implemented for now. - helper.clear() - setupFineGrainedShuffleInternal(x.children[0], helper, streamCount) + helper.updateTarget(hashAgg, &x.basePhysicalPlan) + setupFineGrainedShuffleInternal(ctx, sctx, x.children[0], helper, streamCount, tiflashServerCount) case *PhysicalHashJoin: child0 := x.children[0] child1 := x.children[1] - if x.InnerChildIdx == 0 { + buildChild := child0 + probChild := child1 + joinKeys := x.LeftJoinKeys + if x.InnerChildIdx != 0 { // Child0 is build side. - child0Helper := fineGrainedShuffleHelper{shuffleTarget: joinBuild, plans: []*basePhysicalPlan{}} - setupFineGrainedShuffleInternal(child0, &child0Helper, streamCount) - - // HashJoin is not implemented for now. - helper.clear() - setupFineGrainedShuffleInternal(child1, helper, streamCount) + buildChild = child1 + joinKeys = x.RightJoinKeys + probChild = child0 + } + if len(joinKeys) > 0 { // Not cross join + buildHelper := fineGrainedShuffleHelper{shuffleTarget: joinBuild, plans: []*basePhysicalPlan{}} + buildHelper.plans = append(buildHelper.plans, &x.basePhysicalPlan) + buildHelper.joinKeysCount = len(joinKeys) + setupFineGrainedShuffleInternal(ctx, sctx, buildChild, &buildHelper, streamCount, tiflashServerCount) } else { - // Child1 is build side. - child1Helper := fineGrainedShuffleHelper{shuffleTarget: joinBuild, plans: []*basePhysicalPlan{}} - setupFineGrainedShuffleInternal(child1, &child1Helper, streamCount) - - // HashJoin is not implemented for now. 
- helper.clear() - setupFineGrainedShuffleInternal(child0, helper, streamCount) + buildHelper := fineGrainedShuffleHelper{shuffleTarget: unknown, plans: []*basePhysicalPlan{}} + setupFineGrainedShuffleInternal(ctx, sctx, buildChild, &buildHelper, streamCount, tiflashServerCount) } + // don't reply on prob side's fineGrainedShuffle attribute + helper.clear() + setupFineGrainedShuffleInternal(ctx, sctx, probChild, helper, streamCount, tiflashServerCount) case *PhysicalExchangeSender: if x.ExchangeType == tipb.ExchangeType_Hash { - if helper.shuffleTarget == window { - // Set up stream count for all plans based on shuffle target type. - // Currently, only enable fine grained shuffle if the shuffle target is window. - x.TiFlashFineGrainedShuffleStreamCount = streamCount + // Set up stream count for all plans based on shuffle target type. + switch helper.shuffleTarget { + case window: + if *streamCount == 0 { + *streamCount = variable.DefStreamCountWhenMaxThreadsNotSet + } + x.TiFlashFineGrainedShuffleStreamCount = *streamCount for _, p := range helper.plans { - p.TiFlashFineGrainedShuffleStreamCount = streamCount + p.TiFlashFineGrainedShuffleStreamCount = *streamCount + } + case hashAgg: + if *streamCount == 0 { + *streamCount = variable.DefStreamCountWhenMaxThreadsNotSet + } + x.TiFlashFineGrainedShuffleStreamCount = *streamCount + for _, p := range helper.plans { + p.TiFlashFineGrainedShuffleStreamCount = *streamCount + } + case joinBuild: + // Support hashJoin only when shuffle hash keys equals to join keys + if len(x.HashCols) == helper.joinKeysCount { + if *streamCount == 0 { + //*streamCount = variable.DefStreamCountWhenMaxThreadsNotSet + // serverInfos, err := infoschema.GetTiFlashServerInfo(sctx) + serverCount, logicalCpus := getTiFlashServerInfo(ctx, sctx) + if serverCount != 0 && logicalCpus > 1 { + *tiflashServerCount = uint64(serverCount) + *streamCount = uint64(logicalCpus / 2) + } + log.Error(fmt.Sprintf("%d %d %d", x.Schema().Len(), *streamCount, *tiflashServerCount)) + x.TiFlashFineGrainedShuffleStreamCount = *streamCount + for _, p := range helper.plans { + p.TiFlashFineGrainedShuffleStreamCount = *streamCount + } + } } } } // exchange sender will break the data partition. helper.clear() - setupFineGrainedShuffleInternal(x.children[0], helper, streamCount) + setupFineGrainedShuffleInternal(ctx, sctx, x.children[0], helper, streamCount, tiflashServerCount) default: for _, child := range x.Children() { childHelper := fineGrainedShuffleHelper{shuffleTarget: unknown, plans: []*basePhysicalPlan{}} - setupFineGrainedShuffleInternal(child, &childHelper, streamCount) + setupFineGrainedShuffleInternal(ctx, sctx, child, &childHelper, streamCount, tiflashServerCount) } } } diff --git a/planner/core/plan_to_pb.go b/planner/core/plan_to_pb.go index a68fdae38f5de..6eb0d8004cae1 100644 --- a/planner/core/plan_to_pb.go +++ b/planner/core/plan_to_pb.go @@ -60,7 +60,13 @@ func (p *PhysicalHashAgg) ToPB(ctx sessionctx.Context, storeType kv.StoreType) ( } executorID = p.ExplainID().String() } - return &tipb.Executor{Tp: tipb.ExecType_TypeAggregation, Aggregation: aggExec, ExecutorId: &executorID}, nil + return &tipb.Executor{ + Tp: tipb.ExecType_TypeAggregation, + Aggregation: aggExec, + ExecutorId: &executorID, + FineGrainedShuffleStreamCount: p.TiFlashFineGrainedShuffleStreamCount, + FineGrainedShuffleBatchSize: ctx.GetSessionVars().TiFlashFineGrainedShuffleBatchSize, + }, nil } // ToPB implements PhysicalPlan ToPB interface. 
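Both the aggregation hunk above and the join hunk below thread the same two fine grained
shuffle fields into the executor protobuf. A minimal sketch of that shared shape (the
helper name is hypothetical and the field types are assumed from their use in this patch):

func buildFineGrainedShuffleExecutor(tp tipb.ExecType, id string, streamCount, batchSize uint64) *tipb.Executor {
	// The stream count comes from the physical plan node and the batch size from the
	// session variables; both ride on the executor sent to TiFlash.
	return &tipb.Executor{
		Tp:                            tp,
		ExecutorId:                    &id,
		FineGrainedShuffleStreamCount: streamCount,
		FineGrainedShuffleBatchSize:   batchSize,
	}
}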
@@ -492,7 +498,13 @@ func (p *PhysicalHashJoin) ToPB(ctx sessionctx.Context, storeType kv.StoreType) } executorID := p.ExplainID().String() - return &tipb.Executor{Tp: tipb.ExecType_TypeJoin, Join: join, ExecutorId: &executorID}, nil + return &tipb.Executor{ + Tp: tipb.ExecType_TypeJoin, + Join: join, + ExecutorId: &executorID, + FineGrainedShuffleStreamCount: p.TiFlashFineGrainedShuffleStreamCount, + FineGrainedShuffleBatchSize: ctx.GetSessionVars().TiFlashFineGrainedShuffleBatchSize, + }, nil } // ToPB converts FrameBound to tipb structure. From fcc1fceeea697e16596d95dcba11ae47e9f7cab0 Mon Sep 17 00:00:00 2001 From: yibin Date: Thu, 22 Dec 2022 14:14:49 +0800 Subject: [PATCH 02/15] First draft Signed-off-by: yibin --- executor/memtable_reader.go | 7 +- infoschema/tables.go | 19 ++-- planner/core/optimizer.go | 217 +++++++++++++++++++++++++++--------- 3 files changed, 177 insertions(+), 66 deletions(-) diff --git a/executor/memtable_reader.go b/executor/memtable_reader.go index 77eff1588b62b..8e837a1a71bb5 100644 --- a/executor/memtable_reader.go +++ b/executor/memtable_reader.go @@ -310,7 +310,12 @@ func (e *clusterServerInfoRetriever) retrieve(ctx context.Context, sctx sessionc return nil, nil } e.retrieved = true - return infoschema.FetchClusterServerInfoWithoutPrivilegeCheck(ctx, sctx, e.serverInfoType, e.extractor.NodeTypes, e.extractor.Instances) + serversInfo, err := infoschema.GetClusterServerInfo(sctx) + if err != nil { + return nil, err + } + serversInfo = infoschema.FilterClusterServerInfo(serversInfo, e.extractor.NodeTypes, e.extractor.Instances) + return infoschema.FetchClusterServerInfoWithoutPrivilegeCheck(ctx, sctx, serversInfo, e.serverInfoType, true) } func serverInfoItemToRows(items []*diagnosticspb.ServerInfoItem, tp, addr string) [][]types.Datum { diff --git a/infoschema/tables.go b/infoschema/tables.go index 9a4d066d97335..a867727aee179 100644 --- a/infoschema/tables.go +++ b/infoschema/tables.go @@ -2235,23 +2235,18 @@ func (vt *VirtualTable) Type() table.Type { return table.VirtualTable } +// GetTiFlashServerInfo returns all TiFlash server infos func GetTiFlashServerInfo(sctx sessionctx.Context) ([]ServerInfo, error) { serversInfo, err := GetStoreServerInfo(sctx) if err != nil { return nil, err } - serversInfo = FilterClusterServerInfo(serversInfo, set.NewStringSet("tiflash"), set.NewStringSet()) + serversInfo = FilterClusterServerInfo(serversInfo, set.NewStringSet(kv.TiFlash.Name()), set.NewStringSet()) return serversInfo, nil } -// FetchClusterServerInfoWithoutPrivilegeCheck implements the memTableRetriever interface -func FetchClusterServerInfoWithoutPrivilegeCheck(ctx context.Context, sctx sessionctx.Context, serverInfoType diagnosticspb.ServerInfoType, nodeTypes set.StringSet, instances set.StringSet) ([][]types.Datum, error) { - serversInfo, err := GetClusterServerInfo(sctx) - if err != nil { - return nil, err - } - serversInfo = FilterClusterServerInfo(serversInfo, nodeTypes, instances) - +// FetchClusterServerInfoWithoutPrivilegeCheck fetches cluster server information +func FetchClusterServerInfoWithoutPrivilegeCheck(ctx context.Context, sctx sessionctx.Context, serversInfo []ServerInfo, serverInfoType diagnosticspb.ServerInfoType, recordWarningInStmtCtx bool) ([][]types.Datum, error) { type result struct { idx int rows [][]types.Datum @@ -2287,7 +2282,11 @@ func FetchClusterServerInfoWithoutPrivilegeCheck(ctx context.Context, sctx sessi var results []result //nolint: prealloc for result := range ch { if result.err != nil { - 
sctx.GetSessionVars().StmtCtx.AppendWarning(result.err) + if recordWarningInStmtCtx { + sctx.GetSessionVars().StmtCtx.AppendWarning(result.err) + } else { + log.Warn(result.err.Error()) + } continue } results = append(results, result) diff --git a/planner/core/optimizer.go b/planner/core/optimizer.go index 8118e9e2ce850..700627fb8c9d9 100644 --- a/planner/core/optimizer.go +++ b/planner/core/optimizer.go @@ -18,7 +18,6 @@ import ( "context" "fmt" "github.com/pingcap/kvproto/pkg/diagnosticspb" - "github.com/pingcap/log" "math" "strconv" @@ -666,20 +665,25 @@ func handleFineGrainedShuffle(ctx context.Context, sctx sessionctx.Context, plan streamCount = sctx.GetSessionVars().TiFlashMaxThreads } } - var tiflashServerCount uint64 = 0 - var streamCountUInt = uint64(streamCount) - setupFineGrainedShuffle(ctx, sctx, &streamCountUInt, &tiflashServerCount, plan) + // use two separate cluster info to avoid grpc calls cost + serverCountInfo := tiflashClusterInfo{unInitialized, 0} + streamCountInfo := tiflashClusterInfo{unInitialized, 0} + if streamCount != 0 { + streamCountInfo.itemStatus = initialized + streamCountInfo.itemValue = uint64(streamCount) + } + setupFineGrainedShuffle(ctx, sctx, &serverCountInfo, &streamCountInfo, plan) } -func setupFineGrainedShuffle(ctx context.Context, sctx sessionctx.Context, streamCount * uint64, tiflashServerCount * uint64, plan PhysicalPlan) { +func setupFineGrainedShuffle(ctx context.Context, sctx sessionctx.Context, streamCountInfo *tiflashClusterInfo, tiflashServerCountInfo *tiflashClusterInfo, plan PhysicalPlan) { if tableReader, ok := plan.(*PhysicalTableReader); ok { if _, isExchangeSender := tableReader.tablePlan.(*PhysicalExchangeSender); isExchangeSender { helper := fineGrainedShuffleHelper{shuffleTarget: unknown, plans: make([]*basePhysicalPlan, 1)} - setupFineGrainedShuffleInternal(ctx, sctx, tableReader.tablePlan, &helper, streamCount, tiflashServerCount) + setupFineGrainedShuffleInternal(ctx, sctx, tableReader.tablePlan, &helper, streamCountInfo, tiflashServerCountInfo) } } else { for _, child := range plan.Children() { - setupFineGrainedShuffle(ctx, sctx, streamCount, tiflashServerCount, child) + setupFineGrainedShuffle(ctx, sctx, streamCountInfo, tiflashServerCountInfo, child) } } } @@ -699,6 +703,19 @@ type fineGrainedShuffleHelper struct { joinKeysCount int } +type tiflashClusterInfoStatus uint8 + +const ( + unInitialized tiflashClusterInfoStatus = iota + initialized + failed +) + +type tiflashClusterInfo struct { + itemStatus tiflashClusterInfoStatus + itemValue uint64 +} + func (h *fineGrainedShuffleHelper) clear() { h.shuffleTarget = unknown h.plans = h.plans[:0] @@ -710,36 +727,134 @@ func (h *fineGrainedShuffleHelper) updateTarget(t shuffleTarget, p *basePhysical h.plans = append(h.plans, p) } -// getTiFlashServerInfo returns server count and minimal logical cpus among servers -// return 0,0 if any err happens -func getTiFlashServerInfo(ctx context.Context, sctx sessionctx.Context) (int, int) { - rows, err := infoschema.FetchClusterServerInfoWithoutPrivilegeCheck(ctx, sctx, diagnosticspb.ServerInfoType_HardwareInfo, set.NewStringSet("tiflash"), set.NewStringSet()) +// calculateTiFlashStreamCountUsingMinLogicalCores uses minimal logical cpu cores among tiflash servers, and divide by 2 +// return false, 0 if any err happens +func calculateTiFlashStreamCountUsingMinLogicalCores(ctx context.Context, sctx sessionctx.Context, serversInfo []infoschema.ServerInfo) (bool, uint64) { + rows, err := 
infoschema.FetchClusterServerInfoWithoutPrivilegeCheck(ctx, sctx, serversInfo, diagnosticspb.ServerInfoType_HardwareInfo, false) if err != nil { - return 0, 0 + return false, 0 } - var min_logical_cores = 1000 - var server_count = 0 + var kMaxCores uint64 = 10000 + var minLogicalCores uint64 = kMaxCores // set to a large enough value here for _, row := range rows { if row[4].GetString() == "cpu-logical-cores" { - server_count++ - logical_cpus, err := strconv.Atoi(row[5].GetString()) - if err == nil && logical_cpus < min_logical_cores { - min_logical_cores = logical_cpus + logicalCpus, err := strconv.Atoi(row[5].GetString()) + if err == nil && logicalCpus > 0 && uint64(logicalCpus) < minLogicalCores { + minLogicalCores = uint64(logicalCpus) } } } - log.Error(fmt.Sprintf("Cluster Hardware info: %d, server_count: %d, min logical_cores: %d", len(rows), server_count, min_logical_cores)) - return server_count, min_logical_cores + // No need to check len(serersInfo) == serverCount here, since missing some servers' info won't affect the correctness + if minLogicalCores > 1 && minLogicalCores != kMaxCores { + return true, minLogicalCores / 2 + } else { + return false, 0 + } +} + +func checkFineGrainedShuffleForJoinAgg(ctx context.Context, sctx sessionctx.Context, streamCountInfo *tiflashClusterInfo, tiflashServerCountInfo *tiflashClusterInfo, exchangeColLen int, splitLimit uint64) (applyFlag bool, streamCount uint64) { + switch (*streamCountInfo).itemStatus { + case unInitialized: + streamCount = 4 // assume 8c node in cluster as minimal, stream count is 8 / 2 = 4 + case initialized: + streamCount = (*streamCountInfo).itemValue + case failed: + return false, 0 + } + + var tiflashServerCount uint64 = 0 + switch (*tiflashServerCountInfo).itemStatus { + case unInitialized: + serversInfo, err := infoschema.GetTiFlashServerInfo(sctx) + if err != nil { + (*tiflashServerCountInfo).itemStatus = failed + (*tiflashServerCountInfo).itemValue = 0 + return false, 0 + } + tiflashServerCount = uint64(len(serversInfo)) + (*tiflashServerCountInfo).itemStatus = initialized + (*tiflashServerCountInfo).itemValue = tiflashServerCount + case initialized: + tiflashServerCount = (*tiflashServerCountInfo).itemValue + case failed: + return false, 0 + } + + // if already exceeds splitLimit, no need to fetch actual logical cores + if tiflashServerCount*uint64(exchangeColLen)*streamCount > splitLimit { + return false, 0 + } + + // if streamCount already initialized, and can pass splitLimit check + if (*streamCountInfo).itemStatus == initialized { + return true, streamCount + } + + serversInfo, err := infoschema.GetTiFlashServerInfo(sctx) + if err != nil { + (*tiflashServerCountInfo).itemStatus = failed + (*tiflashServerCountInfo).itemValue = 0 + return false, 0 + } + flag, streamCount := calculateTiFlashStreamCountUsingMinLogicalCores(ctx, sctx, serversInfo) + if !flag { + setDefaultStreamCount(streamCountInfo) + (*tiflashServerCountInfo).itemStatus = failed + return false, 0 + } + (*streamCountInfo).itemStatus = initialized + (*streamCountInfo).itemValue = streamCount + applyFlag = tiflashServerCount*uint64(exchangeColLen)*streamCount <= splitLimit + return applyFlag, streamCount +} + +func inferFineGrainedShuffleStreamCountForWindow(ctx context.Context, sctx sessionctx.Context, streamCountInfo *tiflashClusterInfo, tiflashServerCountInfo *tiflashClusterInfo) (streamCount uint64) { + switch (*streamCountInfo).itemStatus { + case unInitialized: + serversInfo, err := infoschema.GetTiFlashServerInfo(sctx) + if err != nil { + 
setDefaultStreamCount(streamCountInfo) + streamCount = (*streamCountInfo).itemValue + break + } + + flag, temStreamCount := calculateTiFlashStreamCountUsingMinLogicalCores(ctx, sctx, serversInfo) + if !flag { + setDefaultStreamCount(streamCountInfo) + streamCount = (*streamCountInfo).itemValue + (*tiflashServerCountInfo).itemStatus = failed + break + } + streamCount = temStreamCount + (*streamCountInfo).itemStatus = initialized + (*streamCountInfo).itemValue = streamCount + + if (*tiflashServerCountInfo).itemStatus != initialized { + (*tiflashServerCountInfo).itemStatus = initialized + (*tiflashServerCountInfo).itemValue = uint64(len(serversInfo)) + } + case initialized: + streamCount = (*streamCountInfo).itemValue + case failed: + setDefaultStreamCount(streamCountInfo) + streamCount = (*streamCountInfo).itemValue + } + return streamCount +} + +func setDefaultStreamCount(streamCountInfo *tiflashClusterInfo) { + (*streamCountInfo).itemStatus = initialized + (*streamCountInfo).itemValue = variable.DefStreamCountWhenMaxThreadsNotSet } -func setupFineGrainedShuffleInternal(ctx context.Context, sctx sessionctx.Context, plan PhysicalPlan, helper *fineGrainedShuffleHelper, streamCount * uint64, tiflashServerCount * uint64) { +func setupFineGrainedShuffleInternal(ctx context.Context, sctx sessionctx.Context, plan PhysicalPlan, helper *fineGrainedShuffleHelper, streamCountInfo *tiflashClusterInfo, tiflashServerCountInfo *tiflashClusterInfo) { switch x := plan.(type) { case *PhysicalWindow: // Do not clear the plans because window executor will keep the data partition. // For non hash partition window function, there will be a passthrough ExchangeSender to collect data, // which will break data partition. helper.updateTarget(window, &x.basePhysicalPlan) - setupFineGrainedShuffleInternal(ctx, sctx, x.children[0], helper, streamCount, tiflashServerCount) + setupFineGrainedShuffleInternal(ctx, sctx, x.children[0], helper, streamCountInfo, tiflashServerCountInfo) case *PhysicalSort: if x.IsPartialSort { // Partial sort will keep the data partition. @@ -748,19 +863,19 @@ func setupFineGrainedShuffleInternal(ctx context.Context, sctx sessionctx.Contex // Global sort will break the data partition. 
helper.clear() } - setupFineGrainedShuffleInternal(ctx, sctx, x.children[0], helper, streamCount, tiflashServerCount) + setupFineGrainedShuffleInternal(ctx, sctx, x.children[0], helper, streamCountInfo, tiflashServerCountInfo) case *PhysicalSelection: helper.plans = append(helper.plans, &x.basePhysicalPlan) - setupFineGrainedShuffleInternal(ctx, sctx, x.children[0], helper, streamCount, tiflashServerCount) + setupFineGrainedShuffleInternal(ctx, sctx, x.children[0], helper, streamCountInfo, tiflashServerCountInfo) case *PhysicalProjection: helper.plans = append(helper.plans, &x.basePhysicalPlan) - setupFineGrainedShuffleInternal(ctx, sctx, x.children[0], helper, streamCount, tiflashServerCount) + setupFineGrainedShuffleInternal(ctx, sctx, x.children[0], helper, streamCountInfo, tiflashServerCountInfo) case *PhysicalExchangeReceiver: helper.plans = append(helper.plans, &x.basePhysicalPlan) - setupFineGrainedShuffleInternal(ctx, sctx, x.children[0], helper, streamCount, tiflashServerCount) + setupFineGrainedShuffleInternal(ctx, sctx, x.children[0], helper, streamCountInfo, tiflashServerCountInfo) case *PhysicalHashAgg: helper.updateTarget(hashAgg, &x.basePhysicalPlan) - setupFineGrainedShuffleInternal(ctx, sctx, x.children[0], helper, streamCount, tiflashServerCount) + setupFineGrainedShuffleInternal(ctx, sctx, x.children[0], helper, streamCountInfo, tiflashServerCountInfo) case *PhysicalHashJoin: child0 := x.children[0] child1 := x.children[1] @@ -777,49 +892,41 @@ func setupFineGrainedShuffleInternal(ctx context.Context, sctx sessionctx.Contex buildHelper := fineGrainedShuffleHelper{shuffleTarget: joinBuild, plans: []*basePhysicalPlan{}} buildHelper.plans = append(buildHelper.plans, &x.basePhysicalPlan) buildHelper.joinKeysCount = len(joinKeys) - setupFineGrainedShuffleInternal(ctx, sctx, buildChild, &buildHelper, streamCount, tiflashServerCount) + setupFineGrainedShuffleInternal(ctx, sctx, buildChild, &buildHelper, streamCountInfo, tiflashServerCountInfo) } else { buildHelper := fineGrainedShuffleHelper{shuffleTarget: unknown, plans: []*basePhysicalPlan{}} - setupFineGrainedShuffleInternal(ctx, sctx, buildChild, &buildHelper, streamCount, tiflashServerCount) + setupFineGrainedShuffleInternal(ctx, sctx, buildChild, &buildHelper, streamCountInfo, tiflashServerCountInfo) } // don't reply on prob side's fineGrainedShuffle attribute helper.clear() - setupFineGrainedShuffleInternal(ctx, sctx, probChild, helper, streamCount, tiflashServerCount) + setupFineGrainedShuffleInternal(ctx, sctx, probChild, helper, streamCountInfo, tiflashServerCountInfo) case *PhysicalExchangeSender: if x.ExchangeType == tipb.ExchangeType_Hash { // Set up stream count for all plans based on shuffle target type. 
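// Rough worked example of the gates applied below, using the limits from this patch
// (1200 for aggregation, 600 for join build) and a hypothetical cluster: with 3 TiFlash
// nodes, a 10-column hash exchange and an inferred stream count of 40 (80 minimal
// logical cores / 2), the product 3 * 10 * 40 = 1200 still passes the aggregation limit
// (<= 1200) but exceeds the join-build limit of 600, so only the aggregation side would
// keep fine grained shuffle; window functions skip this check and always receive a
// stream count.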
+ var exchangeColLen = x.Schema().Len() switch helper.shuffleTarget { case window: - if *streamCount == 0 { - *streamCount = variable.DefStreamCountWhenMaxThreadsNotSet - } - x.TiFlashFineGrainedShuffleStreamCount = *streamCount + streamCount := inferFineGrainedShuffleStreamCountForWindow(ctx, sctx, streamCountInfo, tiflashServerCountInfo) + x.TiFlashFineGrainedShuffleStreamCount = streamCount for _, p := range helper.plans { - p.TiFlashFineGrainedShuffleStreamCount = *streamCount + p.TiFlashFineGrainedShuffleStreamCount = streamCount } case hashAgg: - if *streamCount == 0 { - *streamCount = variable.DefStreamCountWhenMaxThreadsNotSet - } - x.TiFlashFineGrainedShuffleStreamCount = *streamCount - for _, p := range helper.plans { - p.TiFlashFineGrainedShuffleStreamCount = *streamCount + applyFlag, streamCount := checkFineGrainedShuffleForJoinAgg(ctx, sctx, streamCountInfo, tiflashServerCountInfo, exchangeColLen, 1200) // 1200: performance test result + if applyFlag { + x.TiFlashFineGrainedShuffleStreamCount = streamCount + for _, p := range helper.plans { + p.TiFlashFineGrainedShuffleStreamCount = streamCount + } } case joinBuild: - // Support hashJoin only when shuffle hash keys equals to join keys + // Support hashJoin only when shuffle hash keys equals to join keys due to tiflash implementations if len(x.HashCols) == helper.joinKeysCount { - if *streamCount == 0 { - //*streamCount = variable.DefStreamCountWhenMaxThreadsNotSet - // serverInfos, err := infoschema.GetTiFlashServerInfo(sctx) - serverCount, logicalCpus := getTiFlashServerInfo(ctx, sctx) - if serverCount != 0 && logicalCpus > 1 { - *tiflashServerCount = uint64(serverCount) - *streamCount = uint64(logicalCpus / 2) - } - log.Error(fmt.Sprintf("%d %d %d", x.Schema().Len(), *streamCount, *tiflashServerCount)) - x.TiFlashFineGrainedShuffleStreamCount = *streamCount + applyFlag, streamCount := checkFineGrainedShuffleForJoinAgg(ctx, sctx, streamCountInfo, tiflashServerCountInfo, exchangeColLen, 600) // 600: performance test result + if applyFlag { + x.TiFlashFineGrainedShuffleStreamCount = streamCount for _, p := range helper.plans { - p.TiFlashFineGrainedShuffleStreamCount = *streamCount + p.TiFlashFineGrainedShuffleStreamCount = streamCount } } } @@ -827,11 +934,11 @@ func setupFineGrainedShuffleInternal(ctx context.Context, sctx sessionctx.Contex } // exchange sender will break the data partition. 
helper.clear() - setupFineGrainedShuffleInternal(ctx, sctx, x.children[0], helper, streamCount, tiflashServerCount) + setupFineGrainedShuffleInternal(ctx, sctx, x.children[0], helper, streamCountInfo, tiflashServerCountInfo) default: for _, child := range x.Children() { childHelper := fineGrainedShuffleHelper{shuffleTarget: unknown, plans: []*basePhysicalPlan{}} - setupFineGrainedShuffleInternal(ctx, sctx, child, &childHelper, streamCount, tiflashServerCount) + setupFineGrainedShuffleInternal(ctx, sctx, child, &childHelper, streamCountInfo, tiflashServerCountInfo) } } } From 4ab9cdb6f9e7d4eeefdb5d5dc663e542b8902fd9 Mon Sep 17 00:00:00 2001 From: yibin Date: Thu, 22 Dec 2022 14:43:14 +0800 Subject: [PATCH 03/15] Code part finishd Signed-off-by: yibin --- executor/memtable_reader.go | 52 ------------------------------------- planner/core/optimizer.go | 38 ++++++++++++++++----------- 2 files changed, 23 insertions(+), 67 deletions(-) diff --git a/executor/memtable_reader.go b/executor/memtable_reader.go index 8e837a1a71bb5..3c1530ed714cf 100644 --- a/executor/memtable_reader.go +++ b/executor/memtable_reader.go @@ -29,7 +29,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/diagnosticspb" - "github.com/pingcap/log" "github.com/pingcap/sysutil" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/infoschema" @@ -47,7 +46,6 @@ import ( "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/pdapi" "github.com/pingcap/tidb/util/set" - "go.uber.org/zap" "golang.org/x/exp/slices" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -318,56 +316,6 @@ func (e *clusterServerInfoRetriever) retrieve(ctx context.Context, sctx sessionc return infoschema.FetchClusterServerInfoWithoutPrivilegeCheck(ctx, sctx, serversInfo, e.serverInfoType, true) } -func serverInfoItemToRows(items []*diagnosticspb.ServerInfoItem, tp, addr string) [][]types.Datum { - rows := make([][]types.Datum, 0, len(items)) - for _, v := range items { - for _, item := range v.Pairs { - row := types.MakeDatums( - tp, - addr, - v.Tp, - v.Name, - item.Key, - item.Value, - ) - rows = append(rows, row) - } - } - return rows -} - -func getServerInfoByGRPC(ctx context.Context, address string, tp diagnosticspb.ServerInfoType) ([]*diagnosticspb.ServerInfoItem, error) { - opt := grpc.WithInsecure() - security := config.GetGlobalConfig().Security - if len(security.ClusterSSLCA) != 0 { - clusterSecurity := security.ClusterSecurity() - tlsConfig, err := clusterSecurity.ToTLSConfig() - if err != nil { - return nil, errors.Trace(err) - } - opt = grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)) - } - conn, err := grpc.Dial(address, opt) - if err != nil { - return nil, err - } - defer func() { - err := conn.Close() - if err != nil { - log.Error("close grpc connection error", zap.Error(err)) - } - }() - - cli := diagnosticspb.NewDiagnosticsClient(conn) - ctx, cancel := context.WithTimeout(ctx, time.Second*10) - defer cancel() - r, err := cli.ServerInfo(ctx, &diagnosticspb.ServerInfoRequest{Tp: tp}) - if err != nil { - return nil, err - } - return r.Items, nil -} - func parseFailpointServerInfo(s string) []infoschema.ServerInfo { servers := strings.Split(s, ";") serversInfo := make([]infoschema.ServerInfo, 0, len(servers)) diff --git a/planner/core/optimizer.go b/planner/core/optimizer.go index 700627fb8c9d9..342acd7dfeeb9 100644 --- a/planner/core/optimizer.go +++ b/planner/core/optimizer.go @@ -654,7 +654,7 @@ func 
rewriteTableScanAndAggArgs(physicalTableScan *PhysicalTableScan, aggFuncs [ // TiFlashFineGrainedShuffleStreamCount: // < 0: fine grained shuffle is disabled. // > 0: use TiFlashFineGrainedShuffleStreamCount as stream count. -// == 0: use TiFlashMaxThreads as stream count when it's greater than 0. Otherwise use DefStreamCountWhenMaxThreadsNotSet. +// == 0: use TiFlashMaxThreads as stream count when it's greater than 0. Otherwise set status as uninitialized. func handleFineGrainedShuffle(ctx context.Context, sctx sessionctx.Context, plan PhysicalPlan) { streamCount := sctx.GetSessionVars().TiFlashFineGrainedShuffleStreamCount if streamCount < 0 { @@ -752,14 +752,14 @@ func calculateTiFlashStreamCountUsingMinLogicalCores(ctx context.Context, sctx s } } -func checkFineGrainedShuffleForJoinAgg(ctx context.Context, sctx sessionctx.Context, streamCountInfo *tiflashClusterInfo, tiflashServerCountInfo *tiflashClusterInfo, exchangeColLen int, splitLimit uint64) (applyFlag bool, streamCount uint64) { +func checkFineGrainedShuffleForJoinAgg(ctx context.Context, sctx sessionctx.Context, streamCountInfo *tiflashClusterInfo, tiflashServerCountInfo *tiflashClusterInfo, exchangeColCount int, splitLimit uint64) (applyFlag bool, streamCount uint64) { switch (*streamCountInfo).itemStatus { case unInitialized: streamCount = 4 // assume 8c node in cluster as minimal, stream count is 8 / 2 = 4 case initialized: streamCount = (*streamCountInfo).itemValue case failed: - return false, 0 + return false, 0 // probably this path won't reach } var tiflashServerCount uint64 = 0 @@ -781,7 +781,7 @@ func checkFineGrainedShuffleForJoinAgg(ctx context.Context, sctx sessionctx.Cont } // if already exceeds splitLimit, no need to fetch actual logical cores - if tiflashServerCount*uint64(exchangeColLen)*streamCount > splitLimit { + if tiflashServerCount*uint64(exchangeColCount)*streamCount > splitLimit { return false, 0 } @@ -796,28 +796,41 @@ func checkFineGrainedShuffleForJoinAgg(ctx context.Context, sctx sessionctx.Cont (*tiflashServerCountInfo).itemValue = 0 return false, 0 } - flag, streamCount := calculateTiFlashStreamCountUsingMinLogicalCores(ctx, sctx, serversInfo) + flag, temStreamCount := calculateTiFlashStreamCountUsingMinLogicalCores(ctx, sctx, serversInfo) if !flag { setDefaultStreamCount(streamCountInfo) (*tiflashServerCountInfo).itemStatus = failed return false, 0 } + streamCount = temStreamCount (*streamCountInfo).itemStatus = initialized (*streamCountInfo).itemValue = streamCount - applyFlag = tiflashServerCount*uint64(exchangeColLen)*streamCount <= splitLimit + applyFlag = tiflashServerCount*uint64(exchangeColCount)*streamCount <= splitLimit return applyFlag, streamCount } func inferFineGrainedShuffleStreamCountForWindow(ctx context.Context, sctx sessionctx.Context, streamCountInfo *tiflashClusterInfo, tiflashServerCountInfo *tiflashClusterInfo) (streamCount uint64) { switch (*streamCountInfo).itemStatus { case unInitialized: + if (*tiflashServerCountInfo).itemStatus == failed { + setDefaultStreamCount(streamCountInfo) + streamCount = (*streamCountInfo).itemValue + break + } + serversInfo, err := infoschema.GetTiFlashServerInfo(sctx) if err != nil { setDefaultStreamCount(streamCountInfo) streamCount = (*streamCountInfo).itemValue + (*tiflashServerCountInfo).itemStatus = failed break } + if (*tiflashServerCountInfo).itemStatus == unInitialized { + (*tiflashServerCountInfo).itemStatus = initialized + (*tiflashServerCountInfo).itemValue = uint64(len(serversInfo)) + } + flag, temStreamCount := 
calculateTiFlashStreamCountUsingMinLogicalCores(ctx, sctx, serversInfo) if !flag { setDefaultStreamCount(streamCountInfo) @@ -828,11 +841,6 @@ func inferFineGrainedShuffleStreamCountForWindow(ctx context.Context, sctx sessi streamCount = temStreamCount (*streamCountInfo).itemStatus = initialized (*streamCountInfo).itemValue = streamCount - - if (*tiflashServerCountInfo).itemStatus != initialized { - (*tiflashServerCountInfo).itemStatus = initialized - (*tiflashServerCountInfo).itemValue = uint64(len(serversInfo)) - } case initialized: streamCount = (*streamCountInfo).itemValue case failed: @@ -897,13 +905,13 @@ func setupFineGrainedShuffleInternal(ctx context.Context, sctx sessionctx.Contex buildHelper := fineGrainedShuffleHelper{shuffleTarget: unknown, plans: []*basePhysicalPlan{}} setupFineGrainedShuffleInternal(ctx, sctx, buildChild, &buildHelper, streamCountInfo, tiflashServerCountInfo) } - // don't reply on prob side's fineGrainedShuffle attribute + // don't apply fine grained shuffle for prob side helper.clear() setupFineGrainedShuffleInternal(ctx, sctx, probChild, helper, streamCountInfo, tiflashServerCountInfo) case *PhysicalExchangeSender: if x.ExchangeType == tipb.ExchangeType_Hash { // Set up stream count for all plans based on shuffle target type. - var exchangeColLen = x.Schema().Len() + var exchangeColCount = x.Schema().Len() switch helper.shuffleTarget { case window: streamCount := inferFineGrainedShuffleStreamCountForWindow(ctx, sctx, streamCountInfo, tiflashServerCountInfo) @@ -912,7 +920,7 @@ func setupFineGrainedShuffleInternal(ctx context.Context, sctx sessionctx.Contex p.TiFlashFineGrainedShuffleStreamCount = streamCount } case hashAgg: - applyFlag, streamCount := checkFineGrainedShuffleForJoinAgg(ctx, sctx, streamCountInfo, tiflashServerCountInfo, exchangeColLen, 1200) // 1200: performance test result + applyFlag, streamCount := checkFineGrainedShuffleForJoinAgg(ctx, sctx, streamCountInfo, tiflashServerCountInfo, exchangeColCount, 1200) // 1200: performance test result if applyFlag { x.TiFlashFineGrainedShuffleStreamCount = streamCount for _, p := range helper.plans { @@ -922,7 +930,7 @@ func setupFineGrainedShuffleInternal(ctx context.Context, sctx sessionctx.Contex case joinBuild: // Support hashJoin only when shuffle hash keys equals to join keys due to tiflash implementations if len(x.HashCols) == helper.joinKeysCount { - applyFlag, streamCount := checkFineGrainedShuffleForJoinAgg(ctx, sctx, streamCountInfo, tiflashServerCountInfo, exchangeColLen, 600) // 600: performance test result + applyFlag, streamCount := checkFineGrainedShuffleForJoinAgg(ctx, sctx, streamCountInfo, tiflashServerCountInfo, exchangeColCount, 600) // 600: performance test result if applyFlag { x.TiFlashFineGrainedShuffleStreamCount = streamCount for _, p := range helper.plans { From 9911217febb97a37661369714e5d6457e25f674b Mon Sep 17 00:00:00 2001 From: yibin Date: Fri, 23 Dec 2022 11:37:30 +0800 Subject: [PATCH 04/15] Add unit tests and integration tests Signed-off-by: yibin --- planner/core/integration_test.go | 57 +++++++++ planner/core/optimizer.go | 33 ++++-- planner/core/optimizer_test.go | 109 +++++++++++++++++- .../core/testdata/integration_suite_in.json | 7 ++ .../core/testdata/integration_suite_out.json | 35 ++++++ 5 files changed, 229 insertions(+), 12 deletions(-) diff --git a/planner/core/integration_test.go b/planner/core/integration_test.go index 743c6b87dc6d0..000817e8e6dac 100644 --- a/planner/core/integration_test.go +++ b/planner/core/integration_test.go @@ -4914,6 +4914,63 
@@ func TestMppJoinExchangeColumnPrune(t *testing.T) { } } +func TestMppFineGrainedJoinAndAgg(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("drop table if exists tt") + tk.MustExec("create table t (c1 int, c2 int, c3 int NOT NULL, c4 int NOT NULL, c5 int)") + tk.MustExec("create table tt (b1 int)") + tk.MustExec("analyze table t") + tk.MustExec("analyze table tt") + + instances := []string{ + "tiflash,127.0.0.1:3933,127.0.0.1:7777,,", + "tikv,127.0.0.1:11080,127.0.0.1:10080,,", + } + fpName := "github.com/pingcap/tidb/infoschema/mockStoreServerInfo" + fpExpr := `return("` + strings.Join(instances, ";") + `")` + require.NoError(t, failpoint.Enable(fpName, fpExpr)) + defer func() { require.NoError(t, failpoint.Disable(fpName)) }() + fpName2 := "github.com/pingcap/tidb/planner/core/mockTiFlashStreamCountUsingMinLogicalCores" + require.NoError(t, failpoint.Enable(fpName2, `return("8")`)) + + // Create virtual tiflash replica info. + dom := domain.GetDomain(tk.Session()) + is := dom.InfoSchema() + db, exists := is.SchemaByName(model.NewCIStr("test")) + require.True(t, exists) + for _, tblInfo := range db.Tables { + if tblInfo.Name.L == "t" || tblInfo.Name.L == "tt" { + tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{ + Count: 1, + Available: true, + } + } + } + + tk.MustExec("set @@tidb_allow_mpp=1;") + tk.MustExec("set @@session.tidb_broadcast_join_threshold_size = 1") + tk.MustExec("set @@session.tidb_broadcast_join_threshold_count = 1") + + var input []string + var output []struct { + SQL string + Plan []string + } + integrationSuiteData := core.GetIntegrationSuiteData() + integrationSuiteData.LoadTestCases(t, &input, &output) + for i, tt := range input { + testdata.OnRecord(func() { + output[i].SQL = tt + output[i].Plan = testdata.ConvertRowsToStrings(tk.MustQuery(tt).Rows()) + }) + res := tk.MustQuery(tt) + res.Check(testkit.Rows(output[i].Plan...)) + } +} + func TestMppAggTopNWithJoin(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) diff --git a/planner/core/optimizer.go b/planner/core/optimizer.go index 342acd7dfeeb9..39dda7cf62517 100644 --- a/planner/core/optimizer.go +++ b/planner/core/optimizer.go @@ -17,6 +17,7 @@ package core import ( "context" "fmt" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/diagnosticspb" "math" "strconv" @@ -666,13 +667,13 @@ func handleFineGrainedShuffle(ctx context.Context, sctx sessionctx.Context, plan } } // use two separate cluster info to avoid grpc calls cost - serverCountInfo := tiflashClusterInfo{unInitialized, 0} + tiflashServerCountInfo := tiflashClusterInfo{unInitialized, 0} streamCountInfo := tiflashClusterInfo{unInitialized, 0} if streamCount != 0 { streamCountInfo.itemStatus = initialized streamCountInfo.itemValue = uint64(streamCount) } - setupFineGrainedShuffle(ctx, sctx, &serverCountInfo, &streamCountInfo, plan) + setupFineGrainedShuffle(ctx, sctx, &streamCountInfo, &tiflashServerCountInfo, plan) } func setupFineGrainedShuffle(ctx context.Context, sctx sessionctx.Context, streamCountInfo *tiflashClusterInfo, tiflashServerCountInfo *tiflashClusterInfo, plan PhysicalPlan) { @@ -730,6 +731,14 @@ func (h *fineGrainedShuffleHelper) updateTarget(t shuffleTarget, p *basePhysical // calculateTiFlashStreamCountUsingMinLogicalCores uses minimal logical cpu cores among tiflash servers, and divide by 2 // return false, 0 if any err happens func 
calculateTiFlashStreamCountUsingMinLogicalCores(ctx context.Context, sctx sessionctx.Context, serversInfo []infoschema.ServerInfo) (bool, uint64) { + failpoint.Inject("mockTiFlashStreamCountUsingMinLogicalCores", func(val failpoint.Value) { + intVal, err := strconv.Atoi(val.(string)) + if err == nil { + failpoint.Return(true, uint64(intVal)) + } else { + failpoint.Return(false, 0) + } + }) rows, err := infoschema.FetchClusterServerInfoWithoutPrivilegeCheck(ctx, sctx, serversInfo, diagnosticspb.ServerInfoType_HardwareInfo, false) if err != nil { return false, 0 @@ -882,8 +891,9 @@ func setupFineGrainedShuffleInternal(ctx context.Context, sctx sessionctx.Contex helper.plans = append(helper.plans, &x.basePhysicalPlan) setupFineGrainedShuffleInternal(ctx, sctx, x.children[0], helper, streamCountInfo, tiflashServerCountInfo) case *PhysicalHashAgg: - helper.updateTarget(hashAgg, &x.basePhysicalPlan) - setupFineGrainedShuffleInternal(ctx, sctx, x.children[0], helper, streamCountInfo, tiflashServerCountInfo) + aggHelper := fineGrainedShuffleHelper{shuffleTarget: hashAgg, plans: []*basePhysicalPlan{}} + aggHelper.plans = append(aggHelper.plans, &x.basePhysicalPlan) + setupFineGrainedShuffleInternal(ctx, sctx, x.children[0], &aggHelper, streamCountInfo, tiflashServerCountInfo) case *PhysicalHashJoin: child0 := x.children[0] child1 := x.children[1] @@ -929,13 +939,14 @@ func setupFineGrainedShuffleInternal(ctx context.Context, sctx sessionctx.Contex } case joinBuild: // Support hashJoin only when shuffle hash keys equals to join keys due to tiflash implementations - if len(x.HashCols) == helper.joinKeysCount { - applyFlag, streamCount := checkFineGrainedShuffleForJoinAgg(ctx, sctx, streamCountInfo, tiflashServerCountInfo, exchangeColCount, 600) // 600: performance test result - if applyFlag { - x.TiFlashFineGrainedShuffleStreamCount = streamCount - for _, p := range helper.plans { - p.TiFlashFineGrainedShuffleStreamCount = streamCount - } + if len(x.HashCols) != helper.joinKeysCount { + break + } + applyFlag, streamCount := checkFineGrainedShuffleForJoinAgg(ctx, sctx, streamCountInfo, tiflashServerCountInfo, exchangeColCount, 600) // 600: performance test result + if applyFlag { + x.TiFlashFineGrainedShuffleStreamCount = streamCount + for _, p := range helper.plans { + p.TiFlashFineGrainedShuffleStreamCount = streamCount } } } diff --git a/planner/core/optimizer_test.go b/planner/core/optimizer_test.go index 0f9dc0a4050c4..3e7df77e17bf3 100644 --- a/planner/core/optimizer_test.go +++ b/planner/core/optimizer_test.go @@ -15,7 +15,9 @@ package core import ( + "github.com/pingcap/failpoint" "reflect" + "strings" "testing" "github.com/pingcap/tidb/expression" @@ -163,7 +165,7 @@ func TestHandleFineGrainedShuffle(t *testing.T) { sctx.GetSessionVars().TiFlashFineGrainedShuffleStreamCount = expStreamCount start := func(p PhysicalPlan, expStreamCount int64, expChildCount int, curChildCount int) { - handleFineGrainedShuffle(sctx, tableReader) + handleFineGrainedShuffle(nil, sctx, tableReader) check(p, expStreamCount, expChildCount, curChildCount) clear(plans) } @@ -289,6 +291,111 @@ func TestHandleFineGrainedShuffle(t *testing.T) { tableScan1 = &PhysicalTableScan{} hashSender1.children = []PhysicalPlan{tableScan1} start(partWindow, expStreamCount, 3, 0) + + instances := []string{ + "tiflash,127.0.0.1:3933,127.0.0.1:7777,,", + "tikv,127.0.0.1:11080,127.0.0.1:10080,,", + } + fpName := "github.com/pingcap/tidb/infoschema/mockStoreServerInfo" + fpExpr := `return("` + strings.Join(instances, ";") + `")` + 
require.NoError(t, failpoint.Enable(fpName, fpExpr)) + defer func() { require.NoError(t, failpoint.Disable(fpName)) }() + fpName2 := "github.com/pingcap/tidb/planner/core/mockTiFlashStreamCountUsingMinLogicalCores" + require.NoError(t, failpoint.Enable(fpName2, `return("8")`)) + sctx.GetSessionVars().TiFlashFineGrainedShuffleStreamCount = 0 + + col0 := &expression.Column{ + UniqueID: sctx.GetSessionVars().AllocPlanColumnID(), + RetType: types.NewFieldType(mysql.TypeLonglong), + } + cond, err := expression.NewFunction(sctx, ast.EQ, types.NewFieldType(mysql.TypeTiny), col0, col0) + require.True(t, err == nil) + sf, isSF := cond.(*expression.ScalarFunction) + require.True(t, isSF) + var partitionCols = make([]*property.MPPPartitionColumn, 0, 1) + partitionCols = append(partitionCols, &property.MPPPartitionColumn{ + Col: col0, + CollateID: property.GetCollateIDByNameForPartition(col0.GetType().GetCollate()), + }) + + // HashAgg(x) <- ExchangeReceiver <- ExchangeSender + tableReader.tablePlan = passSender + hashAgg = &PhysicalHashAgg{} + passSender.children = []PhysicalPlan{hashAgg} + hashAgg.children = []PhysicalPlan{recv} + recv.children = []PhysicalPlan{hashSender} + hashSender.children = []PhysicalPlan{tableScan} + tableScan.Schema().Columns = append(tableScan.Schema().Columns, col0) + start(hashAgg, 8, 3, 0) + + // Join(x) <- ExchangeReceiver <- ExchangeSender + // <- ExchangeReceiver <- ExchangeSender + tableReader.tablePlan = passSender + hashJoin = &PhysicalHashJoin{} + hashJoin.EqualConditions = append(hashJoin.EqualConditions, sf) + hashJoin.RightJoinKeys = append(hashJoin.RightJoinKeys, col0) + hashJoin.InnerChildIdx = 1 + passSender.children = []PhysicalPlan{hashJoin} + recv = &PhysicalExchangeReceiver{} + recv1 = &PhysicalExchangeReceiver{} + tableScan = &PhysicalTableScan{} + tableScan1 = &PhysicalTableScan{} + hashSender = &PhysicalExchangeSender{ + ExchangeType: tipb.ExchangeType_Hash, + } + hashSender1 = &PhysicalExchangeSender{ + ExchangeType: tipb.ExchangeType_Hash, + } + hashJoin.children = []PhysicalPlan{recv, recv1} + recv.children = []PhysicalPlan{hashSender} + recv1.children = []PhysicalPlan{hashSender1} + hashSender.children = []PhysicalPlan{tableScan} + hashSender1.children = []PhysicalPlan{tableScan1} + hashSender1.HashCols = partitionCols + tableScan1.Schema().Columns = append(tableScan1.Schema().Columns, col0) + handleFineGrainedShuffle(nil, sctx, tableReader) + require.Equal(t, uint64(8), hashJoin.TiFlashFineGrainedShuffleStreamCount) + require.Equal(t, uint64(8), recv1.TiFlashFineGrainedShuffleStreamCount) + require.Equal(t, uint64(8), hashSender1.TiFlashFineGrainedShuffleStreamCount) + require.Equal(t, uint64(0), recv.TiFlashFineGrainedShuffleStreamCount) + require.Equal(t, uint64(0), hashSender.TiFlashFineGrainedShuffleStreamCount) + clear(plans) + + require.NoError(t, failpoint.Disable(fpName2)) + require.NoError(t, failpoint.Enable(fpName2, `return("8000")`)) + // HashAgg(x) <- ExchangeReceiver <- ExchangeSender, exceed splitLimit + tableReader.tablePlan = passSender + hashAgg = &PhysicalHashAgg{} + passSender.children = []PhysicalPlan{hashAgg} + hashAgg.children = []PhysicalPlan{recv} + recv.children = []PhysicalPlan{hashSender} + hashSender.children = []PhysicalPlan{tableScan} + tableScan.Schema().Columns = append(tableScan.Schema().Columns, col0) + start(hashAgg, 0, 3, 0) + + // exceed splitLimit + // Join(x) <- ExchangeReceiver <- ExchangeSender + // <- ExchangeReceiver <- ExchangeSender + tableReader.tablePlan = passSender + hashJoin = &PhysicalHashJoin{} 
+ hashJoin.EqualConditions = append(hashJoin.EqualConditions, sf) + hashJoin.LeftJoinKeys = append(hashJoin.LeftJoinKeys, col0) + hashJoin.InnerChildIdx = 1 + passSender.children = []PhysicalPlan{hashJoin} + recv1 = &PhysicalExchangeReceiver{} + tableScan1 = &PhysicalTableScan{} + hashSender1 = &PhysicalExchangeSender{ + ExchangeType: tipb.ExchangeType_Hash, + } + hashJoin.children = []PhysicalPlan{recv, recv1} + recv.children = []PhysicalPlan{hashSender} + recv1.children = []PhysicalPlan{hashSender1} + hashSender.children = []PhysicalPlan{tableScan} + hashSender1.children = []PhysicalPlan{tableScan1} + hashSender1.HashCols = partitionCols + tableScan1.Schema().Columns = append(tableScan1.Schema().Columns, col0) + start(hashJoin, 0, 3, 0) + require.NoError(t, failpoint.Disable(fpName2)) } // Test for core.prunePhysicalColumns() diff --git a/planner/core/testdata/integration_suite_in.json b/planner/core/testdata/integration_suite_in.json index df990e52c65eb..0e668ce3df8ba 100644 --- a/planner/core/testdata/integration_suite_in.json +++ b/planner/core/testdata/integration_suite_in.json @@ -960,6 +960,13 @@ "desc format = 'brief' select * from tt t1 where exists (select * from t t2 where t1.b1 = t2.c3 and t2.c1 < t2.c2)" ] }, + { + "name": "TestMppFineGrainedJoinAndAgg", + "cases": [ + "desc format = 'brief' select * from tt t1 where exists (select * from t t2 where t1.b1 = t2.c3 and t2.c1 < t2.c2)", + "desc format = 'brief' select count(*) from tt group by b1" + ] + }, { "name": "TestPushDownAggForMPP", "cases": [ diff --git a/planner/core/testdata/integration_suite_out.json b/planner/core/testdata/integration_suite_out.json index 14c04c6cfb0ab..9286949b3c647 100644 --- a/planner/core/testdata/integration_suite_out.json +++ b/planner/core/testdata/integration_suite_out.json @@ -7521,6 +7521,41 @@ } ] }, + { + "Name": "TestMppFineGrainedJoinAndAgg", + "Cases": [ + { + "SQL": "desc format = 'brief' select * from tt t1 where exists (select * from t t2 where t1.b1 = t2.c3 and t2.c1 < t2.c2)", + "Plan": [ + "TableReader 7992.00 root data:ExchangeSender", + "└─ExchangeSender 7992.00 mpp[tiflash] ExchangeType: PassThrough", + " └─HashJoin 7992.00 mpp[tiflash] semi join, equal:[eq(test.tt.b1, test.t.c3)], stream_count: 8", + " ├─ExchangeReceiver(Build) 8000.00 mpp[tiflash] stream_count: 8", + " │ └─ExchangeSender 8000.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.t.c3, collate: binary], stream_count: 8", + " │ └─Projection 8000.00 mpp[tiflash] test.t.c3", + " │ └─Selection 8000.00 mpp[tiflash] lt(test.t.c1, test.t.c2)", + " │ └─TableFullScan 10000.00 mpp[tiflash] table:t2 keep order:false, stats:pseudo", + " └─ExchangeReceiver(Probe) 9990.00 mpp[tiflash] ", + " └─ExchangeSender 9990.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.tt.b1, collate: binary]", + " └─Selection 9990.00 mpp[tiflash] not(isnull(test.tt.b1))", + " └─TableFullScan 10000.00 mpp[tiflash] table:t1 keep order:false, stats:pseudo" + ] + }, + { + "SQL": "desc format = 'brief' select count(*) from tt group by b1", + "Plan": [ + "TableReader 8000.00 root data:ExchangeSender", + "└─ExchangeSender 8000.00 mpp[tiflash] ExchangeType: PassThrough", + " └─Projection 8000.00 mpp[tiflash] Column#3", + " └─HashAgg 8000.00 mpp[tiflash] group by:test.tt.b1, funcs:sum(Column#7)->Column#3, stream_count: 8", + " └─ExchangeReceiver 8000.00 mpp[tiflash] stream_count: 8", + " └─ExchangeSender 8000.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.tt.b1, collate: binary], stream_count: 8", + " 
└─HashAgg 8000.00 mpp[tiflash] group by:test.tt.b1, funcs:count(1)->Column#7", + " └─TableFullScan 10000.00 mpp[tiflash] table:tt keep order:false, stats:pseudo" + ] + } + ] + }, { "Name": "TestPushDownAggForMPP", "Cases": [ From bb99b1b6c5b340f11790bdb85fe91f807b6dec41 Mon Sep 17 00:00:00 2001 From: yibin Date: Fri, 23 Dec 2022 12:04:23 +0800 Subject: [PATCH 05/15] Little refact Signed-off-by: yibin --- infoschema/tables.go | 12 ++++++------ planner/core/optimizer.go | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/infoschema/tables.go b/infoschema/tables.go index a867727aee179..2614ce25697b3 100644 --- a/infoschema/tables.go +++ b/infoschema/tables.go @@ -18,12 +18,6 @@ import ( "context" "encoding/json" "fmt" - "github.com/pingcap/kvproto/pkg/diagnosticspb" - "github.com/pingcap/log" - "github.com/pingcap/tidb/util/set" - "golang.org/x/exp/slices" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials" "net" "net/http" "strconv" @@ -33,7 +27,9 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/diagnosticspb" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/log" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl/placement" "github.com/pingcap/tidb/domain/infosync" @@ -55,9 +51,13 @@ import ( "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/pdapi" "github.com/pingcap/tidb/util/sem" + "github.com/pingcap/tidb/util/set" "github.com/pingcap/tidb/util/stmtsummary" "github.com/tikv/client-go/v2/tikv" "go.uber.org/zap" + "golang.org/x/exp/slices" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" ) const ( diff --git a/planner/core/optimizer.go b/planner/core/optimizer.go index 39dda7cf62517..3f98024dda33b 100644 --- a/planner/core/optimizer.go +++ b/planner/core/optimizer.go @@ -17,12 +17,12 @@ package core import ( "context" "fmt" - "github.com/pingcap/failpoint" - "github.com/pingcap/kvproto/pkg/diagnosticspb" "math" "strconv" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/diagnosticspb" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/expression/aggregation" From 39e13f48d6e9699639b8c0bc839151c39caa89f1 Mon Sep 17 00:00:00 2001 From: yibin Date: Fri, 23 Dec 2022 12:10:41 +0800 Subject: [PATCH 06/15] Little fix Signed-off-by: yibin --- planner/core/optimizer_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/planner/core/optimizer_test.go b/planner/core/optimizer_test.go index 3e7df77e17bf3..59afdf9e9afba 100644 --- a/planner/core/optimizer_test.go +++ b/planner/core/optimizer_test.go @@ -15,11 +15,11 @@ package core import ( - "github.com/pingcap/failpoint" "reflect" "strings" "testing" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/mysql" From 77fcd436a4b263aea042fd7733662afa3bbb8015 Mon Sep 17 00:00:00 2001 From: yibin Date: Fri, 23 Dec 2022 12:24:05 +0800 Subject: [PATCH 07/15] Little fix Signed-off-by: yibin --- planner/core/optimizer.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/planner/core/optimizer.go b/planner/core/optimizer.go index 3f98024dda33b..3bee8d4a82ea9 100644 --- a/planner/core/optimizer.go +++ b/planner/core/optimizer.go @@ -743,8 +743,8 @@ func calculateTiFlashStreamCountUsingMinLogicalCores(ctx context.Context, sctx s if err != nil { return false, 0 } - var kMaxCores uint64 = 10000 - var 
minLogicalCores uint64 = kMaxCores // set to a large enough value here + var initialMaxCores uint64 = 10000 + var minLogicalCores uint64 = initialMaxCores // set to a large enough value here for _, row := range rows { if row[4].GetString() == "cpu-logical-cores" { logicalCpus, err := strconv.Atoi(row[5].GetString()) @@ -754,11 +754,11 @@ func calculateTiFlashStreamCountUsingMinLogicalCores(ctx context.Context, sctx s } } // No need to check len(serersInfo) == serverCount here, since missing some servers' info won't affect the correctness - if minLogicalCores > 1 && minLogicalCores != kMaxCores { + if minLogicalCores > 1 && minLogicalCores != initialMaxCores { return true, minLogicalCores / 2 - } else { - return false, 0 } + + return false, 0 } func checkFineGrainedShuffleForJoinAgg(ctx context.Context, sctx sessionctx.Context, streamCountInfo *tiflashClusterInfo, tiflashServerCountInfo *tiflashClusterInfo, exchangeColCount int, splitLimit uint64) (applyFlag bool, streamCount uint64) { From 44a356024490135273d12a281392e8c7f3e36ec1 Mon Sep 17 00:00:00 2001 From: yibin Date: Fri, 23 Dec 2022 12:51:53 +0800 Subject: [PATCH 08/15] Disable failpoint Signed-off-by: yibin --- planner/core/integration_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/planner/core/integration_test.go b/planner/core/integration_test.go index 000817e8e6dac..8c84a6b0798e8 100644 --- a/planner/core/integration_test.go +++ b/planner/core/integration_test.go @@ -4935,6 +4935,7 @@ func TestMppFineGrainedJoinAndAgg(t *testing.T) { defer func() { require.NoError(t, failpoint.Disable(fpName)) }() fpName2 := "github.com/pingcap/tidb/planner/core/mockTiFlashStreamCountUsingMinLogicalCores" require.NoError(t, failpoint.Enable(fpName2, `return("8")`)) + defer func() { require.NoError(t, failpoint.Disable(fpName2)) }() // Create virtual tiflash replica info. 
dom := domain.GetDomain(tk.Session()) From befd5f02e3f4234be36fe25d6cb51349ce0630e6 Mon Sep 17 00:00:00 2001 From: yibin Date: Fri, 23 Dec 2022 14:27:51 +0800 Subject: [PATCH 09/15] Little fix Signed-off-by: yibin --- infoschema/BUILD.bazel | 4 ++++ planner/core/BUILD.bazel | 1 + planner/core/testdata/window_push_down_suite_out.json | 4 ++-- 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/infoschema/BUILD.bazel b/infoschema/BUILD.bazel index 37fd2bc423dc9..a92608a8df1b3 100644 --- a/infoschema/BUILD.bazel +++ b/infoschema/BUILD.bazel @@ -50,8 +50,12 @@ go_library( "@com_github_ngaut_pools//:pools", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", + "@com_github_pingcap_kvproto//pkg/diagnosticspb", "@com_github_pingcap_kvproto//pkg/metapb", + "@com_github_pingcap_log//:log", "@com_github_tikv_client_go_v2//tikv", + "@org_golang_google_grpc//:grpc", + "@org_golang_google_grpc//credentials", "@org_golang_x_exp//slices", "@org_uber_go_zap//:zap", ], diff --git a/planner/core/BUILD.bazel b/planner/core/BUILD.bazel index 0495f1f50cae0..c248e0f08f6ba 100644 --- a/planner/core/BUILD.bazel +++ b/planner/core/BUILD.bazel @@ -145,6 +145,7 @@ go_library( "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/coprocessor", + "@com_github_pingcap_kvproto//pkg/diagnosticspb", "@com_github_pingcap_tipb//go-tipb", "@com_github_tikv_client_go_v2//kv", "@com_github_tikv_client_go_v2//tikv", diff --git a/planner/core/testdata/window_push_down_suite_out.json b/planner/core/testdata/window_push_down_suite_out.json index 0b587eb83ce39..395c9e4ff61b6 100644 --- a/planner/core/testdata/window_push_down_suite_out.json +++ b/planner/core/testdata/window_push_down_suite_out.json @@ -415,7 +415,7 @@ " └─HashAgg_43 1.00 mpp[tiflash] funcs:count(distinct test.employee.empid)->Column#9", " └─ExchangeReceiver_45 1.00 mpp[tiflash] ", " └─ExchangeSender_44 1.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: test.employee.empid, collate: binary]", - " └─HashAgg_41 1.00 mpp[tiflash] group by:test.employee.empid, ", + " └─HashAgg_41 1.00 mpp[tiflash] group by:test.employee.empid, , stream_count: 8", " └─Window_27 10000.00 mpp[tiflash] row_number()->Column#6 over(partition by test.employee.deptid rows between current row and current row), stream_count: 8", " └─Sort_18 10000.00 mpp[tiflash] test.employee.deptid, stream_count: 8", " └─ExchangeReceiver_17 10000.00 mpp[tiflash] stream_count: 8", @@ -447,7 +447,7 @@ " └─HashAgg_50 10000.00 mpp[tiflash] group by:Column#6, funcs:sum(Column#10)->Column#7", " └─ExchangeReceiver_52 10000.00 mpp[tiflash] ", " └─ExchangeSender_51 10000.00 mpp[tiflash] ExchangeType: HashPartition, Hash Cols: [name: Column#6, collate: binary]", - " └─HashAgg_47 10000.00 mpp[tiflash] group by:Column#6, funcs:count(test.employee.empid)->Column#10", + " └─HashAgg_47 10000.00 mpp[tiflash] group by:Column#6, funcs:count(test.employee.empid)->Column#10, stream_count: 8", " └─Window_36 10000.00 mpp[tiflash] row_number()->Column#6 over(partition by test.employee.deptid rows between current row and current row), stream_count: 8", " └─Sort_21 10000.00 mpp[tiflash] test.employee.deptid, stream_count: 8", " └─ExchangeReceiver_20 10000.00 mpp[tiflash] stream_count: 8", From 1bb31370347c754db4fa65f6951af3fc5bed04e1 Mon Sep 17 00:00:00 2001 From: yibin Date: Tue, 3 Jan 2023 10:02:20 +0800 Subject: [PATCH 10/15] Change comments Signed-off-by: yibin --- planner/core/optimizer.go | 2 +- 1 file changed, 
1 insertion(+), 1 deletion(-) diff --git a/planner/core/optimizer.go b/planner/core/optimizer.go index 3bee8d4a82ea9..9b3c9cc0bb4f7 100644 --- a/planner/core/optimizer.go +++ b/planner/core/optimizer.go @@ -768,7 +768,7 @@ func checkFineGrainedShuffleForJoinAgg(ctx context.Context, sctx sessionctx.Cont case initialized: streamCount = (*streamCountInfo).itemValue case failed: - return false, 0 // probably this path won't reach + return false, 0 // probably won't reach this path } var tiflashServerCount uint64 = 0 From 67dc7f4a9454b6984440649a1129415ed22f08de Mon Sep 17 00:00:00 2001 From: yibin Date: Tue, 3 Jan 2023 17:06:15 +0800 Subject: [PATCH 11/15] Handle DisaggregatedTiFlash mode Signed-off-by: yibin --- infoschema/tables.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/infoschema/tables.go b/infoschema/tables.go index 2614ce25697b3..2e91714ec2e42 100644 --- a/infoschema/tables.go +++ b/infoschema/tables.go @@ -2237,6 +2237,9 @@ func (vt *VirtualTable) Type() table.Type { // GetTiFlashServerInfo returns all TiFlash server infos func GetTiFlashServerInfo(sctx sessionctx.Context) ([]ServerInfo, error) { + if config.GetGlobalConfig().DisaggregatedTiFlash { + return nil, table.ErrUnsupportedOp + } serversInfo, err := GetStoreServerInfo(sctx) if err != nil { return nil, err From 87d3fd78836d4920d7643ff09be4015feee845fc Mon Sep 17 00:00:00 2001 From: yibin Date: Wed, 4 Jan 2023 10:50:49 +0800 Subject: [PATCH 12/15] Changes to review comments Signed-off-by: yibin --- planner/core/optimizer.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/planner/core/optimizer.go b/planner/core/optimizer.go index 9b3c9cc0bb4f7..e8d6423514405 100644 --- a/planner/core/optimizer.go +++ b/planner/core/optimizer.go @@ -778,6 +778,9 @@ func checkFineGrainedShuffleForJoinAgg(ctx context.Context, sctx sessionctx.Cont if err != nil { (*tiflashServerCountInfo).itemStatus = failed (*tiflashServerCountInfo).itemValue = 0 + if (*streamCountInfo).itemStatus == unInitialized { + setDefaultStreamCount(streamCountInfo) + } return false, 0 } tiflashServerCount = uint64(len(serversInfo)) From 0e1731d2978de201b3609a313cd9ad69fdb9250f Mon Sep 17 00:00:00 2001 From: yibin Date: Wed, 4 Jan 2023 12:09:42 +0800 Subject: [PATCH 13/15] Update planner/core/optimizer.go Co-authored-by: guo-shaoge --- planner/core/optimizer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/planner/core/optimizer.go b/planner/core/optimizer.go index 90df3b1cc3c2d..81b7b9c3291a1 100644 --- a/planner/core/optimizer.go +++ b/planner/core/optimizer.go @@ -904,7 +904,7 @@ func setupFineGrainedShuffleInternal(ctx context.Context, sctx sessionctx.Contex probChild := child1 joinKeys := x.LeftJoinKeys if x.InnerChildIdx != 0 { - // Child0 is build side. + // Child1 is build side. 
 				buildChild = child1
 				joinKeys = x.RightJoinKeys
 				probChild = child0

From 4496777c4b6331b440a8fd5d43b91f7cfd119c08 Mon Sep 17 00:00:00 2001
From: yibin
Date: Wed, 4 Jan 2023 12:10:03 +0800
Subject: [PATCH 14/15] Update planner/core/optimizer.go

Co-authored-by: guo-shaoge
---
 planner/core/optimizer.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/planner/core/optimizer.go b/planner/core/optimizer.go
index 81b7b9c3291a1..7afb2227def8c 100644
--- a/planner/core/optimizer.go
+++ b/planner/core/optimizer.go
@@ -918,7 +918,7 @@ func setupFineGrainedShuffleInternal(ctx context.Context, sctx sessionctx.Contex
 			buildHelper := fineGrainedShuffleHelper{shuffleTarget: unknown, plans: []*basePhysicalPlan{}}
 			setupFineGrainedShuffleInternal(ctx, sctx, buildChild, &buildHelper, streamCountInfo, tiflashServerCountInfo)
 		}
-		// don't apply fine grained shuffle for prob side
+		// don't apply fine grained shuffle for probe side
 		helper.clear()
 		setupFineGrainedShuffleInternal(ctx, sctx, probChild, helper, streamCountInfo, tiflashServerCountInfo)
 	case *PhysicalExchangeSender:

From f746693077c40f8dbcd800a059e6d51cbd6f0dc1 Mon Sep 17 00:00:00 2001
From: yibin
Date: Thu, 5 Jan 2023 16:28:17 +0800
Subject: [PATCH 15/15] Changes to comments

Signed-off-by: yibin
---
 planner/core/optimizer.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/planner/core/optimizer.go b/planner/core/optimizer.go
index 90df3b1cc3c2d..3e694d31fee82 100644
--- a/planner/core/optimizer.go
+++ b/planner/core/optimizer.go
@@ -894,6 +894,7 @@ func setupFineGrainedShuffleInternal(ctx context.Context, sctx sessionctx.Contex
 		helper.plans = append(helper.plans, &x.basePhysicalPlan)
 		setupFineGrainedShuffleInternal(ctx, sctx, x.children[0], helper, streamCountInfo, tiflashServerCountInfo)
 	case *PhysicalHashAgg:
+		// TODO: allow the hash aggregation's output to still benefit from fine grained shuffle
 		aggHelper := fineGrainedShuffleHelper{shuffleTarget: hashAgg, plans: []*basePhysicalPlan{}}
 		aggHelper.plans = append(aggHelper.plans, &x.basePhysicalPlan)
 		setupFineGrainedShuffleInternal(ctx, sctx, x.children[0], &aggHelper, streamCountInfo, tiflashServerCountInfo)
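
Reviewer note (not part of the patches above): the stream-count heuristic these patches converge on takes the smallest cpu-logical-cores value reported by any TiFlash node and halves it, returning "not applicable" when no usable value was collected; checkFineGrainedShuffleForJoinAgg then additionally gates the result against a split limit (600 for join/agg, from performance tests), which is not reproduced here. The snippet below is a minimal standalone restatement of the core-count part only; the type and function names are illustrative and are not the identifiers used in planner/core/optimizer.go.

package main

import "fmt"

// nodeCores stands in for the per-server "cpu-logical-cores" hardware rows that
// the real code fetches via infoschema.FetchClusterServerInfoWithoutPrivilegeCheck.
type nodeCores struct {
	addr  string
	cores uint64
}

// streamCountFromMinLogicalCores mirrors the visible heuristic: take the minimum
// logical-core count across TiFlash nodes and divide it by 2. It returns
// (false, 0) when no node reported a usable value.
func streamCountFromMinLogicalCores(nodes []nodeCores) (bool, uint64) {
	const initialMaxCores uint64 = 10000 // "large enough" sentinel, as in the patch
	minLogicalCores := initialMaxCores
	for _, n := range nodes {
		if n.cores < minLogicalCores {
			minLogicalCores = n.cores
		}
	}
	// Mirrors the patch's check: minLogicalCores > 1 && minLogicalCores != initialMaxCores.
	if minLogicalCores > 1 && minLogicalCores != initialMaxCores {
		return true, minLogicalCores / 2
	}
	return false, 0
}

func main() {
	ok, streams := streamCountFromMinLogicalCores([]nodeCores{
		{addr: "tiflash-0:3930", cores: 16},
		{addr: "tiflash-1:3930", cores: 8},
	})
	fmt.Println(ok, streams) // true 4: the slowest node has 8 logical cores
}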