Skip to content

Commit 0f3344f

Browse files
committed
Add mapAsync (#96)
* Add mapAsync * Doc comment * Remove warning * Remove unnecessary code
1 parent 26d50b9 commit 0f3344f

File tree

11 files changed

+421
-54
lines changed

11 files changed

+421
-54
lines changed

src/Specular/FRP/Base.purs

Lines changed: 22 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ module Specular.FRP.Base
5353

5454
, map2
5555
, mapN
56+
57+
, mapAsync
58+
59+
, module X.Incremental
5660
) where
5761

5862
import Prelude
@@ -67,13 +71,12 @@ import Effect.Class (class MonadEffect, liftEffect)
6771
import Effect.Uncurried (EffectFn2, mkEffectFn1, mkEffectFn2, runEffectFn1, runEffectFn2, runEffectFn3)
6872
import Effect.Unsafe (unsafePerformEffect)
6973
import Safe.Coerce (coerce)
74+
import Specular.Internal.Incremental (AsyncComputation(..), AsyncState(..)) as X.Incremental
7075
import Specular.Internal.Incremental as I
7176
import Specular.Internal.Incremental.Node (Node)
7277
import Specular.Internal.Incremental.Node as Node
7378
import Specular.Internal.Incremental.Optional as Optional
7479
import Specular.Internal.Profiling as Profiling
75-
import Specular.Internal.Queue (Queue)
76-
import Specular.Internal.Queue as Queue
7780

7881
-- | import Partial.Unsafe (unsafeCrashWith)
7982
-- | import Unsafe.Coerce (unsafeCoerce)
@@ -112,8 +115,6 @@ readNode node = do
112115

113116
-------------------------------------------------------------
114117

115-
type Unsubscribe = Effect Unit
116-
117118
-- | A source of occurences.
118119
-- |
119120
-- | During a frame, an Event occurs at most once with a value of type a.
@@ -166,7 +167,7 @@ filterEvent f (Event node) = Event $ unsafePerformEffect do
166167

167168
subscribeNode :: forall m a. MonadEffect m => MonadCleanup m => (a -> Effect Unit) -> Node a -> m Unit
168169
subscribeNode handler event = do
169-
unsub <- liftEffect $ runEffectFn2 _subscribeNode handler event
170+
unsub <- liftEffect $ runEffectFn2 I.subscribeNode handler event
170171
onCleanup unsub
171172

172173
filterJustEvent :: forall a. Event (Maybe a) -> Event a
@@ -175,23 +176,9 @@ filterJustEvent = filterMapEvent identity
175176
subscribeEvent_ :: forall m a. MonadEffect m => MonadCleanup m => (a -> Effect Unit) -> Event a -> m Unit
176177
subscribeEvent_ handler (Event node) = subscribeNode handler node
177178

178-
globalEffectQueue :: Queue (Effect Unit)
179-
globalEffectQueue = unsafePerformEffect Queue.new
180-
181-
drainEffects :: Effect Unit
182-
drainEffects = runEffectFn2 Queue.drain globalEffectQueue (mkEffectFn1 \handler -> handler)
183-
184-
_subscribeEvent :: forall a. EffectFn2 (a -> Effect Unit) (Event a) Unsubscribe
179+
_subscribeEvent :: forall a. EffectFn2 (a -> Effect Unit) (Event a) I.Unsubscribe
185180
_subscribeEvent = mkEffectFn2 \handler (Event node) ->
186-
runEffectFn2 _subscribeNode handler node
187-
188-
_subscribeNode :: forall a. EffectFn2 (a -> Effect Unit) (Node a) Unsubscribe
189-
_subscribeNode = mkEffectFn2 \handler node -> do
190-
let
191-
h = mkEffectFn1 \value -> do
192-
runEffectFn2 Queue.enqueue globalEffectQueue (handler value)
193-
runEffectFn2 I.addObserver node h
194-
pure (runEffectFn2 I.removeObserver node h)
181+
runEffectFn2 I.subscribeNode handler node
195182

196183
-- | Create an Event that can be triggered externally.
197184
-- | Each `fire` will run a frame where the event occurs.
@@ -210,11 +197,7 @@ newEvent = liftEffect do
210197
}
211198

212199
stabilize :: Effect Unit
213-
stabilize = do
214-
mark <- runEffectFn1 Profiling.begin "Specular.stabilize"
215-
I.stabilize
216-
drainEffects
217-
runEffectFn1 Profiling.end mark
200+
stabilize = I.stabilize
218201

219202
-- | Create a new Behavior whose value can be modified outside a frame.
220203
newBehavior :: forall m a. MonadEffect m => a -> m { behavior :: Behavior a, set :: a -> Effect Unit }
@@ -500,3 +483,16 @@ instance semigroupDynamic :: Semigroup a => Semigroup (Dynamic a) where
500483

