diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index 4be9ba2928..0e13380114 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -107,14 +107,14 @@ type ConsumerHandler interface { ConnectionClosed() } -type connectionState int +type connectionState int32 const ( - connectionInit connectionState = iota - connectionConnecting - connectionTCPConnected - connectionReady - connectionClosed + connectionInit = 0 + connectionConnecting = 1 + connectionTCPConnected = 2 + connectionReady = 3 + connectionClosed = 4 ) func (s connectionState) String() string { @@ -150,7 +150,7 @@ type incomingCmd struct { type connection struct { sync.Mutex cond *sync.Cond - state connectionState + state int32 connectionTimeout time.Duration logicalAddr *url.URL @@ -190,7 +190,7 @@ type connection struct { func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions *TLSOptions, connectionTimeout time.Duration, auth auth.Provider) *connection { cnx := &connection{ - state: connectionInit, + state: int32(connectionInit), connectionTimeout: connectionTimeout, logicalAddr: logicalAddr, physicalAddr: physicalAddr, @@ -397,7 +397,34 @@ func (c *connection) runPingCheck() { } func (c *connection) WriteData(data Buffer) { - c.writeRequestsCh <- data + select { + case c.writeRequestsCh <- data: + // Channel is not full + return + + default: + // Channel full, fallback to probe if connection is closed + } + + for { + select { + case c.writeRequestsCh <- data: + // Successfully wrote on the channel + return + + case <-time.After(100 * time.Millisecond): + // The channel is either: + // 1. blocked, in which case we need to wait until we have space + // 2. the connection is already closed, then we need to bail out + c.log.Debug("Couldn't write on connection channel immediately") + state := connectionState(atomic.LoadInt32(&c.state)) + if state != connectionReady { + c.log.Debug("Connection was already closed") + return + } + } + } + } func (c *connection) internalWriteData(data Buffer) { @@ -729,7 +756,7 @@ func (c *connection) Close() { func (c *connection) changeState(state connectionState) { c.Lock() - c.state = state + atomic.StoreInt32(&c.state, int32(state)) c.cond.Broadcast() c.Unlock() }