Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Possible panic on WaitGroup.Wait() #268

Merged
merged 1 commit into from
Mar 7, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 18 additions & 11 deletions nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ type Conn struct {
Statistics
mu sync.Mutex
Opts Options
wg sync.WaitGroup
wg *sync.WaitGroup
url *url.URL
conn net.Conn
srvPool []*srv
Expand Down Expand Up @@ -872,15 +872,17 @@ func (nc *Conn) makeTLSConn() {

// waitForExits will wait for all socket watcher Go routines to
// be shutdown before proceeding.
func (nc *Conn) waitForExits() {
func (nc *Conn) waitForExits(wg *sync.WaitGroup) {
// Kick old flusher forcefully.
select {
case nc.fch <- true:
default:
}

// Wait for any previous go routines.
nc.wg.Wait()
if wg != nil {
wg.Wait()
}
}

// spinUpGoRoutines will launch the Go routines responsible for
Expand All @@ -890,14 +892,16 @@ func (nc *Conn) waitForExits() {
// reconnect when the previous ones have exited.
func (nc *Conn) spinUpGoRoutines() {
// Make sure everything has exited.
nc.waitForExits()
nc.waitForExits(nc.wg)

// Create a new waitGroup instance for this run.
nc.wg = &sync.WaitGroup{}
// We will wait on both.
nc.wg.Add(2)

// Spin up the readLoop and the socket flusher.
go nc.readLoop()
go nc.flusher()
go nc.readLoop(nc.wg)
go nc.flusher(nc.wg)

nc.mu.Lock()
if nc.Opts.PingInterval > 0 {
Expand Down Expand Up @@ -1232,7 +1236,10 @@ func (nc *Conn) flushReconnectPendingItems() {
func (nc *Conn) doReconnect() {
// We want to make sure we have the other watchers shutdown properly
// here before we proceed past this point.
nc.waitForExits()
nc.mu.Lock()
wg := nc.wg
nc.mu.Unlock()
nc.waitForExits(wg)

// FIXME(dlc) - We have an issue here if we have
// outstanding flush points (pongs) and they were not
Expand Down Expand Up @@ -1423,9 +1430,9 @@ func (nc *Conn) asyncDispatch() {
// readLoop() will sit on the socket reading and processing the
// protocol from the server. It will dispatch appropriately based
// on the op type.
func (nc *Conn) readLoop() {
func (nc *Conn) readLoop(wg *sync.WaitGroup) {
// Release the wait group on exit
defer nc.wg.Done()
defer wg.Done()

// Create a parseState if needed.
nc.mu.Lock()
Expand Down Expand Up @@ -1629,9 +1636,9 @@ func (nc *Conn) processPermissionsViolation(err string) {

// flusher is a separate Go routine that will process flush requests for the write
// bufio. This allows coalescing of writes to the underlying socket.
func (nc *Conn) flusher() {
func (nc *Conn) flusher(wg *sync.WaitGroup) {
// Release the wait group
defer nc.wg.Done()
defer wg.Done()

// snapshot the bw and conn since they can change from underneath of us.
nc.mu.Lock()
Expand Down
1 change: 1 addition & 0 deletions staticcheck.ignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
github.com/nats-io/go-nats/*_test.go:SA2002
github.com/nats-io/go-nats/*/*_test.go:SA2002
github.com/nats-io/go-nats/nats.go:SA6000