Skip to content

Commit

Permalink
executor: make projection executor unparallel for insert/update/delete (
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-srebot authored Jun 16, 2022
1 parent faba7c1 commit 6121832
Showing 1 changed file with 12 additions and 0 deletions.
12 changes: 12 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ type executorBuilder struct {
// isStaleness means whether this statement use stale read.
isStaleness bool
readReplicaScope string
inUpdateStmt bool
inDeleteStmt bool
inInsertStmt bool
}

// CTEStorages stores resTbl and iterInTbl for CTEExec.
Expand Down Expand Up @@ -793,6 +796,7 @@ func (b *executorBuilder) buildSetConfig(v *plannercore.SetConfig) Executor {
}

func (b *executorBuilder) buildInsert(v *plannercore.Insert) Executor {
b.inInsertStmt = true
if v.SelectPlan != nil {
// Try to update the forUpdateTS for insert/replace into select statements.
// Set the selectPlan parameter to nil to make it always update the forUpdateTS.
Expand Down Expand Up @@ -1416,6 +1420,12 @@ func (b *executorBuilder) buildProjection(v *plannercore.PhysicalProjection) Exe
if int64(v.StatsCount()) < int64(b.ctx.GetSessionVars().MaxChunkSize) {
e.numWorkers = 0
}

// Use un-parallel projection for query that write on memdb to avoid data race.
// See also https://github.com/pingcap/tidb/issues/26832
if b.inUpdateStmt || b.inDeleteStmt || b.inInsertStmt || b.hasLock {
e.numWorkers = 0
}
return e
}

Expand Down Expand Up @@ -1938,6 +1948,7 @@ func (b *executorBuilder) buildSplitRegion(v *plannercore.SplitRegion) Executor
}

func (b *executorBuilder) buildUpdate(v *plannercore.Update) Executor {
b.inUpdateStmt = true
tblID2table := make(map[int64]table.Table, len(v.TblColPosInfos))
multiUpdateOnSameTable := make(map[int64]bool)
for _, info := range v.TblColPosInfos {
Expand Down Expand Up @@ -2010,6 +2021,7 @@ func getAssignFlag(ctx sessionctx.Context, v *plannercore.Update, schemaLen int)
}

func (b *executorBuilder) buildDelete(v *plannercore.Delete) Executor {
b.inDeleteStmt = true
tblID2table := make(map[int64]table.Table, len(v.TblColPosInfos))
for _, info := range v.TblColPosInfos {
tblID2table[info.TblID], _ = b.is.TableByID(info.TblID)
Expand Down

0 comments on commit 6121832

Please sign in to comment.