From f07514745ab6317f376310a80148d99712529a77 Mon Sep 17 00:00:00 2001 From: PowerStateFailure <29687050+PowerStateFailure@users.noreply.github.com> Date: Mon, 22 Feb 2021 22:05:17 +0500 Subject: [PATCH 1/3] pulsar/consumer_impl: rewrite partition discovery goroutine so that it actually gets closed before close active consumers --- pulsar/consumer_impl.go | 51 ++++++++++++++++++++++++++--------------- 1 file changed, 33 insertions(+), 18 deletions(-) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index d05c495b45..b7bc607040 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -49,12 +49,12 @@ type consumer struct { // channel used to deliver message to clients messageCh chan ConsumerMessage - dlq *dlqRouter - rlq *retryRouter - closeOnce sync.Once - closeCh chan struct{} - errorCh chan error - ticker *time.Ticker + dlq *dlqRouter + rlq *retryRouter + closeOnce sync.Once + closeCh chan struct{} + errorCh chan error + stopDiscovery func() log log.Logger metrics *internal.TopicMetrics @@ -210,26 +210,40 @@ func newInternalConsumer(client *client, options ConsumerOptions, topic string, if duration <= 0 { duration = defaultAutoDiscoveryDuration } - consumer.ticker = time.NewTicker(duration) + consumer.stopDiscovery = consumer.runBackgroundPartitionDiscovery(duration) + return consumer, nil +} + +// Name returns the name of consumer. +func (c *consumer) Name() string { + return c.consumerName +} + +func (c *consumer) runBackgroundPartitionDiscovery(period time.Duration) (cancel func()) { + var wg sync.WaitGroup + stopDiscoveryCh := make(chan struct{}) + ticker := time.NewTicker(period) + + wg.Add(1) go func() { + defer wg.Done() for { select { - case <-consumer.closeCh: + case <-stopDiscoveryCh: return - case <-consumer.ticker.C: - consumer.log.Debug("Auto discovering new partitions") - consumer.internalTopicSubscribeToPartitions() + case <-ticker.C: + c.log.Debug("Auto discovering new partitions") + c.internalTopicSubscribeToPartitions() } } }() - return consumer, nil -} - -// Name returns the name of consumer. -func (c *consumer) Name() string { - return c.consumerName + return func() { + ticker.Stop() + close(stopDiscoveryCh) + wg.Wait() + } } func (c *consumer) internalTopicSubscribeToPartitions() error { @@ -485,6 +499,8 @@ func (c *consumer) NackID(msgID MessageID) { func (c *consumer) Close() { c.closeOnce.Do(func() { + c.stopDiscovery() + c.Lock() defer c.Unlock() @@ -498,7 +514,6 @@ func (c *consumer) Close() { } wg.Wait() close(c.closeCh) - c.ticker.Stop() c.client.handlers.Del(c) c.dlq.close() c.rlq.close() From b7ead67145dee115bb1ae5c5a5c5440df0ef4278 Mon Sep 17 00:00:00 2001 From: PowerStateFailure <29687050+PowerStateFailure@users.noreply.github.com> Date: Sun, 28 Feb 2021 19:38:02 +0500 Subject: [PATCH 2/3] pulsar/producer_impl: rewrite partition discovery goroutine so that it actually gets closed before close active producers --- pulsar/producer_impl.go | 39 +++++++++++++++++++++++++-------------- 1 file changed, 25 insertions(+), 14 deletions(-) diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go index bf4d92ebfd..fd281903d8 100644 --- a/pulsar/producer_impl.go +++ b/pulsar/producer_impl.go @@ -51,8 +51,7 @@ type producer struct { producersPtr unsafe.Pointer numPartitions uint32 messageRouter func(*ProducerMessage, TopicMetadata) int - ticker *time.Ticker - tickerStop chan struct{} + stopDiscovery func() log log.Logger metrics *internal.TopicMetrics } @@ -125,24 +124,36 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) { return nil, err } - ticker := time.NewTicker(partitionsAutoDiscoveryInterval) - p.ticker = ticker - p.tickerStop = make(chan struct{}) + p.stopDiscovery = p.runBackgroundPartitionDiscovery(partitionsAutoDiscoveryInterval) + p.metrics.ProducersOpened.Inc() + return p, nil +} + +func (p *producer) runBackgroundPartitionDiscovery(period time.Duration) (cancel func()) { + var wg sync.WaitGroup + stopDiscoveryCh := make(chan struct{}) + ticker := time.NewTicker(period) + + wg.Add(1) go func() { + defer wg.Done() for { select { + case <-stopDiscoveryCh: + return case <-ticker.C: p.log.Debug("Auto discovering new partitions") p.internalCreatePartitionsProducers() - case <-p.tickerStop: - return } } }() - p.metrics.ProducersOpened.Inc() - return p, nil + return func() { + ticker.Stop() + close(stopDiscoveryCh) + wg.Wait() + } } func (p *producer) internalCreatePartitionsProducers() error { @@ -292,13 +303,13 @@ func (p *producer) Flush() error { } func (p *producer) Close() { + if p.stopDiscovery != nil { + p.stopDiscovery() + p.stopDiscovery = nil + } + p.Lock() defer p.Unlock() - if p.ticker != nil { - p.ticker.Stop() - close(p.tickerStop) - p.ticker = nil - } for _, pp := range p.producers { pp.Close() From 2039c91649d9b80b0420cf2ec8d4369884e1231b Mon Sep 17 00:00:00 2001 From: PowerStateFailure <29687050+PowerStateFailure@users.noreply.github.com> Date: Sun, 28 Feb 2021 19:43:58 +0500 Subject: [PATCH 3/3] pulsar/producer_impl: avoid consequtive Close calls side-effects by wrapping method body with sync.Once (similar to consumer.Close) --- pulsar/producer_impl.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go index fd281903d8..e8d43e04b1 100644 --- a/pulsar/producer_impl.go +++ b/pulsar/producer_impl.go @@ -51,6 +51,7 @@ type producer struct { producersPtr unsafe.Pointer numPartitions uint32 messageRouter func(*ProducerMessage, TopicMetadata) int + closeOnce sync.Once stopDiscovery func() log log.Logger metrics *internal.TopicMetrics @@ -303,18 +304,17 @@ func (p *producer) Flush() error { } func (p *producer) Close() { - if p.stopDiscovery != nil { + p.closeOnce.Do(func() { p.stopDiscovery() - p.stopDiscovery = nil - } - p.Lock() - defer p.Unlock() + p.Lock() + defer p.Unlock() - for _, pp := range p.producers { - pp.Close() - } - p.client.handlers.Del(p) - p.metrics.ProducersPartitions.Sub(float64(len(p.producers))) - p.metrics.ProducersClosed.Inc() + for _, pp := range p.producers { + pp.Close() + } + p.client.handlers.Del(p) + p.metrics.ProducersPartitions.Sub(float64(len(p.producers))) + p.metrics.ProducersClosed.Inc() + }) }