Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added a message id tracker for acking messages that are batched. #82

Merged
merged 2 commits into from
Nov 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 63 additions & 14 deletions pulsar/impl_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,31 @@
package pulsar

import (
"math/big"
"strings"
"sync"
"time"

"github.com/apache/pulsar-client-go/pkg/pb"
"github.com/golang/protobuf/proto"
)

func earliestMessageID() MessageID {
return newMessageID(-1, -1, -1, -1)
}
"github.com/apache/pulsar-client-go/pkg/pb"
)

type messageID struct {
ledgerID int64
entryID int64
batchIdx int
partitionIdx int

tracker *ackTracker
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain why adding ackTracker into the messageID structure?

  1. logically I don't think it should belong to a MessageID.
  2. This makes code sound like a hack solution. Is there another solution to implement the tracker?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with @sijie

Maybe we can do this:

type MsgTracker struct {
    msgID  *messageID
    tracker *ackTracker
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The go client produces messages in a batch format. Each message in that batch needs to be acked before sending the ack to the broker. This seemed like the logical place to put it since acks are based on a message id and it's similar to the java client BatchMessageIdImpl. It's an internal class too so it's not exported and can be changed later without effecting clients or apis. I think this approach makes the code easier to reason about since everything is treated as a batch message. If the messageId.ack() returns true we can send the ack to the broker. The other option is to maintain the state in a consumer and I think that complicates the code. Thoughts?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, i agree with you, just a little difference.

In here, tracker does not belong to messageId, as defined by PulsarApi.proto. So, we can abstract new struct(MsgTracker or BatchMsgID or so on) to impl it.

Copy link
Contributor

@merlimat merlimat Nov 4, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having the tracker in the message Id object is the same implementation pattern as the Java lib. The reason is not keep the tracker in a separate map.

The fact that the protobuf doesn’t have the tracker doesn’t matter, since it’s completely different usage.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@merlimat @cckellogg I understand the implementation follows the Java implementation. The approach doesn't have any problem here. However I think we should do it in golang's way. We can follow what @wolfstudy suggested. It is exactly same as your approach but much clearer.

type messageID struct {
	ledgerID     int64
	entryID      int64
	batchIdx     int
	partitionIdx int
}
type msgTracker struct {
	msgID   *messageID
	tracker *ackTracker
}
type ackTracker struct {
	sync.Mutex
	size     int
	batchIDs *big.Int
}
func newMsgTracker(msgID *messageID, tracker *ackTracker) *msgTracker {
	return &msgTracker{
		msgID:   msgID,
		tracker: tracker,
	}
}
func newMessageID(ledgerID int64, entryID int64, batchIdx int, partitionIdx int) MessageID {
	return &messageID{
		ledgerID:     ledgerID,
		entryID:      entryID,
		batchIdx:     batchIdx,
		partitionIdx: partitionIdx,
	}
}

In this way, msgTracker inherits all the fields from messageID and it avoids polluting messageID struct.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To me it looks like this design will cause us always have to check for two different message ids types and then handle them in a different way. Is that correct?

switch v := i.(type) {
case *messageID:
	// handle non batch
case *msgTracker:
       // handle batch

@wolfstudy how do you see your suggested design working when the client sends back a MessageID interface that we need to ack?

The general goal of this patch was to make ack checking in the code simple whether the message was batched or not

func (c *consumer) Ack(msgID MessageID) error {
  if msgID.(*messageID).ack() {
    // message id can be acked send to server
  }
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sijie can you take another look? the other approach would involve a lot of downcasting, which we're actively trying to avoid.

}

func newMessageID(ledgerID int64, entryID int64, batchIdx int, partitionIdx int) MessageID {
return &messageID{
ledgerID: ledgerID,
entryID: entryID,
batchIdx: batchIdx,
partitionIdx: partitionIdx,
func (id *messageID) ack() bool {
if id.tracker != nil && id.batchIdx > -1 {
return id.tracker.ack(id.batchIdx)
}

return true
}

func (id *messageID) Serialize() []byte {
Expand Down Expand Up @@ -70,10 +71,24 @@ func deserializeMessageID(data []byte) (MessageID, error) {
return id, nil
}

const maxLong int64 = 0x7fffffffffffffff
func newMessageID(ledgerID int64, entryID int64, batchIdx int, partitionIdx int) MessageID {
return &messageID{
ledgerID: ledgerID,
entryID: entryID,
batchIdx: batchIdx,
partitionIdx: partitionIdx,
}
}

func latestMessageID() MessageID {
return newMessageID(maxLong, maxLong, -1, -1)
func newTrackingMessageID(ledgerID int64, entryID int64, batchIdx int, partitionIdx int,
tracker *ackTracker) *messageID {
return &messageID{
ledgerID: ledgerID,
entryID: entryID,
batchIdx: batchIdx,
partitionIdx: partitionIdx,
tracker: tracker,
}
}

func timeFromUnixTimestampMillis(timestamp uint64) time.Time {
Expand Down Expand Up @@ -126,3 +141,37 @@ func (msg *message) EventTime() time.Time {
func (msg *message) Key() string {
return msg.key
}

func newAckTracker(size int) *ackTracker {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this ack tracker is self-contained, should we move it into a separate file, perhaps in the internal pkg?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it makes sense in this file since it's related to the message id implementation.

var batchIDs *big.Int
if size <= 64 {
shift := uint32(64 - size)
setBits := ^uint64(0) >> shift
batchIDs = new(big.Int).SetUint64(setBits)
} else {
batchIDs, _ = new(big.Int).SetString(strings.Repeat("1", size), 2)
}
return &ackTracker{
size: size,
batchIDs: batchIDs,
}
}

type ackTracker struct {
sync.Mutex
size int
batchIDs *big.Int
}

func (t *ackTracker) ack(batchID int) bool {
t.Lock()
defer t.Unlock()
t.batchIDs = t.batchIDs.SetBit(t.batchIDs, batchID, 0)
return len(t.batchIDs.Bits()) == 0
}

func (t *ackTracker) completed() bool {
t.Lock()
defer t.Unlock()
return len(t.batchIDs.Bits()) == 0
}
57 changes: 57 additions & 0 deletions pulsar/impl_message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,60 @@ func TestMessageId(t *testing.T) {
assert.Error(t, err)
assert.Nil(t, id)
}

func TestAckTracker(t *testing.T) {
tracker := newAckTracker(1)
assert.Equal(t, true, tracker.ack(0))

// test 64
tracker = newAckTracker(64)
for i := 0; i < 64; i++ {
if i < 63 {
assert.Equal(t, false, tracker.ack(i))
} else {
assert.Equal(t, true, tracker.ack(i))
}
}
assert.Equal(t, true, tracker.completed())

// test large number 1000
tracker = newAckTracker(1000)
for i := 0; i < 1000; i++ {
if i < 999 {
assert.Equal(t, false, tracker.ack(i))
} else {
assert.Equal(t, true, tracker.ack(i))
}

}
assert.Equal(t, true, tracker.completed())
}

func TestAckingMessageIDBatchOne(t *testing.T) {
tracker := newAckTracker(1)
msgId := newTrackingMessageID(1, 1, 0, 0, tracker)
assert.Equal(t, true, msgId.ack())
assert.Equal(t, true, tracker.completed())
}

func TestAckingMessageIDBatchTwo(t *testing.T) {
tracker := newAckTracker(2)
ids := []*messageID{
newTrackingMessageID(1, 1, 0, 0, tracker),
newTrackingMessageID(1, 1, 1, 0, tracker),
}

assert.Equal(t, false, ids[0].ack())
assert.Equal(t, true, ids[1].ack())
assert.Equal(t, true, tracker.completed())

// try reverse order
tracker = newAckTracker(2)
ids = []*messageID{
newTrackingMessageID(1, 1, 0, 0, tracker),
newTrackingMessageID(1, 1, 1, 0, tracker),
}
assert.Equal(t, false, ids[1].ack())
assert.Equal(t, true, ids[0].ack())
assert.Equal(t, true, tracker.completed())
}
19 changes: 12 additions & 7 deletions pulsar/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

package pulsar

import "time"
import (
"math"
"time"
)

// ProducerMessage abstraction used in Pulsar producer
type ProducerMessage struct {
Expand Down Expand Up @@ -79,10 +82,12 @@ func DeserializeMessageID(data []byte) (MessageID, error) {
return deserializeMessageID(data)
}

var (
// EarliestMessage messageID that points to the earliest message available in a topic
EarliestMessage = earliestMessageID()
// EarliestMessageID returns a messageID that points to the earliest message available in a topic
func EarliestMessageID() MessageID {
return newMessageID(-1, -1, -1, -1)
}

// LatestMessage messageID that points to the latest message
LatestMessage = latestMessageID()
)
// LatestMessage returns a messageID that points to the latest message
func LatestMessageID() MessageID {
return newMessageID(math.MaxInt64, math.MaxInt64, -1, -1)
}