From 7f04364f13330b56cabeab48b9b5055a70a88119 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 16 Aug 2024 23:30:34 +0800 Subject: [PATCH] [improve][broker] Support customized shadow managed ledger implementation (#23179) --- .../pulsar/broker/service/persistent/PersistentTopic.java | 3 +-- .../pulsar/broker/namespace/NamespaceServiceTest.java | 2 ++ .../pulsar/broker/service/MessageCumulativeAckTest.java | 5 ++++- .../service/PersistentDispatcherFailoverConsumerTest.java | 1 + .../apache/pulsar/broker/service/PersistentTopicTest.java | 6 ++++++ .../org/apache/pulsar/broker/service/ServerCnxTest.java | 1 + .../systopic/NamespaceEventsSystemTopicServiceTest.java | 7 ++++++- .../apache/pulsar/broker/transaction/TransactionTest.java | 2 ++ .../mledger/offload/jcloud/impl/MockManagedLedger.java | 2 +- 9 files changed, 24 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index e890bac620e7b..c26725deaeab5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -84,7 +84,6 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer.CursorInfo; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; -import org.apache.bookkeeper.mledger.impl.ShadowManagedLedgerImpl; import org.apache.bookkeeper.mledger.util.Futures; import org.apache.bookkeeper.mledger.util.ManagedLedgerImplUtils; import org.apache.bookkeeper.net.BookieId; @@ -426,7 +425,7 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS this.transactionBuffer = new TransactionBufferDisable(this); } transactionBuffer.syncMaxReadPositionForNormalPublish(ledger.getLastConfirmedEntry(), true); - if (ledger instanceof ShadowManagedLedgerImpl) { + if (ledger.getConfig().getShadowSource() != null) { shadowSourceTopic = TopicName.get(ledger.getConfig().getShadowSource()); } else { shadowSourceTopic = null; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java index 422e9b80aeffa..6b2669275dfdb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java @@ -54,6 +54,7 @@ import java.util.concurrent.atomic.AtomicReference; import lombok.Cleanup; import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.commons.lang3.tuple.Pair; @@ -198,6 +199,7 @@ public void testSplitMapWithRefreshedStatMap() throws Exception { ManagedLedger ledger = mock(ManagedLedger.class); when(ledger.getCursors()).thenReturn(new ArrayList<>()); + when(ledger.getConfig()).thenReturn(new ManagedLedgerConfig()); doReturn(CompletableFuture.completedFuture(null)).when(MockOwnershipCache).disableOwnership(any(NamespaceBundle.class)); Field ownership = NamespaceService.class.getDeclaredField("ownershipCache"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java index f3fe26af4b968..cc4fe22962484 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java @@ -37,6 +37,7 @@ import io.netty.channel.ChannelHandlerContext; import java.net.InetSocketAddress; import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.pulsar.broker.testcontext.PulsarTestContext; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; @@ -74,7 +75,9 @@ public void setup() throws Exception { .when(serverCnx).getCommandSender(); String topicName = TopicName.get("MessageCumulativeAckTest").toString(); - PersistentTopic persistentTopic = new PersistentTopic(topicName, mock(ManagedLedger.class), pulsarTestContext.getBrokerService()); + var mockManagedLedger = mock(ManagedLedger.class); + when(mockManagedLedger.getConfig()).thenReturn(new ManagedLedgerConfig()); + var persistentTopic = new PersistentTopic(topicName, mockManagedLedger, pulsarTestContext.getBrokerService()); sub = spy(new PersistentSubscription(persistentTopic, "sub-1", mock(ManagedCursorImpl.class), false)); doNothing().when(sub).acknowledgeMessage(any(), any(), any()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java index f30ee62b64659..000ea7af91525 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java @@ -180,6 +180,7 @@ void setupMLAsyncCallbackMocks() { cursorMock = mock(ManagedCursorImpl.class); doReturn(new ArrayList<>()).when(ledgerMock).getCursors(); + doReturn(new ManagedLedgerConfig()).when(ledgerMock).getConfig(); doReturn("mockCursor").when(cursorMock).getName(); // call openLedgerComplete with ledgerMock on ML factory asyncOpen diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index e83b1bd9b7b79..f2ed015bd1e67 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -239,6 +239,7 @@ public void teardown() throws Exception { @Test public void testCreateTopic() { final ManagedLedger ledgerMock = mock(ManagedLedger.class); + doReturn(new ManagedLedgerConfig()).when(ledgerMock).getConfig(); doReturn(new ArrayList<>()).when(ledgerMock).getCursors(); final String topicName = "persistent://prop/use/ns-abc/topic1"; @@ -366,6 +367,7 @@ public void testPublishMessageMLFailure() throws Exception { final String successTopicName = "persistent://prop/use/ns-abc/successTopic"; final ManagedLedger ledgerMock = mock(ManagedLedger.class); + doReturn(new ManagedLedgerConfig()).when(ledgerMock).getConfig(); doReturn(new ArrayList<>()).when(ledgerMock).getCursors(); PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); @@ -1374,6 +1376,7 @@ void setupMLAsyncCallbackMocks() { final CompletableFuture closeFuture = new CompletableFuture<>(); doReturn(new ArrayList<>()).when(ledgerMock).getCursors(); + doReturn(new ManagedLedgerConfig()).when(ledgerMock).getConfig(); doReturn("mockCursor").when(cursorMock).getName(); doReturn(true).when(cursorMock).isDurable(); // doNothing().when(cursorMock).asyncClose(new CloseCallback() { @@ -1671,6 +1674,7 @@ public void testAtomicReplicationRemoval() throws Exception { String remoteCluster = "remote"; final ManagedLedger ledgerMock = mock(ManagedLedger.class); doNothing().when(ledgerMock).asyncDeleteCursor(any(), any(), any()); + doReturn(new ManagedLedgerConfig()).when(ledgerMock).getConfig(); doReturn(new ArrayList<>()).when(ledgerMock).getCursors(); PersistentTopic topic = new PersistentTopic(globalTopicName, ledgerMock, brokerService); @@ -1730,6 +1734,7 @@ public void testClosingReplicationProducerTwice() throws Exception { final ManagedLedger ledgerMock = mock(ManagedLedger.class); doNothing().when(ledgerMock).asyncDeleteCursor(any(), any(), any()); doReturn(new ArrayList<>()).when(ledgerMock).getCursors(); + doReturn(new ManagedLedgerConfig()).when(ledgerMock).getConfig(); PersistentTopic topic = new PersistentTopic(globalTopicName, ledgerMock, brokerService); @@ -2120,6 +2125,7 @@ public void testTopicCloseFencingTimeout() throws Exception { @Test public void testGetDurableSubscription() throws Exception { ManagedLedger mockLedger = mock(ManagedLedger.class); + doReturn(new ManagedLedgerConfig()).when(mockLedger).getConfig(); ManagedCursor mockCursor = mock(ManagedCursorImpl.class); Position mockPosition = mock(Position.class); doReturn("test").when(mockCursor).getName(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 58c6b96a0f346..03115d79af0a0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -2919,6 +2919,7 @@ private void setupMLAsyncCallbackMocks() { ledgerMock = mock(ManagedLedger.class); cursorMock = mock(ManagedCursor.class); doReturn(new ArrayList<>()).when(ledgerMock).getCursors(); + doReturn(new ManagedLedgerConfig()).when(ledgerMock).getConfig(); // call openLedgerComplete with ledgerMock on ML factory asyncOpen doAnswer((Answer) invocationOnMock -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java index 44a4de5e8a923..e66140efb32bb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java @@ -20,10 +20,13 @@ import static org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.getEventKey; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import com.google.common.collect.Sets; import java.util.HashSet; import lombok.Cleanup; import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.persistent.PersistentTopic; @@ -99,7 +102,9 @@ public void testSystemTopicSchemaCompatibility() throws Exception { TopicPoliciesSystemTopicClient systemTopicClientForNamespace1 = systemTopicFactory .createTopicPoliciesSystemTopicClient(NamespaceName.get(NAMESPACE1)); String topicName = systemTopicClientForNamespace1.getTopicName().toString(); - SystemTopic topic = new SystemTopic(topicName, mock(ManagedLedger.class), pulsar.getBrokerService()); + final var mockManagedLedger = mock(ManagedLedger.class); + when(mockManagedLedger.getConfig()).thenReturn(new ManagedLedgerConfig()); + SystemTopic topic = new SystemTopic(topicName, mockManagedLedger, pulsar.getBrokerService()); Assert.assertEquals(SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE, topic.getSchemaCompatibilityStrategy()); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index 2a928084e648a..246ab5ef26a8f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -71,6 +71,7 @@ import org.apache.bookkeeper.common.util.Bytes; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.Position; @@ -1648,6 +1649,7 @@ public void testTBRecoverChangeStateError() throws InterruptedException, Timeout // Mock managedLedger. ManagedLedgerImpl managedLedger = mock(ManagedLedgerImpl.class); ManagedCursorContainer managedCursors = new ManagedCursorContainer(); + when(managedLedger.getConfig()).thenReturn(new ManagedLedgerConfig()); when(managedLedger.getCursors()).thenReturn(managedCursors); Position position = PositionFactory.EARLIEST; when(managedLedger.getLastConfirmedEntry()).thenReturn(position); diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java index 66ace69d7cda2..8f52d20c5ee83 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java @@ -274,7 +274,7 @@ public boolean isTerminated() { @Override public ManagedLedgerConfig getConfig() { - return null; + return new ManagedLedgerConfig(); } @Override