diff --git a/executor/index_lookup_hash_join.go b/executor/index_lookup_hash_join.go index e03384a798f28..dfede97ee03c6 100644 --- a/executor/index_lookup_hash_join.go +++ b/executor/index_lookup_hash_join.go @@ -336,6 +336,7 @@ func (ow *indexHashJoinOuterWorker) run(ctx context.Context) { defer trace.StartRegion(ctx, "IndexHashJoinOuterWorker").End() defer close(ow.innerCh) for { + failpoint.Inject("TestIssue30211", nil) task, err := ow.buildTask(ctx) failpoint.Inject("testIndexHashJoinOuterWorkerErr", func() { err = errors.New("mockIndexHashJoinOuterWorkerErr") diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index fd0be0f841516..db0ec2c3756d8 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -26,6 +26,7 @@ import ( "unsafe" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/expression" @@ -359,6 +360,7 @@ func (ow *outerWorker) run(ctx context.Context, wg *sync.WaitGroup) { task := &lookUpJoinTask{doneCh: make(chan error, 1)} err := errors.Errorf("%v", r) task.doneCh <- err + ow.pushToChan(ctx, task, ow.resultCh) ow.lookup.ctxCancelReason.Store(err) ow.lookup.cancelFunc() } @@ -367,6 +369,7 @@ func (ow *outerWorker) run(ctx context.Context, wg *sync.WaitGroup) { wg.Done() }() for { + failpoint.Inject("TestIssue30211", nil) task, err := ow.buildTask(ctx) if err != nil { task.doneCh <- err diff --git a/executor/join_test.go b/executor/join_test.go index a5ca48e7b1976..26b80c08827fc 100644 --- a/executor/join_test.go +++ b/executor/join_test.go @@ -2609,3 +2609,39 @@ func (s *testSuiteJoinSerial) TestIssue25902(c *C) { tk.MustQuery("select * from tt1 where ts in (select ts from tt2);").Check(testkit.Rows()) tk.MustExec("set @@session.time_zone = @tmp;") } + +func (s *testSuiteJoinSerial) TestIssue30211(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t1, t2;") + tk.MustExec("create table t1(a int, index(a));") + tk.MustExec("create table t2(a int, index(a));") + func() { + fpName := "github.com/pingcap/tidb/executor/TestIssue30211" + c.Assert(failpoint.Enable(fpName, `panic("TestIssue30211 IndexJoinPanic")`), IsNil) + defer func() { + c.Assert(failpoint.Disable(fpName), IsNil) + }() + err := tk.QueryToErr("select /*+ inl_join(t1) */ * from t1 join t2 on t1.a = t2.a;").Error() + c.Assert(err, Matches, "failpoint panic: TestIssue30211 IndexJoinPanic") + + err = tk.QueryToErr("select /*+ inl_hash_join(t1) */ * from t1 join t2 on t1.a = t2.a;").Error() + c.Assert(err, Matches, "failpoint panic: TestIssue30211 IndexJoinPanic") + }() + tk.MustExec("insert into t1 values(1),(2);") + tk.MustExec("insert into t2 values(1),(1),(2),(2);") + tk.MustExec("set @@tidb_mem_quota_query=8000;") + tk.MustExec("set tidb_index_join_batch_size = 1;") + config.UpdateGlobal(func(conf *config.Config) { + conf.OOMAction = config.OOMActionCancel + }) + defer func() { + config.UpdateGlobal(func(conf *config.Config) { + conf.OOMAction = config.OOMActionLog + }) + }() + err := tk.QueryToErr("select /*+ inl_join(t1) */ * from t1 join t2 on t1.a = t2.a;").Error() + c.Assert(strings.Contains(err, "Out Of Memory Quota"), IsTrue) + err = tk.QueryToErr("select /*+ inl_hash_join(t1) */ * from t1 join t2 on t1.a = t2.a;").Error() + c.Assert(strings.Contains(err, "Out Of Memory Quota"), IsTrue) +}