[PIP-54] Support acknowledgment at batch index level #6052
run java8 tests
@codelipenghui thank you for your work introducing this great feature. Before going into details, I would like to first discuss the "ack set" introduced in the cursor storage information and the wire protocol. I have some suggestions there. Once we agree on what the data structure should look like, I will review the implementation.
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
c92f2dc to 94fba9e
51f599e to 201bc3e
@codelipenghui Thanks for implementing batch acknowledgement. This is a complex task. Would you extend this PR to the C++ library? I would also suggest adding more documentation to explain the expected behaviour and which consumer subscription types are supported or not supported, as well as the performance considerations, such as whether the entire batch is re-transmitted when its messages are only partially acked. Thanks!
        .newBuilder();
MLDataFormats.BatchDeletedIndexInfo.Builder batchDeletedIndexInfoBuilder = MLDataFormats.BatchDeletedIndexInfo
        .newBuilder();
List<MLDataFormats.BatchDeletedIndexInfo> result = Lists.newArrayList();
It seems that a large section of code is held under the lock. You might want to consider moving these initializations outside of the protected section.
Isn't ConcurrentSkipListMap already thread-safe for reads? Probably only the iteration needs to be protected. I think the goal is to avoid locking a large section of code, so that other threads do not have to wait on it.
public long[] getDeletedBatchIndexesLongArray(PositionImpl position) {
    lock.readLock().lock();
    try {
        BitSet bitSet = batchDeletedIndexes.get(position);
Ditto. I think only the batchDeletedIndexes.get() call should be locked. Again, double-check whether ConcurrentSkipListMap.get() is already thread-safe for reads. I think it should be, although I have never used this map collection.
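To make the suggestion concrete, here is a minimal sketch of a narrower lock scope. It is not the PR's code: the class name `BatchIndexLockSketch`, the use of a plain `long` entry id as the map key (instead of `PositionImpl`), and the `markDeleted` helper are assumptions made only for illustration. The lookup in `ConcurrentSkipListMap` runs without the lock, while the copy out of the `java.util.BitSet` stays guarded because `BitSet` itself is not thread-safe.

```java
import java.util.BitSet;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-in for the cursor's batch-index bookkeeping; not the PR's actual class.
final class BatchIndexLockSketch {
    // Assumption: a long entry id replaces PositionImpl as the key to keep the sketch self-contained.
    private final ConcurrentSkipListMap<Long, BitSet> batchDeletedIndexes = new ConcurrentSkipListMap<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    // Writers mutate the BitSet under the write lock.
    void markDeleted(long entryId, int batchIndex) {
        lock.writeLock().lock();
        try {
            batchDeletedIndexes.computeIfAbsent(entryId, k -> new BitSet()).set(batchIndex);
        } finally {
            lock.writeLock().unlock();
        }
    }

    long[] getDeletedBatchIndexesAsLongArray(long entryId) {
        // The map lookup is thread-safe on its own, so it needs no lock.
        BitSet bitSet = batchDeletedIndexes.get(entryId);
        if (bitSet == null) {
            return new long[0];
        }
        // Only the copy out of the non-thread-safe BitSet is guarded.
        lock.readLock().lock();
        try {
            return bitSet.toLongArray();
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

Whether the read lock can be dropped entirely depends on how the bit sets are mutated elsewhere, which this sketch does not try to answer.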
List<Long> result = new ArrayList<>(array.length);
for (long l : array) {
    result.add(l);
}
return result;
How about using a stream in Java 8?
return Arrays.stream(array).boxed().collect(Collectors.toList());
Actually, the entire function can be
return array == null ? Collections.emptyList() : Arrays.stream(array).boxed().collect(Collectors.toList());
Collections.emptyList() returns an immutable list, whereas new ArrayList is mutable. So I do not know whether you would like to have a mutable or an immutable list. This needs to be clarified.
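The mutability question can be illustrated with a small sketch. The class and method names below are hypothetical and only contrast the two options; which one the PR should use is still the open question raised above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper, used only to contrast the two return-value choices.
final class LongListSketch {
    // Always returns a fresh mutable list, like the original loop with new ArrayList.
    static List<Long> toMutableList(long[] array) {
        if (array == null) {
            return new ArrayList<>();
        }
        return Arrays.stream(array).boxed().collect(Collectors.toCollection(ArrayList::new));
    }

    // Returns the shared immutable empty list for null input.
    // Note: Collectors.toList() makes no guarantee about the mutability of its result.
    static List<Long> toListOrImmutableEmpty(long[] array) {
        return array == null
                ? Collections.emptyList()
                : Arrays.stream(array).boxed().collect(Collectors.toList());
    }
}
```

With the second variant, a caller that tries to add to the result of a null input gets an UnsupportedOperationException, so the choice matters if any caller modifies the returned list.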
if (list == null || list.size() == 0) {
    return new long[0];
} else {
    long[] array = new long[list.size()];
    for (int i = 0; i < list.size(); i++) {
        array[i] = list.get(i);
    }
    return array;
}
Again, consider using a stream:
return list == null ? new long[0] : list.stream().mapToLong(l -> l).toArray();
There is probably no need for a separate file to deal with conversion between primitive arrays and lists. Does Apache Commons Lang or Guava have a library for that?
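On the library question: Guava does provide `Longs.toArray(Collection<? extends Number>)` and `Longs.asList(long...)`, and Apache Commons Lang has `ArrayUtils.toPrimitive(Long[])` and `ArrayUtils.toObject(long[])`. A JDK-only sketch of the stream version follows; the class name `LongArraySketch` is hypothetical.

```java
import java.util.List;

// Hypothetical helper showing the stream-based conversion with JDK classes only.
final class LongArraySketch {
    // A null list maps to an empty array; an empty list also yields an empty array.
    // A null element would throw NullPointerException on unboxing, as in the original loop.
    static long[] toArray(List<Long> list) {
        return list == null ? new long[0] : list.stream().mapToLong(Long::longValue).toArray();
    }
}
```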
private void initFields() {
    ledgerId_ = 0L;
    entryId_ = 0L;
    partition_ = -1;
    batchIndex_ = -1;
    ackSet_ = java.util.Collections.emptyList();;
Double ;;
You might want to do a global replacement since these are in multiple places.
PulsarApi.java is generated by a proto tool, so I won't fix it manually here. It would be better to improve the generation tool.
@@ -219,7 +226,8 @@ public ChannelPromise sendMessages(final List<Entry> entries, EntryBatchSizes ba
 }

 // reduce permit and increment unackedMsg count with total number of messages in batch-msgs
-MESSAGE_PERMITS_UPDATER.addAndGet(this, -totalMessages);
+int ackedCount = batchIndexesAcks == null ? 0 : batchIndexesAcks.getTotalAckedIndexCount();
+MESSAGE_PERMITS_UPDATER.addAndGet(this, ackedCount - totalMessages);
Would this case be covered if the total number of messages in a batch is greater than the allowed number of permits? Or is this not even possible?
The batch size can be greater than the available permits. This is already handled on the client side.
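The accounting in the changed line can be sketched in isolation. The class `PermitSketch`, its constructor, and the `onBatchSent` method are hypothetical; only the `addAndGet(this, ackedCount - totalMessages)` expression comes from the diff. The sketch shows that permits are reduced only by the batch indexes that are not already acked, and that in this simplified model the counter simply goes negative when a batch exceeds the remaining permits.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

// Hypothetical model of the permit counter; not the broker's Consumer class.
final class PermitSketch {
    private static final AtomicIntegerFieldUpdater<PermitSketch> MESSAGE_PERMITS_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(PermitSketch.class, "messagePermits");

    private volatile int messagePermits;

    PermitSketch(int initialPermits) {
        this.messagePermits = initialPermits;
    }

    // Charges permits only for batch indexes that are not already acknowledged,
    // and returns the updated permit count.
    int onBatchSent(int totalMessages, int ackedCount) {
        return MESSAGE_PERMITS_UPDATER.addAndGet(this, ackedCount - totalMessages);
    }
}
```

For example, with 10 permits, a batch of 8 messages of which 3 are already acked leaves 5 permits, where the old expression would have left 2.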
@@ -171,6 +171,9 @@
        + " affecting the accuracy of the delivery time compared to the scheduled time. Default is 1 second.")
private long delayedDeliveryTickTimeMillis = 1000;

@FieldContext(category = CATEGORY_SERVER, doc = "Whether to enable the acknowledge of batch local index")
private boolean batchIndexAcknowledgeEnable = true;
If this feature has to be enabled on the broker, is there a way for the consumer to know whether it is supported, or to set it? I think the problem is how to help with troubleshooting when both the broker and the consumer need to support this.
Should we consider adding a flag to enable or disable it on the consumer side? I think this feature does not introduce too much overhead on the consumer side.
For troubleshooting, users just need to ensure that their brokers have this feature enabled and that the client version is 2.6.0.
Can you also create a master issue for this PIP for tracking the support for other language clients?
conf/broker.conf
@@ -312,6 +312,9 @@ delayedDeliveryEnabled=true
# Default is 1 second.
delayedDeliveryTickTimeMillis=1000

# Whether to enable acknowledge of batch local index.
batchIndexAcknowledgeEnable = true
Suggested change:
- batchIndexAcknowledgeEnable = true
+ batchIndexAcknowledgementEnabled = true
A better name for this feature would be "acknowledgment at batch index level", so we can name this setting "acknowledgmentAtBatchIndexLevelEnabled".
/**
 * Get deleted batch indexes list for a batch message.
 */
long[] getDeletedBatchIndexesLongArray(PositionImpl position);
Suggested change:
- long[] getDeletedBatchIndexesLongArray(PositionImpl position);
+ long[] getDeletedBatchIndexesAsLongArray(PositionImpl position);
@@ -39,6 +39,8 @@

private boolean createIfMissing = true;
private int maxUnackedRangesToPersist = 10000;
private int maxBatchDeletedIndexToPersist = 10000;
private boolean batchIndexDeleteEnabled = true;
Suggested change:
- private boolean batchIndexDeleteEnabled = true;
+ private boolean deletionAtBatchIndexLevelEnabled = true;
@@ -148,6 +151,10 @@
    return position;
};
private final LongPairRangeSet<PositionImpl> individualDeletedMessages;

// Maintain the indexes deleted status of batch messages that not deleted completely
Suggested change:
- // Maintain the indexes deleted status of batch messages that not deleted completely
+ // Maintain the deletion status for batch messages
@@ -148,6 +151,10 @@
    return position;
};
private final LongPairRangeSet<PositionImpl> individualDeletedMessages;

// Maintain the indexes deleted status of batch messages that not deleted completely
// (ledgerId, entryId) -> deleted indexes
Suggested change:
- // (ledgerId, entryId) -> deleted indexes
+ // (ledgerId, entryId) -> deletion indexes
if (acknowledgementGroupTimeMicros == 0 || !properties.isEmpty()) {
    doImmediateBatchIndexAck(msgId, batchIndex, batchSize, ackType, properties);
} else if (ackType == AckType.Cumulative) {
    ConcurrentBitSet bitSet = new ConcurrentBitSet(batchSize);
Why do we need a concurrent bit set?
/**
 * This is a set of all the individual acks that the application has issued and that were not already sent to
 * broker.
 */
private final ConcurrentSkipListSet<MessageIdImpl> pendingIndividualAcks;
private final ConcurrentHashMap<MessageIdImpl, ConcurrentBitSet> pendingIndividualBatchIndexAcks;
BookKeeper has a pretty efficient AckSet / WriteSet implementation that can avoid producing a lot of garbage. I think we should leverage the implementation there - https://github.com/apache/bookkeeper/blob/master/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java#L76
// Maintain the indexes deleted status of batch messages that not deleted completely
// (ledgerId, entryId) -> deleted indexes
private final ConcurrentSkipListMap<PositionImpl, BitSet> batchDeletedIndexes;
Same comment as I made on the client-side change. We should consider re-using some of the AckSet implementation in the BookKeeper client.
@@ -78,6 +81,11 @@ message MessageRange {
    required NestedPositionInfo upperEndpoint = 2;
}

message BatchDeletedIndexInfo {
    required NestedPositionInfo position = 1;
    repeated int64 ackBitSet = 2;
A few comments here:
- I would prefer calling it delete_set. "Acknowledgement" is a concept at the client and broker side. At the managed ledger level, we use "delete". So it is better to keep the naming consistent.
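For readers unfamiliar with how a bit set ends up in a `repeated int64` field, here is a minimal sketch of the round trip using `java.util.BitSet`. The class name `DeleteSetSketch`, its methods, and the convention that a set bit marks a deleted batch index are assumptions for illustration; the PR's actual bit convention is not stated in this thread.

```java
import java.util.BitSet;

// Hypothetical helper; models the repeated int64 field as a long[].
final class DeleteSetSketch {
    // Assumed convention for this sketch only: a set bit marks a deleted batch index.
    static long[] encode(int[] deletedIndexes) {
        BitSet bits = new BitSet();
        for (int index : deletedIndexes) {
            bits.set(index);
        }
        // Each long carries 64 batch indexes, least significant bit first.
        return bits.toLongArray();
    }

    // Rebuilds the bit set from the stored words and checks one batch index.
    static boolean isDeleted(long[] words, int batchIndex) {
        return BitSet.valueOf(words).get(batchIndex);
    }
}
```

A batch with deleted indexes 0, 2 and 65 is stored as two words, which is why the field is repeated rather than a single int64.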
@@ -78,6 +81,11 @@ message MessageRange {
    required NestedPositionInfo upperEndpoint = 2;
}

message BatchDeletedIndexInfo {
The name is not very clear. I think it is keeping the deletion index for a batched entry. So I would suggest naming it "BatchedEntryDeletionIndexInfo"
@codelipenghui I think the change looks very good in general. However, there are a few major comments I would like to clarify here.
Also, I feel we should clarify that the feature is implementing "acknowledgment at batch index level". It is confusing when we are saying "batch acknowledgment". So we'd better rename the PIP to "Support acknowledgment at batch index level"
@sijie I will change the default value of batchIndexAcknowledgementEnabled to false since this feature brings some resource overhead.
@zzzming Thanks for your comments. I have added an issue #6353 to track implementation for other language clients. I'm not familiar with C++.
This feature has no restrictions on subscription types.
Re-transmission of the entire batch is not introduced by this feature, and we do need to make some trade-offs. If users care about re-transmission of the entire batch, they can disable message batching. But in my opinion, disabling batching could be worse. If the broker were to split batch messages before sending them, this would introduce more overhead on the broker.
335cdff to 00f0865
/pulsarbot run-failure-checks
Doc has been added.
Master issue: apache#6253
Fixes apache#5969
### Motivation
Add support for ack batch message local index. Can be disabled at broker side by set batchIndexAcknowledgeEnable=false at broker.conf
PIP-54 documentation will be created soon.
### Modifications
1. Managed cursor support track and persistent local index of batch message.
2. Client support send batch index ack to broker.
3. The batch messages with index ack information dispatched to the client.
4. Client skip the acked index.
### Verifying this change
New unit tests added
@codelipenghui penghui please see the url:
Master issue: #6253
Fixes #5969
Motivation
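Add support for acknowledging the local index of a batch message. It can be disabled on the broker side by setting batchIndexAcknowledgeEnable=false in broker.conf.
PIP-54 documentation will be created soon.
Modifications
1. The managed cursor supports tracking and persisting the local index of batch messages.
2. The client supports sending batch index acks to the broker.
3. Batch messages are dispatched to the client together with their index ack information.
4. The client skips the acked indexes.
Verifying this change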
New unit tests added
Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes
Documentation