From c17c2f958c4fa57e35a7776d0a9ced5194a14f67 Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Sat, 26 Mar 2022 01:20:47 +0800 Subject: [PATCH 1/4] fix #664 --- pulsar/consumer_impl.go | 5 +++- pulsar/consumer_test.go | 56 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 1 deletion(-) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 2bd3ed50e4..c60d61496f 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -496,7 +496,10 @@ func (c *consumer) ReconsumeLater(msg Message, delay time.Duration) { msgID: msgID, }, } - if uint32(reconsumeTimes) > c.dlq.policy.MaxDeliveries { + if c.dlq.policy == nil { + c.log.Warn("Receive retry message but the DLQPolicy is nil, please check") + } + if c.dlq.policy != nil && uint32(reconsumeTimes) > c.dlq.policy.MaxDeliveries { c.dlq.Chan() <- consumerMsg } else { c.rlq.Chan() <- RetryMessage{ diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index cadd8e4b5e..5cc9fbff0c 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -3017,3 +3017,59 @@ func TestEncryptDecryptRedeliveryOnFailure(t *testing.T) { assert.NotNil(t, msg) consumer.Ack(msg) } + +// Test for issue #664 +func TestIssue664(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + + topic := "persistent://public/default/my-topic" + ctx := context.Background() + + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + DisableBatching: false, + }) + assert.Nil(t, err) + defer producer.Close() + if _, err = producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", 10)), + }); err != nil { + log.Fatal(err) + } + + // create consumer + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "my-sub", + Type: Exclusive, + //DLQ: &DLQPolicy{MaxDeliveries: 10, DeadLetterTopic: "dlq-topic"}, + //RetryEnable: true, + }) + assert.Nil(t, err) + defer consumer.Close() + + msg, err := consumer.Receive(context.Background()) + if err != nil { + log.Fatal(err) + } + expectMsg := fmt.Sprintf("hello-%d", 10) + assert.Equal(t, []byte(expectMsg), msg.Payload()) + consumer.ReconsumeLater(msg, time.Second) + //consumer.Ack(msg) + + //for { + // select { + // case cm := <-consumer.Chan(): + // fmt.Println(string(cm.Payload())) + // //cm.ReconsumeLater(cm.Message, time.Second) + // cm.Ack(cm.Message) + // } + //} + +} From fb078dba55a042240adb86c01df6622a45794c39 Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Wed, 13 Apr 2022 13:30:46 +0800 Subject: [PATCH 2/4] remove comments --- pulsar/consumer_test.go | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 5cc9fbff0c..9887123aa2 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -3048,8 +3048,6 @@ func TestIssue664(t *testing.T) { Topic: topic, SubscriptionName: "my-sub", Type: Exclusive, - //DLQ: &DLQPolicy{MaxDeliveries: 10, DeadLetterTopic: "dlq-topic"}, - //RetryEnable: true, }) assert.Nil(t, err) defer consumer.Close() @@ -3061,15 +3059,5 @@ func TestIssue664(t *testing.T) { expectMsg := fmt.Sprintf("hello-%d", 10) assert.Equal(t, []byte(expectMsg), msg.Payload()) consumer.ReconsumeLater(msg, time.Second) - //consumer.Ack(msg) - - //for { - // select { - // case cm := <-consumer.Chan(): - // fmt.Println(string(cm.Payload())) - // //cm.ReconsumeLater(cm.Message, time.Second) - // cm.Ack(cm.Message) - // } - //} } From a5e75f0387095123944b17730d533959fcb6de08 Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Wed, 6 Mar 2024 09:30:42 +0800 Subject: [PATCH 3/4] merge from master --- pulsar/consumer_test.go | 82 ++++++++++++++++++++--------------------- 1 file changed, 41 insertions(+), 41 deletions(-) diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index f046c9db29..32325f9ea4 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -4395,44 +4395,44 @@ func TestMultiConsumerMemoryLimit(t *testing.T) { // Test for issue #664 func TestIssue664(t *testing.T) { - client, err := NewClient(ClientOptions{ - URL: lookupURL, - }) - - assert.Nil(t, err) - defer client.Close() - - topic := "persistent://public/default/my-topic" - ctx := context.Background() - - // create producer - producer, err := client.CreateProducer(ProducerOptions{ - Topic: topic, - DisableBatching: false, - }) - assert.Nil(t, err) - defer producer.Close() - if _, err = producer.Send(ctx, &ProducerMessage{ - Payload: []byte(fmt.Sprintf("hello-%d", 10)), - }); err != nil { - log.Fatal(err) - } - - // create consumer - consumer, err := client.Subscribe(ConsumerOptions{ - Topic: topic, - SubscriptionName: "my-sub", - Type: Exclusive, - }) - assert.Nil(t, err) - defer consumer.Close() - - msg, err := consumer.Receive(context.Background()) - if err != nil { - log.Fatal(err) - } - expectMsg := fmt.Sprintf("hello-%d", 10) - assert.Equal(t, []byte(expectMsg), msg.Payload()) - consumer.ReconsumeLater(msg, time.Second) - -} \ No newline at end of file + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + + topic := "persistent://public/default/my-topic" + ctx := context.Background() + + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + DisableBatching: false, + }) + assert.Nil(t, err) + defer producer.Close() + if _, err = producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", 10)), + }); err != nil { + log.Fatal(err) + } + + // create consumer + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "my-sub", + Type: Exclusive, + }) + assert.Nil(t, err) + defer consumer.Close() + + msg, err := consumer.Receive(context.Background()) + if err != nil { + log.Fatal(err) + } + expectMsg := fmt.Sprintf("hello-%d", 10) + assert.Equal(t, []byte(expectMsg), msg.Payload()) + consumer.ReconsumeLater(msg, time.Second) + +} From a6e44c75eaf44d346b78897e47483b222264ebcb Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Wed, 6 Mar 2024 11:03:27 +0800 Subject: [PATCH 4/4] return directly when policy is nil --- pulsar/consumer_impl.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index c7ab9f39f6..66c59397d6 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -584,6 +584,7 @@ func (c *consumer) ReconsumeLaterWithCustomProperties(msg Message, customPropert } if c.dlq.policy == nil { c.log.Warn("Receive retry message but the DLQPolicy is nil, please check") + return } if c.dlq.policy != nil && uint32(reconsumeTimes) > c.dlq.policy.MaxDeliveries { c.dlq.Chan() <- consumerMsg