Apache Pulsar's Key_Shared subscription mode is designed to provide ordered message delivery on a per-key basis while allowing multiple consumers to process messages concurrently. This mode is particularly useful in scenarios where maintaining message order for specific keys is crucial, but overall throughput can be improved by parallelizing message consumption across multiple consumers.
Key concepts:
- Key_Shared subscription: A subscription mode that maintains message ordering per key while allowing multiple consumers.
- Hash ranges: In AUTO_SPLIT mode, the hash space is divided among active consumers to distribute message processing.
- Pending messages: Messages that have been sent to a consumer but not yet acknowledged (also called "pending acks" or "unacknowledged messages").
The Key_Shared subscription is described in the Pulsar documentation.
For this PIP, the most important detail is the "Preserving order of processing" section. There are recent changes in this section that apply to the master branch of Pulsar and, therefore, to the upcoming Pulsar 4.0. The changes were made as part of "PIP-282: Change definition of the recently joined consumers position".
PIP-282 (master branch / Pulsar 4.0) version of the "Preserving order of processing" section:
Key_Shared Subscription type guarantees a key will be processed by a single consumer at any given time. When a new consumer is connected, some keys will change their mapping from existing consumers to the new consumer. Once the connection has been established, the broker will record the current lastSentPosition and associate it with the new consumer. The lastSentPosition is a marker indicating that messages have been dispatched to the consumers up to this point. The broker will start delivering messages to the new consumer only when all messages up to the lastSentPosition have been acknowledged. This will guarantee that a certain key is processed by a single consumer at any given time. The trade-off is that if one of the existing consumers is stuck and no time-out was defined (acknowledging for you), the new consumer won't receive any messages until the stuck consumer resumes or gets disconnected.
Previous version (applies to Pulsar 3.x) of the "Preserving order of processing" section:
Key Shared Subscription type guarantees a key will be processed by a single consumer at any given time. When a new consumer is connected, some keys will change their mapping from existing consumers to the new consumer. Once the connection has been established, the broker will record the current read position and associate it with the new consumer. The read position is a marker indicating that messages have been dispatched to the consumers up to this point, and after it, no messages have been dispatched yet. The broker will start delivering messages to the new consumer only when all messages up to the read position have been acknowledged. This will guarantee that a certain key is processed by a single consumer at any given time. The trade-off is that if one of the existing consumers is stuck and no time-out was defined (acknowledging for you), the new consumer won't receive any messages until the stuck consumer resumes or gets disconnected.
The current implementation of Key_Shared subscriptions faces several challenges:
- Complex Contract of Preserving Ordering: The current contract of preserving ordering is hard to understand and contains a fundamental problem. It explains a solution and then ties the guarantee to the provided solution. It could be interpreted that there's a guarantee as long as this solution is able to handle the case.
- Incomplete Ordering Contract Fulfillment: The current contract seems to make a conditional guarantee that a certain key is processed by a single consumer at any given time. Outside of the described solution in the contract, the current implementation struggles to consistently prevent messages from being sent to another consumer while pending on the original consumer. While Key_Shared subscriptions aim to preserve message ordering per key, the current implementation may not always achieve this, especially during consumer changes. There's a potential corner case reported in issue #23307.
- Usability Issues: Understanding the current system and detecting the reason why messages get blocked is time-consuming and difficult.
- Unnecessary Message Blocking: The current implementation blocks delivery for all messages when any hash range is blocked, even if other keys could be processed independently. This leads to suboptimal utilization of consumers and increased latency for messages that could otherwise be processed.
- Observability Challenges: The current implementation lacks clear visibility into the consuming state when processing gets stuck, making it harder to build automation for detecting and mitigating issues.
- Complexity: The existing solution for managing "recently joined consumers" is overly complex, making the system harder to maintain and debug.
- Clarify and fulfill the key-ordered message delivery contract for Key_Shared AUTO_SPLIT mode.
- Fix current issues where messages are sent out-of-order or when a single key is outstanding in multiple consumers at a time.
- Improve the handling of unacknowledged messages to prevent indefinite blocking and consumers getting stuck.
- Minimize memory usage for pending message tracking, eliminating PIP-282's "sent positions" tracking.
- Implement a new "draining hashes" concept to efficiently manage message ordering in Key_Shared subscriptions.
- Enhance the reliability, usability, and scalability of Key_Shared subscriptions.
- Improve observability of Key_Shared subscriptions to aid in troubleshooting and automation.
- Ensure strict ordering guarantees for messages with the same key, even during consumer changes.
- Changes to other subscription types (Exclusive, Failover, Shared).
- Adding support for key-based ordering guarantees when negative acknowledgements are used.
The "Preserving order of processing" section of the Key_Shared documentation would be updated to contain this contract:
In Key_Shared subscriptions, messages with the same key are delivered and allowed to be in an unacknowledged state to only one consumer at a time.
When new consumers join or leave, the consumer handling a message key can change when the default AUTO_SPLIT mode is used, but only after all pending messages for a particular key are acknowledged or the original consumer disconnects.
The Key_Shared subscription doesn't prevent using any methods in the consumer API. For example, the application might call the negativeAcknowledge or the redeliverUnacknowledgedMessages method. When messages are scheduled for delivery due to these methods, they will get redelivered as soon as possible. There's no ordering guarantee in these cases; however, the guarantee of delivering a message key to a single consumer at a time will continue to be preserved.
Wikipedia tells us about invariants: "In computer science, an invariant is a logical assertion that is always held to be true during a certain phase of execution of a computer program."
The contract "In Key_Shared subscriptions, messages with the same key are delivered and allowed to be in an unacknowledged state to only one consumer at a time." can be seen as an invariant for Key_Shared subscriptions. It is something that must always be held true for Key_Shared subscriptions. The design and implementation in PIP-379 focuses on ensuring this.
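To make the invariant concrete, the following is a minimal sketch of how it could be checked in a test harness. It is not part of Pulsar: the class `SingleOwnerInvariantSketch` and its `onSend`/`onAck` methods are hypothetical, and consumers are represented as plain strings.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical test helper, not part of Pulsar: tracks which consumer
// currently holds unacknowledged messages for each sticky key hash.
final class SingleOwnerInvariantSketch {
    private final Map<Integer, String> ownerByHash = new HashMap<>();
    private final Map<Integer, Integer> pendingByHash = new HashMap<>();

    // Records a delivery; returns false if it would violate the invariant.
    boolean onSend(String consumer, int stickyKeyHash) {
        String owner = ownerByHash.get(stickyKeyHash);
        if (owner != null && !owner.equals(consumer)) {
            return false; // another consumer still has pending messages for this hash
        }
        ownerByHash.put(stickyKeyHash, consumer);
        pendingByHash.merge(stickyKeyHash, 1, Integer::sum);
        return true;
    }

    // Records an acknowledgement; the hash is released once nothing is pending.
    void onAck(String consumer, int stickyKeyHash) {
        if (!consumer.equals(ownerByHash.get(stickyKeyHash))) {
            return; // ack for a hash this consumer does not own; ignore
        }
        if (pendingByHash.merge(stickyKeyHash, -1, Integer::sum) == 0) {
            pendingByHash.remove(stickyKeyHash);
            ownerByHash.remove(stickyKeyHash);
        }
    }
}
```

In this model, a second consumer can only receive a hash after every pending message of the first consumer for that hash has been acknowledged, which is the property the contract describes.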
The updated contract explicitly states that it is not possible to retain key-based ordering of messages when negative acknowledgements are used. Changing this is out of scope for PIP-379. A potential future solution for handling this would be to modify the client so that when a message is negatively acknowledged, it would also reject all further messages with the same key until the original message gets redelivered. It's already possible to attempt to implement this in client-side code. However, a proper solution would require support on the broker side to block further delivery of the specific key when there are pending negatively acknowledged messages until all negatively acknowledged messages for that particular key have been acknowledged by the consumer. This solution is out of scope for PIP-379. A future implementation to address these problems could build upon PIP-379 concepts such as "draining hashes" and extend that to cover the negative acknowledgement scenarios.
The proposed solution introduces a "draining hashes" concept to efficiently manage message ordering in Key_Shared subscriptions:
1. When consumer hash ranges change (e.g., a consumer joins or leaves), affected hashes of pending messages are added to a "draining hashes" set.
Pending messages of the consumer are iterated, and if the hash of a pending message belongs to one of the impacted ranges, the hash gets added to the "draining hashes" tracker.
Code example to illustrate the implementation:
private synchronized void registerDrainingHashes(Consumer skipConsumer,
                                                 Map<Consumer, NavigableSet<Range>> impactedRangesByConsumer) {
    for (Map.Entry<Consumer, NavigableSet<Range>> entry : impactedRangesByConsumer.entrySet()) {
        Consumer c = entry.getKey();
        if (c != skipConsumer) {
            // perf optimization: convert the set to an array to avoid iterator allocation in the pending acks loop
            Range[] ranges = entry.getValue().toArray(new Range[0]);
            // add all pending acks in the impacted hash ranges to the draining hashes tracker
            c.getPendingAcks().forEach((ledgerId, entryId, batchSize, stickyKeyHash) -> {
                for (Range range : ranges) {
                    if (range.contains(stickyKeyHash)) {
                        // add the pending ack to the draining hashes tracker if the hash is in the range
                        drainingHashesTracker.addEntry(c, stickyKeyHash);
                        break;
                    }
                    // Since ranges are sorted, stop checking further ranges if the start of the current range is
                    // greater than the stickyKeyHash.
                    if (range.getStart() > stickyKeyHash) {
                        break;
                    }
                }
            });
        }
    }
}
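The range scan in step 1 can also be looked at in isolation. The sketch below is a simplified, self-contained model, not the Pulsar code: `HashRange` is a hypothetical stand-in for Pulsar's `Range`, assumed to be an inclusive interval, and pending messages are reduced to an array of hashes.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

final class DrainingScanSketch {
    // Hypothetical stand-in for Pulsar's Range: an inclusive [start, end] interval.
    static final class HashRange {
        final int start;
        final int end;

        HashRange(int start, int end) {
            this.start = start;
            this.end = end;
        }

        boolean contains(int hash) {
            return hash >= start && hash <= end;
        }
    }

    // Returns the hashes of pending messages that fall into an impacted range.
    // sortedRanges must be sorted by start and must not overlap.
    static Set<Integer> findDrainingHashes(int[] pendingHashes, List<HashRange> sortedRanges) {
        Set<Integer> draining = new TreeSet<>();
        for (int hash : pendingHashes) {
            for (HashRange range : sortedRanges) {
                if (range.contains(hash)) {
                    draining.add(hash);
                    break;
                }
                if (range.start > hash) {
                    break; // later ranges start even higher, so none can match
                }
            }
        }
        return draining;
    }
}
```

With impacted ranges [10, 20] and [40, 50], pending hashes 5, 15, 30, 45 and 60 yield the draining set {15, 45}. The early exit relies only on the ranges being sorted by their start.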
2. Subsequent messages with hashes in the "draining hashes" set are blocked from delivery until the pending messages for those hashes have been processed.
Code example to illustrate the implementation:
// If the hash is draining, do not send the message
if (drainingHashesTracker.shouldBlockStickyKeyHash(consumer, stickyKeyHash)) {
    return false;
}
3. A reference counter tracks pending messages for each hash in the "draining hashes" set.
Code example to illustrate the implementation:
// optimize the memory consumption of the map by using primitive int keys
private final Int2ObjectOpenHashMap<DrainingHashEntry> drainingHashes = new Int2ObjectOpenHashMap<>();

public static class DrainingHashEntry {
    private final Consumer consumer;
    private int refCount;
    private int blockedCount;

    DrainingHashEntry(Consumer consumer) {
        this.consumer = consumer;
    }

    public Consumer getConsumer() {
        return consumer;
    }

    void incrementRefCount() {
        refCount++;
    }

    boolean decrementRefCount() {
        return --refCount == 0;
    }

    void incrementBlockedCount() {
        blockedCount++;
    }

    boolean isBlocking() {
        return blockedCount > 0;
    }
}
The memory consumption estimate for tracking a hash is 52 bytes:
- key: 16 bytes (object header) + 4 bytes (int) = 20 bytes
- entry: 16 bytes (object header) + 8 bytes (long) + 4 bytes (int) + 4 bytes (int) = 32 bytes
Although the estimate is 52 bytes per entry, calculations have been made with 80 bytes per entry to account for possible additional overheads such as memory alignment and the overhead of the Int2ObjectOpenHashMap.
Memory usage estimate for each subscription after there have been consumer changes:
- Worst case (all 64k hashes draining for a subscription): about 5 MB
- Practical case (fewer than 1,000 hashes draining): less than 80 kB
- For 10,000 draining hashes: about 800 kB
The memory usage of draining hashes tracking will go down to 0 after all hashes have "drained" and are no longer blocked. This memory usage isn't an overhead that applies at all times.
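The figures above follow directly from the padded estimate of 80 bytes per entry. As a quick check, the arithmetic can be reproduced with a few lines of Java; the class name is only illustrative.

```java
final class DrainingHashMemoryEstimate {
    static final int BYTES_PER_ENTRY = 80;     // padded per-entry estimate from the text
    static final int HASH_RANGE_SIZE = 65535;  // 2^16 - 1, the proposed hash range size

    static long estimateBytes(int drainingHashes) {
        return (long) drainingHashes * BYTES_PER_ENTRY;
    }

    public static void main(String[] args) {
        System.out.println(estimateBytes(HASH_RANGE_SIZE)); // 5242800 bytes, about 5 MB
        System.out.println(estimateBytes(10_000));          // 800000 bytes, about 800 kB
        System.out.println(estimateBytes(1_000));           // 80000 bytes, about 80 kB
    }
}
```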
The hash range size is reduced to 65535 (2^16-1) from the current 2^31-1 (Integer.MAX_VALUE) in ConsistentHashingStickyKeyConsumerSelector to reduce the worst-case memory consumption. Reducing the hash range size won't significantly impact the accuracy of distributing messages across connected consumers. The proof-of-concept implementation of PIP-379 includes the changes to reduce the hash range size.
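As an illustration of what a smaller hash range means, the sketch below folds an arbitrary 32-bit hash into the range [0, 65535). The use of `Math.floorMod` and of `String.hashCode()` as the raw hash are assumptions made for this example only; it does not show how Pulsar itself derives the sticky key hash.

```java
final class HashRangeReductionSketch {
    static final int RANGE_SIZE = 65535; // 2^16 - 1, the proposed hash range size

    // Folds an arbitrary 32-bit hash into [0, RANGE_SIZE).
    // Using floorMod is an assumption of this sketch, not Pulsar's documented mapping.
    static int reduce(int rawHash) {
        return Math.floorMod(rawHash, RANGE_SIZE);
    }

    public static void main(String[] args) {
        // String.hashCode() is only a stand-in for the real key hash function.
        System.out.println(reduce("order-42".hashCode()));
        System.out.println(reduce(Integer.MIN_VALUE));
    }
}
```

Because every reduced hash is one of at most 65535 values, the number of entries the draining hashes tracker can ever hold for a subscription is bounded by the same number.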
4. As messages are acknowledged or consumers disconnect and therefore get removed from pending messages, the reference counter is decremented.
Individual acks are removed in Consumer's removePendingAcks method:
private boolean removePendingAcks(Consumer ackOwnedConsumer, Position position) {
    PendingAcksMap ownedConsumerPendingAcks = ackOwnedConsumer.getPendingAcks();
    if (!ownedConsumerPendingAcks.remove(position.getLedgerId(), position.getEntryId())) {
        // Message was already removed by the other consumer
        return false;
    }
When the remove method in PendingAcksMap is called, it uses the PendingAcksMap.PendingAcksRemoveHandler callback method handleRemoving, provided by the dispatcher, to also trigger the removal from the DrainingHashesTracker:
consumer.setPendingAcksRemoveHandler(new PendingAcksMap.PendingAcksRemoveHandler() {
    @Override
    public void handleRemoving(Consumer consumer, long ledgerId, long entryId, int stickyKeyHash,
                               boolean closing) {
        drainingHashesTracker.reduceRefCount(consumer, stickyKeyHash, closing);
    }
});
Hashes of pending acks are also removed when a consumer disconnects. This happens in the removeConsumer method of PersistentDispatcherMultipleConsumers:
consumer.getPendingAcks().forEachAndClose((ledgerId, entryId, batchSize, stickyKeyHash) -> {
    addMessageToReplay(ledgerId, entryId, stickyKeyHash);
});
The forEachAndClose method of PendingAcksMap triggers removals from DrainingHashesTracker using the PendingAcksMap.PendingAcksRemoveHandler callback method handleRemoving after processing each entry. This is how the DrainingHashesTracker stays in sync with the PendingAcksMap state without putting all of the logic into PendingAcksMap. It follows the "separation of concerns" design principle, where each class handles a specific concern.
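The callback arrangement described above can be sketched in a few lines. This is a simplified, single-threaded model and not the Pulsar class: `PendingAcksSketch`, `RemoveHandler` and `EntryVisitor` are hypothetical names, and entries are keyed by entry id only instead of by ledger id and entry id.

```java
import java.util.TreeMap;

// Simplified model of a pending acks map that reports every removal to a handler.
final class PendingAcksSketch {
    interface RemoveHandler {
        void handleRemoving(long entryId, int stickyKeyHash, boolean closing);
    }

    interface EntryVisitor {
        void accept(long entryId, int stickyKeyHash);
    }

    private final TreeMap<Long, Integer> hashByEntryId = new TreeMap<>();
    private final RemoveHandler removeHandler;
    private boolean closed;

    PendingAcksSketch(RemoveHandler removeHandler) {
        this.removeHandler = removeHandler;
    }

    boolean add(long entryId, int stickyKeyHash) {
        if (closed) {
            return false; // nothing may be added after the consumer was closed
        }
        hashByEntryId.put(entryId, stickyKeyHash);
        return true;
    }

    // Individual acknowledgement: remove one entry and notify the handler.
    boolean remove(long entryId) {
        Integer hash = hashByEntryId.remove(entryId);
        if (hash == null) {
            return false; // already removed
        }
        removeHandler.handleRemoving(entryId, hash, false);
        return true;
    }

    // Consumer disconnect: visit every entry, notify the handler, then drop everything.
    void forEachAndClose(EntryVisitor visitor) {
        closed = true;
        hashByEntryId.forEach((entryId, hash) -> {
            visitor.accept(entryId, hash);
            removeHandler.handleRemoving(entryId, hash, true);
        });
        hashByEntryId.clear();
    }
}
```

The map knows nothing about draining hashes. Whoever supplies the `RemoveHandler` decides what a removal means, which keeps the two concerns in separate classes.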
5. When the reference counter reaches zero, the hash is removed from the set, allowing new message delivery. The dispatcher is notified about this so that the delivery of the blocked messages can occur. Unblocked hashes are batched together so that not every unblocked hash triggers a separate notification. The batching interval is controlled by the keySharedUnblockingIntervalMs configuration setting.
In the implementation, this is handled in the DrainingHashesTracker's reduceRefCount method:
// code example is simplified for focus on the essential details
public synchronized void reduceRefCount(Consumer consumer, int stickyHash) {
    DrainingHashEntry entry = drainingHashes.get(stickyHash);
    if (entry == null) {
        return;
    }
    if (entry.decrementRefCount()) {
        DrainingHashEntry removed = drainingHashes.remove(stickyHash);
        if (removed.isBlocking()) {
            unblockingHandler.stickyKeyHashUnblocked(stickyHash);
        }
    }
}
The isBlocking() method of DrainingHashEntry returns true when delivery was attempted for that hash, indicating a need to unblock it when it's removed.
The dispatcher is notified via the unblockingHandler.stickyKeyHashUnblocked(stickyHash) callback. The implementation simply schedules a read, batching all calls together, and then calls readMoreEntries in the dispatcher.
// code example is simplified for focus on the essential details
private void stickyKeyHashUnblocked(int stickyKeyHash) {
    reScheduleReadInMs(keySharedUnblockingIntervalMsSupplier.getAsLong());
}

protected void reScheduleReadInMs(long readAfterMs) {
    if (isRescheduleReadInProgress.compareAndSet(false, true)) {
        Runnable runnable = () -> {
            isRescheduleReadInProgress.set(false);
            readMoreEntries();
        };
        topic.getBrokerService().executor().schedule(runnable, readAfterMs, TimeUnit.MILLISECONDS);
    }
}
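The batching effect of the compare-and-set guard can be demonstrated with a small deterministic model. In this sketch a plain task queue stands in for the scheduled executor so that the "delay" can be triggered manually; the class `CoalescedReadSketch` and its methods are illustrative and not part of Pulsar.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of coalescing many "hash unblocked" notifications into a single read.
final class CoalescedReadSketch {
    private final AtomicBoolean rescheduleInProgress = new AtomicBoolean(false);
    private final Queue<Runnable> scheduledTasks = new ArrayDeque<>();
    private int reads;

    void stickyKeyHashUnblocked(int stickyKeyHash) {
        // Only the first caller in a window schedules work; later calls are absorbed.
        if (rescheduleInProgress.compareAndSet(false, true)) {
            scheduledTasks.add(() -> {
                rescheduleInProgress.set(false);
                reads++; // stands in for readMoreEntries()
            });
        }
    }

    // Simulates the delay elapsing: runs everything that was scheduled.
    void runScheduledTasks() {
        Runnable task;
        while ((task = scheduledTasks.poll()) != null) {
            task.run();
        }
    }

    int reads() {
        return reads;
    }

    int scheduledCount() {
        return scheduledTasks.size();
    }
}
```

Three notifications that arrive before the task runs result in one scheduled task and therefore one read. A notification that arrives after the task has run starts a new window.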
6. Consumer hash assignments may change multiple times, and a draining hash might be reassigned to the original consumer.
The draining hash data structure contains information about the draining consumer. When a message is attempted for delivery, the system can check if the target consumer is the same as the draining consumer. If they match, there's no need to block the hash. The implementation should also remove such hashes from the draining hashes set. This "lazy" approach reduces the need for actively scanning all draining hashes whenever hash assignments change.
This is handled in the DrainingHashesTracker's shouldBlockStickyKeyHash method:
public synchronized boolean shouldBlockStickyKeyHash(Consumer consumer, int stickyKeyHash) {
    DrainingHashEntry entry = drainingHashes.get(stickyKeyHash);
    // if the entry is not found, the hash is not draining. Don't block the hash.
    if (entry == null) {
        return false;
    }
    // hash has been reassigned to the original consumer, remove the entry
    // and don't block the hash
    if (entry.getConsumer() == consumer) {
        drainingHashes.remove(stickyKeyHash, entry);
        return false;
    }
    // increment the blocked count which is used to determine if the hash is blocking
    // dispatching to other consumers
    entry.incrementBlockedCount();
    // block the hash
    return true;
}
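Steps 1 to 6 can be tied together in one small model. The sketch below is a simplified, single-threaded illustration under stated assumptions: consumers are plain strings, `HashMap` replaces the primitive-keyed map, and the class and method names are hypothetical rather than the Pulsar API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.IntConsumer;

// Simplified, single-threaded model of the draining hashes life cycle.
final class DrainingHashesSketch {
    private static final class Entry {
        final String consumer;
        int refCount;
        int blockedCount;

        Entry(String consumer) {
            this.consumer = consumer;
        }
    }

    private final Map<Integer, Entry> drainingHashes = new HashMap<>();
    private final IntConsumer unblockedListener;

    DrainingHashesSketch(IntConsumer unblockedListener) {
        this.unblockedListener = unblockedListener;
    }

    // Called once per pending message whose hash moved away from `consumer`.
    void addEntry(String consumer, int hash) {
        drainingHashes.computeIfAbsent(hash, h -> new Entry(consumer)).refCount++;
    }

    // Called before dispatching a message with `hash` to `consumer`.
    boolean shouldBlock(String consumer, int hash) {
        Entry entry = drainingHashes.get(hash);
        if (entry == null) {
            return false; // not draining
        }
        if (entry.consumer.equals(consumer)) {
            drainingHashes.remove(hash); // hash went back to the draining consumer
            return false;
        }
        entry.blockedCount++;
        return true;
    }

    // Called when a pending message with `hash` is acknowledged or released.
    void reduceRefCount(int hash) {
        Entry entry = drainingHashes.get(hash);
        if (entry != null && --entry.refCount == 0) {
            drainingHashes.remove(hash);
            if (entry.blockedCount > 0) {
                unblockedListener.accept(hash); // a delivery was waiting for this hash
            }
        }
    }

    int size() {
        return drainingHashes.size();
    }
}
```

In this model a hash with two pending messages stays blocked for other consumers until both have been acknowledged, and the listener fires only if a delivery was actually attempted in the meantime. A hash that is reassigned to its draining consumer is dropped lazily on the next delivery attempt.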
7. When sending out messages, there are potential race conditions that could allow the delivery of a message that should be blocked.
This could happen when a consumer is added while reading and sending messages are already in progress. In PIP-379, the sending process has been modified to perform a check when adding the message to the pending acknowledgments map. There are also additional locks in the pending acks handling which prevent race conditions.
The addPendingAckIfAllowed method in the PendingAcksMap class:
public boolean addPendingAckIfAllowed(long ledgerId, long entryId, int batchSize, int stickyKeyHash) {
    // acquire the lock before entering the try block so that unlock() in the finally block
    // only runs when the lock is actually held
    writeLock.lock();
    try {
        // prevent adding sticky hash to pending acks if the PendingAcksMap has already been closed
        // and there's a race condition between closing the consumer and sending new messages
        if (closed) {
            return false;
        }
        // prevent adding sticky hash to pending acks if it's already in draining hashes
        // to avoid any race conditions that would break consistency
        PendingAcksAddHandler pendingAcksAddHandler = pendingAcksAddHandlerSupplier.get();
        if (pendingAcksAddHandler != null
                && !pendingAcksAddHandler.handleAdding(consumer, ledgerId, entryId, stickyKeyHash)) {
            return false;
        }
        Long2ObjectSortedMap<IntIntPair> ledgerPendingAcks =
                pendingAcks.computeIfAbsent(ledgerId, k -> new Long2ObjectRBTreeMap<>());
        ledgerPendingAcks.put(entryId, IntIntPair.of(batchSize, stickyKeyHash));
        return true;
    } finally {
        writeLock.unlock();
    }
}
This addPendingAckIfAllowed method is called from Consumer's sendMessages method:
boolean sendingAllowed =
        pendingAcks.addPendingAckIfAllowed(entry.getLedgerId(), entry.getEntryId(), batchSize, stickyKeyHash);
if (!sendingAllowed) {
    // sending isn't allowed when pending acks doesn't accept adding the entry
    // this happens when Key_Shared draining hashes contains the stickyKeyHash
    // because of race conditions, it might be resolved at the time of sending
    totalEntries--;
    entries.set(i, null);
    entry.release();
    if (log.isDebugEnabled()) {
        log.debug("[{}-{}] Skipping sending of {}:{} ledger entry with batchSize of {} since adding"
                        + " to pending acks failed in broker.service.Consumer for consumerId: {}",
                topicName, subscription, entry.getLedgerId(), entry.getEntryId(), batchSize,
                consumerId);
    }
}
If sending isn't allowed, the entry is skipped and not delivered. In that case, the PendingAcksAddHandler callback adds the message to redelivery.
The callback maps to handleAddingPendingAck in the dispatcher (PersistentStickyKeyDispatcherMultipleConsumers).
private boolean handleAddingPendingAck(Consumer consumer, long ledgerId, long entryId, int stickyKeyHash) {
    DrainingHashesTracker.DrainingHashEntry drainingHashEntry = drainingHashesTracker.getEntry(stickyKeyHash);
    if (drainingHashEntry != null && drainingHashEntry.getConsumer() != consumer) {
        log.warn("[{}] Another consumer id {} is already draining hash {}. Skipping adding {}:{} to pending acks "
                        + "for consumer {}. Adding the message to replay.",
                getName(), drainingHashEntry.getConsumer(), stickyKeyHash, ledgerId, entryId, consumer);
        addMessageToReplay(ledgerId, entryId, stickyKeyHash);
        // block message from sending
        return false;
    }
    if (recentReadTypeInSending == ReadType.Normal && redeliveryMessages.containsStickyKeyHash(stickyKeyHash)) {
        log.warn("[{}] Sticky hash {} is already in the replay queue. "
                        + "Skipping adding {}:{} to pending acks. Adding the message to replay.",
                getName(), stickyKeyHash, ledgerId, entryId);
        addMessageToReplay(ledgerId, entryId, stickyKeyHash);
        // block message from sending
        return false;
    }
    // allow adding the message to pending acks and sending the message to the consumer
    return true;
}
This logic will prevent any inconsistency when consumers get added or removed and hash ranges change while the sending of messages is already in progress. It will ensure that the view on pending acknowledgments is consistent so that the tracking of draining hashes will also be consistent in all cases. In addition, this logic will block hashes of messages that have recently been added to the redelivery queue and therefore, for message ordering reasons, should get delivered before any further message delivery happens.
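The essence of this guard is that the "may this entry be added?" decision and the actual add happen under the same lock. The sketch below illustrates that idea in isolation. It is a minimal model with hypothetical names (`GuardedPendingAcksSketch`, `AddHandler`), string consumers, and entries keyed by entry id only; it is not the Pulsar implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Sketch of a pending acks map whose add operation can be vetoed by a handler.
final class GuardedPendingAcksSketch {
    interface AddHandler {
        // Returns false to veto the add, for example because the hash is draining.
        boolean handleAdding(String consumer, long entryId, int stickyKeyHash);
    }

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Map<Long, Integer> hashByEntryId = new HashMap<>();
    private final String consumer;
    private final AddHandler addHandler;
    private boolean closed;

    GuardedPendingAcksSketch(String consumer, AddHandler addHandler) {
        this.consumer = consumer;
        this.addHandler = addHandler;
    }

    boolean addIfAllowed(long entryId, int stickyKeyHash) {
        lock.writeLock().lock();
        try {
            if (closed) {
                return false; // the consumer was closed while sending was in progress
            }
            if (addHandler != null && !addHandler.handleAdding(consumer, entryId, stickyKeyHash)) {
                return false; // vetoed; the caller must skip sending this entry
            }
            hashByEntryId.put(entryId, stickyKeyHash);
            return true;
        } finally {
            lock.writeLock().unlock();
        }
    }

    void close() {
        lock.writeLock().lock();
        try {
            closed = true;
        } finally {
            lock.writeLock().unlock();
        }
    }

    int size() {
        lock.readLock().lock();
        try {
            return hashByEntryId.size();
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

Because the check and the insert are atomic with respect to `close()` and to other adds, an entry cannot slip into the map after the handler would have rejected it.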
Summary
This high-level design approach will meet the updated contract of preserving ordering: "In Key_Shared subscriptions, messages with the same key are delivered and allowed to be in an unacknowledged state to only one consumer at a time."
It also minimizes the impact on performance and memory usage. The tracking only comes into play during transition states. When consumers have been connected for a longer duration and all draining hashes have been removed, there won't be a need to check any special rules or maintain any extra state. When the draining hashes are empty, lookups will essentially be no-ops and won't consume CPU or memory resources.
The topic stats that belonged to the PIP-282 "recently joined consumers"/"last sent position" solution are removed:
- lastSentPositionWhenJoining field for each consumer
- consumersAfterMarkDeletePosition field for each Key_Shared subscription
- individuallySentPositions field for each Key_Shared subscription
New topic stats will be added to monitor the "draining hashes" feature at the subscription level and consumer level:
- draining_hashes_count: The current number of hashes in the draining state.
- draining_hashes_pending_messages: The total number of pending messages for all draining hashes.
- draining_hashes_cleared_total: The total number of hashes cleared from the draining state.
- draining_hashes: Details at the hash level (available at the consumer level to reduce redundancy of information)
  - hash
  - number of pending messages
For improved observability, a separate REST API for listing all pending messages ("pending acks") for a consumer will be considered. This API would allow querying which messages are currently part of a draining hash, providing a way to identify specific message IDs of messages that are holding onto a specific hash and blocking delivery to another consumer.
The "draining hashes" feature doesn't introduce backward or forward compatibility issues. The state is handled at runtime, and the changes are on the broker side without changes to the client protocol.
Although not strictly part of the PIP-379 changes, users upgrading from Pulsar 3.x need to be able to revert to the "recently joined consumers" logic (before PIP-282) in case PIP-379 causes regressions. Since PIP-282 is also new in Pulsar 4.0.0, a feature flag is needed that toggles between the PIP-379 implementation for Key_Shared and the "recently joined consumers" logic before PIP-282. Implementation details for this feature toggle can be handled in the pull request that implements it.
- Mailing List discussion thread: https://lists.apache.org/thread/l5zjq0fb2dscys3rsn6kfl7505tbndlx
- Mailing List voting thread: https://lists.apache.org/thread/z1kgo34qfkkvdnn3l007bdvjr3qqf4rw
- PIP-379 implementation PR: #23352
- PIP-282: Change definition of the recently joined consumers position
- Pulsar issue #21199: Key_Shared subscription gets stuck after consumer reconnects