Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: fix random cte error under apply (#57294) #59183

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 46 additions & 41 deletions executor/cte.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,16 @@
defer e.producer.resTbl.Unlock()

if e.producer.checkAndUpdateCorColHashCode() {
e.producer.reset()
if err = e.producer.reopenTbls(); err != nil {
err = e.producer.reset()
if err != nil {
return err
}
}
if e.producer.openErr != nil {
return e.producer.openErr
}
if !e.producer.opened {
if err = e.producer.openProducer(ctx, e); err != nil {
if !e.producer.hasCTEResult() && !e.producer.executorOpened {
if err = e.producer.openProducerExecutor(ctx, e); err != nil {
return err
}
}
Expand All @@ -107,8 +107,14 @@
func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
e.producer.resTbl.Lock()
defer e.producer.resTbl.Unlock()
if !e.producer.resTbl.Done() {
if err = e.producer.produce(ctx); err != nil {
if !e.producer.hasCTEResult() {
// in case that another CTEExec call close without generate CTE result.
if !e.producer.executorOpened {
if err = e.producer.openProducerExecutor(ctx, e); err != nil {
return err
}

Check warning on line 115 in executor/cte.go

View check run for this annotation

Codecov / codecov/patch

executor/cte.go#L113-L115

Added lines #L113 - L115 were not covered by tests
}
if err = e.producer.genCTEResult(ctx); err != nil {
return err
}
}
Expand All @@ -130,20 +136,25 @@
func() {
e.producer.resTbl.Lock()
defer e.producer.resTbl.Unlock()
if !e.producer.closed {
if e.producer.executorOpened {
failpoint.Inject("mock_cte_exec_panic_avoid_deadlock", func(v failpoint.Value) {
ok := v.(bool)
if ok {
// mock an oom panic, returning ErrMemoryExceedForQuery for error identification in recovery work.
panic(memory.PanicMemoryExceedWarnMsg)
}
})
// closeProducer() only close seedExec and recursiveExec, will not touch resTbl.
// It means you can still read resTbl after call closeProducer().
// You can even call all three functions(openProducer/produce/closeProducer) in CTEExec.Next().
// closeProducerExecutor() only close seedExec and recursiveExec, will not touch resTbl.
// It means you can still read resTbl after call closeProducerExecutor().
// You can even call all three functions(openProducerExecutor/genCTEResult/closeProducerExecutor) in CTEExec.Next().
// Separating these three function calls is only to follow the abstraction of the volcano model.
err := e.producer.closeProducer()
err := e.producer.closeProducerExecutor()
firstErr = setFirstErr(firstErr, err, "close cte producer error")
if !e.producer.hasCTEResult() {
// CTE result is not generated, in this case, we reset it
err = e.producer.reset()
firstErr = setFirstErr(firstErr, err, "close cte producer error")
}
}
}()
err := e.baseExecutor.Close()
Expand All @@ -158,10 +169,10 @@
}

type cteProducer struct {
// opened should be false when not open or open fail(a.k.a. openErr != nil)
opened bool
produced bool
closed bool
// executorOpened is used to indicate whether the executor(seedExec/recursiveExec) is opened.
// when executorOpened is true, the executor is opened, otherwise it means the executor is
// not opened or is already closed.
executorOpened bool

// cteProducer is shared by multiple operators, so if the first operator tries to open
// and got error, the second should return open error directly instead of open again.
Expand Down Expand Up @@ -200,14 +211,10 @@
corColHashCodes [][]byte
}

func (p *cteProducer) openProducer(ctx context.Context, cteExec *CTEExec) (err error) {
func (p *cteProducer) openProducerExecutor(ctx context.Context, cteExec *CTEExec) (err error) {
defer func() {
p.openErr = err
if err == nil {
p.opened = true
} else {
p.opened = false
}
p.executorOpened = true
}()
if p.seedExec == nil {
return errors.New("seedExec for CTEExec is nil")
Expand Down Expand Up @@ -250,7 +257,7 @@
return nil
}

func (p *cteProducer) closeProducer() (firstErr error) {
func (p *cteProducer) closeProducerExecutor() (firstErr error) {
err := p.seedExec.Close()
firstErr = setFirstErr(firstErr, err, "close seedExec err")
if p.recursiveExec != nil {
Expand All @@ -268,7 +275,7 @@
// because ExplainExec still needs tracker to get mem usage info.
p.memTracker = nil
p.diskTracker = nil
p.closed = true
p.executorOpened = false
return
}

Expand Down Expand Up @@ -335,7 +342,13 @@
return nil
}

func (p *cteProducer) produce(ctx context.Context) (err error) {
func (p *cteProducer) hasCTEResult() bool {
return p.resTbl.Done()
}

// genCTEResult generates the result of CTE, and stores the result in resTbl.
// This is a synchronous function, which means it will block until the result is generated.
func (p *cteProducer) genCTEResult(ctx context.Context) (err error) {
if p.resTbl.Error() != nil {
return p.resTbl.Error()
}
Expand Down Expand Up @@ -528,14 +541,18 @@
return nil
}

func (p *cteProducer) reset() {
func (p *cteProducer) reset() error {
p.curIter = 0
p.hashTbl = nil

p.opened = false
p.executorOpened = false
p.openErr = nil
p.produced = false
p.closed = false

// Normally we need to setup tracker after calling Reopen(),
// But reopen resTbl means we need to call genCTEResult() again, it will setup tracker.
if err := p.resTbl.Reopen(); err != nil {
return err
}

Check warning on line 554 in executor/cte.go

View check run for this annotation

Codecov / codecov/patch

executor/cte.go#L553-L554

Added lines #L553 - L554 were not covered by tests
return p.iterInTbl.Reopen()
}

func (p *cteProducer) resetTracker() {
Expand All @@ -549,18 +566,6 @@
}
}

func (p *cteProducer) reopenTbls() (err error) {
if p.isDistinct {
p.hashTbl = newConcurrentMapHashTable()
}
// Normally we need to setup tracker after calling Reopen(),
// But reopen resTbl means we need to call produce() again, it will setup tracker.
if err := p.resTbl.Reopen(); err != nil {
return err
}
return p.iterInTbl.Reopen()
}

// Check if tbl meets the requirement of limit.
func (p *cteProducer) limitDone(tbl cteutil.Storage) bool {
return p.hasLimit && uint64(tbl.NumRows()) >= p.limitEnd
Expand Down
19 changes: 19 additions & 0 deletions executor/issuetest/executor_issue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1494,3 +1494,22 @@ func TestIssue49902(t *testing.T) {
tk.MustQuery("SELECT count(`t`.`c`) FROM (`s`) JOIN `t` GROUP BY `t`.`c`;").Check(testkit.Rows("170"))
tk.MustExec("set @@tidb_max_chunk_size = default;")
}

func TestIssue55881(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test;")
tk.MustExec("drop table if exists aaa;")
tk.MustExec("drop table if exists bbb;")
tk.MustExec("create table aaa(id int, value int);")
tk.MustExec("create table bbb(id int, value int);")
tk.MustExec("insert into aaa values(1,2),(2,3)")
tk.MustExec("insert into bbb values(1,2),(2,3),(3,4)")
// set tidb_executor_concurrency to 1 to let the issue happens with high probability.
tk.MustExec("set tidb_executor_concurrency=1;")
// this is a random issue, so run it 100 times to increase the probability of the issue.
for i := 0; i < 100; i++ {
tk.MustQuery("with cte as (select * from aaa) select id, (select id from (select * from aaa where aaa.id != bbb.id union all select * from cte union all select * from cte) d limit 1)," +
"(select max(value) from (select * from cte union all select * from cte union all select * from aaa where aaa.id > bbb.id)) from bbb;")
}
}