Skip to content

Commit

Permalink
[Broker] Fix Broker HealthCheck Endpoint Exposes Race Conditions (apa…
Browse files Browse the repository at this point in the history
  • Loading branch information
mattisonchao authored Mar 4, 2022
1 parent 06ccd07 commit 4f1e39b
Show file tree
Hide file tree
Showing 3 changed files with 203 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
Expand All @@ -54,6 +53,7 @@
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
Expand Down Expand Up @@ -338,43 +338,104 @@ private CompletableFuture<Void> internalRunHealthCheck(TopicVersion topicVersion
NamespaceName namespaceName = (topicVersion == TopicVersion.V2)
? NamespaceService.getHeartbeatNamespaceV2(pulsar().getAdvertisedAddress(), pulsar().getConfiguration())
: NamespaceService.getHeartbeatNamespace(pulsar().getAdvertisedAddress(), pulsar().getConfiguration());
String topicName = String.format("persistent://%s/%s", namespaceName, HEALTH_CHECK_TOPIC_SUFFIX);
final String topicName = String.format("persistent://%s/%s", namespaceName, HEALTH_CHECK_TOPIC_SUFFIX);
LOG.info("[{}] Running healthCheck with topic={}", clientAppId(), topicName);
String messageStr = UUID.randomUUID().toString();
final String messageStr = UUID.randomUUID().toString();
final String subscriptionName = "healthCheck-" + messageStr;
// create non-partitioned topic manually and close the previous reader if present.
return pulsar().getBrokerService().getTopic(topicName, true)
// check and clean all subscriptions
.thenCompose(topicOptional -> {
if (!topicOptional.isPresent()) {
LOG.error("[{}] Fail to run health check while get topic {}. because get null value.",
clientAppId(), topicName);
throw new RestException(Status.NOT_FOUND, "Topic [{}] not found after create.");
}
Topic topic = topicOptional.get();
// clean all subscriptions
return FutureUtil.waitForAll(topic.getSubscriptions().values()
.stream().map(Subscription::deleteForcefully).collect(Collectors.toList()))
.thenApply(__ -> topic);
}).thenCompose(topic -> {
try {
PulsarClient client = pulsar().getClient();
return client.newProducer(Schema.STRING).topic(topicName).createAsync()
.thenCombine(client.newReader(Schema.STRING).topic(topicName)
.startMessageId(MessageId.latest).createAsync(), (producer, reader) ->
producer.sendAsync(messageStr).thenCompose(__ ->
healthCheckRecursiveReadNext(reader, messageStr))
.thenCompose(__ -> {
List<CompletableFuture<Void>> closeFutures =
new ArrayList<>();
closeFutures.add(producer.closeAsync());
closeFutures.add(reader.closeAsync());
return FutureUtil.waitForAll(closeFutures);
})
).thenAccept(ignore -> {});
} catch (PulsarServerException e) {
LOG.error("[{}] Fail to run health check while get client.", clientAppId());
throw new RestException(e);
.thenCompose(topicOptional -> {
if (!topicOptional.isPresent()) {
LOG.error("[{}] Fail to run health check while get topic {}. because get null value.",
clientAppId(), topicName);
throw new RestException(Status.NOT_FOUND,
String.format("Topic [%s] not found after create.", topicName));
}
PulsarClient client;
try {
client = pulsar().getClient();
} catch (PulsarServerException e) {
LOG.error("[{}] Fail to run health check while get client.", clientAppId());
throw new RestException(e);
}
CompletableFuture<Void> resultFuture = new CompletableFuture<>();
client.newProducer(Schema.STRING).topic(topicName).createAsync()
.thenCompose(producer -> client.newReader(Schema.STRING).topic(topicName)
.subscriptionName(subscriptionName)
.startMessageId(MessageId.latest)
.createAsync().exceptionally(createException -> {
producer.closeAsync().exceptionally(ex -> {
LOG.error("[{}] Close producer fail while heath check.", clientAppId());
return null;
});
throw FutureUtil.wrapToCompletionException(createException);
}).thenCompose(reader -> producer.sendAsync(messageStr)
.thenCompose(__ -> healthCheckRecursiveReadNext(reader, messageStr))
.whenComplete((__, ex) -> {
closeAndReCheck(producer, reader, topicOptional.get(), subscriptionName)
.whenComplete((unused, innerEx) -> {
if (ex != null) {
resultFuture.completeExceptionally(ex);
} else {
resultFuture.complete(null);
}
});
}
))
).exceptionally(ex -> {
resultFuture.completeExceptionally(ex);
return null;
});
return resultFuture;
});
}

/**
* Close producer and reader and then to re-check if this operation is success.
*
* Re-check
* - Producer: If close fails we will print error log to notify user.
* - Consumer: If close fails we will force delete subscription.
*
* @param producer Producer
* @param reader Reader
* @param topic Topic
* @param subscriptionName Subscription name
*/
private CompletableFuture<Void> closeAndReCheck(Producer<String> producer, Reader<String> reader,
Topic topic, String subscriptionName) {
// no matter exception or success, we still need to
// close producer/reader
CompletableFuture<Void> producerFuture = producer.closeAsync();
CompletableFuture<Void> readerFuture = reader.closeAsync();
List<CompletableFuture<Void>> futures = new ArrayList<>(2);
futures.add(producerFuture);
futures.add(readerFuture);
return FutureUtil.waitForAll(Collections.unmodifiableList(futures))
.exceptionally(closeException -> {
if (readerFuture.isCompletedExceptionally()) {
LOG.error("[{}] Close reader fail while heath check.", clientAppId());
Subscription subscription =
topic.getSubscription(subscriptionName);
// re-check subscription after reader close
if (subscription != null) {
LOG.warn("[{}] Force delete subscription {} "
+ "when it still exists after the"
+ " reader is closed.",
clientAppId(), subscription);
subscription.deleteForcefully()
.exceptionally(ex -> {
LOG.error("[{}] Force delete subscription fail"
+ " while health check",
clientAppId(), ex);
return null;
});
}
} else {
// producer future fail.
LOG.error("[{}] Close producer fail while heath check.", clientAppId());
}
return null;
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,19 @@
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.TopicVersion;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.compaction.Compactor;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.springframework.util.CollectionUtils;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

@Test(groups = "broker-admin")
@Slf4j
Expand Down Expand Up @@ -55,16 +62,100 @@ public void cleanup() throws Exception {

@Test
public void testHealthCheckup() throws Exception {
admin.brokers().healthcheck();
final int times = 30;
CompletableFuture<Void> future = new CompletableFuture<>();
pulsar.getExecutor().execute(() -> {
try {
for (int i = 0; i < times; i++) {
admin.brokers().healthcheck();
}
future.complete(null);
}catch (PulsarAdminException e) {
future.completeExceptionally(e);
}
});
for (int i = 0; i < times; i++) {
admin.brokers().healthcheck();
}
// To ensure we don't have any subscription
final String testHealthCheckTopic = String.format("persistent://pulsar/test/localhost:%s/healthcheck",
pulsar.getConfig().getWebServicePort().get());
Awaitility.await().untilAsserted(() -> {
Assert.assertFalse(future.isCompletedExceptionally());
});
Awaitility.await().untilAsserted(() ->
Assert.assertTrue(CollectionUtils.isEmpty(admin.topics()
.getSubscriptions(testHealthCheckTopic).stream()
// All system topics are using compaction, even though is not explicitly set in the policies.
.filter(v -> !v.equals(Compactor.COMPACTION_SUBSCRIPTION))
.collect(Collectors.toList())
))
);
}

@Test
public void testHealthCheckupV1() throws Exception {
admin.brokers().healthcheck(TopicVersion.V1);
final int times = 30;
CompletableFuture<Void> future = new CompletableFuture<>();
pulsar.getExecutor().execute(() -> {
try {
for (int i = 0; i < times; i++) {
admin.brokers().healthcheck(TopicVersion.V1);
}
future.complete(null);
}catch (PulsarAdminException e) {
future.completeExceptionally(e);
}
});
for (int i = 0; i < times; i++) {
admin.brokers().healthcheck(TopicVersion.V1);
}
final String testHealthCheckTopic = String.format("persistent://pulsar/test/localhost:%s/healthcheck",
pulsar.getConfig().getWebServicePort().get());
Awaitility.await().untilAsserted(() -> {
Assert.assertFalse(future.isCompletedExceptionally());
});
// To ensure we don't have any subscription
Awaitility.await().untilAsserted(() ->
Assert.assertTrue(CollectionUtils.isEmpty(admin.topics()
.getSubscriptions(testHealthCheckTopic).stream()
// All system topics are using compaction, even though is not explicitly set in the policies.
.filter(v -> !v.equals(Compactor.COMPACTION_SUBSCRIPTION))
.collect(Collectors.toList())
))
);
}

@Test
public void testHealthCheckupV2() throws Exception {
admin.brokers().healthcheck(TopicVersion.V2);
final int times = 30;
CompletableFuture<Void> future = new CompletableFuture<>();
pulsar.getExecutor().execute(() -> {
try {
for (int i = 0; i < times; i++) {
admin.brokers().healthcheck(TopicVersion.V2);
}
future.complete(null);
}catch (PulsarAdminException e) {
future.completeExceptionally(e);
}
});
for (int i = 0; i < times; i++) {
admin.brokers().healthcheck(TopicVersion.V2);
}
final String testHealthCheckTopic = String.format("persistent://pulsar/localhost:%s/healthcheck",
pulsar.getConfig().getWebServicePort().get());
Awaitility.await().untilAsserted(() -> {
Assert.assertFalse(future.isCompletedExceptionally());
});
// To ensure we don't have any subscription
Awaitility.await().untilAsserted(() ->
Assert.assertTrue(CollectionUtils.isEmpty(admin.topics()
.getSubscriptions(testHealthCheckTopic).stream()
// All system topics are using compaction, even though is not explicitly set in the policies.
.filter(v -> !v.equals(Compactor.COMPACTION_SUBSCRIPTION))
.collect(Collectors.toList())
))
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,4 +188,18 @@ public static <T> Optional<Throwable> getException(CompletableFuture<T> future)
}
return Optional.empty();
}

/**
* Wrap throwable exception to CompletionException if that exception is not an instance of CompletionException.
*
* @param throwable Exception
* @return CompletionException
*/
public static CompletionException wrapToCompletionException(Throwable throwable) {
if (throwable instanceof CompletionException) {
return (CompletionException) throwable;
} else {
return new CompletionException(throwable);
}
}
}

0 comments on commit 4f1e39b

Please sign in to comment.