Skip to content

Commit

Permalink
refactor(fio): zipWithPar() now resolves with the result instead of a…
Browse files Browse the repository at this point in the history
…n `Exit`

BREAKING CHANGE: signature of zipWithPar() has been changed.
  • Loading branch information
tusharmath committed Jul 19, 2019
1 parent 459a78a commit 766e356
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 31 deletions.
31 changes: 31 additions & 0 deletions src/internals/Coordinate.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import {Await} from '../main/Await'
import {Exit} from '../main/Exit'
import {Fiber} from '../main/Fiber'
import {FIO} from '../main/FIO'
import {Ref} from '../main/Ref'

/**
* Cancels the provided fiber on exit status.
* Sets the provided ref if two responses have been received.
* @ignore
*/
export const coordinate = <E1, A1, E2, A2>(
exit: Exit<E1, A1>,
fiber: Fiber<E2, A2>,
ref: Ref<Exit<E1, A1>>,
count: Ref<number>,
await: Await<never, boolean>
) =>
ref
.set(exit)
.chain(e =>
Exit.isFailure(e)
? fiber.abort.and(await.set(FIO.of(true)))
: count
.update(_ => _ + 1)
.and(
count.read.chain(value =>
value === 2 ? await.set(FIO.of(true)) : FIO.of(false)
)
)
)
30 changes: 6 additions & 24 deletions src/main/FIO.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import {ICancellable, IScheduler} from 'ts-scheduler'

import {CB} from '../internals/CB'
import {coordinate} from '../internals/Coordinate'

import {Await} from './Await'
import {Exit} from './Exit'
Expand Down Expand Up @@ -424,8 +425,8 @@ export class FIO<E1 = unknown, A1 = unknown, R1 = NoEnv> {
*/
public zipWithPar<E2, A2, R2, C>(
that: FIO<E2, A2, R2>,
c: (e1: Exit<E1, A1>, e2: Exit<E2, A2>) => C
): FIO<never, C, iRR<R1, R2>> {
c: (e1: A1, e2: A2) => C
): FIO<E1 | E2, C, iRR<R1, R2>> {
// Create Caches
const cache = ExitRef<E1, A1>().zip(ExitRef<E2, A2>())

Expand All @@ -435,27 +436,6 @@ export class FIO<E1 = unknown, A1 = unknown, R1 = NoEnv> {
// Create an Await
const done = Await.of<never, boolean>()

const coordinate = <E_1, A_1, E_2, A_2>(
exit: Exit<E_1, A_1>,
fiber: Fiber<E_2, A_2>,
ref: Ref<Exit<E_1, A_1>>,
count: Ref<number>,
await: Await<never, boolean>
) =>
ref
.set(exit)
.chain(e =>
Exit.isFailure(e)
? fiber.abort.and(await.set(FIO.of(true)))
: count
.update(_ => _ + 1)
.and(
count.read.chain(value =>
value === 2 ? await.set(FIO.of(true)) : FIO.of(false)
)
)
)

return counter.zip(done).chain(([count, await]) =>
cache.chain(([c1, c2]) =>
this.raceWith(
Expand All @@ -464,7 +444,9 @@ export class FIO<E1 = unknown, A1 = unknown, R1 = NoEnv> {
(exit, fiber) => coordinate(exit, fiber, c2, count, await).void
)
.and(await.get)
.and(c1.read.zipWith(c2.read, c))
.and(
c1.read.chain(FIO.fromExit).zipWith(c2.read.chain(FIO.fromExit), c)
)
)
)
}
Expand Down
13 changes: 6 additions & 7 deletions test/FIO.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ describe('FIO', () => {
FIO.of(10).zipWithPar(FIO.of(20), (a, b) => [a, b])
)

assert.deepEqual(actual, [Exit.success(10), Exit.success(20)])
assert.deepEqual(actual, [10, 20])
})

it('should combine them in parallel', () => {
Expand All @@ -459,18 +459,17 @@ describe('FIO', () => {
const actual = runtime.executeSync(
left.zipWithPar(right, (a, b) => [a, b])
)
assert.deepEqual(actual, [Exit.success(10), Exit.success(20)])
assert.deepEqual(actual, [10, 20])
})

it('should abort the pending one on error', () => {
const counter = new Counter()
const left = FIO.reject(10).delay(500)
const right = FIO.of(20).delay(1000)
const right = counter.inc().delay(1000)
const runtime = testRuntime()

const actual = runtime.executeSync(
left.zipWithPar(right, (a, b) => [a, b])
)
assert.deepEqual(actual, [Exit.failure(10), Exit.pending])
runtime.executeSync(left.zipWithPar(right, (a, b) => [a, b]))
assert.deepEqual(counter.count, 0)
})
})

Expand Down

0 comments on commit 766e356

Please sign in to comment.