From d5a0934017f0fe7ba5ed0577d418a66ee7379015 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Tue, 12 Mar 2024 02:40:40 +0800 Subject: [PATCH 1/4] clean up --- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 0b9a9c3e9fc94..9c3598f46ef24 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1502,10 +1502,7 @@ public Set asyncReplayEntries(Set positi Set alreadyAcknowledgedPositions = new HashSet<>(); lock.readLock().lock(); try { - positions.stream() - .filter(position -> ((PositionImpl) position).compareTo(markDeletePosition) <= 0 - || individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId())) - .forEach(alreadyAcknowledgedPositions::add); + positions.stream().filter(this::isMessageDeleted).forEach(alreadyAcknowledgedPositions::add); } finally { lock.readLock().unlock(); } @@ -2278,8 +2275,7 @@ public void asyncDelete(Iterable positions, AsyncCallbacks.DeleteCallb return; } - if (position.compareTo(markDeletePosition) <= 0 - || individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId())) { + if (isMessageDeleted(position)) { if (config.isDeletionAtBatchIndexLevelEnabled()) { BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position); if (bitSetRecyclable != null) { @@ -3504,8 +3500,7 @@ public Range getLastIndividualDeletedRange() { @Override public void trimDeletedEntries(List entries) { entries.removeIf(entry -> { - boolean isDeleted = markDeletePosition.compareTo(entry.getLedgerId(), entry.getEntryId()) >= 0 - || individualDeletedMessages.contains(entry.getLedgerId(), entry.getEntryId()); + boolean isDeleted = isMessageDeleted(entry.getPosition()); if (isDeleted) { entry.release(); } From ef8c55d06c6adb2b932c2533b7d1207305c433e2 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Tue, 12 Mar 2024 03:08:11 +0800 Subject: [PATCH 2/4] clean up --- .../bookkeeper/mledger/impl/EntryImpl.java | 20 ++++++++++++++++++- .../mledger/impl/PositionImplRecyclable.java | 2 ++ .../pendingack/impl/PendingAckHandleImpl.java | 3 +-- 3 files changed, 22 insertions(+), 3 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java index 6512399173f0a..11ba9222401f5 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java @@ -42,6 +42,7 @@ protected EntryImpl newObject(Handle handle) { private long timestamp; private long ledgerId; private long entryId; + private PositionImplRecyclable position; ByteBuf data; private Runnable onDeallocate; @@ -51,6 +52,9 @@ public static EntryImpl create(LedgerEntry ledgerEntry) { entry.timestamp = System.nanoTime(); entry.ledgerId = ledgerEntry.getLedgerId(); entry.entryId = ledgerEntry.getEntryId(); + entry.position = PositionImplRecyclable.create(); + entry.position.ledgerId = entry.ledgerId; + entry.position.entryId = entry.entryId; entry.data = ledgerEntry.getEntryBuffer(); entry.data.retain(); entry.setRefCnt(1); @@ -63,6 +67,9 @@ public static EntryImpl create(long ledgerId, long entryId, byte[] data) { entry.timestamp = System.nanoTime(); entry.ledgerId = ledgerId; entry.entryId = entryId; + entry.position = PositionImplRecyclable.create(); + entry.position.ledgerId = entry.ledgerId; + entry.position.entryId = entry.entryId; entry.data = Unpooled.wrappedBuffer(data); entry.setRefCnt(1); return entry; @@ -73,6 +80,9 @@ public static EntryImpl create(long ledgerId, long entryId, ByteBuf data) { entry.timestamp = System.nanoTime(); entry.ledgerId = ledgerId; entry.entryId = entryId; + entry.position = PositionImplRecyclable.create(); + entry.position.ledgerId = entry.ledgerId; + entry.position.entryId = entry.entryId; entry.data = data; entry.data.retain(); entry.setRefCnt(1); @@ -84,6 +94,9 @@ public static EntryImpl create(PositionImpl position, ByteBuf data) { entry.timestamp = System.nanoTime(); entry.ledgerId = position.getLedgerId(); entry.entryId = position.getEntryId(); + entry.position = PositionImplRecyclable.create(); + entry.position.ledgerId = entry.ledgerId; + entry.position.entryId = entry.entryId; entry.data = data; entry.data.retain(); entry.setRefCnt(1); @@ -95,6 +108,9 @@ public static EntryImpl create(EntryImpl other) { entry.timestamp = System.nanoTime(); entry.ledgerId = other.ledgerId; entry.entryId = other.entryId; + entry.position = PositionImplRecyclable.create(); + entry.position.ledgerId = entry.ledgerId; + entry.position.entryId = entry.entryId; entry.data = other.data.retainedDuplicate(); entry.setRefCnt(1); return entry; @@ -151,7 +167,7 @@ public int getLength() { @Override public PositionImpl getPosition() { - return new PositionImpl(ledgerId, entryId); + return position; } @Override @@ -197,6 +213,8 @@ protected void deallocate() { timestamp = -1; ledgerId = -1; entryId = -1; + position.recycle(); + position = null; recyclerHandle.recycle(this); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImplRecyclable.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImplRecyclable.java index eb2b33e858d63..d1578dc42aa33 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImplRecyclable.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImplRecyclable.java @@ -43,6 +43,8 @@ public static PositionImplRecyclable create() { } public void recycle() { + ledgerId = -1; + entryId = -1; ackSet = null; recyclerHandle.recycle(this); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java index 7dbe0385fd7e9..6011d7e7e094c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java @@ -848,8 +848,7 @@ protected void handleIndividualAckRecover(TxnID txnID, List Date: Tue, 12 Mar 2024 12:52:32 +0800 Subject: [PATCH 3/4] clean up --- .../bookkeeper/mledger/impl/EntryImpl.java | 21 ++++--------------- 1 file changed, 4 insertions(+), 17 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java index 11ba9222401f5..803979313575a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java @@ -42,7 +42,7 @@ protected EntryImpl newObject(Handle handle) { private long timestamp; private long ledgerId; private long entryId; - private PositionImplRecyclable position; + private PositionImpl position; ByteBuf data; private Runnable onDeallocate; @@ -52,9 +52,6 @@ public static EntryImpl create(LedgerEntry ledgerEntry) { entry.timestamp = System.nanoTime(); entry.ledgerId = ledgerEntry.getLedgerId(); entry.entryId = ledgerEntry.getEntryId(); - entry.position = PositionImplRecyclable.create(); - entry.position.ledgerId = entry.ledgerId; - entry.position.entryId = entry.entryId; entry.data = ledgerEntry.getEntryBuffer(); entry.data.retain(); entry.setRefCnt(1); @@ -67,9 +64,6 @@ public static EntryImpl create(long ledgerId, long entryId, byte[] data) { entry.timestamp = System.nanoTime(); entry.ledgerId = ledgerId; entry.entryId = entryId; - entry.position = PositionImplRecyclable.create(); - entry.position.ledgerId = entry.ledgerId; - entry.position.entryId = entry.entryId; entry.data = Unpooled.wrappedBuffer(data); entry.setRefCnt(1); return entry; @@ -80,9 +74,6 @@ public static EntryImpl create(long ledgerId, long entryId, ByteBuf data) { entry.timestamp = System.nanoTime(); entry.ledgerId = ledgerId; entry.entryId = entryId; - entry.position = PositionImplRecyclable.create(); - entry.position.ledgerId = entry.ledgerId; - entry.position.entryId = entry.entryId; entry.data = data; entry.data.retain(); entry.setRefCnt(1); @@ -94,9 +85,6 @@ public static EntryImpl create(PositionImpl position, ByteBuf data) { entry.timestamp = System.nanoTime(); entry.ledgerId = position.getLedgerId(); entry.entryId = position.getEntryId(); - entry.position = PositionImplRecyclable.create(); - entry.position.ledgerId = entry.ledgerId; - entry.position.entryId = entry.entryId; entry.data = data; entry.data.retain(); entry.setRefCnt(1); @@ -108,9 +96,6 @@ public static EntryImpl create(EntryImpl other) { entry.timestamp = System.nanoTime(); entry.ledgerId = other.ledgerId; entry.entryId = other.entryId; - entry.position = PositionImplRecyclable.create(); - entry.position.ledgerId = entry.ledgerId; - entry.position.entryId = entry.entryId; entry.data = other.data.retainedDuplicate(); entry.setRefCnt(1); return entry; @@ -167,6 +152,9 @@ public int getLength() { @Override public PositionImpl getPosition() { + if (position == null) { + position = PositionImpl.get(ledgerId, entryId); + } return position; } @@ -213,7 +201,6 @@ protected void deallocate() { timestamp = -1; ledgerId = -1; entryId = -1; - position.recycle(); position = null; recyclerHandle.recycle(this); } From 27f0f78f86ed834d0aeeb9c09ecc086152109c74 Mon Sep 17 00:00:00 2001 From: dao-jun Date: Tue, 12 Mar 2024 13:46:02 +0800 Subject: [PATCH 4/4] clean up --- .../apache/bookkeeper/mledger/impl/PositionImplRecyclable.java | 2 -- .../transaction/pendingack/impl/PendingAckHandleImpl.java | 3 ++- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImplRecyclable.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImplRecyclable.java index d1578dc42aa33..eb2b33e858d63 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImplRecyclable.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImplRecyclable.java @@ -43,8 +43,6 @@ public static PositionImplRecyclable create() { } public void recycle() { - ledgerId = -1; - entryId = -1; ackSet = null; recyclerHandle.recycle(this); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java index 6011d7e7e094c..7dbe0385fd7e9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java @@ -848,7 +848,8 @@ protected void handleIndividualAckRecover(TxnID txnID, List