Skip to content

Commit

Permalink
executor,distsql: fix analyze version 2 memory leak (#28729) (#29305)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-srebot authored Feb 9, 2022
1 parent bcfc749 commit 423d188
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 10 deletions.
7 changes: 7 additions & 0 deletions distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/errno"
Expand Down Expand Up @@ -272,6 +273,12 @@ func (r *selectResult) Next(ctx context.Context, chk *chunk.Chunk) error {

// NextRaw returns the next raw partial result.
func (r *selectResult) NextRaw(ctx context.Context) (data []byte, err error) {
failpoint.Inject("mockNextRawError", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(nil, errors.New("mockNextRawError"))
}
})

resultSubset, err := r.resp.Next(ctx)
r.partialCount++
r.feedback.Invalidate()
Expand Down
28 changes: 18 additions & 10 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,21 @@ func (e AnalyzeColumnsExec) decodeSampleDataWithVirtualColumn(
return nil
}

func readDataAndSendTask(handler *tableResultHandler, mergeTaskCh chan []byte) error {
defer close(mergeTaskCh)
for {
data, err := handler.nextRaw(context.TODO())
if err != nil {
return errors.Trace(err)
}
if data == nil {
break
}
mergeTaskCh <- data
}
return nil
}

func (e *AnalyzeColumnsExec) buildSamplingStats(
ranges []*ranger.Range,
needExtStats bool,
Expand Down Expand Up @@ -853,17 +868,10 @@ func (e *AnalyzeColumnsExec) buildSamplingStats(
for i := 0; i < statsConcurrency; i++ {
go e.subMergeWorker(mergeResultCh, mergeTaskCh, l, i == 0)
}
for {
data, err1 := e.resultHandler.nextRaw(context.TODO())
if err1 != nil {
return 0, nil, nil, nil, nil, err1
}
if data == nil {
break
}
mergeTaskCh <- data
if err = readDataAndSendTask(e.resultHandler, mergeTaskCh); err != nil {
return 0, nil, nil, nil, nil, err
}
close(mergeTaskCh)

mergeWorkerPanicCnt := 0
for mergeWorkerPanicCnt < statsConcurrency {
mergeResult, ok := <-mergeResultCh
Expand Down
17 changes: 17 additions & 0 deletions executor/seqtest/seq_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1578,3 +1578,20 @@ func (s *seqTestSuite) TestIssue19410(c *C) {
tk.MustQuery("select /*+ INL_HASH_JOIN(t3) */ * from t join t3 on t.b = t3.b1;").Check(testkit.Rows("1 A 1 A"))
tk.MustQuery("select /*+ INL_JOIN(t3) */ * from t join t3 on t.b = t3.b1;").Check(testkit.Rows("1 A 1 A"))
}

func (s *seqTestSuite) TestAnalyzeNextRawErrorNoLeak(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1(id int, c varchar(32))")
tk.MustExec("set @@session.tidb_analyze_version = 2")

c.Assert(failpoint.Enable("github.com/pingcap/tidb/distsql/mockNextRawError", `return(true)`), IsNil)
defer func() {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/distsql/mockNextRawError"), IsNil)
}()

err := tk.ExecToErr("analyze table t1")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "mockNextRawError")
}

0 comments on commit 423d188

Please sign in to comment.