Skip to content

Commit

Permalink
Adds async reduce and walk
Browse files Browse the repository at this point in the history
... and betters some doc strings
  • Loading branch information
jtkDvlp committed Jul 30, 2022
1 parent eaa1559 commit 953a402
Showing 1 changed file with 90 additions and 29 deletions.
119 changes: 90 additions & 29 deletions src/jtk_dvlp/async.cljc
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
(ns jtk-dvlp.async
(:refer-clojure
:exclude [map pmap amap reduce into])
:exclude [map pmap amap areduce reduce into])

#?(:cljs
(:require-macros
Expand All @@ -17,7 +17,7 @@

#?(:clj
(:import
[clojure.lang ExceptionInfo]
[clojure.lang ExceptionInfo MapEntry]
[clojure.core.async.impl.channels ManyToManyChannel]))

,,,)
Expand Down Expand Up @@ -97,6 +97,37 @@
(ex-info "unknown" {:code :unknown} e#))))
chs))

(defn smap
"Like `clojure.core/map` but given function `<f` is async. Execution of `<f` with values of `xs` will be sequential with the given order of `xs`. Carries thrown exception (will convert to `ExceptionInfo`) as result.
Also see `amap`"
[<f & xs]
(go-loop [result [], xs xs]
(if (ffirst xs)
(let [next-result
(->> xs
(mapv first)
(apply <f)
(<!))]

(recur
(conj result next-result)
(mapv next xs)))

result)))

(def chain
"Alias for `smap`"
smap)

(defn amap
"Like `clojure.core/map` but given function `<f` is async. Execution of `<f` with values of `xs` can be unordered an for clojure (not clojurescript) in parallel. Carries thrown exception (will convert to `ExceptionInfo`) as result.
Also see `smap`"
[<f & xs]
(->> (apply clojure.core/map <f xs)
(map vector)))

(defn reduce
"Like `core.async/reduce` but carries thrown exception (will convert to `ExceptionInfo`) as result."
[f init ch]
Expand All @@ -112,37 +143,67 @@
(reduced (ex-info "unknown" {:code :unknown} e#)))))
init ch))

(defn areduce
"Like `clojure.core/reduce` but given function `<f` is async. Carries thrown exception (will convert to `ExceptionInfo`) as result."
[<f init coll]
(go-loop [accu init, [item & rest-coll] coll]
(if item
(recur
(<! (<f accu item))
rest-coll)
accu)))

(defn into
"Like `core.async/into` but carries thrown exception (will convert to `ExceptionInfo`) as result."
[coll ch]
(reduce conj coll ch))

(defn smap
"Applies async function `<f` on every item of seqs `xs` *chaining its execution to make sure its run sequentially*. All seqs of `xs` must have the same length. Returns vector of all results applying `<f`. Supports error handling.
(defn awalk
"Like `clojure.core/walk` but given function `<inner` and `<outer` are async. Execution with values of `form` can be unordered an for clojure (not clojurescript) in parallel. Carries thrown exception (will convert to `ExceptionInfo`) as result."
[<inner <outer form]
(go
(cond
(list? form)
(<outer (apply list (<! (amap <inner form))))

Also see `amap`"
[<f & xs]
(go-loop [result [], xs xs]
(if (ffirst xs)
#?(:cljs (map-entry? form) :clj (instance? clojure.lang.IMapEntry form))
(do
(let [next-result
(->> xs
(mapv first)
(apply <f)
(<!))]

(recur
(conj result next-result)
(mapv next xs))))

result)))

(def chain smap)

(defn amap
"Applies async function `<f` on every item of seqs `xs`. All seqs of `xs` must have the same length. Returns vector of all results applying `<f`. Supports error handling.
Also see `smap`"
[<f & xs]
(->> (apply clojure.core/map <f xs)
(map vector)))
(<! (<outer #?(:cljs
(MapEntry.
(<! (<inner (key form)))
(<! (<inner (val form)))
nil)

:clj
(clojure.lang.MapEntry/create
(<! (<inner (key form)))
(<! (<inner (val form))))))))

(seq? form)
(<! (<outer (<! (amap <inner form))))

(record? form)
(<! (<outer (<! (areduce (fn [r x] (let [c (async/chan 1)] (async/take! (<inner x) #(conj r %)) c)) form form))))

(coll? form)
(<! (<outer (clojure.core/into (empty form) (<! (amap <inner form)))))

:else
(<! (<outer form)))))

(defn apostwalk
"Like `clojure.core/postwalk` but given function `<f` is async. Execution with values of `form` can be unordered an for clojure (not clojurescript) in parallel. Carries thrown exception (will convert to `ExceptionInfo`) as result."
[<f form]
(awalk (partial apostwalk <f) <f form))

(defn aprewalk
"Like `clojure.core/prewalk` but given function `<f` is async. Execution with values of `form` can be unordered an for clojure (not clojurescript) in parallel. Carries thrown exception (will convert to `ExceptionInfo`) as result."
[<f form]
(go
(<!
(awalk
(partial aprewalk <f)
#(let [c (async/chan 1)]
(async/put! c (identity %))
c)
(<! (<f form))))))

0 comments on commit 953a402

Please sign in to comment.