diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index b4903516e..f6835a2d1 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -639,7 +639,11 @@ func (c *consumer) ReconsumeLaterWithCustomProperties(msg Message, customPropert msgID: msgID, }, } - if uint32(reconsumeTimes) > c.dlq.policy.MaxDeliveries { + if c.dlq.policy == nil { + c.log.Warn("Receive retry message but the DLQPolicy is nil, please check") + return + } + if c.dlq.policy != nil && uint32(reconsumeTimes) > c.dlq.policy.MaxDeliveries { c.dlq.Chan() <- consumerMsg } else { c.rlq.Chan() <- RetryMessage{ diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 83521dd4e..ef085b295 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -4490,6 +4490,50 @@ func TestMultiConsumerMemoryLimit(t *testing.T) { }) } +// Test for issue #664 +func TestIssue664(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + + topic := "persistent://public/default/my-topic" + ctx := context.Background() + + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + DisableBatching: false, + }) + assert.Nil(t, err) + defer producer.Close() + if _, err = producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", 10)), + }); err != nil { + log.Fatal(err) + } + + // create consumer + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "my-sub", + Type: Exclusive, + }) + assert.Nil(t, err) + defer consumer.Close() + + msg, err := consumer.Receive(context.Background()) + if err != nil { + log.Fatal(err) + } + expectMsg := fmt.Sprintf("hello-%d", 10) + assert.Equal(t, []byte(expectMsg), msg.Payload()) + consumer.ReconsumeLater(msg, time.Second) + +} + func TestConsumerAckCumulativeOnSharedSubShouldFailed(t *testing.T) { client, err := NewClient(ClientOptions{ URL: lookupURL,