Skip to content

Commit

Permalink
Fix producer goroutine leak (#331)
Browse files Browse the repository at this point in the history
Per Producer there is a goroutine leaked. The goroutine is used for the
partition auto-discovery and will never exit.

This changes the goroutine to be properly exiting after a Close() on the
producer.
  • Loading branch information
simonswine authored Jul 24, 2020
1 parent 1f212f5 commit 65cb19a
Showing 1 changed file with 14 additions and 4 deletions.
18 changes: 14 additions & 4 deletions pulsar/producer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type producer struct {
numPartitions uint32
messageRouter func(*ProducerMessage, TopicMetadata) int
ticker *time.Ticker
tickerStop chan struct{}

log *log.Entry
}
Expand Down Expand Up @@ -118,12 +119,19 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) {
return nil, err
}

p.ticker = time.NewTicker(partitionsAutoDiscoveryInterval)
ticker := time.NewTicker(partitionsAutoDiscoveryInterval)
p.ticker = ticker
p.tickerStop = make(chan struct{})

go func() {
for range p.ticker.C {
p.log.Debug("Auto discovering new partitions")
p.internalCreatePartitionsProducers()
for {
select {
case <-ticker.C:
p.log.Debug("Auto discovering new partitions")
p.internalCreatePartitionsProducers()
case <-p.tickerStop:
return
}
}
}()

Expand Down Expand Up @@ -282,6 +290,8 @@ func (p *producer) Close() {
defer p.RUnlock()
if p.ticker != nil {
p.ticker.Stop()
close(p.tickerStop)
p.ticker = nil
}

for _, pp := range p.producers {
Expand Down

0 comments on commit 65cb19a

Please sign in to comment.