Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement custom state and cancellation strategy for async items #2

Merged
merged 1 commit into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
This is a history of changes to k13labs/futurama

# 0.5.0
* Add custom state to keep track of async items
* Add custom cancel strategy which combines bound state, global weak state, and custom protocol impl

# 0.4.0
* Add `fixed-threadpool` method to create a FixedThreadPool
* The default `*thread-pool*` is now a FixedThreadPool which can be interrupted.
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
<groupId>com.github.k13labs</groupId>
<artifactId>futurama</artifactId>
<name>futurama</name>
<version>0.4.0</version>
<version>0.5.0</version>
<scm>
<tag>0.4.0</tag>
<tag>0.5.0</tag>
<url>https://github.com/k13labs/futurama</url>
<connection>scm:git:git://github.com/k13labs/futurama.git</connection>
<developerConnection>scm:git:ssh://git@github.com/k13labs/futurama.git</developerConnection>
Expand Down
125 changes: 84 additions & 41 deletions src/futurama/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
[clojure.core.async.impl.channels :refer [box]]
[clojure.core.async.impl.ioc-macros :as ioc]
[clojure.core.reducers :as r]
[futurama.protocols :as proto]
[futurama.util :as u]
[futurama.state :as state]
[futurama.deferred])
(:import [clojure.lang Var IDeref IFn]
[java.util.concurrent
Expand All @@ -17,6 +19,8 @@
[java.util.concurrent.locks Lock]
[java.util.function Function BiConsumer]))

(def ^:dynamic *thread-pool* nil)

(defn fixed-threadpool
"Creates a fixed-threadpool, by default uses the number of available processors."
([]
Expand All @@ -30,8 +34,6 @@
default-pool
(delay (fixed-threadpool)))

(def ^:dynamic *thread-pool* nil)

(defmacro with-pool
"Utility macro which binds *thread-pool* to the supplied pool and then evaluates the `body`."
[pool & body]
Expand All @@ -48,8 +50,7 @@
"unwraps an ExecutionException or CompletionException via ex-cause until the root exception is returned"
^Throwable [^Throwable ex]
(if-let [ce (and (or (instance? ExecutionException ex)
(instance? CompletionException ex)
(instance? InterruptedException ex))
(instance? CompletionException ex))
(ex-cause ex))]
ce
ex))
Expand All @@ -61,6 +62,29 @@
(throw (unwrap-exception v))
v))

(defn cancel!
"Cancels the async item."
[item]
(let [proto-cancel (when (u/instance-satisfies? proto/AsyncCancellable item)
(proto/cancel item))
stack-cancel (state/set-value! item :cancelled true)]
(or proto-cancel stack-cancel false)))

(defn cancelled?
"Checks if the current executing async item or one of its parents or provided item has been cancelled.
Also checks if the thread has been interrupted and restores the interrupt status."
([]
(or (when (Thread/interrupted)
(.. (Thread/currentThread)
(interrupt))
true)
(some cancelled? (state/get-dynamic-items))
false))
([item]
(or (proto/cancelled? item)
(state/get-value item :cancelled)
false)))

