Skip to content

Commit

Permalink
feat(fio): add flattenM static method
Browse files Browse the repository at this point in the history
  • Loading branch information
tusharmath committed Oct 3, 2019
1 parent 0b325aa commit cc817ee
Showing 1 changed file with 85 additions and 41 deletions.
126 changes: 85 additions & 41 deletions src/main/FIO.ts
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,30 @@ export class FIO<E1 = unknown, A1 = unknown, R1 = NoEnv> {
return fio.chain(Id)
}

/**
* Takes in a effect-ful function that return a FIO and unwraps it.
* This is an alias to `FIO.flatten(UIO(fn))`
*
* ```ts
* // An impure function that creates mutable state but also returns a FIO.
* const FN = () => {
* let count = 0
*
* return FIO.try(() => count++)
* }
* // Using flatten
* FIO.flatten(UIO(FN))
*
* // Using flattenM
* FIO.flattenM(FN)
* ```
*/
public static flattenM<E1, A1, R1>(
fio: () => FIO<E1, A1, R1>
): FIO<E1, A1, R1> {
return FIO.flatten(UIO(fio))
}

/**
* Creates a new [[Fiber]] to run the given [[IO]].
*/
Expand Down Expand Up @@ -668,49 +692,69 @@ export class FIO<E1 = unknown, A1 = unknown, R1 = NoEnv> {
c: (e1: A1, e2: A2) => C
): FIO<E1 | E2, C, R1 & R2> {
// Create Caches
const cache = ExitRef<E1, A1>().zip(ExitRef<E2, A2>())

// Create a Counter and an Await
return Ref.of(0)
.zip(Await.of<never, boolean>())
.chain(({0: count, 1: done}) => {
// Cancels the provided fiber on exit status.
// tslint:disable-next-line:no-shadowed-variable
const coordinate = <E1, A1, E2, A2>(
exit: Either<E1, A1>,
fiber: Fiber<E2, A2>,
ref: Ref<Either<E1, A1>>
): UIO<boolean> =>
ref
.set(exit)
.chain(e =>
e.fold(
FIO.of(false),
() => fiber.abort.and(done.set(FIO.of(true))),
() =>
count
.update(_ => _ + 1)
.and(
count.read.chain(value =>
value === 2 ? done.set(FIO.of(true)) : FIO.of(false)
)
)
const Caches = ExitRef<E1, A1>().zip(ExitRef<E2, A2>())

// Maintains the count of results produced
const Counter = Ref.of(0)

// Is set only when the IO is completed.
const Done = Await.of<never, boolean>()

return Counter.zip(Done).chain(({0: count, 1: isDone}) => {
// Cancels the provided fiber on exit status.
const coordinate = <EE1, AA1, EE2, AA2>(
exit: Either<EE1, AA1>,
fiber: Fiber<EE2, AA2>,
cache: Ref<Either<EE1, AA1>>
): UIO<boolean> => {
// Saves the result into a [[Ref]] instance
const cacheResult = cache.set(exit)

// Abort the other Fiber
const abortFiber = fiber.abort

// Set the result
const markAsDone = isDone.set(FIO.of(true))

// Increases the result count by 1
const incCount = count.update(_ => _ + 1)

return cacheResult.and(
exit.fold(
FIO.of(false),

// On failure —
// 1. Increase count
// 2. Abort fiber
// 3. Set final result
() => abortFiber.and(markAsDone),

// On success —
// 1. Increase count
// 2. if count === 2 : Set final result
() =>
incCount.and(
count.read.chain(value =>
value === 2 ? markAsDone : FIO.of(false)
)
)
)

return cache.chain(({0: c1, 1: c2}) =>
this.raceWith(
that,
(exit, fiber) => coordinate(exit, fiber, c1).void,
(exit, fiber) => coordinate(exit, fiber, c2).void
)
.and(done.get)
.and(
c1.read
.chain(FIO.fromEither)
.zipWith(c2.read.chain(FIO.fromEither), c)
)
)
})
}

return Caches.chain(({0: cacheL, 1: cacheR}) =>
this.raceWith(
that,
(exit, fiber) => coordinate(exit, fiber, cacheL).void,
(exit, fiber) => coordinate(exit, fiber, cacheR).void
)
.and(isDone.get)
.and(
cacheL.read
.chain(FIO.fromEither)
.zipWith(cacheR.read.chain(FIO.fromEither), c)
)
)
})
}
}

0 comments on commit cc817ee

Please sign in to comment.