Skip to content

Commit

Permalink
feat: add reconsumeLater to regexConsumer
Browse files Browse the repository at this point in the history
  • Loading branch information
pacop committed Jan 23, 2025
1 parent 95232de commit ea15d3e
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 4 deletions.
35 changes: 31 additions & 4 deletions pulsar/consumer_regex.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,12 +186,34 @@ func (c *regexConsumer) Ack(msg Message) error {
return c.AckID(msg.ID())
}

func (c *regexConsumer) ReconsumeLater(_ Message, _ time.Duration) {
c.log.Warnf("regexp consumer not support ReconsumeLater yet.")
func (c *regexConsumer) ReconsumeLater(msg Message, delay time.Duration) {
c.ReconsumeLaterWithCustomProperties(msg, map[string]string{}, delay)
}

func (c *regexConsumer) ReconsumeLaterWithCustomProperties(_ Message, _ map[string]string, _ time.Duration) {
c.log.Warnf("regexp consumer not support ReconsumeLaterWithCustomProperties yet.")
func (c *regexConsumer) ReconsumeLaterWithCustomProperties(msg Message, customProperties map[string]string,
delay time.Duration) {
names, err := validateTopicNames(msg.Topic())
if err != nil {
c.log.Errorf("validate msg topic %q failed: %v", msg.Topic(), err)
return
}
if len(names) != 1 {
c.log.Errorf("invalid msg topic %q names: %+v ", msg.Topic(), names)
return
}

tn := names[0]
fqdnTopic := internal.TopicNameWithoutPartitionPart(tn)
consumer, ok := c.consumers[fqdnTopic]
if !ok {
// check to see if the topic with the partition part is in the consumers
// this can happen when the consumer is configured to consume from a specific partition
if consumer, ok = c.consumers[tn.Name]; !ok {
c.log.Warnf("consumer of topic %s not exist unexpectedly", msg.Topic())
return
}
}
consumer.ReconsumeLaterWithCustomProperties(msg, customProperties, delay)
}

// AckID the consumption of a single message, identified by its MessageID
Expand Down Expand Up @@ -454,6 +476,11 @@ func (c *regexConsumer) topics() ([]string, error) {
}

filtered := filterTopics(topics, c.pattern)

if c.options.RetryEnable {
filtered = append(filtered, c.options.DLQ.RetryLetterTopic)
}

return filtered, nil
}

Expand Down
115 changes: 115 additions & 0 deletions pulsar/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2121,6 +2121,121 @@ func TestRLQMultiTopics(t *testing.T) {
assert.Nil(t, checkMsg)
}

func TestRLQRegex(t *testing.T) {
now := time.Now().Unix()
topic01 := fmt.Sprintf("persistent://public/default/topic-%d-1", now)
topic02 := fmt.Sprintf("topic-%d-2", now)
topicPattern := fmt.Sprintf("topic-%d-.", now)

subName := fmt.Sprintf("sub01-%d", time.Now().Unix())
maxRedeliveries := 2
N := 100
ctx := context.Background()

client, err := NewClient(ClientOptions{URL: lookupURL})
assert.Nil(t, err)
defer client.Close()

rlqTopic := fmt.Sprintf("persistent://public/default/rlq-topic-%d-1", now)
dlqTopic := fmt.Sprintf("persistent://public/default/dlq-topic-%d-1", now)

// subscribe regex topics with Retry Topics
rlqConsumer, err := client.Subscribe(ConsumerOptions{
TopicsPattern: topicPattern,
SubscriptionName: subName,
Type: Shared,
SubscriptionInitialPosition: SubscriptionPositionEarliest,
DLQ: &DLQPolicy{
MaxDeliveries: uint32(maxRedeliveries),
RetryLetterTopic: rlqTopic,
DeadLetterTopic: dlqTopic,
},
RetryEnable: true,
NackRedeliveryDelay: 1 * time.Second,
})
assert.Nil(t, err)
defer rlqConsumer.Close()

// subscribe DLQ Topic
dlqConsumer, err := client.Subscribe(ConsumerOptions{
Topic: dlqTopic,
SubscriptionName: subName,
SubscriptionInitialPosition: SubscriptionPositionEarliest,
AutoDiscoveryPeriod: 10 * time.Second,
})
assert.Nil(t, err)
defer dlqConsumer.Close()

// create multi producers
producer01, err := client.CreateProducer(ProducerOptions{Topic: topic01})
assert.Nil(t, err)
defer producer01.Close()

producer02, err := client.CreateProducer(ProducerOptions{Topic: topic02})
assert.Nil(t, err)
defer producer02.Close()

// 1. Pre-produce N messages for every topic
for i := 0; i < N; i++ {
_, err = producer01.Send(ctx, &ProducerMessage{Payload: []byte(fmt.Sprintf("MSG_01_%d", i))})
assert.Nil(t, err)
_, err = producer02.Send(ctx, &ProducerMessage{Payload: []byte(fmt.Sprintf("MSG_02_%d", i))})
assert.Nil(t, err)
}

// 2. Create consumer on the Retry Topics to reconsume 2*N messages (maxRedeliveries+1) times
rlqReceived := 0
for rlqReceived < 2*N*(maxRedeliveries+1) {
msg, err := rlqConsumer.Receive(ctx)
assert.Nil(t, err)
rlqConsumer.ReconsumeLater(msg, 1*time.Second)
rlqReceived++
}
assert.Equal(t, rlqReceived, 2*N*(maxRedeliveries+1))
fmt.Println("retry consumed:", rlqReceived, 2*N*(maxRedeliveries+1)) // 600

// No more messages on the Retry Topic
rlqCtx, rlqCancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer rlqCancel()
msg, err := rlqConsumer.Receive(rlqCtx)
assert.Error(t, err)
assert.Nil(t, msg)

// 3. Create consumer on the DLQ topic to verify the routing
dlqReceived := 0
for dlqReceived < 2*N {
msg, err := dlqConsumer.Receive(ctx)
assert.Nil(t, err)
dlqConsumer.Ack(msg)
dlqReceived++
}
fmt.Println("dlq received:", dlqReceived) // 200
assert.Equal(t, dlqReceived, 2*N)

// No more messages on the DLQ Topic
dlqCtx, dlqCancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer dlqCancel()
msg, err = dlqConsumer.Receive(dlqCtx)
assert.Error(t, err)
assert.Nil(t, msg)

// 4. No more messages for same subscription
checkConsumer, err := client.Subscribe(ConsumerOptions{
TopicsPattern: topicPattern,
SubscriptionName: subName,
Type: Shared,
SubscriptionInitialPosition: SubscriptionPositionEarliest,
})
assert.Nil(t, err)
defer checkConsumer.Close()

timeoutCtx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
checkMsg, err := checkConsumer.Receive(timeoutCtx)
assert.Error(t, err)
assert.Nil(t, checkMsg)
}

func TestRLQSpecifiedPartitionTopic(t *testing.T) {
topic := newTopicName()
testURL := adminURL + "/" + "admin/v2/persistent/public/default/" + topic + "/partitions"
Expand Down

0 comments on commit ea15d3e

Please sign in to comment.