Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pulsar-broker] close managed-ledgers before giving up bundle ownership to avoid bad zk-version #5599

Merged
merged 4 commits into from
Jan 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1860,7 +1860,7 @@ protected void unloadTopic(TopicName topicName, boolean authoritative) {
validateTopicOwnership(topicName, authoritative);
try {
Topic topic = getTopicReference(topicName);
topic.close().get();
topic.close(false).get();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather have an enum here because it's not clear what false means in this context.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

umm.. I think closing forcefully can be represent with boolean flag as we do similar thing at multiple places: PersistentTopic::delete(boolean... flags)

Also, I was also trying to think about how to accommodate enum here instead of flag. One thing I can think of is to add below enum under Topic instead of flag.

enum CLOSE_ACTION { CLOSE_ALL, CLOSE_WITHOUT_CLIENT_WAIT }

But I feel enum is not helping much. Instead we can rename the flag to give more meaning closeWithoutWaitingClientDisconnect.

So, for PersistentTopic if flag is enabled then broker skips waiting on client-disconnect and immediately closes managed-ledger before giving up bundle ownership.
And for NonPersistentTopic just completes the close if flag is enabled.

Can you please let me know if I am missing anything while renaming flag instead making enum.? any thoughts?

log.info("[{}] Successfully unloaded topic {}", clientAppId(), topicName);
} catch (NullPointerException e) {
log.error("[{}] topic {} not found", clientAppId(), topicName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,17 @@ public void handleUnloadRequest(PulsarService pulsar, long timeout, TimeUnit tim

// close topics forcefully
try {
unloadedTopics = pulsar.getBrokerService().unloadServiceUnit(bundle).get(timeout, timeoutUnit);
unloadedTopics = pulsar.getBrokerService().unloadServiceUnit(bundle, false).get(timeout, timeoutUnit);
} catch (TimeoutException e) {
// ignore topic-close failure to unload bundle
LOG.error("Failed to close topics in namespace {} in {}/{} timeout", bundle.toString(), timeout,
timeoutUnit);
try {
LOG.info("Forcefully close topics for bundle {}", bundle);
pulsar.getBrokerService().unloadServiceUnit(bundle, true).get(timeout, timeoutUnit);
} catch (Exception e1) {
LOG.error("Failed to close topics forcefully under bundle {}", bundle, e1);
}
} catch (Exception e) {
// ignore topic-close failure to unload bundle
LOG.error("Failed to close topics under namespace {}", bundle.toString(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1153,9 +1153,10 @@ public void checkTopicNsOwnership(final String topic) throws RuntimeException {
* Unload all the topic served by the broker service under the given service unit
*
* @param serviceUnit
* @param closeWithoutWaitingClientDisconnect don't wait for clients to disconnect and forcefully close managed-ledger
* @return
*/
public CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle serviceUnit) {
public CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle serviceUnit, boolean closeWithoutWaitingClientDisconnect) {
CompletableFuture<Integer> result = new CompletableFuture<Integer>();
List<CompletableFuture<Void>> closeFutures = Lists.newArrayList();
topics.forEach((name, topicFuture) -> {
Expand All @@ -1164,7 +1165,7 @@ public CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle serviceUnit)
// Topic needs to be unloaded
log.info("[{}] Unloading topic", topicName);
closeFutures.add(topicFuture
.thenCompose(t -> t.isPresent() ? t.get().close() : CompletableFuture.completedFuture(null)));
.thenCompose(t -> t.isPresent() ? t.get().close(closeWithoutWaitingClientDisconnect) : CompletableFuture.completedFuture(null)));
}
});
CompletableFuture<Void> aggregator = FutureUtil.waitForAll(closeFutures);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ CompletableFuture<Subscription> createSubscription(String subscriptionName, Init

CompletableFuture<Void> checkReplication();

CompletableFuture<Void> close();
CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect);

void checkGC(int gcInterval);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ public NonPersistentTopic(String topic, BrokerService brokerService) {
schemaValidationEnforced = policies.schema_validation_enforced;

} catch (Exception e) {
log.warn("[{}] Error getting policies {} and isEncryptionRequired will be set to false", topic, e.getMessage());
log.warn("[{}] Error getting policies {} and isEncryptionRequired will be set to false", topic,
e.getMessage());
isEncryptionRequired = false;
}
}
Expand Down Expand Up @@ -297,8 +298,8 @@ public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscri
name -> new NonPersistentSubscription(this, subscriptionName));

try {
Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, 0, cnx,
cnx.getRole(), metadata, readCompacted, initialPosition, keySharedMeta);
Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, 0,
cnx, cnx.getRole(), metadata, readCompacted, initialPosition, keySharedMeta);
subscription.addConsumer(consumer);
if (!cnx.isActive()) {
consumer.close();
Expand Down Expand Up @@ -328,7 +329,8 @@ public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscri
}

@Override
public CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition, boolean replicateSubscriptionState) {
public CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition,
boolean replicateSubscriptionState) {
return CompletableFuture.completedFuture(new NonPersistentSubscription(this, subscriptionName));
}

Expand All @@ -347,9 +349,8 @@ public CompletableFuture<Void> deleteForcefully() {
return delete(false, true, false);
}

private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
boolean closeIfClientsConnected,
boolean deleteSchema) {
private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, boolean closeIfClientsConnected,
boolean deleteSchema) {
CompletableFuture<Void> deleteFuture = new CompletableFuture<>();

lock.writeLock().lock();
Expand Down Expand Up @@ -430,16 +431,18 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,

/**
* Close this topic - close all producers and subscriptions associated with this topic
*
*
* @param closeWithoutWaitingClientDisconnect
* don't wait for client disconnect and forcefully close managed-ledger
* @return Completable future indicating completion of close operation
*/
@Override
public CompletableFuture<Void> close() {
public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) {
CompletableFuture<Void> closeFuture = new CompletableFuture<>();

lock.writeLock().lock();
try {
if (!isFenced) {
if (!isFenced || closeWithoutWaitingClientDisconnect) {
isFenced = true;
} else {
log.warn("[{}] Topic is already being closed or deleted", topic);
Expand All @@ -456,7 +459,10 @@ public CompletableFuture<Void> close() {
producers.forEach(producer -> futures.add(producer.disconnect()));
subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));

FutureUtil.waitForAll(futures).thenRun(() -> {
CompletableFuture<Void> clientCloseFuture = closeWithoutWaitingClientDisconnect ? CompletableFuture.completedFuture(null)
: FutureUtil.waitForAll(futures);

clientCloseFuture.thenRun(() -> {
log.info("[{}] Topic closed", topic);
// unload topic iterates over topics map and removing from the map with the same thread creates deadlock.
// so, execute it in different thread
Expand Down Expand Up @@ -543,10 +549,11 @@ public CompletableFuture<Void> checkReplication() {
boolean startReplicator(String remoteCluster) {
log.info("[{}] Starting replicator to remote: {}", topic, remoteCluster);
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
return addReplicationCluster(remoteCluster,NonPersistentTopic.this, localCluster);
return addReplicationCluster(remoteCluster, NonPersistentTopic.this, localCluster);
}

protected boolean addReplicationCluster(String remoteCluster, NonPersistentTopic nonPersistentTopic, String localCluster) {
protected boolean addReplicationCluster(String remoteCluster, NonPersistentTopic nonPersistentTopic,
String localCluster) {
AtomicBoolean isReplicatorStarted = new AtomicBoolean(true);
replicators.computeIfAbsent(remoteCluster, r -> {
try {
Expand Down Expand Up @@ -630,8 +637,9 @@ public Replicator getPersistentReplicator(String remoteCluster) {
return replicators.get(remoteCluster);
}

public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats, StatsOutputStream topicStatsStream,
ClusterReplicationMetrics replStats, String namespace, boolean hydratePublishers) {
public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats,
StatsOutputStream topicStatsStream, ClusterReplicationMetrics replStats, String namespace,
boolean hydratePublishers) {

TopicStats topicStats = threadLocalTopicStats.get();
topicStats.reset();
Expand Down Expand Up @@ -660,7 +668,6 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
});
topicStatsStream.endList();


// Start replicator stats
topicStatsStream.startObject("replication");
nsStats.replicatorCount += topicStats.remotePublishersStats.size();
Expand Down Expand Up @@ -869,7 +876,8 @@ public void checkInactiveSubscriptions() {
@Override
public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
if (log.isDebugEnabled()) {
log.debug("[{}] isEncryptionRequired changes: {} -> {}", topic, isEncryptionRequired, data.encryption_required);
log.debug("[{}] isEncryptionRequired changes: {} -> {}", topic, isEncryptionRequired,
data.encryption_required);
}
isEncryptionRequired = data.encryption_required;
setSchemaCompatibilityStrategy(data);
Expand Down Expand Up @@ -922,17 +930,14 @@ public Position getLastMessageId() {

private static final Logger log = LoggerFactory.getLogger(NonPersistentTopic.class);


@Override
public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schema) {
return hasSchema()
.thenCompose((hasSchema) -> {
if (hasSchema || isActive() || ENTRIES_ADDED_COUNTER_UPDATER.get(this) != 0) {
return checkSchemaCompatibleForConsumer(schema);
} else {
return addSchema(schema).thenCompose(schemaVersion->
CompletableFuture.completedFuture(null));
}
});
return hasSchema().thenCompose((hasSchema) -> {
if (hasSchema || isActive() || ENTRIES_ADDED_COUNTER_UPDATER.get(this) != 0) {
return checkSchemaCompatibleForConsumer(schema);
} else {
return addSchema(schema).thenCompose(schemaVersion -> CompletableFuture.completedFuture(null));
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -911,18 +911,25 @@ public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
return deleteFuture;
}

public CompletableFuture<Void> close() {
return close(false);
}

/**
* Close this topic - close all producers and subscriptions associated with this topic
*
* @param closeWithoutWaitingClientDisconnect don't wait for client disconnect and forcefully close managed-ledger
* @return Completable future indicating completion of close operation
*/
@Override
public CompletableFuture<Void> close() {
public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) {
CompletableFuture<Void> closeFuture = new CompletableFuture<>();

lock.writeLock().lock();
try {
if (!isFenced) {
// closing managed-ledger waits until all producers/consumers/replicators get closed. Sometimes, broker
// forcefully wants to close managed-ledger without waiting all resources to be closed.
if (!isFenced || closeWithoutWaitingClientDisconnect) {
isFenced = true;
} else {
log.warn("[{}] Topic is already being closed or deleted", topic);
Expand All @@ -938,8 +945,11 @@ public CompletableFuture<Void> close() {
replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
producers.forEach(producer -> futures.add(producer.disconnect()));
subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));

CompletableFuture<Void> clientCloseFuture = closeWithoutWaitingClientDisconnect ? CompletableFuture.completedFuture(null)
: FutureUtil.waitForAll(futures);

FutureUtil.waitForAll(futures).thenRun(() -> {
clientCloseFuture.thenRun(() -> {
// After having disconnected all producers/consumers, close the managed ledger
ledger.asyncClose(new CloseCallback() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ public CompletableFuture<Void> answer(InvocationOnMock invocation) throws Throwa
result.completeExceptionally(new RuntimeException("first time failed"));
return result;
}
}).when(spyTopic).close();
}).when(spyTopic).close(false);
NamespaceBundle bundle = pulsar.getNamespaceService().getBundle(TopicName.get(topicName));
try {
pulsar.getNamespaceService().unloadNamespaceBundle(bundle);
Expand Down Expand Up @@ -316,7 +316,7 @@ public void testUnloadNamespaceBundleWithStuckTopic() throws Exception {
public CompletableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
return new CompletableFuture<Void>();
}
}).when(spyTopic).close();
}).when(spyTopic).close(false);
NamespaceBundle bundle = pulsar.getNamespaceService().getBundle(TopicName.get(topicName));

// try to unload bundle whose topic will be stuck
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
package org.apache.pulsar.broker.namespace;

import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.pulsar.broker.PulsarService.webAddress;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -88,7 +88,7 @@ public void setup() throws Exception {
bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32());
nsService = mock(NamespaceService.class);
brokerService = mock(BrokerService.class);
doReturn(CompletableFuture.completedFuture(1)).when(brokerService).unloadServiceUnit(any());
doReturn(CompletableFuture.completedFuture(1)).when(brokerService).unloadServiceUnit(any(), anyBoolean());

doReturn(zkCache).when(pulsar).getLocalZkCache();
doReturn(localCache).when(pulsar).getLocalZkCacheService();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
Expand All @@ -49,6 +50,7 @@

import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
Expand All @@ -58,6 +60,7 @@
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
Expand All @@ -66,6 +69,7 @@
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
Expand Down Expand Up @@ -899,4 +903,49 @@ public void testCreateNamespacePolicy() throws Exception {
assertEquals(policy.get().bundles.numBundles, totalBundle);
}

/**
* It verifies that unloading bundle gracefully closes managed-ledger before removing ownership to avoid bad-zk
* version.
*
* @throws Exception
*/
@Test
public void testStuckTopicUnloading() throws Exception {
final String namespace = "prop/ns-abc";
final String topicName = "persistent://" + namespace + "/unoadTopic";
final String topicMlName = namespace + "/persistent/unoadTopic";
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name")
.subscribe();
consumer.close();

ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topicName).sendTimeout(5,
TimeUnit.SECONDS);

Producer<byte[]> producer = producerBuilder.create();

PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();

ManagedLedgerFactoryImpl mlFactory = (ManagedLedgerFactoryImpl) pulsar.getManagedLedgerClientFactory()
.getManagedLedgerFactory();
Field ledgersField = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
ledgersField.setAccessible(true);
ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>> ledgers = (ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>>) ledgersField
.get(mlFactory);
assertNotNull(ledgers.get(topicMlName));

org.apache.pulsar.broker.service.Producer prod = spy(topic.producers.values().get(0));
topic.producers.clear();
topic.producers.add(prod);
CompletableFuture<Void> waitFuture = new CompletableFuture<Void>();
doReturn(waitFuture).when(prod).disconnect();
Set<NamespaceBundle> bundles = pulsar.getNamespaceService().getOwnedServiceUnits();
for (NamespaceBundle bundle : bundles) {
String ns = bundle.getNamespaceObject().toString();
System.out.println();
if (namespace.equals(ns)) {
pulsar.getNamespaceService().unloadNamespaceBundle(bundle, 2, TimeUnit.SECONDS);
}
}
assertNull(ledgers.get(topicMlName));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ public void testMessageAvailableAfterRestart() throws Exception {
}

// cause broker to drop topic. Will be loaded next time we access it
pulsar.getBrokerService().getTopicReference(topic).get().close().get();
pulsar.getBrokerService().getTopicReference(topic).get().close(false).get();

try (Reader<byte[]> reader = pulsarClient.newReader().topic(topic)
.startMessageId(MessageId.earliest).create()) {
Expand Down
Loading