Skip to content

Commit

Permalink
Merge branch 'release-5.3' into release-5.3-c68791566d55
Browse files Browse the repository at this point in the history
  • Loading branch information
glorv authored Feb 22, 2022
2 parents 345e8cd + 5d94fdd commit 88a430f
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 29 deletions.
39 changes: 39 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9327,6 +9327,45 @@ func (s *testSuiteP1) TestIssue29412(c *C) {
tk.MustQuery("select sum(distinct a) as x from t29142_1 having x > some ( select a from t29142_2 where x in (a));").Check(nil)
}

func (s *testSerialSuite) TestIndexJoin31494(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")

tk.MustExec("drop table if exists t1, t2;")
tk.MustExec("create table t1(a int(11) default null, b int(11) default null, key(b));")
insertStr := "insert into t1 values(1, 1)"
for i := 1; i < 32768; i++ {
insertStr += fmt.Sprintf(", (%d, %d)", i, i)
}
tk.MustExec(insertStr)
tk.MustExec("create table t2(a int(11) default null, b int(11) default null, c int(11) default null)")
insertStr = "insert into t2 values(1, 1, 1)"
for i := 1; i < 32768; i++ {
insertStr += fmt.Sprintf(", (%d, %d, %d)", i, i, i)
}
tk.MustExec(insertStr)
sm := &mockSessionManager1{
PS: make([]*util.ProcessInfo, 0),
}
tk.Se.SetSessionManager(sm)
s.domain.ExpensiveQueryHandle().SetSessionManager(sm)
defer config.RestoreFunc()()
config.UpdateGlobal(func(conf *config.Config) {
conf.OOMAction = config.OOMActionCancel
})
c.Assert(tk.Se.Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil), IsTrue)
tk.MustExec("set @@tidb_mem_quota_query=2097152;")
// This bug will be reproduced in 10 times.
for i := 0; i < 10; i++ {
err := tk.QueryToErr("select /*+ inl_join(t1) */ * from t1 right join t2 on t1.b=t2.b;")
c.Assert(err, NotNil)
c.Assert(err.Error(), Matches, "Out Of Memory Quota!.*")
err = tk.QueryToErr("select /*+ inl_hash_join(t1) */ * from t1 right join t2 on t1.b=t2.b;")
c.Assert(err, NotNil)
c.Assert(err.Error(), Matches, "Out Of Memory Quota!.*")
}
}

func (s *testSerialSuite) TestIssue28650(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
13 changes: 0 additions & 13 deletions executor/index_lookup_hash_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,6 @@ func (e *IndexNestedLoopHashJoin) finishJoinWorkers(r interface{}) {
e.taskCh <- task
}
if e.cancelFunc != nil {
e.IndexLookUpJoin.ctxCancelReason.Store(err)
e.cancelFunc()
}
}
Expand Down Expand Up @@ -249,9 +248,6 @@ func (e *IndexNestedLoopHashJoin) Next(ctx context.Context, req *chunk.Chunk) er
return result.err
}
case <-ctx.Done():
if err := e.IndexLookUpJoin.ctxCancelReason.Load(); err != nil {
return err.(error)
}
return ctx.Err()
}
req.SwapColumns(result.chk)
Expand Down Expand Up @@ -281,9 +277,6 @@ func (e *IndexNestedLoopHashJoin) runInOrder(ctx context.Context, req *chunk.Chu
return result.err
}
case <-ctx.Done():
if err := e.IndexLookUpJoin.ctxCancelReason.Load(); err != nil {
return err.(error)
}
return ctx.Err()
}
req.SwapColumns(result.chk)
Expand Down Expand Up @@ -656,9 +649,6 @@ func (iw *indexHashJoinInnerWorker) doJoinUnordered(ctx context.Context, task *i
select {
case resultCh <- joinResult:
case <-ctx.Done():
if err := iw.lookup.ctxCancelReason.Load(); err != nil {
return err.(error)
}
return ctx.Err()
}
joinResult, ok = iw.getNewJoinResult(ctx)
Expand Down Expand Up @@ -807,9 +797,6 @@ func (iw *indexHashJoinInnerWorker) doJoinInOrder(ctx context.Context, task *ind
select {
case resultCh <- joinResult:
case <-ctx.Done():
if err := iw.lookup.ctxCancelReason.Load(); err != nil {
return err.(error)
}
return ctx.Err()
}
joinResult, ok = iw.getNewJoinResult(ctx)
Expand Down
18 changes: 2 additions & 16 deletions executor/index_lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,8 @@ type IndexLookUpJoin struct {

memTracker *memory.Tracker // track memory usage.

stats *indexLookUpJoinRuntimeStats
ctxCancelReason atomic.Value
finished *atomic.Value
stats *indexLookUpJoinRuntimeStats
finished *atomic.Value
}

type outerCtx struct {
Expand Down Expand Up @@ -314,9 +313,6 @@ func (e *IndexLookUpJoin) getFinishedTask(ctx context.Context) (*lookUpJoinTask,
select {
case task = <-e.resultCh:
case <-ctx.Done():
if err := e.ctxCancelReason.Load(); err != nil {
return nil, err.(error)
}
return nil, ctx.Err()
}
if task == nil {
Expand All @@ -329,9 +325,6 @@ func (e *IndexLookUpJoin) getFinishedTask(ctx context.Context) (*lookUpJoinTask,
return nil, err
}
case <-ctx.Done():
if err := e.ctxCancelReason.Load(); err != nil {
return nil, err.(error)
}
return nil, ctx.Err()
}

Expand Down Expand Up @@ -364,8 +357,6 @@ func (ow *outerWorker) run(ctx context.Context, wg *sync.WaitGroup) {
err := errors.Errorf("%v", r)
task.doneCh <- err
ow.pushToChan(ctx, task, ow.resultCh)
ow.lookup.ctxCancelReason.Store(err)
ow.lookup.cancelFunc()
}
close(ow.resultCh)
close(ow.innerCh)
Expand Down Expand Up @@ -485,8 +476,6 @@ func (iw *innerWorker) run(ctx context.Context, wg *sync.WaitGroup) {
err := errors.Errorf("%v", r)
// "task != nil" is guaranteed when panic happened.
task.doneCh <- err
iw.lookup.ctxCancelReason.Store(err)
iw.lookup.cancelFunc()
}
wg.Done()
}()
Expand Down Expand Up @@ -702,9 +691,6 @@ func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTa
for {
select {
case <-ctx.Done():
if err := iw.lookup.ctxCancelReason.Load(); err != nil {
return err.(error)
}
return ctx.Err()
default:
}
Expand Down

0 comments on commit 88a430f

Please sign in to comment.