From c0a16a8522792d520d4a492f5f6c209f9f6806b3 Mon Sep 17 00:00:00 2001 From: smillies Date: Thu, 14 Sep 2017 11:59:19 +0200 Subject: [PATCH 1/3] The default executor service of java.io.concurrent.Future creates daemon threads --- vavr/src/main/java/io/vavr/concurrent/Future.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/vavr/src/main/java/io/vavr/concurrent/Future.java b/vavr/src/main/java/io/vavr/concurrent/Future.java index 85ab5822f5..8fa6e93949 100644 --- a/vavr/src/main/java/io/vavr/concurrent/Future.java +++ b/vavr/src/main/java/io/vavr/concurrent/Future.java @@ -58,10 +58,14 @@ public interface Future extends Value { /** - * The default executor service is {@link Executors#newCachedThreadPool()}. - * Please note that it may prevent the VM from shutdown. - */ - ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool(); + * The default executor service is {@link Executors#newCachedThreadPool()}. The service will create daemon threads, + * in order to avoid preventing the VM from shutdown. + */ + ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool(runnable -> { + Thread thread = Executors.defaultThreadFactory().newThread(runnable); + thread.setDaemon(true); + return thread; + }); /** * Creates a failed {@code Future} with the given {@code exception}, backed by the {@link #DEFAULT_EXECUTOR_SERVICE}. From e655bc19a69f5a25cddd185ef3dd8f182af39c71 Mon Sep 17 00:00:00 2001 From: smillies Date: Sun, 17 Sep 2017 19:06:49 +0200 Subject: [PATCH 2/3] Initial support for concurrent function memoization --- .../ConcurrentTrampoliningMemoizer.java | 42 +++++++++++++++++ .../vavr/concurrent/MemoizedConcurrently.java | 47 +++++++++++++++++++ 2 files changed, 89 insertions(+) create mode 100644 vavr/src/main/java/io/vavr/concurrent/ConcurrentTrampoliningMemoizer.java create mode 100644 vavr/src/main/java/io/vavr/concurrent/MemoizedConcurrently.java diff --git a/vavr/src/main/java/io/vavr/concurrent/ConcurrentTrampoliningMemoizer.java b/vavr/src/main/java/io/vavr/concurrent/ConcurrentTrampoliningMemoizer.java new file mode 100644 index 0000000000..446d3765bb --- /dev/null +++ b/vavr/src/main/java/io/vavr/concurrent/ConcurrentTrampoliningMemoizer.java @@ -0,0 +1,42 @@ +package io.vavr.concurrent; + +import io.vavr.Function1; + +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; + +import static java.util.concurrent.Executors.newSingleThreadExecutor; + +/** + * A utility class that creates a memoizing version of an arbitrary concurrent function. Recursive calls are trampolined + * so as to avoid stack overflows. + * @see "http://sebastian-millies.blogspot.de/2016/05/concurrent-recursive-function.html" + */ +class ConcurrentTrampoliningMemoizer { + private final ConcurrentMap> memo; + + ConcurrentTrampoliningMemoizer(ConcurrentMap> cache) { + this.memo = cache; + } + + Function1> memoize(Function1> f) { + return t -> { + Promise r = memo.get(t); + if (r == null) { + // value not yet memoized: put a container in the map that will come to hold the value + final Promise compute = Promise.make(); + r = memo.putIfAbsent(t, compute); + if (r == null) { + // only the thread that first has a cache miss calls the underlying function. + // recursive asynchronous calls are bounced off the task queue inside the default executor, avoiding stack overflows. + // the computed value is placed in the container. + Future futureValue = Future.ofSupplier(() -> f.apply(t)).flatMap(Function1.identity()); // unwrap nested Future + r = compute.completeWith(futureValue); + } + } + return r.future(); + }; + } +} diff --git a/vavr/src/main/java/io/vavr/concurrent/MemoizedConcurrently.java b/vavr/src/main/java/io/vavr/concurrent/MemoizedConcurrently.java new file mode 100644 index 0000000000..3c0f194f8b --- /dev/null +++ b/vavr/src/main/java/io/vavr/concurrent/MemoizedConcurrently.java @@ -0,0 +1,47 @@ +package io.vavr.concurrent; + +import io.vavr.Function1; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * Marker interface for concurrently memoized functions. + */ +public interface MemoizedConcurrently { + + /** + * Delegates to {@link #of(Function1, ConcurrentMap)}} using a ConcurrentHashMap as cache implementation. + */ + static Function1> of(Function1> function) { + if (function instanceof MemoizedConcurrently) { + return function; // make this method idempotent + } + return MemoizedConcurrently.of(function, new ConcurrentHashMap<>()); + } + + /** + * Lift the given function to a thread-safe, concurrently memoizing version. The returned function computes the + * return value for a given argument only once. On subsequent calls given the same argument the memoized value + * is returned. The returned function has a few interesting properties: + *
    + *
  • The function does not permit {@code null} values. + *
  • Different threads will always wind up using the same value instances, so the function may compute values + * that are supposed to be singletons. + *
  • Concurrent callers won't block each other. + *
+ * This method is idempotent, i. e. applying it to an already concurrently memoized function will return the + * function unchanged. + * @param function a possibly recursive (asynchronous) function + * @param cache a structure that holds the memoized values + * @return the memoizing equivalent + */ + static Function1> of(Function1> function, ConcurrentMap> cache) { + if (function instanceof MemoizedConcurrently) { + return function; // make this method idempotent + } + ConcurrentTrampoliningMemoizer memoizer = new ConcurrentTrampoliningMemoizer<>(cache); + Function1> memoized = memoizer.memoize(function); + return (Function1> & MemoizedConcurrently) memoized::apply; // mark as memoized using intersection type + } +} From e7491ff6a95974e6a130bbce010a9d4afb3ef60b Mon Sep 17 00:00:00 2001 From: smillies Date: Sun, 17 Sep 2017 19:32:27 +0200 Subject: [PATCH 3/3] Revert "The default executor service of java.io.concurrent.Future creates daemon threads" This reverts commit c0a16a8522792d520d4a492f5f6c209f9f6806b3. --- vavr/src/main/java/io/vavr/concurrent/Future.java | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/vavr/src/main/java/io/vavr/concurrent/Future.java b/vavr/src/main/java/io/vavr/concurrent/Future.java index 8fa6e93949..85ab5822f5 100644 --- a/vavr/src/main/java/io/vavr/concurrent/Future.java +++ b/vavr/src/main/java/io/vavr/concurrent/Future.java @@ -58,14 +58,10 @@ public interface Future extends Value { /** - * The default executor service is {@link Executors#newCachedThreadPool()}. The service will create daemon threads, - * in order to avoid preventing the VM from shutdown. - */ - ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool(runnable -> { - Thread thread = Executors.defaultThreadFactory().newThread(runnable); - thread.setDaemon(true); - return thread; - }); + * The default executor service is {@link Executors#newCachedThreadPool()}. + * Please note that it may prevent the VM from shutdown. + */ + ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool(); /** * Creates a failed {@code Future} with the given {@code exception}, backed by the {@link #DEFAULT_EXECUTOR_SERVICE}.