Avoid producer deadlock on connection closing #337

Merged
merged 4 commits into from
Jul 29, 2020
47 changes: 37 additions & 10 deletions pulsar/internal/connection.go
@@ -107,14 +107,14 @@ type ConsumerHandler interface {
 	ConnectionClosed()
 }

-type connectionState int
+type connectionState int32

 const (
-	connectionInit connectionState = iota
-	connectionConnecting
-	connectionTCPConnected
-	connectionReady
-	connectionClosed
+	connectionInit = 0
+	connectionConnecting = 1
+	connectionTCPConnected = 2
+	connectionReady = 3
+	connectionClosed = 4
 )

 func (s connectionState) String() string {
@@ -150,7 +150,7 @@ type incomingCmd struct {
 type connection struct {
 	sync.Mutex
 	cond *sync.Cond
-	state connectionState
+	state int32
 	connectionTimeout time.Duration

 	logicalAddr *url.URL
@@ -190,7 +190,7 @@ type connection struct {
 func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions *TLSOptions,
 	connectionTimeout time.Duration, auth auth.Provider) *connection {
 	cnx := &connection{
-		state: connectionInit,
+		state: int32(connectionInit),
 		connectionTimeout: connectionTimeout,
 		logicalAddr: logicalAddr,
 		physicalAddr: physicalAddr,
@@ -397,7 +397,34 @@ func (c *connection) runPingCheck() {
 }

 func (c *connection) WriteData(data Buffer) {
-	c.writeRequestsCh <- data
+	select {
+	case c.writeRequestsCh <- data:
+		// Channel is not full
+		return
+
+	default:
+		// Channel full, fallback to probe if connection is closed
+	}
+
+	for {
+		select {
+		case c.writeRequestsCh <- data:
+			// Successfully wrote on the channel
+			return
+
+		case <-time.After(100 * time.Millisecond):
+			// The channel is either:
+			// 1. blocked, in which case we need to wait until we have space
+			// 2. the connection is already closed, then we need to bail out
+			c.log.Debug("Couldn't write on connection channel immediately")
+			state := connectionState(atomic.LoadInt32(&c.state))
+			if state != connectionReady {
+				c.log.Debug("Connection was already closed")
+				return
+			}
+		}
+	}
+
 }

 func (c *connection) internalWriteData(data Buffer) {
@@ -729,7 +756,7 @@ func (c *connection) Close() {

 func (c *connection) changeState(state connectionState) {
 	c.Lock()
-	c.state = state
+	atomic.StoreInt32(&c.state, int32(state))

Member

Is it necessary to hold c.Lock() and c.Unlock() around an atomic operation here?

Contributor Author

I think it's still needed to ensure that the condition variable is broadcast. To signal a condition you need to hold the lock on the associated mutex.

Member

I am wondering whether this approach has other problems, because we rarely see Lock and Unlock wrapped around an atomic primitive.

atomic.StoreInt32(&c.state, int32(state))
c.Lock()
c.cond.Broadcast()
c.Unlock()

Contributor Author

There are two different usages here:

  • On one hand, we want other goroutines to be able to check the state without taking a lock
  • On the other hand, we still need the state update and the notification to happen together, atomically
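
The two usages described above can be sketched as follows. This is an illustrative reduction under stated assumptions, not the client's code: `conn`, `newConn`, `waitUntil`, `isReady` and the state constants are hypothetical names. The state is stored atomically so that readers can skip the mutex, while the store and the broadcast still happen under the mutex so that a goroutine waiting on the condition cannot check the old value and then miss the wakeup.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Hypothetical state values, mirroring the numbering used in the diff.
const (
	stateInit  int32 = 0
	stateReady int32 = 3
)

// conn is a hypothetical stand-in for the client's connection type.
type conn struct {
	mu    sync.Mutex
	cond  *sync.Cond
	state int32 // accessed only through sync/atomic
}

func newConn() *conn {
	c := &conn{state: stateInit}
	c.cond = sync.NewCond(&c.mu)
	return c
}

// changeState publishes the new state and wakes every waiter while holding
// the mutex, so the update and the notification form a single step from the
// point of view of goroutines blocked in waitUntil.
func (c *conn) changeState(s int32) {
	c.mu.Lock()
	atomic.StoreInt32(&c.state, s)
	c.cond.Broadcast()
	c.mu.Unlock()
}

// waitUntil blocks until the state equals want.
func (c *conn) waitUntil(want int32) {
	c.mu.Lock()
	for atomic.LoadInt32(&c.state) != want {
		c.cond.Wait() // releases mu while waiting, re-acquires it on wakeup
	}
	c.mu.Unlock()
}

// isReady checks the state without taking the mutex.
func (c *conn) isReady() bool {
	return atomic.LoadInt32(&c.state) == stateReady
}

func main() {
	c := newConn()
	done := make(chan struct{})
	go func() {
		c.waitUntil(stateReady)
		close(done)
	}()
	c.changeState(stateReady)
	<-done
	fmt.Println(c.isReady()) // true
}
```

Go's sync.Cond documentation allows calling Broadcast without holding the lock. What the mutex guards against in this sketch is a missed wakeup: if neither the store nor the broadcast were done under the lock, a waiter could read the old state, and the broadcast could fire before that waiter reaches Wait.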

Member

wolfstudy, Jul 29, 2020

Yes, I agree with you. The main difference between us is whether the lock is needed to protect the atomic primitive. I think it is not: atomic is itself a synchronization primitive, so we can narrow the scope of the lock and hold it only around the sync.Cond broadcast. For example:

atomic.StoreInt32(&c.state, int32(state))
c.Lock()
c.cond.Broadcast()
c.Unlock()

Member

Please ignore me

 	c.cond.Broadcast()
 	c.Unlock()
 }