From 81f03181e9e4c1d87999ef674215a6d82a6b303b Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Mon, 15 Jan 2018 05:10:16 -0600 Subject: [PATCH 1/2] store/tikv: call Next() after copIterator closed lead to goroutine leak (#5624) After Close(), worker groutine receive from copIterator.finished and exit directly, without writing any thing to taskCh. Next() receives from taskCh and may hang forever, cause the caller goroutine leak. --- executor/executor_test.go | 33 +++++++++++++++++++++++++++++++++ store/tikv/coprocessor.go | 16 +++++++++++++++- 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/executor/executor_test.go b/executor/executor_test.go index 98035ce42b86c..bce2785fd46ff 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -17,6 +17,7 @@ import ( "flag" "fmt" "os" + "strings" "sync" "testing" "time" @@ -2306,3 +2307,35 @@ func (s *testSuite) TestTableScanWithPointRanges(c *C) { tk.MustExec("insert into t values(1), (5), (10)") tk.MustQuery("select * from t where id in(1, 2, 10)").Check(testkit.Rows("1", "10")) } + +func (s *testSuite) TestEarlyClose(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("create table earlyclose (id int primary key)") + + // Insert 1000 rows. + var values []string + for i := 0; i < 1000; i++ { + values = append(values, fmt.Sprintf("(%d)", i)) + } + tk.MustExec("insert earlyclose values " + strings.Join(values, ",")) + + // Get table ID for split. + dom := sessionctx.GetDomain(tk.Se) + is := dom.InfoSchema() + tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("earlyclose")) + c.Assert(err, IsNil) + tblID := tbl.Meta().ID + + // Split the table. + s.cluster.SplitTable(s.mvccStore, tblID, 500) + + for i := 0; i < 500; i++ { + rss, err := tk.Se.Execute("select * from earlyclose order by id") + c.Assert(err, IsNil) + rs := rss[0] + _, err = rs.Next() + c.Assert(err, IsNil) + rs.Close() + } +} diff --git a/store/tikv/coprocessor.go b/store/tikv/coprocessor.go index 91c3171c30ffa..236f07d5fe9da 100644 --- a/store/tikv/coprocessor.go +++ b/store/tikv/coprocessor.go @@ -434,6 +434,15 @@ func (it *copIterator) run(ctx goctx.Context) { }) } +func recvFromRespCh(respCh <-chan copResponse, finished <-chan struct{}) (resp copResponse, ok bool, exit bool) { + select { + case resp, ok = <-respCh: + case <-finished: + exit = true + } + return +} + func (it *copIterator) sendToTaskCh(ctx goctx.Context, t *copTask, taskCh chan<- *copTask) (exit bool) { select { case taskCh <- t: @@ -475,13 +484,18 @@ func (it *copIterator) Next() ([]byte, error) { return nil, nil } } else { + var closed bool for { if it.curr >= len(it.tasks) { // Resp will be nil if iterator is finished. return nil, nil } task := it.tasks[it.curr] - resp, ok = <-task.respChan + resp, ok, closed = recvFromRespCh(task.respChan, it.finished) + if closed { + // Close() is already called, so Next() is invalid. + return nil, nil + } if ok { break } From c130d226a4e14745987eebac8781406c5797ef0e Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 16 Jan 2018 15:02:32 +0800 Subject: [PATCH 2/2] fix ci --- executor/executor_test.go | 7 ++++--- util/arena/arena.go | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/executor/executor_test.go b/executor/executor_test.go index bce2785fd46ff..d5bfad775359a 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -2321,7 +2321,7 @@ func (s *testSuite) TestEarlyClose(c *C) { tk.MustExec("insert earlyclose values " + strings.Join(values, ",")) // Get table ID for split. - dom := sessionctx.GetDomain(tk.Se) + dom := domain.GetDomain(tk.Se) is := dom.InfoSchema() tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("earlyclose")) c.Assert(err, IsNil) @@ -2330,11 +2330,12 @@ func (s *testSuite) TestEarlyClose(c *C) { // Split the table. s.cluster.SplitTable(s.mvccStore, tblID, 500) + goCtx := goctx.Background() for i := 0; i < 500; i++ { - rss, err := tk.Se.Execute("select * from earlyclose order by id") + rss, err := tk.Se.Execute(goCtx, "select * from earlyclose order by id") c.Assert(err, IsNil) rs := rss[0] - _, err = rs.Next() + _, err = rs.Next(goCtx) c.Assert(err, IsNil) rs.Close() } diff --git a/util/arena/arena.go b/util/arena/arena.go index 94729b19a429a..a8f8e1ab63950 100644 --- a/util/arena/arena.go +++ b/util/arena/arena.go @@ -60,7 +60,7 @@ func NewAllocator(capacity int) *SimpleAllocator { // Alloc implements Allocator.AllocBytes interface. func (s *SimpleAllocator) Alloc(capacity int) []byte { if s.off+capacity < cap(s.arena) { - slice := s.arena[s.off:s.off : s.off+capacity] + slice := s.arena[s.off : s.off : s.off+capacity] s.off += capacity return slice }