From 55ed084888f63349475f8a8934406b809da3ac56 Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Thu, 15 Nov 2018 13:42:19 -0800 Subject: [PATCH 1/2] Handle unknown runtime exception while reading entries --- .../mledger/impl/EntryCacheImpl.java | 30 ++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java index ff80feccf6c1f..7e5233b8c865e 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java @@ -162,6 +162,20 @@ public void invalidateAllEntries(long ledgerId) { @Override public void asyncReadEntry(ReadHandle lh, PositionImpl position, final ReadEntryCallback callback, final Object ctx) { + try { + asyncReadEntry0(lh, position, callback, ctx); + } catch (Throwable t) { + log.warn("failed to read entries for {}-{}", lh.getId(), position, t); + // invalidate all entries related to ledger from the cache (it might happen if entry gets corrupt + // (entry.data is already deallocate due to any race-condition) so, invalidate cache and next time read from + // the bookie) + invalidateAllEntries(lh.getId()); + callback.readEntryFailed(createManagedLedgerException(t), ctx); + } + } + + private void asyncReadEntry0(ReadHandle lh, PositionImpl position, final ReadEntryCallback callback, + final Object ctx) { if (log.isDebugEnabled()) { log.debug("[{}] Reading entry ledger {}: {}", ml.getName(), lh.getId(), position.getEntryId()); } @@ -202,9 +216,23 @@ public void asyncReadEntry(ReadHandle lh, PositionImpl position, final ReadEntry } @Override - @SuppressWarnings({ "unchecked", "rawtypes" }) public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader, final ReadEntriesCallback callback, Object ctx) { + try { + asyncReadEntry0(lh, firstEntry, lastEntry, isSlowestReader, callback, ctx); + } catch (Throwable t) { + log.warn("failed to read entries for {}--{}-{}", lh.getId(), firstEntry, lastEntry, t); + // invalidate all entries related to ledger from the cache (it might happen if entry gets corrupt + // (entry.data is already deallocate due to any race-condition) so, invalidate cache and next time read from + // the bookie) + invalidateAllEntries(lh.getId()); + callback.readEntriesFailed(createManagedLedgerException(t), ctx); + } + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + public void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader, + final ReadEntriesCallback callback, Object ctx) { final long ledgerId = lh.getId(); final int entriesToRead = (int) (lastEntry - firstEntry) + 1; final PositionImpl firstPosition = PositionImpl.get(lh.getId(), firstEntry); From b081d8b7631571f771442f64e7794e9783de7873 Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Thu, 15 Nov 2018 14:13:38 -0800 Subject: [PATCH 2/2] make asyncReadEntry0 private --- .../java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java index 7e5233b8c865e..a435c1442b90b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java @@ -231,7 +231,7 @@ public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boole } @SuppressWarnings({ "unchecked", "rawtypes" }) - public void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader, + private void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader, final ReadEntriesCallback callback, Object ctx) { final long ledgerId = lh.getId(); final int entriesToRead = (int) (lastEntry - firstEntry) + 1;