diff --git a/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/CloseableIterationSpliterator.java b/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/CloseableIterationSpliterator.java index d43767605d6..6ff73bb7ae9 100644 --- a/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/CloseableIterationSpliterator.java +++ b/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/CloseableIterationSpliterator.java @@ -47,9 +47,15 @@ public boolean tryAdvance(Consumer action) { return false; } } catch (Throwable e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } try { iteration.close(); } catch (Exception ex) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } throw new RuntimeException(ex); } if (e instanceof Error) { @@ -70,6 +76,9 @@ public void forEachRemaining(Consumer action) { action.accept(iteration.next()); } } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } if (e instanceof RuntimeException) { throw (RuntimeException) e; } diff --git a/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/ExceptionConvertingIteration.java b/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/ExceptionConvertingIteration.java index 46f802632d7..29dfc421a5d 100644 --- a/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/ExceptionConvertingIteration.java +++ b/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/ExceptionConvertingIteration.java @@ -71,6 +71,9 @@ public boolean hasNext() throws X { } return result; } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } throw convert(e); } } @@ -95,6 +98,9 @@ public E next() throws X { } catch (IllegalStateException e) { throw e; } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } throw convert(e); } } @@ -117,6 +123,9 @@ public void remove() throws X { } catch (UnsupportedOperationException | IllegalStateException e) { throw e; } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } throw convert(e); } } @@ -132,6 +141,9 @@ protected void handleClose() throws X { try { Iterations.closeCloseable(iter); } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } throw convert(e); } } diff --git a/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/IterationSpliterator.java b/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/IterationSpliterator.java index c6595edbe0c..65ec8eee032 100644 --- a/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/IterationSpliterator.java +++ b/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/IterationSpliterator.java @@ -55,6 +55,9 @@ public boolean tryAdvance(Consumer action) { } return false; } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } if (e instanceof RuntimeException) { throw (RuntimeException) e; } @@ -77,6 +80,9 @@ public void forEachRemaining(final Consumer action) { action.accept(iteration.next()); } } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } if (e instanceof RuntimeException) { throw (RuntimeException) e; } diff --git a/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/Iterations.java b/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/Iterations.java index d51b9df3b1c..f068e2235ee 100644 --- a/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/Iterations.java +++ b/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/Iterations.java @@ -144,6 +144,9 @@ public static Stream stream(Iteration iteration) } catch (RuntimeException e) { throw e; } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } throw new RuntimeException(e); } }); @@ -166,6 +169,9 @@ public static Stream stream(CloseableIteration it } catch (RuntimeException e) { throw e; } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } throw new RuntimeException(e); } }); diff --git a/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/QueueIteration.java b/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/QueueIteration.java index 154fa7a55b7..545c7b0db07 100644 --- a/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/QueueIteration.java +++ b/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/QueueIteration.java @@ -105,7 +105,6 @@ public void put(E item) throws InterruptedException, T { close(); } } catch (InterruptedException e) { - Thread.currentThread().interrupt(); close(); throw e; } @@ -153,7 +152,8 @@ public E getNextElement() throws T { } catch (InterruptedException e) { checkException(); close(); - throw convert(e); + Thread.currentThread().interrupt(); + return null; } } @@ -171,12 +171,16 @@ public void handleClose() throws T { } public void checkException() throws T { - if (!exceptions.isEmpty()) { + while (!exceptions.isEmpty()) { try { close(); throw exceptions.remove(); } catch (Exception e) { - throw convert(e); + if (e instanceof InterruptedException || Thread.interrupted()) { + Thread.currentThread().interrupt(); + } else { + throw convert(e); + } } } } diff --git a/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/SilentIteration.java b/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/SilentIteration.java index c5da60a9fa2..12c385ac304 100644 --- a/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/SilentIteration.java +++ b/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/SilentIteration.java @@ -33,6 +33,9 @@ public boolean hasNext() throws E { try { return super.hasNext(); } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } if (logger.isTraceEnabled()) { logger.trace("Suppressed error in SILENT iteration: " + e.getMessage(), e); } @@ -48,6 +51,9 @@ public T next() throws E { // pass through throw e; } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } if (logger.isTraceEnabled()) { logger.trace("Converted error in SILENT iteration: " + e.getMessage(), e); } @@ -60,6 +66,9 @@ protected void handleClose() throws E { try { super.handleClose(); } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } if (logger.isTraceEnabled()) { logger.trace("Suppressed error in SILENT iteration: " + e.getMessage(), e); } diff --git a/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/TimeLimitIteration.java b/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/TimeLimitIteration.java index 3393ee64c7f..760fcf5f476 100644 --- a/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/TimeLimitIteration.java +++ b/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/TimeLimitIteration.java @@ -108,6 +108,9 @@ private void checkInterrupted() throws X { try { close(); } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } logger.warn("TimeLimitIteration timed out and failed to close successfully: ", e); } } @@ -134,6 +137,9 @@ void interrupt() { try { close(); } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } logger.warn("TimeLimitIteration timed out and failed to close successfully: ", e); } } diff --git a/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/UnionIteration.java b/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/UnionIteration.java index 71552c3c7ff..db2aecbc685 100644 --- a/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/UnionIteration.java +++ b/core/common/iterator/src/main/java/org/eclipse/rdf4j/common/iteration/UnionIteration.java @@ -100,6 +100,9 @@ protected void handleClose() throws X { try { Iterations.closeCloseable(argIter.next()); } catch (Throwable e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } collectedExceptions.add(e); } } diff --git a/core/http/client/src/main/java/org/eclipse/rdf4j/http/client/BackgroundResultExecutor.java b/core/http/client/src/main/java/org/eclipse/rdf4j/http/client/BackgroundResultExecutor.java index 4a7a7ffa9f9..56a1abd1e24 100644 --- a/core/http/client/src/main/java/org/eclipse/rdf4j/http/client/BackgroundResultExecutor.java +++ b/core/http/client/src/main/java/org/eclipse/rdf4j/http/client/BackgroundResultExecutor.java @@ -62,6 +62,9 @@ public void close() { try { onclose.close(); } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } logger.error(e.toString(), e); } } diff --git a/core/http/client/src/main/java/org/eclipse/rdf4j/http/client/SharedHttpClientSessionManager.java b/core/http/client/src/main/java/org/eclipse/rdf4j/http/client/SharedHttpClientSessionManager.java index a70691e5906..73865fc4a91 100644 --- a/core/http/client/src/main/java/org/eclipse/rdf4j/http/client/SharedHttpClientSessionManager.java +++ b/core/http/client/src/main/java/org/eclipse/rdf4j/http/client/SharedHttpClientSessionManager.java @@ -91,15 +91,16 @@ public boolean retryRequest(IOException ioe, int count, HttpContext context) { } HttpClientContext clientContext = HttpClientContext.adapt(context); HttpConnection conn = clientContext.getConnection(); - - synchronized (this) { - if (conn.isStale()) { - try { - logger.warn("Closing stale connection"); - conn.close(); - return true; - } catch (IOException e) { - logger.error("Error closing stale connection", e); + if (conn != null) { + synchronized (this) { + if (conn.isStale()) { + try { + logger.warn("Closing stale connection"); + conn.close(); + return true; + } catch (IOException e) { + logger.error("Error closing stale connection", e); + } } } } diff --git a/core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/impl/evaluationsteps/StatementPatternQueryEvaluationStep.java b/core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/impl/evaluationsteps/StatementPatternQueryEvaluationStep.java index f329ea55570..cb0b965b9da 100644 --- a/core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/impl/evaluationsteps/StatementPatternQueryEvaluationStep.java +++ b/core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/impl/evaluationsteps/StatementPatternQueryEvaluationStep.java @@ -252,6 +252,9 @@ private JoinStatementWithBindingSetIterator getIteration(BindingSet bindings) { if (iteration != null) { iteration.close(); } + if (t instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } throw new QueryEvaluationException(t); } } @@ -284,6 +287,9 @@ private ConvertStatementToBindingSetIterator getIteration() { if (iteration != null) { iteration.close(); } + if (t instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } throw new QueryEvaluationException(t); } } diff --git a/core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/optimizer/QueryJoinOptimizer.java b/core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/optimizer/QueryJoinOptimizer.java index 7fb9b8043e5..bb21e7102b2 100644 --- a/core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/optimizer/QueryJoinOptimizer.java +++ b/core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/optimizer/QueryJoinOptimizer.java @@ -652,6 +652,9 @@ public List getVars() { try { tupleExpr.visit(this); } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } throw new IllegalStateException(e); } if (vars == null) { diff --git a/core/repository/sparql/src/main/java/org/eclipse/rdf4j/repository/sparql/federation/JoinExecutorBase.java b/core/repository/sparql/src/main/java/org/eclipse/rdf4j/repository/sparql/federation/JoinExecutorBase.java index 71583091be4..04b268395f4 100644 --- a/core/repository/sparql/src/main/java/org/eclipse/rdf4j/repository/sparql/federation/JoinExecutorBase.java +++ b/core/repository/sparql/src/main/java/org/eclipse/rdf4j/repository/sparql/federation/JoinExecutorBase.java @@ -69,6 +69,9 @@ public final void run() { try { handleBindings(); } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } toss(e); } finally { finished = true; diff --git a/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSailConnection.java b/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSailConnection.java index 44bcbb656d7..b1d48dfb437 100644 --- a/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSailConnection.java +++ b/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSailConnection.java @@ -966,8 +966,11 @@ private void forceCloseActiveOperations() throws SailException { try { logger.warn("Unclosed iteration", entry.getValue()); entry.getKey().close(); - } catch (Exception ignored) { - logger.warn("Exception occurred while closing unclosed iterations.", ignored); + } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + logger.warn("Exception occurred while closing unclosed iterations.", e); } } diff --git a/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/CleanerIteration.java b/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/CleanerIteration.java index aae42b479d1..6d8a662997f 100644 --- a/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/CleanerIteration.java +++ b/core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/CleanerIteration.java @@ -67,6 +67,9 @@ public void run() { "Forced closing of unclosed iteration. Set the system property 'org.eclipse.rdf4j.repository.debug' to 'true' to get stack traces."); iteration.close(); } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } throw new RuntimeException(e); } } diff --git a/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/TripleSourceIterationWrapper.java b/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/TripleSourceIterationWrapper.java index 4c6f52e7ca3..c2c2dd1f885 100644 --- a/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/TripleSourceIterationWrapper.java +++ b/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/TripleSourceIterationWrapper.java @@ -46,6 +46,9 @@ public boolean hasNext() throws QueryEvaluationException { } catch (QueryEvaluationException e) { throw e; } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } throw new QueryEvaluationException(e); } } @@ -70,6 +73,9 @@ public T next() throws QueryEvaluationException { } catch (IllegalStateException | QueryEvaluationException e) { throw e; } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } throw new QueryEvaluationException(e); } } @@ -92,6 +98,9 @@ public void remove() throws QueryEvaluationException { } catch (UnsupportedOperationException | IllegalStateException | QueryEvaluationException e) { throw e; } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } throw new QueryEvaluationException(e); } } diff --git a/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/SailSourceConnection.java b/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/SailSourceConnection.java index b298a14be38..c6e6939646a 100644 --- a/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/SailSourceConnection.java +++ b/core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/SailSourceConnection.java @@ -359,6 +359,9 @@ private boolean runQueryForExplain(TupleExpr tupleExpr, Dataset dataset, Binding evaluate.next(); } } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } if (!timedOut.get()) { throw e; } diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedXConnection.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedXConnection.java index cf82c18f719..d841474ecc4 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedXConnection.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedXConnection.java @@ -322,6 +322,9 @@ protected SailException convert(Exception e) { } catch (RuntimeException e) { throw e; } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } throw new SailException(e); } } diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FederationManager.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FederationManager.java index e417a1010ed..0b2df494f4d 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FederationManager.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FederationManager.java @@ -202,7 +202,7 @@ public void addEndpoint(Endpoint e, boolean... updateStrategy) throws FedXRuntim federationContext.getEndpointManager().addEndpoint(e); if (updateStrategy == null || updateStrategy.length == 0 - || (updateStrategy.length == 1 && updateStrategy[0] == true)) { + || (updateStrategy.length == 1 && updateStrategy[0])) { updateFederationType(); } } @@ -240,7 +240,7 @@ public void removeEndpoint(Endpoint e, boolean... updateStrategy) throws Reposit federationContext.getEndpointManager().removeEndpoint(e); if (updateStrategy == null || updateStrategy.length == 0 - || (updateStrategy.length == 1 && updateStrategy[0] == true)) { + || (updateStrategy.length == 1 && updateStrategy[0])) { updateFederationType(); } } @@ -275,35 +275,60 @@ public void removeAll() throws RepositoryException { */ public synchronized void shutDown() throws FedXException { - log.info("Shutting down federation and all underlying repositories ..."); - // Abort all running queries - federationContext.getQueryManager().shutdown(); - executor.shutdown(); try { - executor.awaitTermination(30, TimeUnit.SECONDS); - } catch (InterruptedException e) { - log.warn("Failed to shutdown executor:" + e.getMessage()); - log.debug("Details:", e); - } - try { - joinScheduler.shutdown(); - } catch (Exception e) { - log.warn("Failed to shutdown join scheduler: " + e.getMessage()); - log.debug("Details: ", e); - } - try { - unionScheduler.shutdown(); - } catch (Exception e) { - log.warn("Failed to shutdown union scheduler: " + e.getMessage()); - log.debug("Details: ", e); - } - try { - leftJoinScheduler.shutdown(); - } catch (Exception e) { - log.warn("Failed to shutdown left join scheduler: " + e.getMessage()); - log.debug("Details: ", e); + log.info("Shutting down federation and all underlying repositories ..."); + // Abort all running queries + federationContext.getQueryManager().shutdown(); + executor.shutdown(); + try { + executor.awaitTermination(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + log.warn("Failed to shutdown executor:" + e.getMessage()); + log.debug("Details:", e); + Thread.currentThread().interrupt(); + } finally { + executor.shutdownNow(); + } + } finally { + try { + try { + joinScheduler.shutdown(); + } catch (Exception e) { + log.warn("Failed to shutdown join scheduler: " + e.getMessage()); + log.debug("Details: ", e); + } finally { + joinScheduler.abort(); + } + } finally { + try { + try { + unionScheduler.shutdown(); + } catch (Exception e) { + log.warn("Failed to shutdown union scheduler: " + e.getMessage()); + log.debug("Details: ", e); + } finally { + unionScheduler.abort(); + } + } finally { + try { + try { + leftJoinScheduler.shutdown(); + } catch (Exception e) { + log.warn("Failed to shutdown left join scheduler: " + e.getMessage()); + log.debug("Details: ", e); + } finally { + leftJoinScheduler.abort(); + } + } finally { + federationContext.getFederatedServiceResolver().shutDown(); + } + + } + + } + } - federationContext.getFederatedServiceResolver().shutDown(); + } /** diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/algebra/StatementSourcePattern.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/algebra/StatementSourcePattern.java index 3d60f063636..5f6a36bb72e 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/algebra/StatementSourcePattern.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/algebra/StatementSourcePattern.java @@ -117,10 +117,14 @@ public CloseableIteration evaluate(Binding } } catch (RepositoryException | MalformedQueryException e) { - union.close(); + if (union != null) { + union.close(); + } throw new QueryEvaluationException(e); } catch (Throwable t) { - union.close(); + if (union != null) { + union.close(); + } throw t; } } diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/FederationEvalStrategy.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/FederationEvalStrategy.java index 68d97386fca..9e43a5f8d3b 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/FederationEvalStrategy.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/FederationEvalStrategy.java @@ -507,26 +507,22 @@ protected QueryEvaluationStep prepareNJoin(NJoin join, QueryEvaluationContext co ControlledWorkerScheduler joinScheduler = federationContext.getManager().getJoinScheduler(); - return new QueryEvaluationStep() { - - @Override - public CloseableIteration evaluate(BindingSet bindings) { - boolean completed = false; - CloseableIteration result = resultProvider.evaluate(bindings); - try { - for (int i = 1, n = join.getNumberOfArguments(); i < n; i++) { - - result = executeJoin(joinScheduler, result, join.getArg(i), join.getJoinVariables(i), bindings, - join.getQueryInfo()); - } - completed = true; - } finally { - if (!completed) { - result.close(); - } + return bindings -> { + boolean completed = false; + CloseableIteration result = resultProvider.evaluate(bindings); + try { + for (int i = 1, n = join.getNumberOfArguments(); i < n; i++) { + + result = executeJoin(joinScheduler, result, join.getArg(i), join.getJoinVariables(i), bindings, + join.getQueryInfo()); + } + completed = true; + } finally { + if (!completed) { + result.close(); } - return result; } + return result; }; } @@ -634,23 +630,19 @@ public QueryEvaluationStep prepareNaryUnion(NUnion union, QueryEvaluationContext ControlledWorkerScheduler unionScheduler = federationContext.getManager().getUnionScheduler(); ControlledWorkerUnion unionRunnable = new ControlledWorkerUnion<>(unionScheduler, union.getQueryInfo()); + int numberOfArguments = union.getNumberOfArguments(); QueryEvaluationStep[] args = new QueryEvaluationStep[numberOfArguments]; for (int i = 0; i < numberOfArguments; i++) { args[i] = precompile(union.getArg(i), context); } - return new QueryEvaluationStep() { - - @Override - public CloseableIteration evaluate(BindingSet bindings) { - for (int i = 0; i < numberOfArguments; i++) { - unionRunnable.addTask(new ParallelUnionOperatorTask(unionRunnable, args[i], bindings)); - } - executor.execute(unionRunnable); - - return unionRunnable; + return bindings -> { + for (int i = 0; i < numberOfArguments; i++) { + unionRunnable.addTask(new ParallelUnionOperatorTask(unionRunnable, args[i], bindings)); } + executor.execute(unionRunnable); + return unionRunnable; }; } @@ -706,14 +698,7 @@ protected QueryEvaluationStep prepareExclusiveTupleExpr( throws RepositoryException, MalformedQueryException, QueryEvaluationException { if (expr instanceof StatementTupleExpr) { - return new QueryEvaluationStep() { - - @Override - public CloseableIteration evaluate(BindingSet bindings) { - return ((StatementTupleExpr) expr).evaluate(bindings); - } - - }; + return bindings -> ((StatementTupleExpr) expr).evaluate(bindings); } if (!(expr instanceof ExclusiveTupleExprRenderer)) { @@ -725,25 +710,22 @@ public CloseableIteration evaluate(Binding .getEndpoint(expr.getOwner().getEndpointID()); TripleSource t = ownedEndpoint.getTripleSource(); - return new QueryEvaluationStep() { - @Override - public CloseableIteration evaluate(BindingSet bindings) { - AtomicBoolean isEvaluated = new AtomicBoolean(false); - FilterValueExpr filterValueExpr = null; // TODO consider optimization using FilterTuple - try { - String preparedQuery = QueryStringUtil.selectQueryString((ExclusiveTupleExprRenderer) expr, - bindings, filterValueExpr, isEvaluated, expr.getQueryInfo().getDataset()); - return t.getStatements(preparedQuery, bindings, (isEvaluated.get() ? null : filterValueExpr), - expr.getQueryInfo()); - } catch (IllegalQueryException e) { - /* no projection vars, e.g. local vars only, can occur in joins */ - if (t.hasStatements(expr, bindings)) { - return new SingleBindingSetIteration(bindings); - } - return new EmptyIteration<>(); + return bindings -> { + AtomicBoolean isEvaluated = new AtomicBoolean(false); + FilterValueExpr filterValueExpr = null; // TODO consider optimization using FilterTuple + try { + String preparedQuery = QueryStringUtil.selectQueryString((ExclusiveTupleExprRenderer) expr, + bindings, filterValueExpr, isEvaluated, expr.getQueryInfo().getDataset()); + return t.getStatements(preparedQuery, bindings, (isEvaluated.get() ? null : filterValueExpr), + expr.getQueryInfo()); + } catch (IllegalQueryException e) { + /* no projection vars, e.g. local vars only, can occur in joins */ + if (t.hasStatements(expr, bindings)) { + return new SingleBindingSetIteration(bindings); } - + return new EmptyIteration<>(); } + }; } @@ -843,13 +825,9 @@ protected QueryValueEvaluationStep prepare(FilterExpr node, QueryEvaluationConte throws ValueExprEvaluationException, QueryEvaluationException { QueryValueEvaluationStep expr = precompile(node.getExpression(), context); - return new QueryValueEvaluationStep() { - - @Override - public Value evaluate(BindingSet bindings) throws ValueExprEvaluationException, QueryEvaluationException { - Value v = expr.evaluate(bindings); - return BooleanLiteral.valueOf(QueryEvaluationUtil.getEffectiveBooleanValue(v)); - } + return bindings -> { + Value v = expr.evaluate(bindings); + return BooleanLiteral.valueOf(QueryEvaluationUtil.getEffectiveBooleanValue(v)); }; } @@ -867,25 +845,21 @@ protected QueryValueEvaluationStep prepare(ConjunctiveFilterExpr node, QueryEval .map((e) -> precompile(e, context)) .collect(Collectors.toList()); - return new QueryValueEvaluationStep() { - - @Override - public Value evaluate(BindingSet bindings) throws ValueExprEvaluationException, QueryEvaluationException { - ValueExprEvaluationException error = null; - try { - for (QueryValueEvaluationStep ves : collect) { - Value v = ves.evaluate(bindings); - if (QueryEvaluationUtil.getEffectiveBooleanValue(v) == false) { - return BooleanLiteral.FALSE; - } + return bindings -> { + ValueExprEvaluationException error = null; + try { + for (QueryValueEvaluationStep ves : collect) { + Value v = ves.evaluate(bindings); + if (QueryEvaluationUtil.getEffectiveBooleanValue(v) == false) { + return BooleanLiteral.FALSE; } - } catch (ValueExprEvaluationException e) { - error = e; } - if (error != null) - throw error; - return BooleanLiteral.TRUE; + } catch (ValueExprEvaluationException e) { + error = e; } + if (error != null) + throw error; + return BooleanLiteral.TRUE; }; } @@ -945,6 +919,9 @@ protected CloseableIteration evaluateAtSta return result; } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } throw new QueryEvaluationException(e); } } @@ -979,6 +956,9 @@ protected CloseableIteration evaluateAtSta return result; } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } throw new QueryEvaluationException(e); } } diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/SparqlFederationEvalStrategy.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/SparqlFederationEvalStrategy.java index 23ad67d7861..a34ab3e13ef 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/SparqlFederationEvalStrategy.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/SparqlFederationEvalStrategy.java @@ -101,6 +101,9 @@ public CloseableIteration evaluateBoundJoi if (result != null) { result.close(); } + if (t instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } throw ExceptionUtil.toQueryEvaluationException(t); } } diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ControlledWorkerScheduler.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ControlledWorkerScheduler.java index bf9525c6a73..f78b32b78bf 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ControlledWorkerScheduler.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ControlledWorkerScheduler.java @@ -32,9 +32,7 @@ * pool with a fixed number of worker threads. Once notified a worker picks the next task from the queue and executes * it. The results is then returned to the controlling instance retrieved from the task. * - * * @author Andreas Schwarte - * * @see ControlledWorkerUnion * @see ControlledWorkerJoin * @see ControlledWorkerBoundJoin @@ -80,7 +78,7 @@ public ControlledWorkerScheduler(int nWorkers, String name) { */ @Override public void schedule(ParallelTask task) { - + assert !task.getControl().isFinished(); Runnable runnable = new WorkerRunnable(task); // Note: for specific use-cases the runnable may be wrapped (e.g. to allow injection of thread-contexts). By @@ -139,13 +137,16 @@ private ExecutorService createExecutorService() { @Override public void abort() { - log.info("Aborting workers of " + name + "."); + if (!executor.isTerminated()) { + log.info("Aborting workers of " + name + "."); - executor.shutdownNow(); - try { - executor.awaitTermination(30, TimeUnit.SECONDS); - } catch (InterruptedException e) { - throw new FedXRuntimeException(e); + executor.shutdownNow(); + try { + executor.awaitTermination(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new FedXRuntimeException(e); + } } } @@ -185,7 +186,6 @@ public boolean isRunning() { * Determine if there are still task running or queued for the specified control. * * @param control - * * @return true, if there are unfinished tasks, false otherwise */ public boolean isRunning(ParallelExecutor control) { @@ -215,13 +215,22 @@ public void run() { return; } + if (Thread.currentThread().isInterrupted()) { + return; + } + ParallelExecutor taskControl = task.getControl(); + if (taskControl.isFinished()) { + return; + } + CloseableIteration res = null; try { if (log.isTraceEnabled()) { - log.trace("Performing task " + task.toString() + " in " + Thread.currentThread().getName()); + log.trace("Performing task " + task + " in " + Thread.currentThread().getName()); } + res = task.performTask(); taskControl.addResult(res); if (aborted) { @@ -230,6 +239,9 @@ public void run() { taskControl.done(); } catch (Throwable t) { + if (t instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } log.debug("Exception encountered while evaluating task (" + t.getClass().getSimpleName() + "): " + t.getMessage()); @@ -271,6 +283,7 @@ public void shutdown() { try { executor.awaitTermination(30, TimeUnit.SECONDS); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new FedXRuntimeException(e); } diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/FedXQueueCursor.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/FedXQueueCursor.java index b483eb3d668..2fd86a03d96 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/FedXQueueCursor.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/FedXQueueCursor.java @@ -56,6 +56,9 @@ protected QueryEvaluationException convert(Exception e) { if (e instanceof QueryEvaluationException) { return (QueryEvaluationException) e; } + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } return super.convert(e); } @@ -76,6 +79,9 @@ public void handleClose() throws QueryEvaluationException { ((CloseableIteration) take).close(); } } catch (Throwable t) { + if (t instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } if (throwable != null) { t.addSuppressed(throwable); } diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ParallelExecutorBase.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ParallelExecutorBase.java index 4c7a0348111..ae053c9b5f4 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ParallelExecutorBase.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ParallelExecutorBase.java @@ -29,9 +29,8 @@ /** * Base class for common parallel executors such as {@link JoinExecutorBase} and {@link UnionExecutorBase}. * - * @author Andreas Schwarte - * * @param + * @author Andreas Schwarte * @see JoinExecutorBase * @see UnionExecutorBase */ @@ -67,6 +66,9 @@ public final void run() { } evaluationThread = Thread.currentThread(); + if (evaluationThread.isInterrupted()) { + return; + } if (log.isTraceEnabled()) { log.trace("Performing execution of " + getDisplayId() + ", thread: " + evaluationThread.getName()); @@ -80,6 +82,9 @@ public final void run() { } done(); + } catch (InterruptedException e) { + toss(ExceptionUtil.toException(e)); + evaluationThread.interrupt(); } catch (Throwable t) { toss(ExceptionUtil.toException(t)); } finally { @@ -92,7 +97,7 @@ public final void run() { /** * Perform the parallel execution. - * + *

* Note that this method must block until the execution is completed. * * @throws Exception @@ -117,6 +122,7 @@ public void addResult(CloseableIteration res) { res.close(); } } catch (InterruptedException e) { + Thread.currentThread().interrupt(); res.close(); throw new RuntimeException("Error adding element to right queue", e); } @@ -171,22 +177,26 @@ protected void checkTimeout() throws QueryInterruptedException { @Override public void handleClose() throws QueryEvaluationException { - try { - rightQueue.close(); - } finally { - - if (rightIter != null) { - try { - rightIter.close(); - rightIter = null; - } catch (Throwable ignore) { - log.trace("Failed to send interrupt signal:", ignore); + try { + rightQueue.close(); + } finally { + if (rightIter != null) { + try { + rightIter.close(); + rightIter = null; + } catch (Throwable ignore) { + if (ignore instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + log.trace("Failed to send interrupt signal:", ignore); + } } } + } finally { + super.handleClose(); } - super.handleClose(); } /** @@ -216,7 +226,6 @@ public String getDisplayId() { } /** - * * @return the executor type, e.g. "Join". Default "Executor" */ protected String getExecutorType() { diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ParallelServiceExecutor.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ParallelServiceExecutor.java index 4e79238bfce..bf6c89455a9 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ParallelServiceExecutor.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ParallelServiceExecutor.java @@ -130,6 +130,7 @@ protected BindingSet getNextElement() throws QueryEvaluationException { throw new QueryInterruptedException("Timeout during service evaluation"); } } catch (InterruptedException e) { + Thread.currentThread().interrupt(); log.debug("Error while evaluating service expression. Thread got interrupted."); error = e; } diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ParallelTask.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ParallelTask.java index c8fd34c0959..0b95dca5323 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ParallelTask.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ParallelTask.java @@ -44,4 +44,11 @@ default QueryInfo getQueryInfo() { * Optional implementation to cancel this task on a best effort basis */ void cancel(); + + /** + * Optional implementation to close this task on a best effort basis. + */ + default void close() { + + } } diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ParallelTaskBase.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ParallelTaskBase.java index e3b3d668f6e..cfbf0874eab 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ParallelTaskBase.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ParallelTaskBase.java @@ -13,47 +13,66 @@ import java.util.concurrent.Future; import org.eclipse.rdf4j.common.iteration.CloseableIteration; +import org.eclipse.rdf4j.common.iteration.EmptyIteration; import org.eclipse.rdf4j.query.QueryEvaluationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public abstract class ParallelTaskBase implements ParallelTask { - private static final Logger _log = LoggerFactory.getLogger(ParallelExecutorBase.class); + private static final Logger logger = LoggerFactory.getLogger(ParallelExecutorBase.class); protected Future scheduledFuture; private CloseableIteration closableIter; private volatile boolean cancelled = false; + private volatile boolean closed = false; @Override public void cancel() { cancelled = true; - - if (scheduledFuture != null) { - if (scheduledFuture.isDone()) { - _log.trace("Task is already done: " + toString()); - } else { - _log.debug("Attempting to cancel task " + toString()); - boolean successfullyCanceled = scheduledFuture.cancel(true); - if (!successfullyCanceled) { - _log.debug("Task " + toString() + " could not be cancelled properly."); - } - } - } - if (closableIter != null) { - closableIter.close(); - } + close(); } @Override public CloseableIteration performTask() throws Exception { + if (closed) { + return new EmptyIteration<>(); + } if (cancelled) { throw new QueryEvaluationException("Evaluation has been cancelled"); } - closableIter = performTaskInternal(); + try { + closableIter = performTaskInternal(); + } catch (Exception e) { + if (Thread.interrupted() || e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + if (closed) { + logger.trace( + "Exception was thrown while performing task, but it was ignored because the task was closed.", + e); + return new EmptyIteration<>(); + } else if (cancelled) { + throw new QueryEvaluationException("Evaluation has been cancelled", e); + } else { + throw new QueryEvaluationException("Evaluation has been interrupted", e); + } + } else if (closed) { + assert Thread.currentThread() + .isInterrupted() : "Exception was thrown and task was closed, but the current thread is not interrupted which means that the exception was either something bad or some code forgot to re-interrupt the current thread: " + + e; + logger.trace( + "Exception was thrown while performing task, but it was ignored because the task was closed.", + e); + return new EmptyIteration<>(); + } - if (cancelled) { + assert !cancelled && !closed; + + throw e; + } + + if (cancelled || closed) { // proactively close when this task has been cancelled in the meantime closableIter.close(); } @@ -71,4 +90,34 @@ public void setScheduledFuture(Future future) { public String toString() { return getClass().getSimpleName() + " (Query: " + getQueryInfo().getQueryID() + ")"; } + + @Override + public void close() { + if (!closed) { + closed = true; + try { + Future scheduledFuture = this.scheduledFuture; + this.scheduledFuture = null; + + if (scheduledFuture != null) { + if (scheduledFuture.isDone()) { + logger.trace("Task is already done: {}", this); + } else { + logger.debug("Attempting to cancel task {}", this); + boolean successfullyCanceled = scheduledFuture.cancel(true); + if (!successfullyCanceled) { + logger.debug("Task {} could not be cancelled properly.", this); + } + } + } + + } finally { + if (closableIter != null) { + closableIter.close(); + } + } + + } + + } } diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/union/ControlledWorkerUnion.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/union/ControlledWorkerUnion.java index 65e47238b8a..33afcd447fb 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/union/ControlledWorkerUnion.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/union/ControlledWorkerUnion.java @@ -15,6 +15,7 @@ import org.eclipse.rdf4j.federated.evaluation.concurrent.ControlledWorkerScheduler; import org.eclipse.rdf4j.federated.structures.QueryInfo; +import org.eclipse.rdf4j.query.QueryEvaluationException; /** * Execution of union tasks with {@link ControlledWorkerScheduler}. Tasks can be added using the provided functions. @@ -22,7 +23,6 @@ * Results are then contained in this iteration. * * @author Andreas Schwarte - * */ public class ControlledWorkerUnion extends WorkerUnionBase { @@ -61,4 +61,15 @@ public void toss(Exception e) { super.toss(e); phaser.arriveAndDeregister(); } + + @Override + public void handleClose() throws QueryEvaluationException { + try { + super.handleClose(); + } finally { + // signal the phaser to close (if currently being blocked) + phaser.forceTermination(); + } + } + } diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/union/WorkerUnionBase.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/union/WorkerUnionBase.java index 0df842502ee..0151cc2b453 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/union/WorkerUnionBase.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/union/WorkerUnionBase.java @@ -15,12 +15,12 @@ import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelTask; import org.eclipse.rdf4j.federated.structures.QueryInfo; +import org.eclipse.rdf4j.query.QueryEvaluationException; /** * Base class for worker unions providing convenience functions to add tasks. * * @author Andreas Schwarte - * * @see SynchronousWorkerUnion * @see ControlledWorkerUnion */ @@ -43,4 +43,15 @@ public void addTask(ParallelTask task) { } tasks.add(task); } + + @Override + public void handleClose() throws QueryEvaluationException { + try { + for (ParallelTask task : tasks) { + task.close(); + } + } finally { + super.handleClose(); + } + } } diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/exception/ExceptionUtil.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/exception/ExceptionUtil.java index 12e6107d185..194cb2481bc 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/exception/ExceptionUtil.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/exception/ExceptionUtil.java @@ -60,6 +60,10 @@ public class ExceptionUtil { public static QueryEvaluationException traceExceptionSource(Endpoint endpoint, Throwable ex, String additionalInfo) { + if (ex instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + String eID; if (endpoint == null) { @@ -192,6 +196,10 @@ public static QueryEvaluationException toQueryEvaluationException(Throwable t) { if (res instanceof QueryEvaluationException) { return (QueryEvaluationException) res; } + if (t instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + return new QueryEvaluationException(res); } } diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/optimizer/GenericInfoOptimizer.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/optimizer/GenericInfoOptimizer.java index 3d901b60720..227a7e39c10 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/optimizer/GenericInfoOptimizer.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/optimizer/GenericInfoOptimizer.java @@ -87,6 +87,9 @@ public void optimize(TupleExpr tupleExpr) { } catch (RuntimeException e) { throw e; } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } throw new RuntimeException(e); } diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/optimizer/LimitOptimizer.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/optimizer/LimitOptimizer.java index aa254507c53..648397dc3fb 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/optimizer/LimitOptimizer.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/optimizer/LimitOptimizer.java @@ -46,6 +46,9 @@ public void optimize(TupleExpr tupleExpr) { } catch (RuntimeException e) { throw e; } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } throw new RuntimeException(e); } } diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/optimizer/ServiceOptimizer.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/optimizer/ServiceOptimizer.java index 6a0c9bc0b92..41476cedbee 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/optimizer/ServiceOptimizer.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/optimizer/ServiceOptimizer.java @@ -54,6 +54,9 @@ public void optimize(TupleExpr tupleExpr) { } catch (RuntimeException e) { throw e; } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } throw new FedXRuntimeException(e); } diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/optimizer/SourceSelection.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/optimizer/SourceSelection.java index f558afa979f..ab40d3d2009 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/optimizer/SourceSelection.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/optimizer/SourceSelection.java @@ -223,6 +223,7 @@ private void executeRemoteSourceSelection(List tasks, SourceSelec throw new OptimizationException("Source selection has run into a timeout"); } } catch (InterruptedException e) { + Thread.currentThread().interrupt(); log.debug("Error during source selection. Thread got interrupted."); errors.add(e); } diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/structures/QueryInfo.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/structures/QueryInfo.java index 5ff2b53f44c..88896346492 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/structures/QueryInfo.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/structures/QueryInfo.java @@ -220,7 +220,7 @@ public synchronized void close() { } done = true; - abortScheduledTasks(); + closeScheduledTasks(); } /** @@ -251,6 +251,31 @@ protected void abortScheduledTasks() { } } + private void closeScheduledTasks() { + + Throwable throwable = null; + + for (ParallelTask task : scheduledSubtasks) { + try { + task.close(); + } catch (Throwable t) { + if (throwable != null) { + t.addSuppressed(throwable); + } + throwable = t; + } + } + + scheduledSubtasks.clear(); + + if (throwable != null) { + if (throwable instanceof RuntimeException) { + throw ((RuntimeException) throwable); + } + throw ((Error) throwable); + } + } + @Override public int hashCode() { final int prime = 31; diff --git a/tools/federation/src/test/java/org/eclipse/rdf4j/federated/MediumConcurrencyTest.java b/tools/federation/src/test/java/org/eclipse/rdf4j/federated/MediumConcurrencyTest.java index b101f9be5e2..49eb98fe4bf 100644 --- a/tools/federation/src/test/java/org/eclipse/rdf4j/federated/MediumConcurrencyTest.java +++ b/tools/federation/src/test/java/org/eclipse/rdf4j/federated/MediumConcurrencyTest.java @@ -18,13 +18,11 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; public class MediumConcurrencyTest extends SPARQLBaseTest { @@ -43,9 +41,10 @@ public static void beforeClass() { } @AfterAll - public static void afterClass() { + public static void afterClass() throws InterruptedException { if (executor != null) { executor.shutdownNow(); + executor.awaitTermination(30, TimeUnit.SECONDS); } } @@ -81,31 +80,6 @@ public void queryMix() throws Throwable { log.info("Done"); } - @Test - @Disabled // just a test for showing the phaser - public void testPhaser() throws Exception { - - final Phaser p1 = new Phaser(1); - final Random rand = new Random(325656342); - - for (int i = 0; i < 10; i++) { - final int tid = i; - executor.submit(() -> { - p1.register(); - try { - Thread.sleep(rand.nextInt(10) * 1000); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - System.out.println("Task " + tid + " done"); - p1.arriveAndDeregister(); - }); - } - System.out.println("Waiting for tasks to finish"); - p1.awaitAdvanceInterruptibly(p1.arrive(), 15, TimeUnit.SECONDS); - System.out.println("Done"); - } - protected Future submit(final String query, final int queryId) { return executor.submit(() -> { log.info("Executing query " + queryId + ": " + query);