From 9dab0b7d0dbec13af08c001ed8112bbf0adb3873 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Tue, 8 Dec 2020 17:21:25 +0800 Subject: [PATCH 01/10] executor: open childExec during execution for UnionExec --- executor/executor.go | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/executor/executor.go b/executor/executor.go index 706348f80ecb5..002fb536a1192 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -1439,6 +1439,10 @@ type UnionExec struct { results []*chunk.Chunk wg sync.WaitGroup initialized bool + mu struct { + *sync.Mutex + maxOpenedChildID int + } } // unionWorkerResult stores the result for a union worker. @@ -1458,9 +1462,6 @@ func (e *UnionExec) waitAllFinished() { // Open implements the Executor Open interface. func (e *UnionExec) Open(ctx context.Context) error { - if err := e.baseExecutor.Open(ctx); err != nil { - return err - } e.stopFetchData.Store(false) e.initialized = false e.finished = make(chan struct{}) @@ -1509,6 +1510,16 @@ func (e *UnionExec) resultPuller(ctx context.Context, workerID int) { e.wg.Done() }() for childID := range e.childIDChan { + if err := e.children[childID].Open(ctx); err != nil { + result.err = err + e.stopFetchData.Store(true) + e.resultPool <- result + } + e.mu.Lock() + if childID > e.mu.maxOpenedChildID { + e.mu.maxOpenedChildID = childID + } + e.mu.Unlock() for { if e.stopFetchData.Load().(bool) { return @@ -1567,7 +1578,14 @@ func (e *UnionExec) Close() error { for range e.childIDChan { } } - return e.baseExecutor.Close() + // We do not need to acquire the e.mu.Lock since all the resultPuller can be + // promised to exit + for i := 0; i < e.mu.maxOpenedChildID; i++ { + if err := e.children[i].Close(); err != nil { + return err + } + } + return nil } // ResetContextOfStmt resets the StmtContext and session variables. From 9541aca03d1aced3e079c2899c354225d3b9dbb8 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Tue, 8 Dec 2020 17:47:38 +0800 Subject: [PATCH 02/10] tiny change --- executor/executor.go | 20 ++++++++++++++++++-- executor/executor_test.go | 30 ++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 2 deletions(-) diff --git a/executor/executor.go b/executor/executor.go index 002fb536a1192..75878b1e8eb07 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -28,6 +28,7 @@ import ( "github.com/cznic/mathutil" "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/parser/ast" "github.com/pingcap/parser/auth" "github.com/pingcap/parser/model" @@ -1443,6 +1444,8 @@ type UnionExec struct { *sync.Mutex maxOpenedChildID int } + + childInFlightForTest int32 } // unionWorkerResult stores the result for a union worker. @@ -1465,6 +1468,8 @@ func (e *UnionExec) Open(ctx context.Context) error { e.stopFetchData.Store(false) e.initialized = false e.finished = make(chan struct{}) + e.mu.Mutex = &sync.Mutex{} + e.mu.maxOpenedChildID = -1 return nil } @@ -1515,6 +1520,9 @@ func (e *UnionExec) resultPuller(ctx context.Context, workerID int) { e.stopFetchData.Store(true) e.resultPool <- result } + failpoint.Inject("issue21441", func() { + atomic.AddInt32(&e.childInFlightForTest, 1) + }) e.mu.Lock() if childID > e.mu.maxOpenedChildID { e.mu.maxOpenedChildID = childID @@ -1534,12 +1542,20 @@ func (e *UnionExec) resultPuller(ctx context.Context, workerID int) { e.resourcePools[workerID] <- result.chk break } + failpoint.Inject("issue21441", func() { + if int(atomic.LoadInt32(&e.childInFlightForTest)) > e.concurrency { + panic("the count of child in flight is larger than e.concurrency unexpectedly") + } + }) e.resultPool <- result if result.err != nil { e.stopFetchData.Store(true) return } } + failpoint.Inject("issue21441", func() { + atomic.AddInt32(&e.childInFlightForTest, -1) + }) } } @@ -1579,8 +1595,8 @@ func (e *UnionExec) Close() error { } } // We do not need to acquire the e.mu.Lock since all the resultPuller can be - // promised to exit - for i := 0; i < e.mu.maxOpenedChildID; i++ { + // promised to exit when reaching here (e.childIDChan been closed). + for i := 0; i <= e.mu.maxOpenedChildID; i++ { if err := e.children[i].Close(); err != nil { return err } diff --git a/executor/executor_test.go b/executor/executor_test.go index 46358c7f1c3a2..261ea92a927f6 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -7033,3 +7033,33 @@ func (s *testSuite) TestOOMActionPriority(c *C) { } c.Assert(action.GetPriority(), Equals, int64(memory.DefLogPriority)) } + +func (s *testSerialSuite) TestIssue21441(c *C) { + failpoint.Enable("github.com/pingcap/tidb/executor/issue21441", `return`) + defer failpoint.Disable("github.com/pingcap/tidb/executor/issue21441") + + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int)") + tk.MustExec(`insert into t values(1),(2),(3)`) + tk.Se.GetSessionVars().InitChunkSize = 1 + tk.Se.GetSessionVars().MaxChunkSize = 1 + sql := ` +select a from t union all +select a from t union all +select a from t union all +select a from t union all +select a from t union all +select a from t union all +select a from t union all +select a from t` + tk.MustQuery(sql).Sort().Check(testkit.Rows( + "1", "1", "1", "1", "1", "1", "1", "1", + "2", "2", "2", "2", "2", "2", "2", "2", + "3", "3", "3", "3", "3", "3", "3", "3", + )) + + tk.MustQuery("select a from (" + sql + ") t order by a limit 4").Check(testkit.Rows("1", "1", "1", "1")) + tk.MustQuery("select a from (" + sql + ") t order by a limit 7, 4").Check(testkit.Rows("1", "2", "2", "2")) +} From 46d45e0ec29928aabe9f087a6207c99486e34f7b Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Tue, 8 Dec 2020 21:59:31 +0800 Subject: [PATCH 03/10] fix data race --- executor/batch_point_get.go | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/executor/batch_point_get.go b/executor/batch_point_get.go index 77cbb59295ed1..486b7bfa655af 100644 --- a/executor/batch_point_get.go +++ b/executor/batch_point_get.go @@ -68,6 +68,8 @@ type BatchPointGetExec struct { snapshot kv.Snapshot stats *runtimeStatsWithSnapshot + // 0 indicates not set, 1 indicates already set. + alreadySetOptionForSnapshot int32 } // buildVirtualColumnInfo saves virtual column indices and sort them in definition order @@ -101,18 +103,20 @@ func (e *BatchPointGetExec) Open(context.Context) error { } else { snapshot = e.ctx.GetStore().GetSnapshot(kv.Version{Ver: e.snapshotTS}) } - if e.runtimeStats != nil { - snapshotStats := &tikv.SnapshotRuntimeStats{} - e.stats = &runtimeStatsWithSnapshot{ - SnapshotRuntimeStats: snapshotStats, + if atomic.CompareAndSwapInt32(&e.alreadySetOptionForSnapshot, 0, 1) { + if e.runtimeStats != nil { + snapshotStats := &tikv.SnapshotRuntimeStats{} + e.stats = &runtimeStatsWithSnapshot{ + SnapshotRuntimeStats: snapshotStats, + } + snapshot.SetOption(kv.CollectRuntimeStats, snapshotStats) + e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) } - snapshot.SetOption(kv.CollectRuntimeStats, snapshotStats) - e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) - } - if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() { - snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) + if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() { + snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) + } + snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID) } - snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID) var batchGetter kv.BatchGetter = snapshot if txn.Valid() { if e.lock { From a3210b9fe7788bbc68e0fe6316bc5fe3b7c6e7b7 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Tue, 8 Dec 2020 22:06:41 +0800 Subject: [PATCH 04/10] address comment --- executor/batch_point_get.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/executor/batch_point_get.go b/executor/batch_point_get.go index 486b7bfa655af..7eeae176c545b 100644 --- a/executor/batch_point_get.go +++ b/executor/batch_point_get.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/tidb/util/hack" "github.com/pingcap/tidb/util/math" "github.com/pingcap/tidb/util/rowcodec" + "sync" ) // BatchPointGetExec executes a bunch of point select queries. @@ -68,8 +69,12 @@ type BatchPointGetExec struct { snapshot kv.Snapshot stats *runtimeStatsWithSnapshot - // 0 indicates not set, 1 indicates already set. - alreadySetOptionForSnapshot int32 + // mu protect from data race since there may exist multiple + // BatchPointGetExec set option for the same snapshot simultaneously. + mu struct { + sync.Mutex + alreadySetOptionForSnapshot bool + } } // buildVirtualColumnInfo saves virtual column indices and sort them in definition order @@ -103,7 +108,8 @@ func (e *BatchPointGetExec) Open(context.Context) error { } else { snapshot = e.ctx.GetStore().GetSnapshot(kv.Version{Ver: e.snapshotTS}) } - if atomic.CompareAndSwapInt32(&e.alreadySetOptionForSnapshot, 0, 1) { + e.mu.Lock() + if !e.mu.alreadySetOptionForSnapshot { if e.runtimeStats != nil { snapshotStats := &tikv.SnapshotRuntimeStats{} e.stats = &runtimeStatsWithSnapshot{ @@ -116,7 +122,9 @@ func (e *BatchPointGetExec) Open(context.Context) error { snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) } snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID) + e.mu.alreadySetOptionForSnapshot = true } + e.mu.Unlock() var batchGetter kv.BatchGetter = snapshot if txn.Valid() { if e.lock { From 00f19f676cea93c0d3c2defe3e64dc4765253c39 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Tue, 8 Dec 2020 22:09:48 +0800 Subject: [PATCH 05/10] format --- executor/batch_point_get.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executor/batch_point_get.go b/executor/batch_point_get.go index 7eeae176c545b..301ba660b4f38 100644 --- a/executor/batch_point_get.go +++ b/executor/batch_point_get.go @@ -17,6 +17,7 @@ import ( "context" "fmt" "sort" + "sync" "sync/atomic" "github.com/pingcap/failpoint" @@ -33,7 +34,6 @@ import ( "github.com/pingcap/tidb/util/hack" "github.com/pingcap/tidb/util/math" "github.com/pingcap/tidb/util/rowcodec" - "sync" ) // BatchPointGetExec executes a bunch of point select queries. From 9fb404e67fa1e71d59604d8c955d1d614d348389 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Wed, 9 Dec 2020 10:12:43 +0800 Subject: [PATCH 06/10] fix data race --- executor/point_get.go | 32 ++++++++++++++++++++++---------- 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/executor/point_get.go b/executor/point_get.go index bfa7a951e3a82..b4bba554c4eed 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/rowcodec" + "sync" ) func (b *executorBuilder) buildPointGet(p *plannercore.PointGetPlan) Executor { @@ -81,6 +82,12 @@ type PointGetExecutor struct { virtualColumnRetFieldTypes []*types.FieldType stats *runtimeStatsWithSnapshot + // mu protect from data race since there may exist multiple + // PointGetExec set option for the same snapshot simultaneously. + mu struct { + sync.Mutex + alreadySetOptionForSnapshot bool + } } // Init set fields needed for PointGetExecutor reuse, this does NOT change baseExecutor field @@ -128,18 +135,23 @@ func (e *PointGetExecutor) Open(context.Context) error { } else { e.snapshot = e.ctx.GetStore().GetSnapshot(kv.Version{Ver: snapshotTS}) } - if e.runtimeStats != nil { - snapshotStats := &tikv.SnapshotRuntimeStats{} - e.stats = &runtimeStatsWithSnapshot{ - SnapshotRuntimeStats: snapshotStats, + e.mu.Lock() + if !e.mu.alreadySetOptionForSnapshot { + if e.runtimeStats != nil { + snapshotStats := &tikv.SnapshotRuntimeStats{} + e.stats = &runtimeStatsWithSnapshot{ + SnapshotRuntimeStats: snapshotStats, + } + e.snapshot.SetOption(kv.CollectRuntimeStats, snapshotStats) + e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) } - e.snapshot.SetOption(kv.CollectRuntimeStats, snapshotStats) - e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) - } - if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() { - e.snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) + if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() { + e.snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) + } + e.snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID) + e.mu.alreadySetOptionForSnapshot = true } - e.snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID) + e.mu.Unlock() return nil } From 7cce40dccd59c39023d423352a3c6dca04b65cc2 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Wed, 9 Dec 2020 10:22:56 +0800 Subject: [PATCH 07/10] format --- executor/point_get.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executor/point_get.go b/executor/point_get.go index b4bba554c4eed..3f165a4d6ee5d 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -15,6 +15,7 @@ package executor import ( "context" + "sync" "github.com/pingcap/errors" "github.com/pingcap/failpoint" @@ -33,7 +34,6 @@ import ( "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/rowcodec" - "sync" ) func (b *executorBuilder) buildPointGet(p *plannercore.PointGetPlan) Executor { From f1e57f29be2ce5cda17600aa32ef464ccd8117d3 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Wed, 9 Dec 2020 13:24:00 +0800 Subject: [PATCH 08/10] fix ci --- executor/batch_point_get.go | 32 ++++++++++---------------------- executor/point_get.go | 32 ++++++++++---------------------- store/tikv/snapshot.go | 4 ++++ 3 files changed, 24 insertions(+), 44 deletions(-) diff --git a/executor/batch_point_get.go b/executor/batch_point_get.go index 301ba660b4f38..77cbb59295ed1 100644 --- a/executor/batch_point_get.go +++ b/executor/batch_point_get.go @@ -17,7 +17,6 @@ import ( "context" "fmt" "sort" - "sync" "sync/atomic" "github.com/pingcap/failpoint" @@ -69,12 +68,6 @@ type BatchPointGetExec struct { snapshot kv.Snapshot stats *runtimeStatsWithSnapshot - // mu protect from data race since there may exist multiple - // BatchPointGetExec set option for the same snapshot simultaneously. - mu struct { - sync.Mutex - alreadySetOptionForSnapshot bool - } } // buildVirtualColumnInfo saves virtual column indices and sort them in definition order @@ -108,23 +101,18 @@ func (e *BatchPointGetExec) Open(context.Context) error { } else { snapshot = e.ctx.GetStore().GetSnapshot(kv.Version{Ver: e.snapshotTS}) } - e.mu.Lock() - if !e.mu.alreadySetOptionForSnapshot { - if e.runtimeStats != nil { - snapshotStats := &tikv.SnapshotRuntimeStats{} - e.stats = &runtimeStatsWithSnapshot{ - SnapshotRuntimeStats: snapshotStats, - } - snapshot.SetOption(kv.CollectRuntimeStats, snapshotStats) - e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) + if e.runtimeStats != nil { + snapshotStats := &tikv.SnapshotRuntimeStats{} + e.stats = &runtimeStatsWithSnapshot{ + SnapshotRuntimeStats: snapshotStats, } - if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() { - snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) - } - snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID) - e.mu.alreadySetOptionForSnapshot = true + snapshot.SetOption(kv.CollectRuntimeStats, snapshotStats) + e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) + } + if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() { + snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) } - e.mu.Unlock() + snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID) var batchGetter kv.BatchGetter = snapshot if txn.Valid() { if e.lock { diff --git a/executor/point_get.go b/executor/point_get.go index 3f165a4d6ee5d..bfa7a951e3a82 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -15,7 +15,6 @@ package executor import ( "context" - "sync" "github.com/pingcap/errors" "github.com/pingcap/failpoint" @@ -82,12 +81,6 @@ type PointGetExecutor struct { virtualColumnRetFieldTypes []*types.FieldType stats *runtimeStatsWithSnapshot - // mu protect from data race since there may exist multiple - // PointGetExec set option for the same snapshot simultaneously. - mu struct { - sync.Mutex - alreadySetOptionForSnapshot bool - } } // Init set fields needed for PointGetExecutor reuse, this does NOT change baseExecutor field @@ -135,23 +128,18 @@ func (e *PointGetExecutor) Open(context.Context) error { } else { e.snapshot = e.ctx.GetStore().GetSnapshot(kv.Version{Ver: snapshotTS}) } - e.mu.Lock() - if !e.mu.alreadySetOptionForSnapshot { - if e.runtimeStats != nil { - snapshotStats := &tikv.SnapshotRuntimeStats{} - e.stats = &runtimeStatsWithSnapshot{ - SnapshotRuntimeStats: snapshotStats, - } - e.snapshot.SetOption(kv.CollectRuntimeStats, snapshotStats) - e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) + if e.runtimeStats != nil { + snapshotStats := &tikv.SnapshotRuntimeStats{} + e.stats = &runtimeStatsWithSnapshot{ + SnapshotRuntimeStats: snapshotStats, } - if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() { - e.snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) - } - e.snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID) - e.mu.alreadySetOptionForSnapshot = true + e.snapshot.SetOption(kv.CollectRuntimeStats, snapshotStats) + e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) + } + if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() { + e.snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) } - e.mu.Unlock() + e.snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID) return nil } diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go index 05ddf49c033c9..c99a771ec64c5 100644 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -502,9 +502,13 @@ func (s *tikvSnapshot) SetOption(opt kv.Option, val interface{}) { case kv.SnapshotTS: s.setSnapshotTS(val.(uint64)) case kv.ReplicaRead: + s.mu.Lock() s.replicaRead = val.(kv.ReplicaReadType) + s.mu.Unlock() case kv.TaskID: + s.mu.Lock() s.taskID = val.(uint64) + s.mu.Unlock() case kv.CollectRuntimeStats: s.mu.Lock() s.mu.stats = val.(*SnapshotRuntimeStats) From cce09d67695a9e13d17e0db1d38fba305b33f0e3 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Wed, 9 Dec 2020 13:52:51 +0800 Subject: [PATCH 09/10] address comment --- executor/executor.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/executor/executor.go b/executor/executor.go index 75878b1e8eb07..3701145f004ba 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -1515,6 +1515,11 @@ func (e *UnionExec) resultPuller(ctx context.Context, workerID int) { e.wg.Done() }() for childID := range e.childIDChan { + e.mu.Lock() + if childID > e.mu.maxOpenedChildID { + e.mu.maxOpenedChildID = childID + } + e.mu.Unlock() if err := e.children[childID].Open(ctx); err != nil { result.err = err e.stopFetchData.Store(true) @@ -1523,11 +1528,6 @@ func (e *UnionExec) resultPuller(ctx context.Context, workerID int) { failpoint.Inject("issue21441", func() { atomic.AddInt32(&e.childInFlightForTest, 1) }) - e.mu.Lock() - if childID > e.mu.maxOpenedChildID { - e.mu.maxOpenedChildID = childID - } - e.mu.Unlock() for { if e.stopFetchData.Load().(bool) { return From 6cb115cdb7c71c5d89186522916246670082c40c Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Wed, 9 Dec 2020 14:16:47 +0800 Subject: [PATCH 10/10] address comment --- executor/executor.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/executor/executor.go b/executor/executor.go index 3701145f004ba..e03f6fe7e5697 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -1596,12 +1596,13 @@ func (e *UnionExec) Close() error { } // We do not need to acquire the e.mu.Lock since all the resultPuller can be // promised to exit when reaching here (e.childIDChan been closed). + var firstErr error for i := 0; i <= e.mu.maxOpenedChildID; i++ { - if err := e.children[i].Close(); err != nil { - return err + if err := e.children[i].Close(); err != nil && firstErr == nil { + firstErr = err } } - return nil + return firstErr } // ResetContextOfStmt resets the StmtContext and session variables.