diff --git a/conf/broker.conf b/conf/broker.conf index 1bb171c7e7490..68222476af76a 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -107,6 +107,9 @@ subscriptionRedeliveryTrackerEnabled=true # How frequently to proactively check and purge expired subscription subscriptionExpiryCheckIntervalInMinutes=5 +# Enable Key_Shared subscription (default is enabled) +subscriptionKeySharedEnable=true + # Set the default behavior for message deduplication in the broker # This can be overridden per-namespace. If enabled, broker will reject # messages that were already stored in the topic diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index b3262c1e59659..c9a72f0103f36 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -266,6 +266,13 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private long subscriptionExpiryCheckIntervalInMinutes = 5; + @FieldContext( + category = CATEGORY_POLICIES, + dynamic = true, + doc = "Enable Key_Shared subscription (default is enabled)" + ) + private boolean subscriptionKeySharedEnable = true; + @FieldContext( category = CATEGORY_POLICIES, doc = "Set the default behavior for message deduplication in the broker.\n\n" diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index e1a050979e282..19bc174370311 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -498,6 +498,14 @@ public CompletableFuture subscribe(final ServerCnx cnx, String subscri new NotAllowedException("readCompacted only allowed on failover or exclusive subscriptions")); return future; } + + if (subType == SubType.Key_Shared + && !brokerService.pulsar().getConfiguration().isSubscriptionKeySharedEnable()) { + future.completeExceptionally( + new NotAllowedException("Key_Shared subscription is disabled by broker.") + ); + return future; + } if (isBlank(subscriptionName)) { if (log.isDebugEnabled()) { log.debug("[{}] Empty subscription name", topic); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java index e5f1edc058ad7..14c969309883a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java @@ -35,7 +35,6 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase { private static final Logger log = LoggerFactory.getLogger(KeySharedSubscriptionTest.class); - @BeforeMethod @Override protected void setup() throws Exception { @@ -51,7 +50,7 @@ protected void cleanup() throws Exception { @Test public void testSendAndReceiveWithHashRangeStickyKeyConsumerSelector() throws PulsarClientException { - + this.conf.setSubscriptionKeySharedEnable(true); String topic = "persistent://public/default/key_shared"; Consumer consumer1 = pulsarClient.newConsumer() @@ -153,7 +152,7 @@ public void testSendAndReceiveWithHashRangeStickyKeyConsumerSelector() throws Pu @Test public void testNonKeySendAndReceiveWithHashRangeStickyKeyConsumerSelector() throws PulsarClientException { - + this.conf.setSubscriptionKeySharedEnable(true); String topic = "persistent://public/default/key_shared_none_key"; Consumer consumer1 = pulsarClient.newConsumer() @@ -251,6 +250,7 @@ public void testNonKeySendAndReceiveWithHashRangeStickyKeyConsumerSelector() thr @Test public void testOrderingKeyWithHashRangeStickyKeyConsumerSelector() throws PulsarClientException { + this.conf.setSubscriptionKeySharedEnable(true); String topic = "persistent://public/default/key_shared_ordering_key"; Consumer consumer1 = pulsarClient.newConsumer() @@ -327,4 +327,16 @@ public void testOrderingKeyWithHashRangeStickyKeyConsumerSelector() throws Pulsa Assert.assertEquals(consumer2ExpectMessages, consumer2Received); Assert.assertEquals(consumer3ExpectMessages, consumer3Received); } + + @Test(expectedExceptions = PulsarClientException.class) + public void testDisableKeySharedSubscription() throws PulsarClientException { + this.conf.setSubscriptionKeySharedEnable(false); + String topic = "persistent://public/default/key_shared_disabled"; + pulsarClient.newConsumer() + .topic(topic) + .subscriptionName("key_shared") + .subscriptionType(SubscriptionType.Key_Shared) + .ackTimeout(10, TimeUnit.SECONDS) + .subscribe(); + } } diff --git a/site2/docs/concepts-messaging.md b/site2/docs/concepts-messaging.md index 9714596d17d1d..5bda2e9205845 100644 --- a/site2/docs/concepts-messaging.md +++ b/site2/docs/concepts-messaging.md @@ -150,6 +150,8 @@ In *Key_Shared* mode, multiple consumers can attach to the same subscription. Me ![Key_Shared subscriptions](assets/pulsar-key-shared-subscriptions.png) +**Key_Shared subscription is a beta feature. You can disable it at broker.config.** + ## Multi-topic subscriptions When a consumer subscribes to a Pulsar topic, by default it subscribes to one specific topic, such as `persistent://public/default/my-topic`. As of Pulsar version 1.23.0-incubating, however, Pulsar consumers can simultaneously subscribe to multiple topics. You can define a list of topics in two ways: