Skip to content

Commit

Permalink
Handle unknown runtime exception while reading entries (#2993)
Browse files Browse the repository at this point in the history
* Handle unknown runtime exception while reading entries

* make asyncReadEntry0 private
  • Loading branch information
rdhabalia authored Nov 16, 2018
1 parent 579e0dc commit 2c8c288
Showing 1 changed file with 29 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,20 @@ public void invalidateAllEntries(long ledgerId) {
@Override
public void asyncReadEntry(ReadHandle lh, PositionImpl position, final ReadEntryCallback callback,
final Object ctx) {
try {
asyncReadEntry0(lh, position, callback, ctx);
} catch (Throwable t) {
log.warn("failed to read entries for {}-{}", lh.getId(), position, t);
// invalidate all entries related to ledger from the cache (it might happen if entry gets corrupt
// (entry.data is already deallocate due to any race-condition) so, invalidate cache and next time read from
// the bookie)
invalidateAllEntries(lh.getId());
callback.readEntryFailed(createManagedLedgerException(t), ctx);
}
}

private void asyncReadEntry0(ReadHandle lh, PositionImpl position, final ReadEntryCallback callback,
final Object ctx) {
if (log.isDebugEnabled()) {
log.debug("[{}] Reading entry ledger {}: {}", ml.getName(), lh.getId(), position.getEntryId());
}
Expand Down Expand Up @@ -202,9 +216,23 @@ public void asyncReadEntry(ReadHandle lh, PositionImpl position, final ReadEntry
}

@Override
@SuppressWarnings({ "unchecked", "rawtypes" })
public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader,
final ReadEntriesCallback callback, Object ctx) {
try {
asyncReadEntry0(lh, firstEntry, lastEntry, isSlowestReader, callback, ctx);
} catch (Throwable t) {
log.warn("failed to read entries for {}--{}-{}", lh.getId(), firstEntry, lastEntry, t);
// invalidate all entries related to ledger from the cache (it might happen if entry gets corrupt
// (entry.data is already deallocate due to any race-condition) so, invalidate cache and next time read from
// the bookie)
invalidateAllEntries(lh.getId());
callback.readEntriesFailed(createManagedLedgerException(t), ctx);
}
}

@SuppressWarnings({ "unchecked", "rawtypes" })
private void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader,
final ReadEntriesCallback callback, Object ctx) {
final long ledgerId = lh.getId();
final int entriesToRead = (int) (lastEntry - firstEntry) + 1;
final PositionImpl firstPosition = PositionImpl.get(lh.getId(), firstEntry);
Expand Down

0 comments on commit 2c8c288

Please sign in to comment.