Skip to content

Commit

Permalink
Adapting to new List type (part 8).
Browse files Browse the repository at this point in the history
  • Loading branch information
eduardoejp committed Mar 25, 2024
1 parent 22efb27 commit c0d9793
Show file tree
Hide file tree
Showing 15 changed files with 338 additions and 274 deletions.
87 changes: 59 additions & 28 deletions stdlib/source/library/lux/concurrency/future.lux
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
[data
["[0]" product]
[collection
["[0]" stack]]]
["[0]" list (.use "[1]#[0]" monoid)]]]
[macro
["^" pattern]
["[0]" expansion]
Expand All @@ -31,24 +31,29 @@
Maybe)

(the Handler
(template.macro (_ a)
[(-> a (IO Any))]))
(template.macro (_ it)
[(-> it
(IO Any))]))

(nominal.every (Future'' a)
(Atom [(Value a) (Stack (Handler a))])
(nominal.every (Future'' it)
(Atom [(Value it)
(List (Handler it))])

(every .public (Future' r w)
(Future'' (Mutable r w)))

(every .public (Future a)
(Future'' (Mutable a a)))
(every .public (Future it)
(Future'' (Mutable it it)))

(every .public (Resolver w)
(-> w (IO Bit)))
(every .public (Resolver it)
(-> it
(IO Bit)))

... Sets a future's value if it has not been done yet.
(the (resolver future)
(for_any (_ r w) (-> (Future' r w) (Resolver w)))
(for_any (_ r w)
(-> (Future' r w)
(Resolver w)))
(function (resolve value)
(let [future (nominal.reification future)]
(do [! io.monad]
Expand All @@ -59,32 +64,41 @@

{.#None}
(do !
[succeeded? (atom.compare_and_swap! old [{.#Some (variance.write value)} (stack)] future)]
[succeeded? (atom.compare_and_swap! old [{.#Some (variance.write value)} (list)] future)]
(if succeeded?
(do !
[_ (stack.each' ! (function.on (variance.write value))
_observers)]
[_ (list.each' ! (function.on (variance.write value))
_observers)]
(in true))
(resolve value))))))))

(the .public (resolved value)
(for_any (_ a) (-> a (Future a)))
(nominal.abstraction (atom [{.#Some (variance.write value)} (stack)])))
(for_any (_ it)
(-> it
(Future it)))
(nominal.abstraction (atom [{.#Some (variance.write value)} (list)])))

(the .public (future _)
(for_any (_ r w) (-> Any [(Future' r w) (Resolver w)]))
(let [future (nominal.abstraction (atom [{.#None} (stack)]))]
(for_any (_ r w)
(-> Any
[(Future' r w)
(Resolver w)]))
(let [future (nominal.abstraction (atom [{.#None} (list)]))]
[future (..resolver future)]))

(the .public value
(for_any (_ r w) (-> (Future' r w) (IO (Value r))))
(for_any (_ r w)
(-> (Future' r w)
(IO (Value r))))
(|>> nominal.reification
atom.read!
(by io.functor each (|>> product.left
(maybe#each (|>> variance.read))))))

(the .public (upon! f future)
(for_any (_ r w) (-> (Handler r) (Future' r w) (IO Any)))
(for_any (_ r w)
(-> (Handler r) (Future' r w)
(IO Any)))
(do [! io.monad]
[.let [future (nominal.reification future)]
(^.let old [_value _observers]) (atom.read! future)]
Expand All @@ -94,14 +108,18 @@

{.#None}
(do !
[swapped? (atom.compare_and_swap! old [_value {.#Top (|>> variance.read f) _observers}] future)]
[swapped? (atom.compare_and_swap! old
[_value (list#composite _observers (list (|>> variance.read f)))]
future)]
(if swapped?
(in [])
(upon! f (nominal.abstraction future)))))))
)

(the .public resolved?
(for_any (_ r w) (-> (Future' r w) (IO Bit)))
(for_any (_ r w)
(-> (Future' r w)
(IO Bit)))
(|>> ..value
(by io.functor each
(|>> (pipe.when
Expand Down Expand Up @@ -142,7 +160,9 @@
ma)))))

(the .public (and left right)
(for_any (_ lr lw rr rw) (-> (Future' lr lw) (Future' rr rw) (Future [lr rr])))
(for_any (_ lr lw rr rw)
(-> (Future' lr lw) (Future' rr rw)
(Future [lr rr])))
(let [[read! write!] (sharing [lr lw rr rw]
(is [(Future' lr lw) (Future' rr rw)]
[left right])
Expand All @@ -156,7 +176,9 @@
read!))

(the .public (or left right)
(for_any (_ lr lw rr rw) (-> (Future' lr lw) (Future' rr rw) (Future (Or lr rr))))
(for_any (_ lr lw rr rw)
(-> (Future' lr lw) (Future' rr rw)
(Future (Or lr rr))))
(let [[left|right resolve] (sharing [lr lw rr rw]
(is [(Future' lr lw) (Future' rr rw)]
[left right])
Expand All @@ -173,7 +195,9 @@
left|right))))

(the .public (either left right)
(for_any (_ a lw rw) (-> (Future' a lw) (Future' a rw) (Future a)))
(for_any (_ a lw rw)
(-> (Future' a lw) (Future' a rw)
(Future a)))
(let [[left||right resolve] (sharing [a lw rw]
(is [(Future' a lw) (Future' a rw)]
[left right])
Expand All @@ -188,7 +212,9 @@
left||right))))

(the .public (schedule! milli_seconds computation)
(for_any (_ a) (-> Delay (IO a) (Future a)))
(for_any (_ it)
(-> Delay (IO it)
(Future it)))
(let [[!out resolve] (sharing [a]
(is (IO a)
computation)
Expand All @@ -209,14 +235,19 @@
(..schedule! 0))

(the .public (after milli_seconds value)
(for_any (_ a) (-> Delay a (Future a)))
(for_any (_ it)
(-> Delay it
(Future it)))
(..schedule! milli_seconds (io value)))

(the .public (delay milli_seconds)
(-> Delay (Future Any))
(-> Delay
(Future Any))
(..after milli_seconds []))

(the .public (within milli_seconds future)
(for_any (_ r w) (-> Delay (Future' r w) (Future (Maybe r))))
(for_any (_ r w)
(-> Delay (Future' r w)
(Future (Maybe r))))
(..or (..delay milli_seconds)
future))
85 changes: 52 additions & 33 deletions stdlib/source/library/lux/concurrency/incremental.lux
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,32 @@
[data
["[0]" product]
[collection
["[0]" stack]]]
["[0]" list (.use "[1]#[0]" monoid)]]]
[type
["[0]" nominal]]]]
[//
["[0]" atom (.only Atom)]])

(every (Dependency a)
(-> a (IO Any)))
(every (Dependency it)
(-> it
(IO Any)))

(nominal.every .public (Computation a)
(Atom [a (Stack (Dependency a))])
(nominal.every .public (Computation it)
(Atom [it (List (Dependency it))])

(the .public value
(for_any (_ a) (-> (Computation a) (IO a)))
(for_any (_ it)
(-> (Computation it)
(IO it)))
(|>> nominal.reification
atom.read!
(io#each product.left)))

(the (computation value)
(for_any (_ a) (-> a (Computation a)))
(nominal.abstraction (atom.atom [value (stack)])))
(for_any (_ it)
(-> it
(Computation it)))
(nominal.abstraction (atom.atom [value (list)])))

(the .public functor
(Functor Computation)
Expand All @@ -44,41 +49,47 @@
.let [[current dependencies] old
output (computation ($ current))]
? (atom.compare_and_swap! old
[current (stack.partial (function (retry! next)
(do !
[old (atom.read! (nominal.reification output))
.let [[_ dependencies] old]
? (atom.compare_and_swap! old
[($ next) dependencies]
(nominal.reification output))]
(if ?
(in [])
(retry! next))))
dependencies)]
[current (list#composite dependencies
(list (function (retry! next)
(do !
[old (atom.read! (nominal.reification output))
.let [[_ dependencies] old]
? (atom.compare_and_swap! old
[($ next) dependencies]
(nominal.reification output))]
(if ?
(in [])
(retry! next))))))]
(nominal.reification input))]
(in (if ?
output
(each $ input))))))))

(the (watch! dependency it)
(for_any (_ a) (-> (Dependency a) (Computation a) (IO Any)))
(for_any (_ it)
(-> (Dependency it) (Computation it)
(IO Any)))
(do io.monad
[.let [it' (nominal.reification it)]
old (atom.read! it')
.let [[current dependencies] old]
? (atom.compare_and_swap! old [current (stack.partial dependency dependencies)] it')]
? (atom.compare_and_swap! old [current (list#composite dependencies (list dependency))] it')]
(if ?
(in [])
(watch! dependency it))))

(the (update! $ output)
(for_any (_ a) (-> (-> a a) (Computation a) (IO Any)))
(for_any (_ it)
(-> (-> it it) (Computation it)
(IO Any)))
(atom.update! (function (_ [current dependencies])
[($ current) dependencies])
(nominal.reification output)))

(the .public (or left right)
(for_any (_ a b) (-> (Computation a) (Computation b) (Computation (Or a b))))
(for_any (_ left right)
(-> (Computation left) (Computation right)
(Computation (Or left right))))
(io.value (do io.monad
[left' (value left)
.let [output (computation {.#Left left'})]
Expand All @@ -91,7 +102,9 @@
(in output))))

(the .public (and left right)
(for_any (_ a b) (-> (Computation a) (Computation b) (Computation (And a b))))
(for_any (_ left right)
(-> (Computation left) (Computation right)
(Computation (And left right))))
(io.value (do io.monad
[left' (value left)
right' (value right)
Expand All @@ -109,7 +122,9 @@
(in output))))

(the .public (either left right)
(for_any (_ a) (-> (Computation a) (Computation a) (Computation a)))
(for_any (_ it)
(-> (Computation it) (Computation it)
(Computation it)))
(io.value (do io.monad
[left' (value left)
.let [output (computation left')]
Expand All @@ -119,22 +134,26 @@
_ (watch! update! right)]
(in output))))

(nominal.every .public (Var a)
(Computation a)
(nominal.every .public (Var it)
(Computation it)

(the .public (var value)
(for_any (_ a) (-> a (Var a)))
(for_any (_ it)
(-> it
(Var it)))
(<| (nominal.abstraction Var)
(nominal.abstraction Computation)
(atom.atom [value (stack)])))
(atom.atom [value (list)])))

(the .public mutations
(for_any (_ a) (-> (Var a) (Computation a)))
(for_any (_ it)
(-> (Var it)
(Computation it)))
(|>> (nominal.reification Var)))

(the .public (mutate! value it)
(for_any (_ a)
(-> a (Var a)
(for_any (_ it)
(-> it (Var it)
(IO Any)))
(do [! io.monad]
[.let [it' (|> it
Expand All @@ -145,7 +164,7 @@
? (atom.compare_and_swap! old [value dependencies] it')]
(if ?
(do !
[_ (stack.each' ! (function.on value) dependencies)]
[_ (list.each' ! (function.on value) dependencies)]
(in []))
(mutate! value it))))
)
Expand Down
10 changes: 5 additions & 5 deletions stdlib/source/library/lux/concurrency/semaphore.lux
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
(Record
[#max_positions Natural
#open_positions Integer
#waiting_stack (Queue (Resolver Any))]))
#waiting_list (Queue (Resolver Any))]))

(nominal.every .public Semaphore
(Atom State)
Expand All @@ -51,7 +51,7 @@
(nominal.abstraction
(atom.atom [#max_positions max_positions
#open_positions (.integer max_positions)
#waiting_stack queue.empty]))))
#waiting_list queue.empty]))))

(the .public (wait! semaphore)
(-> Semaphore
Expand All @@ -66,7 +66,7 @@
[[_ state'] (atom.update! (|>> (revised #open_positions --)
(pipe.if [<had_open_position?>]
[]
[(revised #waiting_stack (queue.end sink))]))
[(revised #waiting_list (queue.end sink))]))
semaphore)]
(expansion.let [<go_ahead> (sink [])
<get_in_line> (in false)]
Expand All @@ -92,12 +92,12 @@
state
(|> state
(revised #open_positions ++)
(revised #waiting_stack queue.next))))
(revised #waiting_list queue.next))))
semaphore)]
(if (same? pre post)
(in (exception.except ..semaphore_is_maxed_out [(its #max_positions pre)]))
(do !
[_ (when (queue.front (its #waiting_stack pre))
[_ (when (queue.front (its #waiting_list pre))
{try.#Success sink}
(sink [])

Expand Down
Loading

0 comments on commit c0d9793

Please sign in to comment.