Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support retry letter topic #359

Merged
merged 11 commits into from
Sep 9, 2020
12 changes: 11 additions & 1 deletion pulsar/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ type DLQPolicy struct {
MaxDeliveries uint32

// Name of the topic where the failing messages will be sent.
Topic string
DeadLetterTopic string

// Name of the topic where the retry messages will be sent.
RetryLetterTopic string
}

// ConsumerOptions is used to configure and create instances of Consumer
Expand Down Expand Up @@ -107,6 +110,10 @@ type ConsumerOptions struct {
// By default is nil and there's no DLQ
DLQ *DLQPolicy

// Auto retry send messages to default filled DLQPolicy topics
// Default is false
RetryEnable bool

// Sets a `MessageChannel` for the consumer
// When a message is received, it will be pushed to the channel for consumption
MessageChannel chan ConsumerMessage
Expand Down Expand Up @@ -163,6 +170,9 @@ type Consumer interface {
// AckID the consumption of a single message, identified by its MessageID
AckID(MessageID)

// ReconsumeLater mark a message for redelivery after custom delay
ReconsumeLater(msg Message, delay time.Duration)

// Acknowledge the failure to process a single message.
//
// When a message is "negatively acked" it will be marked for redelivery after
Expand Down
104 changes: 98 additions & 6 deletions pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"math/rand"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -73,6 +74,7 @@ type consumer struct {
messageCh chan ConsumerMessage

dlq *dlqRouter
rlq *retryRouter
closeOnce sync.Once
closeCh chan struct{}
errorCh chan error
Expand Down Expand Up @@ -108,10 +110,50 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
messageCh = make(chan ConsumerMessage, 10)
}

if options.RetryEnable {
usingTopic := ""
if options.Topic != "" {
usingTopic = options.Topic
} else if len(options.Topics) > 0 {
usingTopic = options.Topics[0]
}
tn, err := internal.ParseTopicName(usingTopic)
if err != nil {
return nil, err
}

retryTopic := tn.Domain + "://" + tn.Namespace + "/" + options.SubscriptionName + RetryTopicSuffix
dlqTopic := tn.Domain + "://" + tn.Namespace + "/" + options.SubscriptionName + DlqTopicSuffix
if options.DLQ == nil {
options.DLQ = &DLQPolicy{
MaxDeliveries: MaxReconsumeTimes,
DeadLetterTopic: dlqTopic,
RetryLetterTopic: retryTopic,
}
} else {
if options.DLQ.DeadLetterTopic == "" {
options.DLQ.DeadLetterTopic = dlqTopic
}
if options.DLQ.RetryLetterTopic == "" {
options.DLQ.RetryLetterTopic = retryTopic
}
}
if options.Topic != "" && len(options.Topics) == 0 {
options.Topics = []string{options.Topic, options.DLQ.RetryLetterTopic}
options.Topic = ""
} else if options.Topic == "" && len(options.Topics) > 0 {
options.Topics = append(options.Topics, options.DLQ.RetryLetterTopic)
}
}

dlq, err := newDlqRouter(client, options.DLQ)
if err != nil {
return nil, err
}
rlq, err := newRetryRouter(client, options.DLQ, options.RetryEnable)
if err != nil {
return nil, err
}

// single topic consumer
if options.Topic != "" || len(options.Topics) == 1 {
Expand All @@ -124,15 +166,15 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
return nil, err
}

return topicSubscribe(client, options, topic, messageCh, dlq)
return topicSubscribe(client, options, topic, messageCh, dlq, rlq)
}

if len(options.Topics) > 1 {
if err := validateTopicNames(options.Topics...); err != nil {
return nil, err
}

return newMultiTopicConsumer(client, options, options.Topics, messageCh, dlq)
return newMultiTopicConsumer(client, options, options.Topics, messageCh, dlq, rlq)
}

if options.TopicsPattern != "" {
Expand All @@ -145,14 +187,14 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
if err != nil {
return nil, err
}
return newRegexConsumer(client, options, tn, pattern, messageCh, dlq)
return newRegexConsumer(client, options, tn, pattern, messageCh, dlq, rlq)
}

return nil, newError(ResultInvalidTopicName, "topic name is required for consumer")
}

