From f5e6fe0d3f12268fefa8191a5ba45d49e5bca523 Mon Sep 17 00:00:00 2001 From: rees_s Date: Wed, 13 May 2020 17:26:03 +0200 Subject: [PATCH] Fix #82 Restructure using standard class template. --- .../java/org/epics/ca/impl/ContextImpl.java | 841 ++++++++++-------- 1 file changed, 446 insertions(+), 395 deletions(-) diff --git a/src/main/java/org/epics/ca/impl/ContextImpl.java b/src/main/java/org/epics/ca/impl/ContextImpl.java index d00b927..80c1dd8 100644 --- a/src/main/java/org/epics/ca/impl/ContextImpl.java +++ b/src/main/java/org/epics/ca/impl/ContextImpl.java @@ -1,5 +1,9 @@ +/*- Package Declaration ------------------------------------------------------*/ + package org.epics.ca.impl; +/*- Imported packages --------------------------------------------------------*/ + import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -13,7 +17,6 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import java.util.logging.Handler; import java.util.logging.Level; import java.util.logging.Logger; @@ -27,17 +30,23 @@ import org.epics.ca.impl.reactor.ReactorHandler; import org.epics.ca.impl.reactor.lf.LeaderFollowersHandler; import org.epics.ca.impl.reactor.lf.LeaderFollowersThreadPool; -import org.epics.ca.impl.repeater.CARepeater; import org.epics.ca.impl.repeater.CARepeaterStarter; import org.epics.ca.impl.search.ChannelSearchManager; import org.epics.ca.util.IntHashMap; -import org.epics.ca.util.logging.ConsoleLogHandler; +import org.epics.ca.util.logging.LibraryLogManager; import org.epics.ca.util.net.InetAddressUtil; import org.epics.ca.util.sync.NamedLockPattern; + +/*- Interface Declaration ----------------------------------------------------*/ +/*- Class Declaration --------------------------------------------------------*/ + public class ContextImpl implements AutoCloseable, Constants { +/*- Public attributes --------------------------------------------------------*/ +/*- Private attributes -------------------------------------------------------*/ + static { // force only IPv4 sockets, since EPICS does not work with IPv6 sockets @@ -47,7 +56,7 @@ public class ContextImpl implements AutoCloseable, Constants /** * Context logger. */ - private static final Logger logger = Logger.getLogger( ContextImpl.class.getName() ); + private static final Logger logger = LibraryLogManager.getLogger( ContextImpl.class ); /** * Debug level, turns on low-level debugging. @@ -162,7 +171,7 @@ public class ContextImpl implements AutoCloseable, Constants /** * Map of channels (keys are CIDs). */ - protected final IntHashMap> channelsByCID = new IntHashMap> (); + protected final IntHashMap> channelsByCID = new IntHashMap<>(); /** * Last IOID cache. @@ -172,7 +181,7 @@ public class ContextImpl implements AutoCloseable, Constants /** * Map of requests (keys are IOID). */ - protected final IntHashMap responseRequests = new IntHashMap (); + protected final IntHashMap responseRequests = new IntHashMap<>(); /** * Cached hostname. @@ -194,6 +203,10 @@ public class ContextImpl implements AutoCloseable, Constants */ protected final Map beaconHandlers = new HashMap<> (); + +/*- Main ---------------------------------------------------------------------*/ +/*- Constructor --------------------------------------------------------------*/ + public ContextImpl() { this (System.getProperties ()); @@ -202,10 +215,12 @@ public ContextImpl() public ContextImpl( Properties properties ) { if ( properties == null ) - throw new IllegalArgumentException ("null properties"); + { + throw new IllegalArgumentException( "null properties" ); + } - initializeLogger (properties); - loadConfig (properties); + initializeLogger( properties ); + loadConfig( properties ); hostName = InetAddressUtil.getHostName (); userName = System.getProperty ("user.name", "nobody"); @@ -224,279 +239,268 @@ public ContextImpl( Properties properties ) leaderFollowersThreadPool = new LeaderFollowersThreadPool (); // spawn initial leader - leaderFollowersThreadPool.promoteLeader (() -> reactor.process ()); + leaderFollowersThreadPool.promoteLeader (reactor::process); broadcastTransport.set (initializeUDPTransport ()); - // spawn repeater registration task - InetSocketAddress repeaterLocalAddress = new InetSocketAddress (InetAddress.getLoopbackAddress (), repeaterPort); + // Start task to register with CA Repeater + final InetSocketAddress repeaterLocalAddress = new InetSocketAddress (InetAddress.getLoopbackAddress (), repeaterPort ); repeaterRegistrationFuture = timer.scheduleWithFixedDelay ( new RepeaterRegistrationTask( repeaterLocalAddress ), 0, CA_REPEATER_REGISTRATION_INTERVAL, TimeUnit.SECONDS ); + // Attempt to spawn the CA Repeater if not already running. try { - CARepeaterStarter.spawnRepeaterIfNotAlreadyRunning( repeaterPort ); + CARepeaterStarter.startRepeaterIfNotAlreadyRunning( repeaterPort ); + } + catch ( RuntimeException ex ) + { + logger.log( Level.WARNING, "Failed to start CA Repeater on port " + repeaterPort, ex ) ; } - catch ( Throwable th ) - { /* noop */ } - - channelSearchManager = new ChannelSearchManager (broadcastTransport.get() ); - monitorNotificationServiceFactory = MonitorNotificationServiceFactoryCreator.create(monitorNotifierConfigImpl ); + channelSearchManager = new ChannelSearchManager( broadcastTransport.get() ); + monitorNotificationServiceFactory = MonitorNotificationServiceFactoryCreator.create( monitorNotifierConfigImpl ); } - protected MonitorNotificationServiceFactory getMonitorNotificationServiceFactory() - { - return monitorNotificationServiceFactory; - } - protected String readStringProperty( Properties properties, String key, String defaultValue ) +/*- Public methods -----------------------------------------------------------*/ + + /** + * Searches for a response request with given channel IOID. + * + * @param ioid I/O ID. + * @return request response with given I/O ID. + */ + public ResponseRequest getResponseRequest( int ioid ) { - String sValue = properties.getProperty (key, System.getenv (key)); - return (sValue != null) ? sValue : defaultValue; + synchronized ( responseRequests ) + { + return responseRequests.get (ioid); + } } - protected boolean readBooleanProperty( Properties properties, String key, boolean defaultValue ) + /** + * Register response request. + * + * @param request request to register. + * @return request ID (IOID). + */ + public int registerResponseRequest( ResponseRequest request ) { - String sValue = properties.getProperty (key, System.getenv (key)); - if ( sValue != null ) + synchronized ( responseRequests ) { - if ( sValue.equalsIgnoreCase ("YES") ) - return true; - else if ( sValue.equalsIgnoreCase ("NO") ) - return false; - else - { - logger.log( Level.CONFIG, "Failed to parse boolean value for property " + key + ": \"" + sValue + "\", \"YES\" or \"NO\" expected."); - return defaultValue; - } + int ioid = generateIOID (); + responseRequests.put (ioid, request); + return ioid; } - else - return defaultValue; } - protected float readFloatProperty( Properties properties, String key, float defaultValue ) + /** + * Unregister response request. + * + * @param request the request. + * @return removed object, can be null + */ + public ResponseRequest unregisterResponseRequest( ResponseRequest request ) { - String sValue = properties.getProperty (key, System.getenv (key)); - if ( sValue != null ) + synchronized ( responseRequests ) { - try - { - return Float.parseFloat (sValue); - } - catch ( Throwable th ) - { - logger.log( Level.CONFIG, "Failed to parse float value for property " + key + ": \"" + sValue + "\"."); - } + return responseRequests.remove (request.getIOID ()); } - return defaultValue; } - protected int readIntegerProperty( Properties properties, String key, int defaultValue ) + /** + * Searches for a channel with given channel ID. + * + * @param channelID CID. + * @return channel with given CID, null if non-existent. + */ + public ChannelImpl getChannel( int channelID ) { - String sValue = properties.getProperty (key, System.getenv (key)); - if ( sValue != null ) + synchronized ( channelsByCID ) { - try - { - return Integer.parseInt (sValue); - } - catch ( Throwable th ) - { - logger.log( Level.CONFIG, "Failed to parse integer value for property " + key + ": \"" + sValue + "\"."); - } + return channelsByCID.get (channelID); } - return defaultValue; } - protected void loadConfig( Properties properties ) + public ChannelSearchManager getChannelSearchManager() { - // dump version - logger.log( Level.CONFIG, "Java CA v" + LibraryVersion.getAsString() ); - - addressList = readStringProperty (properties, Context.Configuration.EPICS_CA_ADDR_LIST.toString (), addressList); - logger.log( Level.CONFIG, Context.Configuration.EPICS_CA_ADDR_LIST.toString () + ": " + addressList); - - autoAddressList = readBooleanProperty (properties, Context.Configuration.EPICS_CA_AUTO_ADDR_LIST.toString (), autoAddressList); - logger.log( Level.CONFIG, Context.Configuration.EPICS_CA_AUTO_ADDR_LIST.toString () + ": " + autoAddressList); + return channelSearchManager; + } - connectionTimeout = readFloatProperty (properties, Context.Configuration.EPICS_CA_CONN_TMO.toString (), connectionTimeout); - connectionTimeout = Math.max (0.1f, connectionTimeout); - logger.log( Level.CONFIG, Context.Configuration.EPICS_CA_CONN_TMO.toString () + ": " + connectionTimeout); + public BroadcastTransport getBroadcastTransport() + { + return broadcastTransport.get (); + } - beaconPeriod = readFloatProperty (properties, Context.Configuration.EPICS_CA_BEACON_PERIOD.toString (), beaconPeriod); - beaconPeriod = Math.max (0.1f, beaconPeriod); - logger.log( Level.CONFIG, Context.Configuration.EPICS_CA_BEACON_PERIOD.toString () + ": " + beaconPeriod); + public int getServerPort() + { + return serverPort; + } - repeaterPort = readIntegerProperty (properties, Context.Configuration.EPICS_CA_REPEATER_PORT.toString (), repeaterPort); - logger.log( Level.CONFIG, Context.Configuration.EPICS_CA_REPEATER_PORT.toString () + ": " + repeaterPort); + public float getConnectionTimeout() + { + return connectionTimeout; + } - serverPort = readIntegerProperty (properties, Context.Configuration.EPICS_CA_SERVER_PORT.toString (), serverPort); - logger.log( Level.CONFIG, Context.Configuration.EPICS_CA_SERVER_PORT.toString () + ": " + serverPort); + public int getMaxArrayBytes() + { + return maxArrayBytes; + } - maxArrayBytes = readIntegerProperty (properties, Context.Configuration.EPICS_CA_MAX_ARRAY_BYTES.toString (), maxArrayBytes); - if ( maxArrayBytes > 0 ) - maxArrayBytes = Math.max (1024, maxArrayBytes); - logger.log( Level.CONFIG, Context.Configuration.EPICS_CA_MAX_ARRAY_BYTES.toString () + ": " + (maxArrayBytes > 0 ? maxArrayBytes : "(undefined)")); + public TransportRegistry getTransportRegistry() + { + return transportRegistry; + } - monitorNotifierConfigImpl = readStringProperty(properties, CA_MONITOR_NOTIFIER_IMPL, CA_MONITOR_NOTIFIER_DEFAULT_IMPL); - logger.log( Level.CONFIG, "CA_MONITOR_NOTIFIER_IMPL: " + monitorNotifierConfigImpl); + public LeaderFollowersThreadPool getLeaderFollowersThreadPool() + { + return leaderFollowersThreadPool; } /** - * Initialize context logger. + * Search response from server (channel found). * - * @param properties the properties to be used for the logger.logge + * @param cid client channel ID. + * @param sid server channel ID. + * @param type channel native type code. + * @param count channel element count. + * @param minorRevision server minor CA revision. + * @param serverAddress server address. */ - protected void initializeLogger( Properties properties ) + public void searchResponse( int cid, int sid, short type, int count, short minorRevision, InetSocketAddress serverAddress ) { - debugLevel = readIntegerProperty (properties, CA_DEBUG, debugLevel); + final ChannelImpl channel = getChannel( cid ); + if ( channel == null ) + { + return; + } - if ( debugLevel > 0 ) + logger.log ( Level.FINER, "Search response for channel " + channel.getName () + " received."); + + // check for multiple responses + synchronized( channel ) { - logger.setLevel (Level.ALL); + TCPTransport transport = channel.getTransport (); + if ( transport != null ) + { + // multiple defined PV or reconnect request (same server address) + if ( !transport.getRemoteAddress ().equals (serverAddress) ) + { + logger.log( Level.INFO,"More than one PVs with name '" + channel.getName () + + "' detected, additional response from: " + serverAddress); + return; + } + } + + // do not search anymore (also unregisters) + channelSearchManager.searchResponse (channel); - // install console logger only if there is no already installed - Logger inspectedLogger = logger; - boolean found = false; - while ( inspectedLogger != null ) + transport = getTransport (channel, serverAddress, minorRevision, channel.getPriority ()); + if ( transport == null ) { - for ( Handler handler : inspectedLogger.getHandlers () ) - if ( handler instanceof ConsoleLogHandler ) - { - found = true; - break; - } - inspectedLogger = inspectedLogger.getParent (); + channel.createChannelFailed (); + return; } - if ( !found ) - logger.addHandler (new ConsoleLogHandler ()); + // create channel + channel.createChannel (transport, sid, type, count); } } - private class RepeaterRegistrationTask implements Runnable + public void repeaterConfirm( InetSocketAddress responseFrom ) { + logger.log( Level.FINE, "Repeater registration confirmed from: " + responseFrom ); - private final InetSocketAddress repeaterLocalAddress; - private final ByteBuffer buffer = ByteBuffer.allocate (Constants.CA_MESSAGE_HEADER_SIZE); + ScheduledFuture sf = repeaterRegistrationFuture; + if ( sf != null ) + sf.cancel (false); + } - RepeaterRegistrationTask( InetSocketAddress repeaterLocalAddress ) + public boolean enqueueStatefullEvent( StatefullEventSource event ) + { + if ( event.allowEnqueue () ) { - this.repeaterLocalAddress = repeaterLocalAddress; + executorService.execute (event); + return true; + } + else + return false; + } - Messages.generateRepeaterRegistration (buffer); + public void beaconAnomalyNotify() + { + if ( channelSearchManager != null ) + { + channelSearchManager.beaconAnomalyNotify (); } + } - public void run() + /** + * Get (and if necessary create) beacon handler. + * + * @param responseFrom remote source address of received beacon. + * @return beacon handler for particular server. + */ + public BeaconHandler getBeaconHandler( InetSocketAddress responseFrom ) + { + synchronized ( beaconHandlers ) { - try - { - getBroadcastTransport ().send (buffer, repeaterLocalAddress); - } - catch ( Throwable th ) + BeaconHandler handler = beaconHandlers.get (responseFrom); + if ( handler == null ) { - logger.log (Level.FINE, th, () -> "Failed to send repeater registration message to: " + repeaterLocalAddress); + handler = new BeaconHandler (this, responseFrom); + beaconHandlers.put (responseFrom, handler); } + return handler; } } - protected BroadcastTransport initializeUDPTransport() + public ScheduledExecutorService getScheduledExecutor() { - // set broadcast address list - InetSocketAddress[] broadcastAddressList = null; - if ( addressList != null && addressList.length () > 0 ) - { - // if auto is true, add it to specified list - InetSocketAddress[] appendList = null; - if ( autoAddressList == true ) - appendList = InetAddressUtil.getBroadcastAddresses (serverPort); + return timer; + } - InetSocketAddress[] list = InetAddressUtil.getSocketAddressList (addressList, serverPort, appendList); - if ( list != null && list.length > 0 ) - broadcastAddressList = list; - } - else if ( autoAddressList == false ) - logger.log ( Level.WARNING, "Empty broadcast search address list, all connects will fail."); - else - broadcastAddressList = InetAddressUtil.getBroadcastAddresses (serverPort); - if ( logger.isLoggable (Level.CONFIG) && broadcastAddressList != null ) - for ( int i = 0; i < broadcastAddressList.length; i++ ) - logger.log( Level.CONFIG, "Broadcast address #" + i + ": " + broadcastAddressList[ i ] + '.'); - - // any address - InetSocketAddress connectAddress = new InetSocketAddress (0); - logger.log( Level.FINER, "Creating datagram socket to: " + connectAddress); - - DatagramChannel channel = null; - try - { - channel = DatagramChannel.open (); - - // use non-blocking channel (no need for soTimeout) - channel.configureBlocking (false); - - // set SO_BROADCAST - channel.socket ().setBroadcast (true); - - // explicitly bind first - channel.socket ().setReuseAddress (true); - channel.socket ().bind (new InetSocketAddress (0)); - - // create transport - BroadcastTransport transport = new BroadcastTransport (this, ResponseHandlers::handleResponse, channel, - connectAddress, broadcastAddressList); - - // and register to the selector - ReactorHandler handler = new LeaderFollowersHandler (reactor, transport, leaderFollowersThreadPool); - reactor.register (channel, SelectionKey.OP_READ, handler); - - return transport; - } - catch ( Throwable th ) - { - // close socket, if open - try - { - if ( channel != null ) - channel.close (); - } - catch ( Throwable t ) - { /* noop */ } - - throw new RuntimeException ("Failed to connect to '" + connectAddress + "'.", th); - } - - } - - public Channel createChannel( String channelName, Class channelType ) - { - return createChannel (channelName, channelType, CHANNEL_PRIORITY_DEFAULT); - } + public Channel createChannel( String channelName, Class channelType ) + { + return createChannel (channelName, channelType, CHANNEL_PRIORITY_DEFAULT); + } public Channel createChannel( String channelName, Class channelType, int priority ) { if ( closed.get () ) - throw new RuntimeException ("context closed"); + { + throw new RuntimeException("context closed"); + } if ( channelName == null || channelName.length () == 0 ) - throw new IllegalArgumentException ("null or empty channel name"); + { + throw new IllegalArgumentException("null or empty channel name"); + } else if ( channelName.length () > Math.min (MAX_UDP_SEND - CA_MESSAGE_HEADER_SIZE, UNREASONABLE_CHANNEL_NAME_LENGTH) ) - throw new IllegalArgumentException ("name too long"); + { + throw new IllegalArgumentException("name too long"); + } if ( channelType == null ) - throw new IllegalArgumentException ("null channel type"); + { + throw new IllegalArgumentException("null channel type"); + } if ( !TypeSupports.isNativeType (channelType) && !channelType.equals (Object.class) ) - throw new IllegalArgumentException ("invalid channel native type"); + { + throw new IllegalArgumentException("invalid channel native type"); + } if ( priority < CHANNEL_PRIORITY_MIN || priority > CHANNEL_PRIORITY_MAX ) - throw new IllegalArgumentException ("priority out of bounds"); + { + throw new IllegalArgumentException("priority out of bounds"); + } - return new ChannelImpl (this, channelName, channelType, priority); + return new ChannelImpl<>(this, channelName, channelType, priority); } @Override @@ -504,7 +508,9 @@ public void close() { if ( closed.getAndSet (true) ) + { return; + } channelSearchManager.cancel (); broadcastTransport.get ().close (); @@ -531,40 +537,211 @@ public void close() executorService.shutdownNow (); } - /** - * Destroy all channels. - */ - private void destroyAllChannels() + public Reactor getReactor() { + return reactor; + } - ChannelImpl[] channels; - synchronized ( channelsByCID ) +/*- Protected methods --------------------------------------------------------*/ + + protected MonitorNotificationServiceFactory getMonitorNotificationServiceFactory() + { + return monitorNotificationServiceFactory; + } + + protected String readStringProperty( Properties properties, String key, String defaultValue ) + { + String sValue = properties.getProperty (key, System.getenv (key)); + return (sValue != null) ? sValue : defaultValue; + } + + protected boolean readBooleanProperty( Properties properties, String key, boolean defaultValue ) + { + String sValue = properties.getProperty (key, System.getenv (key)); + if ( sValue != null ) { - channels = (ChannelImpl[]) new ChannelImpl[ channelsByCID.size () ]; - channelsByCID.toArray (channels); - channelsByCID.clear (); + if ( sValue.equalsIgnoreCase ("YES") ) + { + return true; + } + else if ( sValue.equalsIgnoreCase ("NO") ) + { + return false; + } + else + { + logger.log( Level.CONFIG, "Failed to parse boolean value for property " + key + ": \"" + sValue + "\", \"YES\" or \"NO\" expected."); + return defaultValue; + } + } + else + { + return defaultValue; + } + } + + protected float readFloatProperty( Properties properties, String key, float defaultValue ) + { + String sValue = properties.getProperty (key, System.getenv (key)); + if ( sValue != null ) + { + try + { + return Float.parseFloat (sValue); + } + catch ( Throwable th ) + { + logger.log( Level.CONFIG, "Failed to parse float value for property " + key + ": \"" + sValue + "\"."); + } } + return defaultValue; + } - for ( int i = 0; i < channels.length; i++ ) + protected int readIntegerProperty( Properties properties, String key, int defaultValue ) + { + String sValue = properties.getProperty (key, System.getenv (key)); + if ( sValue != null ) { try { - if ( channels[ i ] != null ) - channels[ i ].close (); + return Integer.parseInt (sValue); } catch ( Throwable th ) { - logger.log (Level.SEVERE, "Unexpected exception caught while closing a channel", th); + logger.log( Level.CONFIG, "Failed to parse integer value for property " + key + ": \"" + sValue + "\"."); } } + return defaultValue; } - public Reactor getReactor() + protected void loadConfig( Properties properties ) { - return reactor; + // dump version + logger.log( Level.CONFIG, "Java CA v" + LibraryVersion.getAsString() ); + + addressList = readStringProperty (properties, Context.Configuration.EPICS_CA_ADDR_LIST.toString (), addressList); + logger.log( Level.CONFIG, Context.Configuration.EPICS_CA_ADDR_LIST.toString () + ": " + addressList); + + autoAddressList = readBooleanProperty (properties, Context.Configuration.EPICS_CA_AUTO_ADDR_LIST.toString (), autoAddressList); + logger.log( Level.CONFIG, Context.Configuration.EPICS_CA_AUTO_ADDR_LIST.toString () + ": " + autoAddressList); + + connectionTimeout = readFloatProperty (properties, Context.Configuration.EPICS_CA_CONN_TMO.toString (), connectionTimeout); + connectionTimeout = Math.max (0.1f, connectionTimeout); + logger.log( Level.CONFIG, Context.Configuration.EPICS_CA_CONN_TMO.toString () + ": " + connectionTimeout); + + beaconPeriod = readFloatProperty (properties, Context.Configuration.EPICS_CA_BEACON_PERIOD.toString (), beaconPeriod); + beaconPeriod = Math.max (0.1f, beaconPeriod); + logger.log( Level.CONFIG, Context.Configuration.EPICS_CA_BEACON_PERIOD.toString () + ": " + beaconPeriod); + + repeaterPort = readIntegerProperty (properties, Context.Configuration.EPICS_CA_REPEATER_PORT.toString (), repeaterPort); + logger.log( Level.CONFIG, Context.Configuration.EPICS_CA_REPEATER_PORT.toString () + ": " + repeaterPort); + + serverPort = readIntegerProperty (properties, Context.Configuration.EPICS_CA_SERVER_PORT.toString (), serverPort); + logger.log( Level.CONFIG, Context.Configuration.EPICS_CA_SERVER_PORT.toString () + ": " + serverPort); + + maxArrayBytes = readIntegerProperty (properties, Context.Configuration.EPICS_CA_MAX_ARRAY_BYTES.toString (), maxArrayBytes); + if ( maxArrayBytes > 0 ) + maxArrayBytes = Math.max (1024, maxArrayBytes); + logger.log( Level.CONFIG, Context.Configuration.EPICS_CA_MAX_ARRAY_BYTES.toString () + ": " + (maxArrayBytes > 0 ? maxArrayBytes : "(undefined)")); + + monitorNotifierConfigImpl = readStringProperty(properties, CA_MONITOR_NOTIFIER_IMPL, CA_MONITOR_NOTIFIER_DEFAULT_IMPL); + logger.log( Level.CONFIG, "CA_MONITOR_NOTIFIER_IMPL: " + monitorNotifierConfigImpl); + } + + /** + * Initialize context logger. + * + * @param properties the properties to be used for the logger. + */ + protected void initializeLogger( Properties properties ) + { + debugLevel = readIntegerProperty( properties, CA_DEBUG, debugLevel ); + + if ( debugLevel > 0 ) + { + logger.setLevel( Level.ALL ); + } + } + + protected BroadcastTransport initializeUDPTransport() + { + // set broadcast address list + InetSocketAddress[] broadcastAddressList = null; + if ( addressList != null && addressList.length () > 0 ) + { + // if auto is true, add it to specified list + InetSocketAddress[] appendList = null; + if ( autoAddressList ) + { + appendList = InetAddressUtil.getBroadcastAddresses( serverPort ); + } + + final InetSocketAddress[] list = InetAddressUtil.getSocketAddressList( addressList, serverPort, appendList ); + if ( list.length > 0 ) + { + broadcastAddressList = list; + } + } + else if ( !autoAddressList ) + { + logger.log ( Level.WARNING, "Empty broadcast search address list, all connects will fail."); + } + else + { + broadcastAddressList = InetAddressUtil.getBroadcastAddresses (serverPort); + } + + if ( logger.isLoggable (Level.CONFIG) && broadcastAddressList != null ) + for ( int i = 0; i < broadcastAddressList.length; i++ ) + logger.log( Level.CONFIG, "Broadcast address #" + i + ": " + broadcastAddressList[ i ] + '.'); + + // any address + InetSocketAddress connectAddress = new InetSocketAddress (0); + logger.log( Level.FINER, "Creating datagram socket to: " + connectAddress); + + DatagramChannel channel = null; + try + { + channel = DatagramChannel.open (); + + // use non-blocking channel (no need for soTimeout) + channel.configureBlocking (false); + + // set SO_BROADCAST + channel.socket ().setBroadcast (true); + + // explicitly bind first + channel.socket ().setReuseAddress (true); + channel.socket ().bind (new InetSocketAddress (0)); + + // create transport + BroadcastTransport transport = new BroadcastTransport (this, ResponseHandlers::handleResponse, channel, + connectAddress, broadcastAddressList); + + // and register to the selector + ReactorHandler handler = new LeaderFollowersHandler (reactor, transport, leaderFollowersThreadPool); + reactor.register (channel, SelectionKey.OP_READ, handler); + + return transport; + } + catch ( Throwable th ) + { + // close socket, if open + try + { + if ( channel != null ) + channel.close (); + } + catch ( Throwable t ) + { /* noop */ } + + throw new RuntimeException ("Failed to connect to '" + connectAddress + "'.", th); + } } +/*- Package-level methods ----------------------------------------------------*/ + /** * Generate Client channel ID (CID). * @@ -575,7 +752,11 @@ int generateCID() synchronized ( channelsByCID ) { // search first free (theoretically possible loop of death) - while ( channelsByCID.containsKey (++lastCID) ) ; + while ( channelsByCID.containsKey (++lastCID) ) + { + // Intentionally left blank + } + // reserve CID channelsByCID.put (lastCID, null); return lastCID; @@ -585,7 +766,7 @@ int generateCID() /** * Register channel. * - * @param channel + * @param channel the channel. */ void registerChannel( ChannelImpl channel ) { @@ -598,7 +779,7 @@ void registerChannel( ChannelImpl channel ) /** * Unregister channel. * - * @param channel + * @param channel the channel. */ void unregisterChannel( ChannelImpl channel ) { @@ -608,47 +789,32 @@ void unregisterChannel( ChannelImpl channel ) } } - /** - * Searches for a response request with given channel IOID. - * - * @param ioid I/O ID. - * @return request response with given I/O ID. - */ - public ResponseRequest getResponseRequest( int ioid ) - { - synchronized ( responseRequests ) - { - return responseRequests.get (ioid); - } - } +/*- Private methods ----------------------------------------------------------*/ /** - * Register response request. - * - * @param request request to register. - * @return request ID (IOID). + * Destroy all channels. */ - public int registerResponseRequest( ResponseRequest request ) + private void destroyAllChannels() { - synchronized ( responseRequests ) + ChannelImpl[] channels; + synchronized ( channelsByCID ) { - int ioid = generateIOID (); - responseRequests.put (ioid, request); - return ioid; + channels = (ChannelImpl[]) new ChannelImpl[ channelsByCID.size () ]; + channelsByCID.toArray (channels); + channelsByCID.clear (); } - } - /** - * Unregister response request. - * - * @param request the request. - * @return removed object, can be null - */ - public ResponseRequest unregisterResponseRequest( ResponseRequest request ) - { - synchronized ( responseRequests ) + for ( ChannelImpl channel : channels ) { - return responseRequests.remove (request.getIOID ()); + try + { + if ( channel != null ) + channel.close(); + } + catch ( Throwable th ) + { + logger.log(Level.SEVERE, "Unexpected exception caught while closing a channel", th); + } } } @@ -662,7 +828,10 @@ private int generateIOID() synchronized ( responseRequests ) { // search first free (theoretically possible loop of death) - while ( responseRequests.containsKey (++lastIOID) ) ; + while ( responseRequests.containsKey (++lastIOID) ) + { + // Intentionally left blank + } // reserve IOID responseRequests.put (lastIOID, null); return lastIOID; @@ -670,108 +839,6 @@ private int generateIOID() } /** - * Searches for a channel with given channel ID. - * - * @param channelID CID. - * @return channel with given CID, null if non-existent. - */ - public ChannelImpl getChannel( int channelID ) - { - synchronized ( channelsByCID ) - { - return channelsByCID.get (channelID); - } - } - - public ChannelSearchManager getChannelSearchManager() - { - return channelSearchManager; - } - - public BroadcastTransport getBroadcastTransport() - { - return broadcastTransport.get (); - } - - public int getServerPort() - { - return serverPort; - } - - public float getConnectionTimeout() - { - return connectionTimeout; - } - - public int getMaxArrayBytes() - { - return maxArrayBytes; - } - - public TransportRegistry getTransportRegistry() - { - return transportRegistry; - } - - public LeaderFollowersThreadPool getLeaderFollowersThreadPool() - { - return leaderFollowersThreadPool; - } - - /** - * Search response from server (channel found). - * - * @param cid client channel ID. - * @param sid server channel ID. - * @param type channel native type code. - * @param count channel element count. - * @param minorRevision server minor CA revision. - * @param serverAddress server address. - */ - public void searchResponse( - int cid, int sid, short type, int count, - short minorRevision, InetSocketAddress serverAddress - ) - { - ChannelImpl channel = getChannel (cid); - if ( channel == null ) - return; - - logger.log ( Level.FINER, "Search response for channel " + channel.getName () + " received."); - - // check for multiple responses - synchronized ( channel ) - { - TCPTransport transport = channel.getTransport (); - if ( transport != null ) - { - // multiple defined PV or reconnect request (same server address) - if ( !transport.getRemoteAddress ().equals (serverAddress) ) - { - logger.log( Level.INFO,"More than one PVs with name '" + channel.getName () + - "' detected, additional response from: " + serverAddress); - return; - } - } - - // do not search anymore (also unregisters) - channelSearchManager.searchResponse (channel); - - transport = getTransport (channel, serverAddress, minorRevision, channel.getPriority ()); - if ( transport == null ) - { - channel.createChannelFailed (); - return; - } - - // create channel - channel.createChannel (transport, sid, type, count); - } - - } - - /** - * g * Get, or create if necessary, transport of given server address. * * @param priority process priority. @@ -785,7 +852,7 @@ private TCPTransport getTransport( TransportClient client, InetSocketAddress add TCPTransport transport = (TCPTransport) transportRegistry.get (address, priority); if ( transport != null ) { - logger.log ( Level.FINER,"Reusing existant connection to CA server: " + address); + logger.log ( Level.FINER,"Reusing existing connection to CA server: " + address); if ( transport.acquire (client) ) return transport; } @@ -799,9 +866,11 @@ private TCPTransport getTransport( TransportClient client, InetSocketAddress add transport = (TCPTransport) transportRegistry.get (address, priority); if ( transport != null ) { - logger.log ( Level.FINER,"Reusing existant connection to CA server: " + address); + logger.log ( Level.FINER,"Reusing existing connection to CA server: " + address); if ( transport.acquire (client) ) + { return transport; + } } logger.log ( Level.FINER, "Connecting to CA server: " + address); @@ -818,12 +887,13 @@ private TCPTransport getTransport( TransportClient client, InetSocketAddress add socket.socket ().setKeepAlive (true); // create transport - transport = new TCPTransport (this, client, ResponseHandlers::handleResponse, - socket, minorRevision, priority); + transport = new TCPTransport (this, client, ResponseHandlers::handleResponse, socket, minorRevision, priority ); ReactorHandler handler = transport; if ( leaderFollowersThreadPool != null ) + { handler = new LeaderFollowersHandler (reactor, handler, leaderFollowersThreadPool); + } // register to reactor reactor.register (socket, SelectionKey.OP_READ, handler); @@ -908,55 +978,36 @@ private SocketChannel tryConnect( InetSocketAddress address, int tries ) throws throw lastException; } - public void repeaterConfirm( InetSocketAddress responseFrom ) - { - logger.log( Level.FINE, "Repeater registration confirmed from: " + responseFrom ); +/*- Nested classes -----------------------------------------------------------*/ - ScheduledFuture sf = repeaterRegistrationFuture; - if ( sf != null ) - sf.cancel (false); - } - - public boolean enqueueStatefullEvent( StatefullEventSource event ) + /** + * RepeaterRegistrationTask + */ + private class RepeaterRegistrationTask implements Runnable { - if ( event.allowEnqueue () ) + private final InetSocketAddress repeaterLocalAddress; + private final ByteBuffer buffer = ByteBuffer.allocate (Constants.CA_MESSAGE_HEADER_SIZE); + + RepeaterRegistrationTask( InetSocketAddress repeaterLocalAddress ) { - executorService.execute (event); - return true; - } - else - return false; - } + this.repeaterLocalAddress = repeaterLocalAddress; - public void beaconAnomalyNotify() - { - if ( channelSearchManager != null ) - channelSearchManager.beaconAnomalyNotify (); - } + Messages.generateRepeaterRegistration( buffer ); + } - /** - * Get (and if necessary create) beacon handler. - * - * @param responseFrom remote source address of received beacon. - * @return beacon handler for particular server. - */ - public BeaconHandler getBeaconHandler( InetSocketAddress responseFrom ) - { - synchronized ( beaconHandlers ) + public void run() { - BeaconHandler handler = beaconHandlers.get (responseFrom); - if ( handler == null ) + try { - handler = new BeaconHandler (this, responseFrom); - beaconHandlers.put (responseFrom, handler); + logger.fine( "Attempting to register with repeater at address: '" + repeaterLocalAddress + "'." ); + getBroadcastTransport ().send( buffer, repeaterLocalAddress ); + logger.fine( "Repeater registration message sent ok." ); + } + catch ( Throwable th ) + { + logger.log( Level.FINE, th, () -> "Failed to send repeater registration message to: " + repeaterLocalAddress); } - return handler; } } - public ScheduledExecutorService getScheduledExecutor() - { - return timer; - } - }