Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race condition/goroutine leak in partition discovery goroutine #474

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 33 additions & 18 deletions pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ type consumer struct {
// channel used to deliver message to clients
messageCh chan ConsumerMessage

dlq *dlqRouter
rlq *retryRouter
closeOnce sync.Once
closeCh chan struct{}
errorCh chan error
ticker *time.Ticker
dlq *dlqRouter
rlq *retryRouter
closeOnce sync.Once
closeCh chan struct{}
errorCh chan error
stopDiscovery func()

log log.Logger
metrics *internal.TopicMetrics
Expand Down Expand Up @@ -210,26 +210,40 @@ func newInternalConsumer(client *client, options ConsumerOptions, topic string,
if duration <= 0 {
duration = defaultAutoDiscoveryDuration
}
consumer.ticker = time.NewTicker(duration)
consumer.stopDiscovery = consumer.runBackgroundPartitionDiscovery(duration)

return consumer, nil
}

// Name returns the name of consumer.
func (c *consumer) Name() string {
return c.consumerName
}

func (c *consumer) runBackgroundPartitionDiscovery(period time.Duration) (cancel func()) {
var wg sync.WaitGroup
stopDiscoveryCh := make(chan struct{})
ticker := time.NewTicker(period)

wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-consumer.closeCh:
case <-stopDiscoveryCh:
return
case <-consumer.ticker.C:
consumer.log.Debug("Auto discovering new partitions")
consumer.internalTopicSubscribeToPartitions()
case <-ticker.C:
c.log.Debug("Auto discovering new partitions")
c.internalTopicSubscribeToPartitions()
}
}
}()

return consumer, nil
}

// Name returns the name of consumer.
func (c *consumer) Name() string {
return c.consumerName
return func() {
ticker.Stop()
close(stopDiscoveryCh)
wg.Wait()
}
}

func (c *consumer) internalTopicSubscribeToPartitions() error {
Expand Down Expand Up @@ -485,6 +499,8 @@ func (c *consumer) NackID(msgID MessageID) {

func (c *consumer) Close() {
c.closeOnce.Do(func() {
c.stopDiscovery()

c.Lock()
defer c.Unlock()

Expand All @@ -498,7 +514,6 @@ func (c *consumer) Close() {
}
wg.Wait()
close(c.closeCh)
c.ticker.Stop()
c.client.handlers.Del(c)
c.dlq.close()
c.rlq.close()
Expand Down
55 changes: 33 additions & 22 deletions pulsar/producer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ type producer struct {
producersPtr unsafe.Pointer
numPartitions uint32
messageRouter func(*ProducerMessage, TopicMetadata) int
ticker *time.Ticker
tickerStop chan struct{}
closeOnce sync.Once
stopDiscovery func()
log log.Logger
metrics *internal.TopicMetrics
}
Expand Down Expand Up @@ -125,24 +125,36 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) {
return nil, err
}

ticker := time.NewTicker(partitionsAutoDiscoveryInterval)
p.ticker = ticker
p.tickerStop = make(chan struct{})
p.stopDiscovery = p.runBackgroundPartitionDiscovery(partitionsAutoDiscoveryInterval)

p.metrics.ProducersOpened.Inc()
return p, nil
}

func (p *producer) runBackgroundPartitionDiscovery(period time.Duration) (cancel func()) {
var wg sync.WaitGroup
stopDiscoveryCh := make(chan struct{})
ticker := time.NewTicker(period)

wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-stopDiscoveryCh:
return
case <-ticker.C:
p.log.Debug("Auto discovering new partitions")
p.internalCreatePartitionsProducers()
case <-p.tickerStop:
return
}
}
}()

p.metrics.ProducersOpened.Inc()
return p, nil
return func() {
ticker.Stop()
close(stopDiscoveryCh)
wg.Wait()
}
}

func (p *producer) internalCreatePartitionsProducers() error {
Expand Down Expand Up @@ -292,18 +304,17 @@ func (p *producer) Flush() error {
}

func (p *producer) Close() {
p.Lock()
defer p.Unlock()
if p.ticker != nil {
p.ticker.Stop()
close(p.tickerStop)
p.ticker = nil
}
p.closeOnce.Do(func() {
p.stopDiscovery()

for _, pp := range p.producers {
pp.Close()
}
p.client.handlers.Del(p)
p.metrics.ProducersPartitions.Sub(float64(len(p.producers)))
p.metrics.ProducersClosed.Inc()
p.Lock()
defer p.Unlock()

for _, pp := range p.producers {
pp.Close()
}
p.client.handlers.Del(p)
p.metrics.ProducersPartitions.Sub(float64(len(p.producers)))
p.metrics.ProducersClosed.Inc()
})
}