Skip to content

Commit a11fe52

Browse files
committed
Try to detect issues with reusing connections in processes
1 parent 2d7348b commit a11fe52

File tree

2 files changed

+28
-3
lines changed

2 files changed

+28
-3
lines changed

SpiNNaker-comms/src/main/java/uk/ac/manchester/spinnaker/connections/UDPConnection.java

+19-1
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,13 @@
4242
import java.net.SocketTimeoutException;
4343
import java.net.UnknownHostException;
4444
import java.nio.ByteBuffer;
45+
import java.util.ConcurrentModificationException;
4546

4647
import org.slf4j.Logger;
4748

4849
import com.google.errorprone.annotations.ForOverride;
4950
import com.google.errorprone.annotations.OverridingMethodsMustInvokeSuper;
51+
import com.google.errorprone.annotations.concurrent.GuardedBy;
5052

5153
import uk.ac.manchester.spinnaker.connections.model.Connection;
5254
import uk.ac.manchester.spinnaker.messages.sdp.SDPHeader;
@@ -102,6 +104,9 @@ public enum TrafficClass {
102104

103105
private int receivePacketSize = PACKET_MAX_SIZE;
104106

107+
@GuardedBy("this")
108+
private boolean inUse = false;
109+
105110
/**
106111
* Main constructor, any argument of which could {@code null}.
107112
* <p>
@@ -683,7 +688,8 @@ public boolean isClosed() {
683688
* port in a NAT and/or firewall to allow incoming packets to be received.
684689
*
685690
* @param host
686-
* The address of the SpiNNaker board to which the message should
691+
* The address of the SpiNNaker board to which the
692+
* message should
687693
* be sent
688694
* @throws IOException
689695
* If anything goes wrong
@@ -707,4 +713,16 @@ public String toString() {
707713
getClass().getSimpleName().replaceAll("^.*\\.", ""),
708714
localAddr(), isClosed() ? "|" : "", remoteAddr());
709715
}
716+
717+
public synchronized void setInUse() {
718+
if (inUse) {
719+
throw new ConcurrentModificationException(
720+
"Connection " + this + " is already in use!");
721+
}
722+
this.inUse = true;
723+
}
724+
725+
public synchronized void setNotInUse() {
726+
this.inUse = false;
727+
}
710728
}

SpiNNaker-comms/src/main/java/uk/ac/manchester/spinnaker/transceiver/TxrxProcess.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -303,8 +303,13 @@ protected <Conn extends SCPConnection> TxrxProcess(
303303
* @return The pipeline instance.
304304
*/
305305
private RequestPipeline pipeline(SCPRequest<?> request) {
306-
return requestPipelines.computeIfAbsent(
307-
selector.getNextConnection(request), RequestPipeline::new);
306+
var connection = selector.getNextConnection(request);
307+
if (!requestPipelines.containsKey(connection)) {
308+
var pipeline = new RequestPipeline(connection);
309+
requestPipelines.put(connection, pipeline);
310+
connection.setInUse();
311+
}
312+
return requestPipelines.get(connection);
308313
}
309314

310315
/**
@@ -331,6 +336,7 @@ protected final void finishBatch()
331336
for (var pipe : requestPipelines.values()) {
332337
pipe.finish();
333338
}
339+
requestPipelines.clear();
334340
if (failure != null) {
335341
var hdr = failure.req.sdpHeader;
336342
throw makeInstance(hdr.getDestination(), failure.exn);
@@ -799,6 +805,7 @@ void finish() throws IOException, InterruptedException {
799805
}
800806
log.debug("Finished called on {} with connection {}", this,
801807
connection);
808+
this.connection.setNotInUse();
802809
}
803810

804811
/**

0 commit comments

Comments
 (0)