
executor: reduce chunk capacity #7141

Closed
wants to merge 3 commits into from

Conversation

lysu
Contributor

@lysu lysu commented Jul 24, 2018

What have you changed? (mandatory)

Ref #7077, part 2 of 3: reduce chunk size.

First, this PR separates the concept of initChunkSize from maxChunkSize: initChunkSize is used to allocate less memory up front, while maxChunkSize keeps its "maximum number of returned rows" meaning.

After that, we need to choose the value of initChunkSize. For now:

  • Only the callee operator knows how much buffer it minimally needs (the caller only sees the interface, but the callee knows what it is, e.g. an update operator needs no buffer at all).
  • But only the caller knows how the returned buffer will be used, e.g. is it reusable? Renewed every time? Used in a loop or just once?

So we return a suitable initChunkSize per operator via executor#newChunk, and take care with how callers of executor#newChunk or chunk.NewChunk*() use the result.

In summary, the initChunkSize for each operator is:

  • For an operator that needs no chunk, use a void chunk (cap = -1).
  • For an operator that needs a fixed-size chunk, use a fixed chunk.
  • For reusable chunks, extend them via the Go slice growth mechanism.
  • For non-reusable chunks that are renewed in a loop, determine the next capacity from the previous iteration.

What is the type of the changes? (mandatory)

  • Improvement (non-breaking change which is an improvement to an existing feature)

How has this PR been tested? (mandatory)

  • unit tests
  • integration tests

Does this PR affect documentation (docs/docs-cn) update? (mandatory)

No

Does this PR affect tidb-ansible update? (mandatory)

No

Does this PR need to be added to the release notes? (mandatory)

WIP

Refer to a related PR or issue link (optional)

#7077

Benchmark result if necessary (optional)

WIP



@lysu lysu changed the title executor: make different executors create more smaller as they can executor: make different executors create more smaller chunk as they can Jul 24, 2018
@lysu lysu changed the title executor: make different executors create more smaller chunk as they can executor: separate InitChunkSize from MaxChunkSize and InitChunkSize be 32 Aug 2, 2018
@lysu lysu added the sig/execution SIG execution label Aug 2, 2018
@lysu lysu changed the title executor: separate InitChunkSize from MaxChunkSize and InitChunkSize be 32 executor: separate InitChunkSize from MaxChunkSize Aug 2, 2018
@lysu lysu added type/enhancement The issue or PR belongs to an enhancement. and removed status/WIP labels Aug 2, 2018
@lysu
Contributor Author

lysu commented Aug 2, 2018

PTAL if free @coocood @zz-jason @XuHuaiyu ~ thx

Member

@winoros winoros left a comment

Are there any performance results?

