Skip to content

Commit

Permalink
*: avoid special cases DATA RACE (#38918)
Browse files Browse the repository at this point in the history
close #38914
  • Loading branch information
keeplearning20221 authored Nov 8, 2022
1 parent ad4d432 commit ea26284
Show file tree
Hide file tree
Showing 19 changed files with 103 additions and 77 deletions.
2 changes: 1 addition & 1 deletion distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func (r *selectResult) readFromDefault(ctx context.Context, chk *chunk.Chunk) er
func (r *selectResult) readFromChunk(ctx context.Context, chk *chunk.Chunk) error {
if r.respChunkDecoder == nil {
r.respChunkDecoder = chunk.NewDecoder(
r.ctx.GetSessionVars().GetNewChunk(r.fieldTypes, 0),
chunk.NewChunkWithCapacity(r.fieldTypes, 0),
r.fieldTypes,
)
}
Expand Down
4 changes: 2 additions & 2 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1540,7 +1540,7 @@ func (b *executorBuilder) buildHashAgg(v *plannercore.PhysicalHashAgg) Executor
e.defaultVal = nil
} else {
if v.IsFinalAgg() {
e.defaultVal = e.ctx.GetSessionVars().GetNewChunk(retTypes(e), 1)
e.defaultVal = e.ctx.GetSessionVars().GetNewChunkWithCapacity(retTypes(e), 1, 1, e.AllocPool)
}
}
for _, aggDesc := range v.AggFuncs {
Expand Down Expand Up @@ -1603,7 +1603,7 @@ func (b *executorBuilder) buildStreamAgg(v *plannercore.PhysicalStreamAgg) Execu
} else {
// Only do this for final agg, see issue #35295, #30923
if v.IsFinalAgg() {
e.defaultVal = e.ctx.GetSessionVars().GetNewChunk(retTypes(e), 1)
e.defaultVal = e.ctx.GetSessionVars().GetNewChunkWithCapacity(retTypes(e), 1, 1, e.AllocPool)
}
}
for i, aggDesc := range v.AggFuncs {
Expand Down
2 changes: 1 addition & 1 deletion executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -866,7 +866,7 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes
}
}()
retTps := w.idxLookup.getRetTpsByHandle()
chk := w.idxLookup.ctx.GetSessionVars().GetNewChunk(retTps, w.idxLookup.maxChunkSize)
chk := w.idxLookup.ctx.GetSessionVars().GetNewChunkWithCapacity(retTps, w.idxLookup.maxChunkSize, w.idxLookup.maxChunkSize, w.idxLookup.AllocPool)
idxID := w.idxLookup.getIndexPlanRootID()
if w.idxLookup.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl != nil {
if idxID != w.idxLookup.id && w.idxLookup.stats != nil {
Expand Down
4 changes: 3 additions & 1 deletion executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ type baseExecutor struct {
children []Executor
retFieldTypes []*types.FieldType
runtimeStats *execdetails.BasicRuntimeStats
AllocPool chunk.Allocator
}

const (
Expand Down Expand Up @@ -234,7 +235,7 @@ func newFirstChunk(e Executor) *chunk.Chunk {
func tryNewCacheChunk(e Executor) *chunk.Chunk {
base := e.base()
s := base.ctx.GetSessionVars()
return s.GetNewChunkWithCapacity(base.retFieldTypes, base.initCap, base.maxChunkSize)
return s.GetNewChunkWithCapacity(base.retFieldTypes, base.initCap, base.maxChunkSize, base.AllocPool)
}

// newList creates a new List to buffer current executor's result.
Expand Down Expand Up @@ -267,6 +268,7 @@ func newBaseExecutor(ctx sessionctx.Context, schema *expression.Schema, id int,
schema: schema,
initCap: ctx.GetSessionVars().InitChunkSize,
maxChunkSize: ctx.GetSessionVars().MaxChunkSize,
AllocPool: ctx.GetSessionVars().ChunkPool.Alloc,
}
if ctx.GetSessionVars().StmtCtx.RuntimeStatsColl != nil {
if e.id > 0 {
Expand Down
2 changes: 1 addition & 1 deletion executor/index_lookup_hash_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ func (e *IndexNestedLoopHashJoin) newInnerWorker(taskCh chan *indexHashJoinTask,
innerCtx: e.innerCtx,
outerCtx: e.outerCtx,
ctx: e.ctx,
executorChk: e.ctx.GetSessionVars().GetNewChunk(e.innerCtx.rowTypes, e.maxChunkSize),
executorChk: e.ctx.GetSessionVars().GetNewChunkWithCapacity(e.innerCtx.rowTypes, e.maxChunkSize, e.maxChunkSize, e.AllocPool),
indexRanges: copiedRanges,
keyOff2IdxOff: e.keyOff2IdxOff,
stats: innerStats,
Expand Down
6 changes: 3 additions & 3 deletions executor/index_lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func (e *IndexLookUpJoin) newInnerWorker(taskCh chan *lookUpJoinTask) *innerWork
outerCtx: e.outerCtx,
taskCh: taskCh,
ctx: e.ctx,
executorChk: e.ctx.GetSessionVars().GetNewChunk(e.innerCtx.rowTypes, e.maxChunkSize),
executorChk: e.ctx.GetSessionVars().GetNewChunkWithCapacity(e.innerCtx.rowTypes, e.maxChunkSize, e.maxChunkSize, e.AllocPool),
indexRanges: copiedRanges,
keyOff2IdxOff: e.keyOff2IdxOff,
stats: innerStats,
Expand Down Expand Up @@ -431,7 +431,7 @@ func (ow *outerWorker) buildTask(ctx context.Context) (*lookUpJoinTask, error) {
}
maxChunkSize := ow.ctx.GetSessionVars().MaxChunkSize
for requiredRows > task.outerResult.Len() {
chk := ow.ctx.GetSessionVars().GetNewChunk(ow.outerCtx.rowTypes, maxChunkSize)
chk := ow.ctx.GetSessionVars().GetNewChunkWithCapacity(ow.outerCtx.rowTypes, maxChunkSize, maxChunkSize, ow.executor.base().AllocPool)
chk = chk.SetRequiredRows(requiredRows, maxChunkSize)
err := Next(ctx, ow.executor, chk)
if err != nil {
Expand Down Expand Up @@ -462,7 +462,7 @@ func (ow *outerWorker) buildTask(ctx context.Context) (*lookUpJoinTask, error) {
}
task.encodedLookUpKeys = make([]*chunk.Chunk, task.outerResult.NumChunks())
for i := range task.encodedLookUpKeys {
task.encodedLookUpKeys[i] = ow.ctx.GetSessionVars().GetNewChunk([]*types.FieldType{types.NewFieldType(mysql.TypeBlob)}, task.outerResult.GetChunk(i).NumRows())
task.encodedLookUpKeys[i] = ow.ctx.GetSessionVars().GetNewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeBlob)}, task.outerResult.GetChunk(i).NumRows(), task.outerResult.GetChunk(i).NumRows(), ow.executor.base().AllocPool)
}
return task, nil
}
Expand Down
2 changes: 1 addition & 1 deletion executor/index_lookup_merge_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,7 @@ func (imw *innerMergeWorker) dedupDatumLookUpKeys(lookUpContents []*indexJoinLoo

// fetchNextInnerResult collects a chunk of inner results from inner child executor.
func (imw *innerMergeWorker) fetchNextInnerResult(ctx context.Context, task *lookUpMergeJoinTask) (beginRow chunk.Row, err error) {
task.innerResult = imw.ctx.GetSessionVars().GetNewChunk(retTypes(imw.innerExec), imw.ctx.GetSessionVars().MaxChunkSize)
task.innerResult = imw.ctx.GetSessionVars().GetNewChunkWithCapacity(retTypes(imw.innerExec), imw.ctx.GetSessionVars().MaxChunkSize, imw.ctx.GetSessionVars().MaxChunkSize, imw.innerExec.base().AllocPool)
err = Next(ctx, imw.innerExec, task.innerResult)
task.innerIter = chunk.NewIterator4Chunk(task.innerResult)
beginRow = task.innerIter.Begin()
Expand Down
4 changes: 2 additions & 2 deletions executor/index_merge_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ func (w *partialTableWorker) syncErr(resultCh chan<- *lookupTableTask, err error

func (w *partialTableWorker) fetchHandles(ctx context.Context, exitCh <-chan struct{}, fetchCh chan<- *lookupTableTask, resultCh chan<- *lookupTableTask,
finished <-chan struct{}, handleCols plannercore.HandleCols) (count int64, err error) {
chk := w.sc.GetSessionVars().GetNewChunk(retTypes(w.tableReader), w.maxChunkSize)
chk := w.sc.GetSessionVars().GetNewChunkWithCapacity(retTypes(w.tableReader), w.maxChunkSize, w.maxChunkSize, w.tableReader.base().AllocPool)
var basic *execdetails.BasicRuntimeStats
if be := w.tableReader.base(); be != nil && be.runtimeStats != nil {
basic = be.runtimeStats
Expand Down Expand Up @@ -817,7 +817,7 @@ func (w *partialIndexWorker) fetchHandles(
resultCh chan<- *lookupTableTask,
finished <-chan struct{},
handleCols plannercore.HandleCols) (count int64, err error) {
chk := w.sc.GetSessionVars().GetNewChunk(handleCols.GetFieldsTypes(), w.maxChunkSize)
chk := chunk.NewChunkWithCapacity(handleCols.GetFieldsTypes(), w.maxChunkSize)
var basicStats *execdetails.BasicRuntimeStats
if w.stats != nil {
if w.idxID != 0 {
Expand Down
2 changes: 1 addition & 1 deletion executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func (e *HashJoinExec) fetchBuildSideRows(ctx context.Context, chkCh chan<- *chu
if e.finished.Load().(bool) {
return
}
chk := e.ctx.GetSessionVars().GetNewChunk(e.buildSideExec.base().retFieldTypes, e.ctx.GetSessionVars().MaxChunkSize)
chk := e.ctx.GetSessionVars().GetNewChunkWithCapacity(e.buildSideExec.base().retFieldTypes, e.ctx.GetSessionVars().MaxChunkSize, e.ctx.GetSessionVars().MaxChunkSize, e.AllocPool)
err = Next(ctx, e.buildSideExec, chk)
if err != nil {
e.buildFinished <- errors.Trace(err)
Expand Down
2 changes: 1 addition & 1 deletion executor/joiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func newJoiner(ctx sessionctx.Context, joinType plannercore.JoinType,
return &antiLeftOuterSemiJoiner{base}
case plannercore.LeftOuterJoin, plannercore.RightOuterJoin, plannercore.InnerJoin:
if len(base.conditions) > 0 {
base.chk = ctx.GetSessionVars().GetNewChunk(shallowRowType, ctx.GetSessionVars().MaxChunkSize)
base.chk = chunk.NewChunkWithCapacity(shallowRowType, ctx.GetSessionVars().MaxChunkSize)
}
switch joinType {
case plannercore.LeftOuterJoin:
Expand Down
2 changes: 1 addition & 1 deletion executor/pipelined_window.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func (e *PipelinedWindowExec) fetchChild(ctx context.Context) (EOF bool, err err
}

// TODO: reuse chunks
resultChk := e.ctx.GetSessionVars().GetNewChunkWithCapacity(e.retFieldTypes, 0, numRows)
resultChk := e.ctx.GetSessionVars().GetNewChunkWithCapacity(e.retFieldTypes, 0, numRows, e.AllocPool)
err = e.copyChk(childResult, resultChk)
if err != nil {
return false, err
Expand Down
2 changes: 1 addition & 1 deletion executor/window.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (e *WindowExec) fetchChild(ctx context.Context) (EOF bool, err error) {
return true, nil
}

resultChk := e.ctx.GetSessionVars().GetNewChunkWithCapacity(e.retFieldTypes, 0, numRows)
resultChk := e.ctx.GetSessionVars().GetNewChunkWithCapacity(e.retFieldTypes, 0, numRows, e.AllocPool)
err = e.copyChk(childResult, resultChk)
if err != nil {
return false, err
Expand Down
2 changes: 1 addition & 1 deletion server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1121,8 +1121,8 @@ func (cc *clientConn) Run(ctx context.Context) {

startTime := time.Now()
err = cc.dispatch(ctx, data)
cc.ctx.GetSessionVars().ClearAlloc(&cc.chunkAlloc, err != nil)
cc.chunkAlloc.Reset()
cc.ctx.GetSessionVars().ClearAlloc()
if err != nil {
cc.audit(plugin.Error) // tell the plugin API there was a dispatch error
if terror.ErrorEqual(err, io.EOF) {
Expand Down
2 changes: 1 addition & 1 deletion server/conn_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ const (

func (cc *clientConn) handleStmtFetch(ctx context.Context, data []byte) (err error) {
cc.ctx.GetSessionVars().StartTime = time.Now()
cc.ctx.GetSessionVars().ClearAlloc()
cc.ctx.GetSessionVars().ClearAlloc(nil, false)

stmtID, fetchSize, err := parseStmtFetchCmd(data)
if err != nil {
Expand Down
60 changes: 27 additions & 33 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ type RetryInfo struct {
LastRcReadTS uint64
}

// ReuseChunkPool save Alloc object
type ReuseChunkPool struct {
mu sync.Mutex
Alloc chunk.Allocator
}

// Clean does some clean work.
func (r *RetryInfo) Clean() {
r.autoIncrementIDs.clean()
Expand Down Expand Up @@ -1292,10 +1298,7 @@ type SessionVars struct {
OptPrefixIndexSingleScan bool

// ChunkPool Several chunks and columns are cached
ChunkPool struct {
Lock sync.Mutex
Alloc chunk.Allocator
}
ChunkPool ReuseChunkPool
// EnableReuseCheck indicates request chunk whether use chunk alloc
EnableReuseCheck bool

Expand All @@ -1304,34 +1307,18 @@ type SessionVars struct {
preUseChunkAlloc bool
}

// GetNewChunk Attempt to request memory from the chunk pool
// thread safety
func (s *SessionVars) GetNewChunk(fields []*types.FieldType, capacity int) *chunk.Chunk {
//Chunk memory pool is not set
if s.ChunkPool.Alloc == nil {
return chunk.NewChunkWithCapacity(fields, capacity)
}
s.ChunkPool.Lock.Lock()
defer s.ChunkPool.Lock.Unlock()
if s.ChunkPool.Alloc.CheckReuseAllocSize() && (!s.GetUseChunkAlloc()) {
s.StmtCtx.SetUseChunkAlloc()
}
chk := s.ChunkPool.Alloc.Alloc(fields, capacity, capacity)
return chk
}

// GetNewChunkWithCapacity Attempt to request memory from the chunk pool
// thread safety
func (s *SessionVars) GetNewChunkWithCapacity(fields []*types.FieldType, capacity int, maxCachesize int) *chunk.Chunk {
if s.ChunkPool.Alloc == nil {
func (s *SessionVars) GetNewChunkWithCapacity(fields []*types.FieldType, capacity int, maxCachesize int, pool chunk.Allocator) *chunk.Chunk {
if pool == nil {
return chunk.New(fields, capacity, maxCachesize)
}
s.ChunkPool.Lock.Lock()
defer s.ChunkPool.Lock.Unlock()
if s.ChunkPool.Alloc.CheckReuseAllocSize() && (!s.GetUseChunkAlloc()) {
s.ChunkPool.mu.Lock()
defer s.ChunkPool.mu.Unlock()
if pool.CheckReuseAllocSize() && (!s.GetUseChunkAlloc()) {
s.StmtCtx.SetUseChunkAlloc()
}
chk := s.ChunkPool.Alloc.Alloc(fields, capacity, maxCachesize)
chk := pool.Alloc(fields, capacity, maxCachesize)
return chk
}

Expand All @@ -1354,8 +1341,19 @@ func (s *SessionVars) SetAlloc(alloc chunk.Allocator) {
}

// ClearAlloc indicates stop reuse chunk
func (s *SessionVars) ClearAlloc() {
func (s *SessionVars) ClearAlloc(alloc *chunk.Allocator, b bool) {
if !b {
s.ChunkPool.Alloc = nil
return
}

// If an error is reported, re-apply for alloc
// Prevent the goroutine left before, affecting the execution of the next sql
// issuse 38918
s.ChunkPool.mu.Lock()
s.ChunkPool.Alloc = nil
s.ChunkPool.mu.Unlock()
*alloc = chunk.NewAllocator()
}

// GetPreparedStmtByName returns the prepared statement specified by stmtName.
Expand Down Expand Up @@ -1654,12 +1652,8 @@ func NewSessionVars(hctx HookContext) *SessionVars {
ForeignKeyChecks: DefTiDBForeignKeyChecks,
HookContext: hctx,
EnableReuseCheck: DefTiDBEnableReusechunk,
//useChunkAlloc: DefTiDBUseAlloc,
preUseChunkAlloc: DefTiDBUseAlloc,
ChunkPool: struct {
Lock sync.Mutex
Alloc chunk.Allocator
}{Alloc: nil},
preUseChunkAlloc: DefTiDBUseAlloc,
ChunkPool: ReuseChunkPool{Alloc: nil},
}
vars.KVVars = tikvstore.NewVariables(&vars.Killed)
vars.Concurrency = Concurrency{
Expand Down
31 changes: 11 additions & 20 deletions sessionctx/variable/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,10 +444,8 @@ func TestGetReuseChunk(t *testing.T) {
require.Nil(t, sessVars.ChunkPool.Alloc)
require.False(t, sessVars.GetUseChunkAlloc())
// alloc is nil ,Allocate memory from the system
chk1 := sessVars.GetNewChunk(fieldTypes, 10)
chk1 := sessVars.GetNewChunkWithCapacity(fieldTypes, 10, 10, sessVars.ChunkPool.Alloc)
require.NotNil(t, chk1)
chk2 := sessVars.GetNewChunkWithCapacity(fieldTypes, 10, 10)
require.NotNil(t, chk2)

chunkReuseMap := make(map[*chunk.Chunk]struct{}, 14)
columnReuseMap := make(map[*chunk.Column]struct{}, 14)
Expand All @@ -461,35 +459,28 @@ func TestGetReuseChunk(t *testing.T) {

//tries to apply from the cache
initCap := 10
chk1 = sessVars.GetNewChunk(fieldTypes, initCap)
chk1 = sessVars.GetNewChunkWithCapacity(fieldTypes, initCap, initCap, sessVars.ChunkPool.Alloc)
require.NotNil(t, chk1)
chunkReuseMap[chk1] = struct{}{}
for i := 0; i < chk1.NumCols(); i++ {
columnReuseMap[chk1.Column(i)] = struct{}{}
}
chk2 = sessVars.GetNewChunkWithCapacity(fieldTypes, initCap, initCap)
require.NotNil(t, chk2)
chunkReuseMap[chk2] = struct{}{}
for i := 0; i < chk2.NumCols(); i++ {
columnReuseMap[chk2.Column(i)] = struct{}{}
}

alloc.Reset()
chkres1 := sessVars.GetNewChunk(fieldTypes, 10)
chkres1 := sessVars.GetNewChunkWithCapacity(fieldTypes, 10, 10, sessVars.ChunkPool.Alloc)
require.NotNil(t, chkres1)
_, exist := chunkReuseMap[chkres1]
require.True(t, exist)
for i := 0; i < chkres1.NumCols(); i++ {
_, exist := columnReuseMap[chkres1.Column(i)]
require.True(t, exist)
}
chkres2 := sessVars.GetNewChunkWithCapacity(fieldTypes, 10, 10)
require.NotNil(t, chkres2)
_, exist = chunkReuseMap[chkres2]
require.True(t, exist)
for i := 0; i < chkres2.NumCols(); i++ {
_, exist := columnReuseMap[chkres2.Column(i)]
require.True(t, exist)
}
sessVars.ClearAlloc()
allocpool := variable.ReuseChunkPool{Alloc: alloc}

sessVars.ClearAlloc(&allocpool.Alloc, false)
require.Equal(t, alloc, allocpool.Alloc)

sessVars.ClearAlloc(&allocpool.Alloc, true)
require.NotEqual(t, allocpool.Alloc, alloc)
require.Nil(t, sessVars.ChunkPool.Alloc)
}
7 changes: 3 additions & 4 deletions testkit/testkit.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ func (tk *TestKit) Session() session.Session {
// MustExec executes a sql statement and asserts nil error.
func (tk *TestKit) MustExec(sql string, args ...interface{}) {
defer func() {
tk.Session().GetSessionVars().ClearAlloc()
if tk.alloc != nil {
tk.alloc.Reset()
}
Expand All @@ -138,7 +137,6 @@ func (tk *TestKit) MustExecWithContext(ctx context.Context, sql string, args ...
// If expected result is set it asserts the query result equals expected result.
func (tk *TestKit) MustQuery(sql string, args ...interface{}) *Result {
defer func() {
tk.Session().GetSessionVars().ClearAlloc()
if tk.alloc != nil {
tk.alloc.Reset()
}
Expand Down Expand Up @@ -271,7 +269,8 @@ func (tk *TestKit) Exec(sql string, args ...interface{}) (sqlexec.RecordSet, err
}

// ExecWithContext executes a sql statement using the prepared stmt API
func (tk *TestKit) ExecWithContext(ctx context.Context, sql string, args ...interface{}) (sqlexec.RecordSet, error) {
func (tk *TestKit) ExecWithContext(ctx context.Context, sql string, args ...interface{}) (rs sqlexec.RecordSet, err error) {
defer tk.Session().GetSessionVars().ClearAlloc(&tk.alloc, err != nil)
if len(args) == 0 {
sc := tk.session.GetSessionVars().StmtCtx
prevWarns := sc.GetWarnings()
Expand Down Expand Up @@ -315,7 +314,7 @@ func (tk *TestKit) ExecWithContext(ctx context.Context, sql string, args ...inte
}
params := expression.Args2Expressions4Test(args...)
tk.Session().GetSessionVars().SetAlloc(tk.alloc)
rs, err := tk.session.ExecutePreparedStmt(ctx, stmtID, params)
rs, err = tk.session.ExecutePreparedStmt(ctx, stmtID, params)
if err != nil {
return rs, errors.Trace(err)
}
Expand Down
Loading

0 comments on commit ea26284

Please sign in to comment.