From 795615b3c190adf56bc8c60d1e8152d60ad00d51 Mon Sep 17 00:00:00 2001 From: HuaiyuXu <391585975@qq.com> Date: Thu, 16 Dec 2021 13:04:35 +0800 Subject: [PATCH 1/3] cherry pick #30696 to release-5.1 Signed-off-by: ti-srebot --- executor/builder.go | 17 +++++ executor/cte.go | 6 +- executor/distsql.go | 6 +- executor/executor_test.go | 150 ++++++++++++++++++++++++++++++++++++++ executor/join.go | 7 +- executor/merge_join.go | 5 ++ 6 files changed, 186 insertions(+), 5 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index 3a315862d74f3..9bea24543fcc2 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -117,6 +117,23 @@ type MockPhysicalPlan interface { GetExecutor() Executor } +// MockExecutorBuilder is a wrapper for executorBuilder. +// ONLY used in test. +type MockExecutorBuilder struct { + *executorBuilder +} + +// NewMockExecutorBuilderForTest is ONLY used in test. +func NewMockExecutorBuilderForTest(ctx sessionctx.Context, is infoschema.InfoSchema, ti *TelemetryInfo, snapshotTS uint64, isStaleness bool, replicaReadScope string) *MockExecutorBuilder { + return &MockExecutorBuilder{ + executorBuilder: newExecutorBuilder(ctx, is, ti, snapshotTS, isStaleness, replicaReadScope)} +} + +// Build builds an executor tree according to `p`. +func (b *MockExecutorBuilder) Build(p plannercore.Plan) Executor { + return b.build(p) +} + func (b *executorBuilder) build(p plannercore.Plan) Executor { switch v := p.(type) { case nil: diff --git a/executor/cte.go b/executor/cte.go index 4fe3414b97154..a432fb15fe6ba 100644 --- a/executor/cte.go +++ b/executor/cte.go @@ -203,8 +203,10 @@ func (e *CTEExec) Close() (err error) { } // `iterInTbl` and `resTbl` are shared by multiple operators, // so will be closed when the SQL finishes. - if err = e.iterOutTbl.DerefAndClose(); err != nil { - return err + if e.iterOutTbl != nil { + if err = e.iterOutTbl.DerefAndClose(); err != nil { + return err + } } } diff --git a/executor/distsql.go b/executor/distsql.go index b6be537114d15..f3002d508688c 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -196,12 +196,14 @@ type IndexReaderExecutor struct { } // Close clears all resources hold by current object. -func (e *IndexReaderExecutor) Close() error { +func (e *IndexReaderExecutor) Close() (err error) { if e.table != nil && e.table.Meta().TempTableType != model.TempTableNone { return nil } - err := e.result.Close() + if e.result != nil { + err = e.result.Close() + } e.result = nil e.ctx.StoreQueryFeedback(e.feedback) return err diff --git a/executor/executor_test.go b/executor/executor_test.go index f33554fec287f..9e25f0f7ddbbd 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -21,6 +21,12 @@ import ( "math/rand" "net" "os" +<<<<<<< HEAD +======= + "path/filepath" + "reflect" + "runtime" +>>>>>>> 8cf847a57... executor: add an unit test case for unreasonable invoking Close (#30696) "strconv" "strings" "sync" @@ -8813,3 +8819,147 @@ func (s *testSerialSuite) TestIssue28650(c *C) { }() } } +<<<<<<< HEAD +======= + +func (s *testSerialSuite) TestIssue30289(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + fpName := "github.com/pingcap/tidb/executor/issue30289" + c.Assert(failpoint.Enable(fpName, `return(true)`), IsNil) + defer func() { + c.Assert(failpoint.Disable(fpName), IsNil) + }() + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int)") + err := tk.QueryToErr("select /*+ hash_join(t1) */ * from t t1 join t t2 on t1.a=t2.a") + c.Assert(err.Error(), Matches, "issue30289 build return error") +} + +// Test invoke Close without invoking Open before for each operators. +func (s *testSerialSuite) TestUnreasonablyClose(c *C) { + defer testleak.AfterTest(c)() + + is := infoschema.MockInfoSchema([]*model.TableInfo{plannercore.MockSignedTable(), plannercore.MockUnsignedTable()}) + se, err := session.CreateSession4Test(s.store) + c.Assert(err, IsNil) + _, err = se.Execute(context.Background(), "use test") + c.Assert(err, IsNil) + // To enable the shuffleExec operator. + _, err = se.Execute(context.Background(), "set @@tidb_merge_join_concurrency=4") + c.Assert(err, IsNil) + + var opsNeedsCovered = []plannercore.PhysicalPlan{ + &plannercore.PhysicalHashJoin{}, + &plannercore.PhysicalMergeJoin{}, + &plannercore.PhysicalIndexJoin{}, + &plannercore.PhysicalIndexHashJoin{}, + &plannercore.PhysicalTableReader{}, + &plannercore.PhysicalIndexReader{}, + &plannercore.PhysicalIndexLookUpReader{}, + &plannercore.PhysicalIndexMergeReader{}, + &plannercore.PhysicalApply{}, + &plannercore.PhysicalHashAgg{}, + &plannercore.PhysicalStreamAgg{}, + &plannercore.PhysicalLimit{}, + &plannercore.PhysicalSort{}, + &plannercore.PhysicalTopN{}, + &plannercore.PhysicalCTE{}, + &plannercore.PhysicalCTETable{}, + &plannercore.PhysicalMaxOneRow{}, + &plannercore.PhysicalProjection{}, + &plannercore.PhysicalSelection{}, + &plannercore.PhysicalTableDual{}, + &plannercore.PhysicalWindow{}, + &plannercore.PhysicalShuffle{}, + &plannercore.PhysicalUnionAll{}, + } + executorBuilder := executor.NewMockExecutorBuilderForTest(se, is, nil, math.MaxUint64, false, "global") + + var opsNeedsCoveredMask uint64 = 1< t1.a) AS a from t as t1) t", + "select /*+ hash_agg() */ count(f) from t group by a", + "select /*+ stream_agg() */ count(f) from t group by a", + "select * from t order by a, f", + "select * from t order by a, f limit 1", + "select * from t limit 1", + "select (select t1.a from t t1 where t1.a > t2.a) as a from t t2;", + "select a + 1 from t", + "select count(*) a from t having a > 1", + "select * from t where a = 1.1", + "with recursive cte1(c1) as (select 1 union select c1 + 1 from cte1 limit 5 offset 0) select * from cte1", + "select /*+use_index_merge(t, c_d_e, f)*/ * from t where c < 1 or f > 2", + "select sum(f) over (partition by f) from t", + "select /*+ merge_join(t1)*/ * from t t1 join t t2 on t1.d = t2.d", + "select a from t union all select a from t", + } { + comment := Commentf("case:%v sql:%s", i, tc) + c.Assert(err, IsNil, comment) + stmt, err := s.ParseOneStmt(tc, "", "") + c.Assert(err, IsNil, comment) + + err = se.NewTxn(context.Background()) + c.Assert(err, IsNil, comment) + p, _, err := planner.Optimize(context.TODO(), se, stmt, is) + c.Assert(err, IsNil, comment) + // This for loop level traverses the plan tree to get which operators are covered. + for child := []plannercore.PhysicalPlan{p.(plannercore.PhysicalPlan)}; len(child) != 0; { + newChild := make([]plannercore.PhysicalPlan, 0, len(child)) + for _, ch := range child { + found := false + for k, t := range opsNeedsCovered { + if reflect.TypeOf(t) == reflect.TypeOf(ch) { + opsAlreadyCoveredMask |= 1 << k + found = true + break + } + } + c.Assert(found, IsTrue, Commentf("case: %v sql: %s operator %v is not registered in opsNeedsCoveredMask", i, tc, reflect.TypeOf(ch))) + switch x := ch.(type) { + case *plannercore.PhysicalCTE: + newChild = append(newChild, x.RecurPlan) + newChild = append(newChild, x.SeedPlan) + continue + case *plannercore.PhysicalShuffle: + newChild = append(newChild, x.DataSources...) + newChild = append(newChild, x.Tails...) + continue + } + newChild = append(newChild, ch.Children()...) + } + child = newChild + } + + e := executorBuilder.Build(p) + + func() { + defer func() { + r := recover() + buf := make([]byte, 4096) + stackSize := runtime.Stack(buf, false) + buf = buf[:stackSize] + c.Assert(r, IsNil, Commentf("case: %v\n sql: %s\n error stack: %v", i, tc, string(buf))) + }() + c.Assert(e.Close(), IsNil, comment) + }() + } + // The following code is used to make sure all the operators registered + // in opsNeedsCoveredMask are covered. + commentBuf := strings.Builder{} + if opsAlreadyCoveredMask != opsNeedsCoveredMask { + for i := range opsNeedsCovered { + if opsAlreadyCoveredMask&(1<>>>>>> 8cf847a57... executor: add an unit test case for unreasonable invoking Close (#30696) diff --git a/executor/join.go b/executor/join.go index 1a3f62de47ac1..78b10ceff43ad 100644 --- a/executor/join.go +++ b/executor/join.go @@ -114,7 +114,9 @@ type hashjoinWorkerResult struct { // Close implements the Executor Close interface. func (e *HashJoinExec) Close() error { - close(e.closeCh) + if e.closeCh != nil { + close(e.closeCh) + } e.finished.Store(true) if e.prepared { if e.buildFinished != nil { @@ -157,7 +159,10 @@ func (e *HashJoinExec) Open(ctx context.Context) error { if err := e.baseExecutor.Open(ctx); err != nil { return err } +<<<<<<< HEAD +======= +>>>>>>> 8cf847a57... executor: add an unit test case for unreasonable invoking Close (#30696) e.prepared = false e.memTracker = memory.NewTracker(e.id, -1) e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) diff --git a/executor/merge_join.go b/executor/merge_join.go index d6374910a53b9..57e02b6e7a753 100644 --- a/executor/merge_join.go +++ b/executor/merge_join.go @@ -52,6 +52,7 @@ type MergeJoinExec struct { } type mergeJoinTable struct { + inited bool isInner bool childIndex int joinKeys []*expression.Column @@ -107,10 +108,14 @@ func (t *mergeJoinTable) init(exec *MergeJoinExec) { } t.memTracker.AttachTo(exec.memTracker) + t.inited = true t.memTracker.Consume(t.childChunk.MemoryUsage()) } func (t *mergeJoinTable) finish() error { + if !t.inited { + return nil + } t.memTracker.Consume(-t.childChunk.MemoryUsage()) if t.isInner { From 34212b6041c8ebe366ddf79dcd23b7080b4d8e04 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Thu, 16 Dec 2021 17:10:41 +0800 Subject: [PATCH 2/3] resolve conflicts --- executor/executor_test.go | 21 --------------------- executor/join.go | 3 --- 2 files changed, 24 deletions(-) diff --git a/executor/executor_test.go b/executor/executor_test.go index 9e25f0f7ddbbd..cabf7066d9fd7 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -21,12 +21,8 @@ import ( "math/rand" "net" "os" -<<<<<<< HEAD -======= - "path/filepath" "reflect" "runtime" ->>>>>>> 8cf847a57... executor: add an unit test case for unreasonable invoking Close (#30696) "strconv" "strings" "sync" @@ -8819,22 +8815,6 @@ func (s *testSerialSuite) TestIssue28650(c *C) { }() } } -<<<<<<< HEAD -======= - -func (s *testSerialSuite) TestIssue30289(c *C) { - tk := testkit.NewTestKit(c, s.store) - tk.MustExec("use test") - fpName := "github.com/pingcap/tidb/executor/issue30289" - c.Assert(failpoint.Enable(fpName, `return(true)`), IsNil) - defer func() { - c.Assert(failpoint.Disable(fpName), IsNil) - }() - tk.MustExec("drop table if exists t") - tk.MustExec("create table t(a int)") - err := tk.QueryToErr("select /*+ hash_join(t1) */ * from t t1 join t t2 on t1.a=t2.a") - c.Assert(err.Error(), Matches, "issue30289 build return error") -} // Test invoke Close without invoking Open before for each operators. func (s *testSerialSuite) TestUnreasonablyClose(c *C) { @@ -8962,4 +8942,3 @@ func (s *testSerialSuite) TestUnreasonablyClose(c *C) { } c.Assert(opsAlreadyCoveredMask, Equals, opsNeedsCoveredMask, Commentf("these operators are not covered %s", commentBuf.String())) } ->>>>>>> 8cf847a57... executor: add an unit test case for unreasonable invoking Close (#30696) diff --git a/executor/join.go b/executor/join.go index 78b10ceff43ad..77170202f8668 100644 --- a/executor/join.go +++ b/executor/join.go @@ -159,10 +159,7 @@ func (e *HashJoinExec) Open(ctx context.Context) error { if err := e.baseExecutor.Open(ctx); err != nil { return err } -<<<<<<< HEAD -======= ->>>>>>> 8cf847a57... executor: add an unit test case for unreasonable invoking Close (#30696) e.prepared = false e.memTracker = memory.NewTracker(e.id, -1) e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) From 22061c6e1e02c9dda11fef00f1a94182cde9ddde Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Mon, 19 Sep 2022 15:47:05 +0800 Subject: [PATCH 3/3] fix ci --- executor/shuffle.go | 28 +++++++++++++++++++++------- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/executor/shuffle.go b/executor/shuffle.go index 9ad4ff522e4cd..cf2a3a225cd28 100644 --- a/executor/shuffle.go +++ b/executor/shuffle.go @@ -141,17 +141,29 @@ func (e *ShuffleExec) Close() error { if !e.prepared { for _, w := range e.workers { for _, r := range w.receivers { - close(r.inputHolderCh) - close(r.inputCh) + if r.inputHolderCh != nil { + close(r.inputHolderCh) + } + if r.inputCh != nil { + close(r.inputCh) + } } - close(w.outputHolderCh) + if w.outputHolderCh != nil { + close(w.outputHolderCh) + } + } + if e.outputCh != nil { + close(e.outputCh) } - close(e.outputCh) } - close(e.finishCh) + if e.finishCh != nil { + close(e.finishCh) + } for _, w := range e.workers { for _, r := range w.receivers { - for range r.inputCh { + if r.inputCh != nil { + for range r.inputCh { + } } } // close child executor of each worker @@ -159,7 +171,9 @@ func (e *ShuffleExec) Close() error { firstErr = err } } - for range e.outputCh { // workers exit before `e.outputCh` is closed. + if e.outputCh != nil { + for range e.outputCh { // workers exit before `e.outputCh` is closed. + } } e.executed = false