diff --git a/README.md b/README.md index e48de1d..b64453e 100644 --- a/README.md +++ b/README.md @@ -510,7 +510,52 @@ and there are also gains to this different mode of operation: However, with batch execution control comes responsibility! If you forget to make the call to `dispatch()` then the futures in the load request queue will never be batched, and thus _will never complete_! So be careful when crafting your loader designs. -## Scheduled Dispatching +## The BatchLoader Scheduler + +By default, when `dataLoader.dispatch()` is called, the `BatchLoader` / `MappedBatchLoader` function will be invoked +immediately. + +However, you can provide your own `BatchLoaderScheduler` that allows this call to be done some time into +the future. + +You will be passed a callback (`ScheduledBatchLoaderCall` / `ScheduledMapBatchLoaderCall`) and you are expected +to eventually call this callback method to make the batch loading happen. + +The following is a `BatchLoaderScheduler` that waits 10 milliseconds before invoking the batch loading functions. + +```java + new BatchLoaderScheduler() { + + @Override + public CompletionStage> scheduleBatchLoader(ScheduledBatchLoaderCall scheduledCall, List keys, BatchLoaderEnvironment environment) { + return CompletableFuture.supplyAsync(() -> { + snooze(10); + return scheduledCall.invoke(); + }).thenCompose(Function.identity()); + } + + @Override + public CompletionStage> scheduleMappedBatchLoader(ScheduledMappedBatchLoaderCall scheduledCall, List keys, BatchLoaderEnvironment environment) { + return CompletableFuture.supplyAsync(() -> { + snooze(10); + return scheduledCall.invoke(); + }).thenCompose(Function.identity()); + } + }; +``` + +You are given the keys to be loaded and an optional `BatchLoaderEnvironment` for informative purposes. You can't change the list of +keys that will be loaded via this mechanism say. + +Also note, because there is a max batch size, it is possible for this scheduling to happen N times for a given `dispatch()` +call. The total set of keys will be sliced into batches themselves and then the `BatchLoaderScheduler` will be called for +each batch of keys. + +Do not assume that a single call to `dispatch()` results in a single call to `BatchLoaderScheduler`. + +This code is inspired from the scheduling code in the [reference JS implementation](https://github.com/graphql/dataloader#batch-scheduling) + +## Scheduled Registry Dispatching `ScheduledDataLoaderRegistry` is a registry that allows for dispatching to be done on a schedule. It contains a predicate that is evaluated (per data loader contained within) when `dispatchAll` is invoked. diff --git a/src/main/java/org/dataloader/DataLoaderHelper.java b/src/main/java/org/dataloader/DataLoaderHelper.java index 883189c..67db4e2 100644 --- a/src/main/java/org/dataloader/DataLoaderHelper.java +++ b/src/main/java/org/dataloader/DataLoaderHelper.java @@ -3,6 +3,7 @@ import org.dataloader.annotations.GuardedBy; import org.dataloader.annotations.Internal; import org.dataloader.impl.CompletableFutureKit; +import org.dataloader.scheduler.BatchLoaderScheduler; import org.dataloader.stats.StatisticsCollector; import org.dataloader.stats.context.IncrementBatchLoadCountByStatisticsContext; import org.dataloader.stats.context.IncrementBatchLoadExceptionCountStatisticsContext; @@ -417,10 +418,23 @@ CompletableFuture> invokeLoader(List keys, List keyContexts) @SuppressWarnings("unchecked") private CompletableFuture> invokeListBatchLoader(List keys, BatchLoaderEnvironment environment) { CompletionStage> loadResult; + BatchLoaderScheduler batchLoaderScheduler = loaderOptions.getBatchLoaderScheduler(); if (batchLoadFunction instanceof BatchLoaderWithContext) { - loadResult = ((BatchLoaderWithContext) batchLoadFunction).load(keys, environment); + BatchLoaderWithContext loadFunction = (BatchLoaderWithContext) batchLoadFunction; + if (batchLoaderScheduler != null) { + BatchLoaderScheduler.ScheduledBatchLoaderCall loadCall = () -> loadFunction.load(keys, environment); + loadResult = batchLoaderScheduler.scheduleBatchLoader(loadCall, keys, environment); + } else { + loadResult = loadFunction.load(keys, environment); + } } else { - loadResult = ((BatchLoader) batchLoadFunction).load(keys); + BatchLoader loadFunction = (BatchLoader) batchLoadFunction; + if (batchLoaderScheduler != null) { + BatchLoaderScheduler.ScheduledBatchLoaderCall loadCall = () -> loadFunction.load(keys); + loadResult = batchLoaderScheduler.scheduleBatchLoader(loadCall, keys, null); + } else { + loadResult = loadFunction.load(keys); + } } return nonNull(loadResult, () -> "Your batch loader function MUST return a non null CompletionStage").toCompletableFuture(); } @@ -434,10 +448,23 @@ private CompletableFuture> invokeListBatchLoader(List keys, BatchLoad private CompletableFuture> invokeMapBatchLoader(List keys, BatchLoaderEnvironment environment) { CompletionStage> loadResult; Set setOfKeys = new LinkedHashSet<>(keys); + BatchLoaderScheduler batchLoaderScheduler = loaderOptions.getBatchLoaderScheduler(); if (batchLoadFunction instanceof MappedBatchLoaderWithContext) { - loadResult = ((MappedBatchLoaderWithContext) batchLoadFunction).load(setOfKeys, environment); + MappedBatchLoaderWithContext loadFunction = (MappedBatchLoaderWithContext) batchLoadFunction; + if (batchLoaderScheduler != null) { + BatchLoaderScheduler.ScheduledMappedBatchLoaderCall loadCall = () -> loadFunction.load(setOfKeys, environment); + loadResult = batchLoaderScheduler.scheduleMappedBatchLoader(loadCall, keys, environment); + } else { + loadResult = loadFunction.load(setOfKeys, environment); + } } else { - loadResult = ((MappedBatchLoader) batchLoadFunction).load(setOfKeys); + MappedBatchLoader loadFunction = (MappedBatchLoader) batchLoadFunction; + if (batchLoaderScheduler != null) { + BatchLoaderScheduler.ScheduledMappedBatchLoaderCall loadCall = () -> loadFunction.load(setOfKeys); + loadResult = batchLoaderScheduler.scheduleMappedBatchLoader(loadCall, keys, null); + } else { + loadResult = loadFunction.load(setOfKeys); + } } CompletableFuture> mapBatchLoad = nonNull(loadResult, () -> "Your batch loader function MUST return a non null CompletionStage").toCompletableFuture(); return mapBatchLoad.thenApply(map -> { diff --git a/src/main/java/org/dataloader/DataLoaderOptions.java b/src/main/java/org/dataloader/DataLoaderOptions.java index 4c79296..bac9476 100644 --- a/src/main/java/org/dataloader/DataLoaderOptions.java +++ b/src/main/java/org/dataloader/DataLoaderOptions.java @@ -18,6 +18,7 @@ import org.dataloader.annotations.PublicApi; import org.dataloader.impl.Assertions; +import org.dataloader.scheduler.BatchLoaderScheduler; import org.dataloader.stats.NoOpStatisticsCollector; import org.dataloader.stats.StatisticsCollector; @@ -46,6 +47,7 @@ public class DataLoaderOptions { private Supplier statisticsCollector; private BatchLoaderContextProvider environmentProvider; private ValueCacheOptions valueCacheOptions; + private BatchLoaderScheduler batchLoaderScheduler; /** * Creates a new data loader options with default settings. @@ -58,6 +60,7 @@ public DataLoaderOptions() { statisticsCollector = NoOpStatisticsCollector::new; environmentProvider = NULL_PROVIDER; valueCacheOptions = ValueCacheOptions.newOptions(); + batchLoaderScheduler = null; } /** @@ -77,6 +80,7 @@ public DataLoaderOptions(DataLoaderOptions other) { this.statisticsCollector = other.statisticsCollector; this.environmentProvider = other.environmentProvider; this.valueCacheOptions = other.valueCacheOptions; + batchLoaderScheduler = other.batchLoaderScheduler; } /** @@ -304,4 +308,24 @@ public DataLoaderOptions setValueCacheOptions(ValueCacheOptions valueCacheOption this.valueCacheOptions = Assertions.nonNull(valueCacheOptions); return this; } + + /** + * @return the {@link BatchLoaderScheduler} to use, which can be null + */ + public BatchLoaderScheduler getBatchLoaderScheduler() { + return batchLoaderScheduler; + } + + /** + * Sets in a new {@link BatchLoaderScheduler} that allows the call to a {@link BatchLoader} function to be scheduled + * to some future time. + * + * @param batchLoaderScheduler the scheduler + * + * @return the data loader options for fluent coding + */ + public DataLoaderOptions setBatchLoaderScheduler(BatchLoaderScheduler batchLoaderScheduler) { + this.batchLoaderScheduler = batchLoaderScheduler; + return this; + } } diff --git a/src/main/java/org/dataloader/scheduler/BatchLoaderScheduler.java b/src/main/java/org/dataloader/scheduler/BatchLoaderScheduler.java new file mode 100644 index 0000000..bcebfa0 --- /dev/null +++ b/src/main/java/org/dataloader/scheduler/BatchLoaderScheduler.java @@ -0,0 +1,74 @@ +package org.dataloader.scheduler; + +import org.dataloader.BatchLoader; +import org.dataloader.BatchLoaderEnvironment; +import org.dataloader.DataLoader; +import org.dataloader.DataLoaderOptions; +import org.dataloader.MappedBatchLoader; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletionStage; + +/** + * By default, when {@link DataLoader#dispatch()} is called, the {@link BatchLoader} / {@link MappedBatchLoader} function will be invoked + * immediately. However, you can provide your own {@link BatchLoaderScheduler} that allows this call to be done some time into + * the future. You will be passed a callback ({@link ScheduledBatchLoaderCall} / {@link ScheduledMappedBatchLoaderCall} and you are expected + * to eventually call this callback method to make the batch loading happen. + *

