From 5338dc99ed398060b03a4ecc5b63849caeae3590 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Wed, 20 Nov 2024 21:00:32 +0800 Subject: [PATCH] [improve][pip] PIP-392: Enable consistent hashing to select active consumer in partitioned topic for failover subscription (#23583) --- pip/pip-392.md | 97 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 97 insertions(+) create mode 100644 pip/pip-392.md diff --git a/pip/pip-392.md b/pip/pip-392.md new file mode 100644 index 0000000000000..ff5237fdb48e1 --- /dev/null +++ b/pip/pip-392.md @@ -0,0 +1,97 @@ +# PIP-392: Add configuration to enable consistent hashing to select active consumer for partitioned topic + +# Background knowledge + +After [#19502](https://github.com/apache/pulsar/pull/19502) will use consistent hashing to select active consumer for non-partitioned topic + +# Motivation + +Currently, for partitioned topics, the active consumer is selected using the formula [partitionedIndex % consumerSize](https://github.com/apache/pulsar/blob/137df29f85798b00de75460a1acb91c7bc25453f/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java#L129-L130). +This method can lead to uneven distribution of active consumers. + +Consider a scenario with 100 topics named `public/default/topic-{0~100}`, each having `one partition`. +If 10 consumers are created using a `regex` subscription with the `Failover type`, all topic will be assigned to the same consumer(the first connected consumer). This results in an imbalanced distribution of consumers. + +# Goals + +## In Scope +- Address the issue of imbalance for `failover` subscription type consumers in single-partition or few-partition topics. + +## Out of Scope +- Excluding the `exclusive` subscription type. + +It's important to note that both the `modulo algorithm` and the `consistent hashing algorithm` can cause the consumer to be transferred. +This might result in messages being delivered multiple times to consumers, which is a known issue and has been mentioned in the documentation. +https://pulsar.apache.org/docs/4.0.x/concepts-messaging/#failover + +# High Level Design +The solution involves adding a configuration setting that allows users to enable consistent hashing for partitioned topics. +When enabled, the consumer selection process will use consistent hashing instead of the modulo operation. + +The algorithm already exists through [#19502](https://github.com/apache/pulsar/pull/19502) + +In simple terms, the hash algorithm includes the following steps: + +1. Hash Ring Creation: Traverse all consumers and use `consumer name` to calculate a hash ring with 100 virtual nodes. + +[Exist code](https://github.com/apache/pulsar/blob/1b1bd4b610dd768a6908964ef841a6790bb0f4f0/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java#L152-L162) +```java + private NavigableMap makeHashRing(int consumerSize) { + NavigableMap hashRing = new TreeMap<>(); + for (int i = 0; i < consumerSize; i++) { + for (int j = 0; j < CONSUMER_CONSISTENT_HASH_REPLICAS; j++) { + String key = consumers.get(i).consumerName() + j; + int hash = Murmur3_32Hash.getInstance().makeHash(key.getBytes()); + hashRing.put(hash, i); + } + } + return Collections.unmodifiableNavigableMap(hashRing); + } +``` + +2. Consumer Selection: Use the hash of the topic name to select the matching consumer from the hash ring. + +[Exist code](https://github.com/apache/pulsar/blob/1b1bd4b610dd768a6908964ef841a6790bb0f4f0/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java#L146-L150) +```java + private int peekConsumerIndexFromHashRing(NavigableMap hashRing) { + int hash = Murmur3Hash32.getInstance().makeHash(topicName); + Map.Entry ceilingEntry = hashRing.ceilingEntry(hash); + return ceilingEntry != null ? ceilingEntry.getValue() : hashRing.firstEntry().getValue(); + } +``` + +This approach ensures a more even distribution of active consumers across topics, improving load balancing and resource utilization. + +# Detailed Design + +## Design & Implementation Details +Refer to implementation PR: https://github.com/apache/pulsar/pull/23584 + +The implementation is simple. If this activeConsumerFailoverConsistentHashing is enabled, the consistent hashing algorithm is used regardless of whether the topic is partitioned. + +## Public-facing Changes + +If activeConsumerFailoverConsistentHashing is enabled, when users use the failover subscription model, +the `first consumer` will not necessarily consume `P1`, and the `second consumer` will not necessarily consume `P2`. + +As described in the documentation:: https://pulsar.apache.org/docs/4.0.x/concepts-messaging/#failover--partitioned-topics + +Instead, the hash algorithm will determine which consumer consumes which partition. + +### Configuration + +A new configuration field will be added: + +```java +@FieldContext( + category = CATEGORY_POLICIES, + doc = "Enable consistent hashing for selecting the active consumer in partitioned " + + "topics with Failover subscription type. " + + "For non-partitioned topics, consistent hashing is used by default." +) +private boolean activeConsumerFailoverConsistentHashing = false; +``` + + +# Backward & Forward Compatibility +The default value is false to keep original behavior.