From 4f1e39b6921ea401b8c27f17a041d06d85f8abf8 Mon Sep 17 00:00:00 2001 From: Qiang Zhao <74767115+mattisonchao@users.noreply.github.com> Date: Fri, 4 Mar 2022 10:29:34 +0800 Subject: [PATCH] [Broker] Fix Broker HealthCheck Endpoint Exposes Race Conditions (#14367) --- .../pulsar/broker/admin/impl/BrokersBase.java | 129 +++++++++++++----- .../broker/admin/AdminApiHealthCheckTest.java | 97 ++++++++++++- .../apache/pulsar/common/util/FutureUtil.java | 14 ++ 3 files changed, 203 insertions(+), 37 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java index 4530d8cf567a3..eda186a465bc2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java @@ -30,7 +30,6 @@ import java.util.Objects; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.stream.Collectors; import javax.ws.rs.DELETE; import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; @@ -54,6 +53,7 @@ import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.Schema; @@ -338,43 +338,104 @@ private CompletableFuture internalRunHealthCheck(TopicVersion topicVersion NamespaceName namespaceName = (topicVersion == TopicVersion.V2) ? NamespaceService.getHeartbeatNamespaceV2(pulsar().getAdvertisedAddress(), pulsar().getConfiguration()) : NamespaceService.getHeartbeatNamespace(pulsar().getAdvertisedAddress(), pulsar().getConfiguration()); - String topicName = String.format("persistent://%s/%s", namespaceName, HEALTH_CHECK_TOPIC_SUFFIX); + final String topicName = String.format("persistent://%s/%s", namespaceName, HEALTH_CHECK_TOPIC_SUFFIX); LOG.info("[{}] Running healthCheck with topic={}", clientAppId(), topicName); - String messageStr = UUID.randomUUID().toString(); + final String messageStr = UUID.randomUUID().toString(); + final String subscriptionName = "healthCheck-" + messageStr; // create non-partitioned topic manually and close the previous reader if present. return pulsar().getBrokerService().getTopic(topicName, true) - // check and clean all subscriptions - .thenCompose(topicOptional -> { - if (!topicOptional.isPresent()) { - LOG.error("[{}] Fail to run health check while get topic {}. because get null value.", - clientAppId(), topicName); - throw new RestException(Status.NOT_FOUND, "Topic [{}] not found after create."); - } - Topic topic = topicOptional.get(); - // clean all subscriptions - return FutureUtil.waitForAll(topic.getSubscriptions().values() - .stream().map(Subscription::deleteForcefully).collect(Collectors.toList())) - .thenApply(__ -> topic); - }).thenCompose(topic -> { - try { - PulsarClient client = pulsar().getClient(); - return client.newProducer(Schema.STRING).topic(topicName).createAsync() - .thenCombine(client.newReader(Schema.STRING).topic(topicName) - .startMessageId(MessageId.latest).createAsync(), (producer, reader) -> - producer.sendAsync(messageStr).thenCompose(__ -> - healthCheckRecursiveReadNext(reader, messageStr)) - .thenCompose(__ -> { - List> closeFutures = - new ArrayList<>(); - closeFutures.add(producer.closeAsync()); - closeFutures.add(reader.closeAsync()); - return FutureUtil.waitForAll(closeFutures); - }) - ).thenAccept(ignore -> {}); - } catch (PulsarServerException e) { - LOG.error("[{}] Fail to run health check while get client.", clientAppId()); - throw new RestException(e); + .thenCompose(topicOptional -> { + if (!topicOptional.isPresent()) { + LOG.error("[{}] Fail to run health check while get topic {}. because get null value.", + clientAppId(), topicName); + throw new RestException(Status.NOT_FOUND, + String.format("Topic [%s] not found after create.", topicName)); + } + PulsarClient client; + try { + client = pulsar().getClient(); + } catch (PulsarServerException e) { + LOG.error("[{}] Fail to run health check while get client.", clientAppId()); + throw new RestException(e); + } + CompletableFuture resultFuture = new CompletableFuture<>(); + client.newProducer(Schema.STRING).topic(topicName).createAsync() + .thenCompose(producer -> client.newReader(Schema.STRING).topic(topicName) + .subscriptionName(subscriptionName) + .startMessageId(MessageId.latest) + .createAsync().exceptionally(createException -> { + producer.closeAsync().exceptionally(ex -> { + LOG.error("[{}] Close producer fail while heath check.", clientAppId()); + return null; + }); + throw FutureUtil.wrapToCompletionException(createException); + }).thenCompose(reader -> producer.sendAsync(messageStr) + .thenCompose(__ -> healthCheckRecursiveReadNext(reader, messageStr)) + .whenComplete((__, ex) -> { + closeAndReCheck(producer, reader, topicOptional.get(), subscriptionName) + .whenComplete((unused, innerEx) -> { + if (ex != null) { + resultFuture.completeExceptionally(ex); + } else { + resultFuture.complete(null); + } + }); + } + )) + ).exceptionally(ex -> { + resultFuture.completeExceptionally(ex); + return null; + }); + return resultFuture; + }); + } + + /** + * Close producer and reader and then to re-check if this operation is success. + * + * Re-check + * - Producer: If close fails we will print error log to notify user. + * - Consumer: If close fails we will force delete subscription. + * + * @param producer Producer + * @param reader Reader + * @param topic Topic + * @param subscriptionName Subscription name + */ + private CompletableFuture closeAndReCheck(Producer producer, Reader reader, + Topic topic, String subscriptionName) { + // no matter exception or success, we still need to + // close producer/reader + CompletableFuture producerFuture = producer.closeAsync(); + CompletableFuture readerFuture = reader.closeAsync(); + List> futures = new ArrayList<>(2); + futures.add(producerFuture); + futures.add(readerFuture); + return FutureUtil.waitForAll(Collections.unmodifiableList(futures)) + .exceptionally(closeException -> { + if (readerFuture.isCompletedExceptionally()) { + LOG.error("[{}] Close reader fail while heath check.", clientAppId()); + Subscription subscription = + topic.getSubscription(subscriptionName); + // re-check subscription after reader close + if (subscription != null) { + LOG.warn("[{}] Force delete subscription {} " + + "when it still exists after the" + + " reader is closed.", + clientAppId(), subscription); + subscription.deleteForcefully() + .exceptionally(ex -> { + LOG.error("[{}] Force delete subscription fail" + + " while health check", + clientAppId(), ex); + return null; + }); + } + } else { + // producer future fail. + LOG.error("[{}] Close producer fail while heath check.", clientAppId()); } + return null; }); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java index 4f01cb1aa4782..b9886b20410e6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java @@ -21,12 +21,19 @@ import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.common.naming.TopicVersion; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.compaction.Compactor; +import org.awaitility.Awaitility; +import org.junit.Assert; +import org.springframework.util.CollectionUtils; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; @Test(groups = "broker-admin") @Slf4j @@ -55,16 +62,100 @@ public void cleanup() throws Exception { @Test public void testHealthCheckup() throws Exception { - admin.brokers().healthcheck(); + final int times = 30; + CompletableFuture future = new CompletableFuture<>(); + pulsar.getExecutor().execute(() -> { + try { + for (int i = 0; i < times; i++) { + admin.brokers().healthcheck(); + } + future.complete(null); + }catch (PulsarAdminException e) { + future.completeExceptionally(e); + } + }); + for (int i = 0; i < times; i++) { + admin.brokers().healthcheck(); + } + // To ensure we don't have any subscription + final String testHealthCheckTopic = String.format("persistent://pulsar/test/localhost:%s/healthcheck", + pulsar.getConfig().getWebServicePort().get()); + Awaitility.await().untilAsserted(() -> { + Assert.assertFalse(future.isCompletedExceptionally()); + }); + Awaitility.await().untilAsserted(() -> + Assert.assertTrue(CollectionUtils.isEmpty(admin.topics() + .getSubscriptions(testHealthCheckTopic).stream() + // All system topics are using compaction, even though is not explicitly set in the policies. + .filter(v -> !v.equals(Compactor.COMPACTION_SUBSCRIPTION)) + .collect(Collectors.toList()) + )) + ); } @Test public void testHealthCheckupV1() throws Exception { - admin.brokers().healthcheck(TopicVersion.V1); + final int times = 30; + CompletableFuture future = new CompletableFuture<>(); + pulsar.getExecutor().execute(() -> { + try { + for (int i = 0; i < times; i++) { + admin.brokers().healthcheck(TopicVersion.V1); + } + future.complete(null); + }catch (PulsarAdminException e) { + future.completeExceptionally(e); + } + }); + for (int i = 0; i < times; i++) { + admin.brokers().healthcheck(TopicVersion.V1); + } + final String testHealthCheckTopic = String.format("persistent://pulsar/test/localhost:%s/healthcheck", + pulsar.getConfig().getWebServicePort().get()); + Awaitility.await().untilAsserted(() -> { + Assert.assertFalse(future.isCompletedExceptionally()); + }); + // To ensure we don't have any subscription + Awaitility.await().untilAsserted(() -> + Assert.assertTrue(CollectionUtils.isEmpty(admin.topics() + .getSubscriptions(testHealthCheckTopic).stream() + // All system topics are using compaction, even though is not explicitly set in the policies. + .filter(v -> !v.equals(Compactor.COMPACTION_SUBSCRIPTION)) + .collect(Collectors.toList()) + )) + ); } @Test public void testHealthCheckupV2() throws Exception { - admin.brokers().healthcheck(TopicVersion.V2); + final int times = 30; + CompletableFuture future = new CompletableFuture<>(); + pulsar.getExecutor().execute(() -> { + try { + for (int i = 0; i < times; i++) { + admin.brokers().healthcheck(TopicVersion.V2); + } + future.complete(null); + }catch (PulsarAdminException e) { + future.completeExceptionally(e); + } + }); + for (int i = 0; i < times; i++) { + admin.brokers().healthcheck(TopicVersion.V2); + } + final String testHealthCheckTopic = String.format("persistent://pulsar/localhost:%s/healthcheck", + pulsar.getConfig().getWebServicePort().get()); + Awaitility.await().untilAsserted(() -> { + Assert.assertFalse(future.isCompletedExceptionally()); + }); + // To ensure we don't have any subscription + Awaitility.await().untilAsserted(() -> + Assert.assertTrue(CollectionUtils.isEmpty(admin.topics() + .getSubscriptions(testHealthCheckTopic).stream() + // All system topics are using compaction, even though is not explicitly set in the policies. + .filter(v -> !v.equals(Compactor.COMPACTION_SUBSCRIPTION)) + .collect(Collectors.toList()) + )) + ); } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java index 687cbd2e502cb..a29ac8c2ee8b4 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java @@ -188,4 +188,18 @@ public static Optional getException(CompletableFuture future) } return Optional.empty(); } + + /** + * Wrap throwable exception to CompletionException if that exception is not an instance of CompletionException. + * + * @param throwable Exception + * @return CompletionException + */ + public static CompletionException wrapToCompletionException(Throwable throwable) { + if (throwable instanceof CompletionException) { + return (CompletionException) throwable; + } else { + return new CompletionException(throwable); + } + } }