From 8f32ad0b1347e2fd1200c356a29263ef4d579d70 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 13 Dec 2024 11:36:09 +0800 Subject: [PATCH] [fix] [broker] fix NPE when calculating a topic's backlogQuota (#23720) (cherry picked from commit 8e80f88cd46ad041b87773d49c5ce4420df95b9a) (cherry picked from commit 216d89e036a207dbbe3057a99f38a4e7e0b1bf09) --- .../service/persistent/PersistentTopic.java | 6 +- .../PersistentTopicProtectedMethodsTest.java | 114 ++++++++++++++++++ 2 files changed, 118 insertions(+), 2 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicProtectedMethodsTest.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 94403735c97ed..ca65f7f0ca3bf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -3383,7 +3383,8 @@ public CompletableFuture checkTimeBacklogExceeded(boolean shouldUpdateO }); } - private EstimateTimeBasedBacklogQuotaCheckResult estimatedTimeBasedBacklogQuotaCheck( + @VisibleForTesting + EstimateTimeBasedBacklogQuotaCheckResult estimatedTimeBasedBacklogQuotaCheck( PositionImpl markDeletePosition) throws ExecutionException, InterruptedException { int backlogQuotaLimitInSecond = getBacklogQuota(BacklogQuotaType.message_age).getLimitTime(); @@ -3403,7 +3404,8 @@ private EstimateTimeBasedBacklogQuotaCheckResult estimatedTimeBasedBacklogQuotaC // if the mark-delete position is the last entry it means all entries for // that ledger are acknowledged - if (markDeletePosition.getEntryId() == markDeletePositionLedgerInfo.getEntries() - 1) { + if (markDeletePositionLedgerInfo != null + && (markDeletePosition.getEntryId() == markDeletePositionLedgerInfo.getEntries() - 1)) { PositionImpl positionToCheck = managedLedger.getNextValidPosition(markDeletePosition); positionToCheckLedgerInfo = ledger.getLedgerInfo(positionToCheck.getLedgerId()).get(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicProtectedMethodsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicProtectedMethodsTest.java new file mode 100644 index 0000000000000..b582eb94d1264 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicProtectedMethodsTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.persistent; + +import static org.testng.Assert.assertEquals; +import java.util.concurrent.CompletableFuture; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker") +public class PersistentTopicProtectedMethodsTest extends ProducerConsumerBase { + + @BeforeClass(alwaysRun = true) + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + protected void doInitConf() throws Exception { + this.conf.setPreciseTimeBasedBacklogQuotaCheck(true); + this.conf.setManagedLedgerMaxEntriesPerLedger(2); + this.conf.setManagedLedgerMaxLedgerRolloverTimeMinutes(10); + this.conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0); + } + + /*** + * Background: the steps for checking backlog metadata are as follows. + * - Get the oldest cursor. + * - Return the result if the oldest `cursor.md` equals LAC. + * - Else, calculate the estimated backlog quota. + * + * What case been covered by this test. + * - The method `PersistentTopic.estimatedTimeBasedBacklogQuotaCheck` may get an NPE when the + * `@param position(cursor.markDeletedPositon)` equals LAC and the latest ledger has been removed by a + * `ML.trimLedgers`, which was introduced by https://github.com/apache/pulsar/pull/21816. + * - Q: The broker checked whether the oldest `cursor.md` equals LAC at step 2 above, why does it still call + * `PersistentTopic.estimatedTimeBasedBacklogQuotaCheck` with a param that equals `LAC`? + * - A: There may be some `acknowledgments` and `ML.trimLedgers` that happened between `step2 above and step 3`. + */ + @Test + public void testEstimatedTimeBasedBacklogQuotaCheckWhenNoBacklog() throws Exception { + final String tp = BrokerTestUtil.newUniqueName("public/default/tp"); + admin.topics().createNonPartitionedTopic(tp); + PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopic(tp, false).join().get(); + ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger(); + Consumer c1 = pulsarClient.newConsumer().topic(tp).subscriptionName("s1").subscribe(); + ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().get("s1"); + + // Generated multi ledgers. + Producer p1 = pulsarClient.newProducer().topic(tp).create(); + byte[] content = new byte[]{1}; + for (int i = 0; i < 10; i++) { + p1.send(content); + } + + // Consume all messages. + // Trim ledgers, then the LAC relates to a ledger who has been deleted. + admin.topics().skipAllMessages(tp, "s1"); + Awaitility.await().untilAsserted(() -> { + assertEquals(cursor.getNumberOfEntriesInBacklog(true), 0); + assertEquals(cursor.getMarkDeletedPosition(), ml.getLastConfirmedEntry()); + }); + CompletableFuture completableFuture = new CompletableFuture(); + ml.trimConsumedLedgersInBackground(completableFuture); + completableFuture.join(); + Awaitility.await().untilAsserted(() -> { + assertEquals(ml.getLedgersInfo().size(), 1); + assertEquals(cursor.getNumberOfEntriesInBacklog(true), 0); + assertEquals(cursor.getMarkDeletedPosition(), ml.getLastConfirmedEntry()); + }); + + // Verify: "persistentTopic.estimatedTimeBasedBacklogQuotaCheck" will not get a NullPointerException. + PositionImpl oldestPosition = ml.getCursors().getCursorWithOldestPosition().getPosition(); + persistentTopic.estimatedTimeBasedBacklogQuotaCheck(oldestPosition); + + p1.close(); + c1.close(); + admin.topics().delete(tp, false); + } +}