Skip to content

Commit

Permalink
[modbus] Check and disconnect idle connections without transactions
Browse files Browse the repository at this point in the history
Signed-off-by: Sami Salonen <ssalonen@gmail.com>
  • Loading branch information
ssalonen committed Jun 23, 2020
1 parent b782ebe commit 9a65a4a
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.apache.commons.pool2.impl.EvictionPolicy;
import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.openhab.io.transport.modbus.internal.pooling.ModbusSlaveConnectionEvictionPolicy;

import net.wimpi.modbus.net.ModbusSlaveConnection;

Expand Down Expand Up @@ -61,6 +62,10 @@ public ModbusPoolConfig() {

// disable JMX
setJmxEnabled(false);

// Evict idle connections every 10 seconds
setEvictionPolicy(new ModbusSlaveConnectionEvictionPolicy());
setTimeBetweenEvictionRunsMillis(10000);
}

public EvictionPolicy<ModbusSlaveConnection> getEvictionPolicy() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/**
* Copyright (c) 2010-2020 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.io.transport.modbus.internal.pooling;

import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.EvictionConfig;
import org.apache.commons.pool2.impl.EvictionPolicy;
import org.openhab.io.transport.modbus.internal.pooling.ModbusSlaveConnectionFactoryImpl.PooledConnection;

import net.wimpi.modbus.net.ModbusSlaveConnection;

/**
* Eviction policy, i.e. policy for deciding when to close idle, unused connections.
*
* Connections are evicted according to {@link PooledConnection} maybeResetConnection method.
*
* @author Sami Salonen - Initial contribution
*/
public class ModbusSlaveConnectionEvictionPolicy implements EvictionPolicy<ModbusSlaveConnection> {

@Override
public boolean evict(EvictionConfig config, PooledObject<ModbusSlaveConnection> underTest, int idleCount) {
return ((PooledConnection) underTest).maybeResetConnection("evict");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,10 @@
public class ModbusSlaveConnectionFactoryImpl
extends BaseKeyedPooledObjectFactory<ModbusSlaveEndpoint, ModbusSlaveConnection> {

private static class PooledConnection extends DefaultPooledObject<ModbusSlaveConnection> {
class PooledConnection extends DefaultPooledObject<ModbusSlaveConnection> {

private long lastConnected;
private volatile long lastConnected;
private volatile @Nullable ModbusSlaveEndpoint endpoint;

public PooledConnection(ModbusSlaveConnection object) {
super(object);
Expand All @@ -72,9 +73,42 @@ public long getLastConnected() {
return lastConnected;
}

public void setLastConnected(long lastConnected) {
public void setLastConnected(ModbusSlaveEndpoint endpoint, long lastConnected) {
this.endpoint = endpoint;
this.lastConnected = lastConnected;
}

public boolean maybeResetConnection(String activityName) {
long localLastConnected = lastConnected;

ModbusSlaveConnection connection = getObject();

@Nullable
EndpointPoolConfiguration configuration = endpointPoolConfigs.get(endpoint);
long reconnectAfterMillis = configuration == null ? 0 : configuration.getReconnectAfterMillis();
long connectionAgeMillis = System.currentTimeMillis() - localLastConnected;
long disconnectIfConnectedBeforeMillis = disconnectIfConnectedBefore.getOrDefault(endpoint, -1L);
boolean disconnectSinceTooOldConnection = disconnectIfConnectedBeforeMillis < 0L ? false
: localLastConnected <= disconnectIfConnectedBeforeMillis;
boolean shouldBeDisconnected = (reconnectAfterMillis == 0
|| (reconnectAfterMillis > 0 && connectionAgeMillis > reconnectAfterMillis)
|| disconnectSinceTooOldConnection);
if (shouldBeDisconnected) {
logger.trace(
"({}) Connection {} (endpoint {}) age {}ms is over the reconnectAfterMillis={}ms limit or has been connection time ({}) is after the \"disconnectBeforeConnectedMillis\"={} -> disconnecting.",
activityName, connection, endpoint, connectionAgeMillis, reconnectAfterMillis,
localLastConnected, disconnectIfConnectedBeforeMillis);
connection.resetConnection();
return true;
} else {
logger.trace(
"({}) Connection {} (endpoint {}) age ({}ms) is below the reconnectAfterMillis ({}ms) limit and connection time ({}) is after the \"disconnectBeforeConnectedMillis\"={}. Keep the connection open.",
activityName, connection, endpoint, connectionAgeMillis, reconnectAfterMillis,
localLastConnected, disconnectIfConnectedBeforeMillis);
return false;
}
}

}

private final Logger logger = LoggerFactory.getLogger(ModbusSlaveConnectionFactoryImpl.class);
Expand Down Expand Up @@ -145,9 +179,6 @@ public void destroyObject(ModbusSlaveEndpoint endpoint, @Nullable PooledObject<M
}
logger.trace("destroyObject for connection {} and endpoint {} -> closing the connection", obj.getObject(),
endpoint);
if (obj.getObject() == null) {
return;
}
obj.getObject().resetConnection();
}

Expand All @@ -158,9 +189,6 @@ public void activateObject(ModbusSlaveEndpoint endpoint, @Nullable PooledObject<
return;
}
ModbusSlaveConnection connection = obj.getObject();
if (connection == null) {
return;
}
try {
@Nullable
EndpointPoolConfiguration config = getEndpointPoolConfiguration(endpoint);
Expand Down Expand Up @@ -191,38 +219,15 @@ public void passivateObject(ModbusSlaveEndpoint endpoint, @Nullable PooledObject
return;
}
ModbusSlaveConnection connection = obj.getObject();
if (connection == null) {
return;
}
logger.trace("Passivating connection {} for endpoint {}...", connection, endpoint);
lastPassivateMillis.put(endpoint, System.currentTimeMillis());
@Nullable
EndpointPoolConfiguration configuration = endpointPoolConfigs.get(endpoint);
long connected = ((PooledConnection) obj).getLastConnected();
long reconnectAfterMillis = configuration == null ? 0 : configuration.getReconnectAfterMillis();
long connectionAgeMillis = System.currentTimeMillis() - ((PooledConnection) obj).getLastConnected();
long disconnectIfConnectedBeforeMillis = disconnectIfConnectedBefore.getOrDefault(endpoint, -1L);
boolean disconnectSinceTooOldConnection = disconnectIfConnectedBeforeMillis < 0L ? false
: connected <= disconnectIfConnectedBeforeMillis;
if (reconnectAfterMillis == 0 || (reconnectAfterMillis > 0 && connectionAgeMillis > reconnectAfterMillis)
|| disconnectSinceTooOldConnection) {
logger.trace(
"(passivate) Connection {} (endpoint {}) age {}ms is over the reconnectAfterMillis={}ms limit or has been connection time ({}) is after the \"disconnectBeforeConnectedMillis\"={} -> disconnecting.",
connection, endpoint, connectionAgeMillis, reconnectAfterMillis, connected,
disconnectIfConnectedBeforeMillis);
connection.resetConnection();
} else {
logger.trace(
"(passivate) Connection {} (endpoint {}) age ({}ms) is below the reconnectAfterMillis ({}ms) limit and connection time ({}) is after the \"disconnectBeforeConnectedMillis\"={}. Keep the connection open.",
connection, endpoint, connectionAgeMillis, reconnectAfterMillis, connected,
disconnectIfConnectedBeforeMillis);
}
((PooledConnection) obj).maybeResetConnection("passivate");
logger.trace("...Passivated connection {} for endpoint {}", obj.getObject(), endpoint);
}

@Override
public boolean validateObject(ModbusSlaveEndpoint key, @Nullable PooledObject<ModbusSlaveConnection> p) {
boolean valid = p != null && p.getObject() != null && p.getObject().isConnected();
boolean valid = p != null && p.getObject().isConnected();
logger.trace("Validating endpoint {} connection {} -> {}", key, p.getObject(), valid);
return valid;
}
Expand Down Expand Up @@ -291,7 +296,7 @@ private void tryConnect(ModbusSlaveEndpoint endpoint, PooledObject<ModbusSlaveCo
}
connection.connect();
long curTime = System.currentTimeMillis();
((PooledConnection) obj).setLastConnected(curTime);
((PooledConnection) obj).setLastConnected(endpoint, curTime);
lastConnectMillis.put(endpoint, curTime);
break;
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import static org.junit.Assert.*;
import static org.junit.Assume.*;

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.net.InetAddress;
Expand All @@ -32,6 +33,7 @@
import java.util.concurrent.atomic.AtomicReference;

import org.apache.commons.lang.StringUtils;
import org.junit.Before;
import org.junit.Test;
import org.openhab.io.transport.modbus.BitArray;
import org.openhab.io.transport.modbus.ModbusCommunicationInterface;
Expand Down Expand Up @@ -66,6 +68,14 @@ public class SmokeTest extends IntegrationTestSupport {
private static final int DISCRETE_EVERY_N_TRUE = 3;
private static final int HOLDING_REGISTER_MULTIPLIER = 1;
private static final int INPUT_REGISTER_MULTIPLIER = 10;
private static final SpyingSocketFactory socketSpy = new SpyingSocketFactory();
static {
try {
Socket.setSocketImplFactory(socketSpy);
} catch (IOException e) {
fail("Could not install socket spy in SmokeTest");
}
}

/**
* Whether tests are run in Continuous Integration environment, i.e. Jenkins or Travis CI
Expand Down Expand Up @@ -120,6 +130,11 @@ private void testInputValues(ModbusRegisterArray registers, int offsetInRegister
}
}

@Before
public void setUpSocketSpy() throws IOException {
socketSpy.sockets.clear();
}

/**
* Test handling of slave error responses. In this case, error code = 2, illegal data address, since no data.
*
Expand Down Expand Up @@ -805,10 +820,6 @@ public void testConnectionCloseAfterLastCommunicationInterfaceClosed() throws Il
assumeTrue("Connection closing test supported only with TCP slaves",
endpoint instanceof ModbusTCPSlaveEndpoint);

// Starts recording sockets created during the test
SpyingSocketFactory socketSpy = new SpyingSocketFactory();
Socket.setSocketImplFactory(socketSpy);

// Generate server data
generateData();

Expand Down Expand Up @@ -863,6 +874,50 @@ public void testConnectionCloseAfterLastCommunicationInterfaceClosed() throws Il
});
}

@Test
public void testConnectionCloseAfterOneOffPoll() throws IllegalArgumentException, Exception {
assumeFalse("Running in CI! Will not test timing-sensitive details", isRunningInCI());
ModbusSlaveEndpoint endpoint = getEndpoint();
assumeTrue("Connection closing test supported only with TCP slaves",
endpoint instanceof ModbusTCPSlaveEndpoint);

// Generate server data
generateData();

EndpointPoolConfiguration config = new EndpointPoolConfiguration();
config.setReconnectAfterMillis(2_000);

// 1. capture open connections at this point
long openSocketsBefore = getNumberOfOpenClients(socketSpy);
assertThat(openSocketsBefore, is(equalTo(0L)));

// 2. make poll, binding opens the tcp connection
try (ModbusCommunicationInterface comms = modbusManager.newModbusCommunicationInterface(endpoint, config)) {
{
CountDownLatch latch = new CountDownLatch(1);
comms.submitOneTimePoll(new ModbusReadRequestBlueprint(1, ModbusReadFunctionCode.READ_COILS, 0, 1, 1),
response -> {
latch.countDown();
});
assertTrue(latch.await(60, TimeUnit.SECONDS));
}
// Right after the poll we should have one connection open
waitForAssert(() -> {
// 3. ensure one open connection
long openSocketsAfter = getNumberOfOpenClients(socketSpy);
assertThat(openSocketsAfter, is(equalTo(1L)));
});
// 4. Connection should close itself by the commons pool eviction policy (checking for old idle connection
// every now and then)
waitForAssert(() -> {
// 3. ensure one open connection
long openSocketsAfter = getNumberOfOpenClients(socketSpy);
assertThat(openSocketsAfter, is(equalTo(0L)));
}, 60_000, 50);

}
}

private long getNumberOfOpenClients(SpyingSocketFactory socketSpy) {
final InetAddress testServerAddress;
try {
Expand All @@ -884,7 +939,7 @@ private long getNumberOfOpenClients(SpyingSocketFactory socketSpy) {
* @author Sami Salonen
*
*/
private class SpyingSocketFactory implements SocketImplFactory {
private static class SpyingSocketFactory implements SocketImplFactory {

Queue<SocketImpl> sockets = new ConcurrentLinkedQueue<SocketImpl>();

Expand Down

0 comments on commit 9a65a4a

Please sign in to comment.