From fa2411c07dcebf0b02615bf029caeaf69aeaef90 Mon Sep 17 00:00:00 2001
From: Joel Kaasinen
Date: Tue, 10 Aug 2021 16:05:40 +0300
Subject: [PATCH] Introduce net.cgrand.xforms/parallel

---
 src/net/cgrand/xforms.cljc       | 43 ++++++++++++++++++++++++++++++++
 test/net/cgrand/xforms_test.cljc | 43 +++++++++++++++++++++++++++++++-
 2 files changed, 85 insertions(+), 1 deletion(-)

diff --git a/src/net/cgrand/xforms.cljc b/src/net/cgrand/xforms.cljc
index 3740693..6d46c64 100644
--- a/src/net/cgrand/xforms.cljc
+++ b/src/net/cgrand/xforms.cljc
@@ -806,3 +806,46 @@
   ([dimensions valfn summary-fn coll]
    (into {} (rollup dimensions valfn summary-fn) coll)))
 )
+
+(defn parallel
+  "A transducer that runs the given reducing function in the background.
+  That is, when (parallel rf) is given an element x to reduce, it
+  starts a future running rf on x. When given the next element, it
+  will wait for the previous future to finish first before launching
+  the (rf acc y) future.
+
+  When (parallel rf) is completed (called with arity 1), it waits
+  until the previous element has been reduced, and then finalizes rf
+  synchronously.
+
+  This can be used to parallelize a chain of transducers. However,
+  since (parallel rf) only does a single call to rf on the
+  background, it's only useful when parallelizing transducers with
+  expensive operations on single elements. It probably won't help
+  with speeding up a situation where a large number of elements are
+  reduced with a relatively cheap function.
+
+  In general, (comp parallel xform) and (comp xform parallel)
+  have the same result as xform.
+
+  Usage patterns:
+  (comp (map expensive-operation)
+        parallel
+        (map expensive-operation2))
+
+  (reducing-context (parallel expensive-reducing-function))"
+  [rf]
+  (let [memory (atom nil)
+        ;; Waits for the in-flight step (if any) and yields its result, else acc.
+        settle (fn [acc] (if-let [pending @memory] @pending acc))]
+    (fn
+      ([] (rf))
+      ([acc]
+       ;; The last background step may have returned a `reduced` value that
+       ;; the reducing context never saw, so unwrap it before completing.
+       (rf (unreduced (settle acc))))
+      ([acc input]
+       (let [prev (settle acc)]
+         (when-not (reduced? prev)
+           (reset! memory (future (rf prev input))))
+         prev)))))
diff --git a/test/net/cgrand/xforms_test.cljc b/test/net/cgrand/xforms_test.cljc
index d3b0466..30b189a 100644
--- a/test/net/cgrand/xforms_test.cljc
+++ b/test/net/cgrand/xforms_test.cljc
@@ -70,6 +70,10 @@
     (is (trial (x/for [x % y (range x)] [x y])
           4 (range 16)))
     (is (trial (x/reduce +)
+          4 (range 16)))
+    (is (trial x/parallel
+          4 (range 16)))
+    (is (trial (comp x/parallel (map inc) x/parallel)
           4 (range 16)))))
 
 (deftest reductions
@@ -139,4 +143,41 @@
   (is (= (range 100) (x/into [] (x/sort) (shuffle (range 100)))))
   (is (= (reverse (range 100)) (x/into [] (x/sort >) (shuffle (range 100)))))
   (is (= (sort-by str (range 100)) (x/into [] (x/sort-by str) (shuffle (range 100)))))
-  (is (= (sort-by str (comp - compare) (range 100)) (x/into [] (x/sort-by str (comp - compare)) (shuffle (range 100))))))
\ No newline at end of file
+  (is (= (sort-by str (comp - compare) (range 100)) (x/into [] (x/sort-by str (comp - compare)) (shuffle (range 100))))))
+
+(deftest parallel
+  (is (= [1 2 3 4 5]
+         (into [] (comp x/parallel (map inc)) (range 5))
+         (into [] (comp (map inc) x/parallel) (range 5))
+         (into [] (comp x/parallel (map inc) x/parallel) (range 5))))
+  (let [barrier-1 (java.util.concurrent.CyclicBarrier. 2)
+        tick! #(.await barrier-1 500 java.util.concurrent.TimeUnit/MILLISECONDS)
+        barrier-2 (java.util.concurrent.CyclicBarrier. 2)
+        tock! #(.await barrier-2 500 java.util.concurrent.TimeUnit/MILLISECONDS)
+        trace (atom [])
+        log! #(swap! trace conj %)
+        rf (fn
+             ([acc]
+              (log! [:finish acc])
+              acc)
+             ([acc x]
+              (log! [:start x])
+              (tick!)
+              (tock!)
+              (log! [:end x])
+              (+ acc x)))]
+    (testing "test concurrency"
+      (dotimes [_ 100]
+        (reset! trace [])
+        (let [par (x/parallel rf)]
+          (is (= 10 (par 10 1)) "first call just returns initial state")
+          (tick!)
+          (is (= [[:start 1]] @trace) "first elements starts reducing on the background")
+          (tock!)
+          (is (= 11 (par 10 2)) "second call returns first state")
+          (tick!)
+          (is (= [[:start 1] [:end 1] [:start 2]] @trace) "first element finished, second starts reducing")
+          (tock!)
+          (is (= 13 (par 11)) "completion call returns final state")
+          (is (= [[:start 1] [:end 1] [:start 2] [:end 2] [:finish 13]] @trace)
+              "second element finishes, completion"))))))