Skip to content

Commit

Permalink
Added timeouts for example service discovery (#197)
Browse files Browse the repository at this point in the history
* Added timeouts for example service discovery

* killed main thread

* moar blocking fixes
  • Loading branch information
therealryan authored Jan 11, 2023
1 parent 99b83b2 commit 2790eca
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.net.SocketException;
import java.net.URL;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.BiPredicate;
import java.util.stream.Stream;

Expand All @@ -30,8 +31,11 @@ public class Discovery {
private static final ObjectMapper JSON = new ObjectMapper();

private final InetSocketAddress group;
private long advertiseTimeout = -1;
private MulticastSocket listenSocket;
private long listenTimeout = -1;
private volatile boolean shouldStop = false;
private volatile boolean listening = false;

/**
* @param group The multicast address on which to advertise and listen for
Expand All @@ -52,13 +56,37 @@ public void stop() {
}
}

/**
* Checks if we're still listening for dependencies
*
* @return <code>true</code> if there still a chance of finding new dependencies
*/
public boolean listening() {
return listening;
}

/**
* Controls how long our services will be advertised for
*
* @param count The number of units. Supply a negative number to advertise
* forever.
* @param unit The unit duration
* @return <code>this</code>
* @see #advertise(String, int, Set)
*/
public Discovery stoppingAfter( int count, TimeUnit unit ) {
advertiseTimeout = unit.toMillis( count );
return this;
}

/**
* Starts advertising the {@link Service}s offered by this {@link Instance}
*
* @param protocol The protocol to use for the advertised services
* @param port The port on which the advertised services are available
* @param services The set of {@link Service} class names to advertise
* @return <code>this</code>
* @see #stoppingAfter(int, TimeUnit)
*/
public Discovery advertise( String protocol, int port, Set<String> services ) {

Expand All @@ -76,7 +104,11 @@ public Discovery advertise( String protocol, int port, Set<String> services ) {
try( MulticastSocket socket = new MulticastSocket( group.getPort() ) ) {
socket.joinGroup( group.getAddress() );

while( !shouldStop ) {
long limit = System.currentTimeMillis() + advertiseTimeout;
boolean limitBreached = false;
while( !shouldStop && !limitBreached ) {
limitBreached = advertiseTimeout > 0 && System.currentTimeMillis() > limit;

if( LOG.isTraceEnabled() ) {
LOG.trace( "Sending {}", new String( advert.getData(), UTF_8 ) );
}
Expand All @@ -103,12 +135,26 @@ public Discovery advertise( String protocol, int port, Set<String> services ) {
return this;
}

/**
* Controls how long we'll wait for dependencies to be found before giving up.
*
* @param count The number of units. Supply a negative number to wait forever.
* @param unit The unit duration
* @return <code>this</code>
* @see #listen(BiPredicate)
*/
public Discovery abortingAfter( int count, TimeUnit unit ) {
listenTimeout = unit.toMillis( count );
return this;
}

/**
* Starts listening for services advertised on the cluster
*
* @param action What to do with discovered remote {@link Service}s. Return true
* if all dependencies are satisfied
* @return <code>this</code>
* @see #abortingAfter(int, TimeUnit)
*/
public Discovery listen( BiPredicate<String, URL> action ) {

Expand All @@ -120,7 +166,16 @@ public Discovery listen( BiPredicate<String, URL> action ) {
socket.joinGroup( group.getAddress() );
// this will turn to true when all of our dependencies have been satisfied
boolean satisfied = false;
long limit = System.currentTimeMillis() + listenTimeout;
if( listenTimeout > 0 ) {
socket.setSoTimeout( (int) listenTimeout );
}
while( !satisfied && !shouldStop ) {

if( listenTimeout > 0 && System.currentTimeMillis() > limit ) {
throw new IllegalStateException( "Missing dependencies after " + listenTimeout + "ms" );
}

socket.receive( pkt );
if( LOG.isTraceEnabled() ) {
LOG.trace( "Received {}", new String( data, pkt.getOffset(), pkt.getLength(), UTF_8 ) );
Expand All @@ -139,14 +194,18 @@ public Discovery listen( BiPredicate<String, URL> action ) {
}
catch( SocketException se ) {
if( !shouldStop ) {
listening = false;
throw new UncheckedIOException( se );
}
}
catch( IOException ioe ) {
listening = false;
throw new UncheckedIOException( ioe );
}
listening = false;
}, "Discovery listen" );
listen.setDaemon( true );
listening = true;
listen.start();

return this;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.mastercard.test.flow.example.framework;

import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toCollection;

Expand Down Expand Up @@ -172,6 +173,7 @@ public Instance start() {
// the rest will have to come from our cluster peers.
// start advertising our own services...
Discovery discovery = new Discovery( cluster )
.stoppingAfter( 30, SECONDS )
.advertise(
"http", port(),
services.stream()
Expand All @@ -181,8 +183,9 @@ public Instance start() {

// ... and listening for those that we require
if( !required.isEmpty() ) {
discovery.listen(
( typeName, rmtUrl ) -> {
discovery
.abortingAfter( 10, SECONDS )
.listen( ( typeName, rmtUrl ) -> {
@SuppressWarnings("unchecked")
Class<? extends Service> type = Optional.of( typeName )
.map( n -> {
Expand Down Expand Up @@ -211,7 +214,7 @@ public Instance start() {
}

// now we wait until our dependencies have been satisfied
while( !required.isEmpty() ) {
while( discovery.listening() ) {
try {
synchronized( required ) {
if( !required.isEmpty() ) {
Expand All @@ -221,7 +224,7 @@ public Instance start() {
.sorted()
.collect( joining() ) );
}
required.wait();
required.wait( 1000 );
}
}
}
Expand All @@ -232,6 +235,14 @@ public Instance start() {
}
}

if( !required.isEmpty() ) {
String missing = required.stream()
.map( c -> "\n " + c.getName() )
.sorted()
.collect( joining() );
LOG.error( "Instance incomplete! Missing{}", missing );
throw new IllegalStateException( "Missing dependencies:" + missing );
}
LOG.info( "Instance complete" );
return this;
}
Expand Down

0 comments on commit 2790eca

Please sign in to comment.