Batch scheduler function support #128

Merged
merged 3 commits into from
Sep 24, 2023
47 changes: 46 additions & 1 deletion README.md
@@ -510,7 +510,52 @@ and there are also gains to this different mode of operation:
However, with batch execution control comes responsibility! If you forget to make the call to `dispatch()` then the futures
in the load request queue will never be batched, and thus _will never complete_! So be careful when crafting your loader designs.

## Scheduled Dispatching
## The BatchLoader Scheduler

By default, when `dataLoader.dispatch()` is called, the `BatchLoader` / `MappedBatchLoader` function will be invoked
immediately.

However, you can provide your own `BatchLoaderScheduler` that defers this call until some
later time.

You will be passed a callback (`ScheduledBatchLoaderCall` / `ScheduledMappedBatchLoaderCall`) and you are expected
to eventually invoke it to make the batch loading happen.

The following is a `BatchLoaderScheduler` that waits 10 milliseconds before invoking the batch loading functions.

```java
new BatchLoaderScheduler() {

    @Override
    public <K, V> CompletionStage<List<V>> scheduleBatchLoader(ScheduledBatchLoaderCall<V> scheduledCall, List<K> keys, BatchLoaderEnvironment environment) {
        return CompletableFuture.supplyAsync(() -> {
            snooze(10);
            return scheduledCall.invoke();
        }).thenCompose(Function.identity());
    }

    @Override
    public <K, V> CompletionStage<Map<K, V>> scheduleMappedBatchLoader(ScheduledMappedBatchLoaderCall<K, V> scheduledCall, List<K> keys, BatchLoaderEnvironment environment) {
        return CompletableFuture.supplyAsync(() -> {
            snooze(10);
            return scheduledCall.invoke();
        }).thenCompose(Function.identity());
    }
};
```
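The example above parks a pool thread inside `snooze(10)` for the length of the delay. As a variation, the same delay can be expressed without a blocking sleep by using `CompletableFuture.delayedExecutor` (Java 9+). The following is only a sketch under stated assumptions: `ScheduledCall` is a hypothetical stand-in for `ScheduledBatchLoaderCall` so that the code compiles without the library, and `DelayedSchedulerSketch` and `scheduleAfter` are illustrative names, not part of this project.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical stand-in for BatchLoaderScheduler.ScheduledBatchLoaderCall<V>,
// declared here only so the sketch compiles on its own.
interface ScheduledCall<V> {
    CompletionStage<List<V>> invoke();
}

final class DelayedSchedulerSketch {

    // Invokes the scheduled call after delayMillis, without sleeping on a worker thread.
    static <V> CompletionStage<List<V>> scheduleAfter(ScheduledCall<V> scheduledCall, long delayMillis) {
        Executor delayed = CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS);
        // The outer future completes with the stage returned by invoke() ...
        CompletableFuture<CompletionStage<List<V>>> pending =
                CompletableFuture.supplyAsync(scheduledCall::invoke, delayed);
        // ... and thenCompose flattens it into a future of the loaded values.
        return pending.thenCompose(Function.identity());
    }

    public static void main(String[] args) {
        ScheduledCall<String> call = () -> CompletableFuture.completedFuture(List.of("a", "b"));
        List<String> values = scheduleAfter(call, 10).toCompletableFuture().join();
        System.out.println(values); // prints [a, b]
    }
}
```

In a real `BatchLoaderScheduler`, the body of `scheduleAfter` would sit inside `scheduleBatchLoader`, with the mapped variant written the same way.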

You are given the keys to be loaded and a `BatchLoaderEnvironment` (which is `null` for a plain `BatchLoader` / `MappedBatchLoader`) for
informational purposes only. You cannot change the list of keys that will be loaded via this mechanism.

Also note that, because there is a max batch size, this scheduling can happen N times for a given `dispatch()`
call. The total set of keys is sliced into batches and the `BatchLoaderScheduler` is then called once for
each batch of keys.

Do not assume that a single call to `dispatch()` results in a single call to `BatchLoaderScheduler`.
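To make the slicing concrete, here is a minimal sketch of the idea. It is not the library's implementation: `BatchSlicingSketch` and `dispatchInBatches` are hypothetical names, and a plain `Consumer` stands in for the scheduler. It shows only that a key list longer than the max batch size leads to more than one scheduler call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class BatchSlicingSketch {

    // Splits keys into consecutive batches of at most maxBatchSize, hands each
    // batch to the scheduler callback, and returns the number of calls made.
    static <K> int dispatchInBatches(List<K> keys, int maxBatchSize, Consumer<List<K>> scheduler) {
        if (maxBatchSize < 1) {
            throw new IllegalArgumentException("maxBatchSize must be at least 1");
        }
        int calls = 0;
        for (int from = 0; from < keys.size(); from += maxBatchSize) {
            int to = Math.min(from + maxBatchSize, keys.size());
            scheduler.accept(new ArrayList<>(keys.subList(from, to)));
            calls++;
        }
        return calls;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("a", "b", "c", "d", "e");
        List<List<String>> seen = new ArrayList<>();
        int calls = dispatchInBatches(keys, 2, seen::add);
        System.out.println(calls + " scheduler calls: " + seen);
        // prints: 3 scheduler calls: [[a, b], [c, d], [e]]
    }
}
```

Five keys with a max batch size of two produce three scheduler calls, so a scheduler that keeps per-dispatch state should not assume one call per `dispatch()`.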

This code is inspired by the scheduling code in the [reference JS implementation](https://github.com/graphql/dataloader#batch-scheduling).

## Scheduled Registry Dispatching

`ScheduledDataLoaderRegistry` is a registry that allows for dispatching to be done on a schedule. It contains a
predicate that is evaluated (per data loader contained within) when `dispatchAll` is invoked.
35 changes: 31 additions & 4 deletions src/main/java/org/dataloader/DataLoaderHelper.java
@@ -3,6 +3,7 @@
import org.dataloader.annotations.GuardedBy;
import org.dataloader.annotations.Internal;
import org.dataloader.impl.CompletableFutureKit;
import org.dataloader.scheduler.BatchLoaderScheduler;
import org.dataloader.stats.StatisticsCollector;
import org.dataloader.stats.context.IncrementBatchLoadCountByStatisticsContext;
import org.dataloader.stats.context.IncrementBatchLoadExceptionCountStatisticsContext;
@@ -417,10 +418,23 @@ CompletableFuture<List<V>> invokeLoader(List<K> keys, List<Object> keyContexts)
@SuppressWarnings("unchecked")
private CompletableFuture<List<V>> invokeListBatchLoader(List<K> keys, BatchLoaderEnvironment environment) {
CompletionStage<List<V>> loadResult;
BatchLoaderScheduler batchLoaderScheduler = loaderOptions.getBatchLoaderScheduler();
if (batchLoadFunction instanceof BatchLoaderWithContext) {
loadResult = ((BatchLoaderWithContext<K, V>) batchLoadFunction).load(keys, environment);
BatchLoaderWithContext<K, V> loadFunction = (BatchLoaderWithContext<K, V>) batchLoadFunction;
if (batchLoaderScheduler != null) {
BatchLoaderScheduler.ScheduledBatchLoaderCall<V> loadCall = () -> loadFunction.load(keys, environment);
loadResult = batchLoaderScheduler.scheduleBatchLoader(loadCall, keys, environment);
} else {
loadResult = loadFunction.load(keys, environment);
}
} else {
loadResult = ((BatchLoader<K, V>) batchLoadFunction).load(keys);
BatchLoader<K, V> loadFunction = (BatchLoader<K, V>) batchLoadFunction;
if (batchLoaderScheduler != null) {
BatchLoaderScheduler.ScheduledBatchLoaderCall<V> loadCall = () -> loadFunction.load(keys);
loadResult = batchLoaderScheduler.scheduleBatchLoader(loadCall, keys, null);
} else {
loadResult = loadFunction.load(keys);
}
}
return nonNull(loadResult, () -> "Your batch loader function MUST return a non null CompletionStage").toCompletableFuture();
}
@@ -434,10 +448,23 @@ private CompletableFuture<List<V>> invokeListBatchLoader(List<K> keys, BatchLoad
private CompletableFuture<List<V>> invokeMapBatchLoader(List<K> keys, BatchLoaderEnvironment environment) {
CompletionStage<Map<K, V>> loadResult;
Set<K> setOfKeys = new LinkedHashSet<>(keys);
BatchLoaderScheduler batchLoaderScheduler = loaderOptions.getBatchLoaderScheduler();
if (batchLoadFunction instanceof MappedBatchLoaderWithContext) {
loadResult = ((MappedBatchLoaderWithContext<K, V>) batchLoadFunction).load(setOfKeys, environment);
MappedBatchLoaderWithContext<K, V> loadFunction = (MappedBatchLoaderWithContext<K, V>) batchLoadFunction;
if (batchLoaderScheduler != null) {
BatchLoaderScheduler.ScheduledMappedBatchLoaderCall<K, V> loadCall = () -> loadFunction.load(setOfKeys, environment);
loadResult = batchLoaderScheduler.scheduleMappedBatchLoader(loadCall, keys, environment);
} else {
loadResult = loadFunction.load(setOfKeys, environment);
}
} else {
loadResult = ((MappedBatchLoader<K, V>) batchLoadFunction).load(setOfKeys);
MappedBatchLoader<K, V> loadFunction = (MappedBatchLoader<K, V>) batchLoadFunction;
if (batchLoaderScheduler != null) {
BatchLoaderScheduler.ScheduledMappedBatchLoaderCall<K, V> loadCall = () -> loadFunction.load(setOfKeys);
loadResult = batchLoaderScheduler.scheduleMappedBatchLoader(loadCall, keys, null);
} else {
loadResult = loadFunction.load(setOfKeys);
}
}
CompletableFuture<Map<K, V>> mapBatchLoad = nonNull(loadResult, () -> "Your batch loader function MUST return a non null CompletionStage").toCompletableFuture();
return mapBatchLoad.thenApply(map -> {
24 changes: 24 additions & 0 deletions src/main/java/org/dataloader/DataLoaderOptions.java
@@ -18,6 +18,7 @@

import org.dataloader.annotations.PublicApi;
import org.dataloader.impl.Assertions;
import org.dataloader.scheduler.BatchLoaderScheduler;
import org.dataloader.stats.NoOpStatisticsCollector;
import org.dataloader.stats.StatisticsCollector;

@@ -46,6 +47,7 @@ public class DataLoaderOptions {
private Supplier<StatisticsCollector> statisticsCollector;
private BatchLoaderContextProvider environmentProvider;
private ValueCacheOptions valueCacheOptions;
private BatchLoaderScheduler batchLoaderScheduler;

/**
* Creates a new data loader options with default settings.
@@ -58,6 +60,7 @@ public DataLoaderOptions() {
statisticsCollector = NoOpStatisticsCollector::new;
environmentProvider = NULL_PROVIDER;
valueCacheOptions = ValueCacheOptions.newOptions();
batchLoaderScheduler = null;
}

/**
@@ -77,6 +80,7 @@ public DataLoaderOptions(DataLoaderOptions other) {
this.statisticsCollector = other.statisticsCollector;
this.environmentProvider = other.environmentProvider;
this.valueCacheOptions = other.valueCacheOptions;
batchLoaderScheduler = other.batchLoaderScheduler;
}

/**
@@ -304,4 +308,24 @@ public DataLoaderOptions setValueCacheOptions(ValueCacheOptions valueCacheOption
this.valueCacheOptions = Assertions.nonNull(valueCacheOptions);
return this;
}

/**
* @return the {@link BatchLoaderScheduler} to use, which can be null
*/
public BatchLoaderScheduler getBatchLoaderScheduler() {
return batchLoaderScheduler;
}

/**
* Sets a new {@link BatchLoaderScheduler} that allows the call to a {@link BatchLoader} function to be scheduled
* for some future time.
*
* @param batchLoaderScheduler the scheduler
*
* @return the data loader options for fluent coding
*/
public DataLoaderOptions setBatchLoaderScheduler(BatchLoaderScheduler batchLoaderScheduler) {
this.batchLoaderScheduler = batchLoaderScheduler;
return this;
}
}
74 changes: 74 additions & 0 deletions src/main/java/org/dataloader/scheduler/BatchLoaderScheduler.java
@@ -0,0 +1,74 @@
package org.dataloader.scheduler;

import org.dataloader.BatchLoader;
import org.dataloader.BatchLoaderEnvironment;
import org.dataloader.DataLoader;
import org.dataloader.DataLoaderOptions;
import org.dataloader.MappedBatchLoader;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionStage;

/**
* By default, when {@link DataLoader#dispatch()} is called, the {@link BatchLoader} / {@link MappedBatchLoader} function will be invoked
* immediately. However, you can provide your own {@link BatchLoaderScheduler} that allows this call to be done some time into
* the future. You will be passed a callback ({@link ScheduledBatchLoaderCall} / {@link ScheduledMappedBatchLoaderCall}) and you are expected
* to eventually call this callback method to make the batch loading happen.
* <p>
* Note: Because there is a {@link DataLoaderOptions#maxBatchSize()} it is possible for this scheduling to happen N times for a given {@link DataLoader#dispatch()}
* call. The total set of keys will be sliced into batches themselves and then the {@link BatchLoaderScheduler} will be called for
* each batch of keys. Do not assume that a single call to {@link DataLoader#dispatch()} results in a single call to {@link BatchLoaderScheduler}.
*/
public interface BatchLoaderScheduler {


/**
* This represents a callback that will invoke a {@link BatchLoader} function under the covers
*
* @param <V> the value type
*/
interface ScheduledBatchLoaderCall<V> {
CompletionStage<List<V>> invoke();
}

/**
* This represents a callback that will invoke a {@link MappedBatchLoader} function under the covers
*
* @param <K> the key type
* @param <V> the value type
*/
interface ScheduledMappedBatchLoaderCall<K, V> {
CompletionStage<Map<K, V>> invoke();
}

/**
* This is called to schedule a {@link BatchLoader} call.
*
* @param scheduledCall the callback that needs to be invoked to allow the {@link BatchLoader} to proceed.
* @param keys this is the list of keys that will be passed to the {@link BatchLoader}.
* This is provided only for informational purposes and you can't change the keys that are used
* @param environment this is the {@link BatchLoaderEnvironment} in place,
* which can be null if it's a simple {@link BatchLoader} call
* @param <K> the key type
* @param <V> the value type
*
* @return a promise to the values that come from the {@link BatchLoader}
*/
<K, V> CompletionStage<List<V>> scheduleBatchLoader(ScheduledBatchLoaderCall<V> scheduledCall, List<K> keys, BatchLoaderEnvironment environment);

/**
* This is called to schedule a {@link MappedBatchLoader} call.
*
* @param scheduledCall the callback that needs to be invoked to allow the {@link MappedBatchLoader} to proceed.
* @param keys this is the list of keys that will be passed to the {@link MappedBatchLoader}.
* This is provided only for informational purposes and you can't change the keys that are used
* @param environment this is the {@link BatchLoaderEnvironment} in place,
* which can be null if it's a simple {@link MappedBatchLoader} call
* @param <K> the key type
* @param <V> the value type
*
* @return a promise to the values that come from the {@link MappedBatchLoader}
*/
<K, V> CompletionStage<Map<K, V>> scheduleMappedBatchLoader(ScheduledMappedBatchLoaderCall<K, V> scheduledCall, List<K> keys, BatchLoaderEnvironment environment);
}
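The Javadoc above says the scheduler is expected to eventually invoke the callback. The stage returned by the scheduler is what the loader waits on, so a scheduler that parks a call and never releases it would presumably leave the corresponding futures incomplete. The sketch below illustrates that contract with a manually released scheduler. It is an assumption-laden illustration, not code from this pull request: `DeferredCall` is a hypothetical stand-in for `ScheduledBatchLoaderCall`, `ManualReleaseSchedulerSketch` is an invented name, and the class is not thread-safe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical stand-in for BatchLoaderScheduler.ScheduledBatchLoaderCall<V>.
interface DeferredCall<V> {
    CompletionStage<List<V>> invoke();
}

// Not thread-safe: intended only to illustrate the callback contract.
final class ManualReleaseSchedulerSketch {

    private final List<Runnable> pending = new ArrayList<>();

    // Parks the call and returns a future that stays incomplete until releaseAll() runs.
    <V> CompletionStage<List<V>> schedule(DeferredCall<V> scheduledCall) {
        CompletableFuture<List<V>> result = new CompletableFuture<>();
        pending.add(() -> scheduledCall.invoke().whenComplete((values, error) -> {
            if (error != null) {
                result.completeExceptionally(error);
            } else {
                result.complete(values);
            }
        }));
        return result;
    }

    // Invokes every parked call and returns how many were released.
    int releaseAll() {
        List<Runnable> toRun = new ArrayList<>(pending);
        pending.clear();
        toRun.forEach(Runnable::run);
        return toRun.size();
    }

    public static void main(String[] args) {
        ManualReleaseSchedulerSketch scheduler = new ManualReleaseSchedulerSketch();
        DeferredCall<String> call = () -> CompletableFuture.completedFuture(List.of("x"));
        CompletableFuture<List<String>> result = scheduler.schedule(call).toCompletableFuture();
        System.out.println(result.isDone()); // prints false: the callback has not been invoked yet
        scheduler.releaseAll();
        System.out.println(result.join());   // prints [x]
    }
}
```

A scheduler of this shape can be handy in tests, where the moment of batch loading needs to be controlled explicitly.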
26 changes: 26 additions & 0 deletions src/test/java/ReadmeExamples.java
@@ -12,6 +12,7 @@
import org.dataloader.fixtures.UserManager;
import org.dataloader.registries.DispatchPredicate;
import org.dataloader.registries.ScheduledDataLoaderRegistry;
import org.dataloader.scheduler.BatchLoaderScheduler;
import org.dataloader.stats.Statistics;
import org.dataloader.stats.ThreadLocalStatisticsCollector;

@@ -23,6 +24,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.stream.Collectors;

import static java.lang.String.format;
@@ -278,6 +280,30 @@ private void statsConfigExample() {
DataLoader<String, User> userDataLoader = DataLoaderFactory.newDataLoader(userBatchLoader, options);
}

private void snooze(int i) {
}

private void BatchLoaderSchedulerExample() {
new BatchLoaderScheduler() {

@Override
public <K, V> CompletionStage<List<V>> scheduleBatchLoader(ScheduledBatchLoaderCall<V> scheduledCall, List<K> keys, BatchLoaderEnvironment environment) {
return CompletableFuture.supplyAsync(() -> {
snooze(10);
return scheduledCall.invoke();
}).thenCompose(Function.identity());
}

@Override
public <K, V> CompletionStage<Map<K, V>> scheduleMappedBatchLoader(ScheduledMappedBatchLoaderCall<K, V> scheduledCall, List<K> keys, BatchLoaderEnvironment environment) {
return CompletableFuture.supplyAsync(() -> {
snooze(10);
return scheduledCall.invoke();
}).thenCompose(Function.identity());
}
};
}

private void ScheduledDispatche() {
DispatchPredicate depthOrTimePredicate = DispatchPredicate.dispatchIfDepthGreaterThan(10)
.or(DispatchPredicate.dispatchIfLongerThan(Duration.ofMillis(200)));
27 changes: 27 additions & 0 deletions src/test/java/org/dataloader/fixtures/TestKit.java
@@ -1,13 +1,19 @@
package org.dataloader.fixtures;

import org.dataloader.BatchLoader;
import org.dataloader.BatchLoaderWithContext;
import org.dataloader.DataLoader;
import org.dataloader.DataLoaderFactory;
import org.dataloader.DataLoaderOptions;
import org.dataloader.MappedBatchLoader;
import org.dataloader.MappedBatchLoaderWithContext;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import static java.util.stream.Collectors.toList;
@@ -19,6 +25,27 @@ public static <T> BatchLoader<T, T> keysAsValues() {
return CompletableFuture::completedFuture;
}

public static <T> BatchLoaderWithContext<T, T> keysAsValuesWithContext() {
return (keys, env) -> CompletableFuture.completedFuture(keys);
}

public static <K, V> MappedBatchLoader<K, V> keysAsMapOfValues() {
return keys -> mapOfKeys(keys);
}

public static <K, V> MappedBatchLoaderWithContext<K, V> keysAsMapOfValuesWithContext() {
return (keys, env) -> mapOfKeys(keys);
}

private static <K, V> CompletableFuture<Map<K, V>> mapOfKeys(Set<K> keys) {
Map<K, V> map = new HashMap<>();
for (K key : keys) {
//noinspection unchecked
map.put(key, (V) key);
}
return CompletableFuture.completedFuture(map);
}

public static <K, V> BatchLoader<K, V> keysAsValues(List<List<K>> loadCalls) {
return keys -> {
List<K> ks = new ArrayList<>(keys);