diff --git a/executor/index_lookup_hash_join.go b/executor/index_lookup_hash_join.go index 9056faa5d06d8..e66c912160153 100644 --- a/executor/index_lookup_hash_join.go +++ b/executor/index_lookup_hash_join.go @@ -15,6 +15,7 @@ package executor import ( "context" + "fmt" "hash" "hash/fnv" "sync" @@ -189,7 +190,9 @@ func (e *IndexNestedLoopHashJoin) startWorkers(ctx context.Context) { func (e *IndexNestedLoopHashJoin) finishJoinWorkers(r interface{}) { if r != nil { - logutil.BgLogger().Error("IndexNestedLoopHashJoin failed", zap.Error(errors.Errorf("%v", r))) + e.resultCh <- &indexHashJoinResult{ + err: errors.New(fmt.Sprintf("%v", r)), + } if e.cancelFunc != nil { e.cancelFunc() } diff --git a/executor/index_lookup_merge_join.go b/executor/index_lookup_merge_join.go index 5e495fc90f81d..4f1309d64e9cb 100644 --- a/executor/index_lookup_merge_join.go +++ b/executor/index_lookup_merge_join.go @@ -300,12 +300,21 @@ func (e *IndexLookUpMergeJoin) getFinishedTask(ctx context.Context) { func (omw *outerMergeWorker) run(ctx context.Context, wg *sync.WaitGroup, cancelFunc context.CancelFunc) { defer func() { - close(omw.resultCh) - close(omw.innerCh) - wg.Done() if r := recover(); r != nil { +<<<<<<< HEAD +======= + task := &lookUpMergeJoinTask{ + doneErr: errors.New(fmt.Sprintf("%v", r)), + results: make(chan *indexMergeJoinResult, numResChkHold), + } + close(task.results) + omw.resultCh <- task +>>>>>>> 30858ec... executor: return error when recover indexHash/MergeJoin worker (#18509) cancelFunc() } + close(omw.resultCh) + close(omw.innerCh) + wg.Done() }() for { task, err := omw.buildTask(ctx) @@ -315,6 +324,7 @@ func (omw *outerMergeWorker) run(ctx context.Context, wg *sync.WaitGroup, cancel omw.pushToChan(ctx, task, omw.resultCh) return } + failpoint.Inject("mockIndexMergeJoinOOMPanic", nil) if task == nil { return } diff --git a/executor/join_test.go b/executor/join_test.go index 8a7a13ceaaf56..5faacb2ce1cb7 100644 --- a/executor/join_test.go +++ b/executor/join_test.go @@ -17,6 +17,7 @@ import ( "context" "fmt" "math/rand" + "strings" "time" . "github.com/pingcap/check" @@ -2064,3 +2065,26 @@ func (s *testSuiteJoinSerial) TestInlineProjection4HashJoinIssue15316(c *C) { " └─Selection_13 9990.00 cop[tikv] not(isnull(test.s.b))", " └─TableFullScan_12 10000.00 cop[tikv] table:S keep order:false, stats:pseudo")) } + +func (s *testSuiteJoinSerial) TestIssue18070(c *C) { + config.GetGlobalConfig().OOMAction = config.OOMActionCancel + defer func() { config.GetGlobalConfig().OOMAction = config.OOMActionLog }() + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t1, t2") + tk.MustExec("create table t1(a int, index(a))") + tk.MustExec("create table t2(a int, index(a))") + tk.MustExec("insert into t1 values(1),(2)") + tk.MustExec("insert into t2 values(1),(1),(2),(2)") + tk.MustExec("set @@tidb_mem_quota_query=1000") + err := tk.QueryToErr("select /*+ inl_hash_join(t1)*/ * from t1 join t2 on t1.a = t2.a;") + c.Assert(strings.Contains(err.Error(), "Out Of Memory Quota!"), IsTrue) + + fpName := "github.com/pingcap/tidb/executor/mockIndexMergeJoinOOMPanic" + c.Assert(failpoint.Enable(fpName, `panic("ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]")`), IsNil) + defer func() { + c.Assert(failpoint.Disable(fpName), IsNil) + }() + err = tk.QueryToErr("select /*+ inl_merge_join(t1)*/ * from t1 join t2 on t1.a = t2.a;") + c.Assert(strings.Contains(err.Error(), "Out Of Memory Quota!"), IsTrue) +}