Skip to content

Commit

Permalink
[fix] [broker] Part-1: Replicator can not created successfully due to…
Browse files Browse the repository at this point in the history
… an orphan replicator in the previous topic owner (apache#21946)
  • Loading branch information
poorbarcode authored and hanmz committed Feb 12, 2025
1 parent eab5aa0 commit 8f93d6e
Show file tree
Hide file tree
Showing 12 changed files with 656 additions and 170 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,7 @@ public CompletableFuture<Void> closeAndRemoveReplicationClient(String clusterNam
if (ot.isPresent()) {
Replicator r = ot.get().getReplicators().get(clusterName);
if (r != null && r.isConnected()) {
r.disconnect(false).whenComplete((v, e) -> f.complete(null));
r.terminate().whenComplete((v, e) -> f.complete(null));
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ public interface Replicator {

ReplicatorStatsImpl getStats();

CompletableFuture<Void> disconnect();
CompletableFuture<Void> terminate();

CompletableFuture<Void> disconnect(boolean b);
CompletableFuture<Void> disconnect(boolean failIfHasBacklog, boolean closeTheStartingProducer);

void updateRates();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ protected String getProducerName() {
}

@Override
protected void readEntries(Producer<byte[]> producer) {
protected void setProducerAndTriggerReadEntries(Producer<byte[]> producer) {
this.producer = (ProducerImpl) producer;

if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Started)) {
Expand All @@ -78,8 +78,7 @@ protected void readEntries(Producer<byte[]> producer) {
"[{}] Replicator was stopped while creating the producer."
+ " Closing it. Replicator state: {}",
replicatorId, STATE_UPDATER.get(this));
STATE_UPDATER.set(this, State.Stopping);
closeProducerAsync();
doCloseProducerAsync(producer, () -> {});
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, boolean c
CompletableFuture<Void> closeClientFuture = new CompletableFuture<>();
if (closeIfClientsConnected) {
List<CompletableFuture<Void>> futures = new ArrayList<>();
replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
replicators.forEach((cluster, replicator) -> futures.add(replicator.terminate()));
producers.values().forEach(producer -> futures.add(producer.disconnect()));
subscriptions.forEach((s, sub) -> futures.add(sub.close(true, Optional.empty())));
FutureUtil.waitForAll(futures).thenRun(() -> {
Expand Down Expand Up @@ -523,7 +523,7 @@ public CompletableFuture<Void> close(

List<CompletableFuture<Void>> futures = new ArrayList<>();

replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
replicators.forEach((cluster, replicator) -> futures.add(replicator.terminate()));
if (disconnectClients) {
futures.add(ExtensibleLoadManagerImpl.getAssignedBrokerLookupData(
brokerService.getPulsar(), topic).thenAccept(lookupData -> {
Expand Down Expand Up @@ -582,7 +582,7 @@ public CompletableFuture<Void> close(

public CompletableFuture<Void> stopReplProducers() {
List<CompletableFuture<Void>> closeFutures = new ArrayList<>();
replicators.forEach((region, replicator) -> closeFutures.add(replicator.disconnect()));
replicators.forEach((region, replicator) -> closeFutures.add(replicator.terminate()));
return FutureUtil.waitForAll(closeFutures);
}

Expand Down Expand Up @@ -663,7 +663,7 @@ CompletableFuture<Void> removeReplicator(String remoteCluster) {

String name = NonPersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster);

replicators.get(remoteCluster).disconnect().thenRun(() -> {
replicators.get(remoteCluster).terminate().thenRun(() -> {
log.info("[{}] Successfully removed replicator {}", name, remoteCluster);
replicators.remove(remoteCluster);

Expand Down Expand Up @@ -1032,7 +1032,7 @@ private CompletableFuture<Void> disconnectReplicators() {
List<CompletableFuture<Void>> futures = new ArrayList<>();
ConcurrentOpenHashMap<String, NonPersistentReplicator> replicators = getReplicators();
replicators.forEach((r, replicator) -> {
futures.add(replicator.disconnect());
futures.add(replicator.terminate());
});
return FutureUtil.waitForAll(futures);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
*/
package org.apache.pulsar.broker.service.persistent;

import static org.apache.pulsar.broker.service.AbstractReplicator.State.Started;
import static org.apache.pulsar.broker.service.AbstractReplicator.State.Starting;
import static org.apache.pulsar.broker.service.AbstractReplicator.State.Terminated;
import static org.apache.pulsar.broker.service.AbstractReplicator.State.Terminating;
import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
Expand All @@ -26,7 +30,6 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
Expand All @@ -43,10 +46,10 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
import org.apache.pulsar.broker.service.MessageExpirer;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
Expand Down Expand Up @@ -134,30 +137,44 @@ public PersistentReplicator(String localCluster, PersistentTopic localTopic, Man
}

@Override
protected void readEntries(Producer<byte[]> producer) {
// Rewind the cursor to be sure to read again all non-acked messages sent while restarting
protected void setProducerAndTriggerReadEntries(Producer<byte[]> producer) {
// Rewind the cursor to be sure to read again all non-acked messages sent while restarting.
cursor.rewind();

cursor.cancelPendingReadRequest();
HAVE_PENDING_READ_UPDATER.set(this, FALSE);
this.producer = (ProducerImpl) producer;

if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Started)) {
log.info("[{}] Created replicator producer", replicatorId);
/**
* 1. Try change state to {@link Started}.
* 2. Atoms modify multiple properties if change state success, to avoid another thread get a null value
* producer when the state is {@link Started}.
*/
Pair<Boolean, State> changeStateRes;
changeStateRes = compareSetAndGetState(Starting, Started);
if (changeStateRes.getLeft()) {
this.producer = (ProducerImpl) producer;
HAVE_PENDING_READ_UPDATER.set(this, FALSE);
// Trigger a new read.
log.info("[{}] Created replicator producer, Replicator state: {}", replicatorId, state);
backOff.reset();
// activate cursor: so, entries can be cached
// activate cursor: so, entries can be cached.
this.cursor.setActive();
// read entries
readMoreEntries();
} else {
log.info(
"[{}] Replicator was stopped while creating the producer."
+ " Closing it. Replicator state: {}",
replicatorId, STATE_UPDATER.get(this));
STATE_UPDATER.set(this, State.Stopping);
closeProducerAsync();
if (changeStateRes.getRight() == Started) {
// Since only one task can call "producerBuilder.createAsync()", this scenario is not expected.
// So print a warn log.
log.warn("[{}] Replicator was already started by another thread while creating the producer."
+ " Closing the producer newly created. Replicator state: {}", replicatorId, state);
} else if (changeStateRes.getRight() == Terminating || changeStateRes.getRight() == Terminated) {
log.info("[{}] Replicator was terminated, so close the producer. Replicator state: {}",
replicatorId, state);
} else {
log.error("[{}] Replicator state is not expected, so close the producer. Replicator state: {}",
replicatorId, changeStateRes.getRight());
}
// Close the producer if change the state fail.
doCloseProducerAsync(producer, () -> {});
}

}

@Override
Expand Down Expand Up @@ -420,8 +437,8 @@ public CompletableFuture<MessageId> getFuture() {

@Override
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
if (STATE_UPDATER.get(this) != State.Started) {
log.info("[{}] Replicator was stopped while reading entries."
if (state != Started) {
log.info("[{}] Replicator was disconnected while reading entries."
+ " Stop reading. Replicator state: {}",
replicatorId, STATE_UPDATER.get(this));
return;
Expand All @@ -436,8 +453,8 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
log.error("[{}] Error reading entries because replicator is"
+ " already deleted and cursor is already closed {}, ({})",
replicatorId, ctx, exception.getMessage(), exception);
// replicator is already deleted and cursor is already closed so, producer should also be stopped
closeProducerAsync();
// replicator is already deleted and cursor is already closed so, producer should also be disconnected.
terminate();
return;
} else if (!(exception instanceof TooManyRequestsException)) {
log.error("[{}] Error reading entries at {}. Retrying to read in {}s. ({})",
Expand Down Expand Up @@ -555,8 +572,8 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) {
if (exception instanceof CursorAlreadyClosedException) {
log.error("[{}] Asynchronous ack failure because replicator is already deleted and cursor is already"
+ " closed {}, ({})", replicatorId, ctx, exception.getMessage(), exception);
// replicator is already deleted and cursor is already closed so, producer should also be stopped
closeProducerAsync();
// replicator is already deleted and cursor is already closed so, producer should also be disconnected.
terminate();
return;
}
if (ctx instanceof PositionImpl) {
Expand Down Expand Up @@ -675,30 +692,6 @@ protected void checkReplicatedSubscriptionMarker(Position position, MessageImpl<
}
}

@Override
public CompletableFuture<Void> disconnect() {
return disconnect(false);
}

@Override
public synchronized CompletableFuture<Void> disconnect(boolean failIfHasBacklog) {
final CompletableFuture<Void> future = new CompletableFuture<>();

super.disconnect(failIfHasBacklog).thenRun(() -> {
dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
future.complete(null);
}).exceptionally(ex -> {
Throwable t = (ex instanceof CompletionException ? ex.getCause() : ex);
if (!(t instanceof TopicBusyException)) {
log.error("[{}] Failed to close dispatch rate limiter: {}", replicatorId, ex.getMessage());
}
future.completeExceptionally(t);
return null;
});

return future;
}

@Override
public boolean isConnected() {
ProducerImpl<?> producer = this.producer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -833,15 +833,15 @@ public CompletableFuture<Void> startReplProducers() {

public CompletableFuture<Void> stopReplProducers() {
List<CompletableFuture<Void>> closeFutures = new ArrayList<>();
replicators.forEach((region, replicator) -> closeFutures.add(replicator.disconnect()));
shadowReplicators.forEach((__, replicator) -> closeFutures.add(replicator.disconnect()));
replicators.forEach((region, replicator) -> closeFutures.add(replicator.terminate()));
shadowReplicators.forEach((__, replicator) -> closeFutures.add(replicator.terminate()));
return FutureUtil.waitForAll(closeFutures);
}

private synchronized CompletableFuture<Void> closeReplProducersIfNoBacklog() {
List<CompletableFuture<Void>> closeFutures = new ArrayList<>();
replicators.forEach((region, replicator) -> closeFutures.add(replicator.disconnect(true)));
shadowReplicators.forEach((__, replicator) -> closeFutures.add(replicator.disconnect(true)));
replicators.forEach((region, replicator) -> closeFutures.add(replicator.disconnect(true, true)));
shadowReplicators.forEach((__, replicator) -> closeFutures.add(replicator.disconnect(true, true)));
return FutureUtil.waitForAll(closeFutures);
}

Expand Down Expand Up @@ -1423,8 +1423,8 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
List<CompletableFuture<Void>> futures = new ArrayList<>();
subscriptions.forEach((s, sub) -> futures.add(sub.close(true, Optional.empty())));
if (closeIfClientsConnected) {
replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
shadowReplicators.forEach((__, replicator) -> futures.add(replicator.disconnect()));
replicators.forEach((cluster, replicator) -> futures.add(replicator.terminate()));
shadowReplicators.forEach((__, replicator) -> futures.add(replicator.terminate()));
producers.values().forEach(producer -> futures.add(producer.disconnect()));
}
FutureUtil.waitForAll(futures).thenRunAsync(() -> {
Expand Down Expand Up @@ -1565,8 +1565,8 @@ public CompletableFuture<Void> close(
List<CompletableFuture<Void>> futures = new ArrayList<>();

futures.add(transactionBuffer.closeAsync());
replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
shadowReplicators.forEach((__, replicator) -> futures.add(replicator.disconnect()));
replicators.forEach((cluster, replicator) -> futures.add(replicator.terminate()));
shadowReplicators.forEach((__, replicator) -> futures.add(replicator.terminate()));
if (disconnectClients) {
futures.add(ExtensibleLoadManagerImpl.getAssignedBrokerLookupData(
brokerService.getPulsar(), topic).thenAccept(lookupData -> {
Expand Down Expand Up @@ -1942,7 +1942,7 @@ CompletableFuture<Void> removeReplicator(String remoteCluster) {

String name = PersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster);

Optional.ofNullable(replicators.get(remoteCluster)).map(Replicator::disconnect)
Optional.ofNullable(replicators.get(remoteCluster)).map(Replicator::terminate)
.orElse(CompletableFuture.completedFuture(null)).thenRun(() -> {
ledger.asyncDeleteCursor(name, new DeleteCursorCallback() {
@Override
Expand Down Expand Up @@ -2014,7 +2014,7 @@ CompletableFuture<Void> removeShadowReplicator(String shadowTopic) {
log.info("[{}] Removing shadow topic replicator to {}", topic, shadowTopic);
final CompletableFuture<Void> future = new CompletableFuture<>();
String name = ShadowReplicator.getShadowReplicatorName(replicatorPrefix, shadowTopic);
shadowReplicators.get(shadowTopic).disconnect().thenRun(() -> {
shadowReplicators.get(shadowTopic).terminate().thenRun(() -> {

ledger.asyncDeleteCursor(name, new DeleteCursorCallback() {
@Override
Expand Down Expand Up @@ -2898,7 +2898,7 @@ private CompletableFuture<Void> checkAndDisconnectReplicators() {
ConcurrentOpenHashMap<String, Replicator> replicators = getReplicators();
replicators.forEach((r, replicator) -> {
if (replicator.getNumberOfEntriesInBacklog() <= 0) {
futures.add(replicator.disconnect());
futures.add(replicator.terminate());
}
});
return FutureUtil.waitForAll(futures);
Expand Down Expand Up @@ -2949,6 +2949,15 @@ public void checkGC() {
log.debug("[{}] Global topic inactive for {} seconds, closing repl producers.", topic,
maxInactiveDurationInSec);
}
/**
* There is a race condition that may cause a NPE:
* - task 1: a callback of "replicator.cursor.asyncRead" will trigger a replication.
* - task 2: "closeReplProducersIfNoBacklog" called by current thread will make the variable
* "replicator.producer" to a null value.
* Race condition: task 1 will get a NPE when it tries to send messages using the variable
* "replicator.producer", because task 2 will set this variable to "null".
* TODO Create a seperated PR to fix it.
*/
closeReplProducersIfNoBacklog().thenRun(() -> {
if (hasRemoteProducers()) {
if (log.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
Expand Down Expand Up @@ -94,7 +95,7 @@ public void testRetryStartProducerStoppedByTopicRemove() throws Exception {
final ReplicatorInTest replicator = new ReplicatorInTest(localCluster, localTopic, remoteCluster, topicName,
replicatorPrefix, broker, remoteClient);
replicator.startProducer();
replicator.disconnect();
replicator.terminate();

// Verify task will done.
Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
Expand Down Expand Up @@ -129,7 +130,7 @@ protected String getProducerName() {
}

@Override
protected void readEntries(Producer<byte[]> producer) {
protected void setProducerAndTriggerReadEntries(Producer<byte[]> producer) {

}

Expand All @@ -139,7 +140,22 @@ protected Position getReplicatorReadPosition() {
}

@Override
protected long getNumberOfEntriesInBacklog() {
public ReplicatorStatsImpl getStats() {
return null;
}

@Override
public void updateRates() {

}

@Override
public boolean isConnected() {
return false;
}

@Override
public long getNumberOfEntriesInBacklog() {
return 0;
}

Expand Down
Loading

0 comments on commit 8f93d6e

Please sign in to comment.