diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java index 12e28793557b3..669562055214c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java @@ -18,49 +18,95 @@ */ package org.apache.pulsar.broker.service; +import it.unimi.dsi.fastutil.longs.Long2IntMap; +import it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; import java.util.List; +import java.util.concurrent.locks.StampedLock; import org.apache.bookkeeper.mledger.Position; -import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap; -import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap.LongPair; public class InMemoryRedeliveryTracker implements RedeliveryTracker { - - private ConcurrentLongLongPairHashMap trackerCache = ConcurrentLongLongPairHashMap.newBuilder() - .concurrencyLevel(1) - .expectedItems(256) - .autoShrink(true) - .build(); + // ledgerId -> entryId -> count + private Long2ObjectMap trackerCache = new Long2ObjectOpenHashMap<>(); + private final StampedLock rwLock = new StampedLock(); @Override public int incrementAndGetRedeliveryCount(Position position) { - Position positionImpl = position; - LongPair count = trackerCache.get(positionImpl.getLedgerId(), positionImpl.getEntryId()); - int newCount = (int) (count != null ? count.first + 1 : 1); - trackerCache.put(positionImpl.getLedgerId(), positionImpl.getEntryId(), newCount, 0L); + long stamp = rwLock.writeLock(); + int newCount; + try { + Long2IntMap entryMap = trackerCache.computeIfAbsent(position.getLedgerId(), + k -> new Long2IntOpenHashMap()); + newCount = entryMap.getOrDefault(position.getEntryId(), 0) + 1; + entryMap.put(position.getEntryId(), newCount); + } finally { + rwLock.unlockWrite(stamp); + } return newCount; } @Override public int getRedeliveryCount(long ledgerId, long entryId) { - LongPair count = trackerCache.get(ledgerId, entryId); - return (int) (count != null ? count.first : 0); + long stamp = rwLock.tryOptimisticRead(); + Long2IntMap entryMap = trackerCache.get(ledgerId); + int count = entryMap != null ? entryMap.get(entryId) : 0; + if (!rwLock.validate(stamp)) { + stamp = rwLock.readLock(); + try { + entryMap = trackerCache.get(ledgerId); + count = entryMap != null ? entryMap.get(entryId) : 0; + } finally { + rwLock.unlockRead(stamp); + } + } + return count; } @Override public void remove(Position position) { - Position positionImpl = position; - trackerCache.remove(positionImpl.getLedgerId(), positionImpl.getEntryId()); + long stamp = rwLock.writeLock(); + try { + Long2IntMap entryMap = trackerCache.get(position.getLedgerId()); + if (entryMap != null) { + entryMap.remove(position.getEntryId()); + if (entryMap.isEmpty()) { + trackerCache.remove(position.getLedgerId()); + } + } + } finally { + rwLock.unlockWrite(stamp); + } } @Override public void removeBatch(List positions) { - if (positions != null) { - positions.forEach(this::remove); + if (positions == null) { + return; + } + long stamp = rwLock.writeLock(); + try { + for (Position position : positions) { + Long2IntMap entryMap = trackerCache.get(position.getLedgerId()); + if (entryMap != null) { + entryMap.remove(position.getEntryId()); + if (entryMap.isEmpty()) { + trackerCache.remove(position.getLedgerId()); + } + } + } + } finally { + rwLock.unlockWrite(stamp); } } @Override public void clear() { - trackerCache.clear(); + long stamp = rwLock.writeLock(); + try { + trackerCache.clear(); + } finally { + rwLock.unlockWrite(stamp); + } } }