Skip to content

Commit

Permalink
interop-testing: Add concurrency condition to the soak test using exi…
Browse files Browse the repository at this point in the history
…sting blocking api

The goal of this PR is to increase the test coverage of the C2P E2E load test by improving the rpc_soak and channel_soak tests to support concurrency.

**rpc_soak:**
The client performs many large_unary RPCs in sequence over the same channel. The test can run in either a concurrent or non-concurrent mode, depending on the number of threads specified (soak_num_threads):
  - Non-Concurrent Mode: When soak_num_threads = 1, all RPCs are performed sequentially on a single thread.
  - Concurrent Mode: When soak_num_threads > 1, the client uses multiple threads to distribute the workload. Each thread performs a portion of the total soak_iterations, executing its own set of RPCs concurrently.

**channel_soak:**
Similar to rpc_soak, but this time each RPC is performed on a new channel. The channel is created just before each RPC and is destroyed just after. Note on Concurrent Execution and Channel Creation: In a concurrent execution setting (i.e., when soak_num_threads > 1), each thread performs a portion of the total soak_iterations and creates and destroys its own channel for each RPC iteration.
- createNewChannel Function: In channel_soak, the createNewChannel function is used by each thread to create a new channel before every RPC. This function ensures that each RPC has a separate channel, preventing race conditions by isolating channels between threads. It shuts down the previous channel (if any) and creates a new one for each iteration, ensuring accurate latency measurement per RPC.

- Thread-specific logs will include the thread_id, helping to track performance across threads, especially when each thread is managing its own channel lifecycle.
  • Loading branch information
zbilun authored Nov 18, 2024
1 parent 4ae04b7 commit 6a92a2a
Show file tree
Hide file tree
Showing 3 changed files with 197 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static org.junit.Assert.fail;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.io.ByteStreams;
Expand Down Expand Up @@ -1698,9 +1699,28 @@ public Status getStatus() {
private Status status = Status.OK;
}


private static class ThreadResults {
private int threadFailures = 0;
private int iterationsDone = 0;
private Histogram latencies = new Histogram(4);

public int getThreadFailures() {
return threadFailures;
}

public int getIterationsDone() {
return iterationsDone;
}

public Histogram getLatencies() {
return latencies;
}
}

