Skip to content

Commit

Permalink
apacheGH-2881: Mitigate cases of open iterators when cancelling queries.
Browse files Browse the repository at this point in the history
  • Loading branch information
Aklakan committed Dec 14, 2024
1 parent d6e124a commit de0e047
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,23 @@

public class QueryIterPeek extends QueryIter1
{
private Binding binding = null ;
private Binding binding = null ;
private boolean closed = false ;

public static QueryIterPeek create(QueryIterator iterator, ExecutionContext cxt)
{
if ( iterator instanceof QueryIterPeek)
return (QueryIterPeek)iterator ;
return new QueryIterPeek(iterator, cxt) ;
}

private QueryIterPeek(QueryIterator iterator, ExecutionContext cxt)
{
super(iterator, cxt) ;
}

/** Returns the next binding without moving on. Returns "null" for no such element. */
public Binding peek()
public Binding peek()
{
if ( closed ) return null ;
if ( ! hasNextBinding() )
Expand Down Expand Up @@ -73,7 +73,7 @@ protected Binding moveToNextBinding()
@Override
protected void closeSubIterator()
{ closed = true ; }

@Override
protected void requestSubCancel()
{ }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,38 @@
import org.apache.jena.atlas.io.IndentedWriter ;
import org.apache.jena.atlas.lib.Lib ;
import org.apache.jena.atlas.logging.Log ;
import org.apache.jena.query.QueryException;
import org.apache.jena.sparql.SystemARQ;
import org.apache.jena.sparql.engine.ExecutionContext ;
import org.apache.jena.sparql.engine.QueryIterator ;
import org.apache.jena.sparql.serializer.SerializationContext ;
import org.apache.jena.sparql.util.Symbol;

/** Query iterator that checks everything was closed correctly */
public class QueryIteratorCheck extends QueryIteratorWrapper
{
private ExecutionContext execCxt ;

/**
* Whether detection of open iterator should raise an {@link OpenIteratorException}.
* This symbol should only be used for internal testing.
*/
public static Symbol failOnOpenIterator = SystemARQ.allocSymbol("failOnOpenIterator");

/**
* Exception to indicate open iterators. Only used for testing.
*
* @implNote
* This class does not extend {@link QueryException} because those will be caught and handled by {@link QueryIteratorBase#close()}.
*/
public static class OpenIteratorException extends RuntimeException {
private static final long serialVersionUID = 1L;
public OpenIteratorException() { super() ; }
public OpenIteratorException(Throwable cause) { super(cause) ; }
public OpenIteratorException(String msg) { super(msg) ; }
public OpenIteratorException(String msg, Throwable cause) { super(msg, cause) ; }
}

private QueryIteratorCheck(QueryIterator qIter, ExecutionContext execCxt) {
super(qIter);
if ( qIter instanceof QueryIteratorCheck )
Expand Down Expand Up @@ -72,10 +95,18 @@ private static void dump(ExecutionContext execContext, boolean includeAll) {
}

Iterator<QueryIterator> iterOpen = execContext.listOpenIterators();
while (iterOpen.hasNext()) {
QueryIterator qIterOpen = iterOpen.next();
warn(qIterOpen, "Open iterator: ");
iterOpen.remove();
if (iterOpen.hasNext()) {
int i = 0;
while (iterOpen.hasNext()) {
QueryIterator qIterOpen = iterOpen.next();
warn(qIterOpen, "Open iterator [execCxt@" + System.identityHashCode(execContext) + "]: ");
iterOpen.remove();
++i;
}
boolean enableException = execContext.getContext().get(failOnOpenIterator, false);
if (enableException) {
throw new OpenIteratorException("Unexpectedly encountered " + i + " open iterators.");
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.jena.atlas.lib.Lib ;
import org.apache.jena.atlas.logging.Log ;
import org.apache.jena.query.QueryCancelledException;
import org.apache.jena.sparql.core.BasicPattern ;
import org.apache.jena.sparql.core.Substitute ;
import org.apache.jena.sparql.engine.ExecutionContext ;
Expand Down Expand Up @@ -49,6 +50,11 @@ public QueryIterator execute(BasicPattern pattern, QueryIterator input, Executio
return execute(pattern, reorder, input, execCxt) ;
}

/**
* Attempts to construct an iterator that executes the input against the pattern.
* If the construction fails, such as due to {@link QueryCancelledException}, then the exception is passed on
* and the input iterator will be closed.
*/
protected QueryIterator execute(BasicPattern pattern, ReorderTransformation reorder,
QueryIterator input, ExecutionContext execCxt) {
Explain.explain(pattern, execCxt.getContext()) ;
Expand All @@ -65,13 +71,21 @@ protected QueryIterator execute(BasicPattern pattern, ReorderTransformation reor
QueryIterPeek peek = QueryIterPeek.create(input, execCxt) ;
// And now use this one
input = peek ;
Binding b = peek.peek() ;
Binding b ;
try {
b = peek.peek() ;
} catch (Exception e) {
// Close peek iterator on failure e.g. due to cancellation.
peek.close() ;
e.addSuppressed(new RuntimeException("Error during peek().")) ;
throw e ;
}
bgp2 = Substitute.substitute(pattern, b) ;
}
ReorderProc reorderProc = reorder.reorderIndexes(bgp2) ;
pattern = reorderProc.reorder(pattern) ;
}
Explain.explain("Reorder/generic", pattern, execCxt.getContext()) ;
return PatternMatchData.execute(execCxt.getActiveGraph(), pattern, input, null, execCxt);
return PatternMatchData.execute(execCxt.getActiveGraph(), pattern, input, null, execCxt) ;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.jena.sparql.engine.main.iterator;

import java.util.Iterator;
import java.util.List ;

import org.apache.jena.atlas.io.IndentedWriter ;
Expand All @@ -26,20 +27,18 @@
import org.apache.jena.sparql.engine.ExecutionContext ;
import org.apache.jena.sparql.engine.QueryIterator ;
import org.apache.jena.sparql.engine.binding.Binding ;
import org.apache.jena.sparql.engine.iterator.QueryIterConcat ;
import org.apache.jena.sparql.engine.iterator.QueryIter;
import org.apache.jena.sparql.engine.iterator.QueryIterRepeatApply ;
import org.apache.jena.sparql.engine.iterator.QueryIterSingleton ;
import org.apache.jena.sparql.engine.main.QC ;
import org.apache.jena.sparql.serializer.SerializationContext ;


/** Execute each sub stage against the input.
* Streamed SPARQL Union. */

public class QueryIterUnion extends QueryIterRepeatApply
public class QueryIterUnion extends QueryIterRepeatApply
{
protected List<Op> subOps ;

public QueryIterUnion(QueryIterator input,
List<Op> subOps,
ExecutionContext context)
Expand All @@ -51,21 +50,62 @@ public QueryIterUnion(QueryIterator input,
@Override
protected QueryIterator nextStage(Binding binding)
{
QueryIterConcat unionQIter = new QueryIterConcat(getExecContext()) ;
for (Op subOp : subOps)
{
subOp = QC.substitute(subOp, binding) ;
QueryIterator parent = QueryIterSingleton.create(binding, getExecContext()) ;
QueryIterator qIter = QC.execute(subOp, parent, getExecContext()) ;
unionQIter.add(qIter) ;
}

return unionQIter ;
Iterator<Op> subOpIt = subOps.iterator();
return new QueryIter(getExecContext()) {
QueryIterator qIter = null;

@Override
protected void requestCancel() {
performRequestCancel(qIter);
}

@Override
protected Binding moveToNextBinding() {
return qIter.next();
}

@Override
protected boolean hasNextBinding() {
for (;;) {
if (qIter != null) {
if (qIter.hasNext()) {
return true;
} else {
qIter.close();
qIter = null;
}
} else {
if (subOpIt.hasNext()) {
Op subOp = subOpIt.next();
qIter = QC.execute(subOp, binding, getExecContext());
} else {
return false;
}
}
}
}

@Override
protected void closeIterator() {
performClose(qIter);
}

@Override
public void output(IndentedWriter out, SerializationContext sCxt) {
QueryIterator subIter = qIter;
out.println(Lib.className(this) + "/" + Lib.className(subIter));
if (subIter != null) {
out.incIndent();
subIter.output(out, sCxt);
out.decIndent();
}
}
};
}

@Override
public void output(IndentedWriter out, SerializationContext sCxt)
{
{
out.println(Lib.className(this)) ;
out.incIndent() ;
for (Op op : subOps)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
Expand All @@ -47,6 +48,8 @@
import org.apache.jena.sparql.core.DatasetGraph;
import org.apache.jena.sparql.core.DatasetGraphFactory;
import org.apache.jena.sparql.engine.ExecutionContext;
import org.apache.jena.sparql.engine.iterator.QueryIteratorCheck;
import org.apache.jena.sparql.engine.iterator.QueryIteratorCheck.OpenIteratorException;
import org.apache.jena.sparql.exec.QueryExec;
import org.apache.jena.sparql.exec.QueryExecBuilder;
import org.apache.jena.sparql.expr.NodeValue;
Expand Down Expand Up @@ -335,6 +338,18 @@ static <T> void cancellationTestForIterator(String queryString, Function<QueryEx
* If this test hangs then it is likely that something went wrong in the cancellation machinery. */
@Test(timeout = 10000)
public void test_cancel_concurrent_1() {
// Create a query that creates 3 cross joins - resulting in one billion result rows.
test_cancel_concurrent("SELECT * { ?a ?b ?c . ?d ?e ?f . ?g ?h ?i . }");
}

@Test(timeout = 10000)
public void test_cancel_concurrent_2() {
// Create a query that creates 3 cross joins - resulting in one billion result rows.
// Tests against additional operators, namely UNION and BIND.
test_cancel_concurrent("SELECT * { { ?a ?b ?c . ?d ?e ?f . ?g ?h ?i . } UNION { BIND('x' AS ?x) } }");
}

private static void test_cancel_concurrent(String queryString) {
int maxCancelDelayInMillis = 100;

int cpuCount = Runtime.getRuntime().availableProcessors();
Expand All @@ -344,8 +359,7 @@ public void test_cancel_concurrent_1() {
// Create a model with 1000 triples
Model model = ModelFactory.createModelForGraph(createTestGraph());

// Create a query that creates 3 cross joins - resulting in one billion result rows
Query query = QueryFactory.create("SELECT * { ?a ?b ?c . ?d ?e ?f . ?g ?h ?i . }");
Query query = QueryFactory.create(queryString);
Callable<QueryExecution> qeFactory = () -> QueryExecutionFactory.create(query, model);

runConcurrentAbort(taskCount, maxCancelDelayInMillis, qeFactory, TestQueryExecutionCancel::doCount);
Expand Down Expand Up @@ -375,6 +389,10 @@ public static void runConcurrentAbort(int taskCount, int maxCancelDelay, Callabl
} catch (Exception e) {
throw new RuntimeException("Failed to build a query execution", e);
}

// Fail if any iterators are not properly closed
qe.getContext().set(QueryIteratorCheck.failOnOpenIterator, true);

Future<?> future = executorService.submit(() -> processor.apply(qe));
int delayToAbort = cancelDelayRandom.nextInt(maxCancelDelay);
try {
Expand All @@ -394,6 +412,13 @@ public static void runConcurrentAbort(int taskCount, int maxCancelDelay, Callabl
e.printStackTrace();
}
Assert.assertEquals(QueryCancelledException.class, cause.getClass());

boolean hasOpenIterators = Arrays.stream(cause.getSuppressed())
.anyMatch(x -> x instanceof OpenIteratorException);
if (hasOpenIterators) {
throw new RuntimeException("Encountered open iterators.", e);
}

} catch (InterruptedException e) {
// Ignored
} finally {
Expand Down

0 comments on commit de0e047

Please sign in to comment.