501484
instance monoidDynamic :: Monoid a => Monoid (Dynamic a) where
502485
mempty = pure mempty
486+
487+
-- | Map a possibly-asynchronous function over a Dynamic.
488+
-- |
489+
-- | When the source dynamic changes, the mapping function is re-evaluated. If it returns `Sync`,
490+
-- | this works like `map` - the change is propagated in the same cycle.
491+
-- |
492+
-- | If it returns `Async`, then the dynamic will first transition to `InProgress` and start the async computation.
493+
-- | After it finished, it will transition to `Finished` (which might contain an error).
494+
mapAsync :: forall a b. (a -> I.AsyncComputation b) -> Dynamic a -> Dynamic (I.AsyncState b)
495+
mapAsync f (Dynamic node) = Dynamic $ unsafePerformEffect do
496+
n <- runEffectFn2 I.mapAsync f node
497+
runEffectFn2 Node.annotate n ("mapAsync " <> Node.name node)
498+
pure n

src/Specular/FRP/List.js

Lines changed: 0 additions & 6 deletions
This file was deleted.

src/Specular/FRP/List.purs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,18 @@ module Specular.FRP.List
1111

1212
import Prelude
1313

14-
import Effect (Effect)
15-
import Effect.Class (liftEffect)
1614
import Control.Monad.Replace (class MonadReplace, Slot, newSlot, appendSlot, replaceSlot, destroySlot)
1715
import Data.Array as Array
1816
import Data.Maybe (Maybe(..))
1917
import Data.Traversable (traverse)
18+
import Effect (Effect)
19+
import Effect.Class (liftEffect)
20+
import Effect.Ref (Ref, new, read, write)
2021
import Specular.FRP (Dynamic, holdDyn, subscribeDyn_)
2122
import Specular.FRP.Base (class MonadFRP, holdUniqDynBy, newEvent)
2223
import Specular.FRP.WeakDynamic (WeakDynamic, holdWeakDyn, subscribeWeakDyn_, weaken)
2324
import Effect.Ref (Ref, new, read, write)
25+
import Specular.Internal.Effect (nextMicrotask)
2426
import Unsafe.Reference (unsafeRefEq)
2527

2628
-- | `dynamicListWithIndex dynArray handler`
@@ -107,8 +109,6 @@ updateList latestRef mainSlot handler newArray = do
107109
write newLatest latestRef
108110
pure $ map _.result newLatest
109111

110-
foreign import nextMicrotask :: Effect Unit -> Effect Unit
111-
112112
dynamicList
113113
:: forall m a b
114114
. MonadFRP m

src/Specular/Internal/Effect.js

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,3 +29,10 @@ export function sequenceEffects(effects) {
2929
}
3030
};
3131
}
32+
33+
// nextMicrotask :: Effect Unit -> Effect Unit
34+
export function nextMicrotask(eff) {
35+
return function () {
36+
Promise.resolve().then(eff);
37+
};
38+
}

src/Specular/Internal/Effect.purs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ module Specular.Internal.Effect
44
, pushDelayed
55
, unsafeFreezeDelayed
66
, sequenceEffects
7+
, nextMicrotask
78
) where
89

910
import Prelude
@@ -19,3 +20,5 @@ foreign import pushDelayed :: DelayedEffects -> Effect Unit -> Effect Unit
1920
foreign import unsafeFreezeDelayed :: DelayedEffects -> Effect (Array (Effect Unit))
2021

2122
foreign import sequenceEffects :: Array (Effect Unit) -> Effect Unit
23+
24+
foreign import nextMicrotask :: Effect Unit -> Effect Unit
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
module Specular.Internal.ExclusiveTask where
2+
3+
import Prelude
4+
5+
import Control.Monad.Error.Class (class MonadError, catchJust)
6+
import Data.Maybe (Maybe(..))
7+
import Effect (Effect)
8+
import Effect.Aff (Aff, Error, Fiber, error, joinFiber, killFiber, launchAff_, launchSuspendedAff)
9+
import Effect.Class (class MonadEffect, liftEffect)
10+
import Effect.Ref as ERef
11+
import Unsafe.Reference (unsafeRefEq)
12+
13+
data State
14+
= Idle
15+
| Running (Fiber Unit)
16+
17+
newtype ExclusiveTask = ExclusiveTask
18+
{ state :: ERef.Ref State
19+
}
20+
21+
new :: forall m. MonadEffect m => m ExclusiveTask
22+
new = do
23+
state_ <- liftEffect $ ERef.new Idle
24+
pure $ ExclusiveTask { state: state_ }
25+
26+
-- | Run an Aff action in this exclusive task slot.
27+
-- | If there was a previous task running, it is first cancelled.
28+
run :: ExclusiveTask -> Aff Unit -> Effect Unit
29+
run (ExclusiveTask self) block = do
30+
newFiber <- launchSuspendedAff block
31+
launchAff_ $ catchCancelled do
32+
state <- liftEffect $ ERef.read self.state
33+
case state of
34+
Idle ->
35+
pure unit
36+
Running fiber ->
37+
killFiber cancelledError fiber
38+
39+
liftEffect $ ERef.write (Running newFiber) self.state
40+
-- Only now resume the new fiber
41+
joinFiber newFiber
42+
43+
liftEffect $ ERef.write Idle self.state
44+
45+
cancelledError :: Error
46+
cancelledError = error "Cancelled"
47+
48+
isCancelledError :: Error -> Boolean
49+
isCancelledError e = e `unsafeRefEq` cancelledError
50+
51+
catchCancelled :: forall m. MonadError Error m => m Unit -> m Unit
52+
catchCancelled block =
53+
catchJust (\e -> if isCancelledError e then Just e else Nothing) block (\_ -> pure unit)

