diff --git a/executor/executor_test.go b/executor/executor_test.go index 8a8889cb95d9f..e5b6e37970695 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -9327,6 +9327,45 @@ func (s *testSuiteP1) TestIssue29412(c *C) { tk.MustQuery("select sum(distinct a) as x from t29142_1 having x > some ( select a from t29142_2 where x in (a));").Check(nil) } +func (s *testSerialSuite) TestIndexJoin31494(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + + tk.MustExec("drop table if exists t1, t2;") + tk.MustExec("create table t1(a int(11) default null, b int(11) default null, key(b));") + insertStr := "insert into t1 values(1, 1)" + for i := 1; i < 32768; i++ { + insertStr += fmt.Sprintf(", (%d, %d)", i, i) + } + tk.MustExec(insertStr) + tk.MustExec("create table t2(a int(11) default null, b int(11) default null, c int(11) default null)") + insertStr = "insert into t2 values(1, 1, 1)" + for i := 1; i < 32768; i++ { + insertStr += fmt.Sprintf(", (%d, %d, %d)", i, i, i) + } + tk.MustExec(insertStr) + sm := &mockSessionManager1{ + PS: make([]*util.ProcessInfo, 0), + } + tk.Se.SetSessionManager(sm) + s.domain.ExpensiveQueryHandle().SetSessionManager(sm) + defer config.RestoreFunc()() + config.UpdateGlobal(func(conf *config.Config) { + conf.OOMAction = config.OOMActionCancel + }) + c.Assert(tk.Se.Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil), IsTrue) + tk.MustExec("set @@tidb_mem_quota_query=2097152;") + // This bug will be reproduced in 10 times. + for i := 0; i < 10; i++ { + err := tk.QueryToErr("select /*+ inl_join(t1) */ * from t1 right join t2 on t1.b=t2.b;") + c.Assert(err, NotNil) + c.Assert(err.Error(), Matches, "Out Of Memory Quota!.*") + err = tk.QueryToErr("select /*+ inl_hash_join(t1) */ * from t1 right join t2 on t1.b=t2.b;") + c.Assert(err, NotNil) + c.Assert(err.Error(), Matches, "Out Of Memory Quota!.*") + } +} + func (s *testSerialSuite) TestIssue28650(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") diff --git a/executor/index_lookup_hash_join.go b/executor/index_lookup_hash_join.go index 4fb2abf4509aa..9e9fd50694743 100644 --- a/executor/index_lookup_hash_join.go +++ b/executor/index_lookup_hash_join.go @@ -212,7 +212,6 @@ func (e *IndexNestedLoopHashJoin) finishJoinWorkers(r interface{}) { e.taskCh <- task } if e.cancelFunc != nil { - e.IndexLookUpJoin.ctxCancelReason.Store(err) e.cancelFunc() } } @@ -249,9 +248,6 @@ func (e *IndexNestedLoopHashJoin) Next(ctx context.Context, req *chunk.Chunk) er return result.err } case <-ctx.Done(): - if err := e.IndexLookUpJoin.ctxCancelReason.Load(); err != nil { - return err.(error) - } return ctx.Err() } req.SwapColumns(result.chk) @@ -281,9 +277,6 @@ func (e *IndexNestedLoopHashJoin) runInOrder(ctx context.Context, req *chunk.Chu return result.err } case <-ctx.Done(): - if err := e.IndexLookUpJoin.ctxCancelReason.Load(); err != nil { - return err.(error) - } return ctx.Err() } req.SwapColumns(result.chk) @@ -656,9 +649,6 @@ func (iw *indexHashJoinInnerWorker) doJoinUnordered(ctx context.Context, task *i select { case resultCh <- joinResult: case <-ctx.Done(): - if err := iw.lookup.ctxCancelReason.Load(); err != nil { - return err.(error) - } return ctx.Err() } joinResult, ok = iw.getNewJoinResult(ctx) @@ -807,9 +797,6 @@ func (iw *indexHashJoinInnerWorker) doJoinInOrder(ctx context.Context, task *ind select { case resultCh <- joinResult: case <-ctx.Done(): - if err := iw.lookup.ctxCancelReason.Load(); err != nil { - return err.(error) - } return ctx.Err() } joinResult, ok = iw.getNewJoinResult(ctx) diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index aa155b0d4c610..d9b6c3a0db934 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -83,9 +83,8 @@ type IndexLookUpJoin struct { memTracker *memory.Tracker // track memory usage. - stats *indexLookUpJoinRuntimeStats - ctxCancelReason atomic.Value - finished *atomic.Value + stats *indexLookUpJoinRuntimeStats + finished *atomic.Value } type outerCtx struct { @@ -314,9 +313,6 @@ func (e *IndexLookUpJoin) getFinishedTask(ctx context.Context) (*lookUpJoinTask, select { case task = <-e.resultCh: case <-ctx.Done(): - if err := e.ctxCancelReason.Load(); err != nil { - return nil, err.(error) - } return nil, ctx.Err() } if task == nil { @@ -329,9 +325,6 @@ func (e *IndexLookUpJoin) getFinishedTask(ctx context.Context) (*lookUpJoinTask, return nil, err } case <-ctx.Done(): - if err := e.ctxCancelReason.Load(); err != nil { - return nil, err.(error) - } return nil, ctx.Err() } @@ -364,8 +357,6 @@ func (ow *outerWorker) run(ctx context.Context, wg *sync.WaitGroup) { err := errors.Errorf("%v", r) task.doneCh <- err ow.pushToChan(ctx, task, ow.resultCh) - ow.lookup.ctxCancelReason.Store(err) - ow.lookup.cancelFunc() } close(ow.resultCh) close(ow.innerCh) @@ -485,8 +476,6 @@ func (iw *innerWorker) run(ctx context.Context, wg *sync.WaitGroup) { err := errors.Errorf("%v", r) // "task != nil" is guaranteed when panic happened. task.doneCh <- err - iw.lookup.ctxCancelReason.Store(err) - iw.lookup.cancelFunc() } wg.Done() }() @@ -702,9 +691,6 @@ func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTa for { select { case <-ctx.Done(): - if err := iw.lookup.ctxCancelReason.Load(); err != nil { - return err.(error) - } return ctx.Err() default: }