Skip to content

Commit

Permalink
[fix] Fix DLQ producer name conflicts when multiples consumers send m…
Browse files Browse the repository at this point in the history
…essages to DLQ (#1156)

### Motivation

To keep consistent with the Java client.
Releted PR: apache/pulsar#21890

### Modifications

Set DLQ producerName `fmt.Sprintf("%s-%s-%s-DLQ", r.topicName, r.subscriptionName, r.consumerName)`

(cherry picked from commit 4e13822)
  • Loading branch information
crossoverJie authored and RobertIndie committed Jan 15, 2024
1 parent 47785ed commit b0db28c
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 8 deletions.
2 changes: 1 addition & 1 deletion pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
}
}

dlq, err := newDlqRouter(client, options.DLQ, options.Topic, options.SubscriptionName, client.log)
dlq, err := newDlqRouter(client, options.DLQ, options.Topic, options.SubscriptionName, options.Name, client.log)
if err != nil {
return nil, err
}
Expand Down
6 changes: 4 additions & 2 deletions pulsar/consumer_regex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,10 @@ func runRegexConsumerDiscoverPatternAll(t *testing.T, c Client, namespace string
opts := ConsumerOptions{
SubscriptionName: "regex-sub",
AutoDiscoveryPeriod: 5 * time.Minute,
Name: "regex-consumer",
}

dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", log.DefaultNopLogger())
dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", "regex-consumer", log.DefaultNopLogger())
rlq, _ := newRetryRouter(c.(*client), nil, false, log.DefaultNopLogger())
consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq)
if err != nil {
Expand Down Expand Up @@ -190,9 +191,10 @@ func runRegexConsumerDiscoverPatternFoo(t *testing.T, c Client, namespace string
opts := ConsumerOptions{
SubscriptionName: "regex-sub",
AutoDiscoveryPeriod: 5 * time.Minute,
Name: "regex-consumer",
}

dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", log.DefaultNopLogger())
dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", "regex-consumer", log.DefaultNopLogger())
rlq, _ := newRetryRouter(c.(*client), nil, false, log.DefaultNopLogger())
consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq)
if err != nil {
Expand Down
6 changes: 4 additions & 2 deletions pulsar/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1449,13 +1449,15 @@ func DLQWithProducerOptions(t *testing.T, prodOpt *ProducerOptions) {
if prodOpt != nil {
dlqPolicy.ProducerOptions = *prodOpt
}
sub := "my-sub"
sub, consumerName := "my-sub", "my-consumer"

consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: sub,
NackRedeliveryDelay: 1 * time.Second,
Type: Shared,
DLQ: &dlqPolicy,
Name: consumerName,
})
assert.Nil(t, err)
defer consumer.Close()
Expand Down Expand Up @@ -1508,7 +1510,7 @@ func DLQWithProducerOptions(t *testing.T, prodOpt *ProducerOptions) {
assert.Equal(t, []byte(expectMsg), msg.Payload())

// check dql produceName
assert.Equal(t, msg.ProducerName(), fmt.Sprintf("%s-%s-DLQ", topic, sub))
assert.Equal(t, msg.ProducerName(), fmt.Sprintf("%s-%s-%s-DLQ", topic, sub, consumerName))

// check original messageId
assert.NotEmpty(t, msg.Properties()[PropertyOriginMessageID])
Expand Down
6 changes: 4 additions & 2 deletions pulsar/dlq_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,18 @@ type dlqRouter struct {
closeCh chan interface{}
topicName string
subscriptionName string
consumerName string
log log.Logger
}

func newDlqRouter(client Client, policy *DLQPolicy, topicName, subscriptionName string,
func newDlqRouter(client Client, policy *DLQPolicy, topicName, subscriptionName, consumerName string,
logger log.Logger) (*dlqRouter, error) {
r := &dlqRouter{
client: client,
policy: policy,
topicName: topicName,
subscriptionName: subscriptionName,
consumerName: consumerName,
log: logger,
}

Expand Down Expand Up @@ -159,7 +161,7 @@ func (r *dlqRouter) getProducer(schema Schema) Producer {
opt.Topic = r.policy.DeadLetterTopic
opt.Schema = schema
if opt.Name == "" {
opt.Name = fmt.Sprintf("%s-%s-DLQ", r.topicName, r.subscriptionName)
opt.Name = fmt.Sprintf("%s-%s-%s-DLQ", r.topicName, r.subscriptionName, r.consumerName)
}

// the origin code sets to LZ4 compression with no options
Expand Down
2 changes: 1 addition & 1 deletion pulsar/reader_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func newReader(client *client, options ReaderOptions) (Reader, error) {
}

// Provide dummy dlq router with not dlq policy
dlq, err := newDlqRouter(client, nil, options.Topic, options.SubscriptionName, client.log)
dlq, err := newDlqRouter(client, nil, options.Topic, options.SubscriptionName, options.Name, client.log)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit b0db28c

Please sign in to comment.