Added a message id tracker for acking messages that are batched. #82
Conversation
@@ -126,3 +142,36 @@ func (msg *message) EventTime() time.Time {
func (msg *message) Key() string {
	return msg.key
}

func newAckTracker(size int) *ackTracker {
Since this ack tracker is self-contained, should we move it into a separate file, perhaps in the internal pkg?
I think it makes sense in this file since it's related to the message id implementation.
pulsar/impl_message.go
Outdated
	t.batchIDs = t.batchIDs.SetBit(t.batchIDs, batchID, 0)
}

func (t *ackTracker) cleared() bool {
Maybe we can use a term that's more specific to acks than to the bitset, e.g.:
IsComplete()
EverythingWasAcked()
Another option could be to return a bool from ack(batchID int). That would also avoid acquiring the mutex twice each time.
I'll add a bool return value, making the signature ack(batchID int) bool, and rename cleared() to completed(). Thoughts?
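To make the proposal above concrete, here is a minimal sketch of a tracker whose ack reports completion in the same critical section. It reuses the field names visible in this PR (size, batchIDs as a *big.Int); the bit convention (1 means "still unacked"), the out-of-range handling, and the body of newAckTracker are assumptions for illustration, not the code under review.

```go
package main

import (
	"math/big"
	"sync"
)

// ackTracker records which entries of a batch still await an ack.
// Assumed convention: bit i is 1 while batch entry i is unacked.
type ackTracker struct {
	sync.Mutex
	size     int
	batchIDs *big.Int
}

// newAckTracker marks all `size` entries as pending by setting bits 0..size-1.
func newAckTracker(size int) *ackTracker {
	batchIDs := new(big.Int)
	for i := 0; i < size; i++ {
		batchIDs.SetBit(batchIDs, i, 1)
	}
	return &ackTracker{size: size, batchIDs: batchIDs}
}

// ack clears the bit for batchID and reports whether the whole batch
// is now acked. Out-of-range IDs are ignored (an assumption).
func (t *ackTracker) ack(batchID int) bool {
	t.Lock()
	defer t.Unlock()
	if batchID >= 0 && batchID < t.size {
		t.batchIDs.SetBit(t.batchIDs, batchID, 0)
	}
	return t.batchIDs.Sign() == 0
}

// completed reports whether every entry has been acked.
func (t *ackTracker) completed() bool {
	t.Lock()
	defer t.Unlock()
	return t.batchIDs.Sign() == 0
}
```

With this shape, a caller that only needs to know whether to forward the ack takes the mutex once per call to ack; completed() remains available for callers that want to query the state without acking.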
type messageID struct {
	ledgerID     int64
	entryID      int64
	batchIdx     int
	partitionIdx int

	tracker *ackTracker
Can you explain why ackTracker is being added to the messageID structure?
- Logically, I don't think it belongs in a MessageID.
- This makes the code look like a hack. Is there another way to implement the tracker?
Agree with @sijie
Maybe we can do this:
type MsgTracker struct {
	msgID   *messageID
	tracker *ackTracker
}
The Go client produces messages in a batch format. Each message in a batch needs to be acked before the ack is sent to the broker. This seemed like the logical place to put the tracker, since acks are based on a message ID and it's similar to the Java client's BatchMessageIdImpl. It's also an internal type, so it's not exported and can be changed later without affecting clients or APIs. I think this approach makes the code easier to reason about, since everything is treated as a batch message: if messageId.ack() returns true, we can send the ack to the broker. The other option is to maintain the state in the consumer, which I think complicates the code. Thoughts?
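A rough sketch of the "everything is treated as a batch message" idea: every messageID exposes ack(), and IDs from the same batch share one tracker. The nil-tracker shortcut for non-batched messages and the bodies of newAckTracker and ack are assumptions for illustration, not taken from the patch.

```go
package main

import (
	"math/big"
	"sync"
)

// ackTracker is shared by all message IDs that belong to one batch.
type ackTracker struct {
	sync.Mutex
	batchIDs *big.Int // bit i is 1 while entry i is unacked (assumed convention)
}

// newAckTracker sets the low `size` bits, i.e. (1 << size) - 1.
func newAckTracker(size int) *ackTracker {
	bits := new(big.Int).Lsh(big.NewInt(1), uint(size))
	bits.Sub(bits, big.NewInt(1))
	return &ackTracker{batchIDs: bits}
}

// ack clears one entry and reports whether the whole batch is acked.
func (t *ackTracker) ack(batchIdx int) bool {
	t.Lock()
	defer t.Unlock()
	t.batchIDs.SetBit(t.batchIDs, batchIdx, 0)
	return t.batchIDs.Sign() == 0
}

type messageID struct {
	ledgerID     int64
	entryID      int64
	batchIdx     int
	partitionIdx int

	tracker *ackTracker // nil for a non-batched message (assumption)
}

// ack reports whether an ack for this entry may be sent to the broker.
// A non-batched message is immediately ackable.
func (id *messageID) ack() bool {
	if id.tracker == nil {
		return true
	}
	return id.tracker.ack(id.batchIdx)
}
```

The caller never needs to know whether a message was batched: it calls ack() on the ID and forwards the ack to the broker only when the result is true.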
Yes, I agree with you, with one small difference.
Here, tracker does not belong in messageId as defined by PulsarApi.proto, so we can introduce a new struct (MsgTracker, BatchMsgID, or similar) to implement it.
Having the tracker in the message ID object is the same implementation pattern as the Java lib. The reason is to avoid keeping the tracker in a separate map.
The fact that the protobuf doesn't have the tracker doesn't matter, since it's a completely different usage.
@merlimat @cckellogg I understand that the implementation follows the Java implementation, and the approach itself doesn't have any problem. However, I think we should do it the Go way. We can follow what @wolfstudy suggested; it is exactly the same as your approach but much clearer.
type messageID struct {
	ledgerID     int64
	entryID      int64
	batchIdx     int
	partitionIdx int
}

type msgTracker struct {
	msgID   *messageID
	tracker *ackTracker
}

type ackTracker struct {
	sync.Mutex
	size     int
	batchIDs *big.Int
}

func newMsgTracker(msgID *messageID, tracker *ackTracker) *msgTracker {
	return &msgTracker{
		msgID:   msgID,
		tracker: tracker,
	}
}

func newMessageID(ledgerID int64, entryID int64, batchIdx int, partitionIdx int) MessageID {
	return &messageID{
		ledgerID:     ledgerID,
		entryID:      entryID,
		batchIdx:     batchIdx,
		partitionIdx: partitionIdx,
	}
}
This way, msgTracker has access to all the fields of messageID (through msgID), and it avoids polluting the messageID struct.
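One Go detail is worth spelling out here: a named field such as msgID does not promote the fields of messageID, so they are reached as mt.msgID.ledgerID. If direct access is the intent, embedding *messageID is one option. The sketch below contrasts the two; embeddedMsgTracker and the stand-in ackTracker are hypothetical names used only for illustration.

```go
package main

type messageID struct {
	ledgerID     int64
	entryID      int64
	batchIdx     int
	partitionIdx int
}

// ackTracker is a stand-in here; its real fields are not relevant to the comparison.
type ackTracker struct {
	pending int
}

// msgTracker uses a named field, as in the suggestion above:
// messageID's fields are reached through mt.msgID.
type msgTracker struct {
	msgID   *messageID
	tracker *ackTracker
}

// embeddedMsgTracker (hypothetical) embeds *messageID instead,
// so ledgerID, entryID, etc. are promoted and reachable directly.
type embeddedMsgTracker struct {
	*messageID
	tracker *ackTracker
}
```

Given id := &messageID{ledgerID: 7}, the named-field form reads msgTracker{msgID: id}.msgID.ledgerID, while the embedded form reads embeddedMsgTracker{messageID: id}.ledgerID.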
To me, it looks like this design means we will always have to check for two different message ID types and handle them differently. Is that correct?
switch v := i.(type) {
case *messageID:
	// handle non-batch
case *msgTracker:
	// handle batch
}
@wolfstudy, how do you see your suggested design working when the client sends back a MessageID interface that we need to ack?
The general goal of this patch was to keep ack checking in the code simple, whether or not the message was batched:
func (c *consumer) Ack(msgID MessageID) error {
	if msgID.(*messageID).ack() {
		// The message ID is fully acked; send the ack to the broker.
	}
	return nil
}
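As a hedged illustration of that single code path, the sketch below uses a comma-ok type assertion so that a foreign MessageID implementation yields an error rather than a panic. Everything here is a stand-in: the empty MessageID interface, the shared pending counter in place of the ack tracker, and the sent counter in place of a real broker call are assumptions, and the counter is not safe for concurrent use.

```go
package main

import "errors"

// MessageID stands in for the client's exported interface (left empty here).
type MessageID interface{}

// messageID shares a pending counter with the other IDs of its batch.
// The counter is a simplified stand-in for the ack tracker.
type messageID struct {
	pending *int
}

// ack records one ack and reports whether the whole batch is now acked.
func (id *messageID) ack() bool {
	*id.pending--
	return *id.pending <= 0
}

type consumer struct {
	sent int // acks forwarded to the broker (stand-in for a real send)
}

// Ack handles batched and non-batched IDs through one path.
func (c *consumer) Ack(msgID MessageID) error {
	id, ok := msgID.(*messageID)
	if !ok {
		return errors.New("invalid message id type")
	}
	if id.ack() {
		c.sent++ // the batch is complete; forward the ack
	}
	return nil
}
```

For a batch of two, the first call to Ack leaves sent at 0 and the second raises it to 1; no type switch over several ID types is needed.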
@sijie, can you take another look? The other approach would involve a lot of downcasting, which we're actively trying to avoid.
Added a tracker so the client can ack batched messages. This is just the framework and needs additional patches to make it work.