Skip to content

Commit

Permalink
Forbid to read other topic's data in managedLedger layer (apache#11912)
Browse files Browse the repository at this point in the history
* forbid to read other topic's data in managedLedger layer

* format code

* update exception type

* fix test

(cherry picked from commit a7bdc5e)
  • Loading branch information
hangc0276 committed Sep 4, 2021
1 parent cc09a38 commit 8bf8000
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1821,6 +1821,13 @@ public void asyncReadEntry(PositionImpl position, ReadEntryCallback callback, Ob
if (log.isDebugEnabled()) {
log.debug("[{}] Reading entry ledger {}: {}", name, position.getLedgerId(), position.getEntryId());
}
if (!ledgers.containsKey(position.getLedgerId())) {
log.error("[{}] Failed to get message with ledger {}:{} the ledgerId does not belong to this topic "
+ "or has been deleted.", name, position.getLedgerId(), position.getEntryId());
callback.readEntryFailed(new ManagedLedgerException.NonRecoverableLedgerException("Message not found, "
+ "the ledgerId does not belong to this topic or has been deleted"), ctx);
return;
}
if (position.getLedgerId() == currentLedger.getId()) {
asyncReadEntry(currentLedger, position, callback, ctx);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
Expand Down Expand Up @@ -118,16 +119,12 @@
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Stat;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Slf4j
public class ManagedLedgerTest extends MockedBookKeeperTestCase {

private static final Logger log = LoggerFactory.getLogger(ManagedLedgerTest.class);

private static final Charset Encoding = Charsets.UTF_8;

@DataProvider(name = "checkOwnershipFlag")
Expand Down Expand Up @@ -3033,7 +3030,7 @@ public void testExpiredLedgerDeletionAfterManagedLedgerRestart() throws Exceptio
Assert.assertEquals(finalManagedLedger.getTotalSize(), 0);
});
}

@Test(timeOut = 20000)
public void testAsyncTruncateLedgerRetention() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();
Expand Down Expand Up @@ -3207,4 +3204,55 @@ public void testDoNotGetOffloadPoliciesMultipleTimesWhenTrimLedgers() throws Exc
ledger.internalTrimConsumedLedgers(Futures.NULL_PROMISE);
verify(ledgerOffloader, times(1)).getOffloadPolicies();
}

@Test(timeOut = 30000)
public void testReadOtherManagedLedgersEntry() throws Exception {
ManagedLedgerImpl managedLedgerA = (ManagedLedgerImpl) factory.open("my_test_ledger_a");
ManagedLedgerImpl managedLedgerB = (ManagedLedgerImpl) factory.open("my_test_ledger_b");

PositionImpl pa = (PositionImpl) managedLedgerA.addEntry("dummy-entry-a".getBytes(Encoding));
PositionImpl pb = (PositionImpl) managedLedgerB.addEntry("dummy-entry-b".getBytes(Encoding));

// read managedLegerA's entry using managedLedgerA
CompletableFuture<byte[]> completableFutureA = new CompletableFuture<>();
managedLedgerA.asyncReadEntry(pa, new ReadEntryCallback() {
@Override
public void readEntryComplete(Entry entry, Object ctx) {
completableFutureA.complete(entry.getData());
}

@Override
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
completableFutureA.completeExceptionally(exception.getCause());
}
}, null);

assertEquals("dummy-entry-a".getBytes(Encoding), completableFutureA.get());

// read managedLedgerB's entry using managedLedgerA
CompletableFuture<byte[]> completableFutureB = new CompletableFuture<>();
managedLedgerA.asyncReadEntry(pb, new ReadEntryCallback() {
@Override
public void readEntryComplete(Entry entry, Object ctx) {
completableFutureB.complete(entry.getData());
}

@Override
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
completableFutureB.completeExceptionally(exception);
}
}, null);

try {
completableFutureB.get();
Assert.fail();
} catch (Exception e) {
assertEquals(e.getCause().getMessage(),
"Message not found, the ledgerId does not belong to this topic or has been deleted");
}

managedLedgerA.close();
managedLedgerB.close();

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2298,13 +2298,6 @@ protected void internalGetMessageById(AsyncResponse asyncResponse, long ledgerId
}
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
ManagedLedgerImpl ledger = (ManagedLedgerImpl) topic.getManagedLedger();
if (null == ledger.getLedgerInfo(ledgerId).get()) {
log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}, "
+ "the ledgerId does not belong to this topic.",
clientAppId(), ledgerId, entryId, topicName);
asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Message not found, the ledgerId does not belong to this topic"));
}
ledger.asyncReadEntry(new PositionImpl(ledgerId, entryId), new AsyncCallbacks.ReadEntryCallback() {
@Override
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -883,10 +883,20 @@ public void testGetMessageById() throws Exception {
Message<byte[]> message2 = admin.topics().getMessageById(topicName2, id2.getLedgerId(), id2.getEntryId());
Assert.assertEquals(message2.getData(), data2.getBytes());

Message<byte[]> message3 = admin.topics().getMessageById(topicName2, id1.getLedgerId(), id1.getEntryId());
Assert.assertNull(message3);
Message<byte[]> message3 = null;
try {
message3 = admin.topics().getMessageById(topicName2, id1.getLedgerId(), id1.getEntryId());
Assert.fail();
} catch (Exception e) {
Assert.assertNull(message3);
}

Message<byte[]> message4 = admin.topics().getMessageById(topicName1, id2.getLedgerId(), id2.getEntryId());
Assert.assertNull(message4);
Message<byte[]> message4 = null;
try {
message4 = admin.topics().getMessageById(topicName1, id2.getLedgerId(), id2.getEntryId());
Assert.fail();
} catch (Exception e) {
Assert.assertNull(message4);
}
}
}

0 comments on commit 8bf8000

Please sign in to comment.