src/Specular/Internal/Incremental.purs

Lines changed: 94 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,26 +2,38 @@ module Specular.Internal.Incremental where
22

33
import Prelude
44

5+
import Data.Either (Either(..))
56
import Data.Function.Uncurried (Fn2, runFn2)
7+
import Data.Generic.Rep (class Generic)
8+
import Data.Maybe (Maybe(..))
9+
import Data.Show.Generic (genericShow)
610
import Effect (Effect)
11+
import Effect.Aff (Aff, Error)
12+
import Effect.Class (liftEffect)
713
import Effect.Console as Console
814
import Effect.Uncurried (EffectFn1, EffectFn2, EffectFn3, mkEffectFn1, mkEffectFn2, mkEffectFn3, runEffectFn1, runEffectFn2, runEffectFn3, runEffectFn4)
915
import Effect.Unsafe (unsafePerformEffect)
1016
import Partial.Unsafe (unsafeCrashWith)
17+
import Specular.Internal.ExclusiveTask as ExclusiveTask
18+
import Specular.Internal.Effect (nextMicrotask)
1119
import Specular.Internal.Incremental.Array as Array
1220
import Specular.Internal.Incremental.Effect (foreachUntil)
13-
import Specular.Internal.Incremental.Global (globalCurrentStabilizationNum, globalTotalRefcount, globalLastStabilizationNum, stabilizationIsNotInProgress)
21+
import Specular.Internal.Incremental.Global (globalCurrentStabilizationNum, globalLastStabilizationNum, globalTotalRefcount, stabilizationIsNotInProgress)
1422
import Specular.Internal.Incremental.Mutable (Field(..))
1523
import Specular.Internal.Incremental.MutableArray as MutableArray
16-
import Specular.Internal.Incremental.Node (Node, SomeNode, Observer, toSomeNode, toSomeNodeArray)
24+
import Specular.Internal.Incremental.Node (Node, Observer, SomeNode, toSomeNode, toSomeNodeArray)
1725
import Specular.Internal.Incremental.Node as Node
1826
import Specular.Internal.Incremental.Optional (Optional)
1927
import Specular.Internal.Incremental.Optional as Optional
2028
import Specular.Internal.Incremental.PriorityQueue as PQ
2129
import Specular.Internal.Incremental.Ref as Ref
2230
import Specular.Internal.Profiling as Profiling
31+
import Specular.Internal.Queue (Queue)
32+
import Specular.Internal.Queue as Queue
2333
import Unsafe.Coerce (unsafeCoerce)
2434

35+
type Unsubscribe = Effect Unit
36+
2537
-- | Priority queue for propagating node changes in dependency order.
2638
globalRecomputeQueue :: PQ.PQ SomeNode
2739
globalRecomputeQueue = unsafePerformEffect $
@@ -49,6 +61,10 @@ newVar = mkEffectFn1 \val -> do
4961
setVar :: forall a. EffectFn2 (Var a) a Unit
5062
setVar = mkEffectFn2 \(Var node) val -> do
5163
runEffectFn2 Node.set_value node (Optional.some val)
64+
runEffectFn1 addToRecomputeQueue node
65+
66+
addToRecomputeQueue :: forall a. EffectFn1 (Node a) Unit
67+
addToRecomputeQueue = mkEffectFn1 \node -> do
5268
_ <- runEffectFn2 PQ.add globalRecomputeQueue (toSomeNode node)
5369
pure unit
5470

@@ -175,12 +191,30 @@ disconnect = mkEffectFn1 \node -> do
175191

176192
runEffectFn1 Profiling.end mark
177193

