Skip to content

Commit

Permalink
executor: range task channel in XSelectIndexExec.Close. (#2775)
Browse files Browse the repository at this point in the history
  • Loading branch information
coocood authored and shenli committed Mar 6, 2017
1 parent 754c58b commit e6ad664
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 35 deletions.
77 changes: 42 additions & 35 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"math"
"sort"
"strconv"
"sync"
"time"

"github.com/juju/errors"
Expand Down Expand Up @@ -59,6 +58,9 @@ var BaseLookupTableTaskSize = 1024
// MaxLookupTableTaskSize represents max number of handles for a lookupTableTask.
var MaxLookupTableTaskSize = 20480

// LookupTableTaskChannelSize represents the channel size of the index double read taskChan.
var LookupTableTaskChannelSize = 50

// lookupTableTask is created from a partial result of an index request which
// contains the handles in those index keys.
type lookupTableTask struct {
Expand Down Expand Up @@ -292,16 +294,19 @@ type Closeable interface {
Close() error
}

// closeAll closes all objects even if an object returns an error.
// If multiple objects returns error, the first error will be returned.
func closeAll(objs ...Closeable) error {
var err error
for _, obj := range objs {
if obj != nil {
err := obj.Close()
if err != nil {
return errors.Trace(err)
err1 := obj.Close()
if err == nil && err1 != nil {
err = err1
}
}
}
return nil
return errors.Trace(err)
}

// XSelectIndexExec represents the DistSQL select index executor.
Expand All @@ -327,36 +332,38 @@ func closeAll(objs ...Closeable) error {
// After finishing the task, the workers send the task to a taskChan. At the outer most Executor.Next method,
// we receive the finished task through taskChan, and return each row in that task until no more tasks to receive.
type XSelectIndexExec struct {
tableInfo *model.TableInfo
table table.Table
asName *model.CIStr
ctx context.Context
supportDesc bool
isMemDB bool
tableInfo *model.TableInfo
table table.Table
asName *model.CIStr
ctx context.Context
supportDesc bool
isMemDB bool
singleReadMode bool

indexPlan *plan.PhysicalIndexScan

// Variables only used for single read.
result distsql.SelectResult
partialResult distsql.PartialResult
where *tipb.Expr
startTS uint64

taskChan chan *lookupTableTask
tasksErr error // not nil if tasks closed due to error.
taskCurr *lookupTableTask

indexPlan *plan.PhysicalIndexScan
singleReadMode bool
// Variables only used for double read.
doubleReadIdxResult distsql.SelectResult
taskChan chan *lookupTableTask
tasksErr error // not nil if tasks closed due to error.
taskCurr *lookupTableTask
handleCount uint64 // returned handle count in double read.

where *tipb.Expr
startTS uint64
returnedRows uint64 // returned row count
handleCount uint64 // returned handle count when reading first

mu sync.Mutex

/*
The following attributes are used for aggregation push down.
aggFuncs is the aggregation functions in protobuf format. They will be added to distsql request msg.
byItem is the groupby items in protobuf format. They will be added to distsql request msg.
aggFields is used to decode returned rows from distsql.
aggregate indicates of the executor is handling aggregate result.
It is more convenient to use a single varible than use a long condition.
It is more convenient to use a single variable than use a long condition.
*/
aggFuncs []*tipb.Expr
byItems []*tipb.ByItem
Expand All @@ -375,17 +382,21 @@ func (e *XSelectIndexExec) Schema() *expression.Schema {

// Close implements Exec Close interface.
func (e *XSelectIndexExec) Close() error {
err := closeAll(e.result, e.partialResult)
if err != nil {
return errors.Trace(err)
}
err := closeAll(e.result, e.partialResult, e.doubleReadIdxResult)
e.result = nil
e.partialResult = nil
e.doubleReadIdxResult = nil

e.taskCurr = nil
e.taskChan = nil
if e.taskChan != nil {
// Consume the task channel in case channel is full.
for range e.taskChan {
}
e.taskChan = nil
}
e.returnedRows = 0
e.partialCount = 0
return nil
return errors.Trace(err)
}

// Next implements the Executor Next interface.
Expand Down Expand Up @@ -479,13 +490,14 @@ func (e *XSelectIndexExec) nextForDoubleRead() (*Row, error) {
if err != nil {
return nil, errors.Trace(err)
}
e.doubleReadIdxResult = idxResult
idxResult.IgnoreData()
idxResult.Fetch(context.CtxForCancel{e.ctx})

// Use a background goroutine to fetch index and put the result in e.taskChan.
// e.taskChan serves as a pipeline, so fetching index and getting table data can
// run concurrently.
e.taskChan = make(chan *lookupTableTask, 50)
e.taskChan = make(chan *lookupTableTask, LookupTableTaskChannelSize)
go e.fetchHandles(idxResult, e.taskChan)
}

Expand All @@ -505,11 +517,6 @@ func (e *XSelectIndexExec) nextForDoubleRead() (*Row, error) {
}
row, err := e.taskCurr.getRow()
if err != nil {
// Consume the task channel in case channel is full.
go func() {
for range e.taskChan {
}
}()
return nil, errors.Trace(err)
}
if row != nil {
Expand Down
67 changes: 67 additions & 0 deletions executor/distsql_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2017 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package executor_test

import (
"bytes"
"fmt"
"runtime/pprof"
"strings"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/util/testkit"
)

// This test checks that when a index double read returns before reading all the rows, the goroutine doesn't
// leak. For testing distsql with multiple regions, we need to manually split a mock TiKV.
func (s *testSuite) TestIndexDoubleReadClose(c *C) {
if _, ok := s.store.GetClient().(*tikv.CopClient); !ok {
// Make sure the store is tikv store.
return
}
originSize := executor.LookupTableTaskChannelSize
executor.LookupTableTaskChannelSize = 1
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("create table dist (id int primary key, c_idx int, c_col int, index (c_idx))")

// Insert 100 rows.
var values []string
for i := 0; i < 100; i++ {
values = append(values, fmt.Sprintf("(%d, %d, %d)", i, i, i))
}
tk.MustExec("insert dist values " + strings.Join(values, ","))

rss, err := tk.Se.Execute("select * from dist where c_idx between 0 and 100")
c.Assert(err, IsNil)
rs := rss[0]
_, err = rs.Next()
c.Assert(err, IsNil)
c.Check(taskGoroutineExists(), IsTrue)
rs.Close()
time.Sleep(time.Millisecond * 50)
c.Check(taskGoroutineExists(), IsFalse)
executor.LookupTableTaskChannelSize = originSize
}

func taskGoroutineExists() bool {
buf := new(bytes.Buffer)
profile := pprof.Lookup("goroutine")
profile.WriteTo(buf, 1)
str := buf.String()
return strings.Contains(str, "pickAndExecTask")
}

0 comments on commit e6ad664

Please sign in to comment.