From b7c951beb5c989b3fb20aefba0ebfbd4a2eca835 Mon Sep 17 00:00:00 2001 From: HuaiyuXu <391585975@qq.com> Date: Wed, 9 Dec 2020 15:10:15 +0800 Subject: [PATCH 1/3] executor: open childExec during execution for UnionExec (#21561) --- executor/executor.go | 42 +++++++++++++++++++++++++++++++++++---- executor/executor_test.go | 30 ++++++++++++++++++++++++++++ store/tikv/snapshot.go | 12 ----------- 3 files changed, 68 insertions(+), 16 deletions(-) diff --git a/executor/executor.go b/executor/executor.go index 1d136bac8a2f9..06f4bbb7ac607 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -1468,6 +1468,12 @@ type UnionExec struct { results []*chunk.Chunk wg sync.WaitGroup initialized bool + mu struct { + *sync.Mutex + maxOpenedChildID int + } + + childInFlightForTest int32 } // unionWorkerResult stores the result for a union worker. @@ -1487,12 +1493,11 @@ func (e *UnionExec) waitAllFinished() { // Open implements the Executor Open interface. func (e *UnionExec) Open(ctx context.Context) error { - if err := e.baseExecutor.Open(ctx); err != nil { - return err - } e.stopFetchData.Store(false) e.initialized = false e.finished = make(chan struct{}) + e.mu.Mutex = &sync.Mutex{} + e.mu.maxOpenedChildID = -1 return nil } @@ -1538,6 +1543,19 @@ func (e *UnionExec) resultPuller(ctx context.Context, workerID int) { e.wg.Done() }() for childID := range e.childIDChan { + e.mu.Lock() + if childID > e.mu.maxOpenedChildID { + e.mu.maxOpenedChildID = childID + } + e.mu.Unlock() + if err := e.children[childID].Open(ctx); err != nil { + result.err = err + e.stopFetchData.Store(true) + e.resultPool <- result + } + failpoint.Inject("issue21441", func() { + atomic.AddInt32(&e.childInFlightForTest, 1) + }) for { if e.stopFetchData.Load().(bool) { return @@ -1552,12 +1570,20 @@ func (e *UnionExec) resultPuller(ctx context.Context, workerID int) { e.resourcePools[workerID] <- result.chk break } + failpoint.Inject("issue21441", func() { + if int(atomic.LoadInt32(&e.childInFlightForTest)) > e.concurrency { + panic("the count of child in flight is larger than e.concurrency unexpectedly") + } + }) e.resultPool <- result if result.err != nil { e.stopFetchData.Store(true) return } } + failpoint.Inject("issue21441", func() { + atomic.AddInt32(&e.childInFlightForTest, -1) + }) } } @@ -1600,7 +1626,15 @@ func (e *UnionExec) Close() error { for range e.childIDChan { } } - return e.baseExecutor.Close() + // We do not need to acquire the e.mu.Lock since all the resultPuller can be + // promised to exit when reaching here (e.childIDChan been closed). + var firstErr error + for i := 0; i <= e.mu.maxOpenedChildID; i++ { + if err := e.children[i].Close(); err != nil && firstErr == nil { + firstErr = err + } + } + return firstErr } // ResetContextOfStmt resets the StmtContext and session variables. diff --git a/executor/executor_test.go b/executor/executor_test.go index c50459cf2850b..cada9a9fcf6fa 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -7783,6 +7783,36 @@ func (s *testSuite) TestOOMActionPriority(c *C) { c.Assert(action.GetPriority(), Equals, int64(memory.DefLogPriority)) } +func (s *testSerialSuite) TestIssue21441(c *C) { + failpoint.Enable("github.com/pingcap/tidb/executor/issue21441", `return`) + defer failpoint.Disable("github.com/pingcap/tidb/executor/issue21441") + + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int)") + tk.MustExec(`insert into t values(1),(2),(3)`) + tk.Se.GetSessionVars().InitChunkSize = 1 + tk.Se.GetSessionVars().MaxChunkSize = 1 + sql := ` +select a from t union all +select a from t union all +select a from t union all +select a from t union all +select a from t union all +select a from t union all +select a from t union all +select a from t` + tk.MustQuery(sql).Sort().Check(testkit.Rows( + "1", "1", "1", "1", "1", "1", "1", "1", + "2", "2", "2", "2", "2", "2", "2", "2", + "3", "3", "3", "3", "3", "3", "3", "3", + )) + + tk.MustQuery("select a from (" + sql + ") t order by a limit 4").Check(testkit.Rows("1", "1", "1", "1")) + tk.MustQuery("select a from (" + sql + ") t order by a limit 7, 4").Check(testkit.Rows("1", "2", "2", "2")) +} + func (s *testSuite) Test17780(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go index 743111827e85d..4fa098bfc3ba1 100644 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -639,18 +639,6 @@ func (s *KVSnapshot) SetResourceGroupTag(tag []byte) { s.resourceGroupTag = tag } -// SnapCacheHitCount gets the snapshot cache hit count. Only for test. -func (s *KVSnapshot) SnapCacheHitCount() int { - return int(atomic.LoadInt64(&s.mu.hitCnt)) -} - -// SnapCacheSize gets the snapshot cache size. Only for test. -func (s *KVSnapshot) SnapCacheSize() int { - s.mu.RLock() - defer s.mu.RLock() - return len(s.mu.cached) -} - func extractLockFromKeyErr(keyErr *pb.KeyError) (*Lock, error) { if locked := keyErr.GetLocked(); locked != nil { return NewLock(locked), nil From d956f3c4677c38415fa5b1ac5c9664a4f2006403 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Wed, 26 May 2021 10:30:01 +0800 Subject: [PATCH 2/3] add some test case --- executor/executor_test.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/executor/executor_test.go b/executor/executor_test.go index cada9a9fcf6fa..214b0df8fbaa1 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -7811,6 +7811,11 @@ select a from t` tk.MustQuery("select a from (" + sql + ") t order by a limit 4").Check(testkit.Rows("1", "1", "1", "1")) tk.MustQuery("select a from (" + sql + ") t order by a limit 7, 4").Check(testkit.Rows("1", "2", "2", "2")) + + tk.MustExec("set @@tidb_executor_concurrency = 2") + c.Assert(tk.Se.GetSessionVars().UnionConcurrency(), Equals, 2) + tk.MustQuery("select a from (" + sql + ") t order by a limit 4").Check(testkit.Rows("1", "1", "1", "1")) + tk.MustQuery("select a from (" + sql + ") t order by a limit 7, 4").Check(testkit.Rows("1", "2", "2", "2")) } func (s *testSuite) Test17780(c *C) { From cc7ac971cc49246d99ffbc4172247785b670e3ba Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Wed, 26 May 2021 10:32:15 +0800 Subject: [PATCH 3/3] remove useless code --- store/tikv/snapshot.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go index 4fa098bfc3ba1..743111827e85d 100644 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -639,6 +639,18 @@ func (s *KVSnapshot) SetResourceGroupTag(tag []byte) { s.resourceGroupTag = tag } +// SnapCacheHitCount gets the snapshot cache hit count. Only for test. +func (s *KVSnapshot) SnapCacheHitCount() int { + return int(atomic.LoadInt64(&s.mu.hitCnt)) +} + +// SnapCacheSize gets the snapshot cache size. Only for test. +func (s *KVSnapshot) SnapCacheSize() int { + s.mu.RLock() + defer s.mu.RLock() + return len(s.mu.cached) +} + func extractLockFromKeyErr(keyErr *pb.KeyError) (*Lock, error) { if locked := keyErr.GetLocked(); locked != nil { return NewLock(locked), nil