Skip to content

Commit

Permalink
[test] Further polish BoundedElasticScheduler test
Browse files Browse the repository at this point in the history
  • Loading branch information
simonbasle committed Mar 18, 2020
1 parent 282947e commit cce796c
Showing 1 changed file with 23 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -402,8 +404,18 @@ public void evictionForWorkerScheduling() {

@Test
public void lifoEvictionNoThreadRegrowth() throws InterruptedException {
int otherThreads = Thread.activeCount(); //don't count the evictor at shutdown
Set<String> preExistingEvictors = dumpThreadNames().filter(s -> s.startsWith("boundedElastic-evictor")).collect(Collectors.toSet());
BoundedElasticScheduler scheduler = afterTest.autoDispose(new BoundedElasticScheduler(200, Integer.MAX_VALUE,
r -> new Thread(r, "dequeueEviction"), 1));

List<String> newEvictors = dumpThreadNames()
.filter(s -> s.startsWith("boundedElastic-evictor"))
.filter(s -> !preExistingEvictors.contains(s))
.collect(Collectors.toList());
assertThat(newEvictors).as("new evictors").hasSize(1);
String newEvictor = newEvictors.get(0);

try {

int cacheSleep = 100; //slow tasks last 100ms
Expand All @@ -423,21 +435,21 @@ public void lifoEvictionNoThreadRegrowth() throws InterruptedException {
int threadCountChange = 1;

int oldActive = 0;
int activeAtBeginning;
int activeAtBeginning = 0;
for (int i = 0; i < fastCount; i++) {
Mono.just(i)
.subscribeOn(scheduler)
.doFinally(sig -> latch.countDown())
.subscribe();

if (i == 0) {
activeAtBeginning = Thread.activeCount();
activeAtBeginning = Thread.activeCount() - otherThreads;
threadCountTrend[0] = activeAtBeginning;
oldActive = activeAtBeginning;
LOGGER.debug("{} threads active in round 1/{}", activeAtBeginning, fastCount);
}
else {
int newActive = Thread.activeCount();
int newActive = Thread.activeCount() - otherThreads;
if (oldActive != newActive) {
threadCountTrend[threadCountChange++] = newActive;
oldActive = newActive;
Expand All @@ -449,20 +461,23 @@ public void lifoEvictionNoThreadRegrowth() throws InterruptedException {

assertThat(scheduler.estimateBusy()).as("busy at end of loop").isZero();
assertThat(threadCountTrend).as("no thread regrowth").isSortedAccordingTo(Comparator.reverseOrder());
assertThat(dumpThreadNames().filter(name -> name.contains("dequeueEviction") || name.contains("boundedElastic-evictor")).count())
.as("at most 1 worker + 1 evictor at end").isLessThanOrEqualTo(2);
assertThat(dumpThreadNames().filter(name -> name.contains("dequeueEviction")).count())
.as("at most 1 worker at end").isLessThanOrEqualTo(1);

System.out.println(Arrays.toString(Arrays.copyOf(threadCountTrend, threadCountChange)));
}
finally {
scheduler.dispose();
Thread.sleep(100);
final long postShutdown = dumpThreadNames().filter(name -> name.contains("dequeueEviction") || name.contains("boundedElastic-evictor")).count();
LOGGER.info("{} threads active post shutdown", postShutdown);
final long postShutdown = dumpThreadNames().filter(name -> name.contains("dequeueEviction")).count();
LOGGER.info("{} worker threads active post shutdown", postShutdown);
assertThat(postShutdown)
.as("post shutdown")
.withFailMessage("Thread count after shutdown is not zero. threads: %s", Thread.getAllStackTraces().keySet())
.withFailMessage("worker thread count after shutdown is not zero. threads: %s", Thread.getAllStackTraces().keySet())
.isNotPositive();
assertThat(dumpThreadNames())
.as("current evictor " + newEvictor + " shutdown")
.doesNotContain(newEvictor);
}
}

Expand Down

0 comments on commit cce796c

Please sign in to comment.