Skip to content

Commit

Permalink
Avoid triggering startReplProducer on newAddProducer as it may flips …
Browse files Browse the repository at this point in the history
…replicator state wrongly (#232)

* Avoid triggering startReplProducer on newAddProducer as it may flips replicator state wrongly

* Signal replicator is stopping if porducer is not created yet

* read repl-cluster from policies to avoid restart of closing-replicator
  • Loading branch information
rdhabalia committed Feb 24, 2017
1 parent 24360d9 commit 16a5221
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ public synchronized void startProducer() {
// BackOff before retrying
brokerService.executor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS);
} else {
log.warn("[{}][{} -> {}] Failed to create remote producer. Replicator state: ", topicName,
log.warn("[{}][{} -> {}] Failed to create remote producer. Replicator state: {}", topicName,
localCluster, remoteCluster, STATE_UPDATER.get(this), ex);
}
return null;
Expand Down Expand Up @@ -624,7 +624,7 @@ public synchronized CompletableFuture<Void> disconnect(boolean failIfHasBacklog)
return closeProducerAsync();
} else {
// If there's already a reconnection happening, signal to close it whenever it's ready
STATE_UPDATER.set(this, State.Stopped);
STATE_UPDATER.set(this, State.Stopping);
}
return CompletableFuture.completedFuture(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,26 @@ private boolean hasRemoteProducers() {
return foundRemote.get();
}

private void startReplProducers() {
replicators.forEach((region, replicator) -> replicator.startProducer());
public void startReplProducers() {
// read repl-cluster from policies to avoid restart of replicator which are in process of disconnect and close
try {
Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache()
.get(AdminResource.path("policies", DestinationName.get(topic).getNamespace()))
.orElseThrow(() -> new KeeperException.NoNodeException());
if (policies.replication_clusters != null) {
Set<String> configuredClusters = Sets.newTreeSet(policies.replication_clusters);
replicators.forEach((region, replicator) -> {
if (configuredClusters.contains(region)) {
replicator.startProducer();
}
});
}
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug("[{}] Error getting policies while starting repl-producers {}", topic, e.getMessage());
}
replicators.forEach((region, replicator) -> replicator.startProducer());
}
}

public CompletableFuture<Void> stopReplProducers() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
Expand All @@ -32,7 +33,9 @@
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.ArrayList;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand All @@ -57,8 +60,11 @@
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.zookeeper.ZooKeeper;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
Expand All @@ -69,12 +75,15 @@

import com.yahoo.pulsar.broker.PulsarService;
import com.yahoo.pulsar.broker.ServiceConfiguration;
import com.yahoo.pulsar.broker.admin.AdminResource;
import com.yahoo.pulsar.broker.cache.ConfigurationCacheService;
import com.yahoo.pulsar.broker.namespace.NamespaceService;
import com.yahoo.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import com.yahoo.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
import com.yahoo.pulsar.broker.service.persistent.PersistentReplicator;
import com.yahoo.pulsar.broker.service.persistent.PersistentSubscription;
import com.yahoo.pulsar.broker.service.persistent.PersistentTopic;
import com.yahoo.pulsar.client.api.PulsarClient;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import com.yahoo.pulsar.common.api.proto.PulsarApi.MessageMetadata;
Expand Down Expand Up @@ -828,4 +837,62 @@ public void testFailoverSubscription() throws Exception {

assertNull(topic2.getPersistentSubscription(successSubName));
}

/**
* {@link PersistentReplicator.removeReplicator} doesn't remove replicator in atomic way and does in multiple step:
* 1. disconnect replicator producer
* <p>
* 2. close cursor
* <p>
* 3. remove from replicator-list.
* <p>
*
* If we try to startReplicationProducer before step-c finish then it should not avoid restarting repl-producer.
*
* @throws Exception
*/
@Test
public void testAtomicReplicationRemoval() throws Exception {
final String globalTopicName = "persistent://prop/global/ns-abc/successTopic";
String localCluster = "local";
String remoteCluster = "remote";
final ManagedLedger ledgerMock = mock(ManagedLedger.class);
doNothing().when(ledgerMock).asyncDeleteCursor(anyObject(), anyObject(), anyObject());
doReturn(new ArrayList<Object>()).when(ledgerMock).getCursors();

PersistentTopic topic = new PersistentTopic(globalTopicName, ledgerMock, brokerService);
String remoteReplicatorName = topic.replicatorPrefix + "." + remoteCluster;
ConcurrentOpenHashMap<String, PersistentReplicator> replicatorMap = topic.getReplicators();
;
final URL brokerUrl = new URL(
"http://" + pulsar.getAdvertisedAddress() + ":" + pulsar.getConfiguration().getBrokerServicePort());
PulsarClient client = PulsarClient.create(brokerUrl.toString());
ManagedCursor cursor = mock(ManagedCursorImpl.class);
doReturn(remoteCluster).when(cursor).getName();
brokerService.getReplicationClients().put(remoteCluster, client);
PersistentReplicator replicator = spy(
new PersistentReplicator(topic, cursor, localCluster, remoteCluster, brokerService));
replicatorMap.put(remoteReplicatorName, replicator);

// step-1 remove replicator : it will disconnect the producer but it will wait for callback to be completed
Method removeMethod = PersistentTopic.class.getDeclaredMethod("removeReplicator", String.class);
removeMethod.setAccessible(true);
removeMethod.invoke(topic, remoteReplicatorName);

// step-2 now, policies doesn't have removed replication cluster so, it should not invoke "startProducer" of the
// replicator
when(pulsar.getConfigurationCache().policiesCache()
.get(AdminResource.path("policies", DestinationName.get(globalTopicName).getNamespace())))
.thenReturn(Optional.of(new Policies()));
// try to start replicator again
topic.startReplProducers();
// verify: replicator.startProducer is not invoked
verify(replicator, Mockito.times(0)).startProducer();

// step-3 : complete the callback to remove replicator from the list
ArgumentCaptor<DeleteCursorCallback> captor = ArgumentCaptor.forClass(DeleteCursorCallback.class);
Mockito.verify(ledgerMock).asyncDeleteCursor(anyObject(), captor.capture(), anyObject());
DeleteCursorCallback callback = captor.getValue();
callback.deleteCursorComplete(null);
}
}

0 comments on commit 16a5221

Please sign in to comment.