(defn async?
"returns true if v instance-satisfies? core.async's `ReadPort`"
^Boolean [v]
Expand All @@ -70,25 +94,37 @@
"Asynchronously invokes the body inside a completable future, preserves the current thread binding frame,
using by default the `ForkJoinPool/commonPool`, the pool used can be specified via `*thread-pool*` binding."
^CompletableFuture [& body]
`(let [binding-frame# (Var/cloneThreadBindingFrame) ;;; capture the thread local binding frame before start
^CompletableFuture res-fut# (CompletableFuture.) ;;; this is the CompletableFuture being returned
^Runnable fbody# (fn do-complete#
[]
(try
(Var/resetThreadBindingFrame binding-frame#) ;;; set the Clojure binding frame captured above
(.complete res-fut# (do ~@body)) ;;; send the result of evaluating the body to the CompletableFuture
(catch Throwable ~'e
(.completeExceptionally res-fut# (unwrap-exception ~'e))))) ;;; if we catch an exception we send it to the CompletableFuture
^Future fut# (dispatch fbody#)
^Function cancel# (reify Function
(apply [~'_ ~'_]
(future-cancel fut#)))] ;;; submit the work to the pool and get the FutureTask doing the work
;;; if the CompletableFuture returns exceptionally
;;; then cancel the Future which is currently doing the work
(.exceptionally res-fut# cancel#)
res-fut#))
`(let [^CompletableFuture res-fut# (CompletableFuture.)] ;;; this is the CompletableFuture being returned
(state/push-item res-fut#
(let [binding-frame# (Var/cloneThreadBindingFrame) ;;; capture the thread local binding frame before start
^Runnable fbody# (fn do-complete#
[]
(let [thread-frame# (Var/getThreadBindingFrame)] ;;; get the Thread's binding frame
(Var/resetThreadBindingFrame binding-frame#) ;;; set the Clojure binding frame captured
(try
(.complete res-fut# (do ~@body)) ;;; send the result of evaluating the body to the CompletableFuture
(catch Throwable ~'e
;;; if we catch an exception we send it to the CompletableFuture
(.completeExceptionally res-fut# (unwrap-exception ~'e)))
(finally
(Var/resetThreadBindingFrame thread-frame#))))) ;;; restore the original Thread's binding frame
^Future fut# (dispatch fbody#)
^Function cancel# (reify Function
(apply [~'_ ~'_]
(cancel! res-fut#)
(future-cancel fut#)))] ;;; submit the work to the pool and get the FutureTask doing the work
;;; if the CompletableFuture returns exceptionally
;;; then cancel the Future which is currently doing the work
(.exceptionally res-fut# cancel#)
res-fut#))))

(extend-type Future
proto/AsyncCancellable
(cancel [this]
(future-cancel this))
(cancelled? [this]
(future-cancelled? this))

impl/ReadPort
(take! [fut handler]
(let [^Future fut fut
Expand Down Expand Up @@ -209,6 +245,12 @@
(realized? ref)))

(extend-type CompletableFuture
proto/AsyncCancellable
(cancel [this]
(future-cancel this))
(cancelled? [this]
(future-cancelled? this))

impl/ReadPort
(take! [fut handler]
(let [^CompletableFuture fut fut
Expand Down Expand Up @@ -299,26 +341,27 @@
completed; the pool used can be specified via `*thread-pool*` binding."
[port & body]
(let [crossing-env (zipmap (keys &env) (repeatedly gensym))]
`(let [c# ~port
captured-bindings# (Var/getThreadBindingFrame)
^Runnable task# (^:once fn* []
(let [~@(mapcat (fn [[l sym]] [sym `(^:once fn* [] ~(vary-meta l dissoc :tag))]) crossing-env)
f# ~(ioc/state-machine `(try
~@body
(catch Throwable ~'e
(unwrap-exception ~'e))) 1 [crossing-env &env] ioc/async-custom-terminators)
state# (-> (f#)
(ioc/aset-all! ioc/USER-START-IDX c#
ioc/BINDINGS-IDX captured-bindings#))]
(ioc/run-state-machine-wrapped state#)))
^Future fut# (dispatch task#)]
(when (instance? CompletableFuture c#)
(.exceptionally ^CompletableFuture c#
^Function (reify Function
(apply [~'_ ~'_]
(println "cancelling future!")
(future-cancel fut#)))))
c#)))
`(let [c# ~port]
(state/push-item c#
(let [captured-bindings# (Var/getThreadBindingFrame)
^Runnable task# (^:once fn* []
(let [~@(mapcat (fn [[l sym]] [sym `(^:once fn* [] ~(vary-meta l dissoc :tag))]) crossing-env)
f# ~(ioc/state-machine `(try
~@body
(catch Throwable ~'e
(unwrap-exception ~'e))) 1 [crossing-env &env] ioc/async-custom-terminators)
state# (-> (f#)
(ioc/aset-all! ioc/USER-START-IDX c#
ioc/BINDINGS-IDX captured-bindings#))]
(ioc/run-state-machine-wrapped state#)))
^Future fut# (dispatch task#)]
(when (instance? CompletableFuture c#)
(.exceptionally ^CompletableFuture c#
^Function (reify Function
(apply [~'_ ~'_]
(cancel! c#)
(future-cancel fut#)))))
c#)))))

(defmacro async
"Asynchronously executes the body, returning immediately to the
Expand Down
5 changes: 5 additions & 0 deletions src/futurama/protocols.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
(ns futurama.protocols)

(defprotocol AsyncCancellable
(cancel [this])
(cancelled? [this]))
50 changes: 50 additions & 0 deletions src/futurama/state.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
(ns futurama.state
(:import [java.util Map WeakHashMap]))

(defonce ^{:doc "Internal global state mananged using a Synchronized WeakHashMap"
:private true}
GLOBAL_STATE
(WeakHashMap.))

(def ^:dynamic *async-state* {})

(defn ^:no-doc put-global-state*
[key val]
(locking GLOBAL_STATE
(.put ^Map GLOBAL_STATE key val)))

(defn ^:no-doc get-global-state*
[key]
(locking GLOBAL_STATE
(.get ^Map GLOBAL_STATE key)))

(defmacro push-item
"Pushes the async item into the `*async-state*` with the default state of: `{}`"
[item & body]
`(let [key# ~item
val# (atom {})]
(binding [*async-state* (conj *async-state* [key# val#])]
(put-global-state* key# val#)
~@body)))

(defn- get-state
[item]
(or (get *async-state* item)
(get-global-state* item)))

(defn set-value!
"Changes the state of an item key in the state map."
[item key value]
(when-let [state-atom (get-state item)]
(-> (swap! state-atom conj [key value])
(get key))))

(defn get-value
"Gets the value of a state key in item's state map"
[item key]
(some-> (get-state item) deref (get key)))

(defn get-dynamic-items
"Lists all the items in the dynamic `*async-state*` map"
[]
(keys *async-state*))
72 changes: 58 additions & 14 deletions test/futurama/core_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
async-for async-map async-reduce
async-some async-every?
async-prewalk async-postwalk
completable-future]]
completable-future] :as async]
[clojure.core.async :refer [go timeout put! take! <! >! <!!] :as a]
[criterium.core :refer [report-result
quick-benchmark
Expand All @@ -31,33 +31,77 @@
(delay
(Executors/newSingleThreadExecutor)))

(deftest interruptible-completable-future-test
(deftest test-completable-future
(testing "basic completable-future test"
(let [a (atom false)
f (completable-future
;; Body can contain multiple elements.
(reset! a true)
(range 10))]
(is (= @f (range 10)))
(is (true? @a))))
(testing "binding frame completable-future test"
(binding [*test-val1* 1]
(let [f (completable-future *test-val1*)] ;;; found result of *test-val1* should be returned
(is (= @f 1))))))

(deftest cancel-async-test
(testing "cancellable completable-future is interrupted test"
(let [a (promise)
s (atom 0)
f (completable-future
(while (not (Thread/interrupted)) ;;; this loop goes on infinitely until the thread is interrupted
(println "future looping..." (swap! s inc)))
(println "ended future looping.")
(deliver a true))]
(try
(while (not (async/cancelled?)) ;;; this loop goes on infinitely until the thread is interrupted
(Thread/sleep 10)
(println "future looping..." (swap! s inc)))
(println "ended future looping.")
(deliver a true)
(catch InterruptedException _
(println "interrupted looping.")
(deliver a true))))]
(go
(<! (timeout 100))
(future-cancel f)) ;;; cancelling the completable future causes the backing thread to be interrupted
(async/cancel! f)) ;;; cancelling the completable future causes the backing thread to be interrupted
(is (true? @a))
(is (true? (.isCancelled f)))))
(is (true? (async/cancelled? f)))))
(testing "cancellable async block is interrupted test"
(let [a (promise)
s (atom 0)
f (async
(while (not (Thread/interrupted)) ;;; this loop goes on infinitely until the thread is interrupted
(println "async looping..." (swap! s inc)))
(println "ended async looping.")
(deliver a true))]
(try
(while (not (async/cancelled?)) ;;; this loop goes on infinitely until the thread is interrupted
(!<! (timeout 10))
(println "future looping..." (swap! s inc)))
(println "ended future looping.")
(deliver a true)
(catch InterruptedException _
(println "interrupted looping.")
(deliver a true))))]
(go
(<! (timeout 100))
(async/cancel! f)) ;;; cancelling the completable future causes the backing thread to be interrupted
(is (true? @a))
(is (true? (async/cancelled? f)))))
(testing "cancellable nested async cancellable is interrupted test"
(let [a (promise)
s (atom 0)
f (async
(completable-future
(async
(try
(while (not (async/cancelled?)) ;;; this loop goes on infinitely until the thread is interrupted
(!<! (timeout 10))
(println "future looping..." (swap! s inc)))
(println "ended future looping.")
(deliver a true)
(catch InterruptedException _
(println "interrupted looping.")
(deliver a true))))))]
(go
(<! (timeout 100))
(future-cancel f)) ;;; cancelling the completable future causes the backing thread to be interrupted
(async/cancel! f)) ;;; cancelling the completable future causes the backing thread to be interrupted
(is (true? @a))
(is (true? (.isCancelled f))))))
(is (true? (async/cancelled? f))))))

(deftest with-pool-macro-test
(testing "with-pool evals body"
Expand Down