Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#57294
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
windtalker authored and ti-chi-bot committed Jan 24, 2025
1 parent 5e39597 commit 618baca
Show file tree
Hide file tree
Showing 3 changed files with 851 additions and 28 deletions.
81 changes: 53 additions & 28 deletions executor/cte.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,16 @@ func (e *CTEExec) Open(ctx context.Context) (err error) {
defer e.producer.resTbl.Unlock()

if e.producer.checkAndUpdateCorColHashCode() {
e.producer.reset()
if err = e.producer.reopenTbls(); err != nil {
err = e.producer.reset()
if err != nil {
return err
}
}
if e.producer.openErr != nil {
return e.producer.openErr
}
if !e.producer.opened {
if err = e.producer.openProducer(ctx, e); err != nil {
if !e.producer.hasCTEResult() && !e.producer.executorOpened {
if err = e.producer.openProducerExecutor(ctx, e); err != nil {
return err
}
}
Expand All @@ -106,8 +106,14 @@ func (e *CTEExec) Open(ctx context.Context) (err error) {
func (e *CTEExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
e.producer.resTbl.Lock()
defer e.producer.resTbl.Unlock()
if !e.producer.resTbl.Done() {
if err = e.producer.produce(ctx); err != nil {
if !e.producer.hasCTEResult() {
// in case that another CTEExec call close without generate CTE result.
if !e.producer.executorOpened {
if err = e.producer.openProducerExecutor(ctx, e); err != nil {
return err
}
}
if err = e.producer.genCTEResult(ctx); err != nil {
return err
}
}
Expand All @@ -129,20 +135,25 @@ func (e *CTEExec) Close() (firstErr error) {
func() {
e.producer.resTbl.Lock()
defer e.producer.resTbl.Unlock()
if !e.producer.closed {
if e.producer.executorOpened {
failpoint.Inject("mock_cte_exec_panic_avoid_deadlock", func(v failpoint.Value) {
ok := v.(bool)
if ok {
// mock an oom panic, returning ErrMemoryExceedForQuery for error identification in recovery work.
panic(memory.PanicMemoryExceedWarnMsg)
}
})
// closeProducer() only close seedExec and recursiveExec, will not touch resTbl.
// It means you can still read resTbl after call closeProducer().
// You can even call all three functions(openProducer/produce/closeProducer) in CTEExec.Next().
// closeProducerExecutor() only close seedExec and recursiveExec, will not touch resTbl.
// It means you can still read resTbl after call closeProducerExecutor().
// You can even call all three functions(openProducerExecutor/genCTEResult/closeProducerExecutor) in CTEExec.Next().
// Separating these three function calls is only to follow the abstraction of the volcano model.
err := e.producer.closeProducer()
err := e.producer.closeProducerExecutor()
firstErr = setFirstErr(firstErr, err, "close cte producer error")
if !e.producer.hasCTEResult() {
// CTE result is not generated, in this case, we reset it
err = e.producer.reset()
firstErr = setFirstErr(firstErr, err, "close cte producer error")
}
}
}()
err := e.baseExecutor.Close()
Expand All @@ -157,10 +168,10 @@ func (e *CTEExec) reset() {
}

type cteProducer struct {
// opened should be false when not open or open fail(a.k.a. openErr != nil)
opened bool
produced bool
closed bool
// executorOpened is used to indicate whether the executor(seedExec/recursiveExec) is opened.
// when executorOpened is true, the executor is opened, otherwise it means the executor is
// not opened or is already closed.
executorOpened bool

// cteProducer is shared by multiple operators, so if the first operator tries to open
// and got error, the second should return open error directly instead of open again.
Expand Down Expand Up @@ -199,14 +210,10 @@ type cteProducer struct {
corColHashCodes [][]byte
}

func (p *cteProducer) openProducer(ctx context.Context, cteExec *CTEExec) (err error) {
func (p *cteProducer) openProducerExecutor(ctx context.Context, cteExec *CTEExec) (err error) {
defer func() {
p.openErr = err
if err == nil {
p.opened = true
} else {
p.opened = false
}
p.executorOpened = true
}()
if p.seedExec == nil {
return errors.New("seedExec for CTEExec is nil")
Expand Down Expand Up @@ -249,8 +256,13 @@ func (p *cteProducer) openProducer(ctx context.Context, cteExec *CTEExec) (err e
return nil
}

<<<<<<< HEAD:executor/cte.go
func (p *cteProducer) closeProducer() (firstErr error) {
err := p.seedExec.Close()
=======
func (p *cteProducer) closeProducerExecutor() (firstErr error) {
err := exec.Close(p.seedExec)
>>>>>>> 738adb9934c (executor: fix random cte error under apply (#57294)):pkg/executor/cte.go
firstErr = setFirstErr(firstErr, err, "close seedExec err")
if p.recursiveExec != nil {
err = p.recursiveExec.Close()
Expand All @@ -267,7 +279,7 @@ func (p *cteProducer) closeProducer() (firstErr error) {
// because ExplainExec still needs tracker to get mem usage info.
p.memTracker = nil
p.diskTracker = nil
p.closed = true
p.executorOpened = false
return
}

Expand Down Expand Up @@ -334,7 +346,13 @@ func (p *cteProducer) nextChunkLimit(cteExec *CTEExec, req *chunk.Chunk) error {
return nil
}

func (p *cteProducer) produce(ctx context.Context) (err error) {
func (p *cteProducer) hasCTEResult() bool {
return p.resTbl.Done()
}

// genCTEResult generates the result of CTE, and stores the result in resTbl.
// This is a synchronous function, which means it will block until the result is generated.
func (p *cteProducer) genCTEResult(ctx context.Context) (err error) {
if p.resTbl.Error() != nil {
return p.resTbl.Error()
}
Expand Down Expand Up @@ -527,14 +545,18 @@ func (p *cteProducer) setupTblsForNewIteration() (err error) {
return nil
}

func (p *cteProducer) reset() {
func (p *cteProducer) reset() error {
p.curIter = 0
p.hashTbl = nil

p.opened = false
p.executorOpened = false
p.openErr = nil
p.produced = false
p.closed = false

// Normally we need to setup tracker after calling Reopen(),
// But reopen resTbl means we need to call genCTEResult() again, it will setup tracker.
if err := p.resTbl.Reopen(); err != nil {
return err
}
return p.iterInTbl.Reopen()
}

func (p *cteProducer) resetTracker() {
Expand All @@ -548,6 +570,7 @@ func (p *cteProducer) resetTracker() {
}
}

<<<<<<< HEAD:executor/cte.go
func (p *cteProducer) reopenTbls() (err error) {
if p.isDistinct {
p.hashTbl = newConcurrentMapHashTable()
Expand All @@ -560,6 +583,8 @@ func (p *cteProducer) reopenTbls() (err error) {
return p.iterInTbl.Reopen()
}

=======
>>>>>>> 738adb9934c (executor: fix random cte error under apply (#57294)):pkg/executor/cte.go
// Check if tbl meets the requirement of limit.
func (p *cteProducer) limitDone(tbl cteutil.Storage) bool {
return p.hasLimit && uint64(tbl.NumRows()) >= p.limitEnd
Expand Down
32 changes: 32 additions & 0 deletions pkg/executor/test/issuetest/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
load("@io_bazel_rules_go//go:def.bzl", "go_test")

go_test(
name = "issuetest_test",
timeout = "short",
srcs = [
"executor_issue_test.go",
"main_test.go",
],
flaky = True,
shard_count = 24,
deps = [
"//pkg/autoid_service",
"//pkg/config",
"//pkg/executor",
"//pkg/executor/join",
"//pkg/kv",
"//pkg/meta/autoid",
"//pkg/parser/auth",
"//pkg/parser/charset",
"//pkg/parser/mysql",
"//pkg/session/types",
"//pkg/testkit",
"//pkg/util",
"//pkg/util/dbterror/exeerrors",
"//pkg/util/memory",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//tikv",
"@org_uber_go_goleak//:goleak",
],
)
Loading

0 comments on commit 618baca

Please sign in to comment.