From 39146d5960bd9efd7fd240828e56fbb39704fe15 Mon Sep 17 00:00:00 2001 From: GregoryIan Date: Tue, 19 Nov 2019 06:32:23 -0800 Subject: [PATCH 1/3] add timeout for gs.GracefulStop --- pkg/util/util.go | 19 +++++++++++++++++++ pkg/util/util_test.go | 18 ++++++++++++++++++ pump/server.go | 6 ++++-- 3 files changed, 41 insertions(+), 2 deletions(-) diff --git a/pkg/util/util.go b/pkg/util/util.go index 8cee45eda..3c5b736cd 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -280,3 +280,22 @@ func AdjustDuration(v *time.Duration, defValue time.Duration) { *v = defValue } } + +// GoAndAbortGoroutine creates a goroutine to run fn, and then gives up waiting for the goroutine to exit When it timeouts +func GoAndAbortGoroutine(name string, fn func(), timeout time.Duration) { + fName := zap.String("name", name) + exited := make(chan struct{}) + go func() { + defer func() { + log.Info("goroutine exit bt itself (with GoAndAbortGoroutine help)", fName) + close(exited) + }() + fn() + }() + + select { + case <-time.After((timeout)): + log.Info("abort goroutine (with GoAndAbortGoroutine help)", fName) + case <-exited: + } +} diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go index 7c73757c3..d1ea83573 100644 --- a/pkg/util/util_test.go +++ b/pkg/util/util_test.go @@ -326,3 +326,21 @@ func (s *retryCtxSuite) TestSuccessAfterRetry(c *C) { c.Assert(err, IsNil) c.Assert(callCount, Equals, 2) } + +type goAndAbortGoroutineSuit struct{} + +var _ = Suite(&goAndAbortGoroutineSuit{}) + +func (s *goAndAbortGoroutineSuit) TestGoAndAbortGoroutine(c *C) { + var logHook LogHook + logHook.SetUp() + defer logHook.TearDown() + + var called bool + GoAndAbortGoroutine("test", func() { + c := make(chan struct{}) + called = true + <-c + }, time.Second) + c.Assert(called, IsTrue) +} diff --git a/pump/server.go b/pump/server.go index aa449a689..f2aab0b84 100644 --- a/pump/server.go +++ b/pump/server.go @@ -879,8 +879,10 @@ func (s *Server) Close() { close(s.pullClose) // stop the gRPC server - s.gs.GracefulStop() - log.Info("grpc is stopped") + util.GoAndAbortGoroutine("grpc_server.GracefulStop", func() { + s.gs.GracefulStop() + log.Info("grpc is stopped") + }, 10*time.Second) if err := s.storage.Close(); err != nil { log.Error("close storage failed", zap.Error(err)) From 6f8351d6207b16aadf7ea9455f0e3d36029beb54 Mon Sep 17 00:00:00 2001 From: GregoryIan Date: Tue, 19 Nov 2019 13:39:50 -0800 Subject: [PATCH 2/3] remove useless code --- pkg/util/util_test.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go index d1ea83573..91729ed37 100644 --- a/pkg/util/util_test.go +++ b/pkg/util/util_test.go @@ -332,10 +332,6 @@ type goAndAbortGoroutineSuit struct{} var _ = Suite(&goAndAbortGoroutineSuit{}) func (s *goAndAbortGoroutineSuit) TestGoAndAbortGoroutine(c *C) { - var logHook LogHook - logHook.SetUp() - defer logHook.TearDown() - var called bool GoAndAbortGoroutine("test", func() { c := make(chan struct{}) From 974bb1fe31a7fd8777391aed0df69208f0edfc8b Mon Sep 17 00:00:00 2001 From: GregoryIan Date: Tue, 19 Nov 2019 19:24:36 -0800 Subject: [PATCH 3/3] address comment --- pkg/util/util.go | 8 ++++---- pkg/util/util_test.go | 8 ++++---- pump/server.go | 2 +- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/pkg/util/util.go b/pkg/util/util.go index 3c5b736cd..377ec3b5d 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -281,20 +281,20 @@ func AdjustDuration(v *time.Duration, defValue time.Duration) { } } -// GoAndAbortGoroutine creates a goroutine to run fn, and then gives up waiting for the goroutine to exit When it timeouts -func GoAndAbortGoroutine(name string, fn func(), timeout time.Duration) { +// WaitUntilTimeout creates a goroutine to run fn, and then gives up waiting for the goroutine to exit When it timeouts +func WaitUntilTimeout(name string, fn func(), timeout time.Duration) { fName := zap.String("name", name) exited := make(chan struct{}) go func() { defer func() { - log.Info("goroutine exit bt itself (with GoAndAbortGoroutine help)", fName) + log.Info("goroutine exit by itself (with GoAndAbortGoroutine help)", fName) close(exited) }() fn() }() select { - case <-time.After((timeout)): + case <-time.After(timeout): log.Info("abort goroutine (with GoAndAbortGoroutine help)", fName) case <-exited: } diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go index 91729ed37..642272b01 100644 --- a/pkg/util/util_test.go +++ b/pkg/util/util_test.go @@ -327,13 +327,13 @@ func (s *retryCtxSuite) TestSuccessAfterRetry(c *C) { c.Assert(callCount, Equals, 2) } -type goAndAbortGoroutineSuit struct{} +type waitUntilTimeoutSuit struct{} -var _ = Suite(&goAndAbortGoroutineSuit{}) +var _ = Suite(&waitUntilTimeoutSuit{}) -func (s *goAndAbortGoroutineSuit) TestGoAndAbortGoroutine(c *C) { +func (s *waitUntilTimeoutSuit) TestGoAndAbortGoroutine(c *C) { var called bool - GoAndAbortGoroutine("test", func() { + WaitUntilTimeout("test", func() { c := make(chan struct{}) called = true <-c diff --git a/pump/server.go b/pump/server.go index f2aab0b84..0323a8dc1 100644 --- a/pump/server.go +++ b/pump/server.go @@ -879,7 +879,7 @@ func (s *Server) Close() { close(s.pullClose) // stop the gRPC server - util.GoAndAbortGoroutine("grpc_server.GracefulStop", func() { + util.WaitUntilTimeout("grpc_server.GracefulStop", func() { s.gs.GracefulStop() log.Info("grpc is stopped") }, 10*time.Second)