From bde3972fa4890f1e5c4b28472802011b26472081 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Fri, 20 May 2022 08:30:26 +0200 Subject: [PATCH] PIP-161 Exclusive Producer: ability to fence out an existing Producer (ExclusiveWithFencing mode) (#15488) --- .../pulsar/broker/service/AbstractTopic.java | 51 ++++++++- .../broker/service/ExclusiveProducerTest.java | 102 +++++++++++++++++- .../pulsar/client/api/ProducerAccessMode.java | 6 ++ .../pulsar/client/api/ProducerBuilder.java | 2 + .../pulsar/common/protocol/Commands.java | 4 + pulsar-common/src/main/proto/PulsarApi.proto | 1 + site2/docs/concepts-messaging.md | 1 + 7 files changed, 164 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 02c2c3d2f4445..ad51294916829 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -22,6 +22,7 @@ import static org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC; import com.google.common.base.MoreObjects; import com.google.common.collect.Lists; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; @@ -738,7 +739,55 @@ protected CompletableFuture> incrementTopicEpochIfNeeded(Producer return topicEpoch; }); } - + case ExclusiveWithFencing: + if (hasExclusiveProducer || !producers.isEmpty()) { + // clear all waiting producers + // otherwise closing any producer will trigger the promotion + // of the next pending producer + List>>> waitingExclusiveProducersCopy = + new ArrayList<>(waitingExclusiveProducers); + waitingExclusiveProducers.clear(); + waitingExclusiveProducersCopy.forEach((Pair>> handle) -> { + log.info("[{}] Failing waiting producer {}", topic, handle.getKey()); + 
handle.getValue().completeExceptionally(new ProducerFencedException("Fenced out")); + handle.getKey().close(true); + }); + producers.forEach((k, currentProducer) -> { + log.info("[{}] Fencing out producer {}", topic, currentProducer); + currentProducer.close(true); + }); + } + if (producer.getTopicEpoch().isPresent() + && producer.getTopicEpoch().get() < topicEpoch.orElse(-1L)) { + // If a producer reconnects, but all the topic epoch has already moved forward, + // this producer needs to be fenced, because a new producer had been present in between. + hasExclusiveProducer = false; + return FutureUtil.failedFuture(new ProducerFencedException( + String.format("Topic epoch has already moved. Current epoch: %d, Producer epoch: %d", + topicEpoch.get(), producer.getTopicEpoch().get()))); + } else { + // There are currently no existing producers + hasExclusiveProducer = true; + exclusiveProducerName = producer.getProducerName(); + + CompletableFuture future; + if (producer.getTopicEpoch().isPresent()) { + future = setTopicEpoch(producer.getTopicEpoch().get()); + } else { + future = incrementTopicEpoch(topicEpoch); + } + future.exceptionally(ex -> { + hasExclusiveProducer = false; + exclusiveProducerName = null; + return null; + }); + + return future.thenApply(epoch -> { + topicEpoch = Optional.of(epoch); + return topicEpoch; + }); + } case WaitForExclusive: { if (hasExclusiveProducer || !producers.isEmpty()) { CompletableFuture> future = new CompletableFuture<>(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java index 4d2b16a6a0e75..604abd8d7095f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java @@ -19,10 +19,12 @@ package org.apache.pulsar.broker.service; import static 
org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import io.netty.util.HashedWheelTimer; @@ -73,8 +75,10 @@ public static Object[][] accessMode() { // ProducerAccessMode, partitioned { ProducerAccessMode.Exclusive, Boolean.TRUE}, { ProducerAccessMode.Exclusive, Boolean.FALSE }, + { ProducerAccessMode.ExclusiveWithFencing, Boolean.TRUE}, + { ProducerAccessMode.ExclusiveWithFencing, Boolean.FALSE }, { ProducerAccessMode.WaitForExclusive, Boolean.TRUE }, - { ProducerAccessMode.WaitForExclusive, Boolean.FALSE }, + { ProducerAccessMode.WaitForExclusive, Boolean.FALSE } }; } @@ -88,12 +92,14 @@ private void simpleTest(String topic) throws Exception { Producer p1 = pulsarClient.newProducer(Schema.STRING) .topic(topic) + .producerName("p1") .accessMode(ProducerAccessMode.Exclusive) .create(); try { pulsarClient.newProducer(Schema.STRING) .topic(topic) + .producerName("p-fail-1") .accessMode(ProducerAccessMode.Exclusive) .create(); fail("Should have failed"); @@ -104,6 +110,7 @@ private void simpleTest(String topic) throws Exception { try { pulsarClient.newProducer(Schema.STRING) .topic(topic) + .producerName("p-fail-2") .accessMode(ProducerAccessMode.Shared) .create(); fail("Should have failed"); @@ -116,9 +123,100 @@ private void simpleTest(String topic) throws Exception { // Now producer should be allowed to get in Producer p2 = pulsarClient.newProducer(Schema.STRING) .topic(topic) + .producerName("p2") .accessMode(ProducerAccessMode.Exclusive) .create(); - p2.close(); + + Producer p3 = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .producerName("p3") + .accessMode(ProducerAccessMode.ExclusiveWithFencing) + .create(); + + try { + p2.send("test"); + fail("Should have failed"); + } catch (ProducerFencedException expected) { + } + + // this 
should work + p3.send("test"); + p3.close(); + + // test now WaitForExclusive vs ExclusiveWithFencing + + // use two different Clients, because sometimes fencing a Producer triggers connection close + // making the test unreliable. + + @Cleanup + PulsarClient pulsarClient2 = PulsarClient.builder() + .serviceUrl(pulsar.getBrokerServiceUrl()) + .operationTimeout(2, TimeUnit.SECONDS) + .build(); + + Producer p4 = pulsarClient2.newProducer(Schema.STRING) + .topic(topic) + .producerName("p4") + .accessMode(ProducerAccessMode.Exclusive) + .create(); + + p4.send("test"); + + // p5 will be waiting for the lock to be released + CompletableFuture> p5 = pulsarClient2.newProducer(Schema.STRING) + .topic(topic) + .producerName("p5") + .accessMode(ProducerAccessMode.WaitForExclusive) + .createAsync(); + + // wait for all the Producers to be enqueued in order to prevent races + Thread.sleep(2000); + + // p6 fences out all the current producers, even p5 + Producer p6 = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .producerName("p6") + .accessMode(ProducerAccessMode.ExclusiveWithFencing) + .create(); + + p6.send("test"); + + // p7 is enqueued after p6 + CompletableFuture> p7 = pulsarClient2.newProducer(Schema.STRING) + .topic(topic) + .producerName("p7") + .accessMode(ProducerAccessMode.WaitForExclusive) + .createAsync(); + + // this should work, p6 is the owner + p6.send("test"); + + try { + p4.send("test"); + fail("Should have failed"); + } catch (ProducerFencedException expected) { + } + + // this should work, p6 is the owner + p6.send("test"); + + // p5 fails + try { + p5.get(); + fail("Should have failed"); + } catch (ExecutionException expected) { + assertTrue(expected.getCause() instanceof ProducerFencedException, + "unexpected exception " + expected.getCause()); + } + + // this should work, p6 is the owner + p6.send("test"); + + p6.close(); + + // p7 finally acquires the lock + p7.get().send("test"); + p7.get().close(); } @Test(dataProvider = "topics") diff 
--git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerAccessMode.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerAccessMode.java index 85199e7cf4c27..33492802d161b 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerAccessMode.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerAccessMode.java @@ -33,6 +33,12 @@ public enum ProducerAccessMode { */ Exclusive, + /** + * Acquire exclusive access for the producer. Any existing producer will be removed and + * invalidated immediately. + */ + ExclusiveWithFencing, + /** * Producer creation is pending until it can acquire exclusive access. */ diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java index cab4d497307f2..d1778685f84c1 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java @@ -133,6 +133,8 @@ public interface ProducerBuilder extends Cloneable { *
  • {@link ProducerAccessMode#Shared}: By default multiple producers can publish on a topic *
  • {@link ProducerAccessMode#Exclusive}: Require exclusive access for producer. Fail immediately if there's * already a producer connected. + *
  • {@link ProducerAccessMode#ExclusiveWithFencing}: Require exclusive access for the producer. + * Any existing producer will be removed and invalidated immediately. *
  • {@link ProducerAccessMode#WaitForExclusive}: Producer creation is pending until it can acquire exclusive * access * diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index d1cbe3ad96141..2d8e043058dd7 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -1853,6 +1853,8 @@ private static org.apache.pulsar.common.api.proto.ProducerAccessMode convertProd return org.apache.pulsar.common.api.proto.ProducerAccessMode.Shared; case WaitForExclusive: return org.apache.pulsar.common.api.proto.ProducerAccessMode.WaitForExclusive; + case ExclusiveWithFencing: + return org.apache.pulsar.common.api.proto.ProducerAccessMode.ExclusiveWithFencing; default: throw new IllegalArgumentException("Unknonw access mode: " + accessMode); } @@ -1867,6 +1869,8 @@ public static ProducerAccessMode convertProducerAccessMode( return ProducerAccessMode.Shared; case WaitForExclusive: return ProducerAccessMode.WaitForExclusive; + case ExclusiveWithFencing: + return ProducerAccessMode.ExclusiveWithFencing; default: throw new IllegalArgumentException("Unknonw access mode: " + accessMode); } diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index a5d97e51acfd7..a2bfd76c92c4a 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -99,6 +99,7 @@ enum ProducerAccessMode { Shared = 0; // By default multiple producers can publish on a topic Exclusive = 1; // Require exclusive access for producer. Fail immediately if there's already a producer connected. WaitForExclusive = 2; // Producer creation is pending until it can acquire exclusive access + ExclusiveWithFencing = 3; // Require exclusive access for producer. Fence out old producer. 
} message MessageMetadata { diff --git a/site2/docs/concepts-messaging.md b/site2/docs/concepts-messaging.md index f7df3250762a5..b3520d70e8b34 100644 --- a/site2/docs/concepts-messaging.md +++ b/site2/docs/concepts-messaging.md @@ -76,6 +76,7 @@ Access mode | Description :-----------|------------ `Shared` | Multiple producers can publish on a topic.

    This is the **default** setting. `Exclusive` | Only one producer can publish on a topic.

    If there is already a producer connected, other producers trying to publish on this topic get errors immediately.

    The “old” producer is evicted and a “new” producer is selected to be the next exclusive producer if the “old” producer experiences a network partition with the broker. +`ExclusiveWithFencing` | Only one producer can publish on a topic.

    If there is already a producer connected, that producer is fenced out: it is removed and invalidated immediately. `WaitForExclusive` | If there is already a producer connected, the producer creation is pending (rather than timing out) until the producer gets the `Exclusive` access.

    The producer that succeeds in becoming the exclusive one is treated as the leader. Consequently, if you want to implement a leader election scheme for your application, you can use this access mode. Note that the leader pattern scheme mentioned refers to using Pulsar as a Write-Ahead Log (WAL) which means the leader writes its "decisions" to the topic. On error cases, the leader will get notified it is no longer the leader *only* when it tries to write a message and fails on appropriate error, by the broker. :::note