Skip to content

Commit

Permalink
Pump process inputs before checking completion
Browse files Browse the repository at this point in the history
Process completion methods can hang because internal
buffers are full. This commit "pumps" the inputs
before calling the completion method, which should
help to finish properly the command call.

(cherry picked from commit 17afaca)
  • Loading branch information
acogoluegnes committed Feb 7, 2022
1 parent ff9bff2 commit 22ddbdb
Showing 1 changed file with 58 additions and 11 deletions.
69 changes: 58 additions & 11 deletions src/test/java/com/rabbitmq/tools/Host.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,25 +54,68 @@ public static String capture(InputStream is)
return buff.toString();
}

public static Process executeCommand(String command) throws IOException
public static ProcessState executeCommand(String command) throws IOException
{
Process pr = executeCommandProcess(command);
InputStreamPumpState inputState = new InputStreamPumpState(pr.getInputStream());
InputStreamPumpState errorState = new InputStreamPumpState(pr.getErrorStream());

int ev = waitForExitValue(pr);
int ev = waitForExitValue(pr, inputState, errorState);
inputState.pump();
errorState.pump();
if (ev != 0) {
String stdout = capture(pr.getInputStream());
String stderr = capture(pr.getErrorStream());
throw new IOException("unexpected command exit value: " + ev +
"\ncommand: " + command + "\n" +
"\nstdout:\n" + stdout +
"\nstderr:\n" + stderr + "\n");
"\nstdout:\n" + inputState.buffer.toString() +
"\nstderr:\n" + errorState.buffer.toString() + "\n");
}
return pr;
return new ProcessState(pr, inputState, errorState);
}

static class ProcessState {

private final Process process;
private final InputStreamPumpState inputState;
private final InputStreamPumpState errorState;

ProcessState(Process process, InputStreamPumpState inputState,
InputStreamPumpState errorState) {
this.process = process;
this.inputState = inputState;
this.errorState = errorState;
}

private String output() {
return inputState.buffer.toString();
}

}

private static class InputStreamPumpState {

private final BufferedReader reader;
private final StringBuilder buffer;

private InputStreamPumpState(InputStream in) {
this.reader = new BufferedReader(new InputStreamReader(in));
this.buffer = new StringBuilder();
}

void pump() throws IOException {
String line;
while ((line = reader.readLine()) != null) {
buffer.append(line).append("\n");
}
}

}

private static int waitForExitValue(Process pr) {
private static int waitForExitValue(Process pr, InputStreamPumpState inputState,
InputStreamPumpState errorState) throws IOException {
while(true) {
try {
inputState.pump();
errorState.pump();
pr.waitFor();
break;
} catch (InterruptedException ignored) {}
Expand All @@ -83,6 +126,10 @@ private static int waitForExitValue(Process pr) {
public static Process executeCommandIgnoringErrors(String command) throws IOException
{
Process pr = executeCommandProcess(command);
InputStreamPumpState inputState = new InputStreamPumpState(pr.getInputStream());
InputStreamPumpState errorState = new InputStreamPumpState(pr.getErrorStream());
inputState.pump();
errorState.pump();
boolean exited = false;
try {
exited = pr.waitFor(30, TimeUnit.SECONDS);
Expand Down Expand Up @@ -118,7 +165,7 @@ public static boolean isRabbitMqCtlCommandAvailable(String command) throws IOExc
return exitValue == 0;
}

public static Process rabbitmqctl(String command) throws IOException {
public static ProcessState rabbitmqctl(String command) throws IOException {
return executeCommand(rabbitmqctlCommand() +
rabbitmqctlNodenameArgument() +
" " + command);
Expand All @@ -142,7 +189,7 @@ public static void clearResourceAlarm(String source) throws IOException {
rabbitmqctl("eval 'rabbit_alarm:clear_alarm({resource_limit, " + source + ", node()}).'");
}

public static Process invokeMakeTarget(String command) throws IOException {
public static ProcessState invokeMakeTarget(String command) throws IOException {
File rabbitmqctl = new File(rabbitmqctlCommand());
return executeCommand(makeCommand() +
" -C \'" + rabbitmqDir() + "\'" +
Expand Down Expand Up @@ -307,7 +354,7 @@ public String toString() {
}

public static List<ConnectionInfo> listConnections() throws IOException {
String output = capture(rabbitmqctl("list_connections -q pid peer_port client_properties").getInputStream());
String output = rabbitmqctl("list_connections -q pid peer_port client_properties").output();
// output (header line presence depends on broker version):
// pid peer_port
// <rabbit@mercurio.1.11491.0> 58713
Expand Down

0 comments on commit 22ddbdb

Please sign in to comment.