Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PIP-34 Add flag to enable or disable Key_Shared subscription. #4120

Merged
merged 3 commits into from
Apr 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ subscriptionRedeliveryTrackerEnabled=true
# How frequently to proactively check and purge expired subscription
subscriptionExpiryCheckIntervalInMinutes=5

# Enable Key_Shared subscription (default is enabled)
subscriptionKeySharedEnable=true

# Set the default behavior for message deduplication in the broker
# This can be overridden per-namespace. If enabled, broker will reject
# messages that were already stored in the topic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private long subscriptionExpiryCheckIntervalInMinutes = 5;

@FieldContext(
category = CATEGORY_POLICIES,
dynamic = true,
doc = "Enable Key_Shared subscription (default is enabled)"
)
private boolean subscriptionKeySharedEnable = true;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "Set the default behavior for message deduplication in the broker.\n\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,14 @@ public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscri
new NotAllowedException("readCompacted only allowed on failover or exclusive subscriptions"));
return future;
}

if (subType == SubType.Key_Shared
&& !brokerService.pulsar().getConfiguration().isSubscriptionKeySharedEnable()) {
future.completeExceptionally(
new NotAllowedException("Key_Shared subscription is disabled by broker.")
);
return future;
}
if (isBlank(subscriptionName)) {
if (log.isDebugEnabled()) {
log.debug("[{}] Empty subscription name", topic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {

private static final Logger log = LoggerFactory.getLogger(KeySharedSubscriptionTest.class);


@BeforeMethod
@Override
protected void setup() throws Exception {
Expand All @@ -51,7 +50,7 @@ protected void cleanup() throws Exception {

@Test
public void testSendAndReceiveWithHashRangeStickyKeyConsumerSelector() throws PulsarClientException {

this.conf.setSubscriptionKeySharedEnable(true);
String topic = "persistent://public/default/key_shared";

Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
Expand Down Expand Up @@ -153,7 +152,7 @@ public void testSendAndReceiveWithHashRangeStickyKeyConsumerSelector() throws Pu

@Test
public void testNonKeySendAndReceiveWithHashRangeStickyKeyConsumerSelector() throws PulsarClientException {

this.conf.setSubscriptionKeySharedEnable(true);
String topic = "persistent://public/default/key_shared_none_key";

Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
Expand Down Expand Up @@ -251,6 +250,7 @@ public void testNonKeySendAndReceiveWithHashRangeStickyKeyConsumerSelector() thr

@Test
public void testOrderingKeyWithHashRangeStickyKeyConsumerSelector() throws PulsarClientException {
this.conf.setSubscriptionKeySharedEnable(true);
String topic = "persistent://public/default/key_shared_ordering_key";

Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
Expand Down Expand Up @@ -327,4 +327,16 @@ public void testOrderingKeyWithHashRangeStickyKeyConsumerSelector() throws Pulsa
Assert.assertEquals(consumer2ExpectMessages, consumer2Received);
Assert.assertEquals(consumer3ExpectMessages, consumer3Received);
}

@Test(expectedExceptions = PulsarClientException.class)
public void testDisableKeySharedSubscription() throws PulsarClientException {
this.conf.setSubscriptionKeySharedEnable(false);
String topic = "persistent://public/default/key_shared_disabled";
pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("key_shared")
.subscriptionType(SubscriptionType.Key_Shared)
.ackTimeout(10, TimeUnit.SECONDS)
.subscribe();
}
}
2 changes: 2 additions & 0 deletions site2/docs/concepts-messaging.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ In *Key_Shared* mode, multiple consumers can attach to the same subscription. Me

![Key_Shared subscriptions](assets/pulsar-key-shared-subscriptions.png)

**Key_Shared subscription is a beta feature. You can disable it at broker.config.**
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@codelipenghui shouldn't this be "You can enable it"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sijie Key_shared subscription is enabled by default. so i think we need to tell users can disable it at broker.config

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was a comments above to enable it by default.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see. sorry I missed that.


## Multi-topic subscriptions

When a consumer subscribes to a Pulsar topic, by default it subscribes to one specific topic, such as `persistent://public/default/my-topic`. As of Pulsar version 1.23.0-incubating, however, Pulsar consumers can simultaneously subscribe to multiple topics. You can define a list of topics in two ways:
Expand Down