From 9c2a61bc1c609b1f74f6f5d98dc11c048f023aa3 Mon Sep 17 00:00:00 2001 From: ming luo Date: Tue, 11 Oct 2022 22:49:10 -0400 Subject: [PATCH 1/3] preemtively increament waitgroup for wait on left requests to drain --- pulsar/internal/connection.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index 6ca462e745..30a379e6e7 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -371,6 +371,10 @@ func (c *connection) failLeftRequestsWhenClose() { func (c *connection) run() { pingSendTicker := time.NewTicker(c.keepAliveInterval) pingCheckTicker := time.NewTicker(c.keepAliveInterval) + // incomingRequestsWG.Add(1) and Wait() runs concurrently, if Wait() happens before Add(), it'll lead to panic + // it's described at https://github.com/golang/go/blob/master/src/sync/waitgroup.go#L30 + // so we preemptively Add(1) and subtract when the closeCh is closed. + c.incomingRequestsWG.Add(1) defer func() { // stop tickers @@ -395,6 +399,11 @@ func (c *connection) run() { for { select { case <-c.closeCh: + // clear the preemptive WG.Add() + go func() { + c.log.Debugf("ready to drain left requests before close") + c.incomingRequestsWG.Done() + }() c.failLeftRequestsWhenClose() return From fdaf53c7b7345aae4015c202ed4bd95720080ec2 Mon Sep 17 00:00:00 2001 From: ming luo Date: Sat, 15 Oct 2022 22:23:47 -0400 Subject: [PATCH 2/3] remove waitgroup used to drain left incoming requests --- pulsar/internal/connection.go | 22 +++++----------------- 1 file changed, 5 insertions(+), 17 deletions(-) diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index 30a379e6e7..33f71a2e29 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -144,7 +144,6 @@ type connection struct { log log.Logger - incomingRequestsWG sync.WaitGroup incomingRequestsCh chan *request incomingCmdCh chan *incomingCmd closeCh chan interface{} @@ -347,9 +346,13 @@ func (c *connection) waitUntilReady() error { } func (c *connection) failLeftRequestsWhenClose() { + // incomingRequestsWG.Add(1) in SendRequst/SendRequestNoWait and Wait() runs concurrently, + // if Wait() happens before Add(), it'll lead to panic + // it's described at https://github.com/golang/go/blob/master/src/sync/waitgroup.go#L30 + // so we preemptively Add(1) and subtract when the closeCh is closed. + // wait for outstanding incoming requests to complete before draining // and closing the channel - c.incomingRequestsWG.Wait() ch := c.incomingRequestsCh go func() { @@ -371,10 +374,6 @@ func (c *connection) failLeftRequestsWhenClose() { func (c *connection) run() { pingSendTicker := time.NewTicker(c.keepAliveInterval) pingCheckTicker := time.NewTicker(c.keepAliveInterval) - // incomingRequestsWG.Add(1) and Wait() runs concurrently, if Wait() happens before Add(), it'll lead to panic - // it's described at https://github.com/golang/go/blob/master/src/sync/waitgroup.go#L30 - // so we preemptively Add(1) and subtract when the closeCh is closed. - c.incomingRequestsWG.Add(1) defer func() { // stop tickers @@ -399,11 +398,6 @@ func (c *connection) run() { for { select { case <-c.closeCh: - // clear the preemptive WG.Add() - go func() { - c.log.Debugf("ready to drain left requests before close") - c.incomingRequestsWG.Done() - }() c.failLeftRequestsWhenClose() return @@ -607,9 +601,6 @@ func (c *connection) Write(data Buffer) { func (c *connection) SendRequest(requestID uint64, req *pb.BaseCommand, callback func(command *pb.BaseCommand, err error)) { - c.incomingRequestsWG.Add(1) - defer c.incomingRequestsWG.Done() - state := c.getState() if state == connectionClosed || state == connectionClosing { callback(req, ErrConnectionClosed) @@ -629,9 +620,6 @@ func (c *connection) SendRequest(requestID uint64, req *pb.BaseCommand, } func (c *connection) SendRequestNoWait(req *pb.BaseCommand) error { - c.incomingRequestsWG.Add(1) - defer c.incomingRequestsWG.Done() - state := c.getState() if state == connectionClosed || state == connectionClosing { return ErrConnectionClosed From 1ae5cadebcd7cdbe575582d76cf69a20aabb68f2 Mon Sep 17 00:00:00 2001 From: ming luo Date: Sat, 15 Oct 2022 22:59:34 -0400 Subject: [PATCH 3/3] remove unrelated comments --- pulsar/internal/connection.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index 33f71a2e29..bcaff07de4 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -346,14 +346,6 @@ func (c *connection) waitUntilReady() error { } func (c *connection) failLeftRequestsWhenClose() { - // incomingRequestsWG.Add(1) in SendRequst/SendRequestNoWait and Wait() runs concurrently, - // if Wait() happens before Add(), it'll lead to panic - // it's described at https://github.com/golang/go/blob/master/src/sync/waitgroup.go#L30 - // so we preemptively Add(1) and subtract when the closeCh is closed. - - // wait for outstanding incoming requests to complete before draining - // and closing the channel - ch := c.incomingRequestsCh go func() { // send a nil message to drain instead of