From de0e047bc4a2c0e5ac90b15e875a2278d0c77e1e Mon Sep 17 00:00:00 2001 From: Claus Stadler Date: Fri, 6 Dec 2024 22:24:44 +0100 Subject: [PATCH] GH-2881: Mitigate cases of open iterators when cancelling queries. --- .../sparql/engine/iterator/QueryIterPeek.java | 10 +-- .../engine/iterator/QueryIteratorCheck.java | 39 +++++++++- .../engine/main/StageGeneratorGeneric.java | 18 ++++- .../engine/main/iterator/QueryIterUnion.java | 74 ++++++++++++++----- .../sparql/api/TestQueryExecutionCancel.java | 29 +++++++- 5 files changed, 140 insertions(+), 30 deletions(-) diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterPeek.java b/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterPeek.java index c18fcb34969..37ebaee289c 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterPeek.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterPeek.java @@ -25,23 +25,23 @@ public class QueryIterPeek extends QueryIter1 { - private Binding binding = null ; + private Binding binding = null ; private boolean closed = false ; - + public static QueryIterPeek create(QueryIterator iterator, ExecutionContext cxt) { if ( iterator instanceof QueryIterPeek) return (QueryIterPeek)iterator ; return new QueryIterPeek(iterator, cxt) ; } - + private QueryIterPeek(QueryIterator iterator, ExecutionContext cxt) { super(iterator, cxt) ; } /** Returns the next binding without moving on. Returns "null" for no such element. */ - public Binding peek() + public Binding peek() { if ( closed ) return null ; if ( ! hasNextBinding() ) @@ -73,7 +73,7 @@ protected Binding moveToNextBinding() @Override protected void closeSubIterator() { closed = true ; } - + @Override protected void requestSubCancel() { } diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIteratorCheck.java b/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIteratorCheck.java index 5bd11f71a5e..47c4ef4bcea 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIteratorCheck.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIteratorCheck.java @@ -23,15 +23,38 @@ import org.apache.jena.atlas.io.IndentedWriter ; import org.apache.jena.atlas.lib.Lib ; import org.apache.jena.atlas.logging.Log ; +import org.apache.jena.query.QueryException; +import org.apache.jena.sparql.SystemARQ; import org.apache.jena.sparql.engine.ExecutionContext ; import org.apache.jena.sparql.engine.QueryIterator ; import org.apache.jena.sparql.serializer.SerializationContext ; +import org.apache.jena.sparql.util.Symbol; /** Query iterator that checks everything was closed correctly */ public class QueryIteratorCheck extends QueryIteratorWrapper { private ExecutionContext execCxt ; + /** + * Whether detection of open iterator should raise an {@link OpenIteratorException}. + * This symbol should only be used for internal testing. + */ + public static Symbol failOnOpenIterator = SystemARQ.allocSymbol("failOnOpenIterator"); + + /** + * Exception to indicate open iterators. Only used for testing. + * + * @implNote + * This class does not extend {@link QueryException} because those will be caught and handled by {@link QueryIteratorBase#close()}. + */ + public static class OpenIteratorException extends RuntimeException { + private static final long serialVersionUID = 1L; + public OpenIteratorException() { super() ; } + public OpenIteratorException(Throwable cause) { super(cause) ; } + public OpenIteratorException(String msg) { super(msg) ; } + public OpenIteratorException(String msg, Throwable cause) { super(msg, cause) ; } + } + private QueryIteratorCheck(QueryIterator qIter, ExecutionContext execCxt) { super(qIter); if ( qIter instanceof QueryIteratorCheck ) @@ -72,10 +95,18 @@ private static void dump(ExecutionContext execContext, boolean includeAll) { } Iterator iterOpen = execContext.listOpenIterators(); - while (iterOpen.hasNext()) { - QueryIterator qIterOpen = iterOpen.next(); - warn(qIterOpen, "Open iterator: "); - iterOpen.remove(); + if (iterOpen.hasNext()) { + int i = 0; + while (iterOpen.hasNext()) { + QueryIterator qIterOpen = iterOpen.next(); + warn(qIterOpen, "Open iterator [execCxt@" + System.identityHashCode(execContext) + "]: "); + iterOpen.remove(); + ++i; + } + boolean enableException = execContext.getContext().get(failOnOpenIterator, false); + if (enableException) { + throw new OpenIteratorException("Unexpectedly encountered " + i + " open iterators."); + } } } diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/engine/main/StageGeneratorGeneric.java b/jena-arq/src/main/java/org/apache/jena/sparql/engine/main/StageGeneratorGeneric.java index 1823c17c14f..1d286cc72bb 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/engine/main/StageGeneratorGeneric.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/engine/main/StageGeneratorGeneric.java @@ -20,6 +20,7 @@ import org.apache.jena.atlas.lib.Lib ; import org.apache.jena.atlas.logging.Log ; +import org.apache.jena.query.QueryCancelledException; import org.apache.jena.sparql.core.BasicPattern ; import org.apache.jena.sparql.core.Substitute ; import org.apache.jena.sparql.engine.ExecutionContext ; @@ -49,6 +50,11 @@ public QueryIterator execute(BasicPattern pattern, QueryIterator input, Executio return execute(pattern, reorder, input, execCxt) ; } + /** + * Attempts to construct an iterator that executes the input against the pattern. + * If the construction fails, such as due to {@link QueryCancelledException}, then the exception is passed on + * and the input iterator will be closed. + */ protected QueryIterator execute(BasicPattern pattern, ReorderTransformation reorder, QueryIterator input, ExecutionContext execCxt) { Explain.explain(pattern, execCxt.getContext()) ; @@ -65,13 +71,21 @@ protected QueryIterator execute(BasicPattern pattern, ReorderTransformation reor QueryIterPeek peek = QueryIterPeek.create(input, execCxt) ; // And now use this one input = peek ; - Binding b = peek.peek() ; + Binding b ; + try { + b = peek.peek() ; + } catch (Exception e) { + // Close peek iterator on failure e.g. due to cancellation. + peek.close() ; + e.addSuppressed(new RuntimeException("Error during peek().")) ; + throw e ; + } bgp2 = Substitute.substitute(pattern, b) ; } ReorderProc reorderProc = reorder.reorderIndexes(bgp2) ; pattern = reorderProc.reorder(pattern) ; } Explain.explain("Reorder/generic", pattern, execCxt.getContext()) ; - return PatternMatchData.execute(execCxt.getActiveGraph(), pattern, input, null, execCxt); + return PatternMatchData.execute(execCxt.getActiveGraph(), pattern, input, null, execCxt) ; } } diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/engine/main/iterator/QueryIterUnion.java b/jena-arq/src/main/java/org/apache/jena/sparql/engine/main/iterator/QueryIterUnion.java index b22c7b33b17..351f5149ab0 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/engine/main/iterator/QueryIterUnion.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/engine/main/iterator/QueryIterUnion.java @@ -18,6 +18,7 @@ package org.apache.jena.sparql.engine.main.iterator; +import java.util.Iterator; import java.util.List ; import org.apache.jena.atlas.io.IndentedWriter ; @@ -26,20 +27,18 @@ import org.apache.jena.sparql.engine.ExecutionContext ; import org.apache.jena.sparql.engine.QueryIterator ; import org.apache.jena.sparql.engine.binding.Binding ; -import org.apache.jena.sparql.engine.iterator.QueryIterConcat ; +import org.apache.jena.sparql.engine.iterator.QueryIter; import org.apache.jena.sparql.engine.iterator.QueryIterRepeatApply ; -import org.apache.jena.sparql.engine.iterator.QueryIterSingleton ; import org.apache.jena.sparql.engine.main.QC ; import org.apache.jena.sparql.serializer.SerializationContext ; - /** Execute each sub stage against the input. * Streamed SPARQL Union. */ -public class QueryIterUnion extends QueryIterRepeatApply +public class QueryIterUnion extends QueryIterRepeatApply { protected List subOps ; - + public QueryIterUnion(QueryIterator input, List subOps, ExecutionContext context) @@ -51,21 +50,62 @@ public QueryIterUnion(QueryIterator input, @Override protected QueryIterator nextStage(Binding binding) { - QueryIterConcat unionQIter = new QueryIterConcat(getExecContext()) ; - for (Op subOp : subOps) - { - subOp = QC.substitute(subOp, binding) ; - QueryIterator parent = QueryIterSingleton.create(binding, getExecContext()) ; - QueryIterator qIter = QC.execute(subOp, parent, getExecContext()) ; - unionQIter.add(qIter) ; - } - - return unionQIter ; + Iterator subOpIt = subOps.iterator(); + return new QueryIter(getExecContext()) { + QueryIterator qIter = null; + + @Override + protected void requestCancel() { + performRequestCancel(qIter); + } + + @Override + protected Binding moveToNextBinding() { + return qIter.next(); + } + + @Override + protected boolean hasNextBinding() { + for (;;) { + if (qIter != null) { + if (qIter.hasNext()) { + return true; + } else { + qIter.close(); + qIter = null; + } + } else { + if (subOpIt.hasNext()) { + Op subOp = subOpIt.next(); + qIter = QC.execute(subOp, binding, getExecContext()); + } else { + return false; + } + } + } + } + + @Override + protected void closeIterator() { + performClose(qIter); + } + + @Override + public void output(IndentedWriter out, SerializationContext sCxt) { + QueryIterator subIter = qIter; + out.println(Lib.className(this) + "/" + Lib.className(subIter)); + if (subIter != null) { + out.incIndent(); + subIter.output(out, sCxt); + out.decIndent(); + } + } + }; } - + @Override public void output(IndentedWriter out, SerializationContext sCxt) - { + { out.println(Lib.className(this)) ; out.incIndent() ; for (Op op : subOps) diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/api/TestQueryExecutionCancel.java b/jena-arq/src/test/java/org/apache/jena/sparql/api/TestQueryExecutionCancel.java index 7ef22a3a496..f2f36923548 100644 --- a/jena-arq/src/test/java/org/apache/jena/sparql/api/TestQueryExecutionCancel.java +++ b/jena-arq/src/test/java/org/apache/jena/sparql/api/TestQueryExecutionCancel.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; +import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Random; @@ -47,6 +48,8 @@ import org.apache.jena.sparql.core.DatasetGraph; import org.apache.jena.sparql.core.DatasetGraphFactory; import org.apache.jena.sparql.engine.ExecutionContext; +import org.apache.jena.sparql.engine.iterator.QueryIteratorCheck; +import org.apache.jena.sparql.engine.iterator.QueryIteratorCheck.OpenIteratorException; import org.apache.jena.sparql.exec.QueryExec; import org.apache.jena.sparql.exec.QueryExecBuilder; import org.apache.jena.sparql.expr.NodeValue; @@ -335,6 +338,18 @@ static void cancellationTestForIterator(String queryString, Function qeFactory = () -> QueryExecutionFactory.create(query, model); runConcurrentAbort(taskCount, maxCancelDelayInMillis, qeFactory, TestQueryExecutionCancel::doCount); @@ -375,6 +389,10 @@ public static void runConcurrentAbort(int taskCount, int maxCancelDelay, Callabl } catch (Exception e) { throw new RuntimeException("Failed to build a query execution", e); } + + // Fail if any iterators are not properly closed + qe.getContext().set(QueryIteratorCheck.failOnOpenIterator, true); + Future future = executorService.submit(() -> processor.apply(qe)); int delayToAbort = cancelDelayRandom.nextInt(maxCancelDelay); try { @@ -394,6 +412,13 @@ public static void runConcurrentAbort(int taskCount, int maxCancelDelay, Callabl e.printStackTrace(); } Assert.assertEquals(QueryCancelledException.class, cause.getClass()); + + boolean hasOpenIterators = Arrays.stream(cause.getSuppressed()) + .anyMatch(x -> x instanceof OpenIteratorException); + if (hasOpenIterators) { + throw new RuntimeException("Encountered open iterators.", e); + } + } catch (InterruptedException e) { // Ignored } finally {