diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 0823886e0bdbb..d5c83ea9b15f0 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -1821,6 +1821,13 @@ public void asyncReadEntry(PositionImpl position, ReadEntryCallback callback, Ob if (log.isDebugEnabled()) { log.debug("[{}] Reading entry ledger {}: {}", name, position.getLedgerId(), position.getEntryId()); } + if (!ledgers.containsKey(position.getLedgerId())) { + log.error("[{}] Failed to get message with ledger {}:{} the ledgerId does not belong to this topic " + + "or has been deleted.", name, position.getLedgerId(), position.getEntryId()); + callback.readEntryFailed(new ManagedLedgerException.NonRecoverableLedgerException("Message not found, " + + "the ledgerId does not belong to this topic or has been deleted"), ctx); + return; + } if (position.getLedgerId() == currentLedger.getId()) { asyncReadEntry(currentLedger, position, callback, ctx); } else { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 2516357dc44f2..fc14a3e466981 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -68,6 +68,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.AsyncCallback.AddCallback; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; @@ -118,16 +119,12 @@ import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.Stat; import org.awaitility.Awaitility; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.testng.Assert; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +@Slf4j public class ManagedLedgerTest extends MockedBookKeeperTestCase { - - private static final Logger log = LoggerFactory.getLogger(ManagedLedgerTest.class); - private static final Charset Encoding = Charsets.UTF_8; @DataProvider(name = "checkOwnershipFlag") @@ -3030,7 +3027,7 @@ public void testExpiredLedgerDeletionAfterManagedLedgerRestart() throws Exceptio Assert.assertEquals(finalManagedLedger.getTotalSize(), 0); }); } - + @Test(timeOut = 20000) public void testAsyncTruncateLedgerRetention() throws Exception { ManagedLedgerConfig config = new ManagedLedgerConfig(); @@ -3204,4 +3201,55 @@ public void testDoNotGetOffloadPoliciesMultipleTimesWhenTrimLedgers() throws Exc ledger.internalTrimConsumedLedgers(Futures.NULL_PROMISE); verify(ledgerOffloader, times(1)).getOffloadPolicies(); } + + @Test(timeOut = 30000) + public void testReadOtherManagedLedgersEntry() throws Exception { + ManagedLedgerImpl managedLedgerA = (ManagedLedgerImpl) factory.open("my_test_ledger_a"); + ManagedLedgerImpl managedLedgerB = (ManagedLedgerImpl) factory.open("my_test_ledger_b"); + + PositionImpl pa = (PositionImpl) managedLedgerA.addEntry("dummy-entry-a".getBytes(Encoding)); + PositionImpl pb = (PositionImpl) managedLedgerB.addEntry("dummy-entry-b".getBytes(Encoding)); + + // read managedLegerA's entry using managedLedgerA + CompletableFuture completableFutureA = new CompletableFuture<>(); + managedLedgerA.asyncReadEntry(pa, new ReadEntryCallback() { + @Override + public void readEntryComplete(Entry entry, Object ctx) { + completableFutureA.complete(entry.getData()); + } + + @Override + public void readEntryFailed(ManagedLedgerException exception, Object ctx) { + completableFutureA.completeExceptionally(exception.getCause()); + } + }, null); + + assertEquals("dummy-entry-a".getBytes(Encoding), completableFutureA.get()); + + // read managedLedgerB's entry using managedLedgerA + CompletableFuture completableFutureB = new CompletableFuture<>(); + managedLedgerA.asyncReadEntry(pb, new ReadEntryCallback() { + @Override + public void readEntryComplete(Entry entry, Object ctx) { + completableFutureB.complete(entry.getData()); + } + + @Override + public void readEntryFailed(ManagedLedgerException exception, Object ctx) { + completableFutureB.completeExceptionally(exception); + } + }, null); + + try { + completableFutureB.get(); + Assert.fail(); + } catch (Exception e) { + assertEquals(e.getCause().getMessage(), + "Message not found, the ledgerId does not belong to this topic or has been deleted"); + } + + managedLedgerA.close(); + managedLedgerB.close(); + + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 27aeca71e886b..d29fff6f33510 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -2300,13 +2300,6 @@ protected void internalGetMessageById(AsyncResponse asyncResponse, long ledgerId } PersistentTopic topic = (PersistentTopic) getTopicReference(topicName); ManagedLedgerImpl ledger = (ManagedLedgerImpl) topic.getManagedLedger(); - if (null == ledger.getLedgerInfo(ledgerId).get()) { - log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}, " - + "the ledgerId does not belong to this topic.", - clientAppId(), ledgerId, entryId, topicName); - asyncResponse.resume(new RestException(Status.NOT_FOUND, - "Message not found, the ledgerId does not belong to this topic")); - } ledger.asyncReadEntry(new PositionImpl(ledgerId, entryId), new AsyncCallbacks.ReadEntryCallback() { @Override public void readEntryFailed(ManagedLedgerException exception, Object ctx) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index d6ad1ac8ae60e..182d2ccb51067 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -881,11 +881,21 @@ public void testGetMessageById() throws Exception { Message message2 = admin.topics().getMessageById(topicName2, id2.getLedgerId(), id2.getEntryId()); Assert.assertEquals(message2.getData(), data2.getBytes()); - Message message3 = admin.topics().getMessageById(topicName2, id1.getLedgerId(), id1.getEntryId()); - Assert.assertNull(message3); + Message message3 = null; + try { + message3 = admin.topics().getMessageById(topicName2, id1.getLedgerId(), id1.getEntryId()); + Assert.fail(); + } catch (Exception e) { + Assert.assertNull(message3); + } - Message message4 = admin.topics().getMessageById(topicName1, id2.getLedgerId(), id2.getEntryId()); - Assert.assertNull(message4); + Message message4 = null; + try { + message4 = admin.topics().getMessageById(topicName1, id2.getLedgerId(), id2.getEntryId()); + Assert.fail(); + } catch (Exception e) { + Assert.assertNull(message4); + } } @Test