From f8f7eeec78142bed5ef62d81d45f173fbc4615f8 Mon Sep 17 00:00:00 2001 From: Jerven Bolleman Date: Mon, 5 Jun 2023 15:07:00 +0200 Subject: [PATCH] GH-4581 Make sure collection factories are used when we use a collection This allows to always be sure that we can fall back to disk if required. Also allows optimized datastructures to be injected. Signed-off-by: Jerven Bolleman --- .../common/iteration/DistinctIteration.java | 18 +++++++++ .../common/iteration/IntersectIteration.java | 12 ++++-- .../iteration/DistinctIterationTest.java | 2 +- .../impl/DefaultEvaluationStrategy.java | 13 +++--- .../IntersectionQueryEvaluationStep.java | 40 ++++++------------- .../evaluation/iterator/PathIteration.java | 3 -- .../iterator/ZeroLengthPathIteration.java | 6 --- .../sail/base/SailDatasetTripleSource.java | 2 +- .../eclipse/rdf4j/federated/FedXConfig.java | 18 +++++++++ .../rdf4j/federated/FedXConnection.java | 32 ++++----------- .../eclipse/rdf4j/federated/FedXFactory.java | 1 - .../evaluation/FederationEvalStrategy.java | 3 +- .../org/eclipse/rdf4j/federated/FedXRule.java | 1 + 13 files changed, 75 insertions(+), 76 deletions(-) diff --git a/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/DistinctIteration.java b/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/DistinctIteration.java index 12f873f587a..0f9215ac837 100644 --- a/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/DistinctIteration.java +++ b/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/DistinctIteration.java @@ -38,12 +38,30 @@ public class DistinctIteration extends FilterIteration { * * @param iter The underlying iterator. */ + @Deprecated public DistinctIteration(CloseableIteration iter) { super(iter); excludeSet = new HashSet<>(); } + /** + * Creates a new DistinctIterator. + * + * @param Set a hopefully optimized set + * @param iter The underlying iterator. + */ + public DistinctIteration(CloseableIteration iter, Set excludeSet) { + super(iter); + this.excludeSet = excludeSet; + } + + /** + * Creates a new DistinctIterator. + * + * @param Supplier> a supplier of a hopefully optimized set + * @param iter The underlying iterator. + */ public DistinctIteration(CloseableIteration iter, Supplier> setMaker) { super(iter); excludeSet = setMaker.get(); diff --git a/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/IntersectIteration.java b/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/IntersectIteration.java index 69dc026e733..d9db82969eb 100644 --- a/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/IntersectIteration.java +++ b/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/IntersectIteration.java @@ -14,6 +14,7 @@ import java.util.HashSet; import java.util.Set; import java.util.function.Supplier; +import java.util.stream.Stream; /** * An Iteration that returns the intersection of the results of two Iterations. Optionally, the Iteration can be @@ -34,7 +35,7 @@ public class IntersectIteration extends FilterIteration { private boolean initialized; - private final Set includeSet; + private Set includeSet; private final Supplier> setMaker; @@ -64,7 +65,6 @@ public IntersectIteration(CloseableIteration arg1, CloseableIterati * @param arg1 An Iteration containing the first set of elements. * @param arg2 An Iteration containing the second set of elements. * @param distinct Flag indicating whether duplicate elements should be filtered from the result. - * @param set A set used to determine the intersection */ public IntersectIteration(CloseableIteration arg1, CloseableIteration arg2, boolean distinct) { @@ -107,7 +107,13 @@ public IntersectIteration(CloseableIteration arg1, CloseableIterati @Override protected boolean accept(E object) { if (!initialized) { - initialize(); + // Build set of elements-to-include from second argument + includeSet = setMaker.get(); + while (arg2.hasNext()) { + includeSet.add(arg2.next()); + } + arg2.close(); + initialized = true; } if (inIncludeSet(object)) { diff --git a/core/query/src/test/java/org/eclipse/rdf4j/common/iteration/DistinctIterationTest.java b/core/query/src/test/java/org/eclipse/rdf4j/common/iteration/DistinctIterationTest.java index 04a79bc10a1..c3e9a9120ab 100644 --- a/core/query/src/test/java/org/eclipse/rdf4j/common/iteration/DistinctIterationTest.java +++ b/core/query/src/test/java/org/eclipse/rdf4j/common/iteration/DistinctIterationTest.java @@ -16,7 +16,7 @@ public class DistinctIterationTest extends CloseableIterationTest { @Override protected CloseableIteration createTestIteration() { - return new DistinctIteration<>(createStringList1Iteration(), new HashSet<>()); + return new DistinctIteration<>(createStringList1Iteration(), HashSet::new); } @Override diff --git a/core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/impl/DefaultEvaluationStrategy.java b/core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/impl/DefaultEvaluationStrategy.java index 695986b50d1..6ecd3c9a502 100644 --- a/core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/impl/DefaultEvaluationStrategy.java +++ b/core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/impl/DefaultEvaluationStrategy.java @@ -533,16 +533,15 @@ protected QueryEvaluationStep prepare(Difference node, QueryEvaluationContext co } protected QueryEvaluationStep prepare(Group node, QueryEvaluationContext context) throws QueryEvaluationException { - return bindings -> new GroupIterator(DefaultEvaluationStrategy.this, node, bindings, - iterationCacheSyncThreshold, - context); + return bindings -> new GroupIterator(DefaultEvaluationStrategy.this, node, bindings, context); } protected QueryEvaluationStep prepare(Intersection node, QueryEvaluationContext context) throws QueryEvaluationException { QueryEvaluationStep leftArg = precompile(node.getLeftArg(), context); QueryEvaluationStep rightArg = precompile(node.getRightArg(), context); - return new IntersectionQueryEvaluationStep(leftArg, rightArg, this.getCollectionFactory().get()); + + return new IntersectionQueryEvaluationStep(leftArg, rightArg, getCollectionFactory()); } protected QueryEvaluationStep prepare(Join node, QueryEvaluationContext context) throws QueryEvaluationException { @@ -714,9 +713,9 @@ protected QueryEvaluationStep prepare(Distinct node, QueryEvaluationContext cont return new QueryEvaluationStep() { @Override - public CloseableIteration evaluate(BindingSet bindings) { - final CloseableIteration evaluate = child.evaluate(bindings); - return new DistinctIteration(evaluate, + public CloseableIteration evaluate(BindingSet bindings) { + final CloseableIteration evaluate = child.evaluate(bindings); + return new DistinctIteration(evaluate, cf.createSetOfBindingSets()) { @Override diff --git a/core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/impl/evaluationsteps/IntersectionQueryEvaluationStep.java b/core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/impl/evaluationsteps/IntersectionQueryEvaluationStep.java index 14845fef907..464411ad034 100644 --- a/core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/impl/evaluationsteps/IntersectionQueryEvaluationStep.java +++ b/core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/impl/evaluationsteps/IntersectionQueryEvaluationStep.java @@ -10,13 +10,12 @@ *******************************************************************************/ package org.eclipse.rdf4j.query.algebra.evaluation.impl.evaluationsteps; -import java.util.Set; import java.util.function.Function; +import java.util.function.Supplier; import org.eclipse.rdf4j.collection.factory.api.CollectionFactory; import org.eclipse.rdf4j.common.iteration.CloseableIteration; import org.eclipse.rdf4j.common.iteration.IntersectIteration; -import org.eclipse.rdf4j.common.iteration.Iteration; import org.eclipse.rdf4j.query.BindingSet; import org.eclipse.rdf4j.query.algebra.evaluation.QueryEvaluationStep; @@ -25,41 +24,26 @@ */ public class IntersectionQueryEvaluationStep implements QueryEvaluationStep { - private static final class IntersectIterationUsingSetFromCollectionFactory - extends IntersectIteration { - private final CollectionFactory cf; - - private IntersectIterationUsingSetFromCollectionFactory(Iteration arg1, - Iteration arg2, CollectionFactory cf) { - super(arg1, arg2, false, cf.createSetOfBindingSets()); - this.cf = cf; - } - - @Override - protected void handleClose() throws QueryEvaluationException { - try { - cf.close(); - } catch (QueryEvaluationException e) { - super.handleClose(); - throw e; - } - } - } - private final QueryEvaluationStep leftArg; private final Function rightArgDelayed; - private final CollectionFactory collectionFactory; + private final Supplier cfs; public IntersectionQueryEvaluationStep(QueryEvaluationStep leftArg, QueryEvaluationStep rightArg, - CollectionFactory collectionFactory) { - this.collectionFactory = collectionFactory; + Supplier cfs) { + this.cfs = cfs; this.leftArg = leftArg; rightArgDelayed = bs -> new DelayedEvaluationIteration(rightArg, bs); } @Override public CloseableIteration evaluate(BindingSet bs) { - return new IntersectIterationUsingSetFromCollectionFactory(leftArg.evaluate(bs), rightArgDelayed.apply(bs), - collectionFactory); + CollectionFactory cf = cfs.get(); + return new IntersectIteration<>(leftArg.evaluate(bs), rightArgDelayed.apply(bs), cf::createSetOfBindingSets) { + @Override + protected void handleClose() { + cf.close(); + } + }; } + } diff --git a/core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/iterator/PathIteration.java b/core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/iterator/PathIteration.java index 74449bae7c2..9e3875195eb 100644 --- a/core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/iterator/PathIteration.java +++ b/core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/iterator/PathIteration.java @@ -65,8 +65,6 @@ public class PathIteration extends LookAheadIteration { private ValuePair currentVp; - private final CollectionFactory cf; - private static final String JOINVAR_PREFIX = "intermediate_join_"; private final Set namedIntermediateJoins = new HashSet<>(); @@ -90,7 +88,6 @@ public PathIteration(EvaluationStrategy strategy, Scope scope, Var startVar, this.currentLength = minLength; this.bindings = bindings; - this.collectionFactory = strategy.getCollectionFactory().get(); this.reportedValues = collectionFactory.createSet(); this.unreportedValues = collectionFactory.createSet(); diff --git a/core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/iterator/ZeroLengthPathIteration.java b/core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/iterator/ZeroLengthPathIteration.java index 27e7fc01595..5241ef22dde 100644 --- a/core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/iterator/ZeroLengthPathIteration.java +++ b/core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/iterator/ZeroLengthPathIteration.java @@ -176,14 +176,8 @@ public Var createAnonVar(String varName) { return new Var(varName, true); } - @Override - protected void handleClose() { - - } - @Override protected void handleClose() throws QueryEvaluationException { cf.close(); - super.handleClose(); } } diff --git a/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/SailDatasetTripleSource.java b/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/SailDatasetTripleSource.java index 80b6243f42b..0301a4675f1 100644 --- a/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/SailDatasetTripleSource.java +++ b/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/SailDatasetTripleSource.java @@ -11,8 +11,8 @@ package org.eclipse.rdf4j.sail.base; import java.util.Comparator; -import java.util.Set; import java.util.HashSet; +import java.util.Set; import org.eclipse.rdf4j.common.annotation.InternalUseOnly; import org.eclipse.rdf4j.common.iteration.CloseableIteration; diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedXConfig.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedXConfig.java index d25392daa2f..06bfe527e25 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedXConfig.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedXConfig.java @@ -12,6 +12,8 @@ import java.util.Optional; +import org.eclipse.rdf4j.collection.factory.api.CollectionFactory; +import org.eclipse.rdf4j.collection.factory.impl.DefaultCollectionFactory; import org.eclipse.rdf4j.federated.cache.SourceSelectionCache; import org.eclipse.rdf4j.federated.cache.SourceSelectionMemoryCache; import org.eclipse.rdf4j.federated.evaluation.concurrent.ControlledWorkerScheduler; @@ -63,6 +65,7 @@ public class FedXConfig { private int consumingIterationMax = 1000; + private CollectionFactory cf = new DefaultCollectionFactory(); /* factory like setters */ /** @@ -445,4 +448,19 @@ public FedXConfig withConsumingIterationMax(int max) { public int getConsumingIterationMax() { return consumingIterationMax; } + + /** + * Set the CollectionFactory to be used by the federation + * + *

+ * Can only be set before federation initialization. + *

+ * + * @param cf + * @return the current config + */ + public FedXConfig withCollectionFactory(CollectionFactory cf) { + this.cf = cf; + return this; + } } diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedXConnection.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedXConnection.java index 5bb78dc9830..3968b8a15a5 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedXConnection.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedXConnection.java @@ -11,7 +11,6 @@ package org.eclipse.rdf4j.federated; import java.util.ArrayList; -import java.util.HashSet; import java.util.List; import org.eclipse.rdf4j.collection.factory.api.CollectionFactory; @@ -19,7 +18,6 @@ import org.eclipse.rdf4j.common.iteration.DistinctIteration; import org.eclipse.rdf4j.common.iteration.EmptyIteration; import org.eclipse.rdf4j.common.iteration.ExceptionConvertingIteration; -import org.eclipse.rdf4j.common.iteration.Iteration; import org.eclipse.rdf4j.common.iteration.Iterations; import org.eclipse.rdf4j.common.transaction.TransactionSetting; import org.eclipse.rdf4j.federated.algebra.PassThroughTupleExpr; @@ -267,23 +265,23 @@ public void cancel() { // execute the union in a separate thread federationContext.getManager().getExecutor().execute(union); - CollectionFactory cf = federation.getCollectionFactory().get(); - return new DistinctIteration<>(new ToSailExceptionConvertingIteration<>(union), cf.createSet()) { + CollectionFactory cf = federation.getCollectionFactory().get(); + return new DistinctIteration<>(new ExceptionConvertingIteration<>(union) { @Override protected SailException convert(RuntimeException e) { return new SailException(e); + } - protected void handleClose() throws SailException { + @Override + protected void handleClose() { try { cf.close(); - } catch (SailException e) { + } finally { super.handleClose(); - throw e; } } - - }; + }); } @Override @@ -313,16 +311,12 @@ protected CloseableIteration getStatementsInternal(Resource CloseableIteration res = null; try { res = strategy.getStatements(queryInfo, subj, pred, obj, contexts); -<<<<<<< HEAD return new ExceptionConvertingIteration<>(res) { @Override protected SailException convert(RuntimeException e) { return new SailException(e); } }; -======= - return new ToSailExceptionConvertingIteration<>(res); ->>>>>>> 7559682f8f (GH-4581 Remove limitedsize evaluation strategies.) } catch (Throwable t) { if (res != null) { res.close(); @@ -471,18 +465,6 @@ private static int getOriginalMaxExecutionTime(BindingSet b) { return 0; } - private static final class ToSailExceptionConvertingIteration - extends ExceptionConvertingIteration { - private ToSailExceptionConvertingIteration(Iteration iter) { - super(iter); - } - - @Override - protected SailException convert(Exception e) { - return new SailException(e); - } - } - /** * A default implementation for {@link AbstractSail}. This implementation has no further use, however it is needed * for the constructor call. diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedXFactory.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedXFactory.java index 6e686ac131e..f2b2f3d6495 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedXFactory.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedXFactory.java @@ -220,7 +220,6 @@ public FedXRepository create() { if (this.writeStrategyFactory != null) { federation.setWriteStrategyFactory(writeStrategyFactory); } - FedXRepository repo = new FedXRepository(federation, this.config); if (this.repositoryResolver != null) { repo.setRepositoryResolver(repositoryResolver); diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/FederationEvalStrategy.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/FederationEvalStrategy.java index 14fbdbd9dc6..07965138bb3 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/FederationEvalStrategy.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/FederationEvalStrategy.java @@ -15,8 +15,10 @@ import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; import java.util.stream.Collectors; +import org.eclipse.rdf4j.collection.factory.api.CollectionFactory; import org.eclipse.rdf4j.common.iteration.CloseableIteration; import org.eclipse.rdf4j.common.iteration.EmptyIteration; import org.eclipse.rdf4j.common.iteration.SingletonIteration; @@ -987,5 +989,4 @@ protected CloseableIteration evaluateAtStatementSources( throw new QueryEvaluationException(e); } } - } diff --git a/tools/federation/src/test/java/org/eclipse/rdf4j/federated/FedXRule.java b/tools/federation/src/test/java/org/eclipse/rdf4j/federated/FedXRule.java index 1cfc97fac61..9283594cc5d 100644 --- a/tools/federation/src/test/java/org/eclipse/rdf4j/federated/FedXRule.java +++ b/tools/federation/src/test/java/org/eclipse/rdf4j/federated/FedXRule.java @@ -43,6 +43,7 @@ public void beforeEach(ExtensionContext ctx) { for (Consumer configConsumer : configurations) { configConsumer.accept(fedxConfig); } + List endpoints = Collections.emptyList(); repository = FedXFactory.newFederation().withMembers(endpoints).withConfig(fedxConfig).create(); repository.init();