diff --git a/server-common/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java b/server-common/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java index 52ab96ecbcd6..b600b122f9b4 100644 --- a/server-common/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java +++ b/server-common/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java @@ -21,17 +21,18 @@ import org.slf4j.Logger; +import java.lang.ref.WeakReference; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.stream.Collectors; - /** - * A registry containing snapshots of timeline data structures. - * We generally expect a small number of snapshots-- perhaps 1 or 2 at a time. - * Therefore, we use ArrayLists here rather than a data structure with higher overhead. + * A registry containing snapshots of timeline data structures. All timeline data structures must + * be registered here, so that they can be reverted to the expected state when desired. + * Because the registry only keeps a weak reference to each timeline data structure, it does not + * prevent them from being garbage collected. */ public class SnapshotRegistry { public static final long LATEST_EPOCH = Long.MAX_VALUE; @@ -107,12 +108,39 @@ public Snapshot next() { private final Snapshot head = new Snapshot(Long.MIN_VALUE); /** - * Collection of all Revertable registered with this registry + * A collection of all Revertable objects registered here. Since we store only weak + * references, every time we access a revertable through this list, we must check to + * see if it has been garbage collected. If so, WeakReference.get will return null. + * + * Although the garbage collector handles freeing the underlying Revertables, over + * time slots in the ArrayList will fill up with expired references. Therefore, after + * enough registrations, we scrub the ArrayList of the expired references by creating + * a new arraylist. */ - private final List revertables = new ArrayList<>(); + private List> revertables = new ArrayList<>(); + + /** + * The maximum number of registrations to allow before we compact the revertable list. + */ + private final int maxRegistrationsSinceScrub; + + /** + * The number of registrations we have done since removing all expired weak references. + */ + private int numRegistrationsSinceScrub = 0; + + /** + * The number of scrubs that we have done. + */ + private long numScrubs = 0; public SnapshotRegistry(LogContext logContext) { + this(logContext, 10_000); + } + + public SnapshotRegistry(LogContext logContext, int maxRegistrationsSinceScrub) { this.log = logContext.logger(SnapshotRegistry.class); + this.maxRegistrationsSinceScrub = maxRegistrationsSinceScrub; } /** @@ -283,21 +311,60 @@ public long latestEpoch() { return head.prev().epoch(); } + /** + * Return the number of scrub operations that we have done. + */ + public long numScrubs() { + return numScrubs; + } + /** * Associate a revertable with this registry. */ void register(Revertable revertable) { - revertables.add(revertable); + numRegistrationsSinceScrub++; + if (numRegistrationsSinceScrub > maxRegistrationsSinceScrub) { + scrub(); + } + revertables.add(new WeakReference<>(revertable)); + } + + /** + * Remove all expired weak references from the revertable list. + */ + void scrub() { + ArrayList> newRevertables = + new ArrayList<>(revertables.size() / 2); + for (WeakReference ref : revertables) { + if (ref.get() != null) { + newRevertables.add(ref); + } + } + numScrubs++; + this.revertables = newRevertables; + numRegistrationsSinceScrub = 0; } /** - * Delete all snapshots and resets all of the Revertable object registered. + * Delete all snapshots and reset all of the Revertable objects. */ public void reset() { deleteSnapshotsUpTo(LATEST_EPOCH); - for (Revertable revertable : revertables) { - revertable.reset(); + ArrayList> newRevertables = new ArrayList<>(); + for (WeakReference ref : revertables) { + Revertable revertable = ref.get(); + if (revertable != null) { + try { + revertable.reset(); + } catch (Exception e) { + log.error("Error reverting {}", revertable, e); + } + newRevertables.add(ref); + } } + numScrubs++; + this.revertables = newRevertables; + numRegistrationsSinceScrub = 0; } } diff --git a/server-common/src/test/java/org/apache/kafka/timeline/SnapshotRegistryTest.java b/server-common/src/test/java/org/apache/kafka/timeline/SnapshotRegistryTest.java index 264c9231f9c4..dacc91fa931d 100644 --- a/server-common/src/test/java/org/apache/kafka/timeline/SnapshotRegistryTest.java +++ b/server-common/src/test/java/org/apache/kafka/timeline/SnapshotRegistryTest.java @@ -94,4 +94,28 @@ public void testCreateSnapshotOfLatest() { assertEquals(latest, duplicate); } + + @Test + public void testScrub() { + SnapshotRegistry registry = new SnapshotRegistry(new LogContext(), 2); + new TimelineInteger(registry).set(123); + new TimelineInteger(registry).set(123); + assertEquals(0, registry.numScrubs()); + new TimelineInteger(registry).set(123); + assertEquals(1, registry.numScrubs()); + new TimelineInteger(registry).set(123); + new TimelineInteger(registry).set(123); + new TimelineInteger(registry).set(123); + assertEquals(2, registry.numScrubs()); + } + + @Test + public void testReset() { + SnapshotRegistry registry = new SnapshotRegistry(new LogContext(), 2); + TimelineInteger integer = new TimelineInteger(registry); + integer.set(123); + registry.reset(); + assertEquals(0, integer.get()); + assertEquals(1, registry.numScrubs()); + } }