Skip to content

Commit

Permalink
Fix Agent freeze on long command responses
Browse files Browse the repository at this point in the history
  • Loading branch information
jordeu committed Jan 13, 2022
1 parent b17a52c commit 28e90f3
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 16 deletions.
43 changes: 28 additions & 15 deletions src/main/java/io/seqera/tower/agent/Agent.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

package io.seqera.tower.agent;

import com.sun.security.auth.module.UnixSystem;
import io.micronaut.configuration.picocli.PicocliRunner;
import io.micronaut.context.ApplicationContext;
import io.micronaut.http.HttpRequest;
Expand Down Expand Up @@ -64,6 +63,7 @@
)
public class Agent implements Runnable {
public static final int HEARTBEAT_MINUTES_INTERVAL = 1;
public static final int MAX_WEBSOCKET_PAYLOAD_SIZE = 5242880;
private static final Logger logger = LoggerFactory.getLogger(Agent.class);

@Parameters(index = "0", paramLabel = "AGENT_CONNECTION_ID", description = "Agent connection ID to identify this agent.", arity = "1")
Expand Down Expand Up @@ -125,15 +125,18 @@ private void connectTower() {
sendInfoMessage();
} catch (URISyntaxException e) {
logger.error("Invalid URI: {}/agent/{}/connect - {}", url, agentKey, e.getMessage());
System.exit(1);
} catch (WebSocketClientException e) {
logger.error("Connection error - {}", e.getMessage());
System.exit(1);
} catch (Exception e) {
if (e.getCause() instanceof TimeoutException) {
logger.error("Connection timeout [trying to reconnect in {} minutes]", HEARTBEAT_MINUTES_INTERVAL);
} else {
logger.error("Unknown problem");
e.printStackTrace();
}
System.exit(1);
}
}

Expand All @@ -143,11 +146,14 @@ private void connectTower() {
* @param message Command request message
*/
private void execCommand(CommandRequest message) {
CommandResponse response;

try {
logger.info("Executing request {}", message.getId());
logger.trace("REQUEST: {}", message);
Process process = new ProcessBuilder().command("sh", "-c", message.getCommand()).start();
int exitStatus = process.waitFor();
Process process = new ProcessBuilder()
.command("sh", "-c", message.getCommand())
.redirectErrorStream(true)
.start();

// read the stdout
BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
StringBuilder builder = new StringBuilder();
Expand All @@ -156,18 +162,25 @@ private void execCommand(CommandRequest message) {
builder.append(line);
builder.append("\n");
}
String result = builder.toString();

// send result
CommandResponse response = new CommandResponse(message.getId(), result.getBytes(), exitStatus);
logger.info("Sending response {}'", response.getId());
logger.trace("RESPONSE: {}", response);
agentClient.send(response);
} catch (Exception e) {
// send result
CommandResponse response = new CommandResponse(message.getId(), e.getMessage().getBytes(), 1);
agentClient.send(response);
// truncate response to fit the maximum websocket size
if (builder.length() > (MAX_WEBSOCKET_PAYLOAD_SIZE - 100)) {
logger.warn("Response to [{}] '{}' was truncated", message.getId(), message.getCommand());
builder.setLength(MAX_WEBSOCKET_PAYLOAD_SIZE - 100);
}

String result = builder.toString();
process.waitFor(10, TimeUnit.SECONDS);
int exitStatus = process.exitValue();
process.destroy();
response = new CommandResponse(message.getId(), result.getBytes(), exitStatus);
} catch (Throwable e) {
response = new CommandResponse(message.getId(), e.getMessage().getBytes(), 1);
}
// send result
logger.info("Sending response {}'", response.getId());
logger.trace("RESPONSE: {}", response);
agentClient.sendAsync(response);
}

/**
Expand Down
5 changes: 4 additions & 1 deletion src/main/java/io/seqera/tower/agent/AgentClientSocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.Future;
import java.util.function.Consumer;

/**
Expand All @@ -51,7 +52,7 @@ void onOpen(WebSocketSession session) {
logger.debug("Websocket session URL: {}", session.getRequestURI());
}

@OnMessage
@OnMessage(maxPayloadLength=Agent.MAX_WEBSOCKET_PAYLOAD_SIZE)
void onMessage(AgentMessage message) {
if (message instanceof HeartbeatMessage) {
logger.info("Received heartbeat");
Expand Down Expand Up @@ -92,6 +93,8 @@ void onClose(CloseReason reason) {

abstract void send(AgentMessage message);

public abstract Future<String> sendAsync(AgentMessage message);

public boolean isOpen() {
return session.isOpen();
}
Expand Down

0 comments on commit 28e90f3

Please sign in to comment.