Skip to content

Commit

Permalink
Automatically reconnect the Agent. Reauthenticate the agent on reques…
Browse files Browse the repository at this point in the history
…t. Alert on duplicated Agent. Improve logging output. Resolves #3. Resolves #5
  • Loading branch information
jordeu committed Nov 4, 2021
1 parent c5d03f0 commit f6fe7d7
Show file tree
Hide file tree
Showing 5 changed files with 200 additions and 58 deletions.
2 changes: 1 addition & 1 deletion VERSION-API
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.6
1.8
12 changes: 11 additions & 1 deletion conf/jni-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,24 @@
{
"name":"com.sun.management.internal.DiagnosticCommandInfo[]"
},
{
"name":"io.seqera.tower.agent.Agent",
"methods":[{"name":"main","parameterTypes":["java.lang.String[]"] }]
},
{
"name":"java.lang.ClassLoader",
"methods":[{"name":"getPlatformClassLoader","parameterTypes":[] }]
"methods":[
{"name":"getPlatformClassLoader","parameterTypes":[] },
{"name":"loadClass","parameterTypes":["java.lang.String"] }
]
},
{
"name":"java.util.Arrays",
"methods":[{"name":"asList","parameterTypes":["java.lang.Object[]"] }]
},
{
"name":"jdk.internal.loader.ClassLoaders$PlatformClassLoader"
},
{
"name":"sun.management.VMManagementImpl",
"fields":[
Expand Down
59 changes: 59 additions & 0 deletions conf/reflect-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,9 @@
"name":"io.micronaut.http.client.netty.DefaultHttpClient$11",
"methods":[{"name":"exceptionCaught","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Throwable"] }]
},
{
"name":"io.micronaut.http.client.netty.DefaultHttpClient$5"
},
{
"name":"io.micronaut.http.client.netty.DefaultHttpClient$HttpClientInitializer"
},
Expand All @@ -339,6 +342,14 @@
"name":"io.micronaut.http.client.netty.ssl.$NettyClientSslBuilder$Definition$Reference",
"methods":[{"name":"<init>","parameterTypes":[] }]
},
{
"name":"io.micronaut.http.client.netty.websocket.NettyWebSocketClientHandler",
"methods":[
{"name":"channelActive","parameterTypes":["io.netty.channel.ChannelHandlerContext"] },
{"name":"exceptionCaught","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Throwable"] },
{"name":"userEventTriggered","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Object"] }
]
},
{
"name":"io.micronaut.http.codec.$CodecConfiguration$Definition$Reference",
"methods":[{"name":"<init>","parameterTypes":[] }]
Expand Down Expand Up @@ -1233,6 +1244,10 @@
{"name":"read","parameterTypes":["io.netty.channel.ChannelHandlerContext"] }
]
},
{
"name":"io.netty.channel.ChannelHandlerAdapter",
"methods":[{"name":"exceptionCaught","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Throwable"] }]
},
{
"name":"io.netty.channel.ChannelInboundHandlerAdapter",
"methods":[
Expand All @@ -1254,6 +1269,18 @@
{"name":"exceptionCaught","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Throwable"] }
]
},
{
"name":"io.netty.channel.ChannelOutboundHandlerAdapter",
"methods":[
{"name":"bind","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.net.SocketAddress","io.netty.channel.ChannelPromise"] },
{"name":"close","parameterTypes":["io.netty.channel.ChannelHandlerContext","io.netty.channel.ChannelPromise"] },
{"name":"connect","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.net.SocketAddress","java.net.SocketAddress","io.netty.channel.ChannelPromise"] },
{"name":"deregister","parameterTypes":["io.netty.channel.ChannelHandlerContext","io.netty.channel.ChannelPromise"] },
{"name":"disconnect","parameterTypes":["io.netty.channel.ChannelHandlerContext","io.netty.channel.ChannelPromise"] },
{"name":"flush","parameterTypes":["io.netty.channel.ChannelHandlerContext"] },
{"name":"read","parameterTypes":["io.netty.channel.ChannelHandlerContext"] }
]
},
{
"name":"io.netty.channel.CombinedChannelDuplexHandler",
"methods":[
Expand Down Expand Up @@ -1319,7 +1346,9 @@
{
"name":"io.netty.handler.codec.ByteToMessageDecoder",
"methods":[
{"name":"channelInactive","parameterTypes":["io.netty.channel.ChannelHandlerContext"] },
{"name":"channelRead","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Object"] },
{"name":"channelReadComplete","parameterTypes":["io.netty.channel.ChannelHandlerContext"] },
{"name":"userEventTriggered","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Object"] }
]
},
Expand All @@ -1334,6 +1363,10 @@
"name":"io.netty.handler.codec.MessageToMessageDecoder",
"methods":[{"name":"channelRead","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Object"] }]
},
{
"name":"io.netty.handler.codec.MessageToMessageEncoder",
"methods":[{"name":"write","parameterTypes":["io.netty.channel.ChannelHandlerContext","java.lang.Object","io.netty.channel.ChannelPromise"] }]
},
{
"name":"io.netty.handler.codec.http.HttpClientCodec"
},
Expand All @@ -1347,6 +1380,12 @@
{
"name":"io.netty.handler.codec.http.HttpContentDecompressor"
},
{
"name":"io.netty.handler.codec.http.websocketx.WebSocket13FrameDecoder"
},
{
"name":"io.netty.handler.codec.http.websocketx.WebSocket13FrameEncoder"
},
{
"name":"io.netty.handler.ssl.SslHandler",
"methods":[
Expand Down Expand Up @@ -1418,6 +1457,10 @@
{
"name":"io.reactivex.Single"
},
{
"name":"io.seqera.tower.agent.$Agent$Definition$Reference",
"methods":[{"name":"<init>","parameterTypes":[] }]
},
{
"name":"io.seqera.tower.agent.$AgentClientSocket$Intercepted$Definition$Reference",
"methods":[{"name":"<init>","parameterTypes":[] }]
Expand All @@ -1428,6 +1471,18 @@
"allDeclaredMethods":true,
"methods":[{"name":"<init>","parameterTypes":[] }]
},
{
"name":"io.seqera.tower.agent.exchange.AgentMessage",
"allDeclaredFields":true,
"allDeclaredMethods":true,
"allDeclaredConstructors":true
},
{
"name":"io.seqera.tower.agent.exchange.HeartbeatMessage",
"allDeclaredFields":true,
"allDeclaredMethods":true,
"allDeclaredConstructors":true
},
{
"name":"io.seqera.tower.agent.model.ServiceInfo",
"allDeclaredFields":true,
Expand All @@ -1449,6 +1504,10 @@
{
"name":"java.io.FilePermission"
},
{
"name":"java.io.Serializable",
"allDeclaredMethods":true
},
{
"name":"java.lang.Boolean",
"fields":[{"name":"TYPE"}]
Expand Down
101 changes: 82 additions & 19 deletions src/main/java/io/seqera/tower/agent/Agent.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import io.micronaut.rxjava2.http.client.websockets.RxWebSocketClient;
import io.micronaut.scheduling.TaskScheduler;
import io.micronaut.websocket.exceptions.WebSocketClientException;
import io.seqera.tower.agent.exchange.CommandRequest;
import io.seqera.tower.agent.exchange.CommandResponse;
import io.seqera.tower.agent.exchange.HeartbeatMessage;
import io.seqera.tower.agent.model.ServiceInfoResponse;
import io.seqera.tower.agent.utils.VersionProvider;
Expand All @@ -28,12 +30,16 @@
import picocli.CommandLine.Option;
import picocli.CommandLine.Parameters;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.module.ModuleDescriptor;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

@Command(
name = "tw-agent",
Expand All @@ -51,6 +57,7 @@
optionListHeading = "%nOptions:%n"
)
public class Agent implements Runnable {
public static final int HEARTBEAT_MINUTES_INTERVAL = 1;
private static Logger logger = LoggerFactory.getLogger(Agent.class);

@Parameters(index = "0", paramLabel = "AGENT_CONNECTION_ID", description = "Agent connection ID to identify this agent", arity = "1")
Expand All @@ -75,27 +82,72 @@ public static void main(String[] args) throws Exception {

@Override
public void run() {
try {
checkTower();
connectTower();
sendPeriodicHeartbeat();
} catch (Exception e) {
logger.error(e.getMessage());
System.exit(-1);
}
}

checkTower();

final URI uri;
/**
* Connect the agent to Tower using websockets
*/
private void connectTower() {
try {
uri = new URI(url + "/agent/" + agentKey + "/connect");
final URI uri = new URI(url + "/agent/" + agentKey + "/connect");
final MutableHttpRequest<?> req = HttpRequest.GET(uri).bearerAuth(token);
final RxWebSocketClient webSocketClient = ctx.getBean(RxWebSocketClient.class);
agentClient = webSocketClient.connect(AgentClientSocket.class, req).blockingFirst();
logger.info("Connected");

sendPeriodicHeartbeat();
agentClient = webSocketClient.connect(AgentClientSocket.class, req)
.timeout(5, TimeUnit.SECONDS)
.blockingFirst();
agentClient.setConnectCallback(this::connectTower);
agentClient.setCommandRequestCallback(this::execCommand);
} catch (URISyntaxException e) {
logger.error(String.format("Invalid URI: %s/agent/%s/connect - %s", url, agentKey, e.getMessage()));
System.exit(-1);
logger.error("Invalid URI: {}/agent/{}/connect - {}", url, agentKey, e.getMessage());
} catch (WebSocketClientException e) {
logger.error(String.format("Connection error - %s", e.getMessage()));
System.exit(-1);
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
logger.error("Connection error - {}", e.getMessage());
} catch (Exception e) {
if (e.getCause() instanceof TimeoutException) {
logger.error("Connection timeout [trying to reconnect in {} minutes]", HEARTBEAT_MINUTES_INTERVAL);
} else {
logger.error("Unknown problem");
e.printStackTrace();
}
}
}

/**
* Executes a command request and sends the response back to Tower
*
* @param message Command request message
*/
private void execCommand(CommandRequest message) {
try {
logger.info("Execute: {}", message.getCommand());
Process process = new ProcessBuilder().command("sh", "-c", message.getCommand()).start();
int exitStatus = process.waitFor();
// read the stdout
BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
StringBuilder builder = new StringBuilder();
String line = null;
while ((line = reader.readLine()) != null) {
builder.append(line);
builder.append("\n");
}
String result = builder.toString();

// send result
CommandResponse response = new CommandResponse(message.getId(), result.getBytes(), exitStatus);
logger.info("Sending response --> {}", response);
agentClient.send(response);
logger.info("Response sent");
} catch (Exception e) {
// send result
CommandResponse response = new CommandResponse(message.getId(), e.getMessage().getBytes(), -1);
agentClient.send(response);
}
}

Expand All @@ -104,10 +156,15 @@ public void run() {
*/
private void sendPeriodicHeartbeat() {
TaskScheduler scheduler = ctx.getBean(TaskScheduler.class);

scheduler.scheduleAtFixedRate(null, Duration.ofMinutes(1), () -> {
System.out.println("Sending heartbeat");
agentClient.send(new HeartbeatMessage());
Duration interval = Duration.ofMinutes(HEARTBEAT_MINUTES_INTERVAL);
scheduler.scheduleAtFixedRate(interval, interval, () -> {
if (agentClient.isOpen()) {
logger.info("Sending heartbeat");
agentClient.send(new HeartbeatMessage());
} else {
logger.info("Trying to reconnect");
connectTower();
}
});
}

Expand Down Expand Up @@ -150,6 +207,12 @@ private void checkTower() {
}
}

/**
* Minimum API required version
*
* @return Required API version
* @throws IOException On reading properties file
*/
private String getVersionApi() throws IOException {
Properties properties = new Properties();
properties.load(this.getClass().getResourceAsStream("/META-INF/build-info.properties"));
Expand Down
Loading

0 comments on commit f6fe7d7

Please sign in to comment.