func newInternalConsumer(client *client, options ConsumerOptions, topic string,
messageCh chan ConsumerMessage, dlq *dlqRouter, disableForceTopicCreation bool) (*consumer, error) {
messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter, disableForceTopicCreation bool) (*consumer, error) {

consumer := &consumer{
topic: topic,
Expand All @@ -163,6 +205,7 @@ func newInternalConsumer(client *client, options ConsumerOptions, topic string,
closeCh: make(chan struct{}),
errorCh: make(chan error),
dlq: dlq,
rlq: rlq,
log: log.WithField("topic", topic),
consumerName: options.Name,
}
Expand Down Expand Up @@ -306,8 +349,8 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
}

func topicSubscribe(client *client, options ConsumerOptions, topic string,
messageCh chan ConsumerMessage, dlqRouter *dlqRouter) (Consumer, error) {
c, err := newInternalConsumer(client, options, topic, messageCh, dlqRouter, false)
messageCh chan ConsumerMessage, dlqRouter *dlqRouter, retryRouter *retryRouter) (Consumer, error) {
c, err := newInternalConsumer(client, options, topic, messageCh, dlqRouter, retryRouter, false)
if err == nil {
consumersOpened.Inc()
}
Expand Down Expand Up @@ -375,6 +418,54 @@ func (c *consumer) AckID(msgID MessageID) {
c.consumers[mid.partitionIdx].AckID(mid)
}

// ReconsumeLater mark a message for redelivery after custom delay
func (c *consumer) ReconsumeLater(msg Message, delay time.Duration) {
if delay < 0 {
delay = 0
}
msgID, ok := c.messageID(msg.ID())
if !ok {
return
}
props := make(map[string]string)
for k, v := range msg.Properties() {
props[k] = v
}

reconsumeTimes := 1
if s, ok := props[SysPropertyReconsumeTimes]; ok {
reconsumeTimes, _ = strconv.Atoi(s)
reconsumeTimes++
} else {
props[SysPropertyRealTopic] = msg.Topic()
props[SysPropertyOriginMessageID] = msgID.messageID.String()
}
props[SysPropertyReconsumeTimes] = strconv.Itoa(reconsumeTimes)
props[SysPropertyDelayTime] = fmt.Sprintf("%d", int64(delay)/1e6)

consumerMsg := ConsumerMessage{
Consumer: c,
Message: &message{
payLoad: msg.Payload(),
properties: props,
msgID: msgID,
},
}
if uint32(reconsumeTimes) > c.dlq.policy.MaxDeliveries {
c.dlq.Chan() <- consumerMsg
} else {
c.rlq.Chan() <- RetryMessage{
consumerMsg: consumerMsg,
producerMsg: ProducerMessage{
Payload: msg.Payload(),
Key: msg.Key(),
Properties: props,
DeliverAfter: delay,
},
}
}
}

func (c *consumer) Nack(msg Message) {
c.NackID(msg.ID())
}
Expand Down Expand Up @@ -411,6 +502,7 @@ func (c *consumer) Close() {
c.ticker.Stop()
c.client.handlers.Del(c)
c.dlq.close()
c.rlq.close()
consumersClosed.Inc()
consumersPartitions.Sub(float64(len(c.consumers)))
})
Expand Down
16 changes: 14 additions & 2 deletions pulsar/consumer_multitopic.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,26 +38,28 @@ type multiTopicConsumer struct {
consumers map[string]Consumer

dlq *dlqRouter
rlq *retryRouter
closeOnce sync.Once
closeCh chan struct{}

log *log.Entry
}

func newMultiTopicConsumer(client *client, options ConsumerOptions, topics []string,
messageCh chan ConsumerMessage, dlq *dlqRouter) (Consumer, error) {
messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter) (Consumer, error) {
mtc := &multiTopicConsumer{
options: options,
messageCh: messageCh,
consumers: make(map[string]Consumer, len(topics)),
closeCh: make(chan struct{}),
dlq: dlq,
rlq: rlq,
log: &log.Entry{},
consumerName: options.Name,
}

var errs error
for ce := range subscriber(client, topics, options, messageCh, dlq) {
for ce := range subscriber(client, topics, options, messageCh, dlq, rlq) {
if ce.err != nil {
errs = pkgerrors.Wrapf(ce.err, "unable to subscribe to topic=%s", ce.topic)
} else {
Expand Down Expand Up @@ -134,6 +136,15 @@ func (c *multiTopicConsumer) AckID(msgID MessageID) {
mid.Ack()
}

func (c *multiTopicConsumer) ReconsumeLater(msg Message, delay time.Duration) {
consumer, ok := c.consumers[msg.Topic()]
if !ok {
c.log.Warnf("consumer of topic %s not exist unexpectedly", msg.Topic())
return
}
consumer.ReconsumeLater(msg, delay)
}

func (c *multiTopicConsumer) Nack(msg Message) {
c.NackID(msg.ID())
}
Expand Down Expand Up @@ -166,6 +177,7 @@ func (c *multiTopicConsumer) Close() {
wg.Wait()
close(c.closeCh)
c.dlq.close()
c.rlq.close()
consumersClosed.Inc()
})
}
Expand Down
21 changes: 14 additions & 7 deletions pulsar/consumer_regex.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const (
type regexConsumer struct {
client *client
dlq *dlqRouter
rlq *retryRouter

options ConsumerOptions

Expand All @@ -64,10 +65,11 @@ type regexConsumer struct {
}

func newRegexConsumer(c *client, opts ConsumerOptions, tn *internal.TopicName, pattern *regexp.Regexp,
msgCh chan ConsumerMessage, dlq *dlqRouter) (Consumer, error) {
msgCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter) (Consumer, error) {
rc := &regexConsumer{
client: c,
dlq: dlq,
rlq: rlq,
options: opts,
messageCh: msgCh,

Expand All @@ -90,7 +92,7 @@ func newRegexConsumer(c *client, opts ConsumerOptions, tn *internal.TopicName, p
}

var errs error
for ce := range subscriber(c, topics, opts, msgCh, dlq) {
for ce := range subscriber(c, topics, opts, msgCh, dlq, rlq) {
if ce.err != nil {
errs = pkgerrors.Wrapf(ce.err, "unable to subscribe to topic=%s", ce.topic)
} else {
Expand Down Expand Up @@ -163,6 +165,10 @@ func (c *regexConsumer) Ack(msg Message) {
c.AckID(msg.ID())
}

func (c *regexConsumer) ReconsumeLater(msg Message, delay time.Duration) {
c.log.Warnf("regexp consumer not support ReconsumeLater yet.")
}

// Ack the consumption of a single message, identified by its MessageID
func (c *regexConsumer) AckID(msgID MessageID) {
mid, ok := toTrackingMessageID(msgID)
Expand Down Expand Up @@ -215,6 +221,7 @@ func (c *regexConsumer) Close() {
}
wg.Wait()
c.dlq.close()
c.rlq.close()
consumersClosed.Inc()
})
}
Expand Down Expand Up @@ -253,7 +260,7 @@ func (c *regexConsumer) monitor() {
}
case topics := <-c.subscribeCh:
if len(topics) > 0 && !c.closed() {
c.subscribe(topics, c.dlq)
c.subscribe(topics, c.dlq, c.rlq)
}
case topics := <-c.unsubscribeCh:
if len(topics) > 0 && !c.closed() {
Expand Down Expand Up @@ -298,12 +305,12 @@ func (c *regexConsumer) knownTopics() []string {
return topics
}

func (c *regexConsumer) subscribe(topics []string, dlq *dlqRouter) {
func (c *regexConsumer) subscribe(topics []string, dlq *dlqRouter, rlq *retryRouter) {
if log.GetLevel() == log.DebugLevel {
c.log.WithField("topics", topics).Debug("subscribe")
}
consumers := make(map[string]Consumer, len(topics))
for ce := range subscriber(c.client, topics, c.options, c.messageCh, dlq) {
for ce := range subscriber(c.client, topics, c.options, c.messageCh, dlq, rlq) {
if ce.err != nil {
c.log.Warnf("Failed to subscribe to topic=%s", ce.topic)
} else {
Expand Down Expand Up @@ -359,7 +366,7 @@ type consumerError struct {
}

func subscriber(c *client, topics []string, opts ConsumerOptions, ch chan ConsumerMessage,
dlq *dlqRouter) <-chan consumerError {
dlq *dlqRouter, rlq *retryRouter) <-chan consumerError {
consumerErrorCh := make(chan consumerError, len(topics))
var wg sync.WaitGroup
wg.Add(len(topics))
Expand All @@ -371,7 +378,7 @@ func subscriber(c *client, topics []string, opts ConsumerOptions, ch chan Consum
for _, t := range topics {
go func(topic string) {
defer wg.Done()
c, err := newInternalConsumer(c, opts, topic, ch, dlq, true)
c, err := newInternalConsumer(c, opts, topic, ch, dlq, rlq, true)
consumerErrorCh <- consumerError{
err: err,
topic: topic,
Expand Down
6 changes: 4 additions & 2 deletions pulsar/consumer_regex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ func runRegexConsumerDiscoverPatternAll(t *testing.T, c Client, namespace string
}

dlq, _ := newDlqRouter(c.(*client), nil)
consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq)
rlq, _ := newRetryRouter(c.(*client), nil, false)
consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -202,7 +203,8 @@ func runRegexConsumerDiscoverPatternFoo(t *testing.T, c Client, namespace string
}

dlq, _ := newDlqRouter(c.(*client), nil)
consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq)
rlq, _ := newRetryRouter(c.(*client), nil, false)
consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq)
if err != nil {
t.Fatal(err)
}
Expand Down
Loading