Skip to content

Commit

Permalink
feat: add support for Future and IDeref impl ReadPort
Browse files Browse the repository at this point in the history
  • Loading branch information
k13gomez committed Jan 6, 2024
1 parent f08561c commit c98db7e
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 8 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
This is a history of changes to k13labs/futurama

# 0.3.8-SNAPSHOT
* Add support for `Future` and `IDeref`

# 0.3.7
* Add collection helpers for: `async-reduce`, `async-some`, `async-every?`, `async-walk/prewalk/postwalk`

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

Futurama is a Clojure library for more deeply integrating async abstractions in the Clojure and JVM ecosystem with Clojure [core.async](https://github.com/clojure/core.async).

It adds support for [CompletableFuture](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html) and [IDeferred](https://github.com/clj-commons/manifold/blob/master/src/manifold/deferred.clj) to be used in the same fashion as Clojure [core.async](https://github.com/clojure/core.async) channels.
It adds support for [CompletableFuture](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html) and [IDeferred](https://github.com/clj-commons/manifold/blob/master/src/manifold/deferred.clj) to be used in approximately the same fashion as Clojure [core.async](https://github.com/clojure/core.async) channels, and for `Future` and `IDeref` to be read in the same manner as well.

# _Usage_

Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
<groupId>com.github.k13labs</groupId>
<artifactId>futurama</artifactId>
<name>futurama</name>
<version>0.3.7</version>
<version>0.3.8-SNAPSHOT</version>
<scm>
<tag>0.3.7</tag>
<tag>0.3.8-SNAPSHOT</tag>
<url>https://github.com/k13labs/futurama</url>
<connection>scm:git:git://github.com/k13labs/futurama.git</connection>
<developerConnection>scm:git:ssh://git@github.com/k13labs/futurama.git</developerConnection>
Expand Down
127 changes: 124 additions & 3 deletions src/futurama/core.clj
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
(ns futurama.core
(:require [clojure.core.async :refer [<! <!! put! take! close!] :as async]
(:require [clojure.core.async :refer [<! <!! put! take! close! thread] :as async]
[clojure.core.async.impl.protocols :as impl]
[clojure.core.async.impl.channels :refer [box]]
[clojure.core.async.impl.ioc-macros :as ioc]
[clojure.core.reducers :as r]
[futurama.util :as u]
[futurama.deferred])
(:import [clojure.lang Var]
(:import [clojure.lang Var IDeref IFn]
[java.util.concurrent
CompletableFuture
CompletionException
Expand All @@ -29,7 +29,8 @@
"unwraps an ExecutionException or CompletionException via ex-cause until the root exception is returned"
^Throwable [^Throwable ex]
(if-let [ce (and (or (instance? ExecutionException ex)
(instance? CompletionException ex))
(instance? CompletionException ex)
(instance? InterruptedException ex))
(ex-cause ex))]
ce
ex))
Expand Down Expand Up @@ -68,6 +69,126 @@
(.exceptionally res-fut# cancel#)
res-fut#))

(extend-type Future
impl/ReadPort
(take! [fut handler]
(let [^Future fut fut
^Lock handler handler
commit-handler (fn do-commit []
(.lock handler)
(let [take-cb (and (impl/active? handler) (impl/commit handler))]
(.unlock handler)
take-cb))]
(when-let [cb (commit-handler)]
(cond
(realized? fut)
(let [val (try
(.get ^Future fut)
(catch Throwable e
(unwrap-exception e)))]
(if (u/satisfies? impl/ReadPort val)
(do
(take! val (fn do-read
[val]
(if (u/satisfies? impl/ReadPort val)
(take! val do-read)
(cb val))))
nil)
(box val)))

:else
(do
(thread
(let [[val ex]
(try
[(.get ^Future fut) nil]
(catch Throwable e
[nil e]))]
(cond
(u/satisfies? impl/ReadPort val)
(take! val (fn do-read
[val]
(if (u/satisfies? impl/ReadPort val)
(take! val do-read)
(cb val))))

(some? val)
(cb val)

(some? ex)
(cb ex)

:else
(cb nil))))
nil)))))

impl/Channel
(close! [fut]
(when-not (realized? fut)
(future-cancel ^Future fut)))
(closed? [fut]
(realized? ^Future fut)))

(extend-type IDeref
impl/ReadPort
(take! [ref handler]
(let [^IDeref ref ref
^Lock handler handler
commit-handler (fn do-commit []
(.lock handler)
(let [take-cb (and (impl/active? handler) (impl/commit handler))]
(.unlock handler)
take-cb))]
(when-let [cb (commit-handler)]
(cond
(realized? ref)
(let [val (try
(deref ref)
(catch Throwable e
(unwrap-exception e)))]
(if (u/satisfies? impl/ReadPort val)
(do
(take! val (fn do-read
[val]
(if (u/satisfies? impl/ReadPort val)
(take! val do-read)
(cb val))))
nil)
(box val)))

:else
(do
(thread
(let [[val ex]
(try
[(deref ref) nil]
(catch Throwable e
[nil e]))]
(cond
(u/satisfies? impl/ReadPort val)
(take! val (fn do-read
[val]
(if (u/satisfies? impl/ReadPort val)
(take! val do-read)
(cb val))))

(some? val)
(cb val)

(some? ex)
(cb ex)

:else
(cb nil))))
nil)))))

impl/Channel
(close! [ref]
(if (instance? IFn ref)
(ref nil)))
(closed? [ref]
(realized? ref)))

(extend-type CompletableFuture
impl/ReadPort
(take! [fut handler]
Expand Down
13 changes: 11 additions & 2 deletions test/futurama/core_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,11 @@
(<! (timeout 50))
(let [c (CompletableFuture.)]
(>! c {:foo "bar"})
c))))))))))
(delay
(future
(let [p (promise)]
(deliver p c)
p)))))))))))))
(testing "nested non-blocking take - !<!"
(<!!
(async
Expand All @@ -228,7 +232,12 @@
(completable-future
(go
(<! (timeout 50))
(CompletableFuture/completedFuture {:foo "bar"})))))))))))))
(delay
(future
(let [p (promise)]
(deliver p
(CompletableFuture/completedFuture {:foo "bar"}))
p)))))))))))))))

(deftest error-handling
(testing "throws async exception on blocking deref - @"
Expand Down

0 comments on commit c98db7e

Please sign in to comment.