Fix panic when scale down partitions (#601)
Signed-off-by: xiaolongran <xiaolongran@tencent.com>



### Motivation

While the client is running, if a user force-deletes some of a topic's sub-partitions, old_partitions becomes greater than new_partitions, which is effectively a scale-down of the topic's partitions. The current code only handles the scale-up scenario, so after such a forced deletion the client logs the following message and then panics:

```
level=info msg="[Changed number of partitions in topic]" new_partitions=1 old_partitions=20 topic="persistent://pulsar-xxxxxxx/xxxx/gxxxxxxxx"
```

```
panic: runtime error: index out of range [1] with length 1

goroutine 166288 [running]:
github.com/apache/pulsar-client-go/pulsar.(*producer).internalCreatePartitionsProducers(0xc0070aa6e0, 0x0, 0x0)
        github.com/apache/pulsar-client-go/pulsar/producer_impl.go:194 +0x785
github.com/apache/pulsar-client-go/pulsar.(*producer).runBackgroundPartitionDiscovery.func1(0xc004167cd0, 0xc00559f5c0, 0xc006af6dc0, 0xc0070aa6e0)
       github.com/apache/pulsar-client-go/pulsar/producer_impl.go:152 +0xce
created by github.com/apache/pulsar-client-go/pulsar.(*producer).runBackgroundPartitionDiscovery
       github.com/apache/pulsar-client-go/pulsar/producer_impl.go:144 +0xcd
```
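
The panic comes from the copy loop that rebuilds the partitioned producer/consumer slice after a partition-count change: the new slice is sized to `newNumPartitions`, but the loop still iterates up to the old partition count. A minimal standalone sketch of that failure mode (the names and values are illustrative, not the library's actual code):

```go
package main

func main() {
	oldProducers := make([]string, 20) // partitions known before the change
	newNumPartitions := 1              // partitions reported after the forced deletion

	// Pre-fix logic: size the slice to the new count, then copy over the old
	// entries assuming the partition count can only grow.
	producers := make([]string, newNumPartitions)
	for i := 0; i < len(oldProducers); i++ {
		producers[i] = oldProducers[i] // panics: index out of range [1] with length 1
	}
}
```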

### Modifications

Add handling logic for the scale-down-partitions case.
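
The essence of the change, applied to both the consumer and the producer paths, is to recompute which partition indexes need new instances instead of assuming the partition count only grows. A rough sketch of that calculation (the helper name is illustrative; in the actual diff below the same logic is inlined):

```go
// partitionRange reports which partition indexes need new producers or
// consumers. On scale-down, every remaining partition is rebuilt from scratch.
func partitionRange(oldNumPartitions, newNumPartitions int) (startPartition, partitionsToAdd int) {
	startPartition = oldNumPartitions
	partitionsToAdd = newNumPartitions - oldNumPartitions
	if partitionsToAdd < 0 {
		// Scale-down (e.g. sub-partitions were force-deleted):
		// rebuild all newNumPartitions starting from index 0.
		partitionsToAdd = newNumPartitions
		startPartition = 0
	}
	return startPartition, partitionsToAdd
}
```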
wolfstudy authored Aug 26, 2021 · 1 parent 1a3ad70 · commit b684151
Showing 2 changed files with 33 additions and 24 deletions.
#### pulsar/consumer_impl.go (18 additions, 12 deletions)

```diff
@@ -28,7 +28,6 @@ import (
 	"github.com/apache/pulsar-client-go/pulsar/internal"
 	pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
 	"github.com/apache/pulsar-client-go/pulsar/log"
-	"github.com/pkg/errors"
 )
 
 const defaultNackRedeliveryDelay = 1 * time.Minute
@@ -258,30 +257,26 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
 	c.Lock()
 	defer c.Unlock()
 
 	oldConsumers := c.consumers
+	oldNumPartitions = len(oldConsumers)
 
 	if oldConsumers != nil {
-		oldNumPartitions = len(oldConsumers)
 		if oldNumPartitions == newNumPartitions {
 			c.log.Debug("Number of partitions in topic has not changed")
 			return nil
 		}
 
-		if oldNumPartitions > newNumPartitions {
-			c.log.WithField("old_partitions", oldNumPartitions).
-				WithField("new_partitions", newNumPartitions).
-				Error("Does not support scaling down operations on topic partitions")
-			return errors.New("Does not support scaling down operations on topic partitions")
-		}
-
 		c.log.WithField("old_partitions", oldNumPartitions).
 			WithField("new_partitions", newNumPartitions).
 			Info("Changed number of partitions in topic")
+
 	}
 
 	c.consumers = make([]*partitionConsumer, newNumPartitions)
 
-	if oldConsumers != nil {
+	// When for some reason (eg: forced deletion of sub partition) causes oldNumPartitions> newNumPartitions,
+	// we need to rebuild the cache of new consumers, otherwise the array will be out of bounds.
+	if oldConsumers != nil && oldNumPartitions < newNumPartitions {
 		// Copy over the existing consumer instances
 		for i := 0; i < oldNumPartitions; i++ {
 			c.consumers[i] = oldConsumers[i]
@@ -297,12 +292,19 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
 	receiverQueueSize := c.options.ReceiverQueueSize
 	metadata := c.options.Properties
 
+	startPartition := oldNumPartitions
 	partitionsToAdd := newNumPartitions - oldNumPartitions
+
+	if partitionsToAdd < 0 {
+		partitionsToAdd = newNumPartitions
+		startPartition = 0
+	}
+
 	var wg sync.WaitGroup
 	ch := make(chan ConsumerError, partitionsToAdd)
 	wg.Add(partitionsToAdd)
 
-	for partitionIdx := oldNumPartitions; partitionIdx < newNumPartitions; partitionIdx++ {
+	for partitionIdx := startPartition; partitionIdx < newNumPartitions; partitionIdx++ {
 		partitionTopic := partitions[partitionIdx]
 
 		go func(idx int, pt string) {
@@ -366,7 +368,11 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
 		return err
 	}
 
-	c.metrics.ConsumersPartitions.Add(float64(partitionsToAdd))
+	if newNumPartitions < oldNumPartitions {
+		c.metrics.ConsumersPartitions.Set(float64(newNumPartitions))
+	} else {
+		c.metrics.ConsumersPartitions.Add(float64(partitionsToAdd))
+	}
 	return nil
 }
```

#### pulsar/producer_impl.go (15 additions, 12 deletions)

```diff
@@ -26,7 +26,6 @@ import (
 
 	"github.com/apache/pulsar-client-go/pulsar/internal"
 	"github.com/apache/pulsar-client-go/pulsar/log"
-	"github.com/pkg/errors"
 )
 
 const (
@@ -175,21 +174,14 @@ func (p *producer) internalCreatePartitionsProducers() error {
 	defer p.Unlock()
 
 	oldProducers := p.producers
+	oldNumPartitions = len(oldProducers)
 
 	if oldProducers != nil {
-		oldNumPartitions = len(oldProducers)
 		if oldNumPartitions == newNumPartitions {
 			p.log.Debug("Number of partitions in topic has not changed")
 			return nil
 		}
 
-		if oldNumPartitions > newNumPartitions {
-			p.log.WithField("old_partitions", oldNumPartitions).
-				WithField("new_partitions", newNumPartitions).
-				Error("Does not support scaling down operations on topic partitions")
-			return errors.New("Does not support scaling down operations on topic partitions")
-		}
-
 		p.log.WithField("old_partitions", oldNumPartitions).
 			WithField("new_partitions", newNumPartitions).
 			Info("Changed number of partitions in topic")
@@ -198,7 +190,9 @@ func (p *producer) internalCreatePartitionsProducers() error {
 
 	p.producers = make([]Producer, newNumPartitions)
 
-	if oldProducers != nil {
+	// When for some reason (eg: forced deletion of sub partition) causes oldNumPartitions> newNumPartitions,
+	// we need to rebuild the cache of new producers, otherwise the array will be out of bounds.
+	if oldProducers != nil && oldNumPartitions < newNumPartitions {
 		// Copy over the existing consumer instances
 		for i := 0; i < oldNumPartitions; i++ {
 			p.producers[i] = oldProducers[i]
@@ -211,10 +205,15 @@ func (p *producer) internalCreatePartitionsProducers() error {
 		err error
 	}
 
+	startPartition := oldNumPartitions
 	partitionsToAdd := newNumPartitions - oldNumPartitions
+	if partitionsToAdd < 0 {
+		partitionsToAdd = newNumPartitions
+		startPartition = 0
+	}
 	c := make(chan ProducerError, partitionsToAdd)
 
-	for partitionIdx := oldNumPartitions; partitionIdx < newNumPartitions; partitionIdx++ {
+	for partitionIdx := startPartition; partitionIdx < newNumPartitions; partitionIdx++ {
 		partition := partitions[partitionIdx]
 
 		go func(partitionIdx int, partition string) {
@@ -248,7 +247,11 @@ func (p *producer) internalCreatePartitionsProducers() error {
 		return err
 	}
 
-	p.metrics.ProducersPartitions.Add(float64(partitionsToAdd))
+	if newNumPartitions < oldNumPartitions {
+		p.metrics.ProducersPartitions.Set(float64(newNumPartitions))
+	} else {
+		p.metrics.ProducersPartitions.Add(float64(partitionsToAdd))
+	}
 	atomic.StorePointer(&p.producersPtr, unsafe.Pointer(&p.producers))
 	atomic.StoreUint32(&p.numPartitions, uint32(len(p.producers)))
 	return nil
```
