From c27a8afe23ad40085c09f9fa4d982d12d777e155 Mon Sep 17 00:00:00 2001 From: coderzc Date: Sat, 17 Aug 2024 18:45:23 +0800 Subject: [PATCH 1/2] Optimize high CPU usage when consuming from topics with ongoing txn --- .../org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index f99ee957e025a..1892b2cef38fe 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -953,7 +953,7 @@ public void asyncReadEntriesWithSkipOrWait(int maxEntries, long maxSizeBytes, Re int numberOfEntriesToRead = applyMaxSizeCap(maxEntries, maxSizeBytes); - if (hasMoreEntries()) { + if (hasMoreEntries() && maxPosition.compareTo(readPosition) > 0) { // If we have available entries, we can read them immediately if (log.isDebugEnabled()) { log.debug("[{}] [{}] Read entries immediately", ledger.getName(), name); From 46655ca35b904034840225cd22ca04e878c17324 Mon Sep 17 00:00:00 2001 From: coderzc Date: Tue, 20 Aug 2024 17:02:41 +0800 Subject: [PATCH 2/2] fix code --- .../org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 1892b2cef38fe..e808c31bc89f1 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -953,7 +953,7 @@ public void asyncReadEntriesWithSkipOrWait(int maxEntries, long maxSizeBytes, Re int numberOfEntriesToRead = applyMaxSizeCap(maxEntries, maxSizeBytes); - if (hasMoreEntries() && maxPosition.compareTo(readPosition) > 0) { + if (hasMoreEntries() && maxPosition.compareTo(readPosition) >= 0) { // If we have available entries, we can read them immediately if (log.isDebugEnabled()) { log.debug("[{}] [{}] Read entries immediately", ledger.getName(), name);