config/config.go Outdated
@@ -316,6 +317,7 @@ var defaultConf = Config{
Binlog: Binlog{
WriteTimeout: "15s",
},
BigChunkThreshold: 512,
Member

Previously, it was 1024?

Contributor Author

No, this is a newly added configuration item. Previously, initChunkSize was always 1024; now initChunkSize is 32 if the estimated row count from stats is less than BigChunkThreshold, and stays 1024 otherwise.

@@ -134,6 +147,18 @@ func newBaseExecutor(ctx sessionctx.Context, schema *expression.Schema, id strin
return e
}

type baseNoResultExecutor struct {
Member

What is this?

Contributor Author

@lysu lysu Aug 3, 2018

This is a sub-executor that overrides the newChunk() method so that UpdateExec/InsertExec allocate a void chunk.

Member

Why do we need this?

@@ -224,6 +224,7 @@ func NewContext() *Context {
cancel: cancel,
}
sctx.sessionVars.MaxChunkSize = 2
sctx.sessionVars.MaxChunkSize = 1
Member

Why set it 1 after setting it 2?

Contributor Author

Oops, that was a mistake.

e.executed = true
return errors.Trace(err)
}
if e.executed || chk.NumRows() >= e.maxChunkSize {
Member

What does chk.NumRows() >= e.maxChunkSize mean?

Contributor Author

Oh, I shouldn't have modified that.

chunks []*Chunk
freelist []*Chunk
fieldTypes []*types.FieldType
initChunkSize int
Member

Why do we need this?

@lysu lysu changed the title executor: separate InitChunkSize from MaxChunkSize executor: reduce chunk capacity Aug 6, 2018
@lysu
Contributor Author

lysu commented Aug 6, 2018

PTAL @zz-jason @coocood @winoros thx~

return NewChunkWithCapacity(l.fieldTypes, l.maxChunkSize)
newChk := NewChunkWithCapacity(l.fieldTypes, l.chunkCap)
if l.chunkCap < l.maxChunkSize {
l.chunkCap <<= 1
Member

*= 2 is easier to read.

@zz-jason zz-jason self-assigned this Aug 6, 2018
@@ -40,6 +40,9 @@ const (
// NewChunkWithCapacity creates a new chunk with field types and capacity.
func NewChunkWithCapacity(fields []*types.FieldType, cap int) *Chunk {
chk := new(Chunk)
if cap < 0 {
Member

Should add comments about negative capacity.

ast/ast.go Outdated
NewChunk() *chunk.Chunk

// NewChunk creates a new chunk with given capacity.
Member

Should add comments about negative capacity.

func (ow *outerWorker) buildTask(ctx context.Context) (*lookUpJoinTask, error) {
ow.executor.newChunk()

func (ow *outerWorker) buildTask(ctx context.Context, preTask *lookUpJoinTask) (*lookUpJoinTask, error) {
Contributor

We changed the method signature; how about updating the comment at the same time?

@@ -1184,6 +1201,7 @@ func (b *executorBuilder) buildExists(v *plan.PhysicalExists) Executor {
e := &ExistsExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), childExec),
}
e.chunkCap = 1
Contributor

If we're just assigning a constant to a struct field, I feel it's better to set it as part of the struct literal.

@@ -1196,6 +1214,7 @@ func (b *executorBuilder) buildMaxOneRow(v *plan.PhysicalMaxOneRow) Executor {
e := &MaxOneRowExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), childExec),
}
e.chunkCap = 1
Contributor

same as above

@zhexuany
Contributor

zhexuany commented Aug 7, 2018

I think we need a benchmark for this change.

ast/ast.go Outdated
NewChunk() *chunk.Chunk

// NewChunk creates a new chunk with given capacity.
Member

It's NewChunkWithCapacity

@winoros
Member

winoros commented Aug 7, 2018

So chunkCap is essentially the same as initChunkSize?

@lysu
Contributor Author

lysu commented Aug 7, 2018

@winoros Yes; initChunkSize would be the better name.

@@ -94,11 +94,15 @@ func (r *recordSet) Next(ctx context.Context, chk *chunk.Chunk) error {
}

func (r *recordSet) NewChunk() *chunk.Chunk {
return r.NewChunkWithCapacity(32)
Member

should use chunk.InitialCapacity.

ByItems: v.ByItems,
schema: v.Schema(),
}
metrics.ExecutorCounter.WithLabelValues("TopNExec").Inc()
return &TopNExec{
e := &TopNExec{
Member

Why change this?

Contributor Author

OK, this change is no longer needed after addressing @zhexuany's comment.

model/model.go Outdated
@@ -141,6 +142,7 @@ type TableInfo struct {

// GetPartitionInfo returns the partition information.
func (t *TableInfo) GetPartitionInfo() *PartitionInfo {
fmt.Println()
Member

remove this line

model/model.go Outdated
@@ -141,6 +142,7 @@ type TableInfo struct {

// GetPartitionInfo returns the partition information.
func (t *TableInfo) GetPartitionInfo() *PartitionInfo {
fmt.Println()
Member

Why add this line?

server/conn.go Outdated
@@ -1084,6 +1085,13 @@ func (cc *clientConn) writeChunksWithFetchSize(ctx context.Context, rs ResultSet
return errors.Trace(cc.writeEOF(serverStatus))
}

func (cc *clientConn) chunkCapacity(wantSize int) int {
if wantSize > 1024 {
Member

How about using DefMaxChunkSize here?

return NewChunkWithCapacity(l.fieldTypes, l.maxChunkSize)
newChk := NewChunkWithCapacity(l.fieldTypes, l.initChunkSize)
if l.initChunkSize < l.maxChunkSize {
l.initChunkSize *= 2
Member

It is interesting that some parts use *=2 and others use <<= 1.

executor/sort.go Outdated
childKeyChk = chunk.NewChunkWithCapacity(e.keyTypes, e.maxChunkSize)
childKeyChk = chunk.NewChunkWithCapacity(e.keyTypes, e.initChunkSize)
if e.initChunkSize < e.maxChunkSize {
e.initChunkSize <<= 1
Member

*= 2 is easier to read.

lookupMap: mvmap.NewMVMap(),
}
if ow.initChunkSize < ow.ctx.GetSessionVars().MaxChunkSize {
ow.initChunkSize <<= 1
Member

*= 2 is easier to read.

@@ -636,6 +636,7 @@ var defaultSysVars = []*SysVar{
{ScopeSession, TiDBDMLBatchSize, strconv.Itoa(DefDMLBatchSize)},
{ScopeSession, TiDBCurrentTS, strconv.Itoa(DefCurretTS)},
{ScopeGlobal | ScopeSession, TiDBMaxChunkSize, strconv.Itoa(DefMaxChunkSize)},
{ScopeGlobal | ScopeSession, TiDBInitChunkSize, strconv.Itoa(DefInitChunkSize)},
Member

Is it necessary to add a variable for it?

Contributor Author

Maybe we'll want to tune it online at some point? I'm not sure.

@@ -1084,6 +1086,13 @@ func (cc *clientConn) writeChunksWithFetchSize(ctx context.Context, rs ResultSet
return errors.Trace(cc.writeEOF(serverStatus))
}

func (cc *clientConn) chunkCapacity(wantSize int) int {
if wantSize > variable.DefMaxChunkSize {
Member

@jackysp jackysp Aug 8, 2018

Ah, I made a mistake. We should return the current max chunk size here instead of the default one, and on the next line the return value should not be the init chunk size.

@@ -1064,7 +1070,7 @@ func (b *executorBuilder) buildTableDual(v *plan.PhysicalTableDual) Executor {
return nil
}
e := &TableDualExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
baseExecutor: newBaseExecutorWithInitChunkSize(b.ctx, v.Schema(), v.ExplainID(), 1),
Member

better to s/1/v.rowCount/

@@ -133,6 +133,7 @@ func (e *DeleteExec) deleteSingleTableByChunk(ctx context.Context) error {
}
rowCount++
}
chk = e.children[0].newChunkWithCapacity(chk.NumRows())
Member

If the child operator's initial chunk size is 32, will the newly allocated chunk always have capacity 32?

Contributor Author

@lysu lysu Aug 8, 2018

If the child's initial size is 32, it allocates 32 in the first loop. After Next() returns, chk.NumRows() may be either <= 32 or > 32, so the second loop uses numRows as the capacity, assuming nothing goes wrong.

@lysu
Contributor Author

lysu commented Aug 9, 2018

/run-all-tests

Contributor Author

@lysu lysu left a comment

Added some explanation in the code, @zz-jason @coocood.

NewChunk() *chunk.Chunk

// NewFixedChunk creates a new chunk with given capacity.
NewFixedChunk(cap int) *chunk.Chunk
Contributor Author

We added a new one-parameter constructor to help renew a chunk.

Why not use chunk.Renew()? In some situations the resultSet is used outside the executor, where it is not easy to get retTypes().

@@ -126,6 +126,7 @@ func (ut *benchDB) mustExec(sql string) {
if chk.NumRows() == 0 {
break
}
chk.GrowIfFull()
Contributor Author

For a reusable chunk, we only need GrowIfFull when the previous loop filled it.

ddl/db_test.go Outdated
@@ -3167,7 +3167,7 @@ func (s *testDBSuite) TestPartitionDropIndex(c *C) {
LOOP:
for {
select {
case err := <-done:
case err = <-done:
Contributor Author

Fixes the make dev vet check.

ddl/util/util.go Outdated
@@ -83,6 +83,7 @@ func LoadDeleteRanges(ctx sessionctx.Context, safePoint uint64) (ranges []DelRan
EndKey: endKey,
})
}
chk.GrowIfFull()
Contributor Author

Reusable chunk; it needs to grow.

@@ -122,7 +122,7 @@ func (r *selectResult) NextRaw(ctx context.Context) ([]byte, error) {
// Next reads data to the chunk.
func (r *selectResult) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
for chk.NumRows() < r.ctx.GetSessionVars().MaxChunkSize {
for chk.NumRows() < chk.MaxRows() {
Contributor Author

Makes sure Next returns up to the max rows (a property of the reused chunk) set by the caller.

@@ -172,6 +172,7 @@ func (t *mergeJoinInnerTable) hasNullInJoinKey(row chunk.Row) bool {
func (t *mergeJoinInnerTable) reallocReaderResult() {
if !t.curResultInUse {
// If "t.curResult" is not in use, we can just reuse it.
t.curResult.GrowIfFull()
Contributor Author

Grow the reused readerResult; the longer it is reused, the larger it can get in merge join.

executor/join.go Outdated
@@ -365,6 +366,7 @@ func (e *HashJoinExec) runJoinWorker(workerID uint) {
if !ok {
break
}
outerResult.GrowIfFull()
Contributor Author

Hash join's outer chunk is reusable.

executor/join.go Outdated
@@ -613,6 +615,9 @@ func (e *NestedLoopApplyExec) fetchSelectedOuterRow(ctx context.Context, chk *ch
outerIter := chunk.NewIterator4Chunk(e.outerChunk)
for {
if e.outerChunkCursor >= e.outerChunk.NumRows() {
if e.outerChunk.NumRows() > 0 {
e.outerChunk.GrowIfFull()
}
Contributor Author

NestedLoopApply pulls data into a reused block, but later blocks can be bigger.

executor/join.go Outdated
@@ -667,6 +672,7 @@ func (e *NestedLoopApplyExec) fetchAllInners(ctx context.Context) error {
e.innerList.AppendRow(row)
}
}
e.innerChunk.GrowIfFull()
Contributor Author

NestedLoopApply's inner chunk can be reused (because rows are transformed into data rows).

@@ -349,6 +351,9 @@ func (e *MergeJoinExec) fetchNextInnerRows() (err error) {
// may not all belong to the same join key, but are guaranteed to be sorted
// according to the join key.
func (e *MergeJoinExec) fetchNextOuterRows(ctx context.Context) (err error) {
if e.outerTable.chk.NumRows() > 0 {
e.outerTable.chk.GrowIfFull()
}
Contributor Author

Merge join's outer chunk needs to grow.

resultCh: e.resultCh,
keepOrder: e.keepOrder,
initChunkSize: e.initChunkSize,
maxBatchSize: e.ctx.GetSessionVars().IndexLookupSize,
Contributor Author

batchSize is implemented via chunk.MaxRows.

@lysu
Contributor Author

lysu commented Aug 21, 2018

/run-all-tests

}
c.numVirtualRows = 0
for _, col := range c.columns {
col.grow(c.maxRows, lastNumRows)
Member

Isn't lastNumRows the same as col.length?

Contributor Author

It's the same; I will fix it.

@lysu
Contributor Author

lysu commented Aug 21, 2018

/run-all-tests

@winoros
Member

winoros commented Aug 21, 2018

In distsql/select_result.go, SelectResult directly uses MaxChunkSize. Will this affect the chunk size?

@lysu
Contributor Author

lysu commented Aug 21, 2018

/run-all-tests

@lysu
Contributor Author

lysu commented Sep 27, 2018

PR split.

@lysu lysu closed this Sep 27, 2018
@lysu lysu deleted the dev/reduce_chunk_alloc branch September 27, 2018 04:12
@lysu lysu restored the dev/reduce_chunk_alloc branch September 27, 2018 04:12