Skip to content

Commit 4536a9c

Browse files
committed
Major change in operations!
1 parent 4036d12 commit 4536a9c

File tree

30 files changed

+612
-543
lines changed

30 files changed

+612
-543
lines changed

SpiNNaker-allocserv/src/main/java/uk/ac/manchester/spinnaker/alloc/allocator/JobObjectRememberer.java

+105-26
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,8 @@
2020
import java.io.IOException;
2121
import java.util.ArrayList;
2222
import java.util.HashMap;
23-
import java.util.LinkedList;
2423
import java.util.List;
2524
import java.util.Map;
26-
import java.util.Queue;
2725

2826
import javax.annotation.PreDestroy;
2927

@@ -34,6 +32,9 @@
3432
import com.google.errorprone.annotations.concurrent.GuardedBy;
3533

3634
import uk.ac.manchester.spinnaker.alloc.proxy.ProxyCore;
35+
import uk.ac.manchester.spinnaker.machine.ChipLocation;
36+
import uk.ac.manchester.spinnaker.protocols.FastDataIn;
37+
import uk.ac.manchester.spinnaker.protocols.download.Downloader;
3738
import uk.ac.manchester.spinnaker.transceiver.TransceiverInterface;
3839

3940
/**
@@ -52,7 +53,13 @@ class JobObjectRememberer {
5253
private final Map<Integer, List<ProxyCore>> proxies = new HashMap<>();
5354

5455
@GuardedBy("this")
55-
private final Map<Integer, Queue<TransceiverInterface>> transceivers =
56+
private final Map<Integer, TransceiverInterface> transceivers =
57+
new HashMap<>();
58+
59+
private final Map<Integer, Map<ChipLocation, FastDataIn>> fastDataCache =
60+
new HashMap<>();
61+
62+
private final Map<Integer, Map<ChipLocation, Downloader>> downloaders =
5663
new HashMap<>();
5764

5865
/**
@@ -62,12 +69,30 @@ class JobObjectRememberer {
6269
private synchronized void closeAll() {
6370
proxies.values().forEach(list -> list.forEach(ProxyCore::close));
6471
proxies.clear(); // Just in case
65-
transceivers.values().forEach(queue -> queue.forEach(txrx -> {
72+
transceivers.values().forEach(txrx -> {
6673
try {
6774
txrx.close();
6875
} catch (IOException e) {
6976
log.error("Error closing Transceiver", e);
7077
}
78+
});
79+
transceivers.clear(); // Just in case
80+
fastDataCache.values().forEach(map -> map.values().forEach(fdi -> {
81+
try {
82+
fdi.close();
83+
} catch (IOException e) {
84+
// TODO Auto-generated catch block
85+
e.printStackTrace();
86+
}
87+
}));
88+
fastDataCache.clear(); // Just in case
89+
downloaders.values().forEach(map -> map.values().forEach(dl -> {
90+
try {
91+
dl.close();
92+
} catch (IOException e) {
93+
// TODO Auto-generated catch block
94+
e.printStackTrace();
95+
}
7196
}));
7297
}
7398

@@ -103,31 +128,67 @@ synchronized void removeProxyForJob(Integer jobId, ProxyCore proxy) {
103128
*
104129
* @param jobId The job ID.
105130
*
106-
* @return The transceiver.
131+
* @return The transceiver or null if none.
107132
*/
108133
synchronized TransceiverInterface getTransceiverForJob(int jobId) {
109-
if (transceivers.containsKey(jobId)) {
110-
return transceivers.get(jobId).poll();
111-
}
112-
return null;
134+
return transceivers.get(jobId);
113135
}
114136

