Key_Shared is one of the subscription types which allows multiple consumer connections. Messages are distributed across consumers, and messages with the same key or same ordering key are delivered to only one consumer. No matter how many times the message is re-delivered, it is delivered to the same consumer.
When allowOutOfOrderDelivery is disabled, a Key_Shared subscription guarantees that a key is processed in order by a single consumer, even if a new consumer connects.
Key_Shared has a mechanism called the "recently joined consumers" to preserve message ordering. However, it currently does not handle some corner cases. More specifically, we found two out-of-order issues caused by:
- [issue-1] The race condition in the "recently joined consumers", where consumers can be added before finishing reading and dispatching messages from ledgers.
- [issue-2] Messages could be added to messagesToRedeliver without consumer-side operations such as unacknowledgement.
We should handle these cases in the Key_Shared subscription.
Key_Shared subscription has out-of-order cases because of the race condition of the "recently joined consumers".
Consider the following flow.
- Assume that the current read position is 1:6 and the recently joined consumers is empty.
- Called OpReadEntry#internalReadEntriesComplete from thread-1. Then, the current read position is updated to 1:12 (messages from 1:6 to 1:11 have yet to be dispatched to consumers).
- Called PersistentStickyKeyDispatcherMultipleConsumers#addConsumer from thread-2. Then, the new consumer is stored in the recently joined consumers with read position 1:12.
- Called PersistentDispatcherMultipleConsumers#trySendMessagesToConsumers from thread-5. Then, messages from 1:6 to 1:11 can be dispatched to the new consumer since the "recently joined consumers" allow brokers to send messages before the joined position (i.e., 1:12 here). However, this is not expected.
For example, if existing consumers have unacked messages, disconnecting those consumers and redelivering the messages can cause out-of-order delivery.
An example scenario is shown below.
- Assume that the entries have the following messages, and the dispatcher has two consumers (c1 messagesForC is 1, c2 messagesForC is 1000), and the selector will return c1 if key-a and c2 if key-b.
  - 1:6 key: key-a
  - 1:7 key: key-a
  - 1:8 key: key-a
  - 1:9 key: key-b
  - 1:10 key: key-b
  - 1:11 key: key-b
- Send 1:6 to c1 and 1:9 - 1:11 to c2.
  - So, the current read position is 1:12.
  - c1 never acknowledges 1:6.
- Add new consumer c3, the selector will return c3 if key-a, and the recentlyJoinedConsumers is {c3=1:12}.
- Send 1:7 - 1:8 to c3 because 1:7 and 1:8 are less than the recently joined consumers position, 1:12.
- Disconnect c1.
- Send 1:6 to c3.

As a result, c3 receives messages in the following order: 1:7, 1:8, 1:6 // out-of-order
Key_Shared subscription has out-of-order cases because messages could be added to messagesToRedeliver without consumer-side operations such as unacknowledgement.
Consider the following flow.
- Assume that,
  - readPosition: 2:1
  - messagesToRedeliver: []
  - recentlyJoinedConsumers: []
  - c1: messagesForC: 1, pending: []
  - c2: messagesForC: 1000, pending: [] // Necessary to ensure that the dispatcher reads entries even if c1 has no more permits.
  - selector: key-a: c1
- Dispatch 2:1 (key: key-a, type: Normal)
  - readPosition: 2:2
  - messagesToRedeliver: []
  - recentlyJoinedConsumers: []
  - c1: messagesForC: 0, pending: [2:1]
  - c2: messagesForC: 1000, pending: []
  - selector: key-a: c1
- Try to dispatch 2:2 (key: key-a, type: Normal), but it can't be sent to c1 because c1 has no more permits. Then, it is added to messagesToRedeliver.
  - readPosition: 2:3
  - messagesToRedeliver: [2:2]
  - recentlyJoinedConsumers: []
  - c1: messagesForC: 0, pending: [2:1]
  - c2: messagesForC: 1000, pending: []
  - selector: key-a: c1
- Add consumer c3
  - readPosition: 2:3
  - messagesToRedeliver: [2:2]
  - recentlyJoinedConsumers: [c3: 2:3]
  - c1: messagesForC: 0, pending: [2:1]
  - c2: messagesForC: 1000, pending: []
  - c3: messagesForC: 1000, pending: []
  - selector: key-a: c3 // modified
- Dispatch 2:2 (key: key-a, type: Replay) from messagesToRedeliver.
  - readPosition: 2:3
  - messagesToRedeliver: []
  - recentlyJoinedConsumers: [c3: 2:3]
  - c1: messagesForC: 0, pending: [2:1]
  - c2: messagesForC: 1000, pending: []
  - c3: messagesForC: 999, pending: [2:2]
  - selector: key-a: c3
- Disconnect c1 and redeliver 2:1
  - readPosition: 2:3
  - messagesToRedeliver: []
  - recentlyJoinedConsumers: [c3: 2:3]
  - c2: messagesForC: 1000, pending: []
  - c3: messagesForC: 998, pending: [2:2, 2:1] // out-of-order
  - selector: key-a: c3
Fix out-of-order issues above.
Simplify or improve the specification of Key_Shared.
The root cause of the issues described above is that recentlyJoinedConsumers uses the "read position" as the joined position for consumers, which does not guarantee that messages less than or equal to it have already been scheduled to be sent.
Instead, we propose to use "last sent position" as joined positions for consumers.
Also, change (or add) some stats to know Key_Shared subscription status easily.
First, introduce the new position, like the mark delete position and the individually deleted messages. In other words,
- All positions less than or equal to it are already scheduled to be sent.
- Manage individually sent positions to update the position as expected.
An example of updating the individually sent messages and the last sent position will be as follows.
Initially, the last sent position is 3:0, and the individually sent positions is [].
- Read 3:1 - 3:10 positions
- Send 3:1 - 3:3, 3:5, and 3:8 - 3:10 positions
  - last sent position: 3:3
  - individually sent positions: [(3:4, 3:5], (3:7, 3:10]]
- Send 3:7 position
  - last sent position: 3:3
  - individually sent positions: [(3:4, 3:5], (3:6, 3:10]]
- Send 3:6 position
  - last sent position: 3:3
  - individually sent positions: [(3:4, 3:10]]
- Send 3:4 position
  - last sent position: 3:10
  - individually sent positions: []
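The bookkeeping in the example above can be sketched in a few lines of Java. This is only an illustration under simplifying assumptions, not the broker implementation: the class name LastSentTracker and its methods are hypothetical, positions are reduced to entry ids inside a single ledger (ledger 3 in the example), and a TreeMap stands in for the RangeSetWrapper used in the proposal. Ranges are printed in the same open-closed (a, b] notation as the example.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: tracks the last sent position within one ledger. */
class LastSentTracker {
    // Invariant: every entry id <= lastSent has been scheduled to be sent.
    private long lastSent;
    // Entries sent beyond lastSent, kept as disjoint, non-adjacent
    // closed ranges (start -> end).
    private final TreeMap<Long, Long> sentRanges = new TreeMap<>();

    LastSentTracker(long initialLastSent) {
        this.lastSent = initialLastSent;
    }

    /** Records that entries first..last (inclusive) were sent. */
    void markSent(long first, long last) {
        if (last <= lastSent) {
            return; // already covered by the last sent position
        }
        long start = Math.max(first, lastSent + 1);
        long end = last;
        // Merge with a range that overlaps or touches on the left.
        Map.Entry<Long, Long> lower = sentRanges.floorEntry(start);
        if (lower != null && lower.getValue() >= start - 1) {
            start = lower.getKey();
            end = Math.max(end, lower.getValue());
            sentRanges.remove(lower.getKey());
        }
        // Merge with ranges that overlap or touch on the right.
        Map.Entry<Long, Long> next = sentRanges.ceilingEntry(start);
        while (next != null && next.getKey() <= end + 1) {
            end = Math.max(end, next.getValue());
            sentRanges.remove(next.getKey());
            next = sentRanges.ceilingEntry(start);
        }
        sentRanges.put(start, end);
        // Advance lastSent when the first range has become contiguous with it.
        Map.Entry<Long, Long> head = sentRanges.firstEntry();
        if (head != null && head.getKey() == lastSent + 1) {
            lastSent = head.getValue();
            sentRanges.pollFirstEntry();
        }
    }

    long lastSent() {
        return lastSent;
    }

    /** Renders the individually sent ranges as (a, b] pairs. */
    String rangesAsString() {
        StringBuilder sb = new StringBuilder("[");
        for (Map.Entry<Long, Long> e : sentRanges.entrySet()) {
            if (sb.length() > 1) {
                sb.append(", ");
            }
            sb.append('(').append(e.getKey() - 1).append(", ")
              .append(e.getValue()).append(']');
        }
        return sb.append(']').toString();
    }

    public static void main(String[] args) {
        LastSentTracker t = new LastSentTracker(0); // last sent position 3:0
        t.markSent(1, 3);
        t.markSent(5, 5);
        t.markSent(8, 10);
        System.out.println(t.lastSent() + " " + t.rangesAsString());
        t.markSent(7, 7);
        t.markSent(6, 6);
        t.markSent(4, 4);
        System.out.println(t.lastSent() + " " + t.rangesAsString());
    }
}
```

A real implementation would also have to handle positions that span several ledgers, which this sketch deliberately leaves out.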
More specifically, the recently joined consumers related fields will be as follows.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 8f05530f58b..2b17c580832 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -69,8 +69,12 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
* This means that, in order to preserve ordering, new consumers can only receive old
* messages, until the mark-delete position will move past this point.
*/
+ // Map(key: recently joined consumer, value: last sent position when joining)
private final LinkedHashMap<Consumer, PositionImpl> recentlyJoinedConsumers;
+ private PositionImpl lastSentPosition;
+ private final RangeSetWrapper<PositionImpl> individuallySentPositions;
+
PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor,
Subscription subscription, ServiceConfiguration conf, KeySharedMeta ksm) {
super(topic, cursor, subscription, ksm.isAllowOutOfOrderDelivery());
Next, rename the consumer stats as follows.
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java
@@ -74,8 +74,8 @@ public class ConsumerStatsImpl implements ConsumerStats {
/** Flag to verify if consumer is blocked due to reaching threshold of unacked messages. */
public boolean blockedConsumerOnUnackedMsgs;
- /** The read position of the cursor when the consumer joining. */
- public String readPositionWhenJoining;
+ /** The last sent position of the cursor when the consumer joining. */
+ public String lastSentPositionWhenJoining;
/** Address of this consumer. */
private String address;
Note that I just renamed the stats from readPositionWhenJoining to lastSentPositionWhenJoining without keeping backward compatibility, because readPositionWhenJoining is no longer meaningful and would be redundant.
Finally, modify the definition of the subscription stats as follows.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index dc666f3a18e..7591369277f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -1177,7 +1177,14 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
.getRecentlyJoinedConsumers();
if (recentlyJoinedConsumers != null && recentlyJoinedConsumers.size() > 0) {
recentlyJoinedConsumers.forEach((k, v) -> {
- subStats.consumersAfterMarkDeletePosition.put(k.consumerName(), v.toString());
+ // The dispatcher allows same name consumers
+ final StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder.append("consumerName=").append(k.consumerName())
+ .append(", consumerId=").append(k.consumerId());
+ if (k.cnx() != null) {
+ stringBuilder.append(", address=").append(k.cnx().clientAddress());
+ }
+ subStats.consumersAfterMarkDeletePosition.put(stringBuilder.toString(), v.toString());
});
}
}
[issue-1]
Consider the following flow.
- Assume that the entries have the following messages, and the dispatcher has two consumers (c1 messagesForC is 1, c2 messagesForC is 1000), and the selector will return c1 if key-a and c2 if key-b.
  - 1:6 key: key-a
  - 1:7 key: key-a
  - 1:8 key: key-a
  - 1:9 key: key-b
  - 1:10 key: key-b
  - 1:11 key: key-b
- Send 1:6 to c1 and 1:9 - 1:11 to c2.
  - So, the current last sent position is 1:6 and the individually sent positions is [(1:8, 1:11]].
  - c1 never acknowledges 1:6.
- Add new consumer c3, the selector will return c3 if key-a, and the recentlyJoinedConsumers is {c3=1:6}.
- Can't send 1:7 - 1:8 to c3 because 1:7 and 1:8 are greater than the recently joined consumers position, 1:6.
- Disconnect c1.
- Send 1:6 - 1:8 to c3.

Now, c3 receives messages in the expected order regarding key-a.
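The difference between the two joined positions in this scenario can be illustrated with a small Java sketch. It is a simplification under stated assumptions rather than the dispatcher code: JoinedPositionCheck, Pos, and canSend are hypothetical names, Pos is a stand-in for PositionImpl, and the check only models the rule used in the flow above (a recently joined consumer may receive a message only if its position is not greater than the joined position).

```java
/** Hypothetical sketch of the joined-position check for a recently joined consumer. */
class JoinedPositionCheck {

    /** Minimal ledgerId:entryId position (a stand-in, not PositionImpl). */
    static final class Pos implements Comparable<Pos> {
        final long ledgerId;
        final long entryId;

        Pos(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override
        public int compareTo(Pos other) {
            int c = Long.compare(ledgerId, other.ledgerId);
            return c != 0 ? c : Long.compare(entryId, other.entryId);
        }
    }

    /**
     * Returns true if the message may be sent to a consumer that joined at
     * the given position. A null joined position means the consumer is not
     * in the recently joined consumers and is therefore unrestricted.
     */
    static boolean canSend(Pos joined, Pos message) {
        return joined == null || message.compareTo(joined) <= 0;
    }

    public static void main(String[] args) {
        Pos readPositionWhenJoining = new Pos(1, 12);    // current behavior
        Pos lastSentPositionWhenJoining = new Pos(1, 6); // proposed behavior
        Pos message = new Pos(1, 7);                     // key-a, not yet sent

        // With the read position, 1:7 slips through ahead of the unacked 1:6.
        System.out.println(canSend(readPositionWhenJoining, message));
        // With the last sent position, 1:7 is held back until 1:6 is handled.
        System.out.println(canSend(lastSentPositionWhenJoining, message));
    }
}
```

The check itself is unchanged in spirit; only the value recorded at join time differs, which is why the proposal can keep the existing recentlyJoinedConsumers structure.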
[issue-2]
This mechanism guarantees all messages less than or equal to the last sent position are already scheduled to be sent. Therefore, skipped messages (e.g. 2:2) are greater than the last sent position.
- The last sent position is 2:1.
- When the new consumer c3 is added, recentlyJoinedConsumers is [{c3: 2:1}]. The dispatcher can't send 2:2 to c3 because 2:2 is greater than the joined position 2:1.
- When c3 receives 2:1 and acknowledges it, the mark delete position is advanced to 2:1. When all messages up to the joined position (i.e., 2:1) have been acknowledged, the consumer (i.e., c3) is removed from recentlyJoinedConsumers. Therefore, c3 will be able to receive 2:2.
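The removal step in this flow can be sketched as follows. This is a minimal illustration, assuming entry ids within a single ledger (ledger 2 here) and consumers identified by name; RecentlyJoinedRegistry and its methods are hypothetical and do not mirror the dispatcher's actual API.

```java
import java.util.LinkedHashMap;

/** Hypothetical sketch: recently joined consumers keyed by consumer name. */
class RecentlyJoinedRegistry {
    // Consumer name -> entry id of the last sent position when it joined.
    private final LinkedHashMap<String, Long> recentlyJoined = new LinkedHashMap<>();

    void join(String consumer, long lastSentEntryId) {
        recentlyJoined.put(consumer, lastSentEntryId);
    }

    /** A recently joined consumer only receives entries up to its joined position. */
    boolean canReceive(String consumer, long entryId) {
        Long joined = recentlyJoined.get(consumer);
        return joined == null || entryId <= joined;
    }

    /** Drops every consumer whose joined position has been fully acknowledged. */
    void onMarkDeleteAdvanced(long markDeleteEntryId) {
        recentlyJoined.values().removeIf(joined -> joined <= markDeleteEntryId);
    }

    public static void main(String[] args) {
        RecentlyJoinedRegistry registry = new RecentlyJoinedRegistry();
        registry.join("c3", 1);                             // joined at 2:1
        System.out.println(registry.canReceive("c3", 2));   // 2:2 is held back
        registry.onMarkDeleteAdvanced(1);                   // 2:1 acknowledged
        System.out.println(registry.canReceive("c3", 2));   // 2:2 may now be sent
    }
}
```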
[stats]
readPositionWhenJoining is replaced with lastSentPositionWhenJoining in each consumer stats instead.
- The consumer stats readPositionWhenJoining is renamed to lastSentPositionWhenJoining.
- The definition of the subscription stats consumersAfterMarkDeletePosition is modified as described.
See #20179 for details. It had not been merged when this proposal was published.
The only difference is the message key, i.e., this approach leverages per-key information in addition to the proposal described in this PIP.
For example, the recentlyJoinedConsumers will be:
// Map(key: recently joined consumer, value: Map(key: message key, value: last sent position in the key when joining))
private final LinkedHashMap<Consumer, Map<ByteBuffer, PositionImpl>> recentlyJoinedConsumers;
With this change, message delivery stuck on one key will no longer prevent other keys from being dispatched. However, the code will be vulnerable to growth in the number of keys, causing OOM in the worst case.
Make updating the read position, dispatching messages, and adding new consumers mutually exclusive to ensure that messages less than the read position have already been sent. However, introducing such an exclusion mechanism would reduce the throughput of the dispatcher.
- Mailing List discussion thread: https://lists.apache.org/thread/69fpb0d30y7pc02k3zvg2lpb2lj0smdg
- Mailing List voting thread: https://lists.apache.org/thread/45x056t8njjnzflbkhkofh00gcy4z5g6