From 613cb9bbc7a3fa7faba14e38ba94ff39726b1d1d Mon Sep 17 00:00:00 2001 From: Guillaume Nodet Date: Thu, 6 Feb 2025 16:25:19 +0100 Subject: [PATCH] [MNG-8563] Improve the object cache memory usage --- src/mdo/java/CacheManager.java | 186 ++++++++++++++++++++++++++++----- 1 file changed, 159 insertions(+), 27 deletions(-) diff --git a/src/mdo/java/CacheManager.java b/src/mdo/java/CacheManager.java index 95fa7a2529d8..84f17e5a69d8 100644 --- a/src/mdo/java/CacheManager.java +++ b/src/mdo/java/CacheManager.java @@ -24,9 +24,14 @@ import java.lang.reflect.Modifier; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + import org.apache.maven.api.annotations.ThreadSafe; /** @@ -36,9 +41,11 @@ @ThreadSafe class CacheManager { private static final CacheManager INSTANCE = new CacheManager(); + private static final int BATCH_SIZE = 32; + private static final long CLEANUP_WAIT_MS = 100; // Time to wait when queue is empty // Cache of object instances - private final ConcurrentHashMap, ConcurrentHashMap>> instanceCache = + private final ConcurrentHashMap, WeakReference>> instanceCache = new ConcurrentHashMap<>(); // Cache of reflected fields per class @@ -47,22 +54,111 @@ class CacheManager { // Reference queue private final ReferenceQueue refQueue = new ReferenceQueue<>(); + /** + * Lightweight structure to handle hash collisions. + * Acts as both a WeakReference and a linked list node. + */ + private static class CacheNode extends WeakReference { + final Class clazz; + final int hash; + volatile CacheNode next; // null if this is the only element + + CacheNode(Cacheable referent, ReferenceQueue q, Class clazz, int hash) { + super(referent, q); + this.clazz = clazz; + this.hash = hash; + } + + /** + * Finds a matching object in this chain of nodes. + * Also cleans up any cleared references it encounters. + * Returns the node containing the match, or null if none found. + */ + CacheNode findMatch(Cacheable obj, ConcurrentHashMap classCache) { + CacheNode prev = null; + CacheNode current = this; + + while (current != null) { + Cacheable cached = current.get(); + CacheNode next = current.next; + + if (cached == null) { + // Remove cleared reference + if (prev == null) { + // This is the head node + if (next == null) { + // Last node in chain, remove entire entry + classCache.remove(hash); + } else { + // Replace head with next node + classCache.replace(hash, this, next); + } + } else { + // Skip this node + prev.next = next; + } + } else if (obj.cacheEquals(cached)) { + return current; + } else { + prev = current; + } + current = next; + } + return null; + } + } + private CacheManager() { // Start a background thread to process cleared references Thread cleanupThread = new Thread(() -> { + List refBatch = new ArrayList<>(BATCH_SIZE); + while (!Thread.currentThread().isInterrupted()) { try { - CacheReference ref = (CacheReference) refQueue.remove(); - ConcurrentHashMap> classCache = instanceCache.get(ref.clazz); - if (classCache != null) { - classCache.computeIfPresent(ref.hash, (k, list) -> { - synchronized (list) { - list.remove(ref); - return list.isEmpty() ? null : list; + // Try to collect a batch of references + refBatch.clear(); + CacheNode ref; + + // Poll for references until either batch is full or queue is empty + while (refBatch.size() < BATCH_SIZE && + (ref = (CacheNode)refQueue.poll()) != null) { + refBatch.add(ref); + } + + if (refBatch.isEmpty()) { + // Queue is empty, wait for new references + ref = (CacheNode)refQueue.remove(CLEANUP_WAIT_MS); + if (ref != null) { + refBatch.add(ref); + } + } + + // Process the batch + if (!refBatch.isEmpty()) { + // Group references by class for more efficient processing + Map, List> byClass = refBatch.stream() + .collect(Collectors.groupingBy(node -> node.clazz)); + + for (var entry : byClass.entrySet()) { + WeakReference> weakClassCache = + instanceCache.get(entry.getKey()); + if (weakClassCache != null) { + ConcurrentHashMap classCache = weakClassCache.get(); + if (classCache != null) { + // Process all references for this class + for (CacheNode node : entry.getValue()) { + CacheNode head = classCache.get(node.hash); + if (head != null) { + head.findMatch(null, classCache); + } + } + + // If the class cache is empty, remove its weak reference + if (classCache.isEmpty()) { + instanceCache.remove(entry.getKey(), weakClassCache); + } + } } - }); - if (classCache.isEmpty()) { - instanceCache.remove(ref.clazz); } } } catch (InterruptedException e) { @@ -207,31 +303,67 @@ public T cached(T obj) { } Class clazz = obj.getClass(); - ConcurrentHashMap> classCache = - instanceCache.computeIfAbsent(clazz, k -> new ConcurrentHashMap<>()); - + ConcurrentHashMap classCache = getOrCreateClassCache(clazz); int cacheHash = obj.cacheIdentityHash(); - List refs = classCache.compute(cacheHash, (k, oldList) -> { - if (oldList == null) { - oldList = new ArrayList<>(); + while (true) { + CacheNode head = classCache.get(cacheHash); + if (head == null) { + // No existing entry, try to add new one + CacheNode newNode = new CacheNode(obj, refQueue, clazz, cacheHash); + if (classCache.putIfAbsent(cacheHash, newNode) == null) { + return obj; // Successfully added + } + continue; // Race condition, try again } - List newList = new ArrayList<>(oldList); - newList.removeIf(ref -> ref == null || ref.get() == null); - return newList; - }); - synchronized (refs) { - for (WeakReference ref : refs) { - Cacheable cached = ref.get(); - if (cached != null && obj.cacheEquals(cached)) { + // Look for existing match + CacheNode match = head.findMatch(obj, classCache); + if (match != null) { + Cacheable cached = match.get(); + if (cached != null) { return (T) cached; } + continue; // Reference was cleared, try again + } + + // No match found, try to add to chain + synchronized (head) { // Synchronize on head node for chain modifications + // Verify head is still valid + if (classCache.get(cacheHash) != head) { + continue; // Head changed, try again + } + + // Add new node to chain + CacheNode newNode = new CacheNode(obj, refQueue, clazz, cacheHash); + newNode.next = head.next; + head.next = newNode; + return obj; } + } + } + + private ConcurrentHashMap getOrCreateClassCache(Class clazz) { + WeakReference> weakRef = instanceCache.get(clazz); + ConcurrentHashMap classCache = (weakRef != null) ? weakRef.get() : null; - refs.add(new CacheReference(obj, refQueue, clazz, cacheHash)); - return obj; + if (classCache == null) { + classCache = new ConcurrentHashMap<>(); + weakRef = new WeakReference<>(classCache); + WeakReference> existing = + instanceCache.putIfAbsent(clazz, weakRef); + + if (existing != null) { + ConcurrentHashMap existingCache = existing.get(); + if (existingCache != null) { + classCache = existingCache; + } else { + instanceCache.replace(clazz, existing, weakRef); + } + } } + + return classCache; } /**