Skip to content

Commit

Permalink
Porting over RulesToolkit.evaluateParallel
Browse files Browse the repository at this point in the history
  • Loading branch information
Josh-Matsuoka committed Apr 14, 2023
1 parent d62e0fe commit 786780c
Showing 1 changed file with 128 additions and 83 deletions.
211 changes: 128 additions & 83 deletions src/main/java/io/cryostat/core/reports/InterruptibleReportGenerator.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,12 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -258,30 +261,108 @@ private Pair<Collection<IResult>, Long> generateResultHelper(
InputStream recording, Predicate<IRule> predicate)
throws InterruptedException, IOException, ExecutionException,
CouldNotLoadRecordingException {
List<Future<IResult>> resultFutures = new ArrayList<>();
Collection<IRule> rules =
RuleRegistry.getRules().stream().filter(predicate).collect(Collectors.toList());
ResultProvider resultProvider = new ResultProvider();
Map<IRule, Future<IResult>> resultFutures = new HashMap<>();
Queue<RunnableFuture<IResult>> futureQueue = new ConcurrentLinkedQueue<>();
// Map using the rule name as a key, and a Pair containing the rule (left) and it's
// dependency (right)
Map<String, Pair<IRule, IRule>> rulesWithDependencies = new HashMap<>();
Map<IRule, IResult> computedResults = new HashMap<>();
try (CountingInputStream countingRecordingStream = new CountingInputStream(recording)) {
Collection<IRule> rules =
RuleRegistry.getRules().stream().filter(predicate).collect(Collectors.toList());
resultFutures.addAll(
evaluate(rules, JfrLoaderToolkit.loadEvents(countingRecordingStream)).stream()
.map(executor::submit)
.collect(Collectors.toList()));
Collection<IResult> results = new HashSet<>();
for (Future<IResult> future : resultFutures) {
IItemCollection items = JfrLoaderToolkit.loadEvents(countingRecordingStream);
for (IRule rule : rules) {
if (RulesToolkit.matchesEventAvailabilityMap(items, rule.getRequiredEvents())) {
if (hasDependency(rule)) {
IRule depRule =
rules.stream()
.filter(r -> r.getId().equals(getRuleDependencyName(rule)))
.findFirst()
.orElse(null);
rulesWithDependencies.put(rule.getId(), new Pair<>(rule, depRule));
} else {
RunnableFuture<IResult> resultFuture =
rule.createEvaluation(
items,
IPreferenceValueProvider.DEFAULT_VALUES,
resultProvider);
resultFutures.put(rule, resultFuture);
futureQueue.add(resultFuture);
}
} else {
resultFutures.put(
rule,
CompletableFuture.completedFuture(
ResultBuilder.createFor(
rule, IPreferenceValueProvider.DEFAULT_VALUES)
.setSeverity(Severity.NA)
.build()));
}
}
for (Entry<String, Pair<IRule, IRule>> entry : rulesWithDependencies.entrySet()) {
IRule rule = entry.getValue().left;
IRule depRule = entry.getValue().right;
Future<IResult> depResultFuture = resultFutures.get(depRule);
if (depResultFuture == null) {
resultFutures.put(
rule,
CompletableFuture.completedFuture(
ResultBuilder.createFor(
rule, IPreferenceValueProvider.DEFAULT_VALUES)
.setSeverity(Severity.NA)
.build()));
} else {
IResult depResult = null;
if (!depResultFuture.isDone()) {
((Runnable) depResultFuture).run();
try {
depResult = depResultFuture.get();
resultProvider.addResults(depResult);
computedResults.put(depRule, depResult);
} catch (InterruptedException | ExecutionException e) {
logger.warn("Error retrieving results for rule: " + depResult);
}
} else {
depResult = computedResults.get(depRule);
}
if (depResult != null && shouldEvaluate(rule, depResult)) {
RunnableFuture<IResult> resultFuture =
rule.createEvaluation(
items,
IPreferenceValueProvider.DEFAULT_VALUES,
resultProvider);
resultFutures.put(rule, resultFuture);
futureQueue.add(resultFuture);
} else {
resultFutures.put(
rule,
CompletableFuture.completedFuture(
ResultBuilder.createFor(
rule,
IPreferenceValueProvider.DEFAULT_VALUES)
.setSeverity(Severity.NA)
.build()));
}
}
}
RuleEvaluator re = new RuleEvaluator(futureQueue);
executor.submit(re);
Collection<IResult> results = new HashSet<IResult>();
for (Future<IResult> future : resultFutures.values()) {
results.add(future.get());
}
long recordingSizeBytes = countingRecordingStream.getByteCount();
return new Pair<Collection<IResult>, Long>(results, recordingSizeBytes);
return new Pair<Collection<IResult>, Long>(
results, countingRecordingStream.getByteCount());
} catch (InterruptedException
| IOException
| ExecutionException
| CouldNotLoadRecordingException e) {
resultFutures.forEach(
f -> {
if (!f.isDone()) {
f.cancel(true);
}
});
for (Future f : resultFutures.values()) {
if (!f.isDone()) {
f.cancel(true);
}
}
logger.warn(e);
throw e;
}
Expand Down Expand Up @@ -389,75 +470,23 @@ private List<HtmlResultGroup> loadResultGroups(Element element) {
return groups;
}

private List<Callable<IResult>> evaluate(Collection<IRule> rules, IItemCollection items) {
List<Callable<IResult>> callables = new ArrayList<>(rules.size());
ResultProvider rp = new ResultProvider();
Map<Class<? extends IRule>, Severity> evaluatedRules = new HashMap<>();
for (IRule rule : rules) {
RunnableFuture<IResult> resultFuture =
rule.createEvaluation(items, IPreferenceValueProvider.DEFAULT_VALUES, rp);
callables.add(
() -> {
// Check that we can evaluate this rule first, some rules have dependencies
// on other rules
// and trying to run them too early will throw an NPE from trying to
// access entries in the ResultProvider
if (shouldEvaluate(evaluatedRules, rule)) {
IResult result;
logger.trace("Processing rule {}", rule.getName());
ReportRuleEvalEvent evt = new ReportRuleEvalEvent(rule.getName());
evt.begin();
try {
// Check that the rule has all of the events it needs to evaluate
// first
if (!RulesToolkit.matchesEventAvailabilityMap(
items, rule.getRequiredEvents())) {
logger.warn(
"Rule missing required events: {} ", rule.getName());
result =
ResultBuilder.createFor(
rule,
IPreferenceValueProvider.DEFAULT_VALUES)
.setSeverity(Severity.NA)
.build();
} else {
resultFuture.run();
result = resultFuture.get();
}
evaluatedRules.put(rule.getClass(), result.getSeverity());
rp.addResults(result);
return result;
} finally {
evt.end();
if (evt.shouldCommit()) {
evt.commit();
}
}
} else {
logger.warn("Rule is missing dependencies: {} ", rule.getName());
return ResultBuilder.createFor(
rule, IPreferenceValueProvider.DEFAULT_VALUES)
.setSeverity(Severity.NA)
.build();
}
});
}
return callables;
private static String getRuleDependencyName(IRule rule) {
DependsOn dependency = rule.getClass().getAnnotation(DependsOn.class);
Class<? extends IRule> dependencyType = dependency.value();
return dependencyType.getSimpleName();
}

private static boolean hasDependency(IRule rule) {
DependsOn dependency = rule.getClass().getAnnotation(DependsOn.class);
return dependency != null;
}

/** Brought over from org.openjdk.jmc.flightrecorder.rules.jdk.test.TestRulesWithJFR */
private static boolean shouldEvaluate(
Map<Class<? extends IRule>, Severity> evaluatedRules, IRule rule) {
/** Brought over from org.openjdk.jmc.flightrecorder.rules.jdk.util.RulesToolkit */
private static boolean shouldEvaluate(IRule rule, IResult depResult) {
DependsOn dependency = rule.getClass().getAnnotation(DependsOn.class);
if (dependency != null) {
Class<? extends IRule> dependencyType = dependency.value();
if (dependencyType != null) {
if (evaluatedRules.containsKey(dependencyType)) {
if (evaluatedRules.get(dependencyType).compareTo(dependency.severity()) < 0) {
return false;
}
return true;
}
if (depResult.getSeverity().compareTo(dependency.severity()) < 0) {
return false;
}
}
return true;
Expand Down Expand Up @@ -514,6 +543,22 @@ public Collection<IResult> getResults(Collection<String> topics) {
}
}

private static class RuleEvaluator implements Runnable {
private Queue<RunnableFuture<IResult>> futureQueue;

public RuleEvaluator(Queue<RunnableFuture<IResult>> futureQueue) {
this.futureQueue = futureQueue;
}

@Override
public void run() {
RunnableFuture<IResult> resultFuture;
while ((resultFuture = futureQueue.poll()) != null) {
resultFuture.run();
}
}
}

private static class SimpleResultGroup implements HtmlResultGroup {
String name;
String image = null;
Expand Down

0 comments on commit 786780c

Please sign in to comment.