+ * Note: Because there is a {@link DataLoaderOptions#maxBatchSize()} it is possible for this scheduling to happen N times for a given {@link DataLoader#dispatch()} + * call. The total set of keys will be sliced into batches themselves and then the {@link BatchLoaderScheduler} will be called for + * each batch of keys. Do not assume that a single call to {@link DataLoader#dispatch()} results in a single call to {@link BatchLoaderScheduler}. + */ +public interface BatchLoaderScheduler { + + + /** + * This represents a callback that will invoke a {@link BatchLoader} function under the covers + * + * @param the value type + */ + interface ScheduledBatchLoaderCall { + CompletionStage> invoke(); + } + + /** + * This represents a callback that will invoke a {@link MappedBatchLoader} function under the covers + * + * @param the key type + * @param the value type + */ + interface ScheduledMappedBatchLoaderCall { + CompletionStage> invoke(); + } + + /** + * This is called to schedule a {@link BatchLoader} call. + * + * @param scheduledCall the callback that needs to be invoked to allow the {@link BatchLoader} to proceed. + * @param keys this is the list of keys that will be passed to the {@link BatchLoader}. + * This is provided only for informative reasons and you cant change the keys that are used + * @param environment this is the {@link BatchLoaderEnvironment} in place, + * which can be null if it's a simple {@link BatchLoader} call + * @param the key type + * @param the value type + * + * @return a promise to the values that come from the {@link BatchLoader} + */ + CompletionStage> scheduleBatchLoader(ScheduledBatchLoaderCall scheduledCall, List keys, BatchLoaderEnvironment environment); + + /** + * This is called to schedule a {@link MappedBatchLoader} call. + * + * @param scheduledCall the callback that needs to be invoked to allow the {@link MappedBatchLoader} to proceed. + * @param keys this is the list of keys that will be passed to the {@link MappedBatchLoader}. + * This is provided only for informative reasons and you cant change the keys that are used + * @param environment this is the {@link BatchLoaderEnvironment} in place, + * which can be null if it's a simple {@link MappedBatchLoader} call + * @param the key type + * @param the value type + * + * @return a promise to the values that come from the {@link BatchLoader} + */ + CompletionStage> scheduleMappedBatchLoader(ScheduledMappedBatchLoaderCall scheduledCall, List keys, BatchLoaderEnvironment environment); +} diff --git a/src/test/java/ReadmeExamples.java b/src/test/java/ReadmeExamples.java index e37550e..40a0260 100644 --- a/src/test/java/ReadmeExamples.java +++ b/src/test/java/ReadmeExamples.java @@ -12,6 +12,7 @@ import org.dataloader.fixtures.UserManager; import org.dataloader.registries.DispatchPredicate; import org.dataloader.registries.ScheduledDataLoaderRegistry; +import org.dataloader.scheduler.BatchLoaderScheduler; import org.dataloader.stats.Statistics; import org.dataloader.stats.ThreadLocalStatisticsCollector; @@ -23,6 +24,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.function.Function; import java.util.stream.Collectors; import static java.lang.String.format; @@ -278,6 +280,30 @@ private void statsConfigExample() { DataLoader userDataLoader = DataLoaderFactory.newDataLoader(userBatchLoader, options); } + private void snooze(int i) { + } + + private void BatchLoaderSchedulerExample() { + new BatchLoaderScheduler() { + + @Override + public CompletionStage> scheduleBatchLoader(ScheduledBatchLoaderCall scheduledCall, List keys, BatchLoaderEnvironment environment) { + return CompletableFuture.supplyAsync(() -> { + snooze(10); + return scheduledCall.invoke(); + }).thenCompose(Function.identity()); + } + + @Override + public CompletionStage> scheduleMappedBatchLoader(ScheduledMappedBatchLoaderCall scheduledCall, List keys, BatchLoaderEnvironment environment) { + return CompletableFuture.supplyAsync(() -> { + snooze(10); + return scheduledCall.invoke(); + }).thenCompose(Function.identity()); + } + }; + } + private void ScheduledDispatche() { DispatchPredicate depthOrTimePredicate = DispatchPredicate.dispatchIfDepthGreaterThan(10) .or(DispatchPredicate.dispatchIfLongerThan(Duration.ofMillis(200))); diff --git a/src/test/java/org/dataloader/fixtures/TestKit.java b/src/test/java/org/dataloader/fixtures/TestKit.java index 5c87148..0206bd9 100644 --- a/src/test/java/org/dataloader/fixtures/TestKit.java +++ b/src/test/java/org/dataloader/fixtures/TestKit.java @@ -1,13 +1,19 @@ package org.dataloader.fixtures; import org.dataloader.BatchLoader; +import org.dataloader.BatchLoaderWithContext; import org.dataloader.DataLoader; import org.dataloader.DataLoaderFactory; import org.dataloader.DataLoaderOptions; +import org.dataloader.MappedBatchLoader; +import org.dataloader.MappedBatchLoaderWithContext; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletableFuture; import static java.util.stream.Collectors.toList; @@ -19,6 +25,27 @@ public static BatchLoader keysAsValues() { return CompletableFuture::completedFuture; } + public static BatchLoaderWithContext keysAsValuesWithContext() { + return (keys, env) -> CompletableFuture.completedFuture(keys); + } + + public static MappedBatchLoader keysAsMapOfValues() { + return keys -> mapOfKeys(keys); + } + + public static MappedBatchLoaderWithContext keysAsMapOfValuesWithContext() { + return (keys, env) -> mapOfKeys(keys); + } + + private static CompletableFuture> mapOfKeys(Set keys) { + Map map = new HashMap<>(); + for (K key : keys) { + //noinspection unchecked + map.put(key, (V) key); + } + return CompletableFuture.completedFuture(map); + } + public static BatchLoader keysAsValues(List> loadCalls) { return keys -> { List ks = new ArrayList<>(keys); diff --git a/src/test/java/org/dataloader/scheduler/BatchLoaderSchedulerTest.java b/src/test/java/org/dataloader/scheduler/BatchLoaderSchedulerTest.java new file mode 100644 index 0000000..beb7c18 --- /dev/null +++ b/src/test/java/org/dataloader/scheduler/BatchLoaderSchedulerTest.java @@ -0,0 +1,167 @@ +package org.dataloader.scheduler; + +import org.dataloader.BatchLoaderEnvironment; +import org.dataloader.DataLoader; +import org.dataloader.DataLoaderOptions; +import org.junit.Test; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; + +import static org.awaitility.Awaitility.await; +import static org.dataloader.DataLoaderFactory.newDataLoader; +import static org.dataloader.DataLoaderFactory.newMappedDataLoader; +import static org.dataloader.fixtures.TestKit.keysAsMapOfValues; +import static org.dataloader.fixtures.TestKit.keysAsMapOfValuesWithContext; +import static org.dataloader.fixtures.TestKit.keysAsValues; +import static org.dataloader.fixtures.TestKit.keysAsValuesWithContext; +import static org.dataloader.fixtures.TestKit.snooze; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +public class BatchLoaderSchedulerTest { + + BatchLoaderScheduler immediateScheduling = new BatchLoaderScheduler() { + + @Override + public CompletionStage> scheduleBatchLoader(ScheduledBatchLoaderCall scheduledCall, List keys, BatchLoaderEnvironment environment) { + return scheduledCall.invoke(); + } + + @Override + public CompletionStage> scheduleMappedBatchLoader(ScheduledMappedBatchLoaderCall scheduledCall, List keys, BatchLoaderEnvironment environment) { + return scheduledCall.invoke(); + } + }; + + private BatchLoaderScheduler delayedScheduling(int ms) { + return new BatchLoaderScheduler() { + + @Override + public CompletionStage> scheduleBatchLoader(ScheduledBatchLoaderCall scheduledCall, List keys, BatchLoaderEnvironment environment) { + return CompletableFuture.supplyAsync(() -> { + snooze(ms); + return scheduledCall.invoke(); + }).thenCompose(Function.identity()); + } + + @Override + public CompletionStage> scheduleMappedBatchLoader(ScheduledMappedBatchLoaderCall scheduledCall, List keys, BatchLoaderEnvironment environment) { + return CompletableFuture.supplyAsync(() -> { + snooze(ms); + return scheduledCall.invoke(); + }).thenCompose(Function.identity()); + } + }; + } + + private static void commonSetupAndSimpleAsserts(DataLoader identityLoader) { + CompletableFuture future1 = identityLoader.load(1); + CompletableFuture future2 = identityLoader.load(2); + + identityLoader.dispatch(); + + await().until(() -> future1.isDone() && future2.isDone()); + assertThat(future1.join(), equalTo(1)); + assertThat(future2.join(), equalTo(2)); + } + + @Test + public void can_allow_a_simple_scheduler() { + DataLoaderOptions options = DataLoaderOptions.newOptions().setBatchLoaderScheduler(immediateScheduling); + + DataLoader identityLoader = newDataLoader(keysAsValues(), options); + + commonSetupAndSimpleAsserts(identityLoader); + } + + @Test + public void can_allow_a_simple_scheduler_with_context() { + DataLoaderOptions options = DataLoaderOptions.newOptions().setBatchLoaderScheduler(immediateScheduling); + + DataLoader identityLoader = newDataLoader(keysAsValuesWithContext(), options); + + commonSetupAndSimpleAsserts(identityLoader); + } + + @Test + public void can_allow_a_simple_scheduler_with_mapped_batch_load() { + DataLoaderOptions options = DataLoaderOptions.newOptions().setBatchLoaderScheduler(immediateScheduling); + + DataLoader identityLoader = newMappedDataLoader(keysAsMapOfValues(), options); + + commonSetupAndSimpleAsserts(identityLoader); + } + + @Test + public void can_allow_a_simple_scheduler_with_mapped_batch_load_with_context() { + DataLoaderOptions options = DataLoaderOptions.newOptions().setBatchLoaderScheduler(immediateScheduling); + + DataLoader identityLoader = newMappedDataLoader(keysAsMapOfValuesWithContext(), options); + + commonSetupAndSimpleAsserts(identityLoader); + } + + @Test + public void can_allow_an_async_scheduler() { + DataLoaderOptions options = DataLoaderOptions.newOptions().setBatchLoaderScheduler(delayedScheduling(50)); + + DataLoader identityLoader = newDataLoader(keysAsValues(), options); + + commonSetupAndSimpleAsserts(identityLoader); + } + + + @Test + public void can_allow_a_funky_scheduler() { + AtomicBoolean releaseTheHounds = new AtomicBoolean(); + BatchLoaderScheduler funkyScheduler = new BatchLoaderScheduler() { + @Override + public CompletionStage> scheduleBatchLoader(ScheduledBatchLoaderCall scheduledCall, List keys, BatchLoaderEnvironment environment) { + return CompletableFuture.supplyAsync(() -> { + while (!releaseTheHounds.get()) { + snooze(10); + } + return scheduledCall.invoke(); + }).thenCompose(Function.identity()); + } + + @Override + public CompletionStage> scheduleMappedBatchLoader(ScheduledMappedBatchLoaderCall scheduledCall, List keys, BatchLoaderEnvironment environment) { + return CompletableFuture.supplyAsync(() -> { + while (!releaseTheHounds.get()) { + snooze(10); + } + return scheduledCall.invoke(); + }).thenCompose(Function.identity()); + } + }; + DataLoaderOptions options = DataLoaderOptions.newOptions().setBatchLoaderScheduler(funkyScheduler); + + DataLoader identityLoader = newDataLoader(keysAsValues(), options); + + CompletableFuture future1 = identityLoader.load(1); + CompletableFuture future2 = identityLoader.load(2); + + identityLoader.dispatch(); + + // we can spin around for a while - nothing will happen until we release the hounds + for (int i = 0; i < 5; i++) { + assertThat(future1.isDone(), equalTo(false)); + assertThat(future2.isDone(), equalTo(false)); + snooze(50); + } + + releaseTheHounds.set(true); + + await().until(() -> future1.isDone() && future2.isDone()); + assertThat(future1.join(), equalTo(1)); + assertThat(future2.join(), equalTo(2)); + } + + +}