Skip to content

Commit

Permalink
GH-4581 Make sure collection factories are used when we use a collection
Browse files Browse the repository at this point in the history
This ensures that we can always fall back to disk if required.
It also allows optimized data structures to be injected.
  • Loading branch information
JervenBolleman committed May 31, 2024
1 parent 65ba065 commit a58ff06
Show file tree
Hide file tree
Showing 10 changed files with 61 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,30 @@ public class DistinctIteration<E> extends FilterIteration<E> {
*
* @param iter The underlying iterator.
*/
// Deprecated: prefer the constructors below that accept a Set or a Supplier<Set<E>>,
// so that the caller decides which set implementation backs this iteration.
@Deprecated
public DistinctIteration(CloseableIteration<? extends E> iter) {
super(iter);

// No set is supplied by the caller here, so a plain HashSet is used as the exclude set.
excludeSet = new HashSet<>();
}

public DistinctIteration(CloseableIteration<? extends E> iter, Set<E> set) {
/**
* Creates a new DistinctIteration.
*
* @param iter The underlying iterator.
* @param excludeSet a (hopefully optimized) set that is stored as this iteration's exclude set
*/
public DistinctIteration(CloseableIteration<? extends E> iter, Set<E> excludeSet) {
super(iter);
excludeSet = set;
this.excludeSet = excludeSet;
}

/**
* Creates a new DistinctIteration.
*
* @param iter The underlying iterator.
* @param setMaker a supplier of a (hopefully optimized) set; the set it returns is used as this iteration's
* exclude set
*/
public DistinctIteration(CloseableIteration<? extends E> iter, Supplier<Set<E>> setMaker) {
super(iter);
excludeSet = setMaker.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class DistinctIterationTest extends CloseableIterationTest {

@Override
protected CloseableIteration<String> createTestIteration() {
return new DistinctIteration<>(createStringList1Iteration(), new HashSet<>());
return new DistinctIteration<>(createStringList1Iteration(), HashSet::new);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -533,16 +533,15 @@ protected QueryEvaluationStep prepare(Difference node, QueryEvaluationContext co
}

protected QueryEvaluationStep prepare(Group node, QueryEvaluationContext context) throws QueryEvaluationException {
return bindings -> new GroupIterator(DefaultEvaluationStrategy.this, node, bindings,
iterationCacheSyncThreshold,
context);
return bindings -> new GroupIterator(DefaultEvaluationStrategy.this, node, bindings, context);
}

protected QueryEvaluationStep prepare(Intersection node, QueryEvaluationContext context)
throws QueryEvaluationException {
QueryEvaluationStep leftArg = precompile(node.getLeftArg(), context);
QueryEvaluationStep rightArg = precompile(node.getRightArg(), context);
return new IntersectionQueryEvaluationStep(leftArg, rightArg, this.getCollectionFactory().get());

return new IntersectionQueryEvaluationStep(leftArg, rightArg, getCollectionFactory());
}

protected QueryEvaluationStep prepare(Join node, QueryEvaluationContext context) throws QueryEvaluationException {
Expand Down Expand Up @@ -710,20 +709,19 @@ protected QueryEvaluationStep prepare(DescribeOperator node, QueryEvaluationCont
protected QueryEvaluationStep prepare(Distinct node, QueryEvaluationContext context)
throws QueryEvaluationException {
final QueryEvaluationStep child = precompile(node.getArg(), context);
CollectionFactory cf = this.getCollectionFactory().get();
final CollectionFactory cf = this.getCollectionFactory().get();
return bindings -> {
final CloseableIteration<BindingSet> evaluate = child.evaluate(bindings);
return new DistinctIteration<BindingSet>(evaluate, cf::createSet) {
return new DistinctIteration<BindingSet>(evaluate, cf.createSetOfBindingSets()) {

@Override
protected void handleClose() {
protected void handleClose() throws QueryEvaluationException {
try {
cf.close();
} finally {
super.handleClose();
}
}

};
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,53 +11,43 @@
package org.eclipse.rdf4j.query.algebra.evaluation.impl.evaluationsteps;

import java.util.function.Function;
import java.util.function.Supplier;

import org.eclipse.rdf4j.collection.factory.api.CollectionFactory;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.common.iteration.IntersectIteration;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.QueryEvaluationException;
import org.eclipse.rdf4j.query.algebra.evaluation.QueryEvaluationStep;

/**
* A step that prepares the arguments of an Intersection operator before execution.
*/
public class IntersectionQueryEvaluationStep implements QueryEvaluationStep {

private static final class IntersectIterationUsingSetFromCollectionFactory
extends IntersectIteration<BindingSet> {
private final CollectionFactory cf;

private IntersectIterationUsingSetFromCollectionFactory(CloseableIteration<BindingSet> arg1,
CloseableIteration<BindingSet> arg2, CollectionFactory cf) {
super(arg1, arg2, false, cf::createSetOfBindingSets);
this.cf = cf;
}

@Override
protected void handleClose() throws QueryEvaluationException {
try {
cf.close();
} finally {
super.handleClose();
}
}
}

private final QueryEvaluationStep leftArg;
private final Function<BindingSet, DelayedEvaluationIteration> rightArgDelayed;
private final CollectionFactory collectionFactory;
private final Supplier<CollectionFactory> cfs;

public IntersectionQueryEvaluationStep(QueryEvaluationStep leftArg, QueryEvaluationStep rightArg,
CollectionFactory collectionFactory) {
this.collectionFactory = collectionFactory;
Supplier<CollectionFactory> cfs) {
this.cfs = cfs;
this.leftArg = leftArg;
rightArgDelayed = bs -> new DelayedEvaluationIteration(rightArg, bs);
}

@Override
public CloseableIteration<BindingSet> evaluate(BindingSet bs) {
return new IntersectIterationUsingSetFromCollectionFactory(leftArg.evaluate(bs), rightArgDelayed.apply(bs),
collectionFactory);
CollectionFactory cf = cfs.get();
return new IntersectIteration<>(leftArg.evaluate(bs), rightArgDelayed.apply(bs), cf::createSetOfBindingSets) {
@Override
protected void handleClose() {
try {
cf.close();
} finally {
super.handleClose();
}
}
};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ public PathIteration(EvaluationStrategy strategy, Scope scope, Var startVar,

this.currentLength = minLength;
this.bindings = bindings;

this.collectionFactory = strategy.getCollectionFactory().get();

// This is all necessary for optimized collections to be usable. This only becomes important on very large
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

import java.util.Optional;

import org.eclipse.rdf4j.collection.factory.api.CollectionFactory;
import org.eclipse.rdf4j.collection.factory.impl.DefaultCollectionFactory;
import org.eclipse.rdf4j.federated.cache.SourceSelectionCache;
import org.eclipse.rdf4j.federated.cache.SourceSelectionCacheFactory;
import org.eclipse.rdf4j.federated.cache.SourceSelectionMemoryCache;
Expand Down Expand Up @@ -66,6 +68,7 @@ public class FedXConfig {

private int consumingIterationMax = 1000;

private CollectionFactory cf = new DefaultCollectionFactory();
/* factory like setters */

/**
Expand Down Expand Up @@ -472,4 +475,19 @@ public FedXConfig withConsumingIterationMax(int max) {
public int getConsumingIterationMax() {
return consumingIterationMax;
}

/**
* Set the {@link CollectionFactory} to be used by the federation.
*
* <p>
* Can only be set before federation initialization. If this method is never called, the config keeps the
* {@link DefaultCollectionFactory} it is initialized with.
* </p>
*
* @param cf the collection factory to store in this config
* @return the current config
*/
public FedXConfig withCollectionFactory(CollectionFactory cf) {
this.cf = cf;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ protected SailException convert(RuntimeException e) {
};
return new DistinctIteration<Resource>(conv, cf::createSet) {

@Override
protected void handleClose() {
try {
cf.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,6 @@ public FedXRepository create() {
if (this.writeStrategyFactory != null) {
federation.setWriteStrategyFactory(writeStrategyFactory);
}

FedXRepository repo = new FedXRepository(federation, this.config);
if (this.repositoryResolver != null) {
repo.setRepositoryResolver(repositoryResolver);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.eclipse.rdf4j.collection.factory.api.CollectionFactory;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.common.iteration.EmptyIteration;
import org.eclipse.rdf4j.common.iteration.SingletonIteration;
Expand Down Expand Up @@ -987,5 +989,4 @@ protected CloseableIteration<BindingSet> evaluateAtStatementSources(
throw new QueryEvaluationException(e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public void beforeEach(ExtensionContext ctx) {
for (Consumer<FedXConfig> configConsumer : configurations) {
configConsumer.accept(fedxConfig);
}

List<Endpoint> endpoints = Collections.<Endpoint>emptyList();
repository = FedXFactory.newFederation().withMembers(endpoints).withConfig(fedxConfig).create();
repository.init();
Expand Down

0 comments on commit a58ff06

Please sign in to comment.