Skip to content

Commit

Permalink
Fix producer deadlock after write failure (#378)
Browse files Browse the repository at this point in the history
### Motivation

There is a deadlock that can happen in Go client when the client has a write failure and tries to process that.

The issue is that Go mutexes are not re-entrant and we trigger a connection.Close() while already holding the connection mutex.

```
goroutine 1077 [semacquire, 83 minutes]:
sync.runtime_SemacquireMutex(0xc00c31fb04, 0xc110a12000, 0x1)
	/usr/local/go/src/runtime/sema.go:71 +0x47
sync.(*Mutex).lockSlow(0xc00c31fb00)
	/usr/local/go/src/sync/mutex.go:138 +0xfc
sync.(*Mutex).Lock(...)
	/usr/local/go/src/sync/mutex.go:81
github.com/apache/pulsar-client-go/pulsar/internal.(*connection).Close(0xc00c31fb00)
	/go/pkg/mod/cd.splunkdev.com/streamlio/pulsar-client-go@v0.1.1-streamlio-5/pulsar/internal/connection.go:718 +0x547
github.com/apache/pulsar-client-go/pulsar.(*partitionProducer).ReceivedSendReceipt(0xc0033926e0, 0xc09ba0fe00)
	/go/pkg/mod/cd.splunkdev.com/streamlio/pulsar-client-go@v0.1.1-streamlio-5/pulsar/producer_partition.go:475 +0x6f0
github.com/apache/pulsar-client-go/pulsar/internal.(*connection).handleSendReceipt(0xc00c31fb00, 0xc09ba0fe00)
	/go/pkg/mod/cd.splunkdev.com/streamlio/pulsar-client-go@v0.1.1-streamlio-5/pulsar/internal/connection.go:588 +0xee
github.com/apache/pulsar-client-go/pulsar/internal.(*connection).internalReceivedCommand(0xc00c31fb00, 0xc00e40e8c0, 0x0, 0x0)
	/go/pkg/mod/cd.splunkdev.com/streamlio/pulsar-client-go@v0.1.1-streamlio-5/pulsar/internal/connection.go:507 +0x1ce
github.com/apache/pulsar-client-go/pulsar/internal.(*connection).run(0xc00c31fb00)
	/go/pkg/mod/cd.splunkdev.com/streamlio/pulsar-client-go@v0.1.1-streamlio-5/pulsar/internal/connection.go:368 +0x2db
github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1(0xc00c31fb00)
	/go/pkg/mod/cd.splunkdev.com/streamlio/pulsar-client-go@v0.1.1-streamlio-5/pulsar/internal/connection.go:230 +0x71
created by github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start
	/go/pkg/mod/cd.splunkdev.com/streamlio/pulsar-client-go@v0.1.1-streamlio-5/pulsar/internal/connection.go:226 +0x3f

```

### Modifications

We don't need to hold the connection lock while the producer is processing the write failure. Releasing the lock earlier is fixing the problem.
  • Loading branch information
merlimat authored Oct 9, 2020
1 parent c03f45f commit 02b244e
Showing 1 changed file with 3 additions and 2 deletions.
5 changes: 3 additions & 2 deletions pulsar/internal/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,9 +641,10 @@ func (c *connection) handleSendReceipt(response *pb.CommandSendReceipt) {
producerID := response.GetProducerId()

c.Lock()
defer c.Unlock()
producer, ok := c.listeners[producerID]
c.Unlock()

if producer, ok := c.listeners[producerID]; ok {
if ok {
producer.ReceivedSendReceipt(response)
} else {
c.log.WithField("producerID", producerID).Warn("Got unexpected send receipt for message: ", response.MessageId)
Expand Down

0 comments on commit 02b244e

Please sign in to comment.