diff --git a/CHANGELOG.md b/CHANGELOG.md index 363acce..218b855 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ This is a history of changes to k13labs/futurama +# 0.5.0 +* Add custom state to keep track of async items +* Add custom cancel strategy which combines bound state, global weak state, and custom protocol impl + # 0.4.0 * Add `fixed-threadpool` method to create a FixedThreadPool * The default `*thread-pool*` is now a FixedThreadPool which can be interrupted. diff --git a/pom.xml b/pom.xml index 0a5adc6..d530f2f 100644 --- a/pom.xml +++ b/pom.xml @@ -5,9 +5,9 @@ com.github.k13labs futurama futurama - 0.4.0 + 0.5.0 - 0.4.0 + 0.5.0 https://github.com/k13labs/futurama scm:git:git://github.com/k13labs/futurama.git scm:git:ssh://git@github.com/k13labs/futurama.git diff --git a/src/futurama/core.clj b/src/futurama/core.clj index f55fe84..c27407f 100644 --- a/src/futurama/core.clj +++ b/src/futurama/core.clj @@ -4,7 +4,9 @@ [clojure.core.async.impl.channels :refer [box]] [clojure.core.async.impl.ioc-macros :as ioc] [clojure.core.reducers :as r] + [futurama.protocols :as proto] [futurama.util :as u] + [futurama.state :as state] [futurama.deferred]) (:import [clojure.lang Var IDeref IFn] [java.util.concurrent @@ -17,6 +19,8 @@ [java.util.concurrent.locks Lock] [java.util.function Function BiConsumer])) +(def ^:dynamic *thread-pool* nil) + (defn fixed-threadpool "Creates a fixed-threadpool, by default uses the number of available processors." ([] @@ -30,8 +34,6 @@ default-pool (delay (fixed-threadpool))) -(def ^:dynamic *thread-pool* nil) - (defmacro with-pool "Utility macro which binds *thread-pool* to the supplied pool and then evaluates the `body`." [pool & body] @@ -48,8 +50,7 @@ "unwraps an ExecutionException or CompletionException via ex-cause until the root exception is returned" ^Throwable [^Throwable ex] (if-let [ce (and (or (instance? ExecutionException ex) - (instance? CompletionException ex) - (instance? InterruptedException ex)) + (instance? CompletionException ex)) (ex-cause ex))] ce ex)) @@ -61,6 +62,29 @@ (throw (unwrap-exception v)) v)) +(defn cancel! + "Cancels the async item." + [item] + (let [proto-cancel (when (u/instance-satisfies? proto/AsyncCancellable item) + (proto/cancel item)) + stack-cancel (state/set-value! item :cancelled true)] + (or proto-cancel stack-cancel false))) + +(defn cancelled? + "Checks if the current executing async item or one of its parents or provided item has been cancelled. + Also checks if the thread has been interrupted and restores the interrupt status." + ([] + (or (when (Thread/interrupted) + (.. (Thread/currentThread) + (interrupt)) + true) + (some cancelled? (state/get-dynamic-items)) + false)) + ([item] + (or (proto/cancelled? item) + (state/get-value item :cancelled) + false))) + (defn async? "returns true if v instance-satisfies? core.async's `ReadPort`" ^Boolean [v] @@ -70,25 +94,37 @@ "Asynchronously invokes the body inside a completable future, preserves the current thread binding frame, using by default the `ForkJoinPool/commonPool`, the pool used can be specified via `*thread-pool*` binding." ^CompletableFuture [& body] - `(let [binding-frame# (Var/cloneThreadBindingFrame) ;;; capture the thread local binding frame before start - ^CompletableFuture res-fut# (CompletableFuture.) ;;; this is the CompletableFuture being returned - ^Runnable fbody# (fn do-complete# - [] - (try - (Var/resetThreadBindingFrame binding-frame#) ;;; set the Clojure binding frame captured above - (.complete res-fut# (do ~@body)) ;;; send the result of evaluating the body to the CompletableFuture - (catch Throwable ~'e - (.completeExceptionally res-fut# (unwrap-exception ~'e))))) ;;; if we catch an exception we send it to the CompletableFuture - ^Future fut# (dispatch fbody#) - ^Function cancel# (reify Function - (apply [~'_ ~'_] - (future-cancel fut#)))] ;;; submit the work to the pool and get the FutureTask doing the work - ;;; if the CompletableFuture returns exceptionally - ;;; then cancel the Future which is currently doing the work - (.exceptionally res-fut# cancel#) - res-fut#)) + `(let [^CompletableFuture res-fut# (CompletableFuture.)] ;;; this is the CompletableFuture being returned + (state/push-item res-fut# + (let [binding-frame# (Var/cloneThreadBindingFrame) ;;; capture the thread local binding frame before start + ^Runnable fbody# (fn do-complete# + [] + (let [thread-frame# (Var/getThreadBindingFrame)] ;;; get the Thread's binding frame + (Var/resetThreadBindingFrame binding-frame#) ;;; set the Clojure binding frame captured + (try + (.complete res-fut# (do ~@body)) ;;; send the result of evaluating the body to the CompletableFuture + (catch Throwable ~'e + ;;; if we catch an exception we send it to the CompletableFuture + (.completeExceptionally res-fut# (unwrap-exception ~'e))) + (finally + (Var/resetThreadBindingFrame thread-frame#))))) ;;; restore the original Thread's binding frame + ^Future fut# (dispatch fbody#) + ^Function cancel# (reify Function + (apply [~'_ ~'_] + (cancel! res-fut#) + (future-cancel fut#)))] ;;; submit the work to the pool and get the FutureTask doing the work + ;;; if the CompletableFuture returns exceptionally + ;;; then cancel the Future which is currently doing the work + (.exceptionally res-fut# cancel#) + res-fut#)))) (extend-type Future + proto/AsyncCancellable + (cancel [this] + (future-cancel this)) + (cancelled? [this] + (future-cancelled? this)) + impl/ReadPort (take! [fut handler] (let [^Future fut fut @@ -209,6 +245,12 @@ (realized? ref))) (extend-type CompletableFuture + proto/AsyncCancellable + (cancel [this] + (future-cancel this)) + (cancelled? [this] + (future-cancelled? this)) + impl/ReadPort (take! [fut handler] (let [^CompletableFuture fut fut @@ -299,26 +341,27 @@ completed; the pool used can be specified via `*thread-pool*` binding." [port & body] (let [crossing-env (zipmap (keys &env) (repeatedly gensym))] - `(let [c# ~port - captured-bindings# (Var/getThreadBindingFrame) - ^Runnable task# (^:once fn* [] - (let [~@(mapcat (fn [[l sym]] [sym `(^:once fn* [] ~(vary-meta l dissoc :tag))]) crossing-env) - f# ~(ioc/state-machine `(try - ~@body - (catch Throwable ~'e - (unwrap-exception ~'e))) 1 [crossing-env &env] ioc/async-custom-terminators) - state# (-> (f#) - (ioc/aset-all! ioc/USER-START-IDX c# - ioc/BINDINGS-IDX captured-bindings#))] - (ioc/run-state-machine-wrapped state#))) - ^Future fut# (dispatch task#)] - (when (instance? CompletableFuture c#) - (.exceptionally ^CompletableFuture c# - ^Function (reify Function - (apply [~'_ ~'_] - (println "cancelling future!") - (future-cancel fut#))))) - c#))) + `(let [c# ~port] + (state/push-item c# + (let [captured-bindings# (Var/getThreadBindingFrame) + ^Runnable task# (^:once fn* [] + (let [~@(mapcat (fn [[l sym]] [sym `(^:once fn* [] ~(vary-meta l dissoc :tag))]) crossing-env) + f# ~(ioc/state-machine `(try + ~@body + (catch Throwable ~'e + (unwrap-exception ~'e))) 1 [crossing-env &env] ioc/async-custom-terminators) + state# (-> (f#) + (ioc/aset-all! ioc/USER-START-IDX c# + ioc/BINDINGS-IDX captured-bindings#))] + (ioc/run-state-machine-wrapped state#))) + ^Future fut# (dispatch task#)] + (when (instance? CompletableFuture c#) + (.exceptionally ^CompletableFuture c# + ^Function (reify Function + (apply [~'_ ~'_] + (cancel! c#) + (future-cancel fut#))))) + c#))))) (defmacro async "Asynchronously executes the body, returning immediately to the diff --git a/src/futurama/protocols.clj b/src/futurama/protocols.clj new file mode 100644 index 0000000..af337ae --- /dev/null +++ b/src/futurama/protocols.clj @@ -0,0 +1,5 @@ +(ns futurama.protocols) + +(defprotocol AsyncCancellable + (cancel [this]) + (cancelled? [this])) diff --git a/src/futurama/state.clj b/src/futurama/state.clj new file mode 100644 index 0000000..ac9c8a3 --- /dev/null +++ b/src/futurama/state.clj @@ -0,0 +1,50 @@ +(ns futurama.state + (:import [java.util Map WeakHashMap])) + +(defonce ^{:doc "Internal global state mananged using a Synchronized WeakHashMap" + :private true} + GLOBAL_STATE + (WeakHashMap.)) + +(def ^:dynamic *async-state* {}) + +(defn ^:no-doc put-global-state* + [key val] + (locking GLOBAL_STATE + (.put ^Map GLOBAL_STATE key val))) + +(defn ^:no-doc get-global-state* + [key] + (locking GLOBAL_STATE + (.get ^Map GLOBAL_STATE key))) + +(defmacro push-item + "Pushes the async item into the `*async-state*` with the default state of: `{}`" + [item & body] + `(let [key# ~item + val# (atom {})] + (binding [*async-state* (conj *async-state* [key# val#])] + (put-global-state* key# val#) + ~@body))) + +(defn- get-state + [item] + (or (get *async-state* item) + (get-global-state* item))) + +(defn set-value! + "Changes the state of an item key in the state map." + [item key value] + (when-let [state-atom (get-state item)] + (-> (swap! state-atom conj [key value]) + (get key)))) + +(defn get-value + "Gets the value of a state key in item's state map" + [item key] + (some-> (get-state item) deref (get key))) + +(defn get-dynamic-items + "Lists all the items in the dynamic `*async-state*` map" + [] + (keys *async-state*)) diff --git a/test/futurama/core_test.clj b/test/futurama/core_test.clj index f04cf2e..6afde78 100644 --- a/test/futurama/core_test.clj +++ b/test/futurama/core_test.clj @@ -5,7 +5,7 @@ async-for async-map async-reduce async-some async-every? async-prewalk async-postwalk - completable-future]] + completable-future] :as async] [clojure.core.async :refer [go timeout put! take! !