executor: fix hang for analyze table when exceed tidb_mem_quota_analyze #48264

Merged 9 commits on Nov 3, 2023
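For context, a minimal, self-contained sketch of the failure mode this PR addresses — this is not TiDB code, and every name in it is made up for illustration: when the result handler returns early on an error (for example, the analyze memory quota being exceeded), a worker still trying to send on an unbuffered results channel blocks forever, so the ANALYZE statement hangs.

package main

import (
	"errors"
	"fmt"
	"time"
)

func main() {
	// Hypothetical stand-ins for the analyze results channel and its handler;
	// none of these names come from the TiDB code base.
	resultsCh := make(chan string) // unbuffered, so every send needs a receiver

	// Worker: keeps producing results.
	go func() {
		for i := 0; i < 3; i++ {
			// Once the handler has returned, this send blocks forever.
			resultsCh <- fmt.Sprintf("result-%d", i)
		}
	}()

	// Result handler: gives up on the first error, e.g. the memory quota
	// being exceeded, without draining the channel.
	handleResults := func() error {
		<-resultsCh
		return errors.New("memory quota exceeded")
	}

	fmt.Println("handler returned:", handleResults())

	// The worker is now stuck on its second send. Anything that waits for the
	// worker pool (like wg.Wait() before this fix) would never return.
	time.Sleep(100 * time.Millisecond)
	fmt.Println("worker is still blocked on resultsCh; this is the hang")
}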
76 changes: 57 additions & 19 deletions pkg/executor/analyze.go
@@ -61,6 +61,8 @@ type AnalyzeExec struct {
opts map[ast.AnalyzeOptionType]uint64
OptionsMap map[int64]core.V2AnalyzeOptions
gp *gp.Pool
// errExitCh notifies the workers that the whole analyze task is finished when an error is met, so they can exit early.
errExitCh chan struct{}
}

var (
@@ -123,7 +125,6 @@ func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) error {
g.Go(func() error {
return e.handleResultsError(ctx, concurrency, needGlobalStats, globalStatsMap, resultsCh, len(tasks))
})

for _, task := range tasks {
prepareV2AnalyzeJobInfo(task.colExec, false)
AddNewAnalyzeJob(e.Ctx(), task.job)
@@ -137,19 +138,19 @@ func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) error {
taskCh <- task
}
close(taskCh)

// Wait until all workers are done, then close the results channel.
e.wg.Wait()
close(resultsCh)
err = g.Wait()
for _, task := range tasks {
if task.colExec != nil && task.colExec.memTracker != nil {
task.colExec.memTracker.Detach()
defer func() {
for _, task := range tasks {
if task.colExec != nil && task.colExec.memTracker != nil {
task.colExec.memTracker.Detach()
}
}
}
}()

err = e.waitFinish(ctx, g, resultsCh)
if err != nil {
return err
}

failpoint.Inject("mockKillFinishedAnalyzeJob", func() {
dom := domain.GetDomain(e.Ctx())
dom.SysProcTracker().KillSysProcess(dom.GetAutoAnalyzeProcID())
@@ -170,6 +171,27 @@ func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) error {
return statsHandle.Update(infoSchema)
}

func (e *AnalyzeExec) waitFinish(ctx context.Context, g *errgroup.Group, resultsCh chan *statistics.AnalyzeResults) error {
checkwg, _ := errgroup.WithContext(ctx)
checkwg.Go(func() error {
// Wait for the result handler to complete. If it returns an error, cancel the
// analyze process by closing errExitCh so that blocked workers can exit.
err := g.Wait()
if err != nil {
close(e.errExitCh)
return err
}
return nil
})
checkwg.Go(func() error {
// Wait until all workers are done, then close the results channel.
e.wg.Wait()
close(resultsCh)
return nil
})
return checkwg.Wait()
}

// filterAndCollectTasks filters the tasks that are not locked and collects the table IDs.
func filterAndCollectTasks(tasks []*analyzeTask, statsHandle *handle.Handle, infoSchema infoschema.InfoSchema) ([]*analyzeTask, uint, []string, error) {
var (
@@ -496,9 +518,17 @@ func (e *AnalyzeExec) analyzeWorker(taskCh <-chan *analyzeTask, resultsCh chan<-
StartAnalyzeJob(e.Ctx(), task.job)
switch task.taskType {
case colTask:
resultsCh <- analyzeColumnsPushDownEntry(e.gp, task.colExec)
select {
case <-e.errExitCh:
return
case resultsCh <- analyzeColumnsPushDownEntry(e.gp, task.colExec):
}
case idxTask:
resultsCh <- analyzeIndexPushdown(task.idxExec)
select {
case <-e.errExitCh:
return
case resultsCh <- analyzeIndexPushdown(task.idxExec):
}
}
}
}
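The select statements added above follow a common Go idiom worth spelling out: each sender chooses between delivering its result and receiving from an exit channel, and because a receive from a closed channel completes immediately, a single close(errExitCh) releases every sender blocked in such a select. A minimal sketch of the idiom with made-up names (not the executor's actual types):

package main

import (
	"fmt"
	"sync"
)

func main() {
	resultsCh := make(chan int)
	errExitCh := make(chan struct{}) // closed exactly once to tell senders to give up

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(v int) {
			defer wg.Done()
			select {
			case <-errExitCh:
				// The consumer is gone; exit instead of blocking forever.
				fmt.Println("worker", v, "exits early")
			case resultsCh <- v:
				fmt.Println("worker", v, "delivered its result")
			}
		}(i)
	}

	fmt.Println("consumer got:", <-resultsCh) // consume one result, then "fail"
	close(errExitCh)                          // broadcast: unblocks the remaining workers
	wg.Wait()                                 // returns promptly instead of hanging
	fmt.Println("all workers finished")
}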
@@ -685,16 +715,24 @@ func finishJobWithLog(sctx sessionctx.Context, job *statistics.AnalyzeJob, analy
var state string
if analyzeErr != nil {
state = statistics.AnalyzeFailed
logutil.BgLogger().Warn(fmt.Sprintf("analyze table `%s`.`%s` has %s", job.DBName, job.TableName, state),
zap.String("partition", job.PartitionName),
zap.String("job info", job.JobInfo),
zap.Time("start time", job.StartTime),
zap.Time("end time", job.EndTime),
zap.String("cost", job.EndTime.Sub(job.StartTime).String()),
zap.String("sample rate reason", job.SampleRateReason),
zap.Error(analyzeErr))
} else {
state = statistics.AnalyzeFinished
logutil.BgLogger().Info(fmt.Sprintf("analyze table `%s`.`%s` has %s", job.DBName, job.TableName, state),
zap.String("partition", job.PartitionName),
zap.String("job info", job.JobInfo),
zap.Time("start time", job.StartTime),
zap.Time("end time", job.EndTime),
zap.String("cost", job.EndTime.Sub(job.StartTime).String()),
zap.String("sample rate reason", job.SampleRateReason))
}
logutil.BgLogger().Info(fmt.Sprintf("analyze table `%s`.`%s` has %s", job.DBName, job.TableName, state),
zap.String("partition", job.PartitionName),
zap.String("job info", job.JobInfo),
zap.Time("start time", job.StartTime),
zap.Time("end time", job.EndTime),
zap.String("cost", job.EndTime.Sub(job.StartTime).String()),
zap.String("sample rate reason", job.SampleRateReason))
}
}

4 changes: 2 additions & 2 deletions pkg/executor/analyze_utils.go
@@ -57,7 +57,7 @@ func isAnalyzeWorkerPanic(err error) bool {
func getAnalyzePanicErr(r interface{}) error {
if msg, ok := r.(string); ok {
if msg == globalPanicAnalyzeMemoryExceed {
return errAnalyzeOOM
return errors.Trace(errAnalyzeOOM)
}
}
if err, ok := r.(error); ok {
@@ -66,7 +66,7 @@ func getAnalyzePanicErr(r interface{}) error {
}
return err
}
return errAnalyzeWorkerPanic
return errors.Trace(errAnalyzeWorkerPanic)
}
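For readers who have not seen it, errors.Trace comes from the github.com/pingcap/errors package used across TiDB: it wraps an error with the call stack at the point of wrapping while keeping the original error reachable through errors.Cause. A rough, simplified sketch of that behaviour (the sentinel below is hypothetical, standing in for errors such as errAnalyzeOOM, not the executor's real definition):

package main

import (
	"fmt"

	"github.com/pingcap/errors"
)

// errQuotaExceeded is a made-up sentinel used only for this sketch.
var errQuotaExceeded = errors.New("memory quota exceeded")

func doAnalyze() error {
	// Trace attaches the current stack, which makes the panic path easier to
	// locate in logs while preserving the sentinel for comparisons.
	return errors.Trace(errQuotaExceeded)
}

func main() {
	err := doAnalyze()
	fmt.Println(errors.Cause(err) == errQuotaExceeded) // true: the sentinel is preserved
	fmt.Printf("%+v\n", err)                           // message plus the recorded stack
}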

// analyzeResultsNotifyWaitGroupWrapper is a wrapper for sync.WaitGroup
1 change: 1 addition & 0 deletions pkg/executor/builder.go
@@ -2903,6 +2903,7 @@ func (b *executorBuilder) buildAnalyze(v *plannercore.Analyze) exec.Executor {
OptionsMap: v.OptionsMap,
wg: util.NewWaitGroupPool(gp),
gp: gp,
errExitCh: make(chan struct{}),
}
autoAnalyze := ""
if b.ctx.GetSessionVars().InRestrictedSQL {
2 changes: 1 addition & 1 deletion pkg/executor/test/analyzetest/memorycontrol/BUILD.bazel
@@ -8,7 +8,7 @@ go_test(
"memory_control_test.go",
],
flaky = True,
shard_count = 3,
shard_count = 4,
deps = [
"//pkg/config",
"//pkg/executor",
19 changes: 19 additions & 0 deletions pkg/executor/test/analyzetest/memorycontrol/memory_control_test.go
@@ -182,3 +182,22 @@ func TestGlobalMemoryControlForAutoAnalyze(t *testing.T) {
childTrackers = executor.GlobalAnalyzeMemoryTracker.GetChildrenForTest()
require.Len(t, childTrackers, 0)
}

func TestMemQuotaAnalyze(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table tbl_2 ( col_20 decimal default 84232 , col_21 tinyint not null , col_22 int default 80814394 , col_23 mediumint default -8036687 not null , col_24 smallint default 9185 not null , col_25 tinyint unsigned default 65 , col_26 char(115) default 'ZyfroRODMbNDRZnPNRW' not null , col_27 bigint not null , col_28 tinyint not null , col_29 char(130) default 'UMApsVgzHblmY' , primary key idx_14 ( col_28,col_22 ) , unique key idx_15 ( col_24,col_22 ) , key idx_16 ( col_21,col_20,col_24,col_25,col_27,col_28,col_26,col_29 ) , key idx_17 ( col_24,col_25 ) , unique key idx_18 ( col_25,col_23,col_29,col_27,col_26,col_22 ) , key idx_19 ( col_25,col_22,col_26,col_23 ) , unique key idx_20 ( col_22,col_24,col_28,col_29,col_26,col_20 ) , key idx_21 ( col_25,col_24,col_26,col_29,col_27,col_22,col_28 ) ) partition by range ( col_22 ) ( partition p0 values less than (-1938341588), partition p1 values less than (-1727506184), partition p2 values less than (-1700184882), partition p3 values less than (-1596142809), partition p4 values less than (445165686) );")
tk.MustExec("insert ignore into tbl_2 values ( 942,33,-1915007317,3408149,-3699,193,'Trywdis',1876334369465184864,115,null );")
tk.MustExec("insert ignore into tbl_2 values ( 7,-39,-1382727205,-2544981,-28075,88,'FDhOsTRKRLCwEk',-1239168882463214388,17,'WskQzCK' );")
tk.MustExec("insert ignore into tbl_2 values ( null,55,-388460319,-2292918,10130,162,'UqjDlYvdcNY',4872802276956896607,-51,'ORBQjnumcXP' );")
tk.MustExec("insert ignore into tbl_2 values ( 42,-19,-9677826,-1168338,16904,79,'TzOqH',8173610791128879419,65,'lNLcvOZDcRzWvDO' );")
tk.MustExec("insert ignore into tbl_2 values ( 2,26,369867543,-6773303,-24953,41,'BvbdrKTNtvBgsjjnxt',5996954963897924308,-95,'wRJYPBahkIGDfz' );")
tk.MustExec("insert ignore into tbl_2 values ( 6896,3,444460824,-2070971,-13095,167,'MvWNKbaOcnVuIrtbT',6968339995987739471,-5,'zWipNBxGeVmso' );")
tk.MustExec("insert ignore into tbl_2 values ( 58761,112,-1535034546,-5837390,-14204,157,'',-8319786912755096816,15,'WBjsozfBfrPPHmKv' );")
tk.MustExec("insert ignore into tbl_2 values ( 84923,113,-973946646,406140,25040,51,'THQdwkQvppWZnULm',5469507709881346105,94,'oGNmoxLLgHkdyDCT' );")
tk.MustExec("insert ignore into tbl_2 values ( 0,-104,-488745187,-1941015,-2646,39,'jyKxfs',-5307175470406648836,46,'KZpfjFounVgFeRPa' );")
tk.MustExec("insert ignore into tbl_2 values ( 4,97,2105289255,1034363,28385,192,'',4429378142102752351,8,'jOk' );")
tk.MustExec("set global tidb_mem_quota_analyze=128;")
tk.MustExecToErr("analyze table tbl_2;")
}