Skip to content

Commit

Permalink
Chaos Testing Tuning (#1055)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Dec 19, 2023
1 parent 4d7b99d commit b05b87e
Show file tree
Hide file tree
Showing 28 changed files with 1,640 additions and 74 deletions.
150 changes: 150 additions & 0 deletions src/examples/java/io/nats/examples/chaosTestApp/ChaosTestApp.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
// Copyright 2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package io.nats.examples.chaosTestApp;

import io.nats.client.Connection;
import io.nats.client.JetStreamManagement;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.client.api.StorageType;
import io.nats.client.api.StreamConfiguration;
import io.nats.client.api.StreamInfo;
import io.nats.examples.chaosTestApp.support.CommandLine;
import io.nats.examples.chaosTestApp.support.CommandLineConsumer;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class ChaosTestApp {

public static String[] MANUAL_ARGS = (
// "--servers nats://192.168.50.99:4222"
"--servers nats://localhost:4222"
+ " --stream app-stream"
+ " --subject app-subject"
// + " --runtime 3600 // 1 hour in seconds
+ " --screen left"
+ " --create"
+ " --r3"
+ " --publish"
+ " --pubjitter 30"
// + " --simple ordered,100,5000"
// + " --simple durable 100 5000" // space or commas work, the parser figures it out
+ " --fetch durable,100,5000"
// + " --push ordered"
// + " --push durable"
// + " --logdir c:\\temp"
).split(" ");

public static void main(String[] args) throws Exception {

args = MANUAL_ARGS; // comment out for real command line

CommandLine cmd = new CommandLine(args);
Monitor monitor;

try {
Output.start(cmd);
Output.controlMessage("APP", cmd.toString().replace(" --", " \n--"));
CountDownLatch waiter = new CountDownLatch(1);

Publisher publisher = null;
List<ConnectableConsumer> cons = null;

if (cmd.create) {
Options options = cmd.makeManagmentOptions();
try (Connection nc = Nats.connect(options)) {
System.out.println(nc.getServerInfo());
JetStreamManagement jsm = nc.jetStreamManagement();
createOrReplaceStream(cmd, jsm);
}
catch (Exception e) {
Output.errorMessage("APP", e.getMessage());
}
}

if (!cmd.commandLineConsumers.isEmpty()) {
cons = new ArrayList<>();
for (CommandLineConsumer clc : cmd.commandLineConsumers) {
ConnectableConsumer con;
switch (clc.consumerType) {
case Push:
con = new PushConsumer(cmd, clc.consumerKind);
break;
case Simple:
con = new SimpleConsumer(cmd, clc.consumerKind, clc.batchSize, clc.expiresIn);
break;
case Fetch:
con = new SimpleFetchConsumer(cmd, clc.consumerKind, clc.batchSize, clc.expiresIn);
break;
default:
throw new IllegalArgumentException("Unsupported consumer type: " + clc.consumerType);
}
Output.errorMessage("APP", con.label);
cons.add(con);
}
}

if (cmd.publish) {
publisher = new Publisher(cmd, cmd.pubjitter);
Thread pubThread = new Thread(publisher);
pubThread.start();
}

// just creating the stream?
if (publisher == null && cons == null) {
return;
}

monitor = new Monitor(cmd, publisher, cons);
Thread monThread = new Thread(monitor);
monThread.start();

long runtime = cmd.runtime < 1 ? Long.MAX_VALUE : cmd.runtime;
//noinspection ResultOfMethodCallIgnored
waiter.await(runtime, TimeUnit.MILLISECONDS);
}
catch (Exception e) {
//noinspection CallToPrintStackTrace
e.printStackTrace();
}
finally {
Output.dumpControl();
System.exit(0);
}
}

public static void createOrReplaceStream(CommandLine cmd, JetStreamManagement jsm) {
try {
jsm.deleteStream(cmd.stream);
}
catch (Exception ignore) {}
try {
StreamConfiguration sc = StreamConfiguration.builder()
.name(cmd.stream)
.storageType(StorageType.File)
.subjects(cmd.subject)
.replicas(cmd.r3 ? 3 : 1)
.build();
StreamInfo si = jsm.addStream(sc);
Output.controlMessage("APP", "Create Stream\n" + Output.formatted(si.getConfiguration()));
}
catch (Exception e) {
Output.errorMessage("FATAL", "Failed creating stream: '" + cmd.stream + "' " + e);
System.exit(-1);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright 2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package io.nats.examples.chaosTestApp;

import io.nats.client.*;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.DeliverPolicy;
import io.nats.examples.chaosTestApp.support.CommandLine;
import io.nats.examples.chaosTestApp.support.ConsumerKind;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

public abstract class ConnectableConsumer implements ConnectionListener {

protected final Connection nc;
protected final JetStream js;
protected final OutputErrorListener errorListener;
protected final AtomicLong lastReceivedSequence;
protected final MessageHandler handler;
protected final ConsumerKind consumerKind;

protected final CommandLine cmd;
protected String initials;
protected String name;
protected String durableName;
protected String label;

public ConnectableConsumer(CommandLine cmd, String initials, ConsumerKind consumerKind) throws IOException, InterruptedException, JetStreamApiException {
this.cmd = cmd;
lastReceivedSequence = new AtomicLong(0);
this.consumerKind = consumerKind;
switch (consumerKind) {
case Durable:
durableName = initials + "-dur-" + new NUID().nextSequence();
name = durableName;
break;
case Ephemeral:
durableName = null;
name = initials + "-eph-" + new NUID().nextSequence();
break;
case Ordered:
durableName = null;
name = initials + "-ord-" + new NUID().nextSequence();
break;
}
this.initials = initials;
updateNameAndLabel(name);

errorListener = new OutputErrorListener(label);

Options options = cmd.makeOptions(this, errorListener);
nc = Nats.connect(options);
js = nc.jetStream();

handler = this::onMessage;
}

public void onMessage(Message m) throws InterruptedException {
m.ack();
long seq = m.metaData().streamSequence();
lastReceivedSequence.set(seq);
Output.workMessage(label, "Last Received Seq: " + seq);
}

public abstract void refreshInfo();

@Override
public void connectionEvent(Connection conn, Events type) {
Output.controlMessage(label, "Connection: " + conn.getServerInfo().getPort() + " " + type.name().toLowerCase());
refreshInfo();
}

protected void updateNameAndLabel(String updatedName) {
name = updatedName;
if (updatedName == null) {
label = consumerKind.name();
}
else {
label = name + " (" + consumerKind.name() + ")";
}
}

public long getLastReceivedSequence() {
return lastReceivedSequence.get();
}

protected ConsumerConfiguration.Builder newCreateConsumer() {
return recreateConsumer(0);
}

private ConsumerConfiguration.Builder recreateConsumer(long last) {
return ConsumerConfiguration.builder()
.name(consumerKind == ConsumerKind.Ordered ? null : name)
.durable(durableName)
.deliverPolicy(last == 0 ? DeliverPolicy.All : DeliverPolicy.ByStartSequence)
.startSequence(last == 0 ? -1 : last + 1)
.filterSubject(cmd.subject);
}
}
131 changes: 131 additions & 0 deletions src/examples/java/io/nats/examples/chaosTestApp/Monitor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Copyright 2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package io.nats.examples.chaosTestApp;

import io.nats.client.Connection;
import io.nats.client.JetStreamManagement;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.client.api.StreamInfo;
import io.nats.examples.chaosTestApp.support.CommandLine;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import static io.nats.examples.chaosTestApp.Output.formatted;

public class Monitor implements Runnable, java.util.function.Consumer<String> {

static final String LABEL = "MONITOR";
static final long REPORT_FREQUENCY = 5000;
static final int SHORT_REPORTS = 50;

final CommandLine cmd;
final Publisher publisher;
final List<ConnectableConsumer> consumers;
final AtomicBoolean reportFull;

public Monitor(CommandLine cmd, Publisher publisher, List<ConnectableConsumer> consumers) {
this.cmd = cmd;
this.publisher = publisher;
this.consumers = consumers;
reportFull = new AtomicBoolean(true);
}

@Override
public void accept(String s) {
reportFull.set(true);
// Output.print(LABEL, s);
}

@Override
public void run() {
Options options = new Options.Builder()
.servers(cmd.servers)
.connectionListener((c, t) -> {
reportFull.set(true);
String s = "Connection: " + c.getServerInfo().getPort() + " " + t;
Output.controlMessage(LABEL, s);
// Output.print(LABEL, s);
})
.errorListener(new OutputErrorListener(LABEL, this) {})
.maxReconnects(-1)
.build();

long started = System.currentTimeMillis();
int shortReportsOwed = 0;
try (Connection nc = Nats.connect(options)) {
JetStreamManagement jsm = nc.jetStreamManagement();
//noinspection InfiniteLoopStatement
while (true) {
//noinspection BusyWait
Thread.sleep(REPORT_FREQUENCY);
try {
StringBuilder conReport = new StringBuilder();
if (reportFull.get()) {
StreamInfo si = jsm.getStreamInfo(cmd.stream);
String message = "Stream\n" + formatted(si.getConfiguration())
+ "\n" + formatted(si.getClusterInfo());
Output.controlMessage(LABEL, message);
reportFull.set(false);
if (consumers != null) {
for (ConnectableConsumer con : consumers) {
con.refreshInfo();
}
}
}
if (shortReportsOwed < 1) {
shortReportsOwed = SHORT_REPORTS;
if (consumers != null) {
for (ConnectableConsumer con : consumers) {
conReport.append("\n").append(con.label).append(" | Last Sequence: ").append(con.getLastReceivedSequence());
}
}
}
else {
shortReportsOwed--;
if (consumers != null) {
for (ConnectableConsumer con : consumers) {
conReport.append(" | ")
.append(con.name)
.append(": ")
.append(con.getLastReceivedSequence());
}
}
}

String pubReport = "";
if (publisher != null) {
pubReport = "| Publisher: " + publisher.getLastSeqno() +
(publisher.isInErrorState() ? " (Paused)" : " (Running)");
}
Output.controlMessage(LABEL, "Uptime: " + uptime(started) + pubReport + conReport);
}
catch (Exception e) {
Output.controlMessage(LABEL, e.getMessage());
reportFull.set(true);
}
}
}
catch (Exception e) {
e.printStackTrace();
System.exit(-1);
}
}

private static String uptime(long started) {
return Duration.ofMillis(System.currentTimeMillis() - started).toString().replace("PT", "");
}
}
Loading

0 comments on commit b05b87e

Please sign in to comment.