194+
-- * Effect queue
195+
196+
globalEffectQueue :: Queue (Effect Unit)
197+
globalEffectQueue = unsafePerformEffect Queue.new
198+
199+
subscribeNode :: forall a. EffectFn2 (a -> Effect Unit) (Node a) Unsubscribe
200+
subscribeNode = mkEffectFn2 \handler node -> do
201+
let
202+
h = mkEffectFn1 \value -> do
203+
runEffectFn2 Queue.enqueue globalEffectQueue (handler value)
204+
runEffectFn2 addObserver node h
205+
pure (runEffectFn2 removeObserver node h)
206+
178207
-- * Recompute
179208

180209
stabilize :: Effect Unit
181210
stabilize = do
182211
mark <- runEffectFn1 Profiling.begin "stabilize"
183212

213+
stabilizationNum <- runEffectFn1 Ref.read globalCurrentStabilizationNum
214+
if stabilizationNum /= stabilizationIsNotInProgress then
215+
unsafeCrashWith "Specular: stabilize called when stabilization already in progress"
216+
else pure unit
217+
184218
oldStabilizationNum <- runEffectFn1 Ref.read globalLastStabilizationNum
185219
let currentStabilizationNum = oldStabilizationNum + 1
186220
runEffectFn2 Ref.write globalLastStabilizationNum currentStabilizationNum
@@ -191,6 +225,10 @@ stabilize = do
191225
runEffectFn2 Ref.write globalCurrentStabilizationNum stabilizationIsNotInProgress
192226
runEffectFn1 Profiling.end mark
193227

228+
mark2 <- runEffectFn1 Profiling.begin "drainEffects"
229+
runEffectFn2 Queue.drain globalEffectQueue (mkEffectFn1 \handler -> handler)
230+
runEffectFn1 Profiling.end mark2
231+
194232
recomputeNode :: EffectFn1 SomeNode Unit
195233
recomputeNode = mkEffectFn1 \node -> do
196234
height <- runEffectFn1 Node.get_height node
@@ -282,6 +320,60 @@ mapOptional = mkEffectFn2 \fn a -> do
282320
, dependencies: pure deps
283321
}
284322

323+
data AsyncComputation a = Sync a | Async (Aff a)
324+
325+
data AsyncState a
326+
= InProgress (Maybe (Either Error a))
327+
| Finished (Either Error a)
328+
329+
derive instance Generic (AsyncState a) _
330+
instance Show a => Show (AsyncState a) where
331+
show = genericShow
332+
333+
mapAsync :: forall a b. EffectFn2 (a -> AsyncComputation b) (Node a) (Node (AsyncState b))
334+
mapAsync = mkEffectFn2 \fn a -> do
335+
let deps = [ toSomeNode a ]
336+
task <- ExclusiveTask.new
337+
finishedRef <- runEffectFn1 Ref.new Nothing
338+
runEffectFn1 Node.create
339+
{ compute: mkEffectFn1 \self -> do
340+
-- Need to determine why we're updating - because the input changed, or because the computation finished?
341+
finished <- runEffectFn1 Ref.read finishedRef
342+
case finished of
343+
Nothing -> do
344+
value_a <- runEffectFn1 Node.valueExc a
345+
case fn value_a of
346+
Sync x ->
347+
pure (Optional.some (Finished (Right x)))
348+
Async aff -> do
349+
nextMicrotask do
350+
ExclusiveTask.run task do
351+
newValue <- aff
352+
liftEffect do
353+
runEffectFn2 Ref.write finishedRef (Just (Right newValue))
354+
runEffectFn1 addToRecomputeQueue self
355+
stabilize
356+
previous <- runEffectFn1 Node.get_value self
357+
pure (Optional.some (InProgress (getPreviousValue previous)))
358+
Just x -> do
359+
-- Hmm. Can the input also be changing at the same time we're reporting the result of async computation?
360+
-- Currently not, because we always stabilize after changing a node.
361+
-- But if we introduced some batching later on, it could happen,
362+
-- in which case we'd need to check `isChangingInCurrentStabilization` of our dependency.
363+
364+
runEffectFn2 Ref.write finishedRef Nothing
365+
pure (Optional.some (Finished x))
366+
, dependencies: pure deps
367+
}
368+
369+
where
370+
getPreviousValue opt
371+
| Optional.isSome opt =
372+
case Optional.fromSome opt of
373+
InProgress x -> x
374+
Finished x -> Just x
375+
| otherwise = Nothing
376+
285377
map2 :: forall a b c. EffectFn3 (Fn2 a b c) (Node a) (Node b) (Node c)
286378
map2 = mkEffectFn3 \fn a b -> do
287379
let deps = [ toSomeNode a, toSomeNode b ]

0 commit comments

Comments
 (0)