Skip to content

Commit

Permalink
Add simple chaos monkey to kill consumers (opensource4you#573)
Browse files Browse the repository at this point in the history
  • Loading branch information
chia7712 authored Aug 11, 2022
1 parent 69303f1 commit 3c45fc5
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 0 deletions.
24 changes: 24 additions & 0 deletions app/src/main/java/org/astraea/app/performance/Performance.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.astraea.app.admin.Compression;
import org.astraea.app.admin.TopicPartition;
import org.astraea.app.argument.CompressionField;
import org.astraea.app.argument.DurationField;
import org.astraea.app.argument.NonEmptyStringField;
import org.astraea.app.argument.NonNegativeShortField;
import org.astraea.app.argument.PathField;
Expand Down Expand Up @@ -166,6 +167,20 @@ public static String execute(final Argument param) throws InterruptedException,
var fileWriterFuture =
fileWriter.map(CompletableFuture::runAsync).orElse(CompletableFuture.completedFuture(null));

var chaos =
param.chaosDuration == null
? CompletableFuture.completedFuture(null)
: CompletableFuture.runAsync(
() -> {
while (!consumerThreads.stream().allMatch(AbstractThread::closed)) {
var thread =
consumerThreads.get((int) (Math.random() * consumerThreads.size()));
thread.unsubscribe();
Utils.sleep(param.chaosDuration);
thread.resubscribe();
}
});

CompletableFuture.runAsync(
() -> {
producerThreads.forEach(AbstractThread::waitForDone);
Expand All @@ -189,6 +204,7 @@ public static String execute(final Argument param) throws InterruptedException,
consumerThreads.forEach(AbstractThread::waitForDone);
tracker.waitForDone();
fileWriterFuture.join();
chaos.join();
return param.topic;
}

Expand Down Expand Up @@ -346,5 +362,13 @@ Producer<byte[], byte[]> createProducer() {
description = "Output format for the report",
converter = ReportFormat.ReportFormatConverter.class)
ReportFormat reportFormat = ReportFormat.CSV;

@Parameter(
names = {"--chaos.frequency"},
description =
"time to run the chaos monkey. It will kill consumer arbitrarily. There is no monkey by default",
validateWith = DurationField.class,
converter = DurationField.class)
Duration chaosDuration = null;
}
}
10 changes: 10 additions & 0 deletions app/src/test/java/org/astraea/app/performance/PerformanceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.astraea.app.performance;

import com.beust.jcommander.ParameterException;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.function.BiConsumer;
import org.astraea.app.argument.Argument;
Expand Down Expand Up @@ -156,4 +157,13 @@ void testArgument() {
Assertions.assertThrows(
ParameterException.class, () -> Argument.parse(new Performance.Argument(), arguments14));
}

@Test
void testChaosFrequency() {
var args =
Argument.parse(
new Performance.Argument(),
new String[] {"--bootstrap.servers", "localhost:9092", "--chaos.frequency", "10s"});
Assertions.assertEquals(Duration.ofSeconds(10), args.chaosDuration);
}
}

0 comments on commit 3c45fc5

Please sign in to comment.