private SoakIterationResult performOneSoakIteration(
TestServiceGrpc.TestServiceBlockingStub soakStub, int soakRequestSize, int soakResponseSize)
throws Exception {
throws InterruptedException {
long startNs = System.nanoTime();
Status status = Status.OK;
try {
Expand All @@ -1724,71 +1744,67 @@ private SoakIterationResult performOneSoakIteration(
}

/**
* Runs large unary RPCs in a loop with configurable failure thresholds
* and channel creation behavior.
* Runs large unary RPCs in a loop with configurable failure thresholds
* and channel creation behavior.
*/
public void performSoakTest(
String serverUri,
boolean resetChannelPerIteration,
int soakIterations,
int maxFailures,
int maxAcceptablePerIterationLatencyMs,
int minTimeMsBetweenRpcs,
int overallTimeoutSeconds,
int soakRequestSize,
int soakResponseSize)
throws Exception {
int iterationsDone = 0;
int totalFailures = 0;
Histogram latencies = new Histogram(4 /* number of significant value digits */);
int soakResponseSize,
int numThreads,
Function<ManagedChannel, ManagedChannel> createNewChannel)
throws InterruptedException {
if (soakIterations % numThreads != 0) {
throw new IllegalArgumentException("soakIterations must be evenly divisible by numThreads.");
}
ManagedChannel sharedChannel = createChannel();
long startNs = System.nanoTime();
ManagedChannel soakChannel = createChannel();
TestServiceGrpc.TestServiceBlockingStub soakStub = TestServiceGrpc
.newBlockingStub(soakChannel)
.withInterceptors(recordClientCallInterceptor(clientCallCapture));
for (int i = 0; i < soakIterations; i++) {
if (System.nanoTime() - startNs >= TimeUnit.SECONDS.toNanos(overallTimeoutSeconds)) {
break;
}
long earliestNextStartNs = System.nanoTime()
+ TimeUnit.MILLISECONDS.toNanos(minTimeMsBetweenRpcs);
if (resetChannelPerIteration) {
soakChannel.shutdownNow();
soakChannel.awaitTermination(10, TimeUnit.SECONDS);
soakChannel = createChannel();
soakStub = TestServiceGrpc
.newBlockingStub(soakChannel)
.withInterceptors(recordClientCallInterceptor(clientCallCapture));
}
SoakIterationResult result =
performOneSoakIteration(soakStub, soakRequestSize, soakResponseSize);
SocketAddress peer = clientCallCapture
.get().getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
StringBuilder logStr = new StringBuilder(
String.format(
Locale.US,
"soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s",
i, result.getLatencyMs(), peer != null ? peer.toString() : "null", serverUri));
if (!result.getStatus().equals(Status.OK)) {
totalFailures++;
logStr.append(String.format(" failed: %s", result.getStatus()));
} else if (result.getLatencyMs() > maxAcceptablePerIterationLatencyMs) {
totalFailures++;
logStr.append(
" exceeds max acceptable latency: " + maxAcceptablePerIterationLatencyMs);
} else {
logStr.append(" succeeded");
}
System.err.println(logStr.toString());
iterationsDone++;
latencies.recordValue(result.getLatencyMs());
long remainingNs = earliestNextStartNs - System.nanoTime();
if (remainingNs > 0) {
TimeUnit.NANOSECONDS.sleep(remainingNs);
}
Thread[] threads = new Thread[numThreads];
int soakIterationsPerThread = soakIterations / numThreads;
List<ThreadResults> threadResultsList = new ArrayList<>(numThreads);
for (int i = 0; i < numThreads; i++) {
threadResultsList.add(new ThreadResults());
}
for (int threadInd = 0; threadInd < numThreads; threadInd++) {
final int currentThreadInd = threadInd;
threads[threadInd] = new Thread(() -> {
try {
executeSoakTestInThread(
soakIterationsPerThread,
startNs,
minTimeMsBetweenRpcs,
soakRequestSize,
soakResponseSize,
maxAcceptablePerIterationLatencyMs,
overallTimeoutSeconds,
serverUri,
threadResultsList.get(currentThreadInd),
sharedChannel,
createNewChannel);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Thread interrupted: " + e.getMessage(), e);
}
});
threads[threadInd].start();
}
for (Thread thread : threads) {
thread.join();
}

int totalFailures = 0;
int iterationsDone = 0;
Histogram latencies = new Histogram(4);
for (ThreadResults threadResult :threadResultsList) {
totalFailures += threadResult.getThreadFailures();
iterationsDone += threadResult.getIterationsDone();
latencies.add(threadResult.getLatencies());
}
soakChannel.shutdownNow();
soakChannel.awaitTermination(10, TimeUnit.SECONDS);
System.err.println(
String.format(
Locale.US,
Expand Down Expand Up @@ -1820,6 +1836,77 @@ public void performSoakTest(
+ "threshold: %d.",
serverUri, totalFailures, maxFailures);
assertTrue(tooManyFailuresErrorMessage, totalFailures <= maxFailures);
shutdownChannel(sharedChannel);
}

private void shutdownChannel(ManagedChannel channel) throws InterruptedException {
if (channel != null) {
channel.shutdownNow();
channel.awaitTermination(10, TimeUnit.SECONDS);
}
}

protected ManagedChannel createNewChannel(ManagedChannel currentChannel) {
try {
shutdownChannel(currentChannel);
return createChannel();
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted while creating a new channel", e);
}
}

private void executeSoakTestInThread(
int soakIterationsPerThread,
long startNs,
int minTimeMsBetweenRpcs,
int soakRequestSize,
int soakResponseSize,
int maxAcceptablePerIterationLatencyMs,
int overallTimeoutSeconds,
String serverUri,
ThreadResults threadResults,
ManagedChannel sharedChannel,
Function<ManagedChannel, ManagedChannel> maybeCreateChannel) throws InterruptedException {
ManagedChannel currentChannel = sharedChannel;
for (int i = 0; i < soakIterationsPerThread; i++) {
if (System.nanoTime() - startNs >= TimeUnit.SECONDS.toNanos(overallTimeoutSeconds)) {
break;
}
long earliestNextStartNs = System.nanoTime()
+ TimeUnit.MILLISECONDS.toNanos(minTimeMsBetweenRpcs);

currentChannel = maybeCreateChannel.apply(currentChannel);
TestServiceGrpc.TestServiceBlockingStub currentStub = TestServiceGrpc
.newBlockingStub(currentChannel)
.withInterceptors(recordClientCallInterceptor(clientCallCapture));
SoakIterationResult result = performOneSoakIteration(currentStub,
soakRequestSize, soakResponseSize);
SocketAddress peer = clientCallCapture
.get().getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
StringBuilder logStr = new StringBuilder(
String.format(
Locale.US,
"thread id: %d soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s",
Thread.currentThread().getId(),
i, result.getLatencyMs(), peer != null ? peer.toString() : "null", serverUri));
if (!result.getStatus().equals(Status.OK)) {
threadResults.threadFailures++;
logStr.append(String.format(" failed: %s", result.getStatus()));
} else if (result.getLatencyMs() > maxAcceptablePerIterationLatencyMs) {
threadResults.threadFailures++;
logStr.append(
" exceeds max acceptable latency: " + maxAcceptablePerIterationLatencyMs);
} else {
logStr.append(" succeeded");
}
System.err.println(logStr.toString());
threadResults.iterationsDone++;
threadResults.getLatencies().recordValue(result.getLatencyMs());
long remainingNs = earliestNextStartNs - System.nanoTime();
if (remainingNs > 0) {
TimeUnit.NANOSECONDS.sleep(remainingNs);
}
}
}

private static void assertSuccess(StreamRecorder<?> recorder) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ public static void main(String[] args) throws Exception {
soakIterations * soakPerIterationMaxAcceptableLatencyMs / 1000;
private int soakRequestSize = 271828;
private int soakResponseSize = 314159;
private int numThreads = 1;
private String additionalMetadata = "";
private static LoadBalancerProvider customBackendMetricsLoadBalancerProvider;

Expand Down Expand Up @@ -214,6 +215,8 @@ void parseArgs(String[] args) throws Exception {
soakRequestSize = Integer.parseInt(value);
} else if ("soak_response_size".equals(key)) {
soakResponseSize = Integer.parseInt(value);
} else if ("soak_num_threads".equals(key)) {
numThreads = Integer.parseInt(value);
} else if ("additional_metadata".equals(key)) {
additionalMetadata = value;
} else {
Expand Down Expand Up @@ -290,6 +293,9 @@ void parseArgs(String[] args) throws Exception {
+ "\n --soak_response_size "
+ "\n The response size in a soak RPC. Default "
+ c.soakResponseSize
+ "\n --soak_num_threads The number of threads for concurrent execution of the "
+ "\n soak tests (rpc_soak or channel_soak). Default "
+ c.numThreads
+ "\n --additional_metadata "
+ "\n Additional metadata to send in each request, as a "
+ "\n semicolon-separated list of key:value pairs. Default "
Expand Down Expand Up @@ -519,30 +525,31 @@ private void runTest(TestCases testCase) throws Exception {
case RPC_SOAK: {
tester.performSoakTest(
serverHost,
false /* resetChannelPerIteration */,
soakIterations,
soakMaxFailures,
soakPerIterationMaxAcceptableLatencyMs,
soakMinTimeMsBetweenRpcs,
soakOverallTimeoutSeconds,
soakRequestSize,
soakResponseSize);
soakResponseSize,
numThreads,
(currentChannel) -> currentChannel);
break;
}

case CHANNEL_SOAK: {
tester.performSoakTest(
serverHost,
true /* resetChannelPerIteration */,
soakIterations,
soakMaxFailures,
soakPerIterationMaxAcceptableLatencyMs,
soakMinTimeMsBetweenRpcs,
soakOverallTimeoutSeconds,
soakRequestSize,
soakResponseSize);
soakResponseSize,
numThreads,
(currentChannel) -> tester.createNewChannel(currentChannel));
break;

}

case ORCA_PER_RPC: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,29 +245,41 @@ public boolean runSucceeded() {
/**
* Run the intended soak test.
*/
public void run() {
boolean resetChannelPerIteration;
switch (testCase) {
case "rpc_soak":
resetChannelPerIteration = false;
break;
case "channel_soak":
resetChannelPerIteration = true;
break;
default:
throw new RuntimeException("invalid testcase: " + testCase);
}
public void run() throws InterruptedException {
try {
performSoakTest(
serverUri,
resetChannelPerIteration,
soakIterations,
soakMaxFailures,
soakPerIterationMaxAcceptableLatencyMs,
soakMinTimeMsBetweenRpcs,
soakOverallTimeoutSeconds,
soakRequestSize,
soakResponseSize);
switch (testCase) {
case "rpc_soak": {
performSoakTest(
serverUri,
soakIterations,
soakMaxFailures,
soakPerIterationMaxAcceptableLatencyMs,
soakMinTimeMsBetweenRpcs,
soakOverallTimeoutSeconds,
soakRequestSize,
soakResponseSize,
1,
(currentChannel) -> currentChannel);
}
break;
case "channel_soak": {
performSoakTest(
serverUri,
soakIterations,
soakMaxFailures,
soakPerIterationMaxAcceptableLatencyMs,
soakMinTimeMsBetweenRpcs,
soakOverallTimeoutSeconds,
soakRequestSize,
soakResponseSize,
1,
(currentChannel) -> createNewChannel(currentChannel));
}
break;
default:
throw new RuntimeException("invalid testcase: " + testCase);
}

logger.info("Test case: " + testCase + " done for server: " + serverUri);
runSucceeded = true;
} catch (Exception e) {
Expand Down Expand Up @@ -295,11 +307,18 @@ protected ManagedChannelBuilder<?> createChannelBuilder() {
}
}

private void run() throws Exception {
private void run() throws InterruptedException {
logger.info("Begin test case: " + testCase);
ArrayList<Thread> threads = new ArrayList<>();
for (InnerClient c : clients) {
Thread t = new Thread(c::run);
Thread t = new Thread(() -> {
try {
c.run();
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Properly re-interrupt the thread
throw new RuntimeException("Thread was interrupted during execution", e);
}
});
t.start();
threads.add(t);
}
Expand Down

0 comments on commit 6a92a2a

Please sign in to comment.