Skip to content

Commit

Permalink
Add Try/catch around runAsync() for tests so we always print results …
Browse files Browse the repository at this point in the history
…and dispose of progress (Azure#22352)

* Add Try/catch around runAsync() for tests so we always print results.

* Always dispose of progress.
  • Loading branch information
conniey authored Jun 17, 2021
1 parent f4a0ced commit c2c5265
Showing 1 changed file with 50 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,15 @@

package com.azure.perf.test.core;

import com.beust.jcommander.JCommander;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.time.Duration;
Expand All @@ -14,16 +23,6 @@
import java.util.function.Supplier;
import java.util.stream.IntStream;

import com.beust.jcommander.JCommander;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;

import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/**
* Represents the main program class which reflectively runs and manages the performance tests.
*/
Expand All @@ -46,9 +45,10 @@ private static double getOperationsPerSecond() {
/**
* Runs the performance tests passed to be executed.
*
* @throws RuntimeException if the execution fails.
* @param classes the performance test classes to execute.
* @param args the command line arguments ro run performance tests with.
*
* @throws RuntimeException if the execution fails.
*/
public static void run(Class<?>[] classes, String[] args) {
List<Class<?>> classList = new ArrayList<>(Arrays.asList(classes));
Expand All @@ -62,13 +62,13 @@ public static void run(Class<?>[] classes, String[] args) {
}

String[] commands = classList.stream().map(c -> getCommandName(c.getSimpleName()))
.toArray(i -> new String[i]);
.toArray(i -> new String[i]);

PerfStressOptions[] options = classList.stream().map(c -> {
try {
return c.getConstructors()[0].getParameterTypes()[0].getConstructors()[0].newInstance();
} catch (InstantiationException | IllegalAccessException | IllegalArgumentException
| InvocationTargetException | SecurityException e) {
| InvocationTargetException | SecurityException e) {
throw new RuntimeException(e);
}
}).toArray(i -> new PerfStressOptions[i]);
Expand Down Expand Up @@ -99,9 +99,10 @@ private static String getCommandName(String testName) {
/**
* Run the performance test passed to be executed.
*
* @throws RuntimeException if the execution fails.
* @param testClass the performance test class to execute.
* @param options the configuration ro run performance test with.
*
* @throws RuntimeException if the execution fails.
*/
public static void run(Class<?> testClass, PerfStressOptions options) {
System.out.println("=== Options ===");
Expand All @@ -125,7 +126,7 @@ public static void run(Class<?> testClass, PerfStressOptions options) {
try {
tests[i] = (PerfStressTest<?>) testClass.getConstructor(options.getClass()).newInstance(options);
} catch (InstantiationException | IllegalAccessException | IllegalArgumentException
| InvocationTargetException | SecurityException | NoSuchMethodException e) {
| InvocationTargetException | SecurityException | NoSuchMethodException e) {
throw new RuntimeException(e);
}
}
Expand All @@ -149,9 +150,7 @@ public static void run(Class<?> testClass, PerfStressOptions options) {
}
} finally {
if (!options.isNoCleanup()) {
if (cleanupStatus == null) {
cleanupStatus = printStatus("=== Cleanup ===", () -> ".", false, false);
}
cleanupStatus = printStatus("=== Cleanup ===", () -> ".", false, false);

Flux.just(tests).flatMap(t -> t.cleanupAsync()).blockLast();
}
Expand All @@ -174,12 +173,13 @@ public static void run(Class<?> testClass, PerfStressOptions options) {
/**
* Runs the performance tests passed to be executed.
*
* @throws RuntimeException if the execution fails.
* @param tests the performance tests to be executed.
* @param sync indicate if synchronous test should be run.
* @param parallel the number of parallel threads to run the performance test on.
* @param durationSeconds the duration for which performance test should be run on.
* @param title the title of the performance tests.
*
* @throws RuntimeException if the execution fails.
* @throws IllegalStateException if zero operations completed of the performance test.
*/
public static void runTests(PerfStressTest<?>[] tests, boolean sync, int parallel, int durationSeconds, String title) {
Expand All @@ -188,9 +188,9 @@ public static void runTests(PerfStressTest<?>[] tests, boolean sync, int paralle

long endNanoTime = System.nanoTime() + ((long) durationSeconds * 1000000000);

int[] lastCompleted = new int[] { 0 };
int[] lastCompleted = new int[]{0};
Disposable progressStatus = printStatus(
"=== " + title + " ===" + System.lineSeparator() + "Current\t\tTotal\t\tAverage", () -> {
"=== " + title + " ===" + System.lineSeparator() + "Current\t\tTotal\t\tAverage", () -> {
int totalCompleted = getCompletedOperations();
int currentCompleted = totalCompleted - lastCompleted[0];
double averageCompleted = getOperationsPerSecond();
Expand All @@ -199,35 +199,39 @@ public static void runTests(PerfStressTest<?>[] tests, boolean sync, int paralle
return String.format("%d\t\t%d\t\t%.2f", currentCompleted, totalCompleted, averageCompleted);
}, true, true);

if (sync) {
ForkJoinPool forkJoinPool = new ForkJoinPool(parallel);
try {
try {
if (sync) {
ForkJoinPool forkJoinPool = new ForkJoinPool(parallel);
forkJoinPool.submit(() -> {
IntStream.range(0, parallel).parallel().forEach(i -> runLoop(tests[i], i, endNanoTime));
}).get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);

} else {
// Exceptions like OutOfMemoryError are handled differently by the default Reactor schedulers. Instead of terminating the
// Flux, the Flux will hang and the exception is only sent to the thread's uncaughtExceptionHandler and the Reactor
// Schedulers.onHandleError. This handler ensures the perf framework will fail fast on any such exceptions.
Schedulers.onHandleError((t, e) -> {
System.err.print(t + " threw exception: ");
e.printStackTrace();
System.exit(1);
});

Flux.range(0, parallel)
.parallel()
.runOn(Schedulers.boundedElastic())
.flatMap(i -> runLoopAsync(tests[i], i, endNanoTime))
.then()
.block();
}
} else {
// Exceptions like OutOfMemoryError are handled differently by the default Reactor schedulers. Instead of terminating the
// Flux, the Flux will hang and the exception is only sent to the thread's uncaughtExceptionHandler and the Reactor
// Schedulers.onHandleError. This handler ensures the perf framework will fail fast on any such exceptions.
Schedulers.onHandleError((t, e) -> {
System.err.print(t + " threw exception: ");
e.printStackTrace();
System.exit(1);
});

Flux.range(0, parallel)
.parallel()
.runOn(Schedulers.boundedElastic())
.flatMap(i -> runLoopAsync(tests[i], i, endNanoTime))
.then()
.block();
} catch (InterruptedException | ExecutionException e) {
System.err.println("Error occurred when submitting jobs to ForkJoinPool. " + System.lineSeparator() + e);
throw new RuntimeException(e);
} catch (Exception e) {
System.err.println("Error occurred running tests: " + System.lineSeparator() + e);
} finally {
progressStatus.dispose();
}

progressStatus.dispose();

System.out.println("=== Results ===");

int totalOperations = getCompletedOperations();
Expand All @@ -239,7 +243,7 @@ public static void runTests(PerfStressTest<?>[] tests, boolean sync, int paralle
double weightedAverageSeconds = totalOperations / operationsPerSecond;

System.out.printf("Completed %,d operations in a weighted-average of %,.2fs (%,.2f ops/s, %,.3f s/op)%n",
totalOperations, weightedAverageSeconds, operationsPerSecond, secondsPerOperation);
totalOperations, weightedAverageSeconds, operationsPerSecond, secondsPerOperation);
System.out.println();
}

Expand Down Expand Up @@ -269,7 +273,7 @@ private static Mono<Void> runLoopAsync(PerfStressTest<?> test, int index, long e
private static Disposable printStatus(String header, Supplier<Object> status, boolean newLine, boolean printFinalStatus) {
System.out.println(header);

boolean[] needsExtraNewline = new boolean[] { false };
boolean[] needsExtraNewline = new boolean[]{false};

return Flux.interval(Duration.ofSeconds(1)).doFinally(s -> {
if (printFinalStatus) {
Expand Down

0 comments on commit c2c5265

Please sign in to comment.