From d14bbe527ac9b322c89a367c59e4c1b443859f5b Mon Sep 17 00:00:00 2001 From: pingcap-github-bot Date: Fri, 22 Nov 2019 13:41:56 +0800 Subject: [PATCH] add timeout for gs.GracefulStop (#813) (#824) * add timeout for gs.GracefulStop --- pkg/util/util.go | 19 +++++++++++++++++++ pkg/util/util_test.go | 14 ++++++++++++++ pump/server.go | 6 ++++-- 3 files changed, 37 insertions(+), 2 deletions(-) diff --git a/pkg/util/util.go b/pkg/util/util.go index 8cee45eda..377ec3b5d 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -280,3 +280,22 @@ func AdjustDuration(v *time.Duration, defValue time.Duration) { *v = defValue } } + +// WaitUntilTimeout creates a goroutine to run fn, and then gives up waiting for the goroutine to exit When it timeouts +func WaitUntilTimeout(name string, fn func(), timeout time.Duration) { + fName := zap.String("name", name) + exited := make(chan struct{}) + go func() { + defer func() { + log.Info("goroutine exit by itself (with GoAndAbortGoroutine help)", fName) + close(exited) + }() + fn() + }() + + select { + case <-time.After(timeout): + log.Info("abort goroutine (with GoAndAbortGoroutine help)", fName) + case <-exited: + } +} diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go index 7c73757c3..642272b01 100644 --- a/pkg/util/util_test.go +++ b/pkg/util/util_test.go @@ -326,3 +326,17 @@ func (s *retryCtxSuite) TestSuccessAfterRetry(c *C) { c.Assert(err, IsNil) c.Assert(callCount, Equals, 2) } + +type waitUntilTimeoutSuit struct{} + +var _ = Suite(&waitUntilTimeoutSuit{}) + +func (s *waitUntilTimeoutSuit) TestGoAndAbortGoroutine(c *C) { + var called bool + WaitUntilTimeout("test", func() { + c := make(chan struct{}) + called = true + <-c + }, time.Second) + c.Assert(called, IsTrue) +} diff --git a/pump/server.go b/pump/server.go index aa449a689..0323a8dc1 100644 --- a/pump/server.go +++ b/pump/server.go @@ -879,8 +879,10 @@ func (s *Server) Close() { close(s.pullClose) // stop the gRPC server - s.gs.GracefulStop() - log.Info("grpc is stopped") + util.WaitUntilTimeout("grpc_server.GracefulStop", func() { + s.gs.GracefulStop() + log.Info("grpc is stopped") + }, 10*time.Second) if err := s.storage.Close(); err != nil { log.Error("close storage failed", zap.Error(err))