Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Different MessageID implementations for message Production and Consumption #324

Merged
merged 1 commit into from
Jul 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ var ErrConsumerClosed = errors.New("consumer closed")
const defaultNackRedeliveryDelay = 1 * time.Minute

type acker interface {
AckID(id messageID)
NackID(id messageID)
AckID(id trackingMessageID)
NackID(id trackingMessageID)
}

type consumer struct {
Expand Down Expand Up @@ -259,7 +259,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
nackRedeliveryDelay: nackRedeliveryDelay,
metadata: metadata,
replicateSubscriptionState: c.options.ReplicateSubscriptionState,
startMessageID: messageID{},
startMessageID: trackingMessageID{},
subscriptionMode: durable,
readCompacted: c.options.ReadCompacted,
}
Expand Down Expand Up @@ -483,19 +483,19 @@ func toProtoInitialPosition(p SubscriptionInitialPosition) pb.CommandSubscribe_I
return pb.CommandSubscribe_Latest
}

func (c *consumer) messageID(msgID MessageID) (messageID, bool) {
mid, ok := msgID.(messageID)
func (c *consumer) messageID(msgID MessageID) (trackingMessageID, bool) {
mid, ok := toTrackingMessageID(msgID)
if !ok {
c.log.Warnf("invalid message id type %T", msgID)
return messageID{}, false
return trackingMessageID{}, false
}

partition := int(mid.partitionIdx)
// did we receive a valid partition index?
if partition < 0 || partition >= len(c.consumers) {
c.log.Warnf("invalid partition index %d expected a partition between [0-%d]",
partition, len(c.consumers))
return messageID{}, false
return trackingMessageID{}, false
}

return mid, true
Expand Down
4 changes: 2 additions & 2 deletions pulsar/consumer_multitopic.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (c *multiTopicConsumer) Ack(msg Message) {

// Ack the consumption of a single message, identified by its MessageID
func (c *multiTopicConsumer) AckID(msgID MessageID) {
mid, ok := msgID.(messageID)
mid, ok := toTrackingMessageID(msgID)
if !ok {
c.log.Warnf("invalid message id type %T", msgID)
return
Expand All @@ -139,7 +139,7 @@ func (c *multiTopicConsumer) Nack(msg Message) {
}

func (c *multiTopicConsumer) NackID(msgID MessageID) {
mid, ok := msgID.(messageID)
mid, ok := toTrackingMessageID(msgID)
if !ok {
c.log.Warnf("invalid message id type %T", msgID)
return
Expand Down
104 changes: 58 additions & 46 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ type partitionConsumerOpts struct {
nackRedeliveryDelay time.Duration
metadata map[string]string
replicateSubscriptionState bool
startMessageID messageID
startMessageID trackingMessageID
startMessageIDInclusive bool
subscriptionMode subscriptionMode
readCompacted bool
Expand Down Expand Up @@ -140,13 +140,13 @@ type partitionConsumer struct {
// the size of the queue channel for buffering messages
queueSize int32
queueCh chan []*message
startMessageID messageID
lastDequeuedMsg messageID
startMessageID trackingMessageID
lastDequeuedMsg trackingMessageID

eventsCh chan interface{}
connectedCh chan struct{}
closeCh chan struct{}
clearQueueCh chan func(id messageID)
clearQueueCh chan func(id trackingMessageID)

nackTracker *negativeAcksTracker
dlq *dlqRouter
Expand Down Expand Up @@ -174,7 +174,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
connectedCh: make(chan struct{}),
messageCh: messageCh,
closeCh: make(chan struct{}),
clearQueueCh: make(chan func(id messageID)),
clearQueueCh: make(chan func(id trackingMessageID)),
compressionProviders: make(map[pb.CompressionType]compression.Provider),
dlq: dlq,
log: log.WithField("topic", options.topic),
Expand Down Expand Up @@ -238,7 +238,7 @@ func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) {
pc.state = consumerClosed
}

func (pc *partitionConsumer) getLastMessageID() (messageID, error) {
func (pc *partitionConsumer) getLastMessageID() (trackingMessageID, error) {
req := &getLastMsgIDRequest{doneCh: make(chan struct{})}
pc.eventsCh <- req

Expand Down Expand Up @@ -266,8 +266,8 @@ func (pc *partitionConsumer) internalGetLastMessageID(req *getLastMsgIDRequest)
}
}

func (pc *partitionConsumer) AckID(msgID messageID) {
if !msgID.IsZero() && msgID.ack() {
func (pc *partitionConsumer) AckID(msgID trackingMessageID) {
if !msgID.Undefined() && msgID.ack() {
acksCounter.Inc()
processingTime.Observe(float64(time.Now().UnixNano()-msgID.receivedTime.UnixNano()) / 1.0e9)
req := &ackRequest{
Expand All @@ -277,8 +277,8 @@ func (pc *partitionConsumer) AckID(msgID messageID) {
}
}

func (pc *partitionConsumer) NackID(msgID messageID) {
pc.nackTracker.Add(msgID)
func (pc *partitionConsumer) NackID(msgID trackingMessageID) {
pc.nackTracker.Add(msgID.messageID)
nacksCounter.Inc()
}

Expand Down Expand Up @@ -317,7 +317,7 @@ func (pc *partitionConsumer) Close() {
<-req.doneCh
}

func (pc *partitionConsumer) Seek(msgID messageID) error {
func (pc *partitionConsumer) Seek(msgID trackingMessageID) error {
req := &seekRequest{
doneCh: make(chan struct{}),
msgID: msgID,
Expand Down Expand Up @@ -506,17 +506,17 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
return nil
}

func (pc *partitionConsumer) messageShouldBeDiscarded(msgID messageID) bool {
if pc.startMessageID.IsZero() {
func (pc *partitionConsumer) messageShouldBeDiscarded(msgID trackingMessageID) bool {
if pc.startMessageID.Undefined() {
return false
}

if pc.options.startMessageIDInclusive {
return pc.startMessageID.greater(msgID)
return pc.startMessageID.greater(msgID.messageID)
}

// Non inclusive
return pc.startMessageID.greaterEqual(msgID)
return pc.startMessageID.greaterEqual(msgID.messageID)
}

func (pc *partitionConsumer) ConnectionClosed() {
Expand Down Expand Up @@ -631,16 +631,16 @@ func (pc *partitionConsumer) dispatcher() {
case clearQueueCb := <-pc.clearQueueCh:
// drain the message queue on any new connection by sending a
// special nil message to the channel so we know when to stop dropping messages
var nextMessageInQueue messageID
var nextMessageInQueue trackingMessageID
go func() {
pc.queueCh <- nil
}()
for m := range pc.queueCh {
// the queue has been drained
if m == nil {
break
} else if nextMessageInQueue.IsZero() {
nextMessageInQueue = m[0].msgID.(messageID)
} else if nextMessageInQueue.Undefined() {
nextMessageInQueue = m[0].msgID.(trackingMessageID)
}
}

Expand All @@ -650,7 +650,7 @@ func (pc *partitionConsumer) dispatcher() {
}

type ackRequest struct {
msgID messageID
msgID trackingMessageID
}

type unsubscribeRequest struct {
Expand All @@ -668,13 +668,13 @@ type redeliveryRequest struct {

type getLastMsgIDRequest struct {
doneCh chan struct{}
msgID messageID
msgID trackingMessageID
err error
}

type seekRequest struct {
doneCh chan struct{}
msgID messageID
msgID trackingMessageID
err error
}

Expand Down Expand Up @@ -854,15 +854,15 @@ func (pc *partitionConsumer) grabConn() error {
}
}

func (pc *partitionConsumer) clearQueueAndGetNextMessage() messageID {
func (pc *partitionConsumer) clearQueueAndGetNextMessage() trackingMessageID {
if pc.state != consumerReady {
return messageID{}
return trackingMessageID{}
}
wg := &sync.WaitGroup{}
wg.Add(1)
var msgID messageID
var msgID trackingMessageID

pc.clearQueueCh <- func(id messageID) {
pc.clearQueueCh <- func(id trackingMessageID) {
msgID = id
wg.Done()
}
Expand All @@ -875,12 +875,12 @@ func (pc *partitionConsumer) clearQueueAndGetNextMessage() messageID {
* Clear the internal receiver queue and returns the message id of what was the 1st message in the queue that was
* not seen by the application
*/
func (pc *partitionConsumer) clearReceiverQueue() messageID {
func (pc *partitionConsumer) clearReceiverQueue() trackingMessageID {
nextMessageInQueue := pc.clearQueueAndGetNextMessage()

if !nextMessageInQueue.IsZero() {
if !nextMessageInQueue.Undefined() {
return getPreviousMessage(nextMessageInQueue)
} else if !pc.lastDequeuedMsg.IsZero() {
} else if !pc.lastDequeuedMsg.Undefined() {
// If the queue was empty we need to restart from the message just after the last one that has been dequeued
// in the past
return pc.lastDequeuedMsg
Expand All @@ -890,22 +890,32 @@ func (pc *partitionConsumer) clearReceiverQueue() messageID {
}
}

func getPreviousMessage(mid messageID) messageID {
func getPreviousMessage(mid trackingMessageID) trackingMessageID {
if mid.batchIdx >= 0 {
return messageID{
ledgerID: mid.ledgerID,
entryID: mid.entryID,
batchIdx: mid.batchIdx - 1,
partitionIdx: mid.partitionIdx,
return trackingMessageID{
messageID: messageID{
ledgerID: mid.ledgerID,
entryID: mid.entryID,
batchIdx: mid.batchIdx - 1,
partitionIdx: mid.partitionIdx,
},
tracker: mid.tracker,
consumer: mid.consumer,
receivedTime: mid.receivedTime,
}
}

// Get on previous message in previous entry
return messageID{
ledgerID: mid.ledgerID,
entryID: mid.entryID - 1,
batchIdx: mid.batchIdx,
partitionIdx: mid.partitionIdx,
return trackingMessageID{
messageID: messageID{
ledgerID: mid.ledgerID,
entryID: mid.entryID - 1,
batchIdx: mid.batchIdx,
partitionIdx: mid.partitionIdx,
},
tracker: mid.tracker,
consumer: mid.consumer,
receivedTime: mid.receivedTime,
}
}

Expand Down Expand Up @@ -961,8 +971,8 @@ func (pc *partitionConsumer) discardCorruptedMessage(msgID *pb.MessageIdData,
})
}

func convertToMessageIDData(msgID messageID) *pb.MessageIdData {
if msgID.IsZero() {
func convertToMessageIDData(msgID trackingMessageID) *pb.MessageIdData {
if msgID.Undefined() {
return nil
}

Expand All @@ -972,14 +982,16 @@ func convertToMessageIDData(msgID messageID) *pb.MessageIdData {
}
}

func convertToMessageID(id *pb.MessageIdData) messageID {
func convertToMessageID(id *pb.MessageIdData) trackingMessageID {
if id == nil {
return messageID{}
return trackingMessageID{}
}

msgID := messageID{
ledgerID: int64(*id.LedgerId),
entryID: int64(*id.EntryId),
msgID := trackingMessageID{
messageID: messageID{
ledgerID: int64(*id.LedgerId),
entryID: int64(*id.EntryId),
},
}
if id.BatchIndex != nil {
msgID.batchIdx = *id.BatchIndex
Expand Down
14 changes: 7 additions & 7 deletions pulsar/consumer_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) {
// ensure the tracker was set on the message id
messages := <-pc.queueCh
for _, m := range messages {
assert.Nil(t, m.ID().(messageID).tracker)
assert.Nil(t, m.ID().(trackingMessageID).tracker)
}

// ack the message id
pc.AckID(messages[0].msgID.(messageID))
pc.AckID(messages[0].msgID.(trackingMessageID))

select {
case <-eventsCh:
Expand All @@ -73,11 +73,11 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
// ensure the tracker was set on the message id
messages := <-pc.queueCh
for _, m := range messages {
assert.Nil(t, m.ID().(messageID).tracker)
assert.Nil(t, m.ID().(trackingMessageID).tracker)
}

// ack the message id
pc.AckID(messages[0].msgID.(messageID))
pc.AckID(messages[0].msgID.(trackingMessageID))

select {
case <-eventsCh:
Expand All @@ -102,12 +102,12 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
// ensure the tracker was set on the message id
messages := <-pc.queueCh
for _, m := range messages {
assert.NotNil(t, m.ID().(messageID).tracker)
assert.NotNil(t, m.ID().(trackingMessageID).tracker)
}

// ack all message ids except the last one
for i := 0; i < 9; i++ {
pc.AckID(messages[i].msgID.(messageID))
pc.AckID(messages[i].msgID.(trackingMessageID))
}

select {
Expand All @@ -117,7 +117,7 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
}

// ack last message
pc.AckID(messages[9].msgID.(messageID))
pc.AckID(messages[9].msgID.(trackingMessageID))

select {
case <-eventsCh:
Expand Down
4 changes: 2 additions & 2 deletions pulsar/consumer_regex.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (c *regexConsumer) Ack(msg Message) {

// Ack the consumption of a single message, identified by its MessageID
func (c *regexConsumer) AckID(msgID MessageID) {
mid, ok := msgID.(messageID)
mid, ok := toTrackingMessageID(msgID)
if !ok {
c.log.Warnf("invalid message id type %T", msgID)
return
Expand All @@ -184,7 +184,7 @@ func (c *regexConsumer) Nack(msg Message) {
}

func (c *regexConsumer) NackID(msgID MessageID) {
mid, ok := msgID.(messageID)
mid, ok := toTrackingMessageID(msgID)
if !ok {
c.log.Warnf("invalid message id type %T", msgID)
return
Expand Down
Loading