Skip to content

Commit 84902ac

Browse files
committed
[improve] [broker] improve read entry error log for troubleshooting (apache#21169)
(cherry picked from commit 65706c6)
1 parent 7b4f82b commit 84902ac

File tree

7 files changed

+43
-3
lines changed

7 files changed

+43
-3
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -750,6 +750,11 @@ public void readEntryComplete(Entry entry, Object ctx) {
750750
result.entry = entry;
751751
counter.countDown();
752752
}
753+
754+
@Override
755+
public String toString() {
756+
return String.format("Cursor [{}] get Nth entry", ManagedCursorImpl.this);
757+
}
753758
}, null);
754759

755760
counter.await(ledger.getConfig().getMetadataOperationsTimeoutSeconds(), TimeUnit.SECONDS);
@@ -1426,6 +1431,11 @@ public synchronized void readEntryFailed(ManagedLedgerException mle, Object ctx)
14261431
callback.readEntriesFailed(exception.get(), ctx);
14271432
}
14281433
}
1434+
1435+
@Override
1436+
public String toString() {
1437+
return String.format("Cursor [{}] async replay entries", ManagedCursorImpl.this);
1438+
}
14291439
};
14301440

14311441
positions.stream().filter(position -> !alreadyAcknowledgedPositions.contains(position))

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1236,6 +1236,12 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
12361236
log.error("Error read entry for position {}", nextPos, exception);
12371237
future.completeExceptionally(exception);
12381238
}
1239+
1240+
@Override
1241+
public String toString() {
1242+
return String.format("ML [{}] get earliest message publish time of pos",
1243+
ManagedLedgerImpl.this.name);
1244+
}
12391245
}, null);
12401246

12411247
return future;

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,8 @@ public void asyncReadEntry(PositionImpl position, AsyncCallbacks.ReadEntryCallba
143143
this.getLedgerHandle(position.getLedgerId())
144144
.thenAccept((ledger) -> asyncReadEntry(ledger, position, callback, ctx))
145145
.exceptionally((ex) -> {
146-
log.error("[{}] Error opening ledger for reading at position {} - {}", this.name, position,
147-
ex.getMessage());
146+
log.error("[{}] Error opening ledger for reading at position {} - {}. Op: {}", this.name,
147+
position, ex.getMessage(), callback);
148148
callback.readEntryFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()), ctx);
149149
return null;
150150
});

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2558,6 +2558,12 @@ public void readEntryComplete(Entry entry, Object ctx) {
25582558
}
25592559
}
25602560
}
2561+
2562+
@Override
2563+
public String toString() {
2564+
return String.format("Topic [{}] get entry batch size",
2565+
PersistentTopicsBase.this.topicName);
2566+
}
25612567
}, null);
25622568
} catch (NullPointerException npe) {
25632569
batchSizeFuture.completeExceptionally(new RestException(Status.NOT_FOUND, "Message not found"));
@@ -2644,6 +2650,12 @@ public void readEntryComplete(Entry entry, Object ctx) {
26442650
}
26452651
}
26462652
}
2653+
2654+
@Override
2655+
public String toString() {
2656+
return String.format("Topic [{}] internal get message by id",
2657+
PersistentTopicsBase.this.topicName);
2658+
}
26472659
}, null);
26482660
}).exceptionally(ex -> {
26492661
// If the exception is not redirect exception we need to log it.

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -653,6 +653,12 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
653653
public void readEntryComplete(Entry entry, Object ctx) {
654654
future.complete(entry);
655655
}
656+
657+
@Override
658+
public String toString() {
659+
return String.format("Replication [{}] peek Nth message",
660+
PersistentReplicator.this.producer.getProducerName());
661+
}
656662
}, null);
657663

658664
return future;

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -730,6 +730,12 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
730730
public void readEntryComplete(Entry entry, Object ctx) {
731731
future.complete(entry);
732732
}
733+
734+
@Override
735+
public String toString() {
736+
return String.format("Subscription [{}-{}] async replay entries", PersistentSubscription.this.topicName,
737+
PersistentSubscription.this.subName);
738+
}
733739
}, null);
734740

735741
return future;

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2663,7 +2663,7 @@ public boolean isOldestMessageExpired(ManagedCursor cursor, int messageTTLInSeco
26632663
(int) (messageTTLInSeconds * MESSAGE_EXPIRY_THRESHOLD), entryTimestamp);
26642664
}
26652665
} catch (Exception e) {
2666-
log.warn("[{}] Error while getting the oldest message", topic, e);
2666+
log.warn("[{}] [{}] Error while getting the oldest message", topic, cursor.toString(), e);
26672667
} finally {
26682668
if (entry != null) {
26692669
entry.release();

0 commit comments

Comments
 (0)