diff --git a/example/app-framework/src/main/java/com/mastercard/test/flow/example/framework/Discovery.java b/example/app-framework/src/main/java/com/mastercard/test/flow/example/framework/Discovery.java index d1a5472631..cd4032c97d 100644 --- a/example/app-framework/src/main/java/com/mastercard/test/flow/example/framework/Discovery.java +++ b/example/app-framework/src/main/java/com/mastercard/test/flow/example/framework/Discovery.java @@ -10,6 +10,7 @@ import java.net.SocketException; import java.net.URL; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.function.BiPredicate; import java.util.stream.Stream; @@ -30,8 +31,11 @@ public class Discovery { private static final ObjectMapper JSON = new ObjectMapper(); private final InetSocketAddress group; + private long advertiseTimeout = -1; private MulticastSocket listenSocket; + private long listenTimeout = -1; private volatile boolean shouldStop = false; + private volatile boolean listening = false; /** * @param group The multicast address on which to advertise and listen for @@ -52,6 +56,29 @@ public void stop() { } } + /** + * Checks if we're still listening for dependencies + * + * @return true if there still a chance of finding new dependencies + */ + public boolean listening() { + return listening; + } + + /** + * Controls how long our services will be advertised for + * + * @param count The number of units. Supply a negative number to advertise + * forever. + * @param unit The unit duration + * @return this + * @see #advertise(String, int, Set) + */ + public Discovery stoppingAfter( int count, TimeUnit unit ) { + advertiseTimeout = unit.toMillis( count ); + return this; + } + /** * Starts advertising the {@link Service}s offered by this {@link Instance} * @@ -59,6 +86,7 @@ public void stop() { * @param port The port on which the advertised services are available * @param services The set of {@link Service} class names to advertise * @return this + * @see #stoppingAfter(int, TimeUnit) */ public Discovery advertise( String protocol, int port, Set services ) { @@ -76,7 +104,11 @@ public Discovery advertise( String protocol, int port, Set services ) { try( MulticastSocket socket = new MulticastSocket( group.getPort() ) ) { socket.joinGroup( group.getAddress() ); - while( !shouldStop ) { + long limit = System.currentTimeMillis() + advertiseTimeout; + boolean limitBreached = false; + while( !shouldStop && !limitBreached ) { + limitBreached = advertiseTimeout > 0 && System.currentTimeMillis() > limit; + if( LOG.isTraceEnabled() ) { LOG.trace( "Sending {}", new String( advert.getData(), UTF_8 ) ); } @@ -103,12 +135,26 @@ public Discovery advertise( String protocol, int port, Set services ) { return this; } + /** + * Controls how long we'll wait for dependencies to be found before giving up. + * + * @param count The number of units. Supply a negative number to wait forever. + * @param unit The unit duration + * @return this + * @see #listen(BiPredicate) + */ + public Discovery abortingAfter( int count, TimeUnit unit ) { + listenTimeout = unit.toMillis( count ); + return this; + } + /** * Starts listening for services advertised on the cluster * * @param action What to do with discovered remote {@link Service}s. Return true * if all dependencies are satisfied * @return this + * @see #abortingAfter(int, TimeUnit) */ public Discovery listen( BiPredicate action ) { @@ -120,7 +166,16 @@ public Discovery listen( BiPredicate action ) { socket.joinGroup( group.getAddress() ); // this will turn to true when all of our dependencies have been satisfied boolean satisfied = false; + long limit = System.currentTimeMillis() + listenTimeout; + if( listenTimeout > 0 ) { + socket.setSoTimeout( (int) listenTimeout ); + } while( !satisfied && !shouldStop ) { + + if( listenTimeout > 0 && System.currentTimeMillis() > limit ) { + throw new IllegalStateException( "Missing dependencies after " + listenTimeout + "ms" ); + } + socket.receive( pkt ); if( LOG.isTraceEnabled() ) { LOG.trace( "Received {}", new String( data, pkt.getOffset(), pkt.getLength(), UTF_8 ) ); @@ -139,14 +194,18 @@ public Discovery listen( BiPredicate action ) { } catch( SocketException se ) { if( !shouldStop ) { + listening = false; throw new UncheckedIOException( se ); } } catch( IOException ioe ) { + listening = false; throw new UncheckedIOException( ioe ); } + listening = false; }, "Discovery listen" ); listen.setDaemon( true ); + listening = true; listen.start(); return this; diff --git a/example/app-framework/src/main/java/com/mastercard/test/flow/example/framework/Instance.java b/example/app-framework/src/main/java/com/mastercard/test/flow/example/framework/Instance.java index 4fe2d60d01..76cff260c1 100644 --- a/example/app-framework/src/main/java/com/mastercard/test/flow/example/framework/Instance.java +++ b/example/app-framework/src/main/java/com/mastercard/test/flow/example/framework/Instance.java @@ -1,5 +1,6 @@ package com.mastercard.test.flow.example.framework; +import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.joining; import static java.util.stream.Collectors.toCollection; @@ -172,6 +173,7 @@ public Instance start() { // the rest will have to come from our cluster peers. // start advertising our own services... Discovery discovery = new Discovery( cluster ) + .stoppingAfter( 30, SECONDS ) .advertise( "http", port(), services.stream() @@ -181,8 +183,9 @@ public Instance start() { // ... and listening for those that we require if( !required.isEmpty() ) { - discovery.listen( - ( typeName, rmtUrl ) -> { + discovery + .abortingAfter( 10, SECONDS ) + .listen( ( typeName, rmtUrl ) -> { @SuppressWarnings("unchecked") Class type = Optional.of( typeName ) .map( n -> { @@ -211,7 +214,7 @@ public Instance start() { } // now we wait until our dependencies have been satisfied - while( !required.isEmpty() ) { + while( discovery.listening() ) { try { synchronized( required ) { if( !required.isEmpty() ) { @@ -221,7 +224,7 @@ public Instance start() { .sorted() .collect( joining() ) ); } - required.wait(); + required.wait( 1000 ); } } } @@ -232,6 +235,14 @@ public Instance start() { } } + if( !required.isEmpty() ) { + String missing = required.stream() + .map( c -> "\n " + c.getName() ) + .sorted() + .collect( joining() ); + LOG.error( "Instance incomplete! Missing{}", missing ); + throw new IllegalStateException( "Missing dependencies:" + missing ); + } LOG.info( "Instance complete" ); return this; }