Skip to content

Commit

Permalink
add timeout for gs.GracefulStop (#813) (#824)
Browse files Browse the repository at this point in the history
* add timeout for gs.GracefulStop
  • Loading branch information
sre-bot authored and IANTHEREAL committed Nov 22, 2019
1 parent cea04f8 commit d14bbe5
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 2 deletions.
19 changes: 19 additions & 0 deletions pkg/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,3 +280,22 @@ func AdjustDuration(v *time.Duration, defValue time.Duration) {
*v = defValue
}
}

// WaitUntilTimeout creates a goroutine to run fn, and then gives up waiting for the goroutine to exit When it timeouts
func WaitUntilTimeout(name string, fn func(), timeout time.Duration) {
fName := zap.String("name", name)
exited := make(chan struct{})
go func() {
defer func() {
log.Info("goroutine exit by itself (with GoAndAbortGoroutine help)", fName)
close(exited)
}()
fn()
}()

select {
case <-time.After(timeout):
log.Info("abort goroutine (with GoAndAbortGoroutine help)", fName)
case <-exited:
}
}
14 changes: 14 additions & 0 deletions pkg/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,3 +326,17 @@ func (s *retryCtxSuite) TestSuccessAfterRetry(c *C) {
c.Assert(err, IsNil)
c.Assert(callCount, Equals, 2)
}

type waitUntilTimeoutSuit struct{}

var _ = Suite(&waitUntilTimeoutSuit{})

func (s *waitUntilTimeoutSuit) TestGoAndAbortGoroutine(c *C) {
var called bool
WaitUntilTimeout("test", func() {
c := make(chan struct{})
called = true
<-c
}, time.Second)
c.Assert(called, IsTrue)
}
6 changes: 4 additions & 2 deletions pump/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -879,8 +879,10 @@ func (s *Server) Close() {

close(s.pullClose)
// stop the gRPC server
s.gs.GracefulStop()
log.Info("grpc is stopped")
util.WaitUntilTimeout("grpc_server.GracefulStop", func() {
s.gs.GracefulStop()
log.Info("grpc is stopped")
}, 10*time.Second)

if err := s.storage.Close(); err != nil {
log.Error("close storage failed", zap.Error(err))
Expand Down

0 comments on commit d14bbe5

Please sign in to comment.