From 50c56f3d693d837e3b73828f6fd810b4803f4b84 Mon Sep 17 00:00:00 2001 From: hmottestad Date: Fri, 7 Jan 2022 08:15:21 +0100 Subject: [PATCH] GH-3523 Cache mechanism for MemStatementIterator(s) (#3524) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit GH-3523 implement a cache mechanism for MemStatementIterator(s) Signed-off-by: Håvard Ottestad --- .../rdf4j/sail/memory/MemorySailStore.java | 24 ++- .../memory/model/MemStatementIterator.java | 201 +++++++++++++++++- .../model/MemStatementIteratorCache.java | 104 +++++++++ 3 files changed, 320 insertions(+), 9 deletions(-) create mode 100644 core/sail/memory/src/main/java/org/eclipse/rdf4j/sail/memory/model/MemStatementIteratorCache.java diff --git a/core/sail/memory/src/main/java/org/eclipse/rdf4j/sail/memory/MemorySailStore.java b/core/sail/memory/src/main/java/org/eclipse/rdf4j/sail/memory/MemorySailStore.java index d778e56ef79..a065726cc80 100644 --- a/core/sail/memory/src/main/java/org/eclipse/rdf4j/sail/memory/MemorySailStore.java +++ b/core/sail/memory/src/main/java/org/eclipse/rdf4j/sail/memory/MemorySailStore.java @@ -9,7 +9,6 @@ import java.util.ArrayList; import java.util.HashSet; -import java.util.Iterator; import java.util.LinkedHashSet; import java.util.Set; import java.util.concurrent.locks.ReentrantLock; @@ -45,6 +44,7 @@ import org.eclipse.rdf4j.sail.memory.model.MemResource; import org.eclipse.rdf4j.sail.memory.model.MemStatement; import org.eclipse.rdf4j.sail.memory.model.MemStatementIterator; +import org.eclipse.rdf4j.sail.memory.model.MemStatementIteratorCache; import org.eclipse.rdf4j.sail.memory.model.MemStatementList; import org.eclipse.rdf4j.sail.memory.model.MemTriple; import org.eclipse.rdf4j.sail.memory.model.MemValue; @@ -62,7 +62,9 @@ class MemorySailStore implements SailStore { public static final EmptyIteration EMPTY_ITERATION = new EmptyIteration<>(); public static final EmptyIteration EMPTY_TRIPLE_ITERATION = new EmptyIteration<>(); public static final MemResource[] EMPTY_CONTEXT = new MemResource[0]; - private final Logger logger = LoggerFactory.getLogger(MemorySailStore.class); + private final static Logger logger = LoggerFactory.getLogger(MemorySailStore.class); + + private final MemStatementIteratorCache iteratorCache = new MemStatementIteratorCache(10); /** * Factory/cache for MemValue objects. @@ -132,6 +134,7 @@ public void close() { try { valueFactory.clear(); statements.clear(); + invalidateCache(); } finally { stLock.release(); } @@ -140,6 +143,10 @@ public void close() { } } + private void invalidateCache() { + iteratorCache.invalidateCache(); + } + @Override public EvaluationStatistics getEvaluationStatistics() { return new MemEvaluationStatistics(valueFactory, statements); @@ -230,8 +237,7 @@ private CloseableIteration createStatementIterator( } private CloseableIteration createStatementIterator(MemResource subj, MemIRI pred, - MemValue obj, - Boolean explicit, int snapshot, MemResource... contexts) { + MemValue obj, Boolean explicit, int snapshot, MemResource... contexts) { MemResource[] memContexts; MemStatementList smallestList; @@ -287,7 +293,8 @@ private CloseableIteration getMemStatementIterator( return EMPTY_ITERATION; } - return new MemStatementIterator<>(smallestList, subj, pred, obj, explicit, snapshot, memContexts); + return MemStatementIterator.cacheAwareInstance(smallestList, subj, pred, obj, explicit, snapshot, memContexts, + iteratorCache); } /** @@ -517,6 +524,7 @@ public synchronized void prepare() throws SailException { @Override public synchronized void flush() throws SailException { if (txnLock) { + invalidateCache(); currentSnapshot = Math.max(currentSnapshot, nextSnapshot); if (requireCleanup) { scheduleSnapshotCleanup(); @@ -579,6 +587,7 @@ public synchronized void observe(Resource subj, IRI pred, Value obj, Resource... @Override public synchronized void clear(Resource... contexts) throws SailException { acquireExclusiveTransactionLock(); + invalidateCache(); requireCleanup = true; try (CloseableIteration iter = createStatementIterator(null, null, null, explicit, nextSnapshot, contexts)) { @@ -592,12 +601,14 @@ public synchronized void clear(Resource... contexts) throws SailException { @Override public synchronized void approve(Resource subj, IRI pred, Value obj, Resource ctx) throws SailException { acquireExclusiveTransactionLock(); + invalidateCache(); addStatement(subj, pred, obj, ctx, explicit); } @Override public synchronized void deprecate(Statement statement) throws SailException { acquireExclusiveTransactionLock(); + invalidateCache(); requireCleanup = true; if (statement instanceof MemStatement) { MemStatement toDeprecate = (MemStatement) statement; @@ -625,6 +636,7 @@ public synchronized void deprecate(Statement statement) throws SailException { } } } + } private void acquireExclusiveTransactionLock() throws SailException { @@ -676,6 +688,7 @@ private MemStatement addStatement(Resource subj, IRI pred, Value obj, Resource c MemStatement st = new MemStatement(memSubj, memPred, memObj, memContext, explicit, nextSnapshot); statements.add(st); st.addToComponentLists(); + invalidateCache(); return st; } @@ -692,6 +705,7 @@ public boolean deprecateByQuery(Resource subj, IRI pred, Value obj, Resource[] c st.setTillSnapshot(nextSnapshot); } } + invalidateCache(); return deprecated; } diff --git a/core/sail/memory/src/main/java/org/eclipse/rdf4j/sail/memory/model/MemStatementIterator.java b/core/sail/memory/src/main/java/org/eclipse/rdf4j/sail/memory/model/MemStatementIterator.java index 47a9d94aff6..e813b625eb5 100644 --- a/core/sail/memory/src/main/java/org/eclipse/rdf4j/sail/memory/model/MemStatementIterator.java +++ b/core/sail/memory/src/main/java/org/eclipse/rdf4j/sail/memory/model/MemStatementIterator.java @@ -7,7 +7,12 @@ *******************************************************************************/ package org.eclipse.rdf4j.sail.memory.model; +import java.util.Arrays; +import java.util.Objects; + +import org.eclipse.rdf4j.common.iteration.CloseableIteration; import org.eclipse.rdf4j.common.iteration.LookAheadIteration; +import org.eclipse.rdf4j.sail.SailException; /** * A StatementIterator that can iterate over a list of Statement objects. This iterator compares Resource and Literal @@ -23,7 +28,7 @@ public class MemStatementIterator extends LookAheadIteratio /** * The lists of statements over which to iterate. */ - private final MemStatementList statementList; + private MemStatementList statementList; /** * The subject of statements to return, or null if any subject is OK. @@ -64,13 +69,18 @@ public class MemStatementIterator extends LookAheadIteratio /** * The index of the last statement that has been returned. */ - private int statementIdx; + private int statementIndex; /** * True if there are no more elements to retrieve. */ private boolean exhausted; + /** + * The number of returned statements + */ + private int matchingStatements; + /*--------------* * Constructors * *--------------*/ @@ -102,7 +112,15 @@ public MemStatementIterator(MemStatementList statementList, MemResource subject, } this.snapshot = snapshot; this.noIsolation = snapshot < 0; - this.statementIdx = 0; + this.statementIndex = 0; + } + + public static CloseableIteration cacheAwareInstance(MemStatementList smallestList, + MemResource subj, MemIRI pred, MemValue obj, Boolean explicit, int snapshot, MemResource[] memContexts, + MemStatementIteratorCache iteratorCache) { + return new CacheAwareIteration<>( + new MemStatementIterator<>(smallestList, subj, pred, obj, explicit, snapshot, memContexts), + iteratorCache); } /*---------* @@ -120,7 +138,7 @@ protected MemStatement getNextElement() { while (!exhausted) { // First getting the size to check if we are out-of-bounds is more expensive (cache wise) than having a // method in MemStatementList that does this for us. - MemStatement statement = statementList.getIfExists(statementIdx++); + MemStatement statement = statementList.getIfExists(statementIndex++); if (statement == null) { exhausted = true; break; @@ -133,6 +151,7 @@ protected MemStatement getNextElement() { if ((statement.matchesSPO(subject, predicate, object)) && matchesContext(statement) && matchesExplicitAndSnapshot(statement)) { + matchingStatements++; return statement; } } @@ -160,4 +179,178 @@ private boolean matchesExplicitAndSnapshot(MemStatement st) { (noIsolation || st.isInSnapshot(snapshot)); } + @Override + protected void handleClose() throws X { + try { + super.handleClose(); + } finally { + statementList = null; + } + + } + + /** + * Returns true if this iterator was particularly costly and should be considered for caching + * + * @return true if it should be cached + */ + private boolean isCandidateForCache() { + if (exhausted) { // we will only consider caching if the iterator has been completely consumed + if (statementIndex > 1000) { // minimum 1000 statements need to have been checked by the iterator + if (matchingStatements == 0) { // if the iterator was effectively empty we can always cache it + return true; + } else if (matchingStatements < 100) { // we will not cache iterators that returned more than 99 + // statements + double ratio = (statementIndex + 0.0) / matchingStatements; + return ratio > 100; // for every returned statement we need to have checked 100 non-matching + // statements + } + } + } + return false; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof MemStatementIterator)) { + return false; + } + MemStatementIterator that = (MemStatementIterator) o; + return explicit == that.explicit && explicitNotSpecified == that.explicitNotSpecified + && snapshot == that.snapshot && noIsolation == that.noIsolation + && subject == that.subject + && predicate == that.predicate && object == that.object + && Arrays.equals(contexts, that.contexts); + } + + private int cachedHashCode = 0; + + @Override + public int hashCode() { + if (cachedHashCode == 0) { + int cachedHashCode = Objects.hash(subject, predicate, object, explicit, explicitNotSpecified, snapshot, + noIsolation); + if (contexts != null) { + if (contexts.length == 1) { + if (contexts[0] == null) { + cachedHashCode += 23; + } else { + cachedHashCode = 29 * cachedHashCode + contexts[0].hashCode(); + } + } else if (contexts.length > 0) { + cachedHashCode = 31 * cachedHashCode + Arrays.hashCode(contexts); + } + } + + this.cachedHashCode = cachedHashCode; + } + return cachedHashCode; + } + + @Override + public String toString() { + return "MemStatementIterator{" + + "subject=" + subject + + ", predicate=" + predicate + + ", object=" + object + + ", contexts=" + Arrays.toString(contexts) + + ", explicit=" + explicit + + ", explicitNotSpecified=" + explicitNotSpecified + + ", snapshot=" + snapshot + + ", noIsolation=" + noIsolation + + '}'; + } + + public Stats getStats() { + return new Stats(statementIndex, matchingStatements); + } + + static class Stats { + private final int checkedStatements; + private final int matchingStatements; + + public Stats(int checkStatements, int matchingStatements) { + this.checkedStatements = checkStatements; + this.matchingStatements = matchingStatements; + } + + @Override + public String toString() { + return "Stats{" + + "checkedStatements=" + checkedStatements + + ", matchingStatements=" + matchingStatements + + '}'; + } + } + + /** + * A wrapper for a MemStatementIterator that checks if the iterator should be cached and retrieves the cached one if + * that is the case. + * + * @author Håvard M. Ottestad + */ + private static class CacheAwareIteration extends LookAheadIteration { + + private final MemStatementIteratorCache iteratorCache; + private final MemStatementIterator memStatementIterator; + private final CloseableIteration cachedIterator; + private Exception e; + + private CacheAwareIteration(MemStatementIterator memStatementIterator, + MemStatementIteratorCache iteratorCache) { + if (iteratorCache.shouldBeCached(memStatementIterator)) { + CloseableIteration cachedIterator = null; + try { + cachedIterator = iteratorCache.getCachedIterator(memStatementIterator); + } catch (Exception e) { + this.e = e; + } + this.cachedIterator = cachedIterator; + this.memStatementIterator = null; + } else { + this.memStatementIterator = memStatementIterator; + this.cachedIterator = null; + } + + this.iteratorCache = iteratorCache; + } + + @Override + protected MemStatement getNextElement() throws X { + if (e != null) { + throw ((X) e); + } + + if (memStatementIterator != null) { + if (memStatementIterator.hasNext()) { + return memStatementIterator.next(); + } + } else { + if (cachedIterator.hasNext()) { + return cachedIterator.next(); + } + } + + return null; + } + + @Override + protected void handleClose() throws X { + try { + super.handleClose(); + } finally { + if (memStatementIterator != null) { + if (memStatementIterator.isCandidateForCache()) { + iteratorCache.incrementIteratorFrequencyMap(memStatementIterator); + } + } else { + cachedIterator.close(); + } + } + } + } + } diff --git a/core/sail/memory/src/main/java/org/eclipse/rdf4j/sail/memory/model/MemStatementIteratorCache.java b/core/sail/memory/src/main/java/org/eclipse/rdf4j/sail/memory/model/MemStatementIteratorCache.java new file mode 100644 index 00000000000..ee557a00e47 --- /dev/null +++ b/core/sail/memory/src/main/java/org/eclipse/rdf4j/sail/memory/model/MemStatementIteratorCache.java @@ -0,0 +1,104 @@ +/******************************************************************************* + * Copyright (c) 2022 Eclipse RDF4J contributors. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + ******************************************************************************/ + +package org.eclipse.rdf4j.sail.memory.model; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +import org.eclipse.rdf4j.common.iteration.CloseableIteration; +import org.eclipse.rdf4j.common.iteration.CloseableIteratorIteration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; + +/** + * A cache for MemStatementIterator that tracks how frequently an iterator is used and caches the iterator as a list + * + * @author Håvard M. Ottestad + */ +public class MemStatementIteratorCache { + + private final static Logger logger = LoggerFactory.getLogger(MemStatementIteratorCache.class); + + // the number of times an iterator needs to be used before it will be cached + public final int CACHE_FREQUENCY_THRESHOLD; + + // a map that tracks the number of times a cacheable iterator has been used + private final ConcurrentHashMap, Integer> iteratorFrequencyMap = new ConcurrentHashMap<>(); + + // a cache for commonly used iterators that are particularly costly + private final Cache, List> iteratorCache = CacheBuilder + .newBuilder() + .softValues() + .build(); + + public MemStatementIteratorCache(int cacheFrequencyThreshold) { + this.CACHE_FREQUENCY_THRESHOLD = cacheFrequencyThreshold; + } + + public void invalidateCache() { + if (!(iteratorFrequencyMap.isEmpty())) { + iteratorFrequencyMap.clear(); + iteratorCache.invalidateAll(); + + if (logger.isTraceEnabled()) { + logger.debug("Invalidated cache", new Throwable()); + } else if (logger.isDebugEnabled()) { + logger.debug("Invalidated cache"); + } + } + } + + void incrementIteratorFrequencyMap(MemStatementIterator iterator) { + Integer compute = iteratorFrequencyMap.compute(iterator, (key, value) -> { + if (value == null) { + return 0; + } + return value + 1; + }); + if (logger.isDebugEnabled()) { + logger.debug("Incremented iteratorFrequencyMap to {}\n{} \n{}", compute, iterator, iterator.getStats()); + } + } + + boolean shouldBeCached(MemStatementIterator iterator) { + if (!iteratorFrequencyMap.isEmpty()) { + Integer integer = iteratorFrequencyMap.get(iterator); + return integer != null && integer > CACHE_FREQUENCY_THRESHOLD; + } else { + return false; + } + } + + CloseableIteration getCachedIterator(MemStatementIterator iterator) + throws Exception { + + List cached = iteratorCache.getIfPresent(iterator); + + if (cached == null) { + try (iterator) { + logger.debug("Filling cache {}", iterator); + ArrayList newCache = new ArrayList<>(); + while (iterator.hasNext()) { + newCache.add(iterator.next()); + } + newCache.trimToSize(); + cached = Collections.unmodifiableList(newCache); + } + iteratorCache.put(iterator, cached); + } + + return new CloseableIteratorIteration<>(cached.iterator()); + } + +}