diff --git a/executor/distsql.go b/executor/distsql.go index c67fd084275f9..ba97d98110665 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -19,7 +19,6 @@ import ( "math" "sort" "strconv" - "sync" "time" "github.com/juju/errors" @@ -59,6 +58,9 @@ var BaseLookupTableTaskSize = 1024 // MaxLookupTableTaskSize represents max number of handles for a lookupTableTask. var MaxLookupTableTaskSize = 20480 +// LookupTableTaskChannelSize represents the channel size of the index double read taskChan. +var LookupTableTaskChannelSize = 50 + // lookupTableTask is created from a partial result of an index request which // contains the handles in those index keys. type lookupTableTask struct { @@ -292,16 +294,19 @@ type Closeable interface { Close() error } +// closeAll closes all objects even if an object returns an error. +// If multiple objects returns error, the first error will be returned. func closeAll(objs ...Closeable) error { + var err error for _, obj := range objs { if obj != nil { - err := obj.Close() - if err != nil { - return errors.Trace(err) + err1 := obj.Close() + if err == nil && err1 != nil { + err = err1 } } } - return nil + return errors.Trace(err) } // XSelectIndexExec represents the DistSQL select index executor. @@ -327,28 +332,30 @@ func closeAll(objs ...Closeable) error { // After finishing the task, the workers send the task to a taskChan. At the outer most Executor.Next method, // we receive the finished task through taskChan, and return each row in that task until no more tasks to receive. type XSelectIndexExec struct { - tableInfo *model.TableInfo - table table.Table - asName *model.CIStr - ctx context.Context - supportDesc bool - isMemDB bool + tableInfo *model.TableInfo + table table.Table + asName *model.CIStr + ctx context.Context + supportDesc bool + isMemDB bool + singleReadMode bool + + indexPlan *plan.PhysicalIndexScan + + // Variables only used for single read. result distsql.SelectResult partialResult distsql.PartialResult - where *tipb.Expr - startTS uint64 - taskChan chan *lookupTableTask - tasksErr error // not nil if tasks closed due to error. - taskCurr *lookupTableTask - - indexPlan *plan.PhysicalIndexScan - singleReadMode bool + // Variables only used for double read. + doubleReadIdxResult distsql.SelectResult + taskChan chan *lookupTableTask + tasksErr error // not nil if tasks closed due to error. + taskCurr *lookupTableTask + handleCount uint64 // returned handle count in double read. + where *tipb.Expr + startTS uint64 returnedRows uint64 // returned row count - handleCount uint64 // returned handle count when reading first - - mu sync.Mutex /* The following attributes are used for aggregation push down. @@ -356,7 +363,7 @@ type XSelectIndexExec struct { byItem is the groupby items in protobuf format. They will be added to distsql request msg. aggFields is used to decode returned rows from distsql. aggregate indicates of the executor is handling aggregate result. - It is more convenient to use a single varible than use a long condition. + It is more convenient to use a single variable than use a long condition. */ aggFuncs []*tipb.Expr byItems []*tipb.ByItem @@ -375,17 +382,21 @@ func (e *XSelectIndexExec) Schema() *expression.Schema { // Close implements Exec Close interface. func (e *XSelectIndexExec) Close() error { - err := closeAll(e.result, e.partialResult) - if err != nil { - return errors.Trace(err) - } + err := closeAll(e.result, e.partialResult, e.doubleReadIdxResult) e.result = nil e.partialResult = nil + e.doubleReadIdxResult = nil + e.taskCurr = nil - e.taskChan = nil + if e.taskChan != nil { + // Consume the task channel in case channel is full. + for range e.taskChan { + } + e.taskChan = nil + } e.returnedRows = 0 e.partialCount = 0 - return nil + return errors.Trace(err) } // Next implements the Executor Next interface. @@ -479,13 +490,14 @@ func (e *XSelectIndexExec) nextForDoubleRead() (*Row, error) { if err != nil { return nil, errors.Trace(err) } + e.doubleReadIdxResult = idxResult idxResult.IgnoreData() idxResult.Fetch(context.CtxForCancel{e.ctx}) // Use a background goroutine to fetch index and put the result in e.taskChan. // e.taskChan serves as a pipeline, so fetching index and getting table data can // run concurrently. - e.taskChan = make(chan *lookupTableTask, 50) + e.taskChan = make(chan *lookupTableTask, LookupTableTaskChannelSize) go e.fetchHandles(idxResult, e.taskChan) } @@ -505,11 +517,6 @@ func (e *XSelectIndexExec) nextForDoubleRead() (*Row, error) { } row, err := e.taskCurr.getRow() if err != nil { - // Consume the task channel in case channel is full. - go func() { - for range e.taskChan { - } - }() return nil, errors.Trace(err) } if row != nil { diff --git a/executor/distsql_test.go b/executor/distsql_test.go new file mode 100644 index 0000000000000..41fbae2496e77 --- /dev/null +++ b/executor/distsql_test.go @@ -0,0 +1,67 @@ +// Copyright 2017 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package executor_test + +import ( + "bytes" + "fmt" + "runtime/pprof" + "strings" + "time" + + . "github.com/pingcap/check" + "github.com/pingcap/tidb/executor" + "github.com/pingcap/tidb/store/tikv" + "github.com/pingcap/tidb/util/testkit" +) + +// This test checks that when a index double read returns before reading all the rows, the goroutine doesn't +// leak. For testing distsql with multiple regions, we need to manually split a mock TiKV. +func (s *testSuite) TestIndexDoubleReadClose(c *C) { + if _, ok := s.store.GetClient().(*tikv.CopClient); !ok { + // Make sure the store is tikv store. + return + } + originSize := executor.LookupTableTaskChannelSize + executor.LookupTableTaskChannelSize = 1 + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("create table dist (id int primary key, c_idx int, c_col int, index (c_idx))") + + // Insert 100 rows. + var values []string + for i := 0; i < 100; i++ { + values = append(values, fmt.Sprintf("(%d, %d, %d)", i, i, i)) + } + tk.MustExec("insert dist values " + strings.Join(values, ",")) + + rss, err := tk.Se.Execute("select * from dist where c_idx between 0 and 100") + c.Assert(err, IsNil) + rs := rss[0] + _, err = rs.Next() + c.Assert(err, IsNil) + c.Check(taskGoroutineExists(), IsTrue) + rs.Close() + time.Sleep(time.Millisecond * 50) + c.Check(taskGoroutineExists(), IsFalse) + executor.LookupTableTaskChannelSize = originSize +} + +func taskGoroutineExists() bool { + buf := new(bytes.Buffer) + profile := pprof.Lookup("goroutine") + profile.WriteTo(buf, 1) + str := buf.String() + return strings.Contains(str, "pickAndExecTask") +}