Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve] [broker] Not close the socket if lookup failed caused by bundle unloading or metadata ex #21211

Merged
merged 2 commits into from
Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.net.URISyntaxException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import javax.ws.rs.Encoded;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
Expand All @@ -48,6 +47,7 @@
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -318,35 +318,37 @@ public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService pulsarSe
requestId, shouldRedirectThroughServiceUrl(conf, lookupData)));
}
}).exceptionally(ex -> {
if (ex instanceof CompletionException && ex.getCause() instanceof IllegalStateException) {
log.info("Failed to lookup {} for topic {} with error {}", clientAppId,
topicName.toString(), ex.getCause().getMessage());
} else {
log.warn("Failed to lookup {} for topic {} with error {}", clientAppId,
topicName.toString(), ex.getMessage(), ex);
}
lookupfuture.complete(
newLookupErrorResponse(ServerError.ServiceNotReady, ex.getMessage(), requestId));
return null;
});
handleLookupError(lookupfuture, topicName.toString(), clientAppId, requestId, ex);
return null;
});
}

}).exceptionally(ex -> {
if (ex instanceof CompletionException && ex.getCause() instanceof IllegalStateException) {
log.info("Failed to lookup {} for topic {} with error {}", clientAppId, topicName.toString(),
ex.getCause().getMessage());
} else {
log.warn("Failed to lookup {} for topic {} with error {}", clientAppId, topicName.toString(),
ex.getMessage(), ex);
}

lookupfuture.complete(newLookupErrorResponse(ServerError.ServiceNotReady, ex.getMessage(), requestId));
handleLookupError(lookupfuture, topicName.toString(), clientAppId, requestId, ex);
return null;
});

return lookupfuture;
}

private static void handleLookupError(CompletableFuture<ByteBuf> lookupFuture, String topicName, String clientAppId,
long requestId, Throwable ex){
final Throwable unwrapEx = FutureUtil.unwrapCompletionException(ex);
final String errorMsg = unwrapEx.getMessage();
if (unwrapEx instanceof IllegalStateException) {
// Current broker still hold the bundle's lock, but the bundle is being unloading.
log.info("Failed to lookup {} for topic {} with error {}", clientAppId, topicName, errorMsg);
lookupFuture.complete(newLookupErrorResponse(ServerError.MetadataError, errorMsg, requestId));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how do we know IllegalStateException is always MetadataError?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the current case, the IllegalStateException is only throwing when the namespace bundle is unloading. See https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java#L453-L455C36

} else if (nsData.get().isDisabled()) {
    future.completeExceptionally(
            new IllegalStateException(String.format("Namespace bundle %s is being unloaded", bundle)));
}

I agree with you, We should clearly define this exception. Since there are so many places that rely on the method NamespaceService.findBrokerServiceUrl, such as PulsarWebResource.validateTopicOwnershipAsync. We need a separate PR to do focus on it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There may be a side effect here.
Because the connection can be reset, the previous producer could fail fast when bundle unloaded and move to a new Broker.
This now causes each partition's producer to have to wait for a timeout.

Copy link
Contributor Author

@poorbarcode poorbarcode Sep 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@315157973

Because the connection can be reset, the previous producer could fail fast when bundle unloaded and move to a new Broker.

The previous producer will finally receive a CommandCloseProducer and try to reconnect even if the topic is closed without waiting for the client to disconnect, right?

This now causes each partition's producer to have to wait for a timeout.

The partition's producer will try to reconnect according to backoff's rules, which will not result in a timeout.

I also improve the test to ensure the producer and consumer are still working. See testLookupConnectionNotCloseIfGetUnloadingExOrMetadataEx.

Maybe I misunderstood what you meant, could you explain the details?

} else if (unwrapEx instanceof MetadataStoreException){
// Load bundle ownership or acquire lock failed.
// Differ with "IllegalStateException", print warning log.
log.warn("Failed to lookup {} for topic {} with error {}", clientAppId, topicName, errorMsg);
lookupFuture.complete(newLookupErrorResponse(ServerError.MetadataError, errorMsg, requestId));
} else {
log.warn("Failed to lookup {} for topic {} with error {}", clientAppId, topicName, errorMsg);
lookupFuture.complete(newLookupErrorResponse(ServerError.ServiceNotReady, errorMsg, requestId));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to return ServiceNotReady?

why not UnknownError?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to guarantee that uncontrollable errprs continue the previous behavior

}
}

