Skip to content

Commit

Permalink
Fix reconsume broken while using non-FQDN topics (#386)
Browse files Browse the repository at this point in the history
### Issue
Retry policy not effective with non-FQDN topic.

- reproduction
	```go
	client, _ := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
	consumer, _ := client.Subscribe(pulsar.ConsumerOptions{
		Topic:            "topic-01",
		SubscriptionName: "my-sub",
		RetryEnable:      true,
		DLQ:              &pulsar.DLQPolicy{MaxDeliveries: 2},
	})
	msg, _ := consumer.Receive(context.Background())
	consumer.ReconsumeLater(msg, 5*time.Second)
	```
- logs

	```
	RN[0000] consumer of topic [persistent://public/default/topic-01] not exist unexpectedly  topic="[topic-01 persistent://public/default/my-sub-RETRY]"
	```

### Cause
For MultiTopicConsumer `consumers` map filed:
- key: user provided topic, maybe non-FQDN.
- value: consumer instance.

`ReconsumeLater` using msg's FQDN topic as key to find `consumer` in `consumers`,
 if mismatch with non-FQDN topic, this invoke will be ignored, lead to Retry policy not effective.

### Modifications
- Normalize user provided topics as FQDN topics before initializing consumers.
- Add non-FQDN topic consumption case in Retry policy tests.


### Verifying this change

- [x] Make sure that the change passes the CI checks.
  • Loading branch information
wuYin authored Nov 5, 2020
1 parent 793ea91 commit 8465c55
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 13 deletions.
11 changes: 8 additions & 3 deletions pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,24 +160,29 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
return nil, err
}

// normalize as FQDN topics
var tns []*internal.TopicName
// single topic consumer
if options.Topic != "" || len(options.Topics) == 1 {
topic := options.Topic
if topic == "" {
topic = options.Topics[0]
}

if err := validateTopicNames(topic); err != nil {
if tns, err = validateTopicNames(topic); err != nil {
return nil, err
}

topic = tns[0].Name
return topicSubscribe(client, options, topic, messageCh, dlq, rlq)
}

if len(options.Topics) > 1 {
if err := validateTopicNames(options.Topics...); err != nil {
if tns, err = validateTopicNames(options.Topics...); err != nil {
return nil, err
}
for i := range options.Topics {
options.Topics[i] = tns[i].Name
}

return newMultiTopicConsumer(client, options, options.Topics, messageCh, dlq, rlq)
}
Expand Down
6 changes: 3 additions & 3 deletions pulsar/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1147,7 +1147,7 @@ func TestDLQMultiTopics(t *testing.T) {
}

func TestRLQ(t *testing.T) {
topic := "persistent://public/default/" + newTopicName()
topic := newTopicName()
subName := fmt.Sprintf("sub01-%d", time.Now().Unix())
maxRedeliveries := 2
N := 100
Expand Down Expand Up @@ -1243,7 +1243,7 @@ func TestRLQ(t *testing.T) {
func TestRLQMultiTopics(t *testing.T) {
now := time.Now().Unix()
topic01 := fmt.Sprintf("persistent://public/default/topic-%d-1", now)
topic02 := fmt.Sprintf("persistent://public/default/topic-%d-2", now)
topic02 := fmt.Sprintf("topic-%d-2", now)
topics := []string{topic01, topic02}

subName := fmt.Sprintf("sub01-%d", time.Now().Unix())
Expand All @@ -1270,7 +1270,7 @@ func TestRLQMultiTopics(t *testing.T) {

// subscribe DLQ Topic
dlqConsumer, err := client.Subscribe(ConsumerOptions{
Topic: "persistent://public/default/" + subName + "-DLQ",
Topic: subName + "-DLQ",
SubscriptionName: subName,
SubscriptionInitialPosition: SubscriptionPositionEarliest,
})
Expand Down
15 changes: 8 additions & 7 deletions pulsar/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,16 @@ func (e *unexpectedErrMsg) Error() string {
return msg
}

func validateTopicNames(topics ...string) error {
var errs error
for _, t := range topics {
if _, err := internal.ParseTopicName(t); err != nil {
errs = pkgerrors.Wrapf(err, "invalid topic name: %s", t)
func validateTopicNames(topics ...string) ([]*internal.TopicName, error) {
tns := make([]*internal.TopicName, len(topics))
for i, t := range topics {
tn, err := internal.ParseTopicName(t)
if err != nil {
return nil, pkgerrors.Wrapf(err, "invalid topic name: %s", t)
}
tns[i] = tn
}

return errs
return tns, nil
}

func toKeyValues(metadata map[string]string) []*pb.KeyValue {
Expand Down

0 comments on commit 8465c55

Please sign in to comment.