Skip to content

Commit

Permalink
single underlying get method in AsyncLoader
Browse files Browse the repository at this point in the history
The nonblocking version of get just runs getBlocking in a thread.
NetworkPreloader has corresponding preload and preloadBlocking methods.
  • Loading branch information
abyrd committed Oct 31, 2024
1 parent fe68b94 commit ea16ea5
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 20 deletions.
9 changes: 3 additions & 6 deletions src/main/java/com/conveyal/r5/analyst/NetworkPreloader.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public NetworkPreloader(TransportNetworkCache transportNetworkCache) {
this.transportNetworkCache = transportNetworkCache;
}

public LoaderState<TransportNetwork> preloadData (AnalysisWorkerTask task) {
public LoaderState<TransportNetwork> preload (AnalysisWorkerTask task) {
if (task.scenario != null) {
transportNetworkCache.rememberScenario(task.scenario);
}
Expand All @@ -96,11 +96,8 @@ public LoaderState<TransportNetwork> preloadData (AnalysisWorkerTask task) {
* This is provided specifically for regional tasks, to ensure that they remain in preloading mode while all this
* data is prepared.
*/
public TransportNetwork synchronousPreload (AnalysisWorkerTask task) {
Key key = Key.forTask(task);
TransportNetwork scenarioNetwork = buildValue(key);
setComplete(key, scenarioNetwork);
return scenarioNetwork;
public TransportNetwork preloadBlocking (AnalysisWorkerTask task) {
return getBlocking(Key.forTask(task));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ public static void sleepSeconds (int seconds) {
protected byte[] handleAndSerializeOneSinglePointTask (TravelTimeSurfaceTask task) throws IOException {
LOG.debug("Handling single-point task {}", task.toString());
// Get all the data needed to run one analysis task, or at least begin preparing it.
final AsyncLoader.LoaderState<TransportNetwork> networkLoaderState = networkPreloader.preloadData(task);
final AsyncLoader.LoaderState<TransportNetwork> networkLoaderState = networkPreloader.preload(task);

// If loading is not complete, bail out of this function.
// Ideally we'd stall briefly using something like Future.get(timeout) in case loading finishes quickly.
Expand Down Expand Up @@ -462,7 +462,7 @@ protected void handleOneRegionalTask (RegionalTask task) throws Throwable {
// Note we're completely bypassing the async loader here and relying on the older nested LoadingCaches.
// If those are ever removed, the async loader will need a synchronous mode with per-path blocking (kind of
// reinventing the wheel of LoadingCache) or we'll need to make preparation for regional tasks async.
TransportNetwork transportNetwork = networkPreloader.synchronousPreload(task);
TransportNetwork transportNetwork = networkPreloader.preloadBlocking(task);

// If we are generating a static site, there must be a single metadata file for an entire batch of results.
// Arbitrarily we create this metadata as part of the first task in the job.
Expand Down
26 changes: 14 additions & 12 deletions src/main/java/com/conveyal/r5/util/AsyncLoader.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,16 @@ public String toString() {
}
}

/** This has been factored out of the executor runnables so subclasses can force a blocking (non-async) load. */
protected V getBlocking (K key) {
setProgress(key, 0, "Starting...");
V value = buildValue(key);
synchronized (map) {
map.put(key, new LoaderState(Status.PRESENT, "Loaded", 100, value));
}
return value;
}

/**
* Attempt to fetch the value for the supplied key.
* If the value is not yet present, and not yet being computed / fetched, enqueue a task to do so.
Expand All @@ -109,7 +119,7 @@ public LoaderState<V> get (K key) {
state = map.get(key);
if (state == null) {
// Only enqueue a task to load the value for this key if another call hasn't already done it.
state = new LoaderState<V>(Status.WAITING, "Enqueued task...", 0, null);
state = new LoaderState<>(Status.WAITING, "Enqueued task...", 0, null);
map.put(key, state);
enqueueLoadTask = true;
}
Expand All @@ -120,10 +130,8 @@ public LoaderState<V> get (K key) {
// Enqueue task outside the above block (synchronizing the fewest lines possible).
if (enqueueLoadTask) {
executor.execute(() -> {
setProgress(key, 0, "Starting...");
try {
V value = buildValue(key);
setComplete(key, value);
getBlocking(key);
} catch (Throwable t) {
// It's essential to trap Throwable rather than just Exception. Otherwise the executor
// threads can be killed by any Error that happens, stalling the executor.
Expand All @@ -143,7 +151,7 @@ public LoaderState<V> get (K key) {
* It's not entirely clear this should return a value - might be better to call setValue within the overridden
* method, just as we call setProgress or setError.
*/
protected abstract V buildValue(K key) throws Exception;
protected abstract V buildValue(K key);

/**
* Call this method inside the buildValue method to indicate progress.
Expand All @@ -154,15 +162,9 @@ public void setProgress(K key, int percentComplete, String message) {
}
}

public void setComplete(K key, V value) {
synchronized (map) {
map.put(key, new LoaderState(Status.PRESENT, "Loaded", 100, value));
}
}

/**
* Call this method inside the buildValue method to indicate that an unrecoverable error has happened.
* FIXME this will permanently associate an error with the key. No further attempt will ever be made to create the value.
* This will permanently associate an error with the key. No further attempt will ever be made to create the value.
*/
protected void setError (K key, Throwable throwable) {
synchronized (map) {
Expand Down

0 comments on commit ea16ea5

Please sign in to comment.