115137
/** Set the transceiver for a job.
116138
*
117139
* @param jobId The job ID.
118140
* @param txrx The transceiver.
119-
* @throws RuntimeException If the job already has a transceiver
120141
*/
121-
synchronized void releaseTransceiverForJob(Integer jobId,
142+
synchronized void rememberTransceiverForJob(Integer jobId,
122143
TransceiverInterface txrx) {
123-
if (!transceivers.containsKey(jobId)) {
124-
transceivers.put(jobId, new LinkedList<>());
125-
}
126-
transceivers.get(jobId).add(txrx);
144+
transceivers.put(jobId, txrx);
145+
}
146+
147+
/** Get the fast data in for a job.
148+
*
149+
* @param jobId The job ID.
150+
* @param chip The ethernet chip to get the fast data in for.
151+
* @return The fast data in or null if none.
152+
*/
153+
synchronized FastDataIn getFastDataIn(Integer jobId, ChipLocation chip) {
154+
return fastDataCache.getOrDefault(jobId, Map.of()).get(chip);
155+
}
156+
157+
/**
158+
* Remember the fast data in for a job.
159+
*
160+
* @param jobId The job ID.
161+
* @param chip The ethernet chip to remember the fast data in for.
162+
* @param fdi The fast data in.
163+
*/
164+
synchronized void rememberFastDataIn(Integer jobId, ChipLocation chip,
165+
FastDataIn fdi) {
166+
fastDataCache.computeIfAbsent(jobId, __ -> new HashMap<>()).put(
167+
chip, fdi);
127168
}
128169

129-
private synchronized List<ProxyCore> removeProxyListForJob(Integer jobId) {
130-
return proxies.remove(jobId);
170+
/**
171+
* Get the downloader for a job.
172+
*
173+
* @param jobId The job ID.
174+
* @param chip The ethernet chip to get the downloader for.
175+
* @return The downloader or null if none.
176+
*/
177+
synchronized Downloader getDownloader(Integer jobId, ChipLocation chip) {
178+
return downloaders.getOrDefault(jobId, Map.of()).get(chip);
179+
}
180+
181+
/**
182+
* Remember the downloader for a job.
183+
*
184+
* @param jobId The job ID.
185+
* @param chip The ethernet chip to remember the downloader for.
186+
* @param downloader The downloader.
187+
*/
188+
synchronized void rememberDownloader(Integer jobId, ChipLocation chip,
189+
Downloader downloader) {
190+
downloaders.computeIfAbsent(jobId, __ -> new HashMap<>()).put(
191+
chip, downloader);
131192
}
132193

133194
/**
@@ -139,18 +200,36 @@ private synchronized List<ProxyCore> removeProxyListForJob(Integer jobId) {
139200
* The job ID.
140201
*/
141202
void closeJob(Integer jobId) {
142-
var list = removeProxyListForJob(jobId);
143-
if (nonNull(list)) {
144-
list.forEach(ProxyCore::close);
145-
}
146203
synchronized (this) {
147-
var queue = transceivers.remove(jobId);
148-
if (queue != null) {
149-
queue.forEach(txrx -> {
204+
var proxyList = proxies.remove(jobId);
205+
if (nonNull(proxyList)) {
206+
proxyList.forEach(ProxyCore::close);
207+
}
208+
var txrx = transceivers.remove(jobId);
209+
if (nonNull(txrx)) {
210+
try {
211+
txrx.close();
212+
} catch (IOException e) {
213+
log.error("Error closing Transceiver", e);
214+
}
215+
}
216+
var fdc = fastDataCache.remove(jobId);
217+
if (nonNull(fdc)) {
218+
fdc.values().forEach(fdi -> {
219+
try {
220+
fdi.close();
221+
} catch (IOException e) {
222+
log.error("Error closing FastDataIn", e);
223+
}
224+
});
225+
}
226+
var dl = downloaders.remove(jobId);
227+
if (nonNull(dl)) {
228+
dl.values().forEach(downloader -> {
150229
try {
151-
txrx.close();
230+
downloader.close();
152231
} catch (IOException e) {
153-
log.error("Error closing Transceiver", e);
232+
log.error("Error closing Downloader", e);
154233
}
155234
});
156235
}

SpiNNaker-allocserv/src/main/java/uk/ac/manchester/spinnaker/alloc/allocator/Spalloc.java

+33-3
Original file line numberDiff line numberDiff line change
@@ -84,14 +84,19 @@
8484
import uk.ac.manchester.spinnaker.alloc.web.IssueReportRequest.ReportedBoard;
8585
import uk.ac.manchester.spinnaker.connections.SCPConnection;
8686
import uk.ac.manchester.spinnaker.machine.ChipLocation;
87+
import uk.ac.manchester.spinnaker.machine.CoreLocation;
8788
import uk.ac.manchester.spinnaker.machine.HasChipLocation;
8889
import uk.ac.manchester.spinnaker.machine.HasCoreLocation;
8990
import uk.ac.manchester.spinnaker.machine.MachineVersion;
9091
import uk.ac.manchester.spinnaker.machine.board.BMPCoords;
9192
import uk.ac.manchester.spinnaker.machine.board.PhysicalCoords;
9293
import uk.ac.manchester.spinnaker.machine.board.TriadCoords;
94+
import uk.ac.manchester.spinnaker.machine.tags.IPTag;
95+
import uk.ac.manchester.spinnaker.protocols.FastDataIn;
96+
import uk.ac.manchester.spinnaker.protocols.download.Downloader;
9397
import uk.ac.manchester.spinnaker.spalloc.messages.BoardCoordinates;
9498
import uk.ac.manchester.spinnaker.spalloc.messages.BoardPhysicalCoordinates;
99+
import uk.ac.manchester.spinnaker.transceiver.ProcessException;
95100
import uk.ac.manchester.spinnaker.transceiver.SpinnmanException;
96101
import uk.ac.manchester.spinnaker.transceiver.Transceiver;
97102
import uk.ac.manchester.spinnaker.transceiver.TransceiverInterface;
@@ -1735,14 +1740,39 @@ public TransceiverInterface getTransceiver() throws IOException,
17351740
connections.add(new SCPConnection(conn.getChip(),
17361741
null, null, InetAddress.getByName(conn.getHostname())));
17371742
}
1738-
return new Transceiver(MachineVersion.FIVE, connections);
1743+
txrx = new Transceiver(MachineVersion.FIVE, connections);
1744+
rememberer.rememberTransceiverForJob(id, txrx);
1745+
return txrx;
17391746
}
17401747

17411748
@Override
1742-
public void releaseTransceiver(TransceiverInterface transceiver) {
1743-
rememberer.releaseTransceiverForJob(id, transceiver);
1749+
public FastDataIn getFastDataIn(CoreLocation gathererCore, IPTag iptag)
1750+
throws ProcessException, IOException, InterruptedException {
1751+
var fdi = rememberer.getFastDataIn(id, iptag.getDestination());
1752+
if (fdi != null) {
1753+
return fdi;
1754+
}
1755+
fdi = new FastDataIn(gathererCore, iptag);
1756+
rememberer.rememberFastDataIn(id, iptag.getDestination(), fdi);
1757+
return fdi;
17441758
}
17451759

1760+
@Override
1761+
public Downloader getDownloader(IPTag iptag)
1762+
throws ProcessException, IOException, InterruptedException {
1763+
var downloader = rememberer.getDownloader(id,
1764+
iptag.getDestination());
1765+
if (downloader != null) {
1766+
return downloader;
1767+
}
1768+
downloader = new Downloader(iptag);
1769+
rememberer.rememberDownloader(id, iptag.getDestination(),
1770+
downloader);
1771+
return downloader;
1772+
}
1773+
1774+
1775+
17461776
@Override
17471777
public boolean equals(Object other) {
17481778
// Equality is defined exactly by the database ID

SpiNNaker-allocserv/src/main/java/uk/ac/manchester/spinnaker/alloc/allocator/SpallocAPI.java

+26-3
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import uk.ac.manchester.spinnaker.alloc.security.Permit;
5858
import uk.ac.manchester.spinnaker.alloc.web.IssueReportRequest;
5959
import uk.ac.manchester.spinnaker.machine.ChipLocation;
60+
import uk.ac.manchester.spinnaker.machine.CoreLocation;
6061
import uk.ac.manchester.spinnaker.machine.HasChipLocation;
6162
import uk.ac.manchester.spinnaker.machine.HasCoreLocation;
6263
import uk.ac.manchester.spinnaker.machine.ValidX;
@@ -66,8 +67,12 @@
6667
import uk.ac.manchester.spinnaker.machine.board.TriadCoords;
6768
import uk.ac.manchester.spinnaker.machine.board.ValidTriadHeight;
6869
import uk.ac.manchester.spinnaker.machine.board.ValidTriadWidth;
70+
import uk.ac.manchester.spinnaker.machine.tags.IPTag;
71+
import uk.ac.manchester.spinnaker.protocols.FastDataIn;
72+
import uk.ac.manchester.spinnaker.protocols.download.Downloader;
6973
import uk.ac.manchester.spinnaker.spalloc.messages.BoardCoordinates;
7074
import uk.ac.manchester.spinnaker.spalloc.messages.BoardPhysicalCoordinates;
75+
import uk.ac.manchester.spinnaker.transceiver.ProcessException;
7176
import uk.ac.manchester.spinnaker.transceiver.SpinnmanException;
7277
import uk.ac.manchester.spinnaker.transceiver.TransceiverInterface;
7378
import uk.ac.manchester.spinnaker.utils.UsedInJavadocOnly;
@@ -993,11 +998,29 @@ TransceiverInterface getTransceiver()
993998
throws IOException, InterruptedException, SpinnmanException;
994999

9951000
/**
996-
* Release the transceiver back to the pool.
1001+
* Get a FastDataIn protocol instance for an Ethernet within a job.
9971002
*
998-
* @param transceiver The transceiver to release.
1003+
* @param gathererCore The core that will do the gathering.
1004+
* @param iptag The IPTag to use.
1005+
* @return A FastDataIn instance.
1006+
* @throws ProcessException if there is an issue setting up the tag.
1007+
* @throws IOException if there is an issue communicating.
1008+
* @throws InterruptedException if the operation is interrupted.
1009+
*/
1010+
FastDataIn getFastDataIn(CoreLocation gathererCore, IPTag iptag)
1011+
throws ProcessException, IOException, InterruptedException;
1012+
1013+
/**
1014+
* Get a Downloader protocol instance for an Ethernet within a job.
1015+
*
1016+
* @param iptag The IPTag to use.
1017+
* @return A Downloader instance.
1018+
* @throws ProcessException if there is an issue setting up the tag.
1019+
* @throws IOException if there is an issue communicating.
1020+
* @throws InterruptedException if the operation is interrupted.
9991021
*/
1000-
void releaseTransceiver(TransceiverInterface transceiver);
1022+
Downloader getDownloader(IPTag iptag)
1023+
throws ProcessException, IOException, InterruptedException;
10011024
}
10021025

10031026
/**

SpiNNaker-allocserv/src/main/java/uk/ac/manchester/spinnaker/alloc/web/SpallocServiceAPIImplBuilder.java

+16-36
Original file line numberDiff line numberDiff line change
@@ -295,12 +295,8 @@ public void writeDataToJob(int x, int y, long address,
295295
AsyncResponse response) {
296296
bgAction(response, () -> {
297297
var txrx = j.getTransceiver();
298-
try {
299-
txrx.writeMemory(new ChipLocation(x, y),
298+
txrx.writeMemory(new ChipLocation(x, y),
300299
new MemoryLocation(address), bytes);
301-
} finally {
302-
j.releaseTransceiver(txrx);
303-
}
304300
return accepted().build();
305301
});
306302
}
@@ -311,13 +307,9 @@ public void readDataFromJob(int x, int y, long address, int size,
311307
bgAction(response, () -> {
312308
var txrx = j.getTransceiver();
313309
var arr = new byte[size];
314-
try {
315-
var buffer = txrx.readMemory(new ChipLocation(x, y),
316-
new MemoryLocation(address), size);
317-
buffer.get(arr);
318-
} finally {
319-
j.releaseTransceiver(txrx);
320-
}
310+
var buffer = txrx.readMemory(new ChipLocation(x, y),
311+
new MemoryLocation(address), size);
312+
buffer.get(arr);
321313
return ok(arr).build();
322314
});
323315

@@ -330,18 +322,12 @@ public void fastDataWrite(@ValidX int gatherX, @ValidY int gatherY,
330322
@ValidX int x, @ValidY int y, long address, byte[] bytes,
331323
AsyncResponse response) {
332324
bgAction(response, () -> {
333-
var txrx = j.getTransceiver();
334-
try {
335-
txrx.writeMemoryFast(
336-
new CoreLocation(gatherX, gatherY, gatherP),
337-
new ChipLocation(ethX, ethY), ethAddress,
338-
new IPTag(ethAddress, iptag, ethX, ethY,
339-
"localhost", null, true, null),
340-
new ChipLocation(x, y),
341-
new MemoryLocation(address), wrap(bytes));
342-
} finally {
343-
j.releaseTransceiver(txrx);
344-
}
325+
var fdi = j.getFastDataIn(
326+
new CoreLocation(gatherX, gatherY, gatherP),
327+
new IPTag(ethAddress, iptag, ethX, ethY, "localhost",
328+
null, true, null));
329+
fdi.fastWrite(new ChipLocation(x, y),
330+
new MemoryLocation(address), wrap(bytes));
345331
return accepted().build();
346332
});
347333
}
@@ -353,19 +339,13 @@ public void fastDataRead(@ValidX int gatherX, @ValidY int gatherY,
353339
@ValidX int x, @ValidY int y, @ValidP int p,
354340
long address, int size, AsyncResponse response) {
355341
bgAction(response, () -> {
356-
var txrx = j.getTransceiver();
342+
var downloader = j.getDownloader(
343+
new IPTag(ethAddress, iptag, ethX, ethY, "localhost",
344+
null, true, null));
345+
var buffer = downloader.doDownload(new CoreLocation(x, y, p),
346+
new MemoryLocation(address), size);
357347
var arr = new byte[size];
358-
try {
359-
var buffer = txrx.readMemoryFast(
360-
new ChipLocation(gatherX, gatherY),
361-
new IPTag(ethAddress, iptag, ethX, ethY,
362-
"localhost", null, true, null),
363-
new CoreLocation(x, y, p),
364-
new MemoryLocation(address), size);
365-
buffer.get(arr);
366-
} finally {
367-
j.releaseTransceiver(txrx);
368-
}
348+
buffer.get(arr);
369349
return ok(arr).build();
370350
});
371351
}

0 commit comments

Comments
 (0)