diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 5a5fe4949cd2f..44e9d67b56d2c 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -1657,6 +1657,13 @@ public void asyncReadEntry(PositionImpl position, ReadEntryCallback callback, Ob if (log.isDebugEnabled()) { log.debug("[{}] Reading entry ledger {}: {}", name, position.getLedgerId(), position.getEntryId()); } + if (!ledgers.containsKey(position.getLedgerId())) { + log.error("[{}] Failed to get message with ledger {}:{} the ledgerId does not belong to this topic.", + name, position.getLedgerId(), position.getEntryId()); + callback.readEntryFailed(new ManagedLedgerException.NonRecoverableLedgerException("Message not found, " + + "the ledgerId does not belong to this topic or has been deleted"), ctx); + return; + } if (position.getLedgerId() == currentLedger.getId()) { asyncReadEntry(currentLedger, position, callback, ctx); } else { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 13fd4dbbad366..56f6ca32daa56 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -2966,4 +2966,55 @@ public void testInvalidateReadHandleWhenConsumed() throws Exception { cursor3.close(); ledger.close(); } + + @Test(timeOut = 30000) + public void testReadOtherManagedLedgersEntry() throws Exception { + ManagedLedgerImpl managedLedgerA = (ManagedLedgerImpl) factory.open("my_test_ledger_a"); + ManagedLedgerImpl managedLedgerB = (ManagedLedgerImpl) factory.open("my_test_ledger_b"); + + PositionImpl pa = (PositionImpl) managedLedgerA.addEntry("dummy-entry-a".getBytes(Encoding)); + PositionImpl pb = (PositionImpl) managedLedgerB.addEntry("dummy-entry-b".getBytes(Encoding)); + + // read managedLegerA's entry using managedLedgerA + CompletableFuture completableFutureA = new CompletableFuture<>(); + managedLedgerA.asyncReadEntry(pa, new ReadEntryCallback() { + @Override + public void readEntryComplete(Entry entry, Object ctx) { + completableFutureA.complete(entry.getData()); + } + + @Override + public void readEntryFailed(ManagedLedgerException exception, Object ctx) { + completableFutureA.completeExceptionally(exception.getCause()); + } + }, null); + + assertEquals("dummy-entry-a".getBytes(Encoding), completableFutureA.get()); + + // read managedLedgerB's entry using managedLedgerA + CompletableFuture completableFutureB = new CompletableFuture<>(); + managedLedgerA.asyncReadEntry(pb, new ReadEntryCallback() { + @Override + public void readEntryComplete(Entry entry, Object ctx) { + completableFutureB.complete(entry.getData()); + } + + @Override + public void readEntryFailed(ManagedLedgerException exception, Object ctx) { + completableFutureB.completeExceptionally(exception); + } + }, null); + + try { + completableFutureB.get(); + Assert.fail(); + } catch (Exception e) { + assertEquals(e.getCause().getMessage(), + "Message not found, the ledgerId does not belong to this topic or has been deleted"); + } + + managedLedgerA.close(); + managedLedgerB.close(); + + } }