From ff2f8c46c672aca54531fb007bc487d2f8bb5b75 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Sun, 8 Oct 2023 18:47:44 +0800 Subject: [PATCH 1/2] executor: AnalyzePartitionConcurrency is not more than number of task Signed-off-by: Weizhen Wang --- executor/analyze.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/executor/analyze.go b/executor/analyze.go index 04c35d91b1395..988619c2a3a7f 100644 --- a/executor/analyze.go +++ b/executor/analyze.go @@ -355,6 +355,8 @@ func (e *AnalyzeExec) handleResultsError( resultsCh <-chan *statistics.AnalyzeResults, ) error { partitionStatsConcurrency := e.Ctx().GetSessionVars().AnalyzePartitionConcurrency + // the concurrency of handleResultsError cannot be more than partitionStatsConcurrency + partitionStatsConcurrency = min(concurrency, partitionStatsConcurrency) // If partitionStatsConcurrency > 1, we will try to demand extra session from Domain to save Analyze results in concurrency. // If there is no extra session we can use, we will save analyze results in single-thread. if partitionStatsConcurrency > 1 { From 489b259ad2ee8dadde8c1e6156b6d67e7b5a3bd4 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Sun, 8 Oct 2023 18:47:57 +0800 Subject: [PATCH 2/2] executor: AnalyzePartitionConcurrency is not more than number of task Signed-off-by: Weizhen Wang --- executor/analyze.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/executor/analyze.go b/executor/analyze.go index 988619c2a3a7f..073c3fb66aed9 100644 --- a/executor/analyze.go +++ b/executor/analyze.go @@ -122,7 +122,7 @@ func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) error { globalStatsMap := make(map[globalStatsKey]globalStatsInfo) g, _ := errgroup.WithContext(ctx) g.Go(func() error { - return e.handleResultsError(ctx, concurrency, needGlobalStats, globalStatsMap, resultsCh) + return e.handleResultsError(ctx, concurrency, needGlobalStats, globalStatsMap, resultsCh, len(tasks)) }) for _, task := range tasks { @@ -353,10 +353,11 @@ func (e *AnalyzeExec) handleResultsError( needGlobalStats bool, globalStatsMap globalStatsMap, resultsCh <-chan *statistics.AnalyzeResults, + taskNum int, ) error { partitionStatsConcurrency := e.Ctx().GetSessionVars().AnalyzePartitionConcurrency // the concurrency of handleResultsError cannot be more than partitionStatsConcurrency - partitionStatsConcurrency = min(concurrency, partitionStatsConcurrency) + partitionStatsConcurrency = min(taskNum, partitionStatsConcurrency) // If partitionStatsConcurrency > 1, we will try to demand extra session from Domain to save Analyze results in concurrency. // If there is no extra session we can use, we will save analyze results in single-thread. if partitionStatsConcurrency > 1 {