From f2feb1f76df073d3dc9d2025425ae5dc0e243e1a Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Fri, 3 Dec 2021 15:13:54 +0800 Subject: [PATCH] cherry pick #30290 to release-5.1 Signed-off-by: ti-srebot --- executor/builder.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/executor/builder.go b/executor/builder.go index 3a315862d74f3..a217f1b755a76 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -86,9 +86,18 @@ type executorBuilder struct { err error // err is set when there is error happened during Executor building process. hasLock bool Ti *TelemetryInfo +<<<<<<< HEAD // ExplicitStaleness means whether the 'SELECT' clause are using 'AS OF TIMESTAMP' to perform stale read explicitly. explicitStaleness bool txnScope string +======= + // isStaleness means whether this statement use stale read. + isStaleness bool + readReplicaScope string + inUpdateStmt bool + inDeleteStmt bool + inInsertStmt bool +>>>>>>> 4fbbd5a77... executor: make projection executor unparallel for insert/update/delete (#30290) } // CTEStorages stores resTbl and iterInTbl for CTEExec. @@ -781,6 +790,7 @@ func (b *executorBuilder) buildSetConfig(v *plannercore.SetConfig) Executor { } func (b *executorBuilder) buildInsert(v *plannercore.Insert) Executor { + b.inInsertStmt = true if v.SelectPlan != nil { // Try to update the forUpdateTS for insert/replace into select statements. // Set the selectPlan parameter to nil to make it always update the forUpdateTS. @@ -1388,6 +1398,12 @@ func (b *executorBuilder) buildProjection(v *plannercore.PhysicalProjection) Exe if int64(v.StatsCount()) < int64(b.ctx.GetSessionVars().MaxChunkSize) { e.numWorkers = 0 } + + // Use un-parallel projection for query that write on memdb to avoid data race. + // See also https://github.com/pingcap/tidb/issues/26832 + if b.inUpdateStmt || b.inDeleteStmt || b.inInsertStmt || b.hasLock { + e.numWorkers = 0 + } return e } @@ -1861,6 +1877,7 @@ func (b *executorBuilder) buildSplitRegion(v *plannercore.SplitRegion) Executor } func (b *executorBuilder) buildUpdate(v *plannercore.Update) Executor { + b.inUpdateStmt = true tblID2table := make(map[int64]table.Table, len(v.TblColPosInfos)) multiUpdateOnSameTable := make(map[int64]bool) for _, info := range v.TblColPosInfos { @@ -1932,6 +1949,7 @@ func getAssignFlag(ctx sessionctx.Context, v *plannercore.Update, schemaLen int) } func (b *executorBuilder) buildDelete(v *plannercore.Delete) Executor { + b.inDeleteStmt = true tblID2table := make(map[int64]table.Table, len(v.TblColPosInfos)) for _, info := range v.TblColPosInfos { tblID2table[info.TblID], _ = b.is.TableByID(info.TblID)