Skip to content

Commit

Permalink
Updated SSHClient to interrupt KeepAlive Thread when disconnecting (#506
Browse files Browse the repository at this point in the history
) (#752)

- Changed KeepAlive.setKeepAliveInterval() to avoid starting Thread
- Updated SSHClient.onConnect() to start KeepAlive Thread when enabled
- Updated SSHClient.disconnect() to interrupt KeepAlive Thread
- Updated KeepAliveThreadTerminationTest to verify state of KeepAlive Thread

Co-authored-by: Jeroen van Erp <jeroen@hierynomus.com>
  • Loading branch information
exceptionfactory and hierynomus authored Dec 22, 2021
1 parent ce0a7d5 commit 90099bb
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 51 deletions.
32 changes: 12 additions & 20 deletions src/main/java/net/schmizz/keepalive/KeepAlive.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import net.schmizz.sshj.transport.TransportException;
import org.slf4j.Logger;

import java.util.concurrent.TimeUnit;

public abstract class KeepAlive extends Thread {
protected final Logger log;
protected final ConnectionImpl conn;
Expand All @@ -33,50 +35,40 @@ protected KeepAlive(ConnectionImpl conn, String name) {
setDaemon(true);
}

public boolean isEnabled() {
return keepAliveInterval > 0;
}

public synchronized int getKeepAliveInterval() {
return keepAliveInterval;
}

public synchronized void setKeepAliveInterval(int keepAliveInterval) {
this.keepAliveInterval = keepAliveInterval;
if (keepAliveInterval > 0 && getState() == State.NEW) {
start();
}
notify();
}

synchronized protected int getPositiveInterval()
throws InterruptedException {
while (keepAliveInterval <= 0) {
wait();
}
return keepAliveInterval;
}

@Override
public void run() {
log.debug("Starting {}, sending keep-alive every {} seconds", getClass().getSimpleName(), keepAliveInterval);
log.debug("{} Started with interval [{} seconds]", getClass().getSimpleName(), keepAliveInterval);
try {
while (!isInterrupted()) {
final int hi = getPositiveInterval();
final int interval = getKeepAliveInterval();
if (conn.getTransport().isRunning()) {
log.debug("Sending keep-alive since {} seconds elapsed", hi);
log.debug("{} Sending after interval [{} seconds]", getClass().getSimpleName(), interval);
doKeepAlive();
}
Thread.sleep(hi * 1000);
TimeUnit.SECONDS.sleep(interval);
}
} catch (InterruptedException e) {
// Interrupt signal may be catched when sleeping.
log.trace("{} Interrupted while sleeping", getClass().getSimpleName());
} catch (Exception e) {
// If we weren't interrupted, kill the transport, then this exception was unexpected.
// Else we're in shutdown-mode already, so don't forcibly kill the transport.
if (!isInterrupted()) {
conn.getTransport().die(e);
}
}

log.debug("Stopping {}", getClass().getSimpleName());

log.debug("{} Stopped", getClass().getSimpleName());
}

protected abstract void doKeepAlive() throws TransportException, ConnectionException;
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/net/schmizz/sshj/SSHClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package net.schmizz.sshj;

import net.schmizz.keepalive.KeepAlive;
import net.schmizz.sshj.common.*;
import net.schmizz.sshj.connection.Connection;
import net.schmizz.sshj.connection.ConnectionException;
Expand Down Expand Up @@ -424,6 +425,7 @@ public void authGssApiWithMic(String username, LoginContext context, Oid support
@Override
public void disconnect()
throws IOException {
conn.getKeepAlive().interrupt();
for (LocalPortForwarder forwarder : forwarders) {
try {
forwarder.close();
Expand Down Expand Up @@ -791,6 +793,10 @@ protected void onConnect()
throws IOException {
super.onConnect();
trans.init(getRemoteHostname(), getRemotePort(), getInputStream(), getOutputStream());
final KeepAlive keepAliveThread = conn.getKeepAlive();
if (keepAliveThread.isEnabled()) {
keepAliveThread.start();
}
doKex();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,54 +15,66 @@
*/
package com.hierynomus.sshj.keepalive;

import com.hierynomus.sshj.test.KnownFailingTests;
import com.hierynomus.sshj.test.SlowTests;
import com.hierynomus.sshj.test.SshFixture;
import net.schmizz.keepalive.KeepAlive;
import net.schmizz.keepalive.KeepAliveProvider;
import net.schmizz.sshj.DefaultConfig;
import net.schmizz.sshj.SSHClient;
import net.schmizz.sshj.userauth.UserAuthException;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

import static org.junit.Assert.fail;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

public class KeepAliveThreadTerminationTest {

private static final int KEEP_ALIVE_SECONDS = 1;

private static final long STOP_SLEEP = 1500;

@Rule
public SshFixture fixture = new SshFixture();

@Test
@Category({SlowTests.class, KnownFailingTests.class})
public void shouldCorrectlyTerminateThreadOnDisconnect() throws IOException, InterruptedException {
DefaultConfig defaultConfig = new DefaultConfig();
public void shouldNotStartThreadOnSetKeepAliveInterval() {
final SSHClient sshClient = setupClient();

final KeepAlive keepAlive = sshClient.getConnection().getKeepAlive();
assertTrue(keepAlive.isDaemon());
assertFalse(keepAlive.isAlive());
assertEquals(Thread.State.NEW, keepAlive.getState());
}

@Test
public void shouldStartThreadOnConnectAndInterruptOnDisconnect() throws IOException, InterruptedException {
final SSHClient sshClient = setupClient();

final KeepAlive keepAlive = sshClient.getConnection().getKeepAlive();
assertTrue(keepAlive.isDaemon());
assertEquals(Thread.State.NEW, keepAlive.getState());

fixture.connectClient(sshClient);
assertEquals(Thread.State.TIMED_WAITING, keepAlive.getState());

assertThrows(UserAuthException.class, () -> sshClient.authPassword("bad", "credentials"));

fixture.stopClient();
Thread.sleep(STOP_SLEEP);

assertFalse(keepAlive.isAlive());
assertEquals(Thread.State.TERMINATED, keepAlive.getState());
}

private SSHClient setupClient() {
final DefaultConfig defaultConfig = new DefaultConfig();
defaultConfig.setKeepAliveProvider(KeepAliveProvider.KEEP_ALIVE);
for (int i = 0; i < 10; i++) {
SSHClient sshClient = fixture.setupClient(defaultConfig);
fixture.connectClient(sshClient);
sshClient.getConnection().getKeepAlive().setKeepAliveInterval(1);
try {
sshClient.authPassword("bad", "credentials");
fail("Should not auth.");
} catch (UserAuthException e) {
// OK
}
fixture.stopClient();
Thread.sleep(2000);
}

ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
for (long l : threadMXBean.getAllThreadIds()) {
ThreadInfo threadInfo = threadMXBean.getThreadInfo(l);
if (threadInfo.getThreadName().equals("keep-alive") && threadInfo.getThreadState() != Thread.State.TERMINATED) {
fail("Found alive keep-alive thread in state " + threadInfo.getThreadState());
}
}
final SSHClient sshClient = fixture.setupClient(defaultConfig);
sshClient.getConnection().getKeepAlive().setKeepAliveInterval(KEEP_ALIVE_SECONDS);
return sshClient;
}
}

0 comments on commit 90099bb

Please sign in to comment.