diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index 6ca462e745..bcaff07de4 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -144,7 +144,6 @@ type connection struct { log log.Logger - incomingRequestsWG sync.WaitGroup incomingRequestsCh chan *request incomingCmdCh chan *incomingCmd closeCh chan interface{} @@ -347,10 +346,6 @@ func (c *connection) waitUntilReady() error { } func (c *connection) failLeftRequestsWhenClose() { - // wait for outstanding incoming requests to complete before draining - // and closing the channel - c.incomingRequestsWG.Wait() - ch := c.incomingRequestsCh go func() { // send a nil message to drain instead of @@ -598,9 +593,6 @@ func (c *connection) Write(data Buffer) { func (c *connection) SendRequest(requestID uint64, req *pb.BaseCommand, callback func(command *pb.BaseCommand, err error)) { - c.incomingRequestsWG.Add(1) - defer c.incomingRequestsWG.Done() - state := c.getState() if state == connectionClosed || state == connectionClosing { callback(req, ErrConnectionClosed) @@ -620,9 +612,6 @@ func (c *connection) SendRequest(requestID uint64, req *pb.BaseCommand, } func (c *connection) SendRequestNoWait(req *pb.BaseCommand) error { - c.incomingRequestsWG.Add(1) - defer c.incomingRequestsWG.Done() - state := c.getState() if state == connectionClosed || state == connectionClosing { return ErrConnectionClosed