Skip to content

Commit

Permalink
executor: return error when recover indexHash/MergeJoin worker (pingc…
Browse files Browse the repository at this point in the history
…ap#18509) (pingcap#18527)

Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
ti-srebot authored Jul 14, 2020
1 parent 89e34b7 commit cba1e23
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 4 deletions.
5 changes: 4 additions & 1 deletion executor/index_lookup_hash_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package executor

import (
"context"
"fmt"
"hash"
"hash/fnv"
"sync"
Expand Down Expand Up @@ -189,7 +190,9 @@ func (e *IndexNestedLoopHashJoin) startWorkers(ctx context.Context) {

func (e *IndexNestedLoopHashJoin) finishJoinWorkers(r interface{}) {
if r != nil {
logutil.BgLogger().Error("IndexNestedLoopHashJoin failed", zap.Error(errors.Errorf("%v", r)))
e.resultCh <- &indexHashJoinResult{
err: errors.New(fmt.Sprintf("%v", r)),
}
if e.cancelFunc != nil {
e.cancelFunc()
}
Expand Down
13 changes: 10 additions & 3 deletions executor/index_lookup_merge_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,12 +300,18 @@ func (e *IndexLookUpMergeJoin) getFinishedTask(ctx context.Context) {

func (omw *outerMergeWorker) run(ctx context.Context, wg *sync.WaitGroup, cancelFunc context.CancelFunc) {
defer func() {
close(omw.resultCh)
close(omw.innerCh)
wg.Done()
if r := recover(); r != nil {
task := &lookUpMergeJoinTask{
doneErr: errors.New(fmt.Sprintf("%v", r)),
results: make(chan *indexMergeJoinResult, numResChkHold),
}
close(task.results)
omw.resultCh <- task
cancelFunc()
}
close(omw.resultCh)
close(omw.innerCh)
wg.Done()
}()
for {
task, err := omw.buildTask(ctx)
Expand All @@ -315,6 +321,7 @@ func (omw *outerMergeWorker) run(ctx context.Context, wg *sync.WaitGroup, cancel
omw.pushToChan(ctx, task, omw.resultCh)
return
}
failpoint.Inject("mockIndexMergeJoinOOMPanic", nil)
if task == nil {
return
}
Expand Down
24 changes: 24 additions & 0 deletions executor/join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"fmt"
"math/rand"
"strings"
"time"

. "github.com/pingcap/check"
Expand Down Expand Up @@ -2064,3 +2065,26 @@ func (s *testSuiteJoinSerial) TestInlineProjection4HashJoinIssue15316(c *C) {
" └─Selection_13 9990.00 cop[tikv] not(isnull(test.s.b))",
" └─TableFullScan_12 10000.00 cop[tikv] table:S keep order:false, stats:pseudo"))
}

func (s *testSuiteJoinSerial) TestIssue18070(c *C) {
config.GetGlobalConfig().OOMAction = config.OOMActionCancel
defer func() { config.GetGlobalConfig().OOMAction = config.OOMActionLog }()
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1, t2")
tk.MustExec("create table t1(a int, index(a))")
tk.MustExec("create table t2(a int, index(a))")
tk.MustExec("insert into t1 values(1),(2)")
tk.MustExec("insert into t2 values(1),(1),(2),(2)")
tk.MustExec("set @@tidb_mem_quota_query=1000")
err := tk.QueryToErr("select /*+ inl_hash_join(t1)*/ * from t1 join t2 on t1.a = t2.a;")
c.Assert(strings.Contains(err.Error(), "Out Of Memory Quota!"), IsTrue)

fpName := "github.com/pingcap/tidb/executor/mockIndexMergeJoinOOMPanic"
c.Assert(failpoint.Enable(fpName, `panic("ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]")`), IsNil)
defer func() {
c.Assert(failpoint.Disable(fpName), IsNil)
}()
err = tk.QueryToErr("select /*+ inl_merge_join(t1)*/ * from t1 join t2 on t1.a = t2.a;")
c.Assert(strings.Contains(err.Error(), "Out Of Memory Quota!"), IsTrue)
}

0 comments on commit cba1e23

Please sign in to comment.