Skip to content

Commit

Permalink
executor: open childExec during execution for UnionExec (#24899) (#25334
Browse files Browse the repository at this point in the history
)
  • Loading branch information
ti-srebot authored Jun 11, 2021
1 parent eaacb0e commit 25dc28e
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 4 deletions.
42 changes: 38 additions & 4 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1491,6 +1491,12 @@ type UnionExec struct {
results []*chunk.Chunk
wg sync.WaitGroup
initialized bool
mu struct {
*sync.Mutex
maxOpenedChildID int
}

childInFlightForTest int32
}

// unionWorkerResult stores the result for a union worker.
Expand All @@ -1510,12 +1516,11 @@ func (e *UnionExec) waitAllFinished() {

// Open implements the Executor Open interface.
func (e *UnionExec) Open(ctx context.Context) error {
if err := e.baseExecutor.Open(ctx); err != nil {
return err
}
e.stopFetchData.Store(false)
e.initialized = false
e.finished = make(chan struct{})
e.mu.Mutex = &sync.Mutex{}
e.mu.maxOpenedChildID = -1
return nil
}

Expand Down Expand Up @@ -1561,6 +1566,19 @@ func (e *UnionExec) resultPuller(ctx context.Context, workerID int) {
e.wg.Done()
}()
for childID := range e.childIDChan {
e.mu.Lock()
if childID > e.mu.maxOpenedChildID {
e.mu.maxOpenedChildID = childID
}
e.mu.Unlock()
if err := e.children[childID].Open(ctx); err != nil {
result.err = err
e.stopFetchData.Store(true)
e.resultPool <- result
}
failpoint.Inject("issue21441", func() {
atomic.AddInt32(&e.childInFlightForTest, 1)
})
for {
if e.stopFetchData.Load().(bool) {
return
Expand All @@ -1575,12 +1593,20 @@ func (e *UnionExec) resultPuller(ctx context.Context, workerID int) {
e.resourcePools[workerID] <- result.chk
break
}
failpoint.Inject("issue21441", func() {
if int(atomic.LoadInt32(&e.childInFlightForTest)) > e.concurrency {
panic("the count of child in flight is larger than e.concurrency unexpectedly")
}
})
e.resultPool <- result
if result.err != nil {
e.stopFetchData.Store(true)
return
}
}
failpoint.Inject("issue21441", func() {
atomic.AddInt32(&e.childInFlightForTest, -1)
})
}
}

Expand Down Expand Up @@ -1623,7 +1649,15 @@ func (e *UnionExec) Close() error {
for range e.childIDChan {
}
}
return e.baseExecutor.Close()
// We do not need to acquire the e.mu.Lock since all the resultPuller can be
// promised to exit when reaching here (e.childIDChan been closed).
var firstErr error
for i := 0; i <= e.mu.maxOpenedChildID; i++ {
if err := e.children[i].Close(); err != nil && firstErr == nil {
firstErr = err
}
}
return firstErr
}

// ResetContextOfStmt resets the StmtContext and session variables.
Expand Down
35 changes: 35 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7777,6 +7777,41 @@ func (s *testSuite) TestOOMActionPriority(c *C) {
c.Assert(action.GetPriority(), Equals, int64(memory.DefLogPriority))
}

func (s *testSerialSuite) TestIssue21441(c *C) {
failpoint.Enable("github.com/pingcap/tidb/executor/issue21441", `return`)
defer failpoint.Disable("github.com/pingcap/tidb/executor/issue21441")

tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int)")
tk.MustExec(`insert into t values(1),(2),(3)`)
tk.Se.GetSessionVars().InitChunkSize = 1
tk.Se.GetSessionVars().MaxChunkSize = 1
sql := `
select a from t union all
select a from t union all
select a from t union all
select a from t union all
select a from t union all
select a from t union all
select a from t union all
select a from t`
tk.MustQuery(sql).Sort().Check(testkit.Rows(
"1", "1", "1", "1", "1", "1", "1", "1",
"2", "2", "2", "2", "2", "2", "2", "2",
"3", "3", "3", "3", "3", "3", "3", "3",
))

tk.MustQuery("select a from (" + sql + ") t order by a limit 4").Check(testkit.Rows("1", "1", "1", "1"))
tk.MustQuery("select a from (" + sql + ") t order by a limit 7, 4").Check(testkit.Rows("1", "2", "2", "2"))

tk.MustExec("set @@tidb_executor_concurrency = 2")
c.Assert(tk.Se.GetSessionVars().UnionConcurrency(), Equals, 2)
tk.MustQuery("select a from (" + sql + ") t order by a limit 4").Check(testkit.Rows("1", "1", "1", "1"))
tk.MustQuery("select a from (" + sql + ") t order by a limit 7, 4").Check(testkit.Rows("1", "2", "2", "2"))
}

func (s *testSuite) Test17780(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down

0 comments on commit 25dc28e

Please sign in to comment.