From 9a65a4abe2e9845b351e5adcc569f6a2bb50e2c7 Mon Sep 17 00:00:00 2001 From: Sami Salonen Date: Tue, 23 Jun 2020 20:26:54 +0300 Subject: [PATCH] [modbus] Check and disconnect idle connections without transactions Signed-off-by: Sami Salonen --- .../modbus/internal/ModbusPoolConfig.java | 5 ++ .../ModbusSlaveConnectionEvictionPolicy.java | 36 +++++++++ .../ModbusSlaveConnectionFactoryImpl.java | 75 ++++++++++--------- .../io/transport/modbus/test/SmokeTest.java | 65 ++++++++++++++-- 4 files changed, 141 insertions(+), 40 deletions(-) create mode 100644 bundles/org.openhab.io.transport.modbus/src/main/java/org/openhab/io/transport/modbus/internal/pooling/ModbusSlaveConnectionEvictionPolicy.java diff --git a/bundles/org.openhab.io.transport.modbus/src/main/java/org/openhab/io/transport/modbus/internal/ModbusPoolConfig.java b/bundles/org.openhab.io.transport.modbus/src/main/java/org/openhab/io/transport/modbus/internal/ModbusPoolConfig.java index d15c1dc3de0b9..f8d4befde3382 100644 --- a/bundles/org.openhab.io.transport.modbus/src/main/java/org/openhab/io/transport/modbus/internal/ModbusPoolConfig.java +++ b/bundles/org.openhab.io.transport.modbus/src/main/java/org/openhab/io/transport/modbus/internal/ModbusPoolConfig.java @@ -18,6 +18,7 @@ import org.apache.commons.pool2.impl.EvictionPolicy; import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig; import org.eclipse.jdt.annotation.NonNullByDefault; +import org.openhab.io.transport.modbus.internal.pooling.ModbusSlaveConnectionEvictionPolicy; import net.wimpi.modbus.net.ModbusSlaveConnection; @@ -61,6 +62,10 @@ public ModbusPoolConfig() { // disable JMX setJmxEnabled(false); + + // Evict idle connections every 10 seconds + setEvictionPolicy(new ModbusSlaveConnectionEvictionPolicy()); + setTimeBetweenEvictionRunsMillis(10000); } public EvictionPolicy getEvictionPolicy() { diff --git a/bundles/org.openhab.io.transport.modbus/src/main/java/org/openhab/io/transport/modbus/internal/pooling/ModbusSlaveConnectionEvictionPolicy.java b/bundles/org.openhab.io.transport.modbus/src/main/java/org/openhab/io/transport/modbus/internal/pooling/ModbusSlaveConnectionEvictionPolicy.java new file mode 100644 index 0000000000000..e543e42af2f54 --- /dev/null +++ b/bundles/org.openhab.io.transport.modbus/src/main/java/org/openhab/io/transport/modbus/internal/pooling/ModbusSlaveConnectionEvictionPolicy.java @@ -0,0 +1,36 @@ +/** + * Copyright (c) 2010-2020 Contributors to the openHAB project + * + * See the NOTICE file(s) distributed with this work for additional + * information. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.openhab.io.transport.modbus.internal.pooling; + +import org.apache.commons.pool2.PooledObject; +import org.apache.commons.pool2.impl.EvictionConfig; +import org.apache.commons.pool2.impl.EvictionPolicy; +import org.openhab.io.transport.modbus.internal.pooling.ModbusSlaveConnectionFactoryImpl.PooledConnection; + +import net.wimpi.modbus.net.ModbusSlaveConnection; + +/** + * Eviction policy, i.e. policy for deciding when to close idle, unused connections. + * + * Connections are evicted according to {@link PooledConnection} maybeResetConnection method. + * + * @author Sami Salonen - Initial contribution + */ +public class ModbusSlaveConnectionEvictionPolicy implements EvictionPolicy { + + @Override + public boolean evict(EvictionConfig config, PooledObject underTest, int idleCount) { + return ((PooledConnection) underTest).maybeResetConnection("evict"); + } + +} diff --git a/bundles/org.openhab.io.transport.modbus/src/main/java/org/openhab/io/transport/modbus/internal/pooling/ModbusSlaveConnectionFactoryImpl.java b/bundles/org.openhab.io.transport.modbus/src/main/java/org/openhab/io/transport/modbus/internal/pooling/ModbusSlaveConnectionFactoryImpl.java index b897ee4da1a8e..dcbfb053d2395 100644 --- a/bundles/org.openhab.io.transport.modbus/src/main/java/org/openhab/io/transport/modbus/internal/pooling/ModbusSlaveConnectionFactoryImpl.java +++ b/bundles/org.openhab.io.transport.modbus/src/main/java/org/openhab/io/transport/modbus/internal/pooling/ModbusSlaveConnectionFactoryImpl.java @@ -60,9 +60,10 @@ public class ModbusSlaveConnectionFactoryImpl extends BaseKeyedPooledObjectFactory { - private static class PooledConnection extends DefaultPooledObject { + class PooledConnection extends DefaultPooledObject { - private long lastConnected; + private volatile long lastConnected; + private volatile @Nullable ModbusSlaveEndpoint endpoint; public PooledConnection(ModbusSlaveConnection object) { super(object); @@ -72,9 +73,42 @@ public long getLastConnected() { return lastConnected; } - public void setLastConnected(long lastConnected) { + public void setLastConnected(ModbusSlaveEndpoint endpoint, long lastConnected) { + this.endpoint = endpoint; this.lastConnected = lastConnected; } + + public boolean maybeResetConnection(String activityName) { + long localLastConnected = lastConnected; + + ModbusSlaveConnection connection = getObject(); + + @Nullable + EndpointPoolConfiguration configuration = endpointPoolConfigs.get(endpoint); + long reconnectAfterMillis = configuration == null ? 0 : configuration.getReconnectAfterMillis(); + long connectionAgeMillis = System.currentTimeMillis() - localLastConnected; + long disconnectIfConnectedBeforeMillis = disconnectIfConnectedBefore.getOrDefault(endpoint, -1L); + boolean disconnectSinceTooOldConnection = disconnectIfConnectedBeforeMillis < 0L ? false + : localLastConnected <= disconnectIfConnectedBeforeMillis; + boolean shouldBeDisconnected = (reconnectAfterMillis == 0 + || (reconnectAfterMillis > 0 && connectionAgeMillis > reconnectAfterMillis) + || disconnectSinceTooOldConnection); + if (shouldBeDisconnected) { + logger.trace( + "({}) Connection {} (endpoint {}) age {}ms is over the reconnectAfterMillis={}ms limit or has been connection time ({}) is after the \"disconnectBeforeConnectedMillis\"={} -> disconnecting.", + activityName, connection, endpoint, connectionAgeMillis, reconnectAfterMillis, + localLastConnected, disconnectIfConnectedBeforeMillis); + connection.resetConnection(); + return true; + } else { + logger.trace( + "({}) Connection {} (endpoint {}) age ({}ms) is below the reconnectAfterMillis ({}ms) limit and connection time ({}) is after the \"disconnectBeforeConnectedMillis\"={}. Keep the connection open.", + activityName, connection, endpoint, connectionAgeMillis, reconnectAfterMillis, + localLastConnected, disconnectIfConnectedBeforeMillis); + return false; + } + } + } private final Logger logger = LoggerFactory.getLogger(ModbusSlaveConnectionFactoryImpl.class); @@ -145,9 +179,6 @@ public void destroyObject(ModbusSlaveEndpoint endpoint, @Nullable PooledObject closing the connection", obj.getObject(), endpoint); - if (obj.getObject() == null) { - return; - } obj.getObject().resetConnection(); } @@ -158,9 +189,6 @@ public void activateObject(ModbusSlaveEndpoint endpoint, @Nullable PooledObject< return; } ModbusSlaveConnection connection = obj.getObject(); - if (connection == null) { - return; - } try { @Nullable EndpointPoolConfiguration config = getEndpointPoolConfiguration(endpoint); @@ -191,38 +219,15 @@ public void passivateObject(ModbusSlaveEndpoint endpoint, @Nullable PooledObject return; } ModbusSlaveConnection connection = obj.getObject(); - if (connection == null) { - return; - } logger.trace("Passivating connection {} for endpoint {}...", connection, endpoint); lastPassivateMillis.put(endpoint, System.currentTimeMillis()); - @Nullable - EndpointPoolConfiguration configuration = endpointPoolConfigs.get(endpoint); - long connected = ((PooledConnection) obj).getLastConnected(); - long reconnectAfterMillis = configuration == null ? 0 : configuration.getReconnectAfterMillis(); - long connectionAgeMillis = System.currentTimeMillis() - ((PooledConnection) obj).getLastConnected(); - long disconnectIfConnectedBeforeMillis = disconnectIfConnectedBefore.getOrDefault(endpoint, -1L); - boolean disconnectSinceTooOldConnection = disconnectIfConnectedBeforeMillis < 0L ? false - : connected <= disconnectIfConnectedBeforeMillis; - if (reconnectAfterMillis == 0 || (reconnectAfterMillis > 0 && connectionAgeMillis > reconnectAfterMillis) - || disconnectSinceTooOldConnection) { - logger.trace( - "(passivate) Connection {} (endpoint {}) age {}ms is over the reconnectAfterMillis={}ms limit or has been connection time ({}) is after the \"disconnectBeforeConnectedMillis\"={} -> disconnecting.", - connection, endpoint, connectionAgeMillis, reconnectAfterMillis, connected, - disconnectIfConnectedBeforeMillis); - connection.resetConnection(); - } else { - logger.trace( - "(passivate) Connection {} (endpoint {}) age ({}ms) is below the reconnectAfterMillis ({}ms) limit and connection time ({}) is after the \"disconnectBeforeConnectedMillis\"={}. Keep the connection open.", - connection, endpoint, connectionAgeMillis, reconnectAfterMillis, connected, - disconnectIfConnectedBeforeMillis); - } + ((PooledConnection) obj).maybeResetConnection("passivate"); logger.trace("...Passivated connection {} for endpoint {}", obj.getObject(), endpoint); } @Override public boolean validateObject(ModbusSlaveEndpoint key, @Nullable PooledObject p) { - boolean valid = p != null && p.getObject() != null && p.getObject().isConnected(); + boolean valid = p != null && p.getObject().isConnected(); logger.trace("Validating endpoint {} connection {} -> {}", key, p.getObject(), valid); return valid; } @@ -291,7 +296,7 @@ private void tryConnect(ModbusSlaveEndpoint endpoint, PooledObject { + latch.countDown(); + }); + assertTrue(latch.await(60, TimeUnit.SECONDS)); + } + // Right after the poll we should have one connection open + waitForAssert(() -> { + // 3. ensure one open connection + long openSocketsAfter = getNumberOfOpenClients(socketSpy); + assertThat(openSocketsAfter, is(equalTo(1L))); + }); + // 4. Connection should close itself by the commons pool eviction policy (checking for old idle connection + // every now and then) + waitForAssert(() -> { + // 3. ensure one open connection + long openSocketsAfter = getNumberOfOpenClients(socketSpy); + assertThat(openSocketsAfter, is(equalTo(0L))); + }, 60_000, 50); + + } + } + private long getNumberOfOpenClients(SpyingSocketFactory socketSpy) { final InetAddress testServerAddress; try { @@ -884,7 +939,7 @@ private long getNumberOfOpenClients(SpyingSocketFactory socketSpy) { * @author Sami Salonen * */ - private class SpyingSocketFactory implements SocketImplFactory { + private static class SpyingSocketFactory implements SocketImplFactory { Queue sockets = new ConcurrentLinkedQueue();