Skip to content

Commit

Permalink
Merge pull request #4083 from eclipse/main
Browse files Browse the repository at this point in the history
Merge main into develop
  • Loading branch information
hmottestad authored Jul 27, 2022
2 parents a1e4f71 + 5820209 commit 3f3157e
Show file tree
Hide file tree
Showing 37 changed files with 822 additions and 720 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ public Iterable<QueryOptimizer> getOptimizers() {
UNION_SCOPE_CHANGE_OPTIMIZER,
QUERY_MODEL_NORMALIZER,
PROJECTION_REMOVAL_OPTIMIZER, // Make sure this is after the UnionScopeChangeOptimizer
FILTER_OPTIMIZER,
new QueryJoinOptimizer(evaluationStatistics, strategy.isTrackResultSize()),
ITERATIVE_EVALUATION_OPTIMIZER,
FILTER_OPTIMIZER,
ORDER_LIMIT_OPTIMIZER,
PARENT_REFERENCE_CLEANER
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -955,30 +955,26 @@ private void forceCloseActiveOperations() throws SailException {

if (debugEnabled) {

List<SailException> toThrowExceptions = new ArrayList<>();

var activeIterationsCopy = new IdentityHashMap<>(activeIterationsDebug);
activeIterationsDebug.clear();

for (var entry : activeIterationsCopy.entrySet()) {
var ci = entry.getKey();
Throwable creatorTrace = entry.getValue();

try {
if (creatorTrace != null) {
logger.warn("Forced closing of unclosed iteration that was created in:", creatorTrace);
if (!activeIterationsCopy.isEmpty()) {
for (var entry : activeIterationsCopy.entrySet()) {
try {
logger.warn("Unclosed iteration", entry.getValue());
entry.getKey().close();
} catch (Exception ignored) {
logger.warn("Exception occurred while closing unclosed iterations.", ignored);
}
ci.close();
} catch (SailException e) {
toThrowExceptions.add(e);
} catch (Exception e) {
toThrowExceptions.add(new SailException(e));
}
}

if (!toThrowExceptions.isEmpty()) {
throw toThrowExceptions.get(0);
var entry = activeIterationsCopy.entrySet().stream().findAny().orElseThrow();

throw new SailException(
"Connection closed before all iterations were closed: " + entry.getKey().toString(),
entry.getValue());
}

}
}

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@

class RangeIterator implements RecordIterator, NodeListener {

/**
*
*/
private final BTree tree;

private final byte[] searchKey;
Expand Down Expand Up @@ -46,6 +43,8 @@ class RangeIterator implements RecordIterator, NodeListener {

private volatile int currentIdx;

private volatile boolean closed = false;

public RangeIterator(BTree tree, byte[] searchKey, byte[] searchMask, byte[] minValue, byte[] maxValue) {
this.tree = tree;
this.searchKey = searchKey;
Expand Down Expand Up @@ -164,16 +163,23 @@ public void set(byte[] value) {
}

@Override
public synchronized void close() throws IOException {
tree.btreeLock.readLock().lock();
try {
while (popStacks()) {
public void close() throws IOException {
if (!closed) {
synchronized (this) {
if (!closed) {
closed = true;
tree.btreeLock.readLock().lock();
try {
while (popStacks()) {
}

assert parentNodeStack.isEmpty();
assert parentIndexStack.isEmpty();
} finally {
tree.btreeLock.readLock().unlock();
}
}
}

assert parentNodeStack.isEmpty();
assert parentIndexStack.isEmpty();
} finally {
tree.btreeLock.readLock().unlock();
}
}

Expand All @@ -185,7 +191,7 @@ private void pushStacks(Node newChildNode) {
currentIdx = 0;
}

private boolean popStacks() throws IOException {
private synchronized boolean popStacks() throws IOException {
Node nextCurrentNode = currentNode;
if (nextCurrentNode == null) {
// There's nothing to pop
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.eclipse.rdf4j.testsuite.repository;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
Expand Down Expand Up @@ -144,7 +145,7 @@ public void testIterator() {
count++;
}

Assertions.assertTrue(count > 1, "query should have multiple results.");
assertTrue(count > 1, "query should have multiple results.");
}
}

Expand All @@ -161,7 +162,7 @@ public void testCountMatchesAllSelect() {
try (TupleQueryResult result = con.prepareTupleQuery("SELECT * WHERE {?s ?p ?o}").evaluate()) {
long size = con.size();
for (int i = 0; i < size; i++) {
Assertions.assertTrue(result.hasNext());
assertTrue(result.hasNext());
BindingSet next = result.next();
Assertions.assertNotNull(next);
}
Expand Down Expand Up @@ -236,6 +237,10 @@ public void testNotClosingResult() {
for (int i = 0; i < 100; i++) {
try (RepositoryConnection repCon = rep.getConnection()) {
evaluateQueryWithoutClosing(repCon);
} catch (SailException e) {
assertTrue(e.toString()
.startsWith(
"org.eclipse.rdf4j.sail.SailException: Connection closed before all iterations were closed: org.eclipse.rdf4j.sail.helpers.SailBaseIteration@"));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,11 @@
* Prior to evaluation various optimizations are performed, see
* {@link org.eclipse.rdf4j.federated.optimizer.FedXOptimizer} for further details.
* <p>
*
* <p>
* Since 4.0 FedX supports write operations using the supplied {@link WriteStrategy}, e.g. by writing to a designated
* federation member. Note: the {@link WriteStrategy} is initialized lazily upon first access to a write operation, see
* {@link #getWriteStrategyInternal()}.
*
* <p>
* Implementation notes: - not all methods are implemented as of now
*
* @author Andreas Schwarte
Expand Down Expand Up @@ -101,45 +101,42 @@ protected CloseableIteration<? extends BindingSet, QueryEvaluationException> eva
TupleExpr query, Dataset dataset, BindingSet bindings,
boolean includeInferred) throws SailException {

final TupleExpr _orgQuery = query;
final TupleExpr originalQuery = query;

FederationEvalStrategy strategy = federationContext.createStrategy(dataset);

long start = 0;
QueryInfo queryInfo = null;
if (true) {
String queryString = getOriginalQueryString(bindings);
if (queryString == null) {
log.warn("Query string is null. Please check your FedX setup.");
}
queryInfo = new QueryInfo(queryString, getOriginalBaseURI(bindings), getOriginalQueryType(bindings),
getOriginalMaxExecutionTime(bindings), includeInferred, federationContext, strategy, dataset);

// check if we have pass-through result handler information for single source queries
if (query instanceof PassThroughTupleExpr) {
PassThroughTupleExpr node = ((PassThroughTupleExpr) query);
queryInfo.setResultHandler(node.getResultHandler());
query = node.getExpr();
}
String queryString = getOriginalQueryString(bindings);
if (queryString == null) {
log.warn("Query string is null. Please check your FedX setup.");
}
QueryInfo queryInfo = new QueryInfo(queryString, getOriginalBaseURI(bindings), getOriginalQueryType(bindings),
getOriginalMaxExecutionTime(bindings), includeInferred, federationContext, strategy, dataset);

// check if we have pass-through result handler information for single source queries
if (query instanceof PassThroughTupleExpr) {
PassThroughTupleExpr node = ((PassThroughTupleExpr) query);
queryInfo.setResultHandler(node.getResultHandler());
query = node.getExpr();
}

if (log.isDebugEnabled()) {
log.debug("Optimization start (Query: " + queryInfo.getQueryID() + ")");
start = System.currentTimeMillis();
}
try {
federationContext.getMonitoringService().monitorQuery(queryInfo);
FederationEvaluationStatistics stats = new FederationEvaluationStatistics(queryInfo, dataset);
query = strategy.optimize(query, stats, bindings);
} catch (Exception e) {
log.warn("Exception occured during optimization (Query: " + queryInfo.getQueryID() + "): "
+ e.getMessage());
log.debug("Details: ", e);
throw new SailException(e);
}
if (log.isDebugEnabled()) {
log.debug(("Optimization duration: " + ((System.currentTimeMillis() - start))) + " (Query: "
+ queryInfo.getQueryID() + ")");
}
if (log.isDebugEnabled()) {
log.debug("Optimization start (Query: " + queryInfo.getQueryID() + ")");
start = System.currentTimeMillis();
}
try {
federationContext.getMonitoringService().monitorQuery(queryInfo);
FederationEvaluationStatistics stats = new FederationEvaluationStatistics(queryInfo, dataset);
query = strategy.optimize(query, stats, bindings);
} catch (Exception e) {
log.warn("Exception occured during optimization (Query: " + queryInfo.getQueryID() + "): "
+ e.getMessage());
log.debug("Details: ", e);
throw new SailException(e);
}
if (log.isDebugEnabled()) {
log.debug(("Optimization duration: " + ((System.currentTimeMillis() - start))) + " (Query: "
+ queryInfo.getQueryID() + ")");
}

// log the optimized query plan, if Config#isLogQueryPlan(), otherwise void operation
Expand All @@ -165,17 +162,26 @@ protected CloseableIteration<? extends BindingSet, QueryEvaluationException> eva
});
queryBindings = actualQueryBindings;
}
CloseableIteration<? extends BindingSet, QueryEvaluationException> res = strategy.evaluate(query,
queryBindings);

// mark the query as PassedThrough, such that outer result handlers are aware of this
// Note: for SingleSourceQuery (i.e. where we use pass through) res is explicitly
// EmptyIteration. Thus we can use it as indicator
if (_orgQuery instanceof PassThroughTupleExpr && res instanceof EmptyIteration) {
((PassThroughTupleExpr) _orgQuery).setPassedThrough(true);

CloseableIteration<? extends BindingSet, QueryEvaluationException> res = null;
try {
res = strategy.evaluate(query, queryBindings);

// mark the query as PassedThrough, such that outer result handlers are aware of this
// Note: for SingleSourceQuery (i.e. where we use pass through) res is explicitly
// EmptyIteration. Thus we can use it as indicator
if (originalQuery instanceof PassThroughTupleExpr && res instanceof EmptyIteration) {
((PassThroughTupleExpr) originalQuery).setPassedThrough(true);
}
res = new StopRemainingExecutionsOnCloseIteration(res, queryInfo);
return res;
} catch (Throwable t) {
if (res != null) {
res.close();
}
throw t;
}
res = new StopRemainingExecutionsOnCloseIteration(res, queryInfo);
return res;

} catch (QueryEvaluationException e) {
throw new SailException(e);
}
Expand Down Expand Up @@ -230,12 +236,11 @@ protected void commitInternal() throws SailException {
protected CloseableIteration<? extends Resource, SailException> getContextIDsInternal() throws SailException {

FederationEvalStrategy strategy = federationContext.createStrategy(new SimpleDataset());
final WorkerUnionBase<Resource> union = new SynchronousWorkerUnion<>(
new QueryInfo("getContextIDsInternal", null, QueryType.UNKNOWN, 0,
federationContext.getConfig().getIncludeInferredDefault(), federationContext, strategy,
new SimpleDataset()));
WorkerUnionBase<Resource> union = new SynchronousWorkerUnion<>(new QueryInfo("getContextIDsInternal", null,
QueryType.UNKNOWN, 0, federationContext.getConfig().getIncludeInferredDefault(), federationContext,
strategy, new SimpleDataset()));

for (final Endpoint e : federation.getMembers()) {
for (Endpoint e : federation.getMembers()) {
union.addTask(new ParallelTask<>() {
@Override
public CloseableIteration<Resource, QueryEvaluationException> performTask() throws Exception {
Expand Down Expand Up @@ -286,24 +291,31 @@ protected CloseableIteration<? extends Namespace, SailException> getNamespacesIn
}

@Override
protected CloseableIteration<? extends Statement, SailException> getStatementsInternal(
Resource subj, IRI pred, Value obj, final boolean includeInferred,
final Resource... contexts) throws SailException {
protected CloseableIteration<? extends Statement, SailException> getStatementsInternal(Resource subj, IRI pred,
Value obj, boolean includeInferred, Resource... contexts) throws SailException {

try {
Dataset dataset = new SimpleDataset();
FederationEvalStrategy strategy = federationContext.createStrategy(dataset);
QueryInfo queryInfo = new QueryInfo(subj, pred, obj, 0, includeInferred, federationContext, strategy,
dataset);
federationContext.getMonitoringService().monitorQuery(queryInfo);
CloseableIteration<Statement, QueryEvaluationException> res = strategy.getStatements(queryInfo, subj, pred,
obj, contexts);
return new ExceptionConvertingIteration<>(res) {
@Override
protected SailException convert(Exception e) {
return new SailException(e);
CloseableIteration<Statement, QueryEvaluationException> res = null;
try {
res = strategy.getStatements(queryInfo, subj, pred, obj, contexts);
return new ExceptionConvertingIteration<>(res) {
@Override
protected SailException convert(Exception e) {
return new SailException(e);
}
};
} catch (Throwable t) {
if (res != null) {
res.close();
}
};
throw t;
}

} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
Expand Down Expand Up @@ -365,7 +377,7 @@ protected long sizeInternal(Resource... contexts) throws SailException {
}
}
if (errorEndpoints.size() > 0) {
throw new SailException("Could not determine size for members " + errorEndpoints.toString() +
throw new SailException("Could not determine size for members " + errorEndpoints +
"(Supported for NativeStore and RemoteRepository only). Computed size: " + size);
}
return size;
Expand Down Expand Up @@ -448,7 +460,6 @@ private static int getOriginalMaxExecutionTime(BindingSet b) {
* for the constructor call.
*
* @author as
*
*/
protected static class SailBaseDefaultImpl extends AbstractSail {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
package org.eclipse.rdf4j.federated.algebra;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

Expand Down
Loading

0 comments on commit 3f3157e

Please sign in to comment.