Skip to content

Commit

Permalink
PIP-161 Exclusive Producer: ability to fence out an existing Producer…
Browse files Browse the repository at this point in the history
… (ExclusiveWithFencing mode) (#15488)
  • Loading branch information
eolivelli authored May 20, 2022
1 parent f3c072e commit bde3972
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC;
import com.google.common.base.MoreObjects;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
Expand Down Expand Up @@ -738,7 +739,55 @@ protected CompletableFuture<Optional<Long>> incrementTopicEpochIfNeeded(Producer
return topicEpoch;
});
}

case ExclusiveWithFencing:
if (hasExclusiveProducer || !producers.isEmpty()) {
// clear all waiting producers
// otherwise closing any producer will trigger the promotion
// of the next pending producer
List<Pair<Producer, CompletableFuture<Optional<Long>>>> waitingExclusiveProducersCopy =
new ArrayList<>(waitingExclusiveProducers);
waitingExclusiveProducers.clear();
waitingExclusiveProducersCopy.forEach((Pair<Producer,
CompletableFuture<Optional<Long>>> handle) -> {
log.info("[{}] Failing waiting producer {}", topic, handle.getKey());
handle.getValue().completeExceptionally(new ProducerFencedException("Fenced out"));
handle.getKey().close(true);
});
producers.forEach((k, currentProducer) -> {
log.info("[{}] Fencing out producer {}", topic, currentProducer);
currentProducer.close(true);
});
}
if (producer.getTopicEpoch().isPresent()
&& producer.getTopicEpoch().get() < topicEpoch.orElse(-1L)) {
// If a producer reconnects, but all the topic epoch has already moved forward,
// this producer needs to be fenced, because a new producer had been present in between.
hasExclusiveProducer = false;
return FutureUtil.failedFuture(new ProducerFencedException(
String.format("Topic epoch has already moved. Current epoch: %d, Producer epoch: %d",
topicEpoch.get(), producer.getTopicEpoch().get())));
} else {
// There are currently no existing producers
hasExclusiveProducer = true;
exclusiveProducerName = producer.getProducerName();

CompletableFuture<Long> future;
if (producer.getTopicEpoch().isPresent()) {
future = setTopicEpoch(producer.getTopicEpoch().get());
} else {
future = incrementTopicEpoch(topicEpoch);
}
future.exceptionally(ex -> {
hasExclusiveProducer = false;
exclusiveProducerName = null;
return null;
});

return future.thenApply(epoch -> {
topicEpoch = Optional.of(epoch);
return topicEpoch;
});
}
case WaitForExclusive: {
if (hasExclusiveProducer || !producers.isEmpty()) {
CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
package org.apache.pulsar.broker.service;

import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import io.netty.util.HashedWheelTimer;
Expand Down Expand Up @@ -73,8 +75,10 @@ public static Object[][] accessMode() {
// ProducerAccessMode, partitioned
{ ProducerAccessMode.Exclusive, Boolean.TRUE},
{ ProducerAccessMode.Exclusive, Boolean.FALSE },
{ ProducerAccessMode.ExclusiveWithFencing, Boolean.TRUE},
{ ProducerAccessMode.ExclusiveWithFencing, Boolean.FALSE },
{ ProducerAccessMode.WaitForExclusive, Boolean.TRUE },
{ ProducerAccessMode.WaitForExclusive, Boolean.FALSE },
{ ProducerAccessMode.WaitForExclusive, Boolean.FALSE }
};
}

Expand All @@ -88,12 +92,14 @@ private void simpleTest(String topic) throws Exception {

Producer<String> p1 = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.producerName("p1")
.accessMode(ProducerAccessMode.Exclusive)
.create();

try {
pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.producerName("p-fail-1")
.accessMode(ProducerAccessMode.Exclusive)
.create();
fail("Should have failed");
Expand All @@ -104,6 +110,7 @@ private void simpleTest(String topic) throws Exception {
try {
pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.producerName("p-fail-2")
.accessMode(ProducerAccessMode.Shared)
.create();
fail("Should have failed");
Expand All @@ -116,9 +123,100 @@ private void simpleTest(String topic) throws Exception {
// Now producer should be allowed to get in
Producer<String> p2 = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.producerName("p2")
.accessMode(ProducerAccessMode.Exclusive)
.create();
p2.close();

Producer<String> p3 = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.producerName("p3")
.accessMode(ProducerAccessMode.ExclusiveWithFencing)
.create();

try {
p2.send("test");
fail("Should have failed");
} catch (ProducerFencedException expected) {
}

// this should work
p3.send("test");
p3.close();

// test now WaitForExclusive vs ExclusiveWithFencing

// use two different Clients, because sometimes fencing a Producer triggers connection close
// making the test unreliable.

@Cleanup
PulsarClient pulsarClient2 = PulsarClient.builder()
.serviceUrl(pulsar.getBrokerServiceUrl())
.operationTimeout(2, TimeUnit.SECONDS)
.build();

Producer<String> p4 = pulsarClient2.newProducer(Schema.STRING)
.topic(topic)
.producerName("p4")
.accessMode(ProducerAccessMode.Exclusive)
.create();

p4.send("test");

// p5 will be waiting for the lock to be released
CompletableFuture<Producer<String>> p5 = pulsarClient2.newProducer(Schema.STRING)
.topic(topic)
.producerName("p5")
.accessMode(ProducerAccessMode.WaitForExclusive)
.createAsync();

// wait for all the Producers to be enqueued in order to prevent races
Thread.sleep(2000);

// p6 fences out all the current producers, even p5
Producer<String> p6 = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.producerName("p6")
.accessMode(ProducerAccessMode.ExclusiveWithFencing)
.create();

p6.send("test");

// p7 is enqueued after p6
CompletableFuture<Producer<String>> p7 = pulsarClient2.newProducer(Schema.STRING)
.topic(topic)
.producerName("p7")
.accessMode(ProducerAccessMode.WaitForExclusive)
.createAsync();

// this should work, p6 is the owner
p6.send("test");

try {
p4.send("test");
fail("Should have failed");
} catch (ProducerFencedException expected) {
}

// this should work, p6 is the owner
p6.send("test");

// p5 fails
try {
p5.get();
fail("Should have failed");
} catch (ExecutionException expected) {
assertTrue(expected.getCause() instanceof ProducerFencedException,
"unexpected exception " + expected.getCause());
}

// this should work, p6 is the owner
p6.send("test");

p6.close();

// p7 finally acquires the lock
p7.get().send("test");
p7.get().close();
}

@Test(dataProvider = "topics")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ public enum ProducerAccessMode {
*/
Exclusive,

/**
* Acquire exclusive access for the producer. Any existing producer will be removed and
* invalidated immediately.
*/
ExclusiveWithFencing,

/**
* Producer creation is pending until it can acquire exclusive access.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ public interface ProducerBuilder<T> extends Cloneable {
* <li>{@link ProducerAccessMode#Shared}: By default multiple producers can publish on a topic
* <li>{@link ProducerAccessMode#Exclusive}: Require exclusive access for producer. Fail immediately if there's
* already a producer connected.
* <li>{@link ProducerAccessMode#ExclusiveWithFencing}: Require exclusive access for the producer.
* Any existing producer will be removed and invalidated immediately.
* <li>{@link ProducerAccessMode#WaitForExclusive}: Producer creation is pending until it can acquire exclusive
* access
* </ul>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1853,6 +1853,8 @@ private static org.apache.pulsar.common.api.proto.ProducerAccessMode convertProd
return org.apache.pulsar.common.api.proto.ProducerAccessMode.Shared;
case WaitForExclusive:
return org.apache.pulsar.common.api.proto.ProducerAccessMode.WaitForExclusive;
case ExclusiveWithFencing:
return org.apache.pulsar.common.api.proto.ProducerAccessMode.ExclusiveWithFencing;
default:
throw new IllegalArgumentException("Unknonw access mode: " + accessMode);
}
Expand All @@ -1867,6 +1869,8 @@ public static ProducerAccessMode convertProducerAccessMode(
return ProducerAccessMode.Shared;
case WaitForExclusive:
return ProducerAccessMode.WaitForExclusive;
case ExclusiveWithFencing:
return ProducerAccessMode.ExclusiveWithFencing;
default:
throw new IllegalArgumentException("Unknonw access mode: " + accessMode);
}
Expand Down
1 change: 1 addition & 0 deletions pulsar-common/src/main/proto/PulsarApi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ enum ProducerAccessMode {
Shared = 0; // By default multiple producers can publish on a topic
Exclusive = 1; // Require exclusive access for producer. Fail immediately if there's already a producer connected.
WaitForExclusive = 2; // Producer creation is pending until it can acquire exclusive access
ExclusiveWithFencing = 3; // Require exclusive access for producer. Fence out old producer.
}

message MessageMetadata {
Expand Down
1 change: 1 addition & 0 deletions site2/docs/concepts-messaging.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ Access mode | Description
:-----------|------------
`Shared` | Multiple producers can publish on a topic. <br /><br />This is the **default** setting.
`Exclusive` | Only one producer can publish on a topic. <br /><br />If there is already a producer connected, other producers trying to publish on this topic get errors immediately.<br /><br />The “old” producer is evicted and a “new” producer is selected to be the next exclusive producer if the “old” producer experiences a network partition with the broker.
`ExclusiveWithFencing`|Only one producer can publish on a topic. <br /><br />If there is already a producer connected, it will be removed and invalidated immediately.
`WaitForExclusive` | If there is already a producer connected, the producer creation is pending (rather than timing out) until the producer gets the `Exclusive` access.<br /><br />The producer that succeeds in becoming the exclusive one is treated as the leader. Consequently, if you want to implement a leader election scheme for your application, you can use this access mode. Note that the leader pattern scheme mentioned refers to using Pulsar as a Write-Ahead Log (WAL) which means the leader writes its "decisions" to the topic. On error cases, the leader will get notified it is no longer the leader *only* when it tries to write a message and fails on appropriate error, by the broker.

:::note
Expand Down

0 comments on commit bde3972

Please sign in to comment.