protected TopicName getTopicName(String topicDomain, String tenant, String cluster, String namespace,
@Encoded String encodedTopic) {
String decodedName = Codec.decode(encodedTopic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public final class ServiceUnitUtils {
*/
private static final String OWNER_INFO_ROOT = "/namespace";

static String path(NamespaceBundle suname) {
public static String path(NamespaceBundle suname) {
// The ephemeral node path for new namespaces should always have bundle name appended
return OWNER_INFO_ROOT + "/" + suname.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,7 @@ protected static CompletableFuture<ClusterData> getClusterDataIfDifferentCluster
pulsar.getPulsarResources().getClusterResources().getClusterAsync(cluster)
.whenComplete((clusterDataResult, ex) -> {
if (ex != null) {
log.warn("[{}] Load cluster data failed: requested={}", clientAppId, cluster);
clusterDataFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex));
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,10 @@ private void checkLookupException(String tenant, String namespace, PulsarClient
.topic("persistent://" + tenant + "/" + namespace + "/1p")
.create();
} catch (PulsarClientException t) {
Assert.assertTrue(t instanceof PulsarClientException.LookupException);
Assert.assertTrue(t instanceof PulsarClientException.BrokerMetadataException
|| t instanceof PulsarClientException.LookupException);
Assert.assertTrue(
t.getMessage().contains(
"java.lang.IllegalStateException: The leader election has not yet been completed!"));
t.getMessage().contains("The leader election has not yet been completed"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.handler.codec.http.HttpRequest;
Expand Down Expand Up @@ -71,9 +72,13 @@
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceUnit;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.namespace.OwnedBundle;
import org.apache.pulsar.broker.namespace.OwnershipCache;
import org.apache.pulsar.broker.namespace.ServiceUnitUtils;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.impl.BinaryProtoLookupService;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.NamespaceBundle;
Expand All @@ -87,6 +92,7 @@
import org.apache.pulsar.common.util.SecurityUtility;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.zookeeper.KeeperException;
import org.asynchttpclient.AsyncCompletionHandler;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.AsyncHttpClientConfig;
Expand Down Expand Up @@ -1105,4 +1111,101 @@ public String authenticate(AuthenticationDataSource authData) throws Authenticat
return "invalid";
}
}

@Test
public void testLookupConnectionNotCloseIfGetUnloadingExOrMetadataEx() throws Exception {
String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
admin.topics().createNonPartitionedTopic(tpName);
PulsarClientImpl pulsarClientImpl = (PulsarClientImpl) pulsarClient;
Producer<String> producer = pulsarClientImpl.newProducer(Schema.STRING).topic(tpName).create();
Consumer<String> consumer = pulsarClientImpl.newConsumer(Schema.STRING).topic(tpName)
.subscriptionName("s1").isAckReceiptEnabled(true).subscribe();
LookupService lookupService = pulsarClientImpl.getLookup();
assertTrue(lookupService instanceof BinaryProtoLookupService);
ClientCnx lookupConnection = pulsarClientImpl.getCnxPool().getConnection(lookupService.resolveHost()).join();

// Verify the socket will not be closed if the bundle is unloading.
BundleOfTopic bundleOfTopic = new BundleOfTopic(tpName);
bundleOfTopic.setBundleIsUnloading();
try {
lookupService.getBroker(TopicName.get(tpName)).get();
fail("It should failed due to the namespace bundle is unloading.");
} catch (Exception ex) {
assertTrue(ex.getMessage().contains("is being unloaded"));
}
// Do unload topic, trigger producer & consumer reconnection.
pulsar.getBrokerService().getTopic(tpName, false).join().get().close(true);
assertTrue(lookupConnection.ctx().channel().isActive());
bundleOfTopic.setBundleIsNotUnloading();
// Assert producer & consumer could reconnect successful.
producer.send("1");
HashSet<String> messagesReceived = new HashSet<>();
while (true) {
Message<String> msg = consumer.receive(2, TimeUnit.SECONDS);
if (msg == null) {
break;
}
messagesReceived.add(msg.getValue());
}
assertTrue(messagesReceived.contains("1"));

// Verify the socket will not be closed if get a metadata ex.
bundleOfTopic.releaseBundleLockAndMakeAcquireFail();
try {
lookupService.getBroker(TopicName.get(tpName)).get();
fail("It should failed due to the acquire bundle lock fail.");
} catch (Exception ex) {
assertTrue(ex.getMessage().contains("OperationTimeout"));
}
// Do unload topic, trigger producer & consumer reconnection.
pulsar.getBrokerService().getTopic(tpName, false).join().get().close(true);
assertTrue(lookupConnection.ctx().channel().isActive());
bundleOfTopic.makeAcquireBundleLockSuccess();
// Assert producer could reconnect successful.
producer.send("2");
while (true) {
Message<String> msg = consumer.receive(2, TimeUnit.SECONDS);
if (msg == null) {
break;
}
messagesReceived.add(msg.getValue());
}
assertTrue(messagesReceived.contains("2"));

// cleanup.
producer.close();
consumer.close();
admin.topics().delete(tpName);
}

private class BundleOfTopic {

private NamespaceBundle namespaceBundle;
private OwnershipCache ownershipCache;
private AsyncLoadingCache<NamespaceBundle, OwnedBundle> ownedBundlesCache;

public BundleOfTopic(String tpName) {
namespaceBundle = pulsar.getNamespaceService().getBundle(TopicName.get(tpName));
ownershipCache = pulsar.getNamespaceService().getOwnershipCache();
ownedBundlesCache = WhiteboxImpl.getInternalState(ownershipCache, "ownedBundlesCache");
}

private void setBundleIsUnloading() {
ownedBundlesCache.get(namespaceBundle).join().setActive(false);
}

private void setBundleIsNotUnloading() {
ownedBundlesCache.get(namespaceBundle).join().setActive(true);
}

private void releaseBundleLockAndMakeAcquireFail() throws Exception {
ownedBundlesCache.synchronous().invalidateAll();
mockZooKeeper.delete(ServiceUnitUtils.path(namespaceBundle), -1);
mockZooKeeper.setAlwaysFail(KeeperException.Code.OPERATIONTIMEOUT);
}

private void makeAcquireBundleLockSuccess() throws Exception {
mockZooKeeper.unsetAlwaysFail();
}
}
}