From 0b33b8a551d29efd93b1628eb1e6e61fb42e7d94 Mon Sep 17 00:00:00 2001 From: alex079 <> Date: Tue, 1 Mar 2022 23:27:13 +0100 Subject: [PATCH 1/4] implement recursive dispatching Support composition of data loader invocations. - when using 'thenCompose' method to chain data loader calls, all composed load calls will be dispatched - the methods 'dispatchAll' or 'dispatchAllWithCount' will recur until there are no more calls to dispatch - the method 'dispatchAllWithCount' will block in order to return the counter of dispatches --- .../org/dataloader/DataLoaderRegistry.java | 22 +++++- .../dataloader/DataLoaderRegistryTest.java | 78 +++++++++++++++++++ 2 files changed, 98 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/dataloader/DataLoaderRegistry.java b/src/main/java/org/dataloader/DataLoaderRegistry.java index 9b19c29..93426c3 100644 --- a/src/main/java/org/dataloader/DataLoaderRegistry.java +++ b/src/main/java/org/dataloader/DataLoaderRegistry.java @@ -4,12 +4,15 @@ import org.dataloader.stats.Statistics; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; @@ -131,7 +134,13 @@ public Set getKeys() { * {@link org.dataloader.DataLoader}s */ public void dispatchAll() { - getDataLoaders().forEach(DataLoader::dispatch); + CompletableFuture[] futuresToDispatch = getDataLoaders().stream() + .filter(dataLoader -> dataLoader.dispatchDepth() > 0) + .map(DataLoader::dispatch) + .toArray(CompletableFuture[]::new); + if (futuresToDispatch.length > 0) { + CompletableFuture.allOf(futuresToDispatch).whenComplete((__, throwable) -> dispatchAll()); + } } /** @@ -142,8 +151,17 @@ public void dispatchAll() { */ public int dispatchAllWithCount() { int sum = 0; + List> futuresToDispatch = new ArrayList<>(); for (DataLoader dataLoader : getDataLoaders()) { - sum += dataLoader.dispatchWithCounts().getKeysCount(); + if (dataLoader.dispatchDepth() > 0) { + DispatchResult dispatchResult = dataLoader.dispatchWithCounts(); + sum += dispatchResult.getKeysCount(); + futuresToDispatch.add(dispatchResult.getPromisedResults()); + } + } + if (futuresToDispatch.size() > 0) { + CompletableFuture.allOf(futuresToDispatch.toArray(new CompletableFuture[0])).join(); + sum += dispatchAllWithCount(); } return sum; } diff --git a/src/test/java/org/dataloader/DataLoaderRegistryTest.java b/src/test/java/org/dataloader/DataLoaderRegistryTest.java index 6d70654..d15576b 100644 --- a/src/test/java/org/dataloader/DataLoaderRegistryTest.java +++ b/src/test/java/org/dataloader/DataLoaderRegistryTest.java @@ -4,6 +4,7 @@ import org.junit.Test; import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; import static java.util.Arrays.asList; import static org.dataloader.DataLoaderFactory.newDataLoader; @@ -14,6 +15,8 @@ public class DataLoaderRegistryTest { final BatchLoader identityBatchLoader = CompletableFuture::completedFuture; + final BatchLoader incrementalBatchLoader = + v -> CompletableFuture.supplyAsync(() -> v.stream().map(i -> ++i).collect(Collectors.toList())); @Test public void registration_works() { @@ -159,6 +162,81 @@ public void dispatch_counts_are_maintained() { assertThat(dispatchDepth, equalTo(0)); } + @Test + public void composed_dispatch_counts_are_maintained() { + + DataLoaderRegistry registry = new DataLoaderRegistry(); + + DataLoader dlA = newDataLoader(incrementalBatchLoader); + DataLoader dlB = newDataLoader(incrementalBatchLoader); + + registry.register("a", dlA); + registry.register("b", dlB); + + CompletableFuture test1 = dlA.load(10) + .thenCompose(dlA::load) + .thenCompose(dlB::load) + .thenCompose(dlB::load); + CompletableFuture test2 = dlB.load(20) + .thenCompose(dlB::load) + .thenCompose(dlA::load) + .thenCompose(dlA::load); + + int dispatchDepth = registry.dispatchDepth(); + assertThat(dispatchDepth, equalTo(2)); + + int dispatchedCount = registry.dispatchAllWithCount(); + dispatchDepth = registry.dispatchDepth(); + assertThat(dispatchedCount, equalTo(8)); + assertThat(dispatchDepth, equalTo(0)); + assertThat(test1.join(), equalTo(14)); + assertThat(test2.join(), equalTo(24)); + } + + @Test + public void composed_stats_can_be_collected() { + + DataLoaderRegistry registry = new DataLoaderRegistry(); + + DataLoader dlA = newDataLoader(incrementalBatchLoader); + DataLoader dlB = newDataLoader(incrementalBatchLoader); + DataLoader dlC = newDataLoader(incrementalBatchLoader); + + registry.register("a", dlA).register("b", dlB).register("c", dlC); + + CompletableFuture test1 = dlA.load(10) + .thenCompose(dlB::load) + .thenCompose(dlC::load); + CompletableFuture test2 = dlC.load(20) + .thenCompose(dlB::load) + .thenCompose(dlA::load); + + registry.dispatchAll(); + CompletableFuture.allOf(test1, test2).join(); // wait for composed dispatches to settle + + CompletableFuture test3 = dlA.load(10) + .thenCompose(dlB::load) + .thenCompose(dlC::load); + CompletableFuture test4 = dlC.load(20) + .thenCompose(dlB::load) + .thenCompose(dlA::load); + + registry.dispatchAll(); + CompletableFuture.allOf(test3, test4).join(); // wait for composed dispatches to settle + + Statistics statistics = registry.getStatistics(); + + assertThat(statistics.getLoadCount(), equalTo(12L)); + assertThat(statistics.getBatchLoadCount(), equalTo(6L)); + assertThat(statistics.getCacheHitCount(), equalTo(6L)); + assertThat(statistics.getLoadErrorCount(), equalTo(0L)); + assertThat(statistics.getBatchLoadExceptionCount(), equalTo(0L)); + assertThat(test1.join(), equalTo(13)); + assertThat(test2.join(), equalTo(23)); + assertThat(test3.join(), equalTo(13)); + assertThat(test4.join(), equalTo(23)); + } + @Test public void builder_works() { DataLoader dlA = newDataLoader(identityBatchLoader); From bc20cb11b99a4a37e11b9fa6f8ed271910e91171 Mon Sep 17 00:00:00 2001 From: alex079 <> Date: Tue, 1 Mar 2022 23:33:09 +0100 Subject: [PATCH 2/4] remove unused imports --- src/main/java/org/dataloader/DataLoaderRegistry.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/main/java/org/dataloader/DataLoaderRegistry.java b/src/main/java/org/dataloader/DataLoaderRegistry.java index 93426c3..4dabf8c 100644 --- a/src/main/java/org/dataloader/DataLoaderRegistry.java +++ b/src/main/java/org/dataloader/DataLoaderRegistry.java @@ -4,8 +4,6 @@ import org.dataloader.stats.Statistics; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; From a9866159e9b801b35f1f12cac7d69051a939d12e Mon Sep 17 00:00:00 2001 From: alex079 <> Date: Sat, 26 Mar 2022 21:02:51 +0100 Subject: [PATCH 3/4] trying to fix after review - revert changes in `dispatchAll` and `dispatchAllWithCount` methods - add a new method `dispatch` --- .../org/dataloader/DataLoaderRegistry.java | 39 ++++++----- .../dataloader/DataLoaderRegistryTest.java | 66 +++++++------------ 2 files changed, 46 insertions(+), 59 deletions(-) diff --git a/src/main/java/org/dataloader/DataLoaderRegistry.java b/src/main/java/org/dataloader/DataLoaderRegistry.java index 4dabf8c..4e9b78f 100644 --- a/src/main/java/org/dataloader/DataLoaderRegistry.java +++ b/src/main/java/org/dataloader/DataLoaderRegistry.java @@ -12,6 +12,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; /** @@ -128,17 +129,34 @@ public Set getKeys() { } /** - * This will called {@link org.dataloader.DataLoader#dispatch()} on each of the registered - * {@link org.dataloader.DataLoader}s + * This method will call {@link org.dataloader.DataLoader#dispatch()} on registered {@link org.dataloader.DataLoader}s + * repeatedly until there are no more calls to dispatch. + * @return the promise of total count of dispatched keys. */ - public void dispatchAll() { + public CompletableFuture dispatch() { + AtomicInteger count = new AtomicInteger(); CompletableFuture[] futuresToDispatch = getDataLoaders().stream() .filter(dataLoader -> dataLoader.dispatchDepth() > 0) - .map(DataLoader::dispatch) + .map(DataLoader::dispatchWithCounts) + .map(dispatchResult -> { + count.addAndGet(dispatchResult.getKeysCount()); + return dispatchResult.getPromisedResults(); + }) .toArray(CompletableFuture[]::new); if (futuresToDispatch.length > 0) { - CompletableFuture.allOf(futuresToDispatch).whenComplete((__, throwable) -> dispatchAll()); + return CompletableFuture.allOf(futuresToDispatch) + .thenCompose(__ -> dispatch()) + .thenApply(count::addAndGet); } + return CompletableFuture.completedFuture(count.get()); + } + + /** + * This will called {@link org.dataloader.DataLoader#dispatch()} on each of the registered + * {@link org.dataloader.DataLoader}s + */ + public void dispatchAll() { + getDataLoaders().forEach(DataLoader::dispatch); } /** @@ -149,17 +167,8 @@ public void dispatchAll() { */ public int dispatchAllWithCount() { int sum = 0; - List> futuresToDispatch = new ArrayList<>(); for (DataLoader dataLoader : getDataLoaders()) { - if (dataLoader.dispatchDepth() > 0) { - DispatchResult dispatchResult = dataLoader.dispatchWithCounts(); - sum += dispatchResult.getKeysCount(); - futuresToDispatch.add(dispatchResult.getPromisedResults()); - } - } - if (futuresToDispatch.size() > 0) { - CompletableFuture.allOf(futuresToDispatch.toArray(new CompletableFuture[0])).join(); - sum += dispatchAllWithCount(); + sum += dataLoader.dispatchWithCounts().getKeysCount(); } return sum; } diff --git a/src/test/java/org/dataloader/DataLoaderRegistryTest.java b/src/test/java/org/dataloader/DataLoaderRegistryTest.java index d15576b..6279167 100644 --- a/src/test/java/org/dataloader/DataLoaderRegistryTest.java +++ b/src/test/java/org/dataloader/DataLoaderRegistryTest.java @@ -163,66 +163,43 @@ public void dispatch_counts_are_maintained() { } @Test - public void composed_dispatch_counts_are_maintained() { + public void composed_dispatch_counts_and_stats_are_maintained() { DataLoaderRegistry registry = new DataLoaderRegistry(); DataLoader dlA = newDataLoader(incrementalBatchLoader); DataLoader dlB = newDataLoader(incrementalBatchLoader); + DataLoader dlC = newDataLoader(incrementalBatchLoader); - registry.register("a", dlA); - registry.register("b", dlB); - - CompletableFuture test1 = dlA.load(10) - .thenCompose(dlA::load) + registry.register("a", dlA).register("b", dlB).register("c", dlC); + + CompletableFuture test1 = dlA.load(100) .thenCompose(dlB::load) - .thenCompose(dlB::load); - CompletableFuture test2 = dlB.load(20) + .thenCompose(dlC::load); + CompletableFuture test2 = dlC.load(200) .thenCompose(dlB::load) - .thenCompose(dlA::load) .thenCompose(dlA::load); - int dispatchDepth = registry.dispatchDepth(); - assertThat(dispatchDepth, equalTo(2)); - - int dispatchedCount = registry.dispatchAllWithCount(); - dispatchDepth = registry.dispatchDepth(); - assertThat(dispatchedCount, equalTo(8)); - assertThat(dispatchDepth, equalTo(0)); - assertThat(test1.join(), equalTo(14)); - assertThat(test2.join(), equalTo(24)); - } + assertThat("Initially dispatching only top level load calls", registry.dispatchDepth(), equalTo(2)); - @Test - public void composed_stats_can_be_collected() { + CompletableFuture dispatchedKeys1 = registry.dispatch(); - DataLoaderRegistry registry = new DataLoaderRegistry(); + assertThat("Total count of dispatched keys in first iteration", dispatchedKeys1.join(), equalTo(6)); + assertThat("Zero dispatch depth after first iteration done", registry.dispatchDepth(), equalTo(0)); - DataLoader dlA = newDataLoader(incrementalBatchLoader); - DataLoader dlB = newDataLoader(incrementalBatchLoader); - DataLoader dlC = newDataLoader(incrementalBatchLoader); - - registry.register("a", dlA).register("b", dlB).register("c", dlC); - - CompletableFuture test1 = dlA.load(10) + CompletableFuture test3 = dlA.load(100) .thenCompose(dlB::load) .thenCompose(dlC::load); - CompletableFuture test2 = dlC.load(20) + CompletableFuture test4 = dlC.load(200) .thenCompose(dlB::load) .thenCompose(dlA::load); - registry.dispatchAll(); - CompletableFuture.allOf(test1, test2).join(); // wait for composed dispatches to settle + assertThat("Not dispatching the same keys twice", registry.dispatchDepth(), equalTo(0)); - CompletableFuture test3 = dlA.load(10) - .thenCompose(dlB::load) - .thenCompose(dlC::load); - CompletableFuture test4 = dlC.load(20) - .thenCompose(dlB::load) - .thenCompose(dlA::load); + CompletableFuture dispatchedKeys2 = registry.dispatch(); - registry.dispatchAll(); - CompletableFuture.allOf(test3, test4).join(); // wait for composed dispatches to settle + assertThat("Zero dispatched keys in second iteration", dispatchedKeys2.join(), equalTo(0)); + assertThat("Zero dispatch depth after second iteration done", registry.dispatchDepth(), equalTo(0)); Statistics statistics = registry.getStatistics(); @@ -231,10 +208,11 @@ public void composed_stats_can_be_collected() { assertThat(statistics.getCacheHitCount(), equalTo(6L)); assertThat(statistics.getLoadErrorCount(), equalTo(0L)); assertThat(statistics.getBatchLoadExceptionCount(), equalTo(0L)); - assertThat(test1.join(), equalTo(13)); - assertThat(test2.join(), equalTo(23)); - assertThat(test3.join(), equalTo(13)); - assertThat(test4.join(), equalTo(23)); + + assertThat(test1.join(), equalTo(103)); + assertThat(test2.join(), equalTo(203)); + assertThat(test3.join(), equalTo(103)); + assertThat(test4.join(), equalTo(203)); } @Test From 399fe86192f814aa31dd52341e6e4eb81032799d Mon Sep 17 00:00:00 2001 From: alex079 <> Date: Sat, 2 Apr 2022 17:32:43 +0200 Subject: [PATCH 4/4] update after merge --- .../java/org/dataloader/DataLoaderRegistryTest.java | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/src/test/java/org/dataloader/DataLoaderRegistryTest.java b/src/test/java/org/dataloader/DataLoaderRegistryTest.java index 7921b3b..69f947f 100644 --- a/src/test/java/org/dataloader/DataLoaderRegistryTest.java +++ b/src/test/java/org/dataloader/DataLoaderRegistryTest.java @@ -170,7 +170,7 @@ public void dispatch_counts_are_maintained() { } @Test - public void composed_dispatch_counts_and_stats_are_maintained() { + public void composed_dispatch_counts_are_maintained() { DataLoaderRegistry registry = new DataLoaderRegistry(); @@ -208,14 +208,6 @@ public void composed_dispatch_counts_and_stats_are_maintained() { assertThat("Zero dispatched keys in second iteration", dispatchedKeys2.join(), equalTo(0)); assertThat("Zero dispatch depth after second iteration done", registry.dispatchDepth(), equalTo(0)); - Statistics statistics = registry.getStatistics(); - - assertThat(statistics.getLoadCount(), equalTo(12L)); - assertThat(statistics.getBatchLoadCount(), equalTo(6L)); - assertThat(statistics.getCacheHitCount(), equalTo(6L)); - assertThat(statistics.getLoadErrorCount(), equalTo(0L)); - assertThat(statistics.getBatchLoadExceptionCount(), equalTo(0L)); - assertThat(test1.join(), equalTo(103)); assertThat(test2.join(), equalTo(203)); assertThat(test3.join(), equalTo(103));