Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-4107 fix unclosed iteration fedx #4108

Merged
merged 9 commits into from
Aug 14, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public void addEndpoint(Endpoint e, boolean... updateStrategy) throws FedXRuntim
federationContext.getEndpointManager().addEndpoint(e);

if (updateStrategy == null || updateStrategy.length == 0
|| (updateStrategy.length == 1 && updateStrategy[0] == true)) {
|| (updateStrategy.length == 1 && updateStrategy[0])) {
updateFederationType();
}
}
Expand Down Expand Up @@ -240,7 +240,7 @@ public void removeEndpoint(Endpoint e, boolean... updateStrategy) throws Reposit
federationContext.getEndpointManager().removeEndpoint(e);

if (updateStrategy == null || updateStrategy.length == 0
|| (updateStrategy.length == 1 && updateStrategy[0] == true)) {
|| (updateStrategy.length == 1 && updateStrategy[0])) {
updateFederationType();
}
}
Expand Down Expand Up @@ -275,35 +275,60 @@ public void removeAll() throws RepositoryException {
*/
public synchronized void shutDown() throws FedXException {

log.info("Shutting down federation and all underlying repositories ...");
// Abort all running queries
federationContext.getQueryManager().shutdown();
executor.shutdown();
try {
executor.awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.warn("Failed to shutdown executor:" + e.getMessage());
log.debug("Details:", e);
}
try {
joinScheduler.shutdown();
} catch (Exception e) {
log.warn("Failed to shutdown join scheduler: " + e.getMessage());
log.debug("Details: ", e);
}
try {
unionScheduler.shutdown();
} catch (Exception e) {
log.warn("Failed to shutdown union scheduler: " + e.getMessage());
log.debug("Details: ", e);
}
try {
leftJoinScheduler.shutdown();
} catch (Exception e) {
log.warn("Failed to shutdown left join scheduler: " + e.getMessage());
log.debug("Details: ", e);
log.info("Shutting down federation and all underlying repositories ...");
// Abort all running queries
federationContext.getQueryManager().shutdown();
executor.shutdown();
try {
executor.awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.warn("Failed to shutdown executor:" + e.getMessage());
log.debug("Details:", e);
Thread.currentThread().interrupt();
} finally {
executor.shutdownNow();
}
} finally {
try {
try {
joinScheduler.shutdown();
} catch (Exception e) {
log.warn("Failed to shutdown join scheduler: " + e.getMessage());
log.debug("Details: ", e);
} finally {
joinScheduler.abort();
}
} finally {
try {
try {
unionScheduler.shutdown();
} catch (Exception e) {
log.warn("Failed to shutdown union scheduler: " + e.getMessage());
log.debug("Details: ", e);
} finally {
unionScheduler.abort();
}
} finally {
try {
try {
leftJoinScheduler.shutdown();
} catch (Exception e) {
log.warn("Failed to shutdown left join scheduler: " + e.getMessage());
log.debug("Details: ", e);
} finally {
leftJoinScheduler.abort();
}
} finally {
federationContext.getFederatedServiceResolver().shutDown();
}

}

}

}
federationContext.getFederatedServiceResolver().shutDown();

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,14 @@ public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(Binding
}

} catch (RepositoryException | MalformedQueryException e) {
union.close();
if (union != null) {
union.close();
}
throw new QueryEvaluationException(e);
} catch (Throwable t) {
union.close();
if (union != null) {
union.close();
}
throw t;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,26 +507,22 @@ protected QueryEvaluationStep prepareNJoin(NJoin join, QueryEvaluationContext co

ControlledWorkerScheduler<BindingSet> joinScheduler = federationContext.getManager().getJoinScheduler();

return new QueryEvaluationStep() {

@Override
public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(BindingSet bindings) {
boolean completed = false;
CloseableIteration<BindingSet, QueryEvaluationException> result = resultProvider.evaluate(bindings);
try {
for (int i = 1, n = join.getNumberOfArguments(); i < n; i++) {

result = executeJoin(joinScheduler, result, join.getArg(i), join.getJoinVariables(i), bindings,
join.getQueryInfo());
}
completed = true;
} finally {
if (!completed) {
result.close();
}
return bindings -> {
boolean completed = false;
CloseableIteration<BindingSet, QueryEvaluationException> result = resultProvider.evaluate(bindings);
try {
for (int i = 1, n = join.getNumberOfArguments(); i < n; i++) {

result = executeJoin(joinScheduler, result, join.getArg(i), join.getJoinVariables(i), bindings,
join.getQueryInfo());
}
completed = true;
} finally {
if (!completed) {
result.close();
}
return result;
}
return result;
};

}
Expand Down Expand Up @@ -634,23 +630,19 @@ public QueryEvaluationStep prepareNaryUnion(NUnion union, QueryEvaluationContext
ControlledWorkerScheduler<BindingSet> unionScheduler = federationContext.getManager().getUnionScheduler();
ControlledWorkerUnion<BindingSet> unionRunnable = new ControlledWorkerUnion<>(unionScheduler,
union.getQueryInfo());

int numberOfArguments = union.getNumberOfArguments();
QueryEvaluationStep[] args = new QueryEvaluationStep[numberOfArguments];
for (int i = 0; i < numberOfArguments; i++) {
args[i] = precompile(union.getArg(i), context);
}
return new QueryEvaluationStep() {

@Override
public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(BindingSet bindings) {
for (int i = 0; i < numberOfArguments; i++) {
unionRunnable.addTask(new ParallelUnionOperatorTask(unionRunnable, args[i], bindings));
}
executor.execute(unionRunnable);

return unionRunnable;
return bindings -> {
for (int i = 0; i < numberOfArguments; i++) {
unionRunnable.addTask(new ParallelUnionOperatorTask(unionRunnable, args[i], bindings));
}
executor.execute(unionRunnable);

return unionRunnable;
};
}

Expand Down Expand Up @@ -706,14 +698,7 @@ protected QueryEvaluationStep prepareExclusiveTupleExpr(
throws RepositoryException, MalformedQueryException, QueryEvaluationException {

if (expr instanceof StatementTupleExpr) {
return new QueryEvaluationStep() {

@Override
public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(BindingSet bindings) {
return ((StatementTupleExpr) expr).evaluate(bindings);
}

};
return bindings -> ((StatementTupleExpr) expr).evaluate(bindings);
}

if (!(expr instanceof ExclusiveTupleExprRenderer)) {
Expand All @@ -725,25 +710,22 @@ public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(Binding
.getEndpoint(expr.getOwner().getEndpointID());
TripleSource t = ownedEndpoint.getTripleSource();

return new QueryEvaluationStep() {
@Override
public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(BindingSet bindings) {
AtomicBoolean isEvaluated = new AtomicBoolean(false);
FilterValueExpr filterValueExpr = null; // TODO consider optimization using FilterTuple
try {
String preparedQuery = QueryStringUtil.selectQueryString((ExclusiveTupleExprRenderer) expr,
bindings, filterValueExpr, isEvaluated, expr.getQueryInfo().getDataset());
return t.getStatements(preparedQuery, bindings, (isEvaluated.get() ? null : filterValueExpr),
expr.getQueryInfo());
} catch (IllegalQueryException e) {
/* no projection vars, e.g. local vars only, can occur in joins */
if (t.hasStatements(expr, bindings)) {
return new SingleBindingSetIteration(bindings);
}
return new EmptyIteration<>();
return bindings -> {
AtomicBoolean isEvaluated = new AtomicBoolean(false);
FilterValueExpr filterValueExpr = null; // TODO consider optimization using FilterTuple
try {
String preparedQuery = QueryStringUtil.selectQueryString((ExclusiveTupleExprRenderer) expr,
bindings, filterValueExpr, isEvaluated, expr.getQueryInfo().getDataset());
return t.getStatements(preparedQuery, bindings, (isEvaluated.get() ? null : filterValueExpr),
expr.getQueryInfo());
} catch (IllegalQueryException e) {
/* no projection vars, e.g. local vars only, can occur in joins */
if (t.hasStatements(expr, bindings)) {
return new SingleBindingSetIteration(bindings);
}

return new EmptyIteration<>();
}

};

}
Expand Down Expand Up @@ -843,13 +825,9 @@ protected QueryValueEvaluationStep prepare(FilterExpr node, QueryEvaluationConte
throws ValueExprEvaluationException, QueryEvaluationException {

QueryValueEvaluationStep expr = precompile(node.getExpression(), context);
return new QueryValueEvaluationStep() {

@Override
public Value evaluate(BindingSet bindings) throws ValueExprEvaluationException, QueryEvaluationException {
Value v = expr.evaluate(bindings);
return BooleanLiteral.valueOf(QueryEvaluationUtil.getEffectiveBooleanValue(v));
}
return bindings -> {
Value v = expr.evaluate(bindings);
return BooleanLiteral.valueOf(QueryEvaluationUtil.getEffectiveBooleanValue(v));
};
}

Expand All @@ -867,25 +845,21 @@ protected QueryValueEvaluationStep prepare(ConjunctiveFilterExpr node, QueryEval
.map((e) -> precompile(e, context))
.collect(Collectors.toList());

return new QueryValueEvaluationStep() {

@Override
public Value evaluate(BindingSet bindings) throws ValueExprEvaluationException, QueryEvaluationException {
ValueExprEvaluationException error = null;
try {
for (QueryValueEvaluationStep ves : collect) {
Value v = ves.evaluate(bindings);
if (QueryEvaluationUtil.getEffectiveBooleanValue(v) == false) {
return BooleanLiteral.FALSE;
}
return bindings -> {
ValueExprEvaluationException error = null;
try {
for (QueryValueEvaluationStep ves : collect) {
Value v = ves.evaluate(bindings);
if (QueryEvaluationUtil.getEffectiveBooleanValue(v) == false) {
return BooleanLiteral.FALSE;
}
} catch (ValueExprEvaluationException e) {
error = e;
}
if (error != null)
throw error;
return BooleanLiteral.TRUE;
} catch (ValueExprEvaluationException e) {
error = e;
}
if (error != null)
throw error;
return BooleanLiteral.TRUE;
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@
* pool with a fixed number of worker threads. Once notified a worker picks the next task from the queue and executes
* it. The results is then returned to the controlling instance retrieved from the task.
*
*
* @author Andreas Schwarte
*
* @see ControlledWorkerUnion
* @see ControlledWorkerJoin
* @see ControlledWorkerBoundJoin
Expand Down Expand Up @@ -80,7 +78,7 @@ public ControlledWorkerScheduler(int nWorkers, String name) {
*/
@Override
public void schedule(ParallelTask<T> task) {

assert !task.getControl().isFinished();
Runnable runnable = new WorkerRunnable(task);

// Note: for specific use-cases the runnable may be wrapped (e.g. to allow injection of thread-contexts). By
Expand Down Expand Up @@ -139,13 +137,16 @@ private ExecutorService createExecutorService() {

@Override
public void abort() {
log.info("Aborting workers of " + name + ".");
if (!executor.isTerminated()) {
log.info("Aborting workers of " + name + ".");

executor.shutdownNow();
try {
executor.awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new FedXRuntimeException(e);
executor.shutdownNow();
try {
executor.awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new FedXRuntimeException(e);
}
}
}

Expand Down Expand Up @@ -185,7 +186,6 @@ public boolean isRunning() {
* Determine if there are still task running or queued for the specified control.
*
* @param control
*
* @return true, if there are unfinished tasks, false otherwise
*/
public boolean isRunning(ParallelExecutor<T> control) {
Expand Down Expand Up @@ -215,13 +215,22 @@ public void run() {
return;
}

if (Thread.currentThread().isInterrupted()) {
return;
}

ParallelExecutor<T> taskControl = task.getControl();

if (taskControl.isFinished()) {
return;
}

CloseableIteration<T, QueryEvaluationException> res = null;
try {
if (log.isTraceEnabled()) {
log.trace("Performing task " + task.toString() + " in " + Thread.currentThread().getName());
log.trace("Performing task " + task + " in " + Thread.currentThread().getName());
}

res = task.performTask();
taskControl.addResult(res);
if (aborted) {
Expand Down Expand Up @@ -271,6 +280,7 @@ public void shutdown() {
try {
executor.awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new FedXRuntimeException(e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ protected QueryEvaluationException convert(Exception e) {
if (e instanceof QueryEvaluationException) {
return (QueryEvaluationException) e;
}
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
return super.convert(e);
}